RxJava 原理探究

RxJava 原理探究

Posted by lwenkun on September 3, 2016

RxJava 原理探究

关于 RxJava 的文章网上有很多,这里只是个人的学习总结,阅读前需要对 RxJava 有一定的了解,并不能作为学习资料,如果想要入门这里有更好的选择:给 Android 开发者的 RxJava 详解(虽然版本有点久,很多 API 都已过时,但原理差不多,不影响理解)。

如果只是想了解 API 的使用,可以看看 ReactiveX/RxJava文档中文版

什么是 RxJava

RxJava is a Java VM implementation of ReactiveX (Reactive Extensions): a library for composing asynchronous and event-based programs by using observable sequences.

翻译一下就是:RxJava 是响应式扩展库在 Java 虚拟机上的实现,它使用观察者模式为异步和基于事件程序的编写提供便利。

为什么是 RxJava

关于 RxJava 的优点网上总结了很多,总的说来就是使得代码整洁、逻辑清晰。它能将本来一段逻辑复杂、外观参差错落的代码,变成一条逻辑清晰的长链,使得可阅读性和可维护性大大提高。

以下例子改编自《给 Android 开发者的 RxJava 详解》:

界面上有一个自定义的视图 imageCollectorView ,它的作用是显示多张图片,并能使用 addImage(Bitmap) 方法来任意增加显示的图片。现在需要程序将一个给出的目录数组 File[] folders 中每个目录下的 png 图片都加载出来并显示在 imageCollectorView 中。需要注意的是,由于读取图片的这一过程较为耗时,需要放在后台执行,而图片的显示则必须在 UI 线程执行。常用的实现方式有多种,我这里贴出其中一种:

new Thread() {
    @Override
    public void run() {
        super.run();
        for (File folder : folders) {
            File[] files = folder.listFiles();
            for (File file : files) {
                if (file.getName().endsWith(".png")) {
                    final Bitmap bitmap = getBitmapFromFile(file);
                    getActivity().runOnUiThread(new Runnable() {
                        @Override
                        public void run() {
                            imageCollectorView.addImage(bitmap);
                        }
                    });
                }
            }
        }
    }
}.start();

而如果使用 RxJava ,实现方式是这样的:

Observable.from(folders)
    .flatMap(new Function<File, Observable<File>>() {
        @Override
        public Observable<File> apply(File file) {
            return Observable.from(file.listFiles());
        }
    })
    .filter(new Function<File, Boolean>() {
        @Override
        public Boolean apply(File file) {
            return file.getName().endsWith(".png");
        }
    })
    .map(new Function<File, Bitmap>() {
        @Override
        public Bitmap apply(File file) {
            return getBitmapFromFile(file);
        }
    })
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Consumer<Bitmap>() {
        @Override
        public void accept(Bitmap bitmap) {
            imageCollectorView.addImage(bitmap);
        }
    });

简单解释下这段代码:观察者发出数据源,然后对数据源进行三次数据类型变换,之后指定被观察者和观察者所在线程,最后对观察者进行订阅。

如果对 java 8 引入的 Stream 熟悉的话,就会发现这与对 Stream 的操作很像,从 flatMap()observeOn() 的操作相当于对流的 Intermediate 操作,而最后的 subscribe() 相当是对流的 Terminal 操作。而且前面的操作也都是惰性化(lazy)的,最后的订阅才会触发前面一系列的变换操作(其实 subscribeOn()observeOn() 这两个方法也是基于变换的)。怎么理解呢,就相当于 RxJava 库将这些操作先 “缓存” 起来了,直到最后的订阅观察者才将那些操作执行,怎么 “缓存” 是库做的事,逻辑上我们可以把那些操作看作是从上到下按顺序执行的。

可见,RxJava 虽然增加了程序代码,但是可阅读性和可扩展性大大增强。

了解过 RxJava 的人都会觉得 RxJava 很神奇,几行代码便完成了数据流的转换和线程的切换。那么它的内部原理是怎样的呢,通过阅读源码和网上的资料,我对 RxJava 的实现原理有了大概的了解,接下来就和大家分享一下。

对 RxJava 实现原理的粗浅理解

约定

由于 RxJava 同时应用了观察者模式(别名:发布-订阅模式)和生产者-消费者模式,所以其 API 的名称有点混乱。为了避免读者理解困难,这里约定如下:

  • Observable 及其子类称为 被观察者
  • Comsumer(消费者) 和 Observer(观察者) 及其子类统称为 观察者(两者在 RxJava 含义相似);
  • 由于 RxJava 订阅的动作(subscribe)定义在 Observale 中,给我们的感觉是 被观察者 订阅 观察者,因此为了叙述方便和避免含义混乱,被观察者 订阅 观察者观察者 订阅 观被察者 是同样的意思;
  • 本文基于 RxJava 2.0.7 版。

变换

变换是 RxJava 的亮点和理解上的难点,变换的作用是将一种数据类型转换成另一种数据类型。当被观察者发出的原始数据类型并不是观察者想要的数据类型时,我们就可以用变换来实现源数据类型和目的数据类型的转换。RxJava 中有很多种变换,如 map 变换、flatMap 变换、filter 变换等。其中 map 变换比较基础也较易理解,因此我们先从 map 变换讲起。

map 变换

map 变换能够将一种类型的数据源转换成另一种类型的数据源,比如要将 int 类型的图片资源 id 转换成 Bitmap 类型的对象并将其设置给一个 ImageView ,我们可以这样做:

Observble.just(R.id.img_example) // 1
         .map(new Function<Integer, Bitmap>() { // 2
              @Override
              public Bitmap apply(Integer resourceId) {
                 return BitmapFactory.decodeResource(resourceId);
              }
          })
          .subscribe(new Consumer<String>() { // 4
            @Override
            public void accept(@NonNull Bitmap b) throws Exception {
                imageView.setImageBitmap(b);
            }
        });

上面的例子里将上游的 int 数据类型转换成了 Bitmap 类型的数据,供下游的观察者接收。这个过程涉及了一次 map 变换,我们将透过这个例子分析 map 变换的原理。

首先从变换方法 public Bitmap apply(Integer resourceId) 看起,这个方法的参数类型是 Integer,返回值类型是 Bitmap ,因此我们很容易猜想到它就是实现变换的核心方法。这个方法被包装后就传入了 map() 中,map() 的源码是这样的:

public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
     ObjectHelper.requireNonNull(mapper, "mapper is null");
     return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper)); // 3
}

方法中第一行验证 mapper 是否为空;第二行中的 RxJavaPlugin.onAssenbly() 是一个钩子方法,据说是用来调试 RxJava 库用的,你可以把它理解为什么都没做,因此 map() 方法可以简化为:return new ObservableMap<T, R>(this, mapper);,它返回的是一个 Observable 子类 ObservableMap 的一个实例。现在我重点看看 ObservableMap 这个类:

public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    //用来做数据变换的函数式对象,就是 map() 的参数,且称其为变换器
    final Function<? super T, ? extends U> function;
    //source 是实际 Observalbe 对象,也就是 map 变换之前的那个 Observable
    public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
        super(source);
        this.function = function;
    }
    //调用 Observable 的 subscribe() 方法发起订阅时,subscribe() 最终会调用此方法
    @Override
    public void subscribeActual(Observer<? super U> t) {  // 5
        //新建一个 MapObserver 观察者,用来订阅实际 Observable 对象 source
        source.subscribe(new MapObserver<T, U>(t, function)); // 6
    }
    //此类的构造方法传入的参数是变换器 mapper 和真实观察者 actual
    static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
        final Function<? super T, ? extends U> mapper;

        MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
            super(actual);
            this.mapper = mapper;
        }

        @Override
        public void onNext(T t) { // 7
            ......
            U v;
            try {
                //通过转换器把从原 Observable 对象发出的数据转化成真实观察者要求的数据
                v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
            } catch (Throwable ex) {
                //如果出错,最终会调用 actual 的 onError() 方法
                fail(ex); // 8.1
                return;
            }
            //将转换后的数据传给真实观察者
            actual.onNext(v); // 8.2
        }
        ......
}

因为代码不多,我全部贴上来,便于大家理解。在上面的过程中,有这几类角色:1)实际被观察者;2)新创建的被观察者;3)实际观察者;4)新创建的观察者;5)转换器 mapper。它们的关系比较复杂,我们按照代码的执行顺序来将思路理一遍:

  • Observable 通过 just() 这个静态工厂方法创建了一个 Observable 对象,此对象即“实际被观察者”;
  • map() 方法接受一个转换器对象,在内部新建了一个被观察者后将其返回,也就是 “新创建的被观察者”;
  • 用 map 变换后返回的那个 “新创建的被观察者” 订阅 “实际观察者”,最终 “新创建的被观察者” 的 subscribeActual() 方法被调用,此方法触发的操作有:
  • 创建一个观察者,也就是 “新创建的观察者”,向 “实际的被观察者” 订阅;
  • 实际的被观察者发出数据,通过调用 “新创建的观察者” 的 onNext() 方法向其传递数据;
  • 在新创建的被观察者的内部先用 “转换器” 的 apply() 方法将数据类型进行转换,然后调用 “实际观察者” 的 onNext() 方法将转换后的数据传给 “实际观察者”

整个过程有点复杂,所以我在上述代码片段中用序号来表示各个过程的调用顺序,同时用一张图来表示 map 变换过程中以上几个角色的交互过程:

map 变换

说明:

  • A 表示实际被观察者,B 表示新创建的观察者,C 表示新创建的被观察者,D 表示实际的观察者,E 代表转换器;
  • 虚线框中的 B 和 C 是 map 变换导致的新创建的角色,它们没有发生订阅关系,但是 C 一旦被订阅就会导致 B 的创建和对实际观察者的订阅;
  • B 和 D 都是观察者,它们进行了直接的接触:B 将信息转换后交给 D;

如果大家理解了一次 map 变换的原理,连续两次 map 变换的的原理就不难理解了,在这里就不赘述了,贴一张和上面类似的交互图,请大家自行理解:

2次连续 map 变换

flatMap 变换

此处先占坑,日后来填