Kotlin

연산자 - 결합, 그룹핑, 필터링 응용, 오류처리

1. startWith

 

startWith 연산자를 통해 기존 observable의 앞에 새로운 observable을 결합 할 수 있다.

 

fun main() {
    val observable = Observable.range(11, 10)
    val observable2 = Observable.range(1, 10)
    observable
        .startWith(observable2)
        .subscribe { result ->
            println("onNext = $result")
        }
}

//onNext = 1
//onNext = 2
//onNext = 3
//onNext = 4
//onNext = 5
//onNext = 6
//onNext = 7
//onNext = 8
//onNext = 9
//onNext = 10
//onNext = 11
//onNext = 12
//onNext = 13
//onNext = 14
//onNext = 15
//onNext = 16
//onNext = 17
//onNext = 18
//onNext = 19
//onNext = 20

 

11 ~ 20까지 발행하는 observable의 앞에 1 ~ 10까지 발행하는 observable을 startWith를 통해 결합하였다.

 

2. zip

 

zip 연산자는 다수의 observable을 결합한다.

이 때, zip 된 모든 observable이 각 아이템을 발행하였을 때 zip 하여 return 된 observable에서 각 observable에서 발행한 값을 합쳐 발행한다. zip을 수행할 모든 observable의 아이템이 업데이트 되었을 때 발행한다는 의미이다.

 

즉, 3개의 아이템을 발행하는 A, 4개의 아이템을 발행하는 B, 5개의 아이템을 발행하는 C observable을 zip하였을 때, zip을 통해 return 된 observable은 총 3번의 발행이 이루어지며 나머지 B, C의 발행값은 drop된다.

 

fun main() {
    val observable = Observable.interval(100, TimeUnit.MILLISECONDS)
    val observable2 = Observable.interval(300, TimeUnit.MILLISECONDS)
    val observable3 = Observable.interval(500, TimeUnit.MILLISECONDS)

    Observable
        .zip(observable, observable2, observable3) { a, b, c ->
        "A: $a / B: $b / C: $c" 
        }
        .subscribe { println(it) }

    runBlocking {
        delay(3000)
    }
}

//A: 0 / B: 0 / C: 0
//A: 1 / B: 1 / C: 1
//A: 2 / B: 2 / C: 2
//A: 3 / B: 3 / C: 3
//A: 4 / B: 4 / C: 4
//A: 5 / B: 5 / C: 5

 

각 observable은 각각 100ms, 300ms, 500ms마다 아이템을 발행한다.

3개의 observable이 모두 새로운 값을 발행했을 때에 zip을 통해 return 된 observable은 값을 결합하여 발행한다.

 

3. combineLatest

 

combineLatest 연산자는 zip 연산자와 유사하게 동작한다.

그러나 combineLatest는 zip과 달리 결합한 observable 중 하나라도 값이 update 되면 즉시 아이템을 발행한다는 점에서 차이가 있다.

 

fun main() {
    val observable = Observable.interval(100, TimeUnit.MILLISECONDS)
    val observable2 = Observable.interval(300, TimeUnit.MILLISECONDS)
    val observable3 = Observable.interval(500, TimeUnit.MILLISECONDS)

    Observable
        .combineLatest(observable, observable2, observable3) { a, b, c ->
        "A: $a / B: $b / C: $c" }
        .subscribe {
            println(it)
        }

    runBlocking {
        delay(1000)
    }
}

//A: 4 / B: 0 / C: 0
//A: 4 / B: 1 / C: 0
//A: 5 / B: 1 / C: 0
//A: 6 / B: 1 / C: 0
//A: 7 / B: 1 / C: 0
//A: 8 / B: 1 / C: 0
//A: 8 / B: 2 / C: 0
//A: 9 / B: 2 / C: 0
//A: 9 / B: 2 / C: 1

 

위 zip 코드에서 combineLatest로 바꾸었다.

결과가 달라진 것을 볼 수 있는데, 500ms 전까지는 C의 발행값이 아무것도 없어 아무것도 발행되지 않았다.

500ms부터는 C도 발행을 시작하여 4,0,0이 발행되었다.

600ms 때에는 B가 1을 발행하여 4,1,0이 발행되었고, 동시에 A도 5를 발행하여 5,1,0도 발행되었다.

이런 식으로, combineLatest는 zip과 달리 하나의 observable에서 새로운 값이 발행될 때마다 업데이트된다.

 

4. merge

merge 연산자는 다수의 observable을 결합하여 각 아이템에 대해 동일한 작업을 수행 할 수 있다.

merge 연산자를 통해 최대 4개의 observable을 결합할 수 있으며, 더 많은 observable을 결합할 시에는 mergeArray 연산자를 이용한다.

 

fun main() {
    val observable = Observable.interval(300, TimeUnit.MILLISECONDS)
        .map { 
            "ob1 : $it"
        }
    val observable2 = Observable.interval(500, TimeUnit.MILLISECONDS)
        .map { 
            "ob2 : $it"
        }

    Observable
        .merge(observable, observable2)
        .subscribe { 
            println(it)
        }

    runBlocking {
        delay(1000)
    }
}

//ob1 : 0
//ob2 : 0
//ob1 : 1
//ob1 : 2
//ob2 : 1

 

merge 연산자는 위 코드의 결과에서 보이는 것과 같이 merge의 순서를 보장하지 않는다.

merge의 결합 순서에 상관없이 비동기적으로 처리한다.

 

 

5. concat

 

concat 연산자는 merge 연산자와 유사한 동작을 하지만, 가장 큰 차이점은 하나의 observable이 onComplete가 호출 된 후에 다른 observable의 구독이 수행된다는 점이다. 즉, 동기적으로 처리한다.

 

fun main() {
    val observable = Observable.interval(300, TimeUnit.MILLISECONDS)
        .map {
            "ob1 : $it"
        }.take(2)
    val observable2 = Observable.interval(500, TimeUnit.MILLISECONDS)
        .map {
            "ob2 : $it"
        }

    Observable
        .concat(observable, observable2)
        .subscribe {
            println(it)
        }

    runBlocking {
        delay(2000)
    }
}

//ob1 : 0
//ob1 : 1
//ob2 : 0
//ob2 : 1

 

interval observable1에게서 onComplete 호출을 위해 take 연산자로 2개의 값만을 발행하고 종료하도록 하였다.

observable2는 observable1이 2개의 값을 발행하고 종료된 후에야 구독이 되어 값이 순차적으로 발행됨을 확인 할 수 있다.

 

 

+ flatMap과 concatMap

flatMap 연산자는 내부적으로 merge 연산자를 사용하여 비동기적으로 처리한다.

concatMap 연산자는 내부적으로 concat 연산자를 사용하여 동기적으로 처리한다.

 

flatMap은 순서에 상관없이 비동기적으로 데이터를 처리할 때 주로 사용하고,

concatMap은 순서가 중요하여 기존 순서를 그대로 유지하며 데이터를 처리할 때 주로 사용한다.

 

 

6. amb

 

amb 연산자는 observable 중 가장 먼저 아이템을 발행한 observable만을 구독하고 나머지 observable의 결과는 모두 drop한다.

매개변수로 Iterable<Observable>을 받는다.

 

fun main() {
    val observable = Observable.interval(300, TimeUnit.MILLISECONDS)
        .map {
            "ob1 : $it"
        }
    val observable2 = Observable.interval(500, TimeUnit.MILLISECONDS)
        .map {
            "ob2 : $it"
        }
    val observable3 = Observable.interval(700, TimeUnit.MILLISECONDS)
        .map {
            "ob3 : $it"
        }

    Observable
        .amb(listOf(observable, observable2, observable3))
        .subscribe {
            println(it)
        }

    runBlocking {
        delay(2000)
    }
}

//ob1 : 0
//ob1 : 1
//ob1 : 2
//ob1 : 3
//ob1 : 4
//ob1 : 5

 

3개의 observable 중 observable1이 300ms에 가장 먼저 아이템을 발행하였으므로 나머지 observable은 무시하고 observable1만을 구독한다.

 

7. groupBy

 

groupBy 연산자는 람다 내의 결과에 따라 아이템을 그룹화한다.

groupBy를 통해 그룹화하여 return 되는 형태는 Observable<GroupedObservable<K,T>> 형태로,

람다 내의 결과가 그룹의 key(K)가 된다.

 

fun main() {
    val observable = Observable.range(1, 10)

    observable
        .groupBy {
            it > 5
        }
        .subscribe {
            println("key: ${it.key}")
            // groupBy의 it = Group의 각 Observable
            // 각 Observable을 subscribe 처리
            it.subscribe { value ->
                println("onNext = $value")
            }
        }
}

//key: false
//onNext = 1
//onNext = 2
//onNext = 3
//onNext = 4
//onNext = 5
//key: true
//onNext = 6
//onNext = 7
//onNext = 8
//onNext = 9
//onNext = 10

 

groupBy를 통해 1 ~ 10을 발행하는 observable을 그룹핑했다.

람다 내에는 it > 5 라는 조건이 붙어있는데, 5를 기준으로 5 이하의 값은 false라는 key를 가지는 그룹으로 그룹핑 될 것이고,

5 초과의 값은 true라는 key를 가지는 그룹으로 그룹핑 된다.

groupBy를 통해 나오는 결과는 GroupedObservable<K,T> 이므로 해당 리턴값을 다시 subscribe 해주어야 한다.

 

 

8. switchMap

 

switchMap 연산자는 각 아이템에 대한 변환을 수행하는데, 변환된 아이템의 소비가 새로운 아이템의 발행보다 느리다면 drop 된다.

 

fun main() {
    val observable = Observable.range(1, 10)

    observable
        .switchMap {
            return@switchMap Observable.just(it).delay(100, TimeUnit.MILLISECONDS)
        }
        .subscribe {
            println("onNext = $it")
        }
    runBlocking {
        delay(1000)
    }
}

//onNext = 10

 

observable은 1~10까지의 값을 발행하였지만 100ms의 딜레이 동안 새로운 값이 발행되어 drop 되었으므로 마지막 값만이 소비된다.

 

9. skip, take

 

skip 연산자는 주어진 조건에 맞는 아이템을 생략하는 연산자로, 2가지의 오버로드가 존재한다.

skip(count) : count 만큼의 아이템을 drop

skip(time, timeunit) : 설정한 시간 동안의 아이템을 drop

 

take 연산자는 skip과 정확히 반대로 동작하는 연산자로, 주어진 조건에 맞는 아이템만을 출력하는 연산자이다.

(정확히 반대로 동작하므로, 예제코드와 결과는 생략한다.)

 

fun main() {
    val observable = Observable.range(1, 10)
    val observable2 = Observable.interval(100, TimeUnit.MILLISECONDS)
    observable
        .skip(5)
        .subscribe {
            println("skip(count) = $it")
        }

    observable2
        .skip(300, TimeUnit.MILLISECONDS)
        .subscribe {
            println("skip(time) = $it")
        }

    runBlocking { delay(1000) }
}

//skip(count) = 6
//skip(count) = 7
//skip(count) = 8
//skip(count) = 9
//skip(count) = 10
//skip(time) = 3
//skip(time) = 4
//skip(time) = 5
//skip(time) = 6
//skip(time) = 7
//skip(time) = 8
//skip(time) = 9

skip(5)의 경우는 5개의 item을 drop하여 6~10의 값만이 출력되었다.

skip(300, TimeUnit.MILISECONDS)의 경우는 300ms간 item을 drop하여 0~2의 값이 drop 되었다.

 

 

9. skipLast, takeLast

 

skipLast 연산자는 skip과 동일한 동작을 수행한다. 다만 차이점은 맨 뒤의 index부터 skip이 적용 된다는 점이다.

takeLast 연산자는 맨 뒤의 index부터 take가 적용된다.

fun main() {
    val observable = Observable.range(1, 10)
    val observable2 = Observable.interval(100, TimeUnit.MILLISECONDS)
    observable
        .skipLast(5)
        .subscribe {
            println("skip(count) = $it")
        }

    observable2
        .skipLast(300, TimeUnit.MILLISECONDS)
        .subscribe {
            println("skip(time) = $it")
        }

    runBlocking { delay(1000) }
}

//skip(count) = 1
//skip(count) = 2
//skip(count) = 3
//skip(count) = 4
//skip(count) = 5
//skip(time) = 0
//skip(time) = 1
//skip(time) = 2
//skip(time) = 3
//skip(time) = 4
//skip(time) = 5
//skip(time) = 6

 

skip과 같은 오버로드를 가진다. 

다만 결과에서의 차이점은 모두 뒤의 배출값들이 drop 되었다는 점이다.

 

 

10. skipWhile, takeWhile

 

skipWhile 역시 skip과 마찬가지로 배출값을 건너뛴다는 공통점이 있다.

skipWhile은 람다 내 조건식이 true일 때까지 배출을 건너뛰다가, false인 순간부터 배출을 수행한다.

 

takeWhile은 람다 내 조건식이 true일 때까지 배출을 수행하다가, false인 순간부터 배출을 건너뛴다.

 

fun main() {
    val observable = Observable.interval(100, TimeUnit.MILLISECONDS)
    observable
        .skipWhile {
            it.toInt() < 5
        }
        .subscribe {
            println("onNext = $it")
        }

    runBlocking { delay(1000) }
}

//onNext = 5
//onNext = 6
//onNext = 7
//onNext = 8
//onNext = 9

 

skipWhile 내 람다에 의해 배출값이 5 미만일 때까지는 배출을 수행하지 않는다.

5 이상이 되었을 때부터 이후의 값들은 모두 배출한다.

 

11. skipUntil

skipUntil 연산자는 매개변수의 observable이 배출을 시작하기 전까지 해당 observable의 모든 배출을 건너뛴다.

 

fun main() {
    val observable = Observable.interval(100, TimeUnit.MILLISECONDS)
    val observable2 = Observable.timer(600, TimeUnit.MILLISECONDS)
    observable
        .skipUntil(observable2)
        .subscribe {
            println("onNext = $it")
        }

    runBlocking { delay(1000) }
}

//onNext = 5
//onNext = 6
//onNext = 7
//onNext = 8
//onNext = 9

 

observable2는 timer를 통해 600ms 뒤에 배출을 수행한다.

observable.skipUntil(observable2)를 통해 observable2가 배출을 수행할 때 observable도 배출을 수행할 수 있도록 설정하였으므로 observable2가 배출을 수행하지 않는 600ms 동안의 모든 배출값들은 drop된다.

 

 

12. onErrorReturn

 

onErrorReturn 연산자는 오류 처리 연산자로, 에러가 발생했을 때 다운스트림으로 기본값을 전달할 수 있다.

 

fun main() {
    val observable = Observable.range(1, 10)
    observable
        .map { it / 0 }
        .onErrorReturn { 999 }
        .subscribe {
            println("onNext = $it")
        }
}

//onNext = 999

 

첫번째 배출부터 오류가 발생하여 기본값인 999를 발행하고, 구독이 종료된다.

 

13. onErrorResumeNext

 

위 onErrorReturn의 경우는 에러가 발생하면 기본값 배출 후 구독이 종료된다.

그러나 onErrorResumeNext는 에러가 발생하면 새로운 observable을 구독한다.

 

fun main() {
    val observable = Observable.range(1, 10)
    observable
        .map { it / 0 }
        .onErrorResumeNext(Observable.range(100, 10))
        .subscribe {
            println("onNext = $it")
        }
}

//onNext = 100
//onNext = 101
//onNext = 102
//onNext = 103
//onNext = 104
//onNext = 105
//onNext = 106
//onNext = 107
//onNext = 108
//onNext = 109

 

첫번째 배출부터 오류가 발생하여 기존 구독을 취소하고 대신 매개변수로 들어온 100 ~ 109를 발행하는 observable을 구독한다.

 

14. retry

 

retry 연산자는 오류 발생 시 발행을 재시도하는 횟수 또는 조건을 지정할 수 있는 연산자이다.

 

fun main() {
    val observable = Observable.range(1, 10)
    observable
        .map { it / (3 - it) }
        .retry(2)
        .subscribe({
            println("onNext = $it")
        }, {
            println("onError = $it")
        }, {
            println("onComplete")
        })
}

//onNext = 0
//onNext = 2
//onNext = 0
//onNext = 2
//onNext = 0
//onNext = 2
//onError = java.lang.ArithmeticException: / by zero

 

retry(2)를 통해 오류 발생 시 2번의 재시도를 수행한다.

주어진 횟수만큼 재시도 후에도 오류가 발생하면 그 때 onError를 호출하고 구독을 종료한다.

 

fun main() {
    val observable = Observable.range(1, 10)
    var retryCount = 0
    observable
        .map { it / (3 - it) }
        .retry { _,_ ->
            (++retryCount) < 3
        }
        .subscribe({
            println("onNext = $it")
        }, {
            println("onError = $it")
        }, {
            println("onComplete")
        })
}

//onNext = 0
//onNext = 2
//onNext = 0
//onNext = 2
//onNext = 0
//onNext = 2
//onError = java.lang.ArithmeticException: / by zero

 

retry의 람다형태를 통해 주어진 조건이 true일 경우 재시도, false일 경우 onError 호출 후 구독을 종료한다.

이 역시도 마찬가지로 오류 발생 시 2번의 재시도 후 onError를 호출 후 구독을 종료한다.

 

 

 

 

 

책 : http://www.kyobobook.co.kr/product/detailViewKor.laf?mallGb=KOR&ejkGb=KOR&barcode=9791161752976

 

코틀린 리액티브 프로그래밍 - 교보문고

RxKotlin을 사용한 리액티브 프로그래밍 | ★ 이 책에서 다루는 내용 ★▣ 리액티브 프로그래밍 패러다임과 기존 프로젝트 향상 방법▣ RxKotlin 2.0과 ReactiveX 프레임워크▣ 안드로이드에서 RxKotlin 사

www.kyobobook.co.kr

코틀린 리액티브 프로그래밍 - 리부 차크라보티 지음 | 조승진 옮김 | 에이콘출판

 

* 책의 내용이 아닌 책의 내용을 통한 개인의 학습을 정리한 글입니다.

'Kotlin' 카테고리의 다른 글

스케줄러를 통한 멀티스레딩, subscribeOn, observeOn  (0) 2021.12.30
연산자 - 필터링, 변환, 축소  (0) 2021.12.24
Cold Observable / Hot Observable  (0) 2021.12.23
Observable / Observer  (0) 2021.12.23
Reactive Programming  (0) 2021.12.22