说明
mMessageHelper.appendLine("Observable emit 3");
理解成如下打Log,详见 https://github.com/tik5213/DUnit
Log.e("TAG","Observable emit 3");
rxjava2-编译器提示“The result of subscribe is not used”
如下书写,编译器提示 “The result of subscribe is not used”
Observable.create(new ObservableOnSubscribe<Integer>() { // 第一步:初始化Observable
@Override
public void subscribe(ObservableEmitter<Integer> emitter) {
mMessageHelper.printLine("Observable emit 1");
emitter.onNext(1);
mMessageHelper.appendLine("Observable emit 2");
emitter.onNext(2);
mMessageHelper.appendLine("Observable emit 3");
emitter.onNext(3);
mMessageHelper.appendLine("Observable emit 4");
emitter.onNext(4);
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
mMessageHelper.printLine("accept " + integer);
}
});
原因
没有对 subscribe 的返回的对象进行生命周期管理,可能产生内存泄露
解决方案
- 创建一个 CompositeDisposable 对象
- 将 subscribe 的返回值添加到上面创建的对象中
- 在适当的时候调 用CompositeDisposable 对象的 dispose() 方法(比如,在Activity将要销毁的时候)
改正后的代码
CompositeDisposable compositeDisposable = new CompositeDisposable();
@Override
public void callUnit() {
Disposable disposable = Observable.create(new ObservableOnSubscribe<Integer>() { // 第一步:初始化Observable
@Override
public void subscribe(ObservableEmitter<Integer> emitter) {
mMessageHelper.printLine("Observable emit 1");
emitter.onNext(1);
mMessageHelper.appendLine("Observable emit 2");
emitter.onNext(2);
mMessageHelper.appendLine("Observable emit 3");
emitter.onNext(3);
mMessageHelper.appendLine("Observable emit 4");
emitter.onNext(4);
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
mMessageHelper.appendLine("accept " + integer);
}
});
compositeDisposable.add(disposable);
//在 Activity 的 onDestroy 的里面调
//compositeDisposable.dispose();
}
根本原因
rx 团队在 subscribe 方法上添加了 @CheckReturnValue 注解,当 FindBugs 检查到一个函数调用返回值被丢弃时会给出警告。
示例:
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public int test(){
return 1 + 2;
}
参考网址
The result of subscribe is not used
onCompleted、onNext、onError 三者的关系
onCompleted():事件队列完结。RxJava 仅把每个事件单独处理,还会把它们看做一个队列。RxJava 规定,当不会再有新的 onNext() 发出时,需要触发 onCompleted() 方法作为标志。
onError():事件队列异常。在事件处理过程中出异常时,onError() 会被触发,同时队列自动终止,不允许再有事件发出。
在一个正确运行的事件序列中,onCompleted() 和 onError() 有且只有一个,并且是事件序列中的最后一个。需要注意的是,onCompleted() 和 onError() 二者也是互斥的,即在队列中调用了其中一个,就不应该再调用另一个。
RxJava1 Action1 与 Func1的区别
Func1与Action1非常相似,也是RxJava的一个接口,用于包装含有一个参数的方法。Func1 和 Action1 的区别在于,Func1 包装的是有返回值的方法。
RxJava1 在 map 变换时 return null 会继续执行,RxJava2 则直接报错
RxJava1 会继续执行
/**
* subscribeOn 放在第一位
* subscribeOn 只有第一次出现时才有效,并且,无论 subscribeOn 在哪个位置,永远控制 Observable 所在的执行线程。
*/
@DUnit(group = RxJava2Or1SchedulersGroup.class ,name = "RxJava1Schedulers_subscribeOn_fist - rx1 return null会继续执行")
public static class RxJava1Schedulers_subscribeOn_fist extends AbstractDisplayUnit {
@Override
public void callUnit() {
new Thread(new Runnable() {
@Override
public void run() {
Observable.unsafeCreate(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
mMessageHelper.printLine("onNext 1 io - thread:" + Thread.currentThread().getName());
subscriber.onNext(1);
}
})
.subscribeOn(rx.schedulers.Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.map(new Func1<Integer, String>() {
@Override
public String call(Integer integer) {
mMessageHelper.appendLine("main - thread:" + Thread.currentThread().getName());
return null;
}
})
.observeOn(rx.schedulers.Schedulers.newThread())
.map(new Func1<String, Boolean>() {
@Override
public Boolean call(String s) {
mMessageHelper.appendLine("new - thread:" + Thread.currentThread().getName());
return null;
}
})
.observeOn(rx.schedulers.Schedulers.io())
.map(new Func1<Boolean, String>() {
@Override
public String call(Boolean aBoolean) {
mMessageHelper.appendLine("io - thread:" + Thread.currentThread().getName());
return null;
}
})
.observeOn(AndroidSchedulers.mainThread())
.map(new Func1<String, Integer>() {
@Override
public Integer call(String s) {
mMessageHelper.appendLine("main - thread:" + Thread.currentThread().getName());
return null;
}
})
.observeOn(rx.schedulers.Schedulers.io())
.subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Integer integer) {
mMessageHelper.appendLine("io - thread:" + Thread.currentThread().getName());
}
});
}
}, "testThread001")
.start();
}
}
结果:
msg: onNext 1 io - thread:RxIoScheduler-7
msg: main - thread:main
msg: new - thread:RxNewThreadScheduler-2
msg: io - thread:RxIoScheduler-6
msg: main - thread:main
msg: io - thread:RxIoScheduler-5
RxJava2 map 返回 null 会报错进入 error 中(或者闪退)
/**
* https://maxwell-nc.github.io/android/rxjava2-1.html
*
* http://www.jcodecraeer.com/a/anzhuokaifa/androidkaifa/2016/0907/6604.html
* 首先,创建Observable时,回调的是ObservableEmitter,字面意思即发射器,用于发射数据(onNext)和通知(onError/onComplete)。
* 其次,创建的Observer中多了一个回调方法onSubscribe,传递参数为Disposable ,Disposable相当于RxJava1.x中的Subscription,用于解除订阅。
* 你可能纳闷为什么不像RxJava1.x中订阅时返回Disposable,而是选择回调出来呢。官方说是为了设计成Reactive-Streams架构。
* 不过仔细想想这么一个场景还是很有用的,假设Observer需要在接收到异常数据项时解除订阅,
* 在RxJava2.x中则非常简便,如下操作即可。
*/
@DUnit(group = RxJava2Or1SchedulersGroup.class ,name = "RxJava2Schedulers_Single_Consumer - rx2 return null不会继续执行")
public static class RxJava2Schedulers_Single_Consumer extends AbstractDisplayUnit {
@Override
public void callUnit() {
Disposable disposable = Single.create(new SingleOnSubscribe<Integer>() {
@Override
public void subscribe(SingleEmitter<Integer> emitter) throws Exception {
mMessageHelper.appendLine("emitter new - thread:" + Thread.currentThread().getName());
emitter.onSuccess(123);
}
})
.subscribeOn(Schedulers.newThread())
.observeOn(io.reactivex.android.schedulers.AndroidSchedulers.mainThread())
.map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
mMessageHelper.appendLine("main - thread:" + Thread.currentThread().getName());
return null;
}
})
.observeOn(Schedulers.newThread())
.map(new Function<String, Boolean>() {
@Override
public Boolean apply(String s) throws Exception {
mMessageHelper.appendLine("new - thread:" + Thread.currentThread().getName());
return null;
}
})
.observeOn(io.reactivex.android.schedulers.AndroidSchedulers.mainThread())
.map(new Function<Boolean, Integer>() {
@Override
public Integer apply(Boolean aBoolean) throws Exception {
mMessageHelper.appendLine("main - thread:" + Thread.currentThread().getName());
return null;
}
})
.observeOn(io.reactivex.schedulers.Schedulers.single())
.toObservable()
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
mMessageHelper.appendLine("accept success single - thread:" + Thread.currentThread().getName());
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
mMessageHelper.appendLine("accept error single - thread:" + Thread.currentThread().getName());
}
});
if(SystemClock.currentThreadTimeMillis() % 2 == 0){
// disposable.dispose();
}
}
}
结果:
msg: emitter new - thread:RxNewThreadScheduler-1
msg: main - thread:main
msg: accept error single - thread:RxSingleScheduler-1