Kotlin

연산자 - 필터링, 변환, 축소

연산자 (Operator)

Observable , Flowable을 피연산자로 사용해 변환된 새로운 Observable, Flowable을 반환하는 함수

 

 

필터링 연산자
1. debounce

 

debounce 연산자는 아이템의 발행 이후 특정 시간이 지난 후에 다운스트림으로 해당 아이템이 전달될 수 있도록 한다.

ex) EditText에서 사용자가 입력 할 때마다 검색이 수행되는 것이 아닌 입력 한 뒤 n초가 지난 후에 검색이 수행되도록 할 때

 

fun main() {
    val observable = Observable.create<String> {
        it.onNext("H")
        runBlocking { delay(100) }
        it.onNext("He")
        runBlocking { delay(100) }
        it.onNext("Hel")
        runBlocking { delay(100) }
        it.onNext("Hell")
        runBlocking { delay(100) }
        it.onNext("Hello")
        runBlocking { delay(300) }
        it.onComplete()
    }

    observable
        .debounce(200, TimeUnit.MILLISECONDS)
        .subscribe {
            println("Search Query = \"$it\"")
        }
}

// Search Query = "Hello"

사용자의 EditText 입력을 Observable.create를 통해 임시로 구현하였다.

debounce를 통해 아이템이 발행된 후 200 ms가 지난 값에 대해서만 다운스트림으로 값을 내려준다.

Hell 까지는 입력 사이 간격이 200ms 이내이기 때문에 다운스트림으로 값이 내려가지 않고, Hello 이후 200ms 이상 간격이 생겼으므로 최종적으로 Hello만 발행된다.

 

 

필터링 연산자
2. distinct, disctinctUntilChanged

distinct 연산자는 observable의 아이템 중 중복된 값(이미 발행된 값)은 필터링하여 발행하지 않는다.

distinctUntilChanged 연산자는 distinct와 비슷한데, 직전에 발행된 값과 중복된 값만 필터링하여 발행하지 않는다.

 

fun main() {
    val observable = Observable.just(1,1,2,2,3,2,4)

    observable
        .distinct()
        .subscribe {
            println("distinct - $it")
        }

    observable
        .distinctUntilChanged()
        .subscribe {
            println("distinctUntilChanged - $it")
        }
}

//distinct - 1
//distinct - 2
//distinct - 3
//distinct - 4
//distinctUntilChanged - 1
//distinctUntilChanged - 2
//distinctUntilChanged - 3
//distinctUntilChanged - 2
//distinctUntilChanged - 4

 

1, 1, 2, 2, 3, 2, 4 의 아이템을 갖는 observable에 대해

distinct 연산자는 이전에 발행했던 모든 데이터에 대한 중복 확인을 통해 1, 2, 3, 4만이 발행되었고,

distinctUntilChanged 연산자는 직전에 발행했던 데이터에 대해서만 중복 확인을 하여 1, 2, 3, 2, 4가 발행되었다.

 

 

필터링 연산자
3. elementAt

 

elementAt 연산자는 n번째 요소만을 발행한다.

 

fun main() {
    val observable = Observable.just(0,1,2,3,4,5)

    observable
        .elementAt(1)
        .subscribe {
            println("elementAt(1) - $it")
        }

    observable
        .elementAt(4)
        .subscribe {
            println("elementAt(4) - $it")
        }
}

//elementAt(1) - 1
//elementAt(4) - 4

 

 

필터링 연산자
4. filter

wow 필터링 연산자 그 자체인 filter 연산자다.

람다 내 로직을 통해 해당 로직에 상응하는 아이템만 발행 할 수 있도록 직접 필터링 할 수 있다.

 

fun main() {
    val observable = Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

    observable
        .filter { it%2 == 0 }
        .subscribe {
            println("result - $it")
        }
}

//result - 2
//result - 4
//result - 6
//result - 8
//result - 10

 

filter 연산자를 통해 짝수인 아이템만을 발행할 수 있다.

 

 

필터링 연산자
5. first, last

 

first, last 연산자는 각각 아이템의 첫번째 값, 마지막 값만을 Single 형태로 반환한다.

매개변수의 값은 각 요소에 접근 할 수 없는 경우 발행할 기본값이다.

fun main() {
    val observable = Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

    observable
        .first(-1)
        .subscribe { result ->
            println("first - $result")
        }

    observable
        .last(-1)
        .subscribe { result ->
            println("last - $result")
        }
}

//first - 1
//last - 10

 

필터링 연산자
6. ignoreElements

ignoreElements 연산자는 간단하다. onComplete 이벤트만을 가지는 Completable 모나드를 반환한다.

모나드에 대해서는 아직 자세히 공부하지 않아 다음에 제대로 알아보자.

지금은 본 연산자가 오직 onComplete만을 수행한다는 것으로 기억한다.

 

fun main() {
    val observable = Observable.just(1, 2, 3, 4)

    observable
        .ignoreElements()
        .subscribe {
            println("onComplete!")
        }
}

//onComplete!

 

 

변환 연산자
1. map

map 연산자는 발행된 각 아이템에 대한 변환을 수행한다.

 

fun main() {
    val observable = Observable.just(1, 2, 3, 4)

    observable
        .map {
            "$it to String!"
        }
        .subscribe {
            println("onNext - $it")
        }
}


//onNext - 1 to String!
//onNext - 2 to String!
//onNext - 3 to String!
//onNext - 4 to String!

 

map 연산자를 통해 Int type의 각 아이템을 String으로 변환하였다.

 

 

변환 연산자
2. cast

cast 연산자는 각 아이템을 특정 type으로 casting 할 수 있다.

아이템을 특정 type으로 casting하기 위해 map 연산자를 사용할 수도 있으나, casting만이 목적이라면 cast 연산자를 사용했을 때 코드가 훨씬 간결해진다.

 

fun main() {
    val observable = Observable.just(IntData(1), IntData(2))

    // map을 사용한 타입 캐스팅
    observable
        .map {
            it as ParentIntData
        }
        .subscribe {
            println("map - $it")
        }

    // cast를 사용한 타입 캐스팅
    observable
        .cast(ParentIntData::class.java)
        .subscribe {
            println("cast - $it")
        }
}

open class ParentIntData(val value: Int) {
    override fun toString(): String {
        return "ParentIntData($value)"
    }
}
class IntData(value: Int) : ParentIntData(value) {
    override fun toString(): String {
        return "IntData($value)"
    }
}

//map - IntData(1)
//map - IntData(2)
//cast - IntData(1)
//cast - IntData(2)

 

map과 cast를 이용하여 IntData를 ParentIntData로 타입 캐스팅 한 것이다.

람다를 이용하는 map과는 달리 cast의 경우에는 더 간결하게 타입 캐스팅을 수행할 수 있다.

 

 

변환 연산자
3. flatMap

 

발행된 각 item을 변환하여 하나의 observable을 반환하는 map과는 달리

flatMap 연산자는 발행된 각 item에 대한 각각의 observable을 반환한다.

주로 각 아이템에 대해 여러 아이템을 발행할 때 사용한다.

 

fun main() {
    val observable = Observable.just(1, 4, 7, 10)

    // flatMap을 통해 각 아이템(1, 4, 7, 10)에 대한 새로운 observable return
    // 1에 대한 새로운 observable은 1,2,3을 발행
    // 4에 대한 새로운 observable은 4,5,6을 발행
    // ...
    observable
        .flatMap { value ->
            Observable.create<Int> {
                it.onNext(value)
                it.onNext(value+1)
                it.onNext(value+2)
                it.onComplete()
            }
        }
        .subscribe ({
            println("onNext - $it")
        }, {
            println("onError!")
        }, {
            println("onComplete!")
        })
}

//onNext - 1
//onNext - 2
//onNext - 3
//onNext - 4
//onNext - 5
//onNext - 6
//onNext - 7
//onNext - 8
//onNext - 9
//onNext - 10
//onNext - 11
//onNext - 12
//onComplete!

 

flatMap을 통해 기존 observable에서 발행한 값인 1, 4, 7, 10을 각각 새로운 observable로 리턴한다.

각 observable은 기존 발행 값, 발행값+1, 발행값+2를 반환한다.

즉, 1 -> observable(1,2,3)

4 -> observable(4,5,6)

7 -> observable(7,8,9)

10 -> observable(10,11,12) 로 새로운 observable을 리턴한다.

 

여기서 눈여겨 볼 점은 각 observable마다 onComplete 동작이 있는데, onComplete는 실제로 한 번만 수행되었다는 점이다.

flatMap은 return 된 각 observable을 내부적으로 merge하기 때문에, onComplete가 한 번만 수행된다.

 

 

변환 연산자
4. defaultIfEmpty

 

defaultIfEmpty 연산자는 이름과 같이 비어있는 Observable 반환 시 매개변수의 기본값을 발행한다.

 

fun main() {
    val observable = Observable.just(1,2,3,4)

    // filter를 통해 5 보다 큰 값만 발행하도록 하였으므로 빈 observable 반환됨
    observable
        .filter { it > 5 }
        .defaultIfEmpty(-1)
        .subscribe {
            println("onNext - $it")
        }
}

//onNext - -1

 

filter 연산자를 통해 아이템 중 5보다 큰 값만 발행하도록 하였다.

그러나 observable에는 5보다 큰 아이템이 없으므로 빈 observable이 반환되고, 기본값인 -1만이 발행되었다.

 

 

변환 연산자
5. switchIfEmpty

switchIfEmpty 연산자는 defaultIfEmpty와 유사하지만, 빈 observable 반환 시 기본값이 아닌 대체 할 observable을 반환한다.

 

fun main() {
    val observable = Observable.just(0,1,2,3)

    observable
        .filter { it > 5 }
        .switchIfEmpty(Observable.range(100,10))
        .subscribe {
            println("onNext - $it")
        }
}

//onNext - 100
//onNext - 101
//onNext - 102
//onNext - 103
//onNext - 104
//onNext - 105
//onNext - 106
//onNext - 107
//onNext - 108
//onNext - 109

이번에도 역시 filter로 인해 빈 observable이 반환되었다.

switchIfEmpty 내 매개변수 observable이 구독된 결과를 확인 할 수 있다.

 

변환 연산자
6. startWith

 

startWith 연산자는 observable의 기존 아이템의 맨 앞 index에 새로운 아이템을 추가한다.

 

fun main() {
    val observable = Observable.just(0,1,2,3)

    observable
        .startWith(-123)
        .subscribe {
            println("onNext - $it")
        }
}

//onNext - -123
//onNext - 0
//onNext - 1
//onNext - 2
//onNext - 3

 

 

변환 연산자
7. sorted

 

sorted 연산자는 이름 그대로 아이템을 정렬하는 연산자이다. 사실 변환 연산자,, 보다는 정렬 연산자라고 하는 게 나을 것 같긴 하다.

매개변수가 없을 경우 Comparable type을 가지는 아이템의 compareTo를 호출하여 정렬을 수행한다.

만약 매개변수가 없는데 아이템이 Comparable type이 아니라면 오류가 발생한다.

 

fun main() {
    val observable = Observable.just(2,1,4,3)
    val strObservable = Observable.just("Banana", "Car", "Apple")

    // Int type 아이템에 대한 정렬
    observable
        .sorted()
        .subscribe {
            println("sorted(Int) - $it")
        }

    // String type 아이템에 대한 정렬
    strObservable
        .sorted()
        .subscribe {
            println("sorted(String) - $it")
        }
}

//sorted(Int) - 1
//sorted(Int) - 2
//sorted(Int) - 3
//sorted(Int) - 4
//sorted(String) - Apple
//sorted(String) - Banana
//sorted(String) - Car

 

또는 람다를 통해 직접 compare를 구현할 수도 있다.

fun main() {
    val myItemObservable = Observable.just(MyItem(3), MyItem(5), MyItem(1))
    // data class에 대한 custom 정렬
    myItemObservable
        .sorted { item1, item2 ->
            item1.value - item2.value
        }
        .subscribe {
            println("sorted(Custom) - $it")
        }
}

data class MyItem(val value: Int)

//sorted(Custom) - MyItem(value=1)
//sorted(Custom) - MyItem(value=3)
//sorted(Custom) - MyItem(value=5)

 

 

변환 연산자
8. scan

 

scan 연산자는 observable에서 현재 발행된 값과 scan을 통해 이전에 변환되어 발행된 값을 누적하여 발행한다.

이를 통해 1부터 10까지의 합을 구해본다.

 

fun main() {
    val observable = Observable.range(1, 10)

    // prevScan : scan을 통해 이전에 변환되어 발행된 값
    // currentObs : 현재 observable에서 발행된 값
    observable
        .scan { prevScan, currentObs ->
            prevScan + currentObs
        }
        .subscribe {
            println("onNext = $it")
        }
}

//onNext = 1
//onNext = 3
//onNext = 6
//onNext = 10
//onNext = 15
//onNext = 21
//onNext = 28
//onNext = 36
//onNext = 45
//onNext = 55

 

observable이 1 발행 -> prevScan = 0 + currentObs = 1 -> scan에서 1 발행

observable이 2 발행 -> prevScan = 1 + currentObs = 2 -> scan에서 3 발행

observable이 3 발행 -> prevScan = 3 + currentObs = 3 -> scan에서 6 발행

observable이 4 발행 -> prevScan = 6 + currentObs = 4 -> scan에서 10 발행

observable이 5 발행 -> prevScan = 10 + currentObs = 5 -> scan에서 15 발행

observable이 6 발행 -> prevScan = 15 + currentObs = 6 -> scan에서 21 발행

observable이 7 발행 -> prevScan = 21 + currentObs = 7 -> scan에서 28 발행

observable이 8 발행 -> prevScan = 28 + currentObs = 8 -> scan에서 36 발행

observable이 9 발행 -> prevScan = 36 + currentObs = 9 -> scan에서 45 발행

observable이 10 발행 -> prevScan = 45 + currentObs = 10 -> scan에서 55 발행

 

 

축소 연산자
1. count

 

count 연산자는 간단하다. observable이 발행한 아이템의 수를 Single 형태로 반환한다.

 

fun main() {
    val observable = Observable.range(1, 10)

    observable
        .count()
        .subscribe { count ->
            println("count = $count")
        }
}

//count = 10

 

 

축소 연산자
2. reduce

 

reduce 연산자는 scan과 유사하다.

observable의 각 발행 아이템을 모두 누적하여 onComplete 시에 해당 누적값을 내보낸다.

scan과 다른 점은 각 누적 연산에 대한 발행을 수행하는 scan과 달리 reduce는 값을 누적하고 onComplete 시에 한 번 발행한다.

 

fun main() {
    val observable = Observable.range(1, 10)

    // prevReduce : prevScan을 통해 현재까지 누적된 값
    // currentObs : 현재 observable에서 발행된 값
    observable
        .reduce { prevReduce, currentObs ->
            prevReduce + currentObs
        }
        .subscribe { result ->
            println("onNext = $result")
        }
}

//onNext = 55

 

observable이 1 발행 -> prevReduce = 0 + currentObs = 1 -> 누적 값 1

observable이 2 발행 -> prevReduce = 1 + currentObs = 2 -> 누적 값 3

observable이 3 발행 -> prevReduce = 3 + currentObs = 3 -> 누적 값 6

observable이 4 발행 -> prevReduce = 6 + currentObs = 4 -> 누적 값 10

observable이 5 발행 -> prevReduce = 10 + currentObs = 5 -> 누적 값 15

observable이 6 발행 -> prevReduce = 15 + currentObs = 6 -> 누적 값 21

observable이 7 발행 -> prevReduce = 21 + currentObs = 7 -> 누적 값 28

observable이 8 발행 -> prevReduce = 28 + currentObs = 8 -> 누적 값 36

observable이 9 발행 -> prevReduce = 36 + currentObs = 9 -> 누적 값 45

observable이 10 발행 -> prevReduce = 45 + currentObs = 10 -> 누적 값 55

onComplete() -> 누적값인 55가 발행됨

 

 

 

 

 

책 : http://www.kyobobook.co.kr/product/detailViewKor.laf?mallGb=KOR&ejkGb=KOR&barcode=9791161752976

 

코틀린 리액티브 프로그래밍 - 교보문고

RxKotlin을 사용한 리액티브 프로그래밍 | ★ 이 책에서 다루는 내용 ★▣ 리액티브 프로그래밍 패러다임과 기존 프로젝트 향상 방법▣ RxKotlin 2.0과 ReactiveX 프레임워크▣ 안드로이드에서 RxKotlin 사

www.kyobobook.co.kr

코틀린 리액티브 프로그래밍 - 리부 차크라보티 지음 | 조승진 옮김 | 에이콘출판

 

* 책의 내용이 아닌 책의 내용을 통한 개인의 학습을 정리한 글입니다.