概述
书接上文,上节我们分析了Rxjava是如何对被观察线程进行调度的,这节我们来分析下Rxjava是如何对观察者线程进行调度的。还是之前的套路,先看个简单的demo。
简单的例子
private void doSomeWork() {
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
Log.i("lx", " subscribe: " + Thread.currentThread().getName());
e.onNext("a");
e.onComplete();
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.i("lx", " onSubscribe: " + Thread.currentThread().getName());
}
@Override
public void onNext(String str) {
Log.i("lx", " onNext: " + Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
Log.i("lx", " onError: " + Thread.currentThread().getName());
}
@Override
public void onComplete() {
Log.i("lx", " onComplete: " + Thread.currentThread().getName());
}
});
}
看看运行结果:
com.rxjava2.android.samples I/lx: onSubscribe: main
com.rxjava2.android.samples I/lx: subscribe: RxCachedThreadScheduler-1
com.rxjava2.android.samples I/lx: onNext: main
com.rxjava2.android.samples I/lx: onComplete: main
从结果可以看出,事件的生产线程运行在RxCachedThreadScheduler-1中,而事件的消费线程则被调度到了main线程中。关键代码是因为这句.observeOn(AndroidSchedulers.mainThread())
。 下面我们着重分析下这句代码都做了哪些事情。
AndroidSchedulers.mainThread()
先来看看AndroidSchedulers.mainThread()
是什么?贴代码
/** A {@link Scheduler} which executes actions on the Android main thread. */
public static Scheduler mainThread() {
return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
}
注释已经说的很明白了,是一个在主线程执行任务的scheduler,接着看
private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
new Callable<Scheduler>() {
@Override public Scheduler call() throws Exception {
return MainHolder.DEFAULT;
}
});
public static Scheduler initMainThreadScheduler(Callable<Scheduler> scheduler) {
if (scheduler == null) {
throw new NullPointerException("scheduler == null");
}
Function<Callable<Scheduler>, Scheduler> f = onInitMainThreadHandler;
if (f == null) {
return callRequireNonNull(scheduler);
}
return applyRequireNonNull(f, scheduler);
}
代码很简单,这个AndroidSchedulers.mainThread()
想当于new HandlerScheduler(new Handler(Looper.getMainLooper()))
,原来是利用Android
的Handler
来调度到main
线程的。
我们再看看HandlerScheduler
,它与我们上节分析的IOScheduler
类似,都是继承自Scheduler
,所以AndroidSchedulers.mainThread()
其实就是是创建了一个运行在main thread
上的scheduler。
好了,我们再回过头来看observeOn
方法。
observeOn
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
重点是这个new ObservableObserveOn
,看名字是不是有种似成相识的感觉,还记得上篇的ObservableSubscribeOn
吗? 它俩就是亲兄弟,是继承自同一个父类。
重点还是这个方法,我们前文已经提到了,Observable的subscribe方法最终都是调用subscribeActual
方法。下面看看这个方法的实现:
@Override
protected void subscribeActual(Observer<? super T> observer) {
// scheduler 就是前面提到的 HandlerScheduler,所以进入else分支
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
// 创建 HandlerWorker
Scheduler.Worker w = scheduler.createWorker();
// 调用上游Observable的subscribe,将订阅向上传递
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
从上面代码可以看到使用了ObserveOnObserver
类对observer
进行装饰,好了,我们再来看看ObserveOnObserver
。
我们已经知道了,事件源发射的事件,是通过observer的onNext
,onError
,onComplete
发射到下游的。所以看看ObserveOnObserver
的这三个方法是如何实现的。
由于篇幅问题,我们只分析onNext
方法,onError
和onComplete
方法有兴趣的同学可以自己分析下。
@Override
public void onNext(T t) {
if (done) {
return;
}
// 如果是非异步方式,将上游发射的时间加入到队列
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
}
void schedule() {
// 保证只有唯一任务在运行
if (getAndIncrement() == 0) {
// 调用的就是HandlerWorker的schedule方法
worker.schedule(this);
}
}
@Override
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
if (run == null) throw new NullPointerException("run == null");
if (unit == null) throw new NullPointerException("unit == null");
if (disposed) {
return Disposables.disposed();
}
run = RxJavaPlugins.onSchedule(run);
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
Message message = Message.obtain(handler, scheduled);
message.obj = this; // Used as token for batch disposal of this worker's runnables.
handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));
// Re-check disposed state for removing in case we were racing a call to dispose().
if (disposed) {
handler.removeCallbacks(scheduled);
return Disposables.disposed();
}
return scheduled;
}
schedule
方法将传入的run
调度到对应的handle
所在的线程来执行,这个例子里就是有main
线程来完成。 再回去看看前面传入的run
吧。
回到ObserveOnObserver
中的run
方法:
@Override
public void run() {
// 此例子中代码不会进入这个分支,至于这个drainFused是什么,后面章节再讨论。
if (outputFused) {
drainFused();
} else {
drainNormal();
}
}
void drainNormal() {
int missed = 1;
final SimpleQueue<T> q = queue;
final Observer<? super T> a = actual;
for (;;) {
if (checkTerminated(done, q.isEmpty(), a)) {
return;
}
for (;;) {
boolean d = done;
T v;
try {
// 从队列中queue中取出事件
v = q.poll();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
s.dispose();
q.clear();
a.onError(ex);
worker.dispose();
return;
}
boolean empty = v == null;
if (checkTerminated(d, empty, a)) {
return;
}
if (empty) {
break;
}
//调用下游observer的onNext将事件v发射出去
a.onNext(v);
}
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
至此我们明白了RXjava是如何调度消费者线程了。
消费者线程调度流程概括
Rxjava调度消费者现在的流程,以observeOn(AndroidSchedulers.mainThread())
为例。
-
AndroidSchedulers.mainThread()
先创建一个包含handler
的Scheduler
, 这个handler
是主线程的handler
。 -
observeOn
方法创建ObservableObserveOn
,它是上游Observable
的一个装饰类,其中包含前面创建的Scheduler
和bufferSize
等. - 当订阅方法
subscribe
被调用后,ObservableObserveOn
的subscribeActual
方法创建Scheduler.Worker
并调用上游的subscribe
方法,同时将自身接收的参数’observer’用装饰类ObserveOnObserver
装饰后传递给上游。 - 当上游调用被
ObserveOnObserver
的onNext
、onError
和onComplete
方法时,ObserveOnObserver
将上游发送的事件通通加入到队列queue
中,然后再调用scheduler
将处理事件的方法调度到对应的线程中(本例会调度到main thread)。 处理事件的方法将queue
中保存的事件取出来,调用下游原始的observer再发射出去。 - 经过以上流程,下游处理事件的消费者线程就运行在了
observeOn
调度后的thread中。
总结
经过前面两节的分析,我们已经明白了Rxjava是如何对线程进行调度的。
- Rxjava的
subscribe
方法是由下游一步步向上游进行传递的。会调用上游的subscribe
,直到调用到事件源。
如:
source.subscribe(xxx);
而上游的source
往往是经过装饰后的Observable
, Rxjava就是利用ObservableSubscribeOn
将subscribe
方法调度到了指定线程运行,生产者线程最终会运行在被调度后的线程中。但多次调用subscribeOn
方法会怎么样呢? 我们知道因为subscribe
方法是由下而上传递的,所以事件源的生产者线程最终都只会运行在第一次执行subscribeOn
所调度的线程中,换句话就是多次调用subscribeOn
方法,只有第一次有效。
- Rxjava发射事件是由上而下发射的,上游的
onNext
、onError
、onComplete
方法会调用下游传入的observer的对应方法。往往下游传递的observer对象也是经过装饰后的observer对象。Rxjava就是利用ObserveOnObserver
将执行线程调度后,再调用下游对应的onNext
、onError
、onComplete
方法,这样下游消费者就运行再了指定的线程内。 那么多次调用observeOn
调度不同的线程会怎么样呢? 因为事件是由上而下发射的,所以每次用observeOn
切换完线程后,对下游的事件消费都有效,比如下游的map操作符。最终的事件消费线程运行在最后一个observeOn
切换后线程中。 - 另外通过源码可以看到
onSubscribe
运行在subscribe
的调用线程中,这个就不具体分析了。