RxJava 原理探究
关于 RxJava 的文章网上有很多,这里只是个人的学习总结,阅读前需要对 RxJava 有一定的了解,并不能作为学习资料,如果想要入门这里有更好的选择:给 Android 开发者的 RxJava 详解(虽然版本有点久,很多 API 都已过时,但原理差不多,不影响理解)。
如果只是想了解 API 的使用,可以看看 ReactiveX/RxJava文档中文版。
什么是 RxJava
RxJava is a Java VM implementation of ReactiveX (Reactive Extensions): a library for composing asynchronous and event-based programs by using observable sequences.
翻译一下就是:RxJava 是响应式扩展库在 Java 虚拟机上的实现,它使用观察者模式为异步和基于事件程序的编写提供便利。
为什么是 RxJava
关于 RxJava 的优点网上总结了很多,总的说来就是使得代码整洁、逻辑清晰。它能将本来一段逻辑复杂、外观参差错落的代码,变成一条逻辑清晰的长链,使得可阅读性和可维护性大大提高。
以下例子改编自《给 Android 开发者的 RxJava 详解》:
界面上有一个自定义的视图 imageCollectorView ,它的作用是显示多张图片,并能使用 addImage(Bitmap) 方法来任意增加显示的图片。现在需要程序将一个给出的目录数组 File[] folders 中每个目录下的 png 图片都加载出来并显示在 imageCollectorView 中。需要注意的是,由于读取图片的这一过程较为耗时,需要放在后台执行,而图片的显示则必须在 UI 线程执行。常用的实现方式有多种,我这里贴出其中一种:
1 | new Thread() { |
而如果使用 RxJava ,实现方式是这样的:
1 | Observable.from(folders) |
简单解释下这段代码:观察者发出数据源,然后对数据源进行三次数据类型变换,之后指定被观察者和观察者所在线程,最后对观察者进行订阅。
如果对 java 8 引入的 Stream 熟悉的话,就会发现这与对 Stream
的操作很像,从flatMap()
到 observeOn()
的操作相当于对流的 Intermediate 操作,而最后的 subscribe()
相当是对流的 Terminal 操作。而且前面的操作也都是惰性化(lazy)的,最后的订阅才会触发前面一系列的变换操作(其实 subscribeOn()
和 observeOn()
这两个方法也是基于变换的)。怎么理解呢,就相当于 RxJava 库将这些操作先 “缓存” 起来了,直到最后的订阅观察者才将那些操作执行,怎么 “缓存” 是库做的事,逻辑上我们可以把那些操作看作是从上到下按顺序执行的。
可见,RxJava 虽然增加了程序代码,但是可阅读性和可扩展性大大增强。
了解过 RxJava 的人都会觉得 RxJava 很神奇,几行代码便完成了数据流的转换和线程的切换。那么它的内部原理是怎样的呢,通过阅读源码和网上的资料,我对 RxJava 的实现原理有了大概的了解,接下来就和大家分享一下。
对 RxJava 实现原理的粗浅理解
约定
由于 RxJava 同时应用了观察者模式(别名:发布-订阅模式)和生产者-消费者模式,所以其 API 的名称有点混乱。为了避免读者理解困难,这里约定如下:
Observable
及其子类称为 被观察者;Comsumer
(消费者) 和Observer
(观察者) 及其子类统称为 观察者(两者在 RxJava 含义相似);- 由于 RxJava 订阅的动作(subscribe)定义在
Observale
中,给我们的感觉是 被观察者 订阅 观察者,因此为了叙述方便和避免含义混乱,被观察者 订阅 观察者 和 观察者 订阅 观被察者 是同样的意思; - 本文基于 RxJava 2.0.7 版。
变换
变换是 RxJava 的亮点和理解上的难点,变换的作用是将一种数据类型转换成另一种数据类型。当被观察者发出的原始数据类型并不是观察者想要的数据类型时,我们就可以用变换来实现源数据类型和目的数据类型的转换。RxJava 中有很多种变换,如 map 变换、flatMap 变换、filter 变换等。其中 map 变换比较基础也较易理解,因此我们先从 map 变换讲起。
map 变换
map 变换能够将一种类型的数据源转换成另一种类型的数据源,比如要将 int
类型的图片资源 id 转换成 Bitmap
类型的对象并将其设置给一个 ImageView
,我们可以这样做:
1 | Observble.just(R.id.img_example) // 1 |
上面的例子里将上游的 int
数据类型转换成了 Bitmap
类型的数据,供下游的观察者接收。这个过程涉及了一次 map 变换,我们将透过这个例子分析 map 变换的原理。
首先从变换方法 public Bitmap apply(Integer resourceId)
看起,这个方法的参数类型是 Integer
,返回值类型是 Bitmap
,因此我们很容易猜想到它就是实现变换的核心方法。这个方法被包装后就传入了 map()
中,map()
的源码是这样的:
1 | public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) { |
方法中第一行验证 mapper
是否为空;第二行中的 RxJavaPlugin.onAssenbly()
是一个钩子方法,据说是用来调试 RxJava 库用的,你可以把它理解为什么都没做,因此 map()
方法可以简化为:return new ObservableMap<T, R>(this, mapper);
,它返回的是一个 Observable
子类 ObservableMap
的一个实例。现在我重点看看 ObservableMap
这个类:
1 | public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> { |
因为代码不多,我全部贴上来,便于大家理解。在上面的过程中,有这几类角色:1)实际被观察者;2)新创建的被观察者;3)实际观察者;4)新创建的观察者;5)转换器 mapper。它们的关系比较复杂,我们按照代码的执行顺序来将思路理一遍:
Observable
通过just()
这个静态工厂方法创建了一个Observable
对象,此对象即“实际被观察者”;map()
方法接受一个转换器对象,在内部新建了一个被观察者后将其返回,也就是 “新创建的被观察者”;- 用 map 变换后返回的那个 “新创建的被观察者” 订阅 “实际观察者”,最终 “新创建的被观察者” 的
subscribeActual()
方法被调用,此方法触发的操作有: - 创建一个观察者,也就是 “新创建的观察者”,向 “实际的被观察者” 订阅;
- 实际的被观察者发出数据,通过调用 “新创建的观察者” 的
onNext()
方法向其传递数据; - 在新创建的被观察者的内部先用 “转换器” 的
apply()
方法将数据类型进行转换,然后调用 “实际观察者” 的onNext()
方法将转换后的数据传给 “实际观察者”
整个过程有点复杂,所以我在上述代码片段中用序号来表示各个过程的调用顺序,同时用一张图来表示 map 变换过程中以上几个角色的交互过程:
说明:
- A 表示实际被观察者,B 表示新创建的观察者,C 表示新创建的被观察者,D 表示实际的观察者,E 代表转换器;
- 虚线框中的 B 和 C 是 map 变换导致的新创建的角色,它们没有发生订阅关系,但是 C 一旦被订阅就会导致 B 的创建和对实际观察者的订阅;
- B 和 D 都是观察者,它们进行了直接的接触:B 将信息转换后交给 D;
如果大家理解了一次 map 变换的原理,连续两次 map 变换的的原理就不难理解了,在这里就不赘述了,贴一张和上面类似的交互图,请大家自行理解:
flatMap 变换
此处先占坑,日后来填