DEVLOG 2.7 RxJava(二)RxJava中的三种流,RxLifecycle和RxBus的简单实现
参考内容:RxJava动脑学院
代码仓库:https://github.com/kolibreath/Practices

基本上看完了这一系列的教程,感觉还可以。老师非常清楚地讲解了RxJava中一些重要的点,特别是对于RxJava中三个流的归纳我觉得非常到位。下面我来复盘一下最后一点的内容。不过最后RxJava和Retrofit的结合我没有看,因为RxJava有了协程之后基本不用了,协程取消的方式相对于RxJava更加简单,理解起来也更方便。
本文分成三个部分:
总结RxJava中的三个流,观察者流,订阅流,事件(回调)流
RxLifecycle的实现
RxBus的实现

RxJava中的三个流
为了更加清楚地整理和总结三个流,我将课程中的流程图自己复刻了一遍,对于其中的细节我也可以说明的更加清楚。
观察者流(链式调用流)
RxJava的链式调用通过手写的过程我们可以发现其实就是对于上游(upstream或者是source)的Observable的不断包装的过程

这个部分比较好理解,包装最开始的对象是ObservableOnSubscribe lambda,装饰器最后的生成的Observable会订阅Observer lambda,具体的流程如图。

订阅流
订阅流是三个流中最复杂的地方。回想一下之前手写实现map操作符时,会将传入的Observer变成一个MapObserver,然后交给source进行订阅。这个地方非常有意思,传入的Observer位于map操作符下游,被观察者source又是通过构造方法注入的上游的Observable。因此订阅流是从下往上的过程:

画成图像就是这样,下游的被观察者也通过Observer包装

仔细看看Observable和Observer交互的部分,N5虽然是下游的被观察者,但是它通过调用subscribe会间接调用被它装饰的(也就是N4)的subscribe方法。所以在图上使用虚线表示N5的subscribe对于N4的subscribe的间接调用关系。

事件回调流
最后一个部分是事件回调流。通过ObservableOnSubscribe#subscribe获取的Emitter执行onNext时,会一层一层地调用Observer#onNext,这样就实现了事件传递的目的。
这个部分在图中使用红色的虚线表示onNext

整体来看,整个事件调用的流程图如下:

这里有几个细节说一下:
最开始的Source表示ObsevableOnSubscribe,会通过subscribe【绑定】一个Emitter,然后这个Emitter发送消息
并且这个Emitter持有被包装的Observer的引用,所以最后的O6指向Emitter
subscribeOn和observeOn的作用分析
其实这两个操作符都是针对下游发生作用,但是需要注意的是订阅流是相反的,它的下游是调用链的上游。如果顺着subscribe链向下,也就是顺着调用链向上,在调用链下游设置的subscribeOn假设被执行到A线程,一定会被调用链上游的subscribeOn放到B线程去。因此就有在上游设置subscribeOn会覆盖下游的subscribeOn的说法。
RxLifecycle实现分析
实现目标,避免内存泄漏。
实现方式:
需要获取到当前的UI组件的生命周期状态,如果是onDestroy,正在执行的耗时操作需要被取消。
怎么取消?通过subscribe返回的操作句柄,disposable取消。
对于第一点,我们通过设计一个类,这个类实现Android X的LifecycleObserver,监听LifecycleOwner;对于第二点,我们使用CompositeDisposable批量管理。同时为了实现复用,我们使用compose操作符,传入一个ObservableTransformer,【拦截】Observable。
具体的代码如下(我直接在IntelliJ 中写的,所以是模拟Android X的接口 ):
RxBus的实现分析

如何在订阅之后收到发送的事件?使用publishSubject,其他的和别的Bus区别不大:

