Cold Observable
Cold Observable은 각 observer가 observable을 구독 할 때마다 구독 된 시기에 상관없이 늘 처음부터 배출 된 값을 온전히 받을 수 있는 형태를 말한다.
여태까지 알아봤던 create, from, just와 같은 팩토리 메서드들은 모두 이러한 Cold Observable에 해당한다.
fun main() {
val coldObservable = Observable.just(1, 2, 3, 4)
val observer1 = object : Observer<Int> {
override fun onSubscribe(d: Disposable) {
println("observer1 onSubscribe")
}
override fun onNext(t: Int) {
println("observer1 onNext = $t")
}
override fun onError(e: Throwable) {
println("observer1 onError = $e")
}
override fun onComplete() {
println("observer1 onComplete")
}
}
val observer2 = object : Observer<Int> {
override fun onSubscribe(d: Disposable) {
println("observer2 onSubscribe")
}
override fun onNext(t: Int) {
println("observer2 onNext = $t")
}
override fun onError(e: Throwable) {
println("observer2 onError = $e")
}
override fun onComplete() {
println("observer2 onComplete")
}
}
// observer1 구독
coldObservable.subscribe(observer1)
runBlocking {
delay(800)
}
// 0.8초 후에 observer2 구독
coldObservable.subscribe(observer2)
}
// observer1 onSubscribe
// observer1 onNext = 1
// observer1 onNext = 2
// observer1 onNext = 3
// observer1 onNext = 4
// observer1 onComplete
// observer2 onSubscribe
// observer2 onNext = 1
// observer2 onNext = 2
// observer2 onNext = 3
// observer2 onNext = 4
// observer2 onComplete
just를 통해 1, 2, 3, 4를 배출하는 observable에 대해 observer1이 구독을 했을 때 1, 2, 3, 4를 모두 받는다.
0.8초 후 observer2가 구독을 했을 때에도 마찬가지로 1, 2, 3, 4를 모두 받는 것을 확인 할 수 있다.
이렇게 구독 시기와 상관 없이 observable이 배출한 모든 값을 받을 수 있음이 보장되는 것이 Cold Observable에 해당한다.
위와 같이 subscribe를 호출하지 않는다면 데이터를 발행하지 않는다.
이러한 Cold Observable은 주로 데이터를 온전하게 모두 가져와야 하는 API 호출, Room DB 등에 사용된다.
Hot Observable
Hot Observable은 Cold Observable과 반대되는 개념으로, 구독 여부에 관계 없이 아이템을 발행한다.
즉, observer의 구독 시기에 따라서 이전에 발행된 아이템을 받지 못할 수도 있다는 것이다.
본 책에서 Cold Observable과 Hot Observable을 비유한 예시가 인상적이므로 인용하여 적어보자면,
콜드 옵저버블을 CD/DVD 레코딩으로 본다면,
핫 옵저버블은 TV 채널과 비슷하게 시청자가 시청하는지 여부에 관계없이 콘텐츠를 계속 브로드캐스팅(배출)한다.
위의 말이 가장 적합한 비유인 것 같다.
이제 정의를 알았으니 Cold Observable을 Hot Observable로 변환하는 방법에 대해 알아본다.
Hot Observable로의 변환
1. ConnectableObservable
ConnectableObservable은 Cold Observable을 Hot Observable로 변환할 수 있는 객체이다.
ConnectableObservable은 기존 Cold Observable과는 달리, subscribe에 의해 배출이 시작되는 것이 아니라 connect에 의해 배출이 시작된다. 그러므로, connect 이후에 구독을 한 observe들은 이전에 배출된 아이템을 받지 못하게 된다.
fun main() {
// Observable.publish()를 통해 connectableObservable 생성
val connectableObservable = Observable.interval(100L, TimeUnit.MILLISECONDS).publish()
val observer1 = object : Observer<Long> {
override fun onSubscribe(d: Disposable) {
println("observer1 onSubscribe")
}
override fun onNext(t: Long) {
println("observer1 onNext = $t")
}
override fun onError(e: Throwable) {
println("observer1 onError = $e")
}
override fun onComplete() {
println("observer1 onComplete")
}
}
val observer2 = object : Observer<Long> {
override fun onSubscribe(d: Disposable) {
println("observer2 onSubscribe")
}
override fun onNext(t: Long) {
println("observer2 onNext = $t")
}
override fun onError(e: Throwable) {
println("observer2 onError = $e")
}
override fun onComplete() {
println("observer2 onComplete")
}
}
// subscribe 수행했으나 connect 전까지는 데이터 발행이 지연됨
connectableObservable.subscribe(observer1)
// 현 시점부터 데이터 발행
connectableObservable.connect()
runBlocking {
delay(200)
}
// observer2는 connect 이후 0.2초 간 발행된 0, 1에 대한 아이템은 발행받지 못함
connectableObservable.subscribe(observer2)
runBlocking {
delay(200)
}
}
//observer1 onSubscribe
//observer1 onNext = 0
//observer1 onNext = 1
//observer2 onSubscribe
//observer1 onNext = 2
//observer2 onNext = 2
//observer1 onNext = 3
//observer2 onNext = 3
이러한 ConnectableObservable은 하나의 Observable에 대해서 connect 전에 구독한 다수의 Observer들이 하나의 push를 동시에 받을 수 있도록 하기 위한 목적으로 사용된다.
기존 Cold Observable의 경우에는 구독한 Observer 마다 각각 push를 해주는 방식과는 차별점이 존재한다는 의미이다.
Hot Observable로의 변환
+ Subject
Hot Observable로의 변환을 수행하는 또 다른 방법으로는 Subject class를 이용하는 것이다.
Subject는 Observable과 Observer의 특성을 모두 가진다.
말로 설명하는 것 보다는 코드와 수행 동작을 보고 이해하는 것이 빠를 것이다.
fun main() {
val observable = Observable.interval(200, TimeUnit.MILLISECONDS)
val subject = PublishSubject.create<Long>()
val observer = object : Observer<Long> {
override fun onSubscribe(d: Disposable) {
println("onSubscribe")
}
override fun onNext(t: Long) {
println("onNext = $t")
}
override fun onError(e: Throwable) {
println("onError = $e")
}
override fun onComplete() {
println("onComplete")
}
}
// observer로서의 역할
observable.subscribe(subject)
// observable로서의 역할
subject.subscribe(observer)
runBlocking {
delay(400)
}
}
//onSubscribe
//onNext = 0
//onNext = 1
subject의 동작을 위해 PublishSubject를 생성하였다. PublishSubject에 대해서는 뒤에 알아보도록 하고, 일단은 Subject의 특성에 대해서만 확인한다.
observable을 subject가 구독하고 (Observer의 역할), 해당 subject를 observer가 구독한다 (Observable의 역할).
observable과 observer 사이에 위치하는 것 같은 형태를 띄는 subject는 그 종류에 따라서 다양한 Hot Observable로서의 역할을 수행한다. 각 subject에 따른 동작을 알아본다.
Hot Observable로의 변환
2. AsyncSubject
AsyncSubject는 구독한 observer들이 오직 observable의 마지막 값만을 수신 하는 동작을 가진다.
ReactiveX 문서 내 AsyncSubject의 마블 다이어그램으로 감을 잡고 코드 동작을 확인한다.
observer가 어느 시기에 subscribe를 수행하든 observable의 배출이 모두 완료될 때 모든 observer들은 배출된 값 중 가장 마지막의 값만을 받고 complete 된다.
fun main() {
val observable = Observable.just(1, 2, 3, 4, 5, 6, 7)
val asyncSubject = AsyncSubject.create<Int>()
val observer1 = object : Observer<Int> {
override fun onSubscribe(d: Disposable) {
println("observer 1 - onSubscribe")
}
override fun onNext(t: Int) {
println("observer 1 - onNext = $t")
}
override fun onError(e: Throwable) {
println("observer 1 - onError = $e")
}
override fun onComplete() {
println("observer 1 - onComplete")
}
}
val observer2 = object : Observer<Int> {
override fun onSubscribe(d: Disposable) {
println("observer 2 - onSubscribe")
}
override fun onNext(t: Int) {
println("observer 2 - onNext = $t")
}
override fun onError(e: Throwable) {
println("observer 2 - onError = $e")
}
override fun onComplete() {
println("observer 2 - onComplete")
}
}
// AsyncSubject가 observable를 구독
observable.subscribe(asyncSubject)
// observe1이 AsyncSubject를 구독
asyncSubject.subscribe(observer1)
// 0.4초 후 observer2가 AsyncSubject를 구독
runBlocking {
delay(400)
asyncSubject.subscribe(observer2)
}
}
//observer 1 - onSubscribe
//observer 1 - onNext = 7
//observer 1 - onComplete
//observer 2 - onSubscribe
//observer 2 - onNext = 7
//observer 2 - onComplete
1, 2, 3, 4, 5, 6, 7을 배출하는 observable을 AsyncSubject가 구독한다.
해당 AsyncSubject를 observer1이 구독하고, 0.4초 뒤 observer2가 구독해도 둘 다 observable의 마지막 배출 값인 7만을 받고 complete 된 것을 확인 할 수 있다.
만약 AsyncSubject가 Error를 발행했다면, 뒤이어 구독한 모든 observer들도 Error만을 받게 된다.
또한 눈여겨 볼 수 있는 부분은, 결과에서 보이듯 observer1이 complete 되고 observer2도 값을 받아 complete 되었다.
이를 통해 알 수 있는 건 AsyncSubject의 경우에는 값을 동시에 교차로 받는 인터리브 방식이 아니라는 점이다.
* 인터리브 방식 ex )
observer 1 - onNext = 7
observer 2 - onNext = 7
observer 1 - onComplete
observer 2 - onComplete
Hot Observable로의 변환
3. PublishSubject
PublishSubject는 ConnectableObservable에서 connect 이후 구독한 observer들은 구독한 시점부터 발행된 데이터만 받을 수 있는 방식과 유사하다.
PublishSubject는 이전에 발행된 데이터와 관계 없이 오직 구독한 시점에서부터 발행된 데이터만을 받을 수 있다.
이 역시도 PublishSubject의 마블 다이어그램으로 동작을 확인하고 코드를 보도록 한다.
구독한 시점부터 발행되는 데이터만을 취하는 형식을 가진다.
fun main() {
val observable = Observable.interval(200, TimeUnit.MILLISECONDS)
val publishSubject = PublishSubject.create<Long>()
val observer1 = object : Observer<Long> {
override fun onSubscribe(d: Disposable) {
println("observer 1 - onSubscribe")
}
override fun onNext(t: Long) {
println("observer 1 - onNext = $t")
}
override fun onError(e: Throwable) {
println("observer 1 - onError = $e")
}
override fun onComplete() {
println("observer 1 - onComplete")
}
}
val observer2 = object : Observer<Long> {
override fun onSubscribe(d: Disposable) {
println("observer 2 - onSubscribe")
}
override fun onNext(t: Long) {
println("observer 2 - onNext = $t")
}
override fun onError(e: Throwable) {
println("observer 2 - onError = $e")
}
override fun onComplete() {
println("observer 2 - onComplete")
}
}
// PublishSubject가 observable를 구독
observable.subscribe(publishSubject)
// observe1이 PublishSubject를 구독
publishSubject.subscribe(observer1)
// 0.4초 후 observer2가 publishSubject를 구독
runBlocking {
delay(400)
publishSubject.subscribe(observer2)
delay(400)
}
}
//observer 1 - onSubscribe
//observer 1 - onNext = 0
//observer 1 - onNext = 1
//observer 2 - onSubscribe
//observer 1 - onNext = 2
//observer 2 - onNext = 2
//observer 1 - onNext = 3
//observer 2 - onNext = 3
AsyncSubject의 코드와 마찬가지로 observable을 구독한 publishSubject를 observer1이 구독하고, 0.4초 뒤 observer2가 구독하는 형태이다. 바뀐 점은 observable이 0.2초마다 값을 발행하는 interval observable이라는 점이다.
observer 1은 observable이 값을 발행하기 시작한 시점부터 publishSubject를 구독했으므로 0부터 모든 값을 받는다.
그러나 observer 2는 observable이 값을 발행하기 시작한 시점부터 0.4초 뒤에 publishSubject를 구독했으므로
구독 전에 발행된 0, 1에 대한 값은 받지 못하고 2와 3만을 받게 된다.
Hot Observable로의 변환
4. BehaviorSubject
BehaviorSubject는 위의 AsyncSubject와 PublishSubject의 특성이 합쳐진 형태와 유사하다.
BehaviorSubject는 구독 시점의 바로 이전에 발행된 아이템과 구독 후 발행되는 아이템을 받을 수 있다.
이 역시도 BehaviorSubject의 마블 다이어그램을 보고 코드를 보도록 한다.
마블 다이어그램과 같이, 구독한 시점을 기준으로 바로 직전에 발행된 값과, 구독한 시점 이후 발행된 값을 받아온다.
fun main() {
val observable = Observable.interval(200, TimeUnit.MILLISECONDS)
val behaviorSubject = BehaviorSubject.create<Long>()
val observer1 = object : Observer<Long> {
override fun onSubscribe(d: Disposable) {
println("observer 1 - onSubscribe")
}
override fun onNext(t: Long) {
println("observer 1 - onNext = $t")
}
override fun onError(e: Throwable) {
println("observer 1 - onError = $e")
}
override fun onComplete() {
println("observer 1 - onComplete")
}
}
val observer2 = object : Observer<Long> {
override fun onSubscribe(d: Disposable) {
println("observer 2 - onSubscribe")
}
override fun onNext(t: Long) {
println("observer 2 - onNext = $t")
}
override fun onError(e: Throwable) {
println("observer 2 - onError = $e")
}
override fun onComplete() {
println("observer 2 - onComplete")
}
}
// BehaviorSubject가 observable를 구독
observable.subscribe(behaviorSubject)
// observe1이 BehaviorSubject를 구독
behaviorSubject.subscribe(observer1)
// 0.4초 후 observer2가 BehaviorSubject를 구독
runBlocking {
delay(400)
behaviorSubject.subscribe(observer2)
delay(400)
}
}
//observer 1 - onSubscribe
//observer 1 - onNext = 0
//observer 1 - onNext = 1
//observer 2 - onSubscribe
//observer 2 - onNext = 1
//observer 1 - onNext = 2
//observer 2 - onNext = 2
//observer 1 - onNext = 3
//observer 2 - onNext = 3
위 PublishSubject의 코드에서 PublishSubject만 BehaviorSubject로 바꾼 것이다.
observer 2의 결과에 차이가 있는데, onSubscribe가 수행된 시점 직전에 발행된 값인 1을 받아오고, 구독 이후에 발행된 값을 받아오는 것을 확인 할 수 있다.
구독 시점에 이전에 발행된 값이 없으면 초기값을 받아오게 되는데, 초기값은 BehaviorSubject.createDefault(item)을 통해 설정 할 수 있다.
Hot Observable로의 변환
5. ReplaySubject
ReplaySubject는 Cold Observerble과 유사하게 observer의 구독 시점과 상관없이 observable이 가지고 있는 모든 아이템을 다시 전달한다. 그래서 리플레이인 것이다.
ReplaySubject의 마블 다이어그램을 확인하고 코드를 확인해본다.
마블 다이어그램에서 확인 할 수 있듯이, 구독 시점과 상관없이 이전에 발행된 모든 값을 받아온다.
fun main() {
val observable = Observable.interval(200, TimeUnit.MILLISECONDS)
val replaySubject = ReplaySubject.create<Long>()
val observer1 = object : Observer<Long> {
override fun onSubscribe(d: Disposable) {
println("observer 1 - onSubscribe")
}
override fun onNext(t: Long) {
println("observer 1 - onNext = $t")
}
override fun onError(e: Throwable) {
println("observer 1 - onError = $e")
}
override fun onComplete() {
println("observer 1 - onComplete")
}
}
val observer2 = object : Observer<Long> {
override fun onSubscribe(d: Disposable) {
println("observer 2 - onSubscribe")
}
override fun onNext(t: Long) {
println("observer 2 - onNext = $t")
}
override fun onError(e: Throwable) {
println("observer 2 - onError = $e")
}
override fun onComplete() {
println("observer 2 - onComplete")
}
}
// ReplaySubject가 observable를 구독
observable.subscribe(replaySubject)
// observe1이 ReplaySubject를 구독
replaySubject.subscribe(observer1)
// 0.4초 후 observer2가 ReplaySubject를 구독
runBlocking {
delay(400)
replaySubject.subscribe(observer2)
delay(400)
}
}
//observer 1 - onSubscribe
//observer 1 - onNext = 0
//observer 1 - onNext = 1
//observer 2 - onSubscribe
//observer 2 - onNext = 0
//observer 2 - onNext = 1
//observer 1 - onNext = 2
//observer 2 - onNext = 2
//observer 1 - onNext = 3
//observer 2 - onNext = 3
이 역시도 위 PublishSubject의 코드에서 PublishSubject만 ReplaySubject로 바꾼 것이다.
observer 2의 결과에 차이가 있는데, onSubscribe가 수행된 시점과 상관없이 구독 이전에 발행되었던 0과 1을 모두 받아오는 것을 확인 할 수 있다. 즉, observer 1과 observer 2는 구독 시점의 차이가 있음에도 불구하고 두 observer는
observable이 발행한 0, 1, 2, 3을 모두 받은 것이다.
책 : http://www.kyobobook.co.kr/product/detailViewKor.laf?mallGb=KOR&ejkGb=KOR&barcode=9791161752976
코틀린 리액티브 프로그래밍 - 리부 차크라보티 지음 | 조승진 옮김 | 에이콘출판
* 책의 내용이 아닌 책의 내용을 통한 개인의 학습을 정리한 게시글입니다.
'Kotlin' 카테고리의 다른 글
연산자 - 결합, 그룹핑, 필터링 응용, 오류처리 (0) | 2021.12.27 |
---|---|
연산자 - 필터링, 변환, 축소 (0) | 2021.12.24 |
Observable / Observer (0) | 2021.12.23 |
Reactive Programming (0) | 2021.12.22 |
[Android, Kotlin] 제네릭의 in, out 키워드는 무엇일까? (9) | 2021.11.28 |