Rxjava从0到1

Rxjava框架在七猫的安卓客户端项目中几乎无处不在,框架的优点非常明显:链式编程、丰富的操作符、与其他主流框架的互动(retrofit、room等)、函数式编程的思想让异步事件看起来像同步事件一样符合人们的思维习惯,得益于这些优点也是被Google官方推荐使用。但欲戴王冠必承其重,从某种程度上来说,Rxjava的上手比较难的,在看了源码及Rxjava相关的文档后,有了耳目一新的理解,给大家带来一些分享,希望可以让大家对这款优秀的框架有不一样的认识。

一、Rxjava存在的意义

Rxjava最经典的一段代码就是:

//子线程遍历文件获取图片切换到主线程展示图片
Observable.from(folders)
    .flatMap((Func1) (folder) -> { Observable.from(file.listFiles()) })
    .filter((Func1) (file) -> { file.getName().endsWith(".png") })
    .map((Func1) (file) -> { getBitmapFromFile(file) })
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe((Action1) (bitmap) -> { imageCollectorView.addImage(bitmap) });

这段代码的链式调用让它看起来格式非常整洁有道,可是除了整洁之外还有什么,Rxjava给我们解决了什么问题让它如此受欢迎?毕竟我们不会为了代码好看去给项目中引入一个框架,好看背后的意义才是Rxjava的灵魂所在。在我们平日的代码中,我们会遇到很多这样的代码:

//设置点击事件
btn.setOnClickListener(new OnClickListener() {
    @Override
    public void onClick(View v) {
       
    }
});
//服务绑定
private ServiceConnection mConnection = new ServiceConnection() {
    @Override
    public void onServiceConnected(ComponentName className, IBinder service) {
        
    }
    @Override
    public void onServiceDisconnected(ComponentName arg0) {
        
    }
};
//异步请求
Call<List<Photo>> call = service.getUserInfo();
call.enqueue(new Callback<List<Photo>>() {
    @Override
    public void onResponse(Call<List<Photo>> call, Response<List<Photo>> response) {
       
    }
    @Override
    public void onFailure(Call<List<Photo>> call, Throwable t) {
        
    }
});

面对大量需要设置callback来做回调处理的异步事件,不可避免我们需要各种各样的回调接口类,需要创建、记住各式各样的回调接口类。即使我们只是需要知道一个回调结果而已,这时候我们可以做哪些优化呢?首先我们可以简化一下看看他们的特点,然后写成下面的形式:

interface Callback<View>{
  onClick(view);
}
interface Callback<ComponentName,IBinder>{
  onServiceConnected(ComponentName name,IBinder binder);
  onServiceDisconnected(ComponentName name);
}
interface Callback<Response>{
  onResponse(Response response);
  onFailure(Throwable throwable)
}

除此之外我们还能怎么优化?在我们的项目中有一个专门的异步任务回调封装类:

public interface ITaskCallBack<T> {
    void onTaskSuccess(T dataSource);
    void onTaskFail(T dataSource, int errorCode);
}

当把他们联系在一起时,就有了大胆的想法,我们完全可以用这个封装类来替换上面三个回调函数:

1、对于类似View点击事件不会出现failure情况,只需要一个回调的,只需要空实现onTaskFail即可

2、服务绑定需要传递不止一个参数,可以合并成一个对象T传递

用这两个方法,我们可以用这个封装类替换所有回调类,这时候如果再把名字做一下修改:

ITaskCallback改成Observer;

onTaskSuccess(T)方法改成onNext(T);

onTaskFail(T,int)方法改成onError(Throwable throwable);

那么这不就和Rxjava的Observer一个模子出来的嘛!所以Rxjava的一个基本要义就是统一了异步接口的形式,但是Rxjava想要的更多,我不仅能处理异步,我还要处理同步事件,毕竟我们实际编程面临的情况往往是两种同时存在。所以,我这个Callback做的事情就不是仅仅一个回调结果就草草了事了,因为你可能还有更多事情要做,所以我把我自己这个Callback再返回给你,你想做什么事情就继续处理吧,这个思想便催生了Rxjava的操作符,类似于map、flatmap、线程调度observeOn、subscribeOn等等。也正是需要满足这样一个核心诉求,所以Rxjava的链式编程设计模式,可以说从一开始就确定了,而不是为了好看、简洁选择了链式编程。

如果能实现同步、异步的统一,那么我们编程想完成的一个事情便是可以顺序的发生,像水“流”一样,不断朝着最终目的地“流”动,这个事件就是Observable,最终目的地就是我们创建的Observer,这期间你可以不断操作这个“流”,还可以通过操作符把多个“流”合并成一个,控制“流”的流速、更换管道等等都是后话。

二、Rxjava源码设计

前面我们知道了Rxjava的野心是想要完成同步、异步事件的统一这样一个伟业,接下来就看看实际的源码实现吧。秉承着物理学家费曼的学习思想

“What i can't create,i don't understand"

我不能创造的,就是我不理解的。

所以讲源码这块,不是通过分析源码流程,而是通过“造轮子”。这个过程可以让我们更深刻的了解Rxjava的设计。如果我们可以造出简易的Rxjava,那么才可以说我们理解了Rxjava。接下来我会参考Rxjava2源码,忽略掉dispose等其他代码,通过造轮子的过程尽量来走一遍源码的核心代码。

a.观察者模式

实现一个简单的观察者模式只需要两步:给被观察者设置观察者、被观察者调用观察者的回调函数

public class Observable{
  private Observer observer;
   void setObserver(Observer observer) {
        this.observer = observer;
    }

    void onChange() {
        observer.onNext();
    }
}

但Rxjava采用的并不是简单的观察者模式,这里我们只保留Observable的核心方法:subscribe订阅函数,相当于上面的setObserver函数,Observer类上面我们已经阐述了,看一下这两个类简化版:

//Observer
public interface Observer<T> {
    void onNext(T var1);
    void onError(Throwable t);
}
//Observable
public abstract class Observable<T> implements ObservableSource<T> {
    public abstract void subscribe(Observer<? super T> observer);
}
//ObservableSource
public interface ObservableSource<T>{
  void subscribe(Observer<? super T> observer);
}

注意看:

1、这里用ObservableSource来作为接口抽象出subscribe订阅这个能力,好处之一是方便后面在Observable类中定义各种操作符

2、观察者模式的核心在于被观察者持有观察者对象,而目前我们的代码中并没有体现这一点,因为我们会在Observable的子类中去做持有observer这件事

b.链式调用

为了实现链式调用我们需要加入创建操作符create(OnSubscribeObservable onsubscribe)和变换操作符map(Function<T,K> f),这是两个相对简单却非常有助于理解源码的操作符。此外我们参考了Rxjava源码的一个重要设计模式:装饰者模式来完成这一设计即:每一个操作符都需要返回一个Observable,那么,create操作符会返回ObservableCreate对象,map操作符会返回ObservableMap对象,它们都继承自Observable,但它们可以在基类Observable上增加自己需要实现的功能。

首先,在Observable类中添加create静态方法,这样就可以实现Observable.create(xxxx)形式的调用,为了和源码一致,这里保留了ObservableOnSubscribe类来实现数据发射,再加入非静态方法map方法

public abstract class Observable<T> implements ObservableSource<T> {
    static <T> ObservableCreate<T> create(ObservableOnSubscribe<T> observableOnSubscribe) {
        return new ObservableCreate<>(observableOnSubscribe);
    }
    public final <R> Observable<R> map(Function<? super T, ? extends R> mapper){
        return new ObservableMap<T,R>(this,mapper);
    }
    public abstract void subscribe(Observer<? super T> observer);
}

其次,实现ObservableCreate的关键在于,我要把Observer传给Observable对象,而这里采用的是内部类形式将observer

public class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
      //1.将我们的事件源头传递进来
        this.source = source;
    }

    @Override
    public void subscribe(Observer<? super T> observer) {
      //核心:将observer装饰成ObserverEmitter,传给source
        ObserverEmitter<T> emitter = new ObserverEmitter<T>(observer);
       //3.捕获异常
        try{
          source.subscribe(emitter);
        }catch(Exception e){
           emitter.onError(e);
        } 
    }
    //2.observer的代理类
    static final class ObserverEmitter<T> implements Observer<T> {
        private Observer<? super T> observer;

        public ObserverEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }

        @Override
        public void onError(Throwable t) {

        }

        @Override
        public void onNext(T var1) {
            //4.调用实际observer的onNext方法
            observer.onNext(var1);
        }
    }
}
public class ObservableMap<T, R> extends Observable<R> {
    private ObservableSource<T> source;
    public Function<? super T, ? extends R> mapper;

    public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends R> mapper) {
        //1.将上游observable传递进来  
        this.source = source;
        this.mapper = mapper;
    }

    @Override
    public void subscribe(Observer<? super R> observer) {
      //核心:将observer装饰成ObserverEmitter,传给source
        source.subscribe(new ObserverMap<>(observer, mapper));
    }
    //2.observer的代理类
    static final class ObserverMap<T, R> implements Observer<T> {
        private final Observer<? super R> observer;
        public Function<? super T, ? extends R> mapper;
        public ObserverMap(Observer<? super R> observer, Function<? super T, ? extends R> mapper) {
            this.observer = observer;
            this.mapper = mapper;
        }
        @Override
        public void onNext(T var1) {
             R r=null;
            try {
              //3.捕获异常
                r = mapper.apply(var1);
            } catch (Exception e) {
                onError(e);
            }
            //4.调用实际observer的onNext方法
            observer.onNext(r);
        }

        @Override
        public void onError(Throwable t) {

        }
    }
}

实际调用:

Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableCreate.ObserverEmitter<String> emitter) {
                 emitter.onNext("abc");
            }
        }).map(new Function<String, Integer>() {
            @Override
            public Integer apply(String s) {
                return s.length();
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onNext(Integer var1) {
                 Log.d("myRxjava","var1: "+var1)
            }

            @Override
            public void onError(Throwable t) {

            }
        });

这里有几个关键点:

1.装饰者模式不仅是ObservableCreate装饰Observable,其内部类ObserverEmitter代理了Observer,并且像代理一样调用了实际observer的onNext和onError方法

2.如何持有观察者的?通过自己的内部类代理并持有,理解如何持有观察者是理解源码的核心

3.Rxjava为什么在某个环节出错总能走到最后的onError?从上面可以看到,在每个操作符中都有try-catch代码块。

c.整体的理解代码设计

map是一个非常简单、有助于我们理解源码设计的操作符,我们可以看到,Rxjava其实有三条“流”:

构建流:在没有执行调用subscribe函数之前。在Observable的构造方法中,只负责把上游Observable传递给下层Observable每一个Observable会成为下一个Observable的source

订阅流:在调用subscribe方法时发生。这个需要从最后我们创建的Observer倒着看,后面的操作符会把当前observer装饰成自己内部类的类型xxxObserver,并调用这一层subscribe方法传递observer,例如ObserverMap类,将上一个source与自己内部类新的observer建立subscribe关系,不断的进行包装装饰。所以subscribe方法可以理解成封装传递observer的过程。Observer-->ObserverMap-->ObserverCreate不断套娃。

经典的图:

回调流:当subscribe动作发生之后。像是在剥洋葱一样,进行一层一层回调xxxObserver的onNext()函数,直到我们最开始创建的Observer。ObserverCreate-->ObserverMap-->Observer,和订阅流是一个相反的过程。

订阅流和回调流对于理解Rxjava线程切换原理是至关重要的。

当然,最好的方式是自己能够在思考理解一遍之后,手写一遍,代码并不多,一共才百来行,却能感受到设计的精妙!源码中观察者模式、装饰者模式以及范型的使用都会让我们受益匪浅。

三、强大的调度能力

Rxjava的框架相当于给我们搭好了一个无限可能的舞台,如何组织调度事件、使用操作符写出“诗”一样的代码,就是全看开发者的理解和创造力了。

a.线程调度能力

线程调度是我们使用最多、也是Rxjava最为直观体现其优越之处的地方,仅仅通过一个简单的操作符就可以实现线程的切换。而subscribeOn和ObserveOn两个操作符的区别和原理如果没读过源码,而仅仅通过字面意思或者网上总结的结论,其实是很容易在真实场景中出错的。比如近期本人在做需求中,需要同时查询两张数据库表,这两个都是耗时操作需要放在子线程,所以当我不假思索的加上subscribeOn(Schedulers.io())时:

        //查询KMBook操作
        Observable<List<KMBookRecord>> kmBookObservable = kmBookDBProvider.queryAllBookRecords();
        //查询AudioBook操作
        Observable<List<AudioHistory>> audioBookObservable = kmBookDBProvider.queryAllAudioHistories();
        //合并两个结果
        Observable.zip(kmBookObservable, audioBookObservable, new BiFunction<List<KMBookRecord>, List<AudioHistory>,               List<CommonBookRecord>>() {  
            @Override
            public List<CommonBookRecord> apply(List<KMBookRecord> kmBookRecords, List<AudioHistory> audioHistories) throws Exception {
                ....
                return commonBookRecords;
            }
        }).subscribeOn(Schedulers.io());//切换到子线程

却发现并没有完成线程的切换,于是在小渠道收到了ANR问题。开始令我百思不得其解,然后发现我们所有的数据库操作都封装了一次线程切换:

return Observable.fromCallable(callable).compose(new ObservableTransformer<T, T>() {
            @Override
            public ObservableSource<T> apply(Observable<T> upstream) {
                return upstream.subscribeOn(AndroidSchedulers.from(databaseThread.getLooper()))
                        .observeOn(AndroidSchedulers.mainThread());
            }
        });

就是说我的subscribeOn切换线程是在observeOn切换后面再此切换。由于之前没有看过线程调度的源码,反而更让我不理解,为什么我后面加的subscribeOn切换操作符没有生效?现在结合前面说的订阅流和回调流,这里可以阐述subscribeOn和observeOn切换线程的原理:

subscribeOn:将当前subscribe()函数放在新的线程里执行,而subscribe函数是从最后一个往前开始回调流的,所以会影响前面所有操作符线程。

observeOn:将onNext()方法放在新的线程中,也就是实际调用onNext方法的时候切换线程,所以就切换了整个回调流,会对下游所有操作符生效。

而当这两个组合起来呢?显然由于是两个不同的流,subscribeOn并不会对observeOn切换产生影响,这也是我上面切换失败的原因所在。

b.多任务组合能力

从上面的例子我们用zip操作符将两个耗时操作组合到一起进行处理。如果没有Rxjava的情况下,我们需要在两个任务中分别判断另一个任务是否完成,两个任务都完成之后才能进行合并处理操作,这无疑需要创建中间变量、变量的多线程修改一堆让人头疼的问题,而Rxjava只需要通过zip操作符就可以轻松解决,至于如何实现的就需要阅读源码去感受其中的魅力了。

说到zip就不得不提与之相关的merge、concat操作符这三个都是合并操作符。试想这样一个场景,我们要处理两个耗时任务,且需要保证两个任务的执行顺序时,这比上面的合并两个耗时的复杂度多了一层,而concat就是天生来解决顺序问题的,而merge则是无序的合并。那么有没有更复杂的实际例子,也是有的:

首页不同的模块展示顺序由服务端下发配置文件,然后不同模块数据来自于不同的接口请求,同时我们要实现渐进的展示,不是等所有的接口返回再展示出数据。暂且不谈一个页面数据由多个接口返回的合理性,这样一个复杂场景,我们肯定是会需要定义全局数据集合来持有数据、判断各个接口返回情况,代码会非常的散。而Rxjava却可以“流”式解决这个问题,贴出一段伪代码:

networkApi.getColumns()
    .map(types -> {
        List<Observable<? extends List<? extends Item>>> requestObservableList = new ArrayList<>();
        for (String type : types) {
            switch (type) {
                case "a":
                    requestObservableList.add(
                        networkApi.getItemListOfTypeA().startWith(new ArrayList<ItemA>())
                    );
                    break;
                case "b":
                    requestObservableList.add(
                        networkApi.getItemListOfTypeB().startWith(new ArrayList<ItemB>())
                    );
                    break;
                case "c":
                    requestObservableList.add(
                        networkApi.getItemListOfTypeC().startWith(new ArrayList<ItemC>())
                    );
                    break;
            }
        }
        return requestObservableList;
    })
    .flatMap(requestObservables -> Observable.combineLatest(requestObservables, objects -> {
        List<Item> items = new ArrayList<>();
        for (Object response : objects) {
            items.addAll((List<? extends Item>) response);
        }
        return items;
    }))
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(data -> {
        adapter.setData(data);
        adapter.notifyDataSetChanged();
    });

四、那么代价是什么

Rxjava的优点非常多,为人所称道,但是没有完美的框架,Rxjava也是有它的缺点的。

a.操作符学习成本高

上面的复杂情景实现,需要熟悉startWith、flatMap、combineLatest三个没那么简单的操作符,并且把它们恰到好处的组合在一起,可以说丝毫不亚于解一道数学题的难度。所以Rxjava的首要代价就是操作符的学习成本其实是比较高的,学习难度的曲线是陡增的,开始上手很简单,但是想用好、用来解决复杂问题是比较难的一件事。拿flatMap操作符来说,这个操作符和map操作符的区别,能讲清楚已经很不容易了,我们项目中flatMap有不少都是当作map在用。再比如Rxjava的Subject类作为和Observable一样不可或缺的事件对象,使用过这个类的人是相当少的,并不是因为遇不到使用它的场景,而是不了解的情况下压根想不到原来还可以用这个操作符来解决问题。我们项目中用的一个地方是,用户是否点击同意了隐私协议弹窗,这种是在用户点击之后发射事件的场景,就是Subject类的天然属性。

b.创建对象问题

Rxjava由于装饰者模式,每个操作符都会产生中间变量,这些变量在整个流事件结束之后才会被释放,因此产生了背压概念,通过背压策略来控制改善这一问题。所以就产生了另一个重要的大类Flowable。

c.内存泄漏问题

由于Rxjava对象的持有不能立马释放,如果我们再忘了释放就会导致内存泄漏(如下图)这就涉及到dispose的处理。

总结:上面的每一个Rxjava的痛点都需要对Rxjava不断学习、实践有足够的了解才能克服这些问题。本次分享旨在让大家了解Rxjava的设计理念、源码架构以及在解决实际问题时的强大之处,让大家后续在学习一个操作符时不仅仅是百度,可以自己通过源码分析,当然最重要的是其设计之美。