RxJava 原理探究

RxJava 原理探究

关于 RxJava 的文章网上有很多,这里只是个人的学习总结,阅读前需要对 RxJava 有一定的了解,并不能作为学习资料,如果想要入门这里有更好的选择:给 Android 开发者的 RxJava 详解(虽然版本有点久,很多 API 都已过时,但原理差不多,不影响理解)。

如果只是想了解 API 的使用,可以看看 ReactiveX/RxJava文档中文版

什么是 RxJava

RxJava is a Java VM implementation of ReactiveX (Reactive Extensions): a library for composing asynchronous and event-based programs by using observable sequences.

翻译一下就是:RxJava 是响应式扩展库在 Java 虚拟机上的实现,它使用观察者模式为异步和基于事件程序的编写提供便利。

为什么是 RxJava

关于 RxJava 的优点网上总结了很多,总的说来就是使得代码整洁、逻辑清晰。它能将本来一段逻辑复杂、外观参差错落的代码,变成一条逻辑清晰的长链,使得可阅读性和可维护性大大提高。

以下例子改编自《给 Android 开发者的 RxJava 详解》:

界面上有一个自定义的视图 imageCollectorView ,它的作用是显示多张图片,并能使用 addImage(Bitmap) 方法来任意增加显示的图片。现在需要程序将一个给出的目录数组 File[] folders 中每个目录下的 png 图片都加载出来并显示在 imageCollectorView 中。需要注意的是,由于读取图片的这一过程较为耗时,需要放在后台执行,而图片的显示则必须在 UI 线程执行。常用的实现方式有多种,我这里贴出其中一种:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
new Thread() {
@Override
public void run() {
super.run();
for (File folder : folders) {
File[] files = folder.listFiles();
for (File file : files) {
if (file.getName().endsWith(".png")) {
final Bitmap bitmap = getBitmapFromFile(file);
getActivity().runOnUiThread(new Runnable() {
@Override
public void run() {
imageCollectorView.addImage(bitmap);
}
});
}
}
}
}
}.start();

而如果使用 RxJava ,实现方式是这样的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
Observable.from(folders)
.flatMap(new Function<File, Observable<File>>() {
@Override
public Observable<File> apply(File file) {
return Observable.from(file.listFiles());
}
})
.filter(new Function<File, Boolean>() {
@Override
public Boolean apply(File file) {
return file.getName().endsWith(".png");
}
})
.map(new Function<File, Bitmap>() {
@Override
public Bitmap apply(File file) {
return getBitmapFromFile(file);
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Bitmap>() {
@Override
public void accept(Bitmap bitmap) {
imageCollectorView.addImage(bitmap);
}
});

简单解释下这段代码:观察者发出数据源,然后对数据源进行三次数据类型变换,之后指定被观察者和观察者所在线程,最后对观察者进行订阅。

如果对 java 8 引入的 Stream 熟悉的话,就会发现这与对 Stream 的操作很像,从
flatMap()observeOn() 的操作相当于对流的 Intermediate 操作,而最后的 subscribe() 相当是对流的 Terminal 操作。而且前面的操作也都是惰性化(lazy)的,最后的订阅才会触发前面一系列的变换操作(其实 subscribeOn()observeOn() 这两个方法也是基于变换的)。怎么理解呢,就相当于 RxJava 库将这些操作先 “缓存” 起来了,直到最后的订阅观察者才将那些操作执行,怎么 “缓存” 是库做的事,逻辑上我们可以把那些操作看作是从上到下按顺序执行的。

可见,RxJava 虽然增加了程序代码,但是可阅读性和可扩展性大大增强。

了解过 RxJava 的人都会觉得 RxJava 很神奇,几行代码便完成了数据流的转换和线程的切换。那么它的内部原理是怎样的呢,通过阅读源码和网上的资料,我对 RxJava 的实现原理有了大概的了解,接下来就和大家分享一下。

对 RxJava 实现原理的粗浅理解

约定

由于 RxJava 同时应用了观察者模式(别名:发布-订阅模式)和生产者-消费者模式,所以其 API 的名称有点混乱。为了避免读者理解困难,这里约定如下:

  • Observable 及其子类称为 被观察者
  • Comsumer(消费者) 和 Observer(观察者) 及其子类统称为 观察者(两者在 RxJava 含义相似);
  • 由于 RxJava 订阅的动作(subscribe)定义在 Observale 中,给我们的感觉是 被观察者 订阅 观察者,因此为了叙述方便和避免含义混乱,被观察者 订阅 观察者观察者 订阅 观被察者 是同样的意思;
  • 本文基于 RxJava 2.0.7 版。

变换

变换是 RxJava 的亮点和理解上的难点,变换的作用是将一种数据类型转换成另一种数据类型。当被观察者发出的原始数据类型并不是观察者想要的数据类型时,我们就可以用变换来实现源数据类型和目的数据类型的转换。RxJava 中有很多种变换,如 map 变换、flatMap 变换、filter 变换等。其中 map 变换比较基础也较易理解,因此我们先从 map 变换讲起。

map 变换

map 变换能够将一种类型的数据源转换成另一种类型的数据源,比如要将 int 类型的图片资源 id 转换成 Bitmap 类型的对象并将其设置给一个 ImageView ,我们可以这样做:

1
2
3
4
5
6
7
8
9
10
11
12
13
Observble.just(R.id.img_example) // 1
.map(new Function<Integer, Bitmap>() { // 2
@Override
public Bitmap apply(Integer resourceId) {
return BitmapFactory.decodeResource(resourceId);
}
})
.subscribe(new Consumer<String>() { // 4
@Override
public void accept(@NonNull Bitmap b) throws Exception {
imageView.setImageBitmap(b);
}
});

上面的例子里将上游的 int 数据类型转换成了 Bitmap 类型的数据,供下游的观察者接收。这个过程涉及了一次 map 变换,我们将透过这个例子分析 map 变换的原理。

首先从变换方法 public Bitmap apply(Integer resourceId) 看起,这个方法的参数类型是 Integer,返回值类型是 Bitmap ,因此我们很容易猜想到它就是实现变换的核心方法。这个方法被包装后就传入了 map() 中,map() 的源码是这样的:

1
2
3
4
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper)); // 3
}

方法中第一行验证 mapper 是否为空;第二行中的 RxJavaPlugin.onAssenbly() 是一个钩子方法,据说是用来调试 RxJava 库用的,你可以把它理解为什么都没做,因此 map() 方法可以简化为:return new ObservableMap<T, R>(this, mapper);,它返回的是一个 Observable 子类 ObservableMap 的一个实例。现在我重点看看 ObservableMap 这个类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
//用来做数据变换的函数式对象,就是 map() 的参数,且称其为变换器
final Function<? super T, ? extends U> function;
//source 是实际 Observalbe 对象,也就是 map 变换之前的那个 Observable
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source);
this.function = function;
}
//调用 Observable 的 subscribe() 方法发起订阅时,subscribe() 最终会调用此方法
@Override
public void subscribeActual(Observer<? super U> t) { // 5
//新建一个 MapObserver 观察者,用来订阅实际 Observable 对象 source
source.subscribe(new MapObserver<T, U>(t, function)); // 6
}
//此类的构造方法传入的参数是变换器 mapper 和真实观察者 actual
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper;

MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);
this.mapper = mapper;
}

@Override
public void onNext(T t) { // 7
......
U v;
try {
//通过转换器把从原 Observable 对象发出的数据转化成真实观察者要求的数据
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
//如果出错,最终会调用 actual 的 onError() 方法
fail(ex); // 8.1
return;
}
//将转换后的数据传给真实观察者
actual.onNext(v); // 8.2
}
......
}

因为代码不多,我全部贴上来,便于大家理解。在上面的过程中,有这几类角色:1)实际被观察者;2)新创建的被观察者;3)实际观察者;4)新创建的观察者;5)转换器 mapper。它们的关系比较复杂,我们按照代码的执行顺序来将思路理一遍:

  • Observable 通过 just() 这个静态工厂方法创建了一个 Observable 对象,此对象即“实际被观察者”;
  • map() 方法接受一个转换器对象,在内部新建了一个被观察者后将其返回,也就是 “新创建的被观察者”;
  • 用 map 变换后返回的那个 “新创建的被观察者” 订阅 “实际观察者”,最终 “新创建的被观察者” 的 subscribeActual() 方法被调用,此方法触发的操作有:
  • 创建一个观察者,也就是 “新创建的观察者”,向 “实际的被观察者” 订阅;
  • 实际的被观察者发出数据,通过调用 “新创建的观察者” 的 onNext() 方法向其传递数据;
  • 在新创建的被观察者的内部先用 “转换器” 的 apply() 方法将数据类型进行转换,然后调用 “实际观察者” 的 onNext() 方法将转换后的数据传给 “实际观察者”

整个过程有点复杂,所以我在上述代码片段中用序号来表示各个过程的调用顺序,同时用一张图来表示 map 变换过程中以上几个角色的交互过程:

map 变换

说明:

  • A 表示实际被观察者,B 表示新创建的观察者,C 表示新创建的被观察者,D 表示实际的观察者,E 代表转换器;
  • 虚线框中的 B 和 C 是 map 变换导致的新创建的角色,它们没有发生订阅关系,但是 C 一旦被订阅就会导致 B 的创建和对实际观察者的订阅;
  • B 和 D 都是观察者,它们进行了直接的接触:B 将信息转换后交给 D;

如果大家理解了一次 map 变换的原理,连续两次 map 变换的的原理就不难理解了,在这里就不赘述了,贴一张和上面类似的交互图,请大家自行理解:

2次连续 map 变换

flatMap 变换

此处先占坑,日后来填