在之前的博客中简单介绍了Rxjava的使用和与Retrofit的API配合使用;那么在这里我们来看下Rxjava的源码;
1.Observable–>(subscribe)Subscriber
首先来看完整代码:
Observable.create(new Observable.OnSubscribe<Drawable>() {
@Override
public void call(Subscriber<? super Drawable> subscriber) {
Drawable drawable = getResources().getDrawable(R.drawable.t2);
subscriber.onNext(drawable);
subscriber.onCompleted();
}
})
.subscribeOn(Schedulers.io()) //Observable.OnSubscribe中Call方法所在线程
.observeOn(AndroidSchedulers.mainThread()) //Subscribe中回调所在的线程
.subscribe(new Subscriber<Drawable>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Drawable drawable) {
mBg.setImageDrawable(drawable);
}
1.1.首先来看被观察者的创建:create
//Observable.create
//参数onSubscribe的实例化 作为参数传入create中
//由于继承自Action1所以会有call方法需要实现
public interface OnSubscribe<T> extends Action1<Subscriber<? super T>> {
// cover for generics insanity
}
public interface Action1<T> extends Action {
void call(T t);
}
-----------------------------------------------------------------
@Deprecated
public static <T> Observable<T> create(OnSubscribe<T> f) {
return new Observable<T>(RxJavaHooks.onCreate(f));
}
protected Observable(OnSubscribe<T> f) {
this.onSubscribe = f;
}
从上面代码可以看到 Observable.create其实进行两步操作:
1.将参数传入Observable的构造,实例化呗观察者
2.在Observable类中传入的Observable.OnSubscribe赋值给Observable的一个成员属性onSubscribe 在后面会有用到
而RxJavaHooks.onCreate(f)其实就是对Observable.OnSubscribe的一些封装,在这里不是很重要,所以没有必要关心:
public static <T> Observable.OnSubscribe<T> onCreate(Observable.OnSubscribe<T> onSubscribe) {
Func1<Observable.OnSubscribe, Observable.OnSubscribe> f = onObservableCreate;
if (f != null) {
return f.call(onSubscribe);
}
return onSubscribe;
}
看到没有其实返回的还是onSubscribe;
1.2被观察者 已经创建完成 然后开始订阅观察者:
.subscribe(new Subscriber<Drawable>()
ok 首先来看subscribe的源码:
public final Subscription subscribe(Subscriber<? super T> subscriber) {
return Observable.subscribe(subscriber, this);
}
static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
// validate and proceed
if (subscriber == null) {
throw new IllegalArgumentException("subscriber can not be null");
}
if (observable.onSubscribe == null) {
throw new IllegalStateException("onSubscribe function can not be null.");
/* * the subscribe function can also be overridden but generally that's not the appropriate approach * so I won't mention that in the exception */
}
// new Subscriber so onStart it
subscriber.onStart();
/* * See https://github.com/ReactiveX/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls * to user code from within an Observer" */
// if not already wrapped
if (!(subscriber instanceof SafeSubscriber)) {
// assign to `observer` so we return the protected version
subscriber = new SafeSubscriber<T>(subscriber);
}
// The code below is exactly the same an unsafeSubscribe but not used because it would
// add a significant depth to already huge call stacks.
try {
// allow the hook to intercept and/or decorate
RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
return RxJavaHooks.onObservableReturn(subscriber);
} catch (Throwable e) {
// special handling for certain Throwable/Error/Exception types
Exceptions.throwIfFatal(e);
// in case the subscriber can't listen to exceptions anymore
if (subscriber.isUnsubscribed()) {
RxJavaHooks.onError(RxJavaHooks.onObservableError(e));
} else {
// if an unhandled error occurs executing the onSubscribe we will propagate it
try {
subscriber.onError(RxJavaHooks.onObservableError(e));
} catch (Throwable e2) {
Exceptions.throwIfFatal(e2);
// if this happens it means the onError itself failed (perhaps an invalid function implementation)
// so we are unable to propagate the error correctly and will just throw
RuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
// TODO could the hook be the cause of the error in the on error handling.
RxJavaHooks.onObservableError(r);
// TODO why aren't we throwing the hook's return value.
throw r; // NOPMD
}
}
return Subscriptions.unsubscribed();
}
}
在上面代码中 第二个subscribe方法有点长 我们来看其核心代码: //首先来看
if (!(subscriber instanceof SafeSubscriber)) {
// assign to `observer` so we return the protected version
subscriber = new SafeSubscriber<T>(subscriber);
}
其实这个方法是对参数的一个检验吧 SafeSubscriber是Subscriber的子类 其中也复写了onError onNext onComplete方法
然侯看订阅的核心代码:
RxJavaHooks.onObservableStart(
observable,observable.onSubscribe).call(subscriber);
return RxJavaHooks.onObservableReturn(subscriber);
上面的代码中 :
public static <T> Observable.OnSubscribe<T> onObservableStart(Observable<T> instance, Observable.OnSubscribe<T> onSubscribe) {
Func2<Observable,
Observable.OnSubscribe, Observable.OnSubscribe> f = onObservableStart;
if (f != null) {
return f.call(instance, onSubscribe);
}
return onSubscribe;
}
RxJavaHooks.onObservableStart(observable,observable.onSubscribe)返回被观察者创建时传入的onSubscribe(可以看前面代码 有提到过); 然后.call(subscriber)将初始化的观察者传入OnSubscribe的call方法参数中;调用call方法 在被观察者创建时候在OnNext中传入参数 然后在订阅观察者的时候重写OnNext方法 对传入的参数进行操作 即如下代码:
Observable.create(new Observable.OnSubscribe<Drawable>() {
@Override
public void call(Subscriber<? super Drawable> subscriber) {
Drawable drawable = getResources().getDrawable(R.drawable.t2);
subscriber.onNext(drawable);
subscriber.onCompleted();
}
})
.subscribeOn(Schedulers.io()) //Observable.OnSubscribe中Call方法所在线程
.observeOn(AndroidSchedulers.mainThread()) //Subscribe中回调所在的线程
.subscribe(new Subscriber<Drawable>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Drawable drawable) {
mBg.setImageDrawable(drawable);
}
其中:
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
是对Observable.Onsubscribe—>call()和Subscribe中复写方法的线程指定,在前面博文 Rxjava从入门到熟练中有讲过;
这样到这里 罪常用的Rxjava调用就介绍完了;
2.Rxjava的不完全调用
Observable.create(new Observable.OnSubscribe<Drawable>() {
@Override
public void call(Subscriber<? super Drawable> subscriber) {
Drawable drawable = getResources().getDrawable(R.drawable.t2);
subscriber.onNext(drawable);
subscriber.onCompleted();
}
})
.subscribeOn(Schedulers.io()) //Observable.OnSubscribe中Call方法所在线程
.observeOn(AndroidSchedulers.mainThread()) //Subscribe中回调所在的线程
.subscribe(new Action1<Drawable>() {
@Override
public void call(Drawable drawable) {
mBg.setImageDrawable(drawable);
}
});
和之前的代码差不多 唯一的不能是 subscribe实例化 换成Action1
ActionX是Rxjava对外开放的一部分接口: ok~ 直接来看源码:
if (onNext == null) {
throw new IllegalArgumentException("onNext can not be null");
}
Action1<Throwable> onError = InternalObservableUtils.ERROR_NOT_IMPLEMENTED;
Action0 onCompleted = Actions.empty();
return subscribe(new ActionSubscriber<T>(onNext,
onError, onCompleted));
}
```在RXjava中ActionX 会根据传入的参数 返回对应的方法:
<div class="se-preview-section-delimiter"></div>
这里写代码片 “`
Action1<? super T> onNext, Action1<Throwable> onError, Action0 onCompleted
传入Throwable 就是onError(xxx)
传入空参 就是onComplete(Action0)
传入其他为onNext(xxx)
所以:
.subscribe(new Action1<Drawable>() {
@Override
public void call(Drawable drawable) {
mBg.setImageDrawable(drawable);
}
});
就是 直接会调onNext();
Rxjava的源码 先介绍到这里 之后的博文会对flmap map 等其他API的源码 进行分析 感谢阅读 期待高效的建议 谢谢~