Kotlin

스케줄러를 통한 멀티스레딩, subscribeOn, observeOn

스케줄러(Scheduler)

스케줄러는 Reactive X에서 멀티스레딩을 수행할 수 있도록 스레드 풀을 생성하고 다른 스레드에서 실행할 수 있게 해준다.

 

Reactive X에서는 미리 정의된 스케줄러를 제공하며, 아래와 같은 종류와 사용 용도를 가진다.

 

Schedulers.io() : * I/O 관련 작업을 수행할 수 있는 무제한의 워커 스레드. 무제한으로 워커 스레드를 생성할 수 있어 OutOfMemory를 야기할 수 있으니 주의하여 용도에 맞게 사용하여야 한다.

* I/O 관련 작업 : 파일 시스템, DB 등의 작업

 

Schedulers.computation() : CPU의 코어 수와 동일한 수의 스레드를 가지는 스레드 풀을 제공한다. CPU 코어를 사용하는 복잡한 계산 작업에 사용하는 스케줄러이다. CPU 코어를 사용 상태로 두기 때문에 앱의 속도를 저하시킬 수 있으므로 CPU를 사용하는 계산작업이 아닌 경우 해당 스케줄러를 사용하지 않는 것이 좋다.

 

Schedulers.newThread() : io와 비슷하게 각 작업마다 새로운 스레드를 생성한다. 하지만 io의 경우는 스레드 풀의 각 스레드마다 유휴 스레드가 존재하는지 여부를 판단 후 기존 스레드를 사용할 수 없으면 새로운 스레드를 생성함에 반해 newThread 스케줄러의 경우는 스레드 풀의 유휴 여부와 관계없이 새로운 작업마다 항상 새로운 스레드를 생성하므로 cost 낭비가 존재한다.

웬만해선 상황에 따라 computation 또는 io 스케줄러를 사용할 수 있도록 하는 것이 좋다.

 

Schedulers.single() : 하나의 스레드만 포함하는 스케줄러로, single을 호출하는 모든 상황에서 동일한 스레드를 제공한다. 순차적인 작업이 보장되어야 할 때 사용한다.

 

Schedulers.trampoline() : single과 마찬가지로 순차적인 작업이 보장되어야 할 때 사용한다.

 

 

+ single과 trampoline의 차이점

위 정의에 따르면 single과 trampoline의 차이는 느껴지지 않는다.

그러나 single은 호출된 스레드와 병렬적으로 처리될 수 있고, trampoline은 호출된 스레드와도 순차적으로 처리된다.

코드와 결과를 보며 비교해본다.

 

fun main() {
    val observable = Observable.range(1, 3)
    observable
        .subscribeOn(Schedulers.single())
        .subscribe {
            runBlocking { delay(100) }
            println("onNext(single) = $it")
        }

    for(i in 0..3) {
        runBlocking { delay(100) }
        println("main Thread = $i")
    }

    runBlocking {
        delay(3000)
    }
}

//onNext(single) = 1
//main Thread = 0
//main Thread = 1
//onNext(single) = 2
//main Thread = 2
//onNext(single) = 3
//main Thread = 3

 

single의 경우 메인 스레드의 작업과 병렬적으로 처리된다.

 

 

fun main() {
    val observable = Observable.range(1, 3)
    observable
        .subscribeOn(Schedulers.trampoline())
        .subscribe {
            runBlocking { delay(100) }
            println("onNext(trampoline) = $it")
        }

    for(i in 0..3) {
        runBlocking { delay(100) }
        println("main Thread = $i")
    }

    runBlocking {
        delay(3000)
    }
}

//onNext(trampoline) = 1
//onNext(trampoline) = 2
//onNext(trampoline) = 3
//main Thread = 0
//main Thread = 1
//main Thread = 2
//main Thread = 3

 

trampoline의 경우 호출 순서에 따라 스레드 간 작업 역시 순차적으로 처리된다.

 

 

이러한 스케줄러를 어떻게 적용할 수 있을까? subscribeOn과 observeOn 연산자를 이용할 수 있다. 

 

스케줄러의 사용
1. subscribeOn

 

기본적으로 observable은 다음 결과에서 볼 수 있듯이 메인 스레드에서 모든 연산과 구독을 처리한다.

 

fun main() {
    val observable = Observable.range(1, 3)
    observable
        .map {
            println("map Thread - ${Thread.currentThread().name}")
        }
        .subscribe {
            println("onNext Thread - ${Thread.currentThread().name}")
        }
}

//map Thread - main
//onNext Thread - main
//map Thread - main
//onNext Thread - main
//map Thread - main
//onNext Thread - main

 

subscribeOn 연산자는 구독(subscribe) 스레드를 변경한다.

즉, observable이 값을 발행할 때의 스레드를 변경한다.

이후 설명할 observeOn을 통해 스레드를 변경해주지 않으면 모든 구독과 연산은 해당 subscribeOn에서 지정한 스레드로 실행된다.

 

fun main() {

    val observable = Observable.create<String> {
        it.onNext("onNext - ${Thread.currentThread().name}")
    }

    observable
        .map {
            println("map - ${Thread.currentThread().name}")
            return@map it
        }
        .subscribeOn(Schedulers.computation())
        .subscribe {
        println(it)
    }

    runBlocking {
        delay(1000)
    }
}

//map - RxComputationThreadPool-1
//onNext - RxComputationThreadPool-1

 

subscribeOn을 통해 computation thread로 설정한다. onNext (구독)와 map (연산) 모두 subscribeOn에서 설정한 스케줄러를 통해 수행된다.

 

해당 subscribeOn 연산자는 최초에 설정한 Scheduler만이 유효하며, 이후 중복되게 subscribeOn으로 다른 Scheduler를 설정해도 스레드가 바뀌지 않는다. 또한 subscribeOn을 호출하는 위치에 관계없이 구독 시점의 스레드가 변경된다.

 

구독 스레드와 연산을 수행하는 스레드를 다르게 가져가고 싶으면 어떻게 할까? 

observeOn을 사용한다.

 

 

스케줄러의 사용
2. observeOn

 

한 번 설정하면 구독 스레드를 변경 할 수 없는 subscribeOn을 도와주기 위한 친구로 observeOn이 존재한다.

observeOn은 해당 연산자가 호출 된 이후의 동작이 수행될 스레드를 지정한다.

중복으로 사용할 수 있으며, 사용 할 때마다 다음 동작의 스레드가 변경된다.

호출 된 위치의 다음 동작의 스레드가 변경되므로, 호출 위치에 따라 스레드의 변경이 달라진다.

 

fun main() {

    val observable = Observable.create<String> {
        it.onNext("onNext - ${Thread.currentThread().name}")
    }

    observable
        .observeOn(Schedulers.io())
        .map {
            println("map - ${Thread.currentThread().name}")
            return@map it
        }
        .observeOn(Schedulers.single())
        .map {
            println("map2 - ${Thread.currentThread().name}")
            return@map it
        }
        .subscribeOn(Schedulers.computation())
        .subscribe {
            println(it)
        }

    runBlocking {
        delay(1000)
    }
}

//map - RxCachedThreadScheduler-1
//map2 - RxSingleScheduler-1
//onNext - RxComputationThreadPool-1

 

subscribeOn을 통해 구독 스레드는 computation 스레드로 지정되어 onNext - computation thread가 출력된다.

첫 번째 map 연산은 observeOn을 통해 io 스레드로 지정되어 map - io thread가 출력된다.

두 번째 map 연산은 observeOn을 통해 single 스레드로 지정되어 map2 - single thread가 출력된다.

 

 

스케줄러를 활용한 멀티스레딩으로 기본적으로는 단일 스레드에서 동기적으로 동작하는 Reactive X에 대해 비동기 처리를 수행할 수 있다. 

 

 

 

 

책 : http://www.kyobobook.co.kr/product/detailViewKor.laf?mallGb=KOR&ejkGb=KOR&barcode=9791161752976

 

코틀린 리액티브 프로그래밍 - 교보문고

RxKotlin을 사용한 리액티브 프로그래밍 | ★ 이 책에서 다루는 내용 ★▣ 리액티브 프로그래밍 패러다임과 기존 프로젝트 향상 방법▣ RxKotlin 2.0과 ReactiveX 프레임워크▣ 안드로이드에서 RxKotlin 사

www.kyobobook.co.kr

코틀린 리액티브 프로그래밍 - 리부 차크라보티 지음 | 조승진 옮김 | 에이콘출판

 

* 책의 내용이 아닌 책의 내용을 통한 개인의 학습을 정리한 글입니다.

'Kotlin' 카테고리의 다른 글

연산자 - 결합, 그룹핑, 필터링 응용, 오류처리  (0) 2021.12.27
연산자 - 필터링, 변환, 축소  (0) 2021.12.24
Cold Observable / Hot Observable  (0) 2021.12.23
Observable / Observer  (0) 2021.12.23
Reactive Programming  (0) 2021.12.22