RxJava2.X 源码解析(二) :神秘的取消订阅流程

RxJava2.X 源码解析(二) :神秘的取消订阅流程

Original
2017-07-25
Angels_安杰
码个蛋 码个蛋
码个蛋
《RxJava2.X 源码解析(二) :神秘的取消订阅流程》

WeChat ID codeegg

Intro 每天更新优质文章:Android、职场干货,不定期大神语音分享。


作者博客

http://www.cherylgood.cn/

前言

基于RxJava2.1.1

我们在前一篇# RxJava2.0源码解析(一)初步分析了RxJava从创建到执行的流程

本篇我们将探索RxJava2.x提供给我们的Disposable能力的来源。

要相信,任何神奇的功能,当你探索了其本质之后,收获都是巨大的。

从Demo到原理

《RxJava2.X 源码解析(二) :神秘的取消订阅流程》

( ̄∇ ̄)猜猜会输出什么呢?

《RxJava2.X 源码解析(二) :神秘的取消订阅流程》

在发送玩hello之后,成功终止了后面的Reactive流。从结果我们还发现,后面的Reactive流被终止了,也就是订阅者或者观察者收不到后面的信息了,但是生产者或者说被订阅者、被观察者的代码还是会继续执行的。

Ok,我们从哪开始入手呢?我们发现,在我们执行了 disposable.dispose();后,触发了该事件,我们看下 disposable.dispose();到底做了什么呢,很开心的,我们点进 disposable.dispose();的源码,╮(╯_╰)╭,好吧,只是接口

《RxJava2.X 源码解析(二) :神秘的取消订阅流程》

此时我们要回忆一下上一篇的一段代码

《RxJava2.X 源码解析(二) :神秘的取消订阅流程》

我们之前分析到在执行source.subscribe(parent);触发数据分发事件之前先执行了observer.onSubscribe(parent);这句代码,所传入的 parent也就对应了我们的Disposable

parent是CreateEmitter类型的,但是CreateEmitter是实现了 Disposable接口的一个类。而parent又是我们的observer的一个包装后的对象。

OK,分析到这里我们来整理下前面的环节,根据Demo来解释下:首先在执行下面代码之前

《RxJava2.X 源码解析(二) :神秘的取消订阅流程》

先执行了observer.onSubscribe(parent);,我们在demo中也是通过传入的parent调用其dispose方法来终止 Reactive流,而执行分发hello等数据的e也是我们的parent,也就是他们都是同一个对象。而执行 e.onNext(“hello”);的e对象也是observer的一个包装后的ObservableEmitter类型的对象。

总结:Observer自己来控制了Reactive流状态。


Ok,此时如果我说关键点应该在ObservableEmitter这个类上面,你觉得可能性有多少呢?( ̄∇ ̄)

关键点就是CreateEmitter<T> parent = new CreateEmitter<T>(observer);这个包装的过程,我们来看下其源码

《RxJava2.X 源码解析(二) :神秘的取消订阅流程》
《RxJava2.X 源码解析(二) :神秘的取消订阅流程》
《RxJava2.X 源码解析(二) :神秘的取消订阅流程》

因为其实现了ObservableEmitter<T>, Disposable接口类,所以需实现其方法。这里其实是使用了装饰者模式,其魅力所在一会就会看到了。

我们可以看到在ObservableEmitter内部通过final Observersuper T> observer;存储了我们的 observer,这样有什么用呢?看Demo,我们在调用e.onNext(“hello”);时,调用的时ObservableEmitter对象的 onNext方法,然后ObservableEmitter对象的onNext方法内部再通过observer调用 onNext方法,但是从源码我们可以发现,其并不是简单的调用哦。

《RxJava2.X 源码解析(二) :神秘的取消订阅流程》

1、先判断传入的数据是否为null

2、判断isDisposed(),如果isDisposed()返回false则不执行onNext。

isDisposed()什么时候会返回false呢?按照demo,也就是我们调用了disposable.dispose();后,disposable前面分析了就是 CreateEmitter<T> parent,我们看CreateEmitter.dispose()

《RxJava2.X 源码解析(二) :神秘的取消订阅流程》

里面调用DisposableHelper.dispose(this);,我们看isDisposed()

《RxJava2.X 源码解析(二) :神秘的取消订阅流程》

RxJava的onComplete();与onError(t);只有一个会被执行的秘密原来是它?


再看另外两个方法的调用

《RxJava2.X 源码解析(二) :神秘的取消订阅流程》

其内部也基本做了同样的操作,先判断!isDisposed()后再决定是否执行。

但是再这里还有一点哦,我们应该知道onComplete();和onError(t)只有一个会发生,其实现原理也是通过isDisposed这个方法哦,我们可以看到,不关是先执行 onComplete();还是先执行onError(t),最终都会调用dispose();,而调用了dispose();后, isDisposed()为false,也就不会再执行另外一个了。而且如果人为先调用onError再调用onComplete, onComplete不会被触发,而且会抛出NullPointerException异常。

小结:

此时我们的目的基本达到了,我们知道了Reactive流是如何被终止的以及RxJava的onComplete();与 onError(t);只有一个会被执行的原因。

我们虽然知道了原因,但是秉着刨根问底的态度,抵挡不住内心的好奇,我还是决定挖一挖DisposableHelper这个类,当然如果不想了解DisposableHelper的话,看到这里也就可以了;

Ok,前面分析到,代码里调用了DisposableHelper类的静态方法,我们看下其调用的两个静态方法分别做了什么?

《RxJava2.X 源码解析(二) :神秘的取消订阅流程》

1、DISPOSED:作为是否要终止的枚举类型的标识

2、isDisposed:判断上次记录的终点标识的是否是 当前执行的Observer,如果是返回true

3、dispose:采用了原子性引用类AtomicReference,目的是防止多线程操作出现的错误。

更详细的分析放入了代码中

总结

通过本次,1、我们了解了RxJava的随意终止Reactive流的能力的来源;2、过程中也明白了RxJava的onComplete();与onError(t);只有一个会被执行的秘密。

实现该能力的主要方式还是利用了装饰者模式

从中体会了设计模式的魅力所在,当然我们还接触了AtomicReference这个类,在平时估计很少接触到。

后续会继续分析RxJava的各种魔力点。

    原文作者:Android源码分析
    原文地址: https://juejin.im/entry/59769b58f265da6c43675d35
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞