DEVLOG 2.5 RxJava (一) 手写实现subscribeOn和observeOn操作符
参考内容:动脑学院 RxJava 课程
代码连接在这里:https://github.com/kolibreath/Practices/tree/master/HandsOnRxJava/core
有帮助的小伙伴可以给这篇文章投币 点赞 收藏 然后star一下我的代码仓库咯~

春节期间听了一下动脑学院关于RxJava的课程,虽然之前RxJava也经常用过,不过当时对于函数式编程的体验不深刻,特别是对于一些操作符根本没法理解。现在重新看看之后感觉好了很多。即使RxJava已经不如当时那么热门,现在通过手写一些操作符的方式更能更好地理解观察者模式和装饰器模式。
我个人感觉这门课还是非常好的,但是可能因为课程时常的限制,老师不能鞭辟入里的讲解实现思路。所以我今天来复盘一下老师讲的Observable#create,map,flatmap, subscribeOn以及observeOn的实现思路,并且指出老师的实现中可能存在的问题。
本文内容分成以下几个板块:
手写RxJava中几个类之间的关系
手写Observable#create
手写Observable#map
手写Observable#flatmap
手写subscribeOn和observeOn
目前的话我也只看到手写observeOn这里,虽然我可以将课程全部看完再进行总结,但是我还是习惯一边思考一边学习,不想浅尝辄止。如果在后面的课程中对于代码本身有所修正,以后面的课程为准。

手写RxJava中几个类之间的关系
RxJava是一种更加细致的观察者模式。在以往的观察者模式中,观察者(Observer)的引用被 被观察者(Observable)持有,事件通过被观察者调用观察者的方法传递。

我们来看看RxJava中是怎么处理的。在RxJava中抽象出了被观察者的顶级父类ObservableSource。实现的类如ObservableCreate,ObservableMapper继承Observable。同时事件传递的具体功能者通过Emitter实现 。
Observable和Emitter的绑定通过ObservableOnSubscribe实现。
绑定的具体含义我们通过Observable#create的实现来理解。

手写Observable#create
我们在写create之前理清一下思路:
我们是要进行链式调用,create一般是Observable的一个静态方法,并且后面需要跟一些如map,flatmap等等操作。所以我们不妨写一个类,ObservableCreate继承Observable。
事件的通知使用Emitter解耦,并且Emitter实例可以通过ObservableOnSubscribe的实现,因此ObservableCreate中需要持有一个ObservableOnSubscribe。
Emitter的具体实现需要指明。
基于以上三点,我相信ObservableCreate的思路还是相当清晰的:
之后需要在Observable中实现静态方法,并且返回ObservableCreate(作为Observable的一个实现):
这里有一个小细节,Emitter是直接调用对应的观察者的对应方法(Emitter#onNext和Observer#onNext相对应),这点和原始的观察者模式一模一样。
具体的思路图如下:
需要实现Observable#create 需要实现吧ObservableOnSubscribe Emitter,最后实现链式调用上的Observable#subscribe中的Observer lambda。

所以Observable#subscribe意味着Emitter#onNext会调用Observer#onNext

手写实现map和flatmap
map和flatmap都是转换操作符,他们的区别在于map是对于类型的转换。所以在我们首先可以在Observable中给出如下定义:
原类型是T,改变之后的类型是U。我们需要具体实现一个ObservableMap。
我们来整理一下思路:
上游的类型是T,我们通过实现Function进行一个转换,称为U类型
下游的观察者接收到的类型应该也是U类型
观察者的实现在Observable#subscribe中给出,我们可以通过【包装】这个Observer。具体的思路可以是在onNext中进行类型转换,然后再发射出去
具体的代码如下:
这里有一个细节,考虑一下为什么还需要再subscribeActual中重新调用Observable#subscribe? 在没有调用map之前,如果emitter发射一个事件,就会间接调用Observer#onNext,Observer中定义的onNext是通过Observer#subscribe传入的,因为我们装饰的MapObserver也调用了传入的Observer#onNext,所以需要重新进行subscribe。
flatmap和map的思路类似,但是flatmap是输出一个Observable,那必然不可能在onNext中直接进行映射类型,然后发射。
我们可以采取类似的思路,首先进行类型转换,将原始的T类型得到一个Obsevable<R>,使用这个Observable<R>,订阅传入的Observer lambda。
这里还是需要注意上面的细节,事件的传递是通过subscribe展开的。具体的代码如下,可以看看注释:

手写subscribeOn和observeOn
subscribeOn的具体的作用是指定事件发射的线程;observeOn是指定事件被接受的线程。在考虑具体的实现之前,我们首先看看RxJava的实现。对于以下代码,是否添加subscribeOn、observeOn的区别构成了三个版本:
都没有添加subscribeOn
添加了subscribeon无observeOn
两者都添加
三种代码的输出图如下:
如果看不清的小伙伴可以右键,选择在新标签页打开放大看。
相同的颜色表示在同一个线程。比如第二张图,subscribeOn是绿色,Map1和Map2,onNext都是绿色,表示在相同的线程。

可以观察到这样的规律:
在没有observeOn的情况下,subscribeOn可以影响所有和事件发射、转换相关的操作符的线程。并且影响Observer#onNext onComplete onError
在有observeOn的情况下只能影响它上面的操作符,并且observeOn指定了observer的线程。
onSubscribe不随着subscribeOn和observeOn的线程调度变化而变化。
特别是最后一条规律,在课程中老师写代码时,虽然直接在subscribe的时候立刻写observer.onSubscribe非常合理,但是仔细品一品的话还是有还原原版的RxJava的操作的味道的,因为要保证onSubscribe的线程不是subscribeOn指定的线程。
对于subscribeOn的实现,我们肯定需要将事件发射的线程切换走。调用subscribeOn的时候,其实我们只需要将observable.subscribe(observer)在其他线程执行即可,具体的原因我归结在了这张图中:

因为observable.subscribe(observer)意味着Emitter#onNext会调用observer#onNext,修改这里就会修改上面从create走出来的逻辑的线程。
记得上面说过的那个点,onSubscribe方法和创建初始的Observable的线程一致的问题吗?我们可以在observable.subscribe(observer)被调度之前调用observable#onSubscribe方法,但是这里必须要重新写一个Observer,这是为了避免onSubscribe被调用两次。
具体的代码如下:
observeOn就需要将接受事件的方法如Observer#onNext等放入新的线程。这里需要注意开一个队列挨个消费事件,这个是为了避免在事件消费过程中出现问题,需要终止消费行为。不过这里在课程中老师的消费事件的几个变量没有说明清楚作用,而且处理的机制存在问题。不过这个不是重点,我直接原样照搬过来了,着重体会在其他线程中处理事件的逻辑就行。
代码如下: