大概从2015年开始,RxJava1.0开始快速流行起来,短短两年时间,RxJava在Android开发中已经算是无人不知无人不晓了,加之它与Retrofit等流行框架的完美结合,已经成为Android项目开发的必备利器。随手记作为一个大型项目,引入三方框架一直比较慎重,但也从今年初开始,正式引入了RxJava2.0,并配合Retrofit对项目的网络框架和繁琐的异步逻辑进行重构。RxJava虽然好用,但伴随而来的是不可避免的学习成本,为了让大家快速的了解RxJava的来龙去脉以及快速上手使用,特地总结该篇文章。本文将详细讲解如何快速理解RxJava的操作符,并从源码角度来分析RxJava操作符的原理。
RxJava的优点
简单来讲RxJava是一个简化异步调用的库,但其实它更是一种优雅的编程方式和编程思想,当你熟悉RxJava的使用方式之后,会很容易爱上它。 我总结它的优点主要有两个方面:
简洁,免除传统异步代码逻辑中的callback hell
增加业务逻辑代码的可读性
关于第一点大家应该都会认同,关于第二点可能有人会有疑惑,因为很多人觉得RxJava大量不明所以的操作符会让代码的可读性变得更差,其实产生这种印象恰恰就是因为没有掌握RxJava操作符的使用和原理所导致的。 比如随手记项目中绑定用户QQ账号的业务逻辑,这段逻辑的代码涉及三个异步接口,两个是QQ登录SDK的,一个是随手记后台的,在使用RxJava重构前,这段代码使用了3个AsyncTask,也就是三个嵌套的回调,代码复杂,可读性非常差。而改造之后,它变成了下面这样子
如果你对这里面的几个RxJava操作符比较熟悉的话,你会迅速了解我这段代码做了什么事情,而且不用再去梳理一堆嵌套回调了,这就是RxJava带来的可读性。 所以,学习RxJava,理解和掌握操作符是不可避免的第一步。
RxJava2.0与RxJava1.0的关系
从RxJava1.0到RxJava2.0,基本思想没有变化,但RxJava2.0按照Reactive-Streams规范对整个架构进行了重新设计,并变更了Maven仓库依赖地址和包名。所以现在RxJava的github网站中,RxJava1.0和RxJava2.0是两个独立的分支,不相互兼容 ,也不能同时使用,而且RxJava1.0再过一段时间也将不再维护。所以,目前还使用RxJava1.0的,建议尽早切换到RxJava2.0,而如果没有接触过RxJava1.0,直接使用和学习RxJava2.0就可以了。如果想了解RxJava1.0和RxJava2.0的详细区别,请参考官方文档。 为行文方便,从此处开始,本文使用Rx来表示RxJava2.x。
Rx的操作符有哪些
刚接触Rx的人面对一堆各式各样的操作符会觉得不知如何去学习记忆,其实你只需要从整体上了解Rx操作符的类别和掌握一些使用频率较高的操作符就足够了,至于其他的操作符,你只需要知道它的使用场景和掌握如何快速理解一个操作符的方法,就可以在需要的时候快速拿来用了。 下图是我根据官方文档总结的Rx操作符的分类及每个类别下的代表性操作符从上图可以看出,Rx的操作符主要十个大类别,每个类别下常用的操作符也就三五个左右,所以只要掌握这些,你就可以应付大部分的业务场景了。
如何快速理解一个Rx操作符
提到Rx操作符,相信很多人都会对描述Rx操作符的花花绿绿的宝石图有很大印象。要快速理解Rx操作符,看懂宝石图是个快捷有效的方式,现在我们就来详细分析一下构成宝石图的各个主要元素。 首先,我们有必要回顾一下Rx中的几个主要的基类
io .reactivex .Flowable
: 事件源(0..N个元素), 支持 Reactive-Streams and 背压io .reactivex .Observable
:事件源(0..N个元素), 不支持背压io .reactivex .Single
: 仅发射一个元素或产生error的事件源,io .reactivex .Completable
: 不发射任何元素,只产生completion或error的事件源io .reactivex .Maybe
: 不发射任何元素,或只发射一个元素,或产生error的事件源Subject
: 既是事件源,也是事件接受者 可以看到Rx中最重要的概念就是事件源
了,基本上所有的操作符都是针对事件源来进行一些转换、组合等操作,而我们最常用的事件源就是Observable
了。
本文中我们就以 Observable
事件源为例来讲解Rx的操作符, Observable
发射的事件我们统一称之为item。首先我们需要详细了解一下宝石图中各个图像元素的含义:
—>
:Observable
的时间线,从左至右流动★
:星星、圆、方块等表示Observable
发射的item|
:时间线最后的小竖线表示Observable
的事件流已经成功发射完毕了X
:时间线最后的X符合表示由于某种原因Observable
非正常终止发射,产生了error
上面几种元素组合在一起代表一个完整的 Observable
,也可以称为源 Observable
-->
方向朝下的虚线箭头表示以及中间的长方框表示正在对上面的源 Observable
进行某种转换。长方框里的文字展示了转换的性质。下面的 Observable
是对上面的源 Observable
转换后的结果。
掌握了宝石图的含义,我们就可以根据某个操作符的宝石图快速理解这个操作符了。举几个例子:
1. map
可以看到,这幅图表达的意思是一个源 Observable
先后发射了1、2、3的三个item,而经过 map
操作符一转换,就变成了一个发射了10、20、30三个item的新的 Observable
。描述操作符的长方框中也清楚的说明了该 map
操作符进行了何种具体的转换操作(图中的10*x只是一个例子,这个具体的转换函数是可以自定义的)。 于是,我们就很快速地理解了 map
操作符的含义和用法,简单来讲,它就是通过一个函数将一个 Observable
发射的item逐个进行某种转换。 示例代码:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
}
}).map(new Function<Integer, Integer>() {
@Override
public Integer apply(@NonNull Integer integer) throws Exception {
return integer * 10;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer result) throws Exception {
Log.i(TAG, "accept : " + result +"\n" );
}
});
输出结果:
2. zip
根据 zip
的宝石图,可以知道zip操作符的作用是把多个源 Observable
发射的item通过特定函数组合在一起,然后发射组合后的item。从图中还可以看到一个重要的信息是,最终发射的item是对上面的两个源 Observable
发射的item按照发射顺序逐个组合的结果,而且最终发射的 1A
等item的发射时间是由组合它的 1
和 A
等item中发射时间较晚的那个item决定的,也正是如此, zip
操作符经常可以用在需要同时组合处理多个网络请求的结果的业务场景中。 示例代码:
Observable.zip(Observable.just(1, 2, 3),
Observable.just("A", "B", "C"),
new BiFunction<Integer, String, String>() {
@Override
public String apply(@NonNull Integer integer, @NonNull String s) throws Exception {
return integer + s;
}
})
.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
Log.i(TAG, "zip : accept : " + s + "\n");
}
});
输出结果:
3. concat
从宝石图可以看出, concat
操作符的作用就是将两个源 Observable
发射的item连接在一起发射出来。这里的连接指的是整体连接,被 concat
操作后产生的 Observable
会先发射第一个源 Observable
的所有item,然后紧接着再发射第二个源 Observable
的所有的item。 示例代码:
Observable.concat(Observable.just(1, 2, 3), Observable.just(4, 5, 6))
.subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
Log.i(TAG, "concat : " + integer + "\n");
}
});
输出结果:
大部分操作符都配有这样的宝石图,通过官方文档或者直接在Rx源码中查看JavaDoc就可以找到,不再过多举例。你也可以在rxmarbles这样的网站上查看更多可以动态交互的宝石图。
Rx操作符的原理
要了解操作符的原理,肯定要从源码入手喽。所以我们先来简单撸一遍Rx的最基本的Create操作符的源码。 Rx的源码目录结构是比较清晰的,我们先从 Observable.create
方法来分析
-
Observable .create (new ObservableOnSubscribe <String >() {
-
@Override
-
public void subscribe (@NonNull ObservableEmitter <String > e ) throws Exception {
-
e .onNext ("s" );
-
}
-
}). subscribe( new Observer< String>() {
-
@Override
-
public void onSubscribe (@NonNull Disposable d) {
-
// 创建的Observer中多了一个回调方法onSubscribe,传递参数为Disposable ,Disposable相当于RxJava1.x中的Subscription,用于解除订阅。
-
}
-
-
@Override
-
public void onNext (@NonNull String s) {
-
-
}
-
-
@Override
-
public void onError (@NonNull Throwable e) {
-
-
}
-
-
@Override
-
public void onComplete () {
-
-
}
-
});
create
方法如下
-
@CheckReturnValue
-
@SchedulerSupport (SchedulerSupport .NONE )
-
public static < T> Observable <T > create (ObservableOnSubscribe <T > source ) {
-
ObjectHelper. requireNonNull( source, "source is null" );
-
return RxJavaPlugins. onAssembly( new ObservableCreate< T>( source));
-
}
代码很简单,第一行判空不用管,第二行调用 RxJavaPlugins
的方法是为了实现Rx的hook功能,我们暂时也无需关注,在一般情况下,第二行代码会直接返回它的入参即 ObservableCreate
对象, ObservableCreate
是 Observable
的子类,实现了 Observable
的一些抽象方法比如 subscribeActual
。事实上Rx的每个操作符都对应 Observable
的一个子类。 这里 create
方法接受的是一个 ObservableOnSubscribe
的接口实现类:
-
/**
-
* A functional interface that has a {@code subscribe()} method that receives
-
* an instance of an {@link ObservableEmitter} instance that allows pushing
-
* events in a cancellation-safe manner.
-
*
-
* @param <T> the value type pushed
-
*/
-
public interface ObservableOnSubscribe <T > {
-
-
/**
-
* Called for each Observer that subscribes.
-
* @param e the safe emitter instance, never null
-
* @throws Exception on error
-
*/
-
void subscribe (@NonNull ObservableEmitter <T > e ) throws Exception;
-
}
通过注释可以知道这个接口的作用是通过一个 subscribe
方法接受一个 ObservableEmitter
类型的实例,俗称发射器。 Observable.create
方法执行时,我们传入的就是一个 ObservableOnSubscribe
类型的匿名内部类,并实现了它的 subscribe
方法,然后它又被传入 create
方法的返回对象 ObservableCreate
,最终成为 ObservableCreate
的成员 source
-
public final class ObservableCreate <T > extends Observable< T> {
-
final ObservableOnSubscribe< T> source;
-
-
public ObservableCreate( ObservableOnSubscribe< T> source) {
-
this. source = source;
-
}
-
...
接着我们来看 Observable
的 subscribe
方法,它的入参是一个 Observer
(即观察者,也就是事件接收者)
-
@SchedulerSupport (SchedulerSupport .NONE )
-
@Override
-
public final void subscribe( Observer<? super T> observer) {
-
ObjectHelper. requireNonNull( observer, "observer is null" );
-
try {
-
observer = RxJavaPlugins. onSubscribe( this, observer);
-
-
ObjectHelper. requireNonNull( observer, "Plugin returned null Observer" );
-
-
subscribeActual (observer );
-
} catch (NullPointerException e) { // NOPMD
-
throw e ;
-
} catch (Throwable e) {
-
Exceptions. throwIfFatal( e);
-
// can't call onError because no way to know if a Disposable has been set or not
-
// can't call onSubscribe because the call might have set a Subscription already
-
RxJavaPlugins. onError( e);
-
-
NullPointerException npe = new NullPointerException( "Actually not, but can't throw other exceptions due to RS");
-
npe .initCause (e );
-
throw npe ;
-
}
-
}
最终它会调用它的子类 ObservableCreate
的 subscribeActual
方法:
-
@Override
-
protected void subscribeActual( Observer<? super T> observer) {
-
CreateEmitter< T> parent = new CreateEmitter <T >(observer );
-
observer .onSubscribe (parent );
-
-
try {
-
source .subscribe (parent );
-
} catch (Throwable ex) {
-
Exceptions. throwIfFatal( ex);
-
parent .onError (ex );
-
}
-
}
在 subscribeActual
里首先创建了用于发射事件的 CreateEmitter
对象 parent
, CreateEmitter
实现了接口 Emitter
和 Disposable
,并持有 observer
。 这段代码的关键语句是 source.subscribe(parent)
,这行代码执行后,就会触发事件源进行发射事件,即 e.onNext("s")
会被调用。细心的同学也会注意到这行代码之前, parent
先被传入了 observer
的 onSubscribe()
方法,而在上面我们说过, observer
的 onSubscribe()
方法接受一个 Disposable
类型的参数,可以用于解除订阅,之所以能够解除订阅,正是因为在触发事件发射之前调用了 observer
的 onSubscribe()
,给了我们调用 CreateEmitter
的解除订阅的方法 dispose()
的机会。 继续来看 CreateEmitter
的 onNext()
方法,它最终是通过调用 observer
的 onNext()
方法将事件发射出去的
-
static final class CreateEmitter <T >
-
extends AtomicReference <Disposable >
-
implements ObservableEmitter <T >, Disposable {
-
-
-
private static final long serialVersionUID = -3434801548987643227L ;
-
-
final Observer<? super T> observer;
-
-
CreateEmitter( Observer<? super T> observer) {
-
this. observer = observer;
-
}
-
-
@Override
-
public void onNext (T t ) {
-
if (t == null) {
-
onError (new NullPointerException ("onNext called with null. Null values are generally not allowed in 2.x operators and sources." ));
-
return;
-
}
-
// 在真正发射之前,会先判断该CreateEmitter是否已经解除订阅
-
if (!isDisposed ()) {
-
observer .onNext (t );
-
}
-
}
-
...
-
}
至此,Rx事件源的创建和订阅的流程就走通了。
下面我们从 map
操作符来入手看一下Rx操作符的原理, map
方法如下
-
@CheckReturnValue
-
@SchedulerSupport (SchedulerSupport .NONE )
-
public final < R> Observable <R > map (Function <? super T , ? extends R > mapper ) {
-
ObjectHelper. requireNonNull( mapper, "mapper is null" );
-
return RxJavaPlugins. onAssembly( new ObservableMap< T, R>( this, mapper));
-
}
map
方法接受一个Function类型的参数 mapper
,返回了一个 ObservableMap
对象,它也是继承自 Observable
,而 mapper
被传给了 ObservableMap
的成员 function
,同时当前的源 Observable
被传给 ObservableMap
的成员 source
,进入 ObservableMap
类
-
public final class ObservableMap <T , U > extends AbstractObservableWithUpstream< T, U> {
-
final Function<? super T, ? extends U> function ;
-
-
public ObservableMap( ObservableSource< T> source, Function <? super T , ? extends U > function) {
-
super( source);
-
this. function = function;
-
}
-
-
@Override
-
public void subscribeActual (Observer <? super U > t ) {
-
source .subscribe (new MapObserver <T , U >(t , function));
-
}
-
-
-
static final class MapObserver< T, U> extends BasicFuseableObserver <T , U > {
-
final Function<? super T, ? extends U> mapper;
-
-
MapObserver( Observer<? super U> actual, Function <? super T , ? extends U > mapper ) {
-
super( actual);
-
this. mapper = mapper;
-
}
-
-
@Override
-
public void onNext (T t ) {
-
if (done ) {
-
return;
-
}
-
-
if (sourceMode != NONE ) {
-
actual .onNext (null );
-
return;
-
}
-
-
U v ;
-
-
try {
-
v = ObjectHelper. requireNonNull( mapper. apply( t), "The mapper function returned a null value." );
-
} catch (Throwable ex) {
-
fail (ex );
-
return;
-
}
-
actual .onNext (v );
-
}
-
-
@Override
-
public int requestFusion (int mode) {
-
return transitiveBoundaryFusion (mode );
-
}
-
-
@Nullable
-
@Override
-
public U poll () throws Exception {
-
T t = qs .poll ();
-
return t != null ? ObjectHelper.< U> requireNonNull( mapper. apply( t), "The mapper function returned a null value." ) : null;
-
}
-
}
-
}
可以看到这里用到了装饰者模式, ObservableMap
持有来自它上游的事件源 source
, MapObserver
持有来自它下游的事件接收者和我们实现的转换方法 function
,在 subscribeActual()
方法中完成 ObservableMap
对 source
的订阅,触发 MapObserver
的 onNext()
方法,继而将来自 source
的原始数据经过函数 mapper
转换后再发射给下游的事件接收者,从而实现map这一功能。
现在我们终于能够来总结一下包含多个操作符时的订阅流程了,以下面这段代码为例
Observable.
create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
e.onNext("holen");
}
})
.map(new Function<String, Integer>() {
@Override
public Integer apply(@NonNull String s) throws Exception {
return s.length();
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull Integer integer) {
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});
执行代码时,自上而下每一步操作符都会创建一个新的 Observable
(均为 Observable
的子类,对应不同的操作符),当执行 create
时,创建并返回了 ObservableCreate
,当执行 map
时,创建并返回了 ObservableMap
,并且每一个新的 Observable
都持有它上游的源 Observable
(即 source
)及当前涉及到的操作函数 function
。当最后一步执行订阅方法 subscribe
时会触发 ObservableMap
的 subscribeActual()
方法,并将最下游的 Observer
包装成 MapObserver
,同时该方法又会继续调用它所持有 ObservableCreate
的订阅方法(即执行 source.subscribe
),由此也会触发 ObservableCreate
的 subscribeActual()
方法,此时我们的发射器 CreateEmitter
才会调用它的 onNext()
方法发射事件,再依次调用 MapObserver
的操作函数 mapper
和 onNext()
方法,最终将事件传递给了最下游的 Observer
的 onNext()
方法。
我简单的将这段逻辑用下面这幅图来表示
操作符 lift
和 compose
lift
和 compose
在Rx中是两个比较特殊的操作符。 lift
让我们可以对 Observer
进行封装,在RxJava1.0中大部分变换都基于 lift
这个神奇的操作符。
-
@CheckReturnValue
-
@SchedulerSupport (SchedulerSupport .NONE )
-
public final < R> Observable <R > lift (ObservableOperator <? extends R , ? super T > lifter ) {
-
ObjectHelper. requireNonNull( lifter, "onLift is null" );
-
return RxJavaPlugins. onAssembly( new ObservableLift< R, T>( this, lifter));
-
}
lift
操作符接受一个 ObservableOperator
对象
-
/**
-
* Interface to map/wrap a downstream observer to an upstream observer.
-
*
-
* @param <Downstream> the value type of the downstream
-
* @param <Upstream> the value type of the upstream
-
*/
-
public interface ObservableOperator <Downstream , Upstream> {
-
/**
-
* Applies a function to the child Observer and returns a new parent Observer.
-
* @param observer the child Observer instance
-
* @return the parent Observer instance
-
* @throws Exception on failure
-
*/
-
@NonNull
-
Observer<? super Upstream > apply (@NonNull Observer <? super Downstream> observer) throws Exception ;
-
}
看注释可以知道,这是一个将下游订阅者包装成一个上游订阅者的接口。类似Map操作符中的MapObserver。
而 compose
操作符让我们可以对 Observable
进行封装
-
@SuppressWarnings ("unchecked" )
-
@CheckReturnValue
-
@SchedulerSupport (SchedulerSupport .NONE )
-
public final < R> Observable <R > compose (ObservableTransformer <? super T , ? extends R > composer ) {
-
return wrap (((ObservableTransformer <T , R >) ObjectHelper. requireNonNull( composer, "composer is null" )).apply (this ));
-
}
wrap
方法如下,仅仅是走了 RxJavaPlugins
的流程
-
@CheckReturnValue
-
@SchedulerSupport (SchedulerSupport .NONE )
-
public static < T> Observable <T > wrap (ObservableSource <T > source ) {
-
ObjectHelper. requireNonNull( source, "source is null" );
-
if (source instanceof Observable) {
-
return RxJavaPlugins. onAssembly(( Observable< T>) source);
-
}
-
return RxJavaPlugins. onAssembly( new ObservableFromUnsafeSource< T>( source));
-
}
compose
方法接受一个 ObservableTransformer
对象
-
/**
-
* Interface to compose Observables.
-
*
-
* @param <Upstream> the upstream value type
-
* @param <Downstream> the downstream value type
-
*/
-
public interface ObservableTransformer <Upstream , Downstream> {
-
/**
-
* Applies a function to the upstream Observable and returns an ObservableSource with
-
* optionally different element type.
-
* @param upstream the upstream Observable instance
-
* @return the transformed ObservableSource instance
-
*/
-
@NonNull
-
ObservableSource< Downstream> apply( @NonNull Observable< Upstream> upstream);
-
}
ObservableSource
即为我们的基类 Observable
继承的唯一接口。看注释可以知道, ObservableTransformer
是一个组合多个 Observable
的接口,它通过一个 apply()
方法接收上游的 Observable
,进行一些操作后,返回新的 Observable
。 这里组合多个 Observable
的意思其实就是组合多个操作符,比如我们经常会需要在使用Rx进行网络异步请求时进行线程变化,这个操作一般都是差不多的,每次都写会比较烦,这时我们就可以使用 compose
把常用的线程变换的几个操作符组合起来
-
private final ObservableTransformer schedulersObservable = new ObservableTransformer () {
-
@Override
-
public ObservableSource apply (Observable upstream) {
-
return upstream .subscribeOn (Schedulers .io ())
-
.unsubscribeOn (Schedulers .io ())
-
.observeOn (AndroidSchedulers .mainThread ());
-
}
-
};
-
-
protected void testCompose() {
-
getNetObservable ()
-
.compose (schedulersObservable )
-
.subscribe (new Consumer <String >() {
-
@Override
-
public void accept (@NonNull String s) throws Exception {
-
mRxOperatorsText .append (s );
-
}
-
});
-
}
关于 compose
的典型应用,大家有兴趣还可以去看一下开源项目RxLifecycle,它就是巧妙地利用 compose
操作符来解决了使用Rx可能会出现的内存泄露问题。
Rx操作符的应用场景
说了这么多,其实我们最关心的还是Rx操作符的应用场景。其实只要存在异步的地方,都可以优雅地使用Rx操作符。比如很多流行的Rx周边开源项目
而针对自己想要实现的功能情景,如何去选择特定的操作符,官网的文档中也列出了一些指导——Rx操作符决策树。
当然除了这些,我们在开发项目时,还会有各种具体的业务场景需要选择合适的操作符,这里我总结了一些经常遇到的场景以及适合它们的操作符
只要我们理解了Rx操作符的原理,熟练掌握了一些使用频率较高的操作符,就能够在以上场景中轻松地使用,不再让自己的代码被复杂的业务逻辑搞得混乱。
以上就是本文的全部内容,关于Rx还有很多东西值得深入地学习研究,后续有机会再跟大家分享更多Rx的使用心得。
参考
RxJava2Examples