RxJava中subscribe流程源码分析

接触RxJava算是比较早了,早期RxJava 1.0出来的时候就结合Retrofit然后搭配MVP框架使用了, 年初加入新公司也有用到RxJava不过中间比较长一段时间没有接触,准备彻底深入的学习一下。

首先,先来看一段代码:

 Observable.create(new ObservableOnSubscribe<String>() {
     @Override
     public void subscribe(ObservableEmitter<String> e) throws Exception {
        e.onNext("1");
        e.onNext("2");
        e.onNext("3");
        e.onComplete();
        e.onNext("4");
    }
}).subscribe(new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.w("Jayuchou", "=== onSubscribe === ");
   }

   @Override
   public void onNext(String s) {
      Log.w("Jayuchou", "=== onNext === " + s);
   }

   @Override
   public void onError(Throwable e) {
      Log.w("Jayuchou", "=== onError === ");
   }

   @Override
   public void onComplete() {
     Log.w("Jayuchou", "=== onComplete === ");
   }
});

代码跑起来很自然的输出了一些数据出来,从logcat复制粘贴看看一下:

com.neacy.rxjava W/Jayuchou: === onSubscribe ===
com.neacy.rxjava W/Jayuchou: === onNext === 1
com.neacy.rxjava W/Jayuchou: === onNext === 2
com.neacy.rxjava W/Jayuchou: === onNext === 3
com.neacy.rxjava W/Jayuchou: === onComplete === 

为什么e.onNext("4")这个没有输出呢,毕竟这么简单的一段代码肯定不会写错的,所以很有必要看看里面做了什么猫腻,这就是本篇文章所想表达的意图。

源码分析

我们从Observable.create声明开始看起,左手键盘右手鼠标轻轻一点:

public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
   ObjectHelper.requireNonNull(source, "source is null");
   return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}

这里面的RxJavaPlugins.onAssembly返回的就是ObservableCreate所以我们进入ObservableCreate类看看里面做了什么操作:

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }
}

好像这么一步的操作也没有什么非常神奇的操作,只是把我们new出来的ObservableOnSubscribe赋值给他的成员变量source,看过去声明过程并不是很复杂很麻烦的相对还是比较清晰。

那么我们看下subscribe之后发生了什么:

public final void subscribe(Observer<? super T> observer) {
   // 删除一些无关代码
   try {
       observer = RxJavaPlugins.onSubscribe(this, observer);
       subscribeActual(observer);
   }
   // 删除一些无关代码
   ...
}

RxJavaPlugins.onSubscribe返回的是我们传入的Observer对象用于接收onNext等操作的传递出来的数据。主要是subscribeActual这个方法,但是在Observable中这个方法是个abstract抽象方法,所以我们找到了ObservableCreate中的相对应的方法:

@Override
protected void subscribeActual(Observer<? super T> observer) {
   CreateEmitter<T> parent = new CreateEmitter<T>(observer);
   observer.onSubscribe(parent);
   try {
       source.subscribe(parent);
   } catch (Throwable ex) {
       Exceptions.throwIfFatal(ex);
       parent.onError(ex);
   }
}

一旦看到陌生的类我们就自然的点进去看看内部做了哪些操作,就比如CreateEmitter这个陌生的类。而这个类implements ObservableEmitter所以我们自然就找到了我们一直想要看的方法:

public interface Emitter<T> {
    void onNext(@NonNull T value);

    void onError(@NonNull Throwable error);

    void onComplete();
}

我们再回头看一下subscribeActual这个方法,声明完CreateEmitter对象之后就直接执行了:

observer.onSubscribe(parent);

//执行完上面的方法会在AS的logCat输出如下的日志:
com.neacy.rxjava W/Jayuchou: === onSubscribe ===

这样子我们就明白了Observer.onSubscribe这个方法是一执行subscribe之后就会开始被调用了。
这时候方法就会继续执行下去,就会执行:

source.subscribe(parent);
这段代码很有意思就是直接代理调用我们在Activity中声明:

public void subscribe(ObservableEmitter<String> e) throws Exception {
     e.onNext("1");
     e.onNext("2");
     e.onNext("3");
     e.onComplete();
     e.onNext("4");
}

Ok,知道了什么时候会开始输出数据了,仿佛就流程就走的差不多了。但是,并不是这样的我们要看下具体每个方法做了什么为什么最后一个onNext没有输出呢?

onNext:

@Override
public void onNext(T t) {
    if (t == null) {
        onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
        return;
    }
    if (!isDisposed()) {
        observer.onNext(t);
    }
}

看了一眼就会后悔,早知道不看了,确实onNext没有做什么操作直接通过observer把数据输出来,当然有稍微判断一下是否这个Observable是disposed。

onComplete:

public void onComplete() {
    if (!isDisposed()) {
        try {
            observer.onComplete();
        } finally {
            dispose();
        }
    }
}

其实onComplete的判断也是比较简单,不过我们要当心finally中的代码执行了dispose

public void dispose() {
   DisposableHelper.dispose(this);
}

// DisposableHelper类:
public static boolean dispose(AtomicReference<Disposable> field) {
    Disposable current = field.get();
    Disposable d = DISPOSED;
    if (current != d) {
        // 把当前的Disposable赋值成DISPOSED 也就是说isDisposed = true
        current = field.getAndSet(d);
        if (current != d) {
            if (current != null) {
                current.dispose();
            }
            return true;
        }
    }
    return false;
}

所以很清楚了最后isDisposed = true这样子在onNext的时候不是会判断!isDisposed()这里就会返回false那么久无法再次执行onNext所以说这就是为什么执行了onComplete之后就不会再次执行onNext的原因了。
这时候你再看一眼onError你会发现跟onComplete是一样的原理。

总结:一旦执行了onComplete或者onError之后就不会再次执行onNext了。

    原文作者:Android源码分析
    原文地址: https://juejin.im/entry/5a44a4ec6fb9a0450e766acd
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞