Observable
옵저버블(Observerble)은 데이터를 소모하는 컨슈머(Observer)가 소모할 수 있는 데이터를 내보내주는 역할을 한다.
기존 iterator에서 값을 가져오는 pull 방식이 아니라 옵저버블에서 값을 내보내는 push 방식을 채택함을 알 수 있다.
옵저버 패턴을 사용하여 옵저버블을 구독하는 옵저버 측에서 옵저버블이 데이터를 내보낼 때 이를 사용한다.
Observable의 동작
Observable은 데이터에 대한 처리를 할 수 있는 3가지 메소드를 가진다.
onNext : 해당 메소드를 통해 옵저버블은 옵저버에게 데이터를 내보낸다.
onComplete : 해당 메소드를 통해 옵저버블은 옵저버에게 모든 아이템의 발행이 완료되었음을 알린다.
터미널 메소드로써, 마지막 아이템까지 발행이 완료되었음을 알리는 메소드이므로 이후의 onNext, onError는 호출되지 않는다.
onError : 해당 메소드를 통해 옵저버블은 옵저버에게 아이템 발행 중 에러가 발생했음을 알린다.
터미널 메소드로써, 아이템 발행 중 에러가 발생했음을 알리는 메소드이므로 이후의 onNext, onComplete는 호출되지 않는다.
onSubscribe : 구독자가 해당 옵저버블을 구독 할 때 호출되는 메소드이다.
fun main() {
val observer = object : Observer<Any> {
// 해당 observer가 observable을 구독할 때 호출
override fun onSubscribe(d: Disposable) {
println("onSubscribe")
}
// observable에서 새로운 아이템이 발행될 때 호출
override fun onNext(t: Any) {
println("onNext = $t")
}
// observable에서 error가 발생했을 때 호출되고 observable 종료
override fun onError(e: Throwable) {
println("onError = $e")
}
// observable의 아이템 발행이 모두 완료될 때 호출되고 observable 종료
override fun onComplete() {
println("onComplete")
}
}
// RxKotlin을 통해 List를 Observable로 변환
val observable = listOf(0,1,2,3,4).toObservable()
//subscribe 메소드를 통해 해당 observer가 observable 구독
observable.subscribe(observer)
}
// Result
// onSubscribe
// onNext = 0
// onNext = 1
// onNext = 2
// onNext = 3
// onNext = 4
// onComplete
위 코드처럼 collection에 대해 .toObservable로 변환 할 수도 있지만 팩토리 메서드를 통해 직접 Observable을 생성할 수도 있다.
Observable을 생성하는 방법, 팩토리 메서드
1. Observable.create
Observable.create는 팩토리 메서드 중 하나로, 직접 Observable의 동작을 구성할 수 있다.
fun main() {
val observer = object : Observer<Any> {
override fun onSubscribe(d: Disposable) {
println("onSubscribe")
}
override fun onNext(t: Any) {
println("onNext = $t")
}
override fun onError(e: Throwable) {
println("onError = $e")
}
override fun onComplete() {
println("onComplete")
}
}
val observable = Observable.create<Int> {
it.onNext(0)
it.onNext(1)
it.onNext(2)
it.onNext(3)
it.onNext(4)
it.onError(IOException())
}
observable.subscribe(observer)
}
// Result
// onSubscribe
// onNext = 0
// onNext = 1
// onNext = 2
// onNext = 3
// onNext = 4
// onError = java.io.IOException
위와 같은 아이템을 발행하는 observable을 create 팩토리 메서드로 구현해보았다.
또한, 이번에는 살짝 다르게 onError를 통해 의도적으로 IOException을 발생할 수 있도록 직접 observable을 생성하였다.
이 때 it은 ObservableEmitter<T>로써, 해당 ObservableEmitter에 존재하는 onNext, onError 등의 메서드를 사용하여 observable의 동작을 구성한다.
Observable을 생성하는 방법, 팩토리 메서드
2. Observable.from
Observable.from 은 여러 데이터 타입을 Observable로 변환할 수 있는 팩토리 메서드이다.
fromArray() : Array를 Observable로 변환
fromIterable() : ArrayList, LinkedList와 같은 Iterator 인터페이스를 구현한 클래스를 Observable로 변환
fromCallable() : Callable을 Observable로 변환
fromFeature() : Feature를 Observable로 변환
val array = arrayOf(1,2,3) // Array
val list = listOf(4,5,6) // List (Iterable)
val observableFromArray = Observable.fromArray(array) // fromArray
val observableToObservable1 = array.toObservable() // toObservable
val observableFromIterable = Observable.fromIterable(list) // fromIterable
val observableToObservable2 = list.toObservable() // toObservable
* fromArray, fromIterable의 경우는 RxKotlin에서 .toObservable()로 사용 할 수 있는 경우 존재.
이 때, toObservable 역시 내부적으로는 from 팩토리 메서드를 사용한다.
//toObservable 메서드의 내부 구현
fun <T : Any> Iterable<T>.toObservable(): Observable<T> = Observable.fromIterable(this)
fun <T : Any> Array<T>.toObservable(): Observable<T> = Observable.fromArray(*this)
Observable을 생성하는 방법, 팩토리 메서드
3. Observable.just
Observable.just 는 just 내 인자만을 발행하는 팩토리 메서드이다. 하나의 아이템, 또는 여러 개의 아이템을 발행할 수 있다.
val observable = Observable.just(1,2,3)
observable.subscribe(observer)
// Result
// onSubscribe
// onNext = 1
// onNext = 2
// onNext = 3
// onComplete
val observable2 = Observable.just(listOf(0,1), listOf(2,3), listOf(4,5))
observable2.subscribe(observer)
// Result
// onSubscribe
// onNext = [0, 1]
// onNext = [2, 3]
// onNext = [4, 5]
// onComplete
Observable.from과 헷갈리지 않아야 할 부분은, from의 경우는 Iterable의 내부의 각 아이템을 발행한다는 것이고
just의 경우는 Iterable 하나를 통째로 아이템으로 취급하고 발행한다는 것이다. 헷갈리지 말자!
Observable을 생성하는 방법, 팩토리 메서드
4. range, empty, interval, timer
그 외에도 Observable을 생성하는 방법에는 range, empty, interval, timer와 같은 팩토리 메서드가 존재한다.
코드와 실행 결과를 통해 확인하도록 한다.
// range(start, end) : start 부터 end까지의 수를 발행한다.
val rangeObservable = Observable.range(0,3)
rangeObservable.subscribe(observer)
// onSubscribe
// onNext = 0
// onNext = 1
// onNext = 2
// onComplete
// empty() : onNext 없이 (아이템을 발행하지 않고) 즉시 onComplete를 발생시킨다.
val emptyObservable = Observable.empty<Any>()
rangeObservable.subscribe(observer)
// onSubscribe
// onComplete
runBlocking {
// interval(period, unit) : 구독취소, 프로그램 종료 전까지 period 간격 마다 0부터 숫자를 발행한다.
val intervalObservable = Observable.interval(200, TimeUnit.MILLISECONDS)
intervalObservable.subscribe(observer)
delay(600)
}
// onSubscribe
// onNext = 0
// onNext = 1
// onNext = 2
runBlocking {
// timer(period, unit) : period 간격 뒤에 딱 한 번 발행한다.
val timerObservable = Observable.timer(200, TimeUnit.MILLISECONDS)
timerObservable.subscribe(observer)
delay(600)
}
// onSubscribe
// onNext = 0
// onComplete
Observer의 구독 해지
Observer 인스턴스를 통해 Observable을 구독하게 되면 subscribe 연산자는 Disposable의 인스턴스를 반환한다.
해당 Disposable 인스턴스의 dispose() 메소드를 통해 구독을 해지할 수 있으며,
해당 Disposable 인스턴스는 onSubscribe 메소드의 매개변수로 얻을 수 있다.
interval 팩토리 메서드로 무한정 아이템을 내보내는 Observable의 구독을 해지하는 예제를 본다.
val observer = object : Observer<Any> {
lateinit var disposable: Disposable
override fun onSubscribe(d: Disposable) {
println("onSubscribe")
disposable = d // disposable 인스턴스 저장
}
override fun onNext(t: Any) {
println("onNext = $t")
// Disposable이 dispose 상태가 아니고 배출된 값이 10 이상이면 dispose
if (t as Long >= 10 && !disposable.isDisposed) {
disposable.dispose()
println("Dispose!")
}
}
override fun onError(e: Throwable) {
println("onError = $e")
}
override fun onComplete() {
println("onComplete")
}
}
runBlocking {
// interval(period, unit) : 구독취소, 프로그램 종료 전까지 period 간격 마다 0부터 숫자를 발행한다.
val intervalObservable = Observable.interval(200, TimeUnit.MILLISECONDS)
intervalObservable.subscribe(observer)
delay(10000)
}
// onSubscribe
// onNext = 0
// onNext = 1
// onNext = 2
// onNext = 3
// onNext = 4
// onNext = 5
// onNext = 6
// onNext = 7
// onNext = 8
// onNext = 9
// onNext = 10
// Dispose!
onSubscribe 메서드의 disposable 매개변수를 저장하고, onNext 메서드에서 isDisposed를 통해 현재 disposable이 dispose 상태인지 확인한다. dispose 상태가 아니고 동시에 아이템이 10 이상이면 dispose를 수행한다.
그 결과로, 10 이후에는 dispose 되어 더이상 onNext가 수행되지 않음을 확인 할 수 있다.
책 : http://www.kyobobook.co.kr/product/detailViewKor.laf?mallGb=KOR&ejkGb=KOR&barcode=9791161752976
코틀린 리액티브 프로그래밍 - 리부 차크라보티 지음 | 조승진 옮김 | 에이콘출판
* 책의 내용이 아닌 책의 내용을 통한 개인의 학습을 정리한 게시글입니다.
'Kotlin' 카테고리의 다른 글
연산자 - 필터링, 변환, 축소 (0) | 2021.12.24 |
---|---|
Cold Observable / Hot Observable (0) | 2021.12.23 |
Reactive Programming (0) | 2021.12.22 |
[Android, Kotlin] 제네릭의 in, out 키워드는 무엇일까? (9) | 2021.11.28 |
[Kotlin] object / companion object를 통한 Java의 static 키워드 대체 (0) | 2021.11.20 |