Observable / Observer
Kotlin

Observable / Observer

Observable

 

옵저버블(Observerble)은 데이터를 소모하는 컨슈머(Observer)가 소모할 수 있는 데이터를 내보내주는 역할을 한다.

기존 iterator에서 값을 가져오는 pull 방식이 아니라 옵저버블에서 값을 내보내는 push 방식을 채택함을 알 수 있다.

 

옵저버 패턴을 사용하여 옵저버블을 구독하는 옵저버 측에서 옵저버블이 데이터를 내보낼 때 이를 사용한다.

 

Observable의 동작

Observable은 데이터에 대한 처리를 할 수 있는 3가지 메소드를 가진다.

 

onNext : 해당 메소드를 통해 옵저버블은 옵저버에게 데이터를 내보낸다.

 

onComplete : 해당 메소드를 통해 옵저버블은 옵저버에게 모든 아이템의 발행이 완료되었음을 알린다.

터미널 메소드로써, 마지막 아이템까지 발행이 완료되었음을 알리는 메소드이므로 이후의 onNext, onError는 호출되지 않는다.

 

onError : 해당 메소드를 통해 옵저버블은 옵저버에게 아이템 발행 중 에러가 발생했음을 알린다.

터미널 메소드로써, 아이템 발행 중 에러가 발생했음을 알리는 메소드이므로 이후의 onNext, onComplete는 호출되지 않는다.

 

onSubscribe : 구독자가 해당 옵저버블을 구독 할 때 호출되는 메소드이다.

 

fun main() {
    val observer =  object : Observer<Any> {

        // 해당 observer가 observable을 구독할 때 호출
        override fun onSubscribe(d: Disposable) {
            println("onSubscribe")
        }

        // observable에서 새로운 아이템이 발행될 때 호출
        override fun onNext(t: Any) {
            println("onNext = $t")
        }

        // observable에서 error가 발생했을 때 호출되고 observable 종료
        override fun onError(e: Throwable) {
            println("onError = $e")
        }

        // observable의 아이템 발행이 모두 완료될 때 호출되고 observable 종료
        override fun onComplete() {
            println("onComplete")
        }

    }

    // RxKotlin을 통해 List를 Observable로 변환
    val observable = listOf(0,1,2,3,4).toObservable()

    //subscribe 메소드를 통해 해당 observer가 observable 구독
    observable.subscribe(observer)
}

// Result
// onSubscribe
// onNext = 0
// onNext = 1
// onNext = 2
// onNext = 3
// onNext = 4
// onComplete

 

위 코드처럼 collection에 대해 .toObservable로 변환 할 수도 있지만 팩토리 메서드를 통해 직접 Observable을 생성할 수도 있다.

 

Observable을 생성하는 방법, 팩토리 메서드
1. Observable.create

 

Observable.create는 팩토리 메서드 중 하나로, 직접 Observable의 동작을 구성할 수 있다.

 

fun main() {
    val observer =  object : Observer<Any> {

        override fun onSubscribe(d: Disposable) {
            println("onSubscribe")
        }

        override fun onNext(t: Any) {
            println("onNext = $t")
        }

        override fun onError(e: Throwable) {
            println("onError = $e")
        }

        override fun onComplete() {
            println("onComplete")
        }

    }

    val observable = Observable.create<Int> {
        it.onNext(0)
        it.onNext(1)
        it.onNext(2)
        it.onNext(3)
        it.onNext(4)
        it.onError(IOException())
    }

    observable.subscribe(observer)
}

// Result
// onSubscribe
// onNext = 0
// onNext = 1
// onNext = 2
// onNext = 3
// onNext = 4
// onError = java.io.IOException

위와 같은 아이템을 발행하는 observable을 create 팩토리 메서드로 구현해보았다.

또한, 이번에는 살짝 다르게 onError를 통해 의도적으로 IOException을 발생할 수 있도록 직접 observable을 생성하였다.

이 때 it은 ObservableEmitter<T>로써, 해당 ObservableEmitter에 존재하는 onNext, onError 등의 메서드를 사용하여 observable의 동작을 구성한다.

 

 

 

Observable을 생성하는 방법, 팩토리 메서드
2. Observable.from

 

Observable.from 은 여러 데이터 타입을 Observable로 변환할 수 있는 팩토리 메서드이다.

 

fromArray() : Array를 Observable로 변환

fromIterable() : ArrayList, LinkedList와 같은 Iterator 인터페이스를 구현한 클래스를 Observable로 변환

fromCallable() : Callable을 Observable로 변환

fromFeature() : Feature를 Observable로 변환

val array = arrayOf(1,2,3) // Array
val list = listOf(4,5,6) // List (Iterable)


val observableFromArray = Observable.fromArray(array) // fromArray
val observableToObservable1 = array.toObservable() // toObservable

val observableFromIterable = Observable.fromIterable(list) // fromIterable
val observableToObservable2 = list.toObservable() // toObservable

 

* fromArray, fromIterable의 경우는 RxKotlin에서 .toObservable()로 사용 할 수 있는 경우 존재.

이 때, toObservable 역시 내부적으로는 from 팩토리 메서드를 사용한다.

 

//toObservable 메서드의 내부 구현
fun <T : Any> Iterable<T>.toObservable(): Observable<T> = Observable.fromIterable(this)
fun <T : Any> Array<T>.toObservable(): Observable<T> = Observable.fromArray(*this)

 

 

Observable을 생성하는 방법, 팩토리 메서드
3. Observable.just

 

Observable.just 는 just 내 인자만을 발행하는 팩토리 메서드이다. 하나의 아이템, 또는 여러 개의 아이템을 발행할 수 있다.

 

val observable = Observable.just(1,2,3)
observable.subscribe(observer)

// Result
// onSubscribe
// onNext = 1
// onNext = 2
// onNext = 3
// onComplete


val observable2 = Observable.just(listOf(0,1), listOf(2,3), listOf(4,5))
observable2.subscribe(observer)

// Result
// onSubscribe
// onNext = [0, 1]
// onNext = [2, 3]
// onNext = [4, 5]
// onComplete

Observable.from과 헷갈리지 않아야 할 부분은, from의 경우는 Iterable의 내부의 각 아이템을 발행한다는 것이고 

just의 경우는 Iterable 하나를 통째로 아이템으로 취급하고 발행한다는 것이다. 헷갈리지 말자!

 

 

Observable을 생성하는 방법, 팩토리 메서드
4. range, empty, interval, timer

그 외에도 Observable을 생성하는 방법에는 range, empty, interval, timer와 같은 팩토리 메서드가 존재한다.

코드와 실행 결과를 통해 확인하도록 한다.

 

// range(start, end) : start 부터 end까지의 수를 발행한다.
val rangeObservable = Observable.range(0,3)
rangeObservable.subscribe(observer)

// onSubscribe
// onNext = 0
// onNext = 1
// onNext = 2
// onComplete



// empty() : onNext 없이 (아이템을 발행하지 않고) 즉시 onComplete를 발생시킨다.
val emptyObservable = Observable.empty<Any>()
rangeObservable.subscribe(observer)

// onSubscribe
// onComplete



runBlocking {
  // interval(period, unit) : 구독취소, 프로그램 종료 전까지 period 간격 마다 0부터 숫자를 발행한다.
  val intervalObservable = Observable.interval(200, TimeUnit.MILLISECONDS)
  intervalObservable.subscribe(observer)
  delay(600)
}

// onSubscribe
// onNext = 0
// onNext = 1
// onNext = 2


runBlocking {
  // timer(period, unit) : period 간격 뒤에 딱 한 번 발행한다.
  val timerObservable = Observable.timer(200, TimeUnit.MILLISECONDS)
  timerObservable.subscribe(observer)
  delay(600)
}

// onSubscribe
// onNext = 0
// onComplete

 

Observer의 구독 해지

Observer 인스턴스를 통해 Observable을 구독하게 되면 subscribe 연산자는 Disposable의 인스턴스를 반환한다.

해당 Disposable 인스턴스의 dispose() 메소드를 통해 구독을 해지할 수 있으며,

해당 Disposable 인스턴스는 onSubscribe 메소드의 매개변수로 얻을 수 있다.

 

interval 팩토리 메서드로 무한정 아이템을 내보내는 Observable의 구독을 해지하는 예제를 본다.

 

val observer = object : Observer<Any> {

    lateinit var disposable: Disposable

    override fun onSubscribe(d: Disposable) {
        println("onSubscribe")
        disposable = d // disposable 인스턴스 저장
    }

    override fun onNext(t: Any) {
        println("onNext = $t")
        
        // Disposable이 dispose 상태가 아니고 배출된 값이 10 이상이면 dispose
        if (t as Long >= 10 && !disposable.isDisposed) {
            disposable.dispose()
            println("Dispose!")
        }
    }

    override fun onError(e: Throwable) {
        println("onError = $e")
    }

    override fun onComplete() {
        println("onComplete")
    }
}

runBlocking {
        // interval(period, unit) : 구독취소, 프로그램 종료 전까지 period 간격 마다 0부터 숫자를 발행한다.
        val intervalObservable = Observable.interval(200, TimeUnit.MILLISECONDS)
        intervalObservable.subscribe(observer)
        delay(10000)
    }
    
// onSubscribe
// onNext = 0
// onNext = 1
// onNext = 2
// onNext = 3
// onNext = 4
// onNext = 5
// onNext = 6
// onNext = 7
// onNext = 8
// onNext = 9
// onNext = 10
// Dispose!

 

onSubscribe 메서드의 disposable 매개변수를 저장하고, onNext 메서드에서 isDisposed를 통해 현재 disposable이 dispose 상태인지 확인한다. dispose 상태가 아니고 동시에 아이템이 10 이상이면 dispose를 수행한다.

 

그 결과로, 10 이후에는 dispose 되어 더이상 onNext가 수행되지 않음을 확인 할 수 있다.

 

 

 

 

책 : http://www.kyobobook.co.kr/product/detailViewKor.laf?mallGb=KOR&ejkGb=KOR&barcode=9791161752976

 

코틀린 리액티브 프로그래밍 - 교보문고

RxKotlin을 사용한 리액티브 프로그래밍 | ★ 이 책에서 다루는 내용 ★▣ 리액티브 프로그래밍 패러다임과 기존 프로젝트 향상 방법▣ RxKotlin 2.0과 ReactiveX 프레임워크▣ 안드로이드에서 RxKotlin 사

www.kyobobook.co.kr

코틀린 리액티브 프로그래밍 - 리부 차크라보티 지음 | 조승진 옮김 | 에이콘출판

 

* 책의 내용이 아닌 책의 내용을 통한 개인의 학습을 정리한 게시글입니다.