Rxjava中使用AutoDispose(版本1.4.0)解决内存泄漏的原理分析
原文
问题:AutoDispose主要是通过监听view的生命周期来解决使用Rxjava时的内存泄漏的,那么view的生命周期和Rxjava的链式调用是如何关联的?
在Activity中使用非常简单,入口如下:
// Example entry point: create an Observable and subscribe to it through AutoDispose.
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
// Emission logic is left empty in this example.
}
// Analysis point 1 (entry): as(...) applies the converter built from a scope provider for `this`.
}).as(AutoDispose.<String>autoDisposable(AndroidLifecycleScopeProvider.from(this)))
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
// onNext handler (empty in this example).
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
// onError handler (empty in this example).
}
});
分析点1:autoDisposable中传入的是ScopeProvider,首先看一下AndroidLifecycleScopeProvider.from(this)是如何生成ScopeProvider的
//分析点1,入口
as(AutoDispose.<String>autoDisposable(AndroidLifecycleScopeProvider.from(this)))
//分析点2
/**
 * Builds an AutoDisposeConverter from a ScopeProvider.
 *
 * @param provider the scope provider; validated by checkNotNull with the message "provider == null"
 * @return the converter returned by the autoDisposable overload that takes the result of completableOf(provider)
 */
public static <T> AutoDisposeConverter<T> autoDisposable(final ScopeProvider provider) {
checkNotNull(provider, "provider == null");
// Analysis point 3 (covered later): wrap the provider via completableOf and delegate.
return autoDisposable(completableOf(provider));
}
分析点2:AndroidLifecycleScopeProvider.from(this)的流程分析
/**
 * Creates a scope provider for the given owner by delegating to from(Lifecycle).
 *
 * @param owner the owner whose Lifecycle is used
 * @return the provider returned by from(owner.getLifecycle())
 */
public static AndroidLifecycleScopeProvider from(LifecycleOwner owner) {
// Obtain the owner's Lifecycle and delegate.
return from(owner.getLifecycle());
}
/**
 * Creates a scope provider for the given Lifecycle using DEFAULT_CORRESPONDING_EVENTS.
 *
 * @param lifecycle the lifecycle to bind to
 * @return the provider returned by the two-argument from overload
 */
public static AndroidLifecycleScopeProvider from(Lifecycle lifecycle) {
// Use the default mapping between the last event and the end event.
return from(lifecycle, DEFAULT_CORRESPONDING_EVENTS);
}
/**
 * Creates a scope provider from a Lifecycle and an explicit boundary resolver.
 *
 * @param lifecycle the lifecycle to bind to
 * @param boundaryResolver function mapping the last lifecycle event to the end event
 * @return a new AndroidLifecycleScopeProvider holding both arguments
 */
public static AndroidLifecycleScopeProvider from(
Lifecycle lifecycle, CorrespondingEventsFunction<Lifecycle.Event> boundaryResolver) {
// Analysis point 4: wrap lifecycle and boundaryResolver in a provider instance.
return new AndroidLifecycleScopeProvider(lifecycle, boundaryResolver);
}
// Default mapping from the last observed lifecycle event to the event at which the
// subscription should end, e.g. a subscription made after ON_CREATE ends at ON_DESTROY.
// Important for later steps: the lambda below is the body of the function's apply() method.
private static final CorrespondingEventsFunction<Lifecycle.Event> DEFAULT_CORRESPONDING_EVENTS =
lastEvent -> {
switch (lastEvent) {
case ON_CREATE:
return Lifecycle.Event.ON_DESTROY;
case ON_START:
return Lifecycle.Event.ON_STOP;
case ON_RESUME:
return Lifecycle.Event.ON_PAUSE;
case ON_PAUSE:
return Lifecycle.Event.ON_STOP;
// ON_STOP, ON_DESTROY and any other event are treated as an already-ended lifecycle.
case ON_STOP:
case ON_DESTROY:
default:
throw new LifecycleEndedException("Lifecycle has ended! Last event was " + lastEvent);
}
};
分析点4:AndroidLifecycleScopeProvider封装lifecycle和boundaryResolver
/**
 * Stores the boundary resolver and wraps the lifecycle in a LifecycleEventsObservable.
 *
 * @param lifecycle the lifecycle to observe
 * @param boundaryResolver function mapping the last lifecycle event to the end event
 */
private AndroidLifecycleScopeProvider(
Lifecycle lifecycle, CorrespondingEventsFunction<Lifecycle.Event> boundaryResolver) {
// Analysis point 5: wrap the lifecycle as a LifecycleEventsObservable.
this.lifecycleObservable = new LifecycleEventsObservable(lifecycle);
this.boundaryResolver = boundaryResolver;
}
/** Returns the lifecycleObservable field, i.e. the LifecycleEventsObservable assigned in the constructor. */
@Override
// Important for later steps: this observable is what the scope is built from.
public Observable<Lifecycle.Event> lifecycle() {
return lifecycleObservable;
}
/**
 * Returns the boundaryResolver field assigned in the constructor. When the provider was
 * created through from(Lifecycle), that is DEFAULT_CORRESPONDING_EVENTS (analysis point 2).
 */
@Override
// Important for later steps: used to compute the end event.
public CorrespondingEventsFunction<Lifecycle.Event> correspondingEvents() {
return boundaryResolver;
}
/**
 * Returns the most recent lifecycle event.
 * First asks the observable to backfill an event derived from the current lifecycle state,
 * then returns the value the observable currently holds.
 */
@Override
// Important for later steps: this is the "last event" fed into correspondingEvents().
public Lifecycle.Event peekLifecycle() {
lifecycleObservable.backfillEvents();
return lifecycleObservable.getValue();
}
/** Returns the CompletableSource that LifecycleScopes.resolveScopeFromLifecycle builds for this provider. */
@Override
// Important for later steps.
public CompletableSource requestScope() {
// Analysis point 6: create the CompletableSource from this provider.
return LifecycleScopes.resolveScopeFromLifecycle(this);
}
分析点5:将lifecycle封装为LifecycleEventsObservable,LifecycleEventsObservable在被订阅时,实现对view生命周期的绑定
/**
 * Observable of lifecycle events backed by a Lifecycle.
 * Subscribing registers an ArchLifecycleObserver on the lifecycle, which is how the Rx chain
 * gets bound to the view lifecycle (analysis point 5).
 */
class LifecycleEventsObservable extends Observable<Event> {
private final Lifecycle lifecycle;
// Holds the most recently recorded event; read through getValue().
private final BehaviorSubject<Event> eventsObservable = BehaviorSubject.create();
@SuppressWarnings("CheckReturnValue")
LifecycleEventsObservable(Lifecycle lifecycle) {
// Only stores the lifecycle; the observer is registered in subscribeActual().
this.lifecycle = lifecycle;
}
// Returns the value currently held by eventsObservable (callers check it for null).
Event getValue() {
return eventsObservable.getValue();
}
/**
 * Backfill if already created for boundary checking. We do a trick here for corresponding events
 * where we pretend something is created upon initialized state so that it assumes the
 * corresponding event is DESTROY.
 */
void backfillEvents() {
@Nullable Lifecycle.Event correspondingEvent;
// Translate the current lifecycle state into the event recorded as "last event".
switch (lifecycle.getCurrentState()) {
case INITIALIZED:
correspondingEvent = ON_CREATE;
break;
case CREATED:
correspondingEvent = ON_START;
break;
case STARTED:
case RESUMED:
correspondingEvent = ON_RESUME;
break;
case DESTROYED:
default:
correspondingEvent = ON_DESTROY;
break;
}
eventsObservable.onNext(correspondingEvent);
}
/**
 * Binds the given observer to the lifecycle.
 * The observer receives an ArchLifecycleObserver as its Disposable; off the main thread it
 * receives an IllegalStateException via onError and nothing is registered.
 */
@Override
// Important for later steps:
// subscription is the moment the view lifecycle gets observed.
protected void subscribeActual(Observer<? super Event> observer) {
// Create the lifecycle observer that forwards events to `observer`.
ArchLifecycleObserver archObserver =
new ArchLifecycleObserver(lifecycle, observer, eventsObservable);
observer.onSubscribe(archObserver);
if (!isMainThread()) {
observer.onError(
new IllegalStateException("Lifecycles can only be bound to on the main thread!"));
return;
}
// Register the observer on the view's lifecycle.
lifecycle.addObserver(archObserver);
// If it was disposed in the meantime, remove the registration again.
if (archObserver.isDisposed()) {
lifecycle.removeObserver(archObserver);
}
}
// Observer of the view lifecycle; also serves as the Disposable handed to the downstream observer.
static final class ArchLifecycleObserver extends MainThreadDisposable
implements LifecycleObserver {
private final Lifecycle lifecycle;
private final Observer<? super Event> observer;
private final BehaviorSubject<Event> eventsObservable;
ArchLifecycleObserver(
Lifecycle lifecycle,
Observer<? super Event> observer,
BehaviorSubject<Event> eventsObservable) {
this.lifecycle = lifecycle;
this.observer = observer;
this.eventsObservable = eventsObservable;
}
// On dispose, stop observing the lifecycle.
@Override
protected void onDispose() {
lifecycle.removeObserver(this);
}
// Invoked for every lifecycle event (ON_ANY).
@OnLifecycleEvent(Event.ON_ANY)
void onStateChange(@SuppressWarnings("unused") LifecycleOwner owner, Event event) {
if (!isDisposed()) {
if (!(event == ON_CREATE && eventsObservable.getValue() == event)) {
// Due to the INITIALIZED->ON_CREATE mapping trick we do in backfill(),
// we fire this conditionally to avoid duplicate CREATE events.
eventsObservable.onNext(event);
}
// Important for later steps:
// every lifecycle change is forwarded to the downstream observer via observer.onNext(event);
// in this walkthrough that observer is attached in analysis point 8.
observer.onNext(event);
}
}
}
}
分析点6:resolveScopeFromLifecycle()传入的是分析点4创建的AndroidLifecycleScopeProvider
/** Returns the CompletableSource that LifecycleScopes.resolveScopeFromLifecycle builds for this provider. */
@Override
public CompletableSource requestScope() {
// Analysis point 6: `this` is the AndroidLifecycleScopeProvider.
return LifecycleScopes.resolveScopeFromLifecycle(this);
}
/**
 * Resolves the end event for the provider's current lifecycle position and builds the scope.
 * In this walkthrough the provider is an AndroidLifecycleScopeProvider.
 *
 * @param provider source of the last event, the corresponding-events function and the lifecycle observable
 * @param checkEndBoundary not used in the visible part of this excerpt (the catch body is elided)
 * @return the CompletableSource built from provider.lifecycle() and the resolved end event
 * @throws OutsideScopeException LifecycleNotStartedException is thrown when no last event is available
 */
public static <E> CompletableSource resolveScopeFromLifecycle(
final LifecycleScopeProvider<E> provider, final boolean checkEndBoundary)
throws OutsideScopeException {
// Read the most recent lifecycle event.
E lastEvent = provider.peekLifecycle();
CorrespondingEventsFunction<E> eventsFunction = provider.correspondingEvents();
if (lastEvent == null) {
throw new LifecycleNotStartedException();
}
E endEvent;
try {
// eventsFunction is DEFAULT_CORRESPONDING_EVENTS in this walkthrough (analysis points 2 and 4).
// Compute the lifecycle event at which to unsubscribe.
endEvent = eventsFunction.apply(lastEvent);
} catch (Exception e) {
// (exception handling elided in this excerpt)
。。。。。。。
}
// Pass the LifecycleEventsObservable (analysis point 4) and the end event onward.
return resolveScopeFromLifecycle(provider.lifecycle(), endEvent);
}
/**
 * Final overload reached by the calls above; in this walkthrough the arguments are the
 * LifecycleEventsObservable, the end event and a comparator.
 *
 * @param lifecycle observable of lifecycle events
 * @param endEvent the event at which the scope should end
 * @param comparator when non-null, an event ends the scope if it compares &gt;= endEvent;
 *     when null, equals(endEvent) is used
 * @return a CompletableSource built as lifecycle.skip(1).takeUntil(predicate).ignoreElements()
 */
public static <E> CompletableSource resolveScopeFromLifecycle(
Observable<E> lifecycle, final E endEvent, @Nullable final Comparator<E> comparator) {
Predicate<E> equalityPredicate;
if (comparator != null) {
equalityPredicate = e -> comparator.compare(e, endEvent) >= 0;
} else {
equalityPredicate = e -> e.equals(endEvent);
}
// Important for later steps:
// only the CompletableSource is returned here; nothing subscribes to it yet
// (that happens in analysis point 8).
// takeUntil(predicate) ends the stream once an event matches the end condition, and
// ignoreElements() drops the events themselves so only onComplete/onError reach the subscriber.
// NOTE(review): skip(1) discards the first emitted event; the rationale is not visible in this excerpt.
return lifecycle.skip(1).takeUntil(equalityPredicate).ignoreElements();
}
}
分析点3:provider为AndroidLifecycleScopeProvider(参考分析点2)
//分析点3,分析比较靠后
return autoDisposable(completableOf(provider));
/**
 * Wraps a ScopeProvider as a Completable whose scope is requested lazily:
 * the lambda passed to Completable.defer calls scopeProvider.requestScope().
 *
 * @param scopeProvider provider whose requestScope() supplies the actual CompletableSource
 * @return the Completable created by Completable.defer
 */
public static Completable completableOf(ScopeProvider scopeProvider) {
return Completable.defer(
() -> {
try {
// Returns the CompletableSource built in analysis point 6.
return scopeProvider.requestScope();
} catch (OutsideScopeException e) {
// (exception handling elided in this excerpt)
。。。。。。。。。
});
}
/**
 * Builds an AutoDisposeConverter bound to the given scope.
 * In this walkthrough `scope` is the Completable from completableOf(provider), which yields
 * the CompletableSource of analysis point 6.
 *
 * @param scope completion signal that ends the subscription; validated with the message "scope == null"
 * @return an anonymous AutoDisposeConverter (other members are elided in this excerpt)
 */
public static <T> AutoDisposeConverter<T> autoDisposable(final CompletableSource scope) {
checkNotNull(scope, "scope == null");
return new AutoDisposeConverter<T>() {
// (other converter methods elided in this excerpt)
。。。。。。。。。。。。。。
@Override
// RxJava's as() operator calls this apply(); it returns an ObservableSubscribeProxy.
public ObservableSubscribeProxy<T> apply(final Observable<T> upstream) {
// When proxies are not hidden, AutoDisposeObservable itself is returned as the proxy.
if (!AutoDisposePlugins.hideProxies) {
return new AutoDisposeObservable<>(upstream, scope);
}
return new ObservableSubscribeProxy<T>() {
// (other subscribe overloads elided in this excerpt)
。。。。。。。。。。。。
@Override
public Disposable subscribe(
Consumer<? super T> onNext, Consumer<? super Throwable> onError) {
// When the proxy is subscribed, wrap the upstream Observable and the scope in an
// AutoDisposeObservable and subscribe to that instead.
// Analysis point 7.
return new AutoDisposeObservable<>(upstream, scope).subscribe(onNext, onError);
}
。。。。。。。。。。。。。
}
。。。。。。。。。。。。。。。。。。。
}
分析点7:生成封装的AutoDisposeObservable(上游的Observable,分析点6中生成的 CompletableSource ),并执行subscribe,实际上就是执行了AutoDisposeObservable的subscribeActual()
//分析点7
return new AutoDisposeObservable<>(upstream, scope).subscribe(onNext, onError);
/**
 * Observable wrapper that pairs an upstream source with a scope.
 * On subscription the downstream observer is wrapped in an AutoDisposingObserverImpl, which is
 * what actually subscribes to the upstream.
 */
final class AutoDisposeObservable<T> extends Observable<T> implements ObservableSubscribeProxy<T> {
private final ObservableSource<T> source;
private final CompletableSource scope;
AutoDisposeObservable(ObservableSource<T> source, CompletableSource scope) {
// The upstream Observable.
this.source = source;
// The scope; in this walkthrough it resolves to the CompletableSource of analysis point 6.
this.scope = scope;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
// Analysis point 8: subscribe to the upstream with an AutoDisposingObserverImpl wrapping `observer`.
source.subscribe(new AutoDisposingObserverImpl<>(scope, observer));
}
}
分析点8:上面分析了那么多,这里才是真正实现的地方。
上游的Observable被AutoDisposingObserverImpl(对原有观察者的封装)订阅,重点在onSubscribe()的回调处理
/**
 * Observer wrapper that ties the upstream subscription to a scope.
 * It forwards events to the delegate and disposes the upstream (mainDisposable) when the
 * scope completes. Field declarations (mainDisposable, scopeDisposable, error, scope,
 * delegate) are elided in this excerpt.
 */
final class AutoDisposingObserverImpl<T> extends AtomicInteger implements AutoDisposingObserver<T> {
// (field declarations elided in this excerpt)
。。。。。。。。。。。。。。。。。
AutoDisposingObserverImpl(CompletableSource scope, Observer<? super T> delegate) {
// The scope; in this walkthrough it resolves to the CompletableSource of analysis point 6.
this.scope = scope;
// The original downstream observer.
this.delegate = delegate;
}
// Returns the wrapped downstream observer.
@Override
public Observer<? super T> delegateObserver() {
return delegate;
}
/**
 * Called with the upstream Disposable. Subscribes an internal completable observer to the
 * scope, then records the upstream Disposable in mainDisposable.
 */
@Override
public void onSubscribe(final Disposable d) {
// Create a DisposableCompletableObserver, which only has onError and onComplete callbacks.
// This matches the ignoreElements() call from analysis point 6.
DisposableCompletableObserver o =
new DisposableCompletableObserver() {
@Override
// Scope failed: mark the scope side as disposed and route the error through this observer's onError.
public void onError(Throwable e) {
scopeDisposable.lazySet(AutoDisposableHelper.DISPOSED);
AutoDisposingObserverImpl.this.onError(e);
}
@Override
// Scope ended: mark the scope side as disposed and dispose the upstream subscription.
public void onComplete() {
scopeDisposable.lazySet(AutoDisposableHelper.DISPOSED);
AutoDisposableHelper.dispose(mainDisposable);
}
};
if (AutoDisposeEndConsumerHelper.setOnce(scopeDisposable, o, getClass())) {
delegate.onSubscribe(this);
// The CompletableSource from analysis point 6 has had no observer until now.
// Subscribing here eventually reaches LifecycleEventsObservable.subscribeActual()
// (analysis point 5), which binds to the view lifecycle.
// From then on, each lifecycle change is delivered through observer.onNext(event).
// In effect this line is: LifecycleEventsObservable.skip(1).takeUntil(<end event>)
// .ignoreElements().subscribe(o), with o being the DisposableCompletableObserver above.
scope.subscribe(o);
AutoDisposeEndConsumerHelper.setOnce(mainDisposable, d, getClass());
}
}
// Disposed state is determined by mainDisposable only.
@Override
public boolean isDisposed() {
return mainDisposable.get() == AutoDisposableHelper.DISPOSED;
}
// Disposes both the scope subscription and the upstream subscription.
@Override
public void dispose() {
AutoDisposableHelper.dispose(scopeDisposable);
AutoDisposableHelper.dispose(mainDisposable);
}
// Forwards a value to the delegate unless already disposed.
@Override
public void onNext(T value) {
if (!isDisposed()) {
if (HalfSerializer.onNext(delegate, value, this, error)) {
// Terminal event occurred and was forwarded to the delegate, so clean up here
mainDisposable.lazySet(AutoDisposableHelper.DISPOSED);
AutoDisposableHelper.dispose(scopeDisposable);
}
}
}
// Upstream error: mark as disposed, release the scope subscription, then forward the error.
@Override
public void onError(Throwable e) {
if (!isDisposed()) {
mainDisposable.lazySet(AutoDisposableHelper.DISPOSED);
AutoDisposableHelper.dispose(scopeDisposable);
HalfSerializer.onError(delegate, e, this, error);
}
}
// Upstream completion: mark as disposed, release the scope subscription, then forward completion.
@Override
public void onComplete() {
if (!isDisposed()) {
mainDisposable.lazySet(AutoDisposableHelper.DISPOSED);
AutoDisposableHelper.dispose(scopeDisposable);
HalfSerializer.onComplete(delegate, this, error);
}
}
}
总结:现在回答上文的问题
AutoDispose主要是通过监听view的生命周期来解决使用Rxjava时的内存泄漏的,那么view的生命周期和Rxjava的链式调用是如何关联的?
1:AndroidLifecycleScopeProvider.from(this)中获取当前view的lifecycle,并创建一个LifecycleEventsObservable(参考分析点5),在其subscribeActual中通过lifecycle.addObserver()实现了对View生命周期的订阅。
当生命周期发生变化时,ArchLifecycleObserver.onStateChange()会执行observer.onNext(“生命周期”),把事件发给LifecycleEventsObservable的下游观察者。
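2:Rxjava的as操作符调用AutoDisposeConverter.apply()生成AutoDisposeObservable(参考分析点7),其subscribeActual中用观察者的代理实现类AutoDisposingObserverImpl(参考分析点8)订阅上游,并在onSubscribe中通过scope.subscribe(o)完成对生命周期的订阅;当到达取消订阅的生命周期时,DisposableCompletableObserver的onComplete被回调,dispose掉mainDisposable,从而切断上游。经过翻译代码如下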
LifecycleEventsObservable.skip(1).takeUntil("取消订阅生命周期").ignoreElements()
.subscribe(DisposableCompletableObserver )