Androidx:Rxjava2中使用AutoDispose解决内存泄漏的原理分析

Rxjava中使用AutoDispose(版本1.4.0)解决内存泄漏的原理分析
原文

问题:AutoDispose主要是通过监听view的生命周期来解决使用Rxjava时的内存泄漏的,那么view的生命周期和Rxjava的链式调用是如何关联的?

在Activity中使用非常简单,入口如下:

  // Example usage from inside an Activity: build a source, attach AutoDispose via as(),
  // then subscribe with a value consumer and an error consumer.
  Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {

            }
            // Analysis point 1 (entry): as() receives the converter returned by
            // AutoDispose.autoDisposable(), built from a scope provider created from `this`.
        }).as(AutoDispose.<String>autoDisposable(AndroidLifecycleScopeProvider.from(this)))
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {

                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) throws Exception {
                        
                    }
                });

分析点1:autoDisposable()中传入的是ScopeProvider,首先看一下AndroidLifecycleScopeProvider.from(this)是如何生成ScopeProvider的

 //分析点1,入口
as(AutoDispose.<String>autoDisposable(AndroidLifecycleScopeProvider.from(this)))

//分析点2
 /**
  * Creates an {@code AutoDisposeConverter} from a {@code ScopeProvider}.
  *
  * @param provider the scope provider; checked with {@code checkNotNull} before use
  * @return the converter returned by the {@code autoDisposable} overload that is given
  *     {@code completableOf(provider)}
  */
 public static <T> AutoDisposeConverter<T> autoDisposable(final ScopeProvider provider) {
    checkNotNull(provider, "provider == null");
    // Analysis point 3 (covered later in the article): wrap the provider in a Completable
    // and delegate to the other autoDisposable overload.
    return autoDisposable(completableOf(provider));
  }

分析点2:AndroidLifecycleScopeProvider.from(this)的流程分析

 /**
  * Creates a scope provider for a {@code LifecycleOwner} by delegating to the
  * {@code Lifecycle} overload.
  *
  * @param owner the owner whose {@code getLifecycle()} result is used
  * @return the provider returned by {@code from(Lifecycle)}
  */
 public static AndroidLifecycleScopeProvider from(LifecycleOwner owner) {
   // Obtain the owner's Lifecycle.
    return from(owner.getLifecycle());
  }
  
 /**
  * Creates a scope provider for a {@code Lifecycle} using
  * {@code DEFAULT_CORRESPONDING_EVENTS} as the boundary resolver.
  *
  * @param lifecycle the lifecycle to scope to
  * @return the provider returned by the two-argument {@code from} overload
  */
 public static AndroidLifecycleScopeProvider from(Lifecycle lifecycle) {
    // Use the default mapping between a lifecycle event and its disposal event.
    return from(lifecycle, DEFAULT_CORRESPONDING_EVENTS);
  }
  
 /**
  * Creates a scope provider from a {@code Lifecycle} and an explicit boundary resolver.
  *
  * @param lifecycle the lifecycle to scope to
  * @param boundaryResolver function mapping a lifecycle event to its corresponding end event
  * @return a new {@code AndroidLifecycleScopeProvider} holding both arguments
  */
 public static AndroidLifecycleScopeProvider from(
      Lifecycle lifecycle, CorrespondingEventsFunction<Lifecycle.Event> boundaryResolver) {
    // Analysis point 4: the constructor stores the lifecycle and the boundary resolver.
    return new AndroidLifecycleScopeProvider(lifecycle, boundaryResolver);
  }

// Maps the most recent lifecycle event to the event at which the subscription is disposed,
// e.g. a subscription made after ON_CREATE is disposed at ON_DESTROY.
// Important: this is handed out later via correspondingEvents(); the lambda body is the
// function's apply() implementation.
private static final CorrespondingEventsFunction<Lifecycle.Event> DEFAULT_CORRESPONDING_EVENTS =
      lastEvent -> {
        switch (lastEvent) {
          case ON_CREATE:
            return Lifecycle.Event.ON_DESTROY;
          case ON_RESUME:
            return Lifecycle.Event.ON_PAUSE;
          case ON_START:
          case ON_PAUSE:
            // Both of these end at ON_STOP.
            return Lifecycle.Event.ON_STOP;
          default:
            // ON_STOP, ON_DESTROY and every other event: there is no later boundary to bind to.
            throw new LifecycleEndedException("Lifecycle has ended! Last event was " + lastEvent);
        }
      };

分析点4:AndroidLifecycleScopeProvider封装lifecycle和boundaryResolver

/**
 * Stores the boundary resolver and wraps the lifecycle in a {@code LifecycleEventsObservable}.
 *
 * @param lifecycle the lifecycle to observe
 * @param boundaryResolver function mapping a lifecycle event to its corresponding end event
 */
private AndroidLifecycleScopeProvider(
      Lifecycle lifecycle, CorrespondingEventsFunction<Lifecycle.Event> boundaryResolver) {
    // Analysis point 5: wrap the lifecycle in a LifecycleEventsObservable.
    this.lifecycleObservable = new LifecycleEventsObservable(lifecycle);
    this.boundaryResolver = boundaryResolver;
  }

  /**
   * Important, used later: returns the {@code lifecycleObservable} created in the constructor.
   */
  @Override
  public Observable<Lifecycle.Event> lifecycle() {
    return lifecycleObservable;
  }

  /**
   * Important, used later: returns the stored boundary resolver. When the provider was built
   * through {@code from(Lifecycle)}, this is {@code DEFAULT_CORRESPONDING_EVENTS}.
   */
  @Override
  public CorrespondingEventsFunction<Lifecycle.Event> correspondingEvents() {
    return boundaryResolver;
  }

  /**
   * Important, used later: returns the latest lifecycle event.
   *
   * <p>Calls {@code backfillEvents()} first, so the observable's stored value is refreshed
   * from the lifecycle's current state before it is read.
   */
  @Override
  public Lifecycle.Event peekLifecycle() { 
    lifecycleObservable.backfillEvents();
    return lifecycleObservable.getValue();
  }

  /**
   * Important, used later: returns the {@code CompletableSource} resolved from this provider
   * by {@code LifecycleScopes.resolveScopeFromLifecycle}.
   */
  @Override
  public CompletableSource requestScope() {
    // Analysis point 6: build the CompletableSource for this provider.
    return LifecycleScopes.resolveScopeFromLifecycle(this);
  }

分析点5:将lifecycle封装为LifecycleEventsObservable,LifecycleEventsObservable在被订阅时,实现对view生命周期的绑定

/**
 * Observable of lifecycle events backed by a {@code Lifecycle}. Subscribing to it registers an
 * observer on that lifecycle, which is how the view's lifecycle gets bound.
 */
class LifecycleEventsObservable extends Observable<Event> {

  private final Lifecycle lifecycle;
  private final BehaviorSubject<Event> eventsObservable = BehaviorSubject.create();

  /** Stores the lifecycle; no observer is registered until subscription. */
  @SuppressWarnings("CheckReturnValue")
  LifecycleEventsObservable(Lifecycle lifecycle) {
    this.lifecycle = lifecycle;
  }

  /** Returns the value currently held by the backing subject. */
  Event getValue() {
    return eventsObservable.getValue();
  }

  /**
   * Backfill if already created for boundary checking. We do a trick here for corresponding
   * events where we pretend something is created upon initialized state so that it assumes the
   * corresponding event is DESTROY.
   */
  void backfillEvents() {
    // Every state not matched below (DESTROYED included) backfills ON_DESTROY.
    @Nullable Lifecycle.Event backfilled = ON_DESTROY;
    switch (lifecycle.getCurrentState()) {
      case INITIALIZED:
        backfilled = ON_CREATE;
        break;
      case CREATED:
        backfilled = ON_START;
        break;
      case STARTED:
      case RESUMED:
        backfilled = ON_RESUME;
        break;
      default:
        break;
    }
    eventsObservable.onNext(backfilled);
  }

  /**
   * Important, used later: binds to the view's lifecycle at subscription time.
   *
   * <p>Hands the downstream observer a disposable lifecycle observer, reports an error when
   * not on the main thread, and otherwise registers that observer on the lifecycle.
   */
  @Override
  protected void subscribeActual(Observer<? super Event> observer) {
    final ArchLifecycleObserver lifecycleObserver =
        new ArchLifecycleObserver(lifecycle, observer, eventsObservable);
    observer.onSubscribe(lifecycleObserver);
    if (isMainThread()) {
      // Register for the view's lifecycle callbacks.
      lifecycle.addObserver(lifecycleObserver);
      // Undo the registration if the observer was disposed in the meantime.
      if (lifecycleObserver.isDisposed()) {
        lifecycle.removeObserver(lifecycleObserver);
      }
    } else {
      observer.onError(
          new IllegalStateException("Lifecycles can only be bound to on the main thread!"));
    }
  }

  /** Lifecycle observer that relays lifecycle events to the downstream observer. */
  static final class ArchLifecycleObserver extends MainThreadDisposable
      implements LifecycleObserver {
    private final Lifecycle lifecycle;
    private final Observer<? super Event> observer;
    private final BehaviorSubject<Event> eventsObservable;

    ArchLifecycleObserver(
        Lifecycle lifecycle,
        Observer<? super Event> observer,
        BehaviorSubject<Event> eventsObservable) {
      this.lifecycle = lifecycle;
      this.observer = observer;
      this.eventsObservable = eventsObservable;
    }

    /** Unregisters this observer from the lifecycle on disposal. */
    @Override
    protected void onDispose() {
      lifecycle.removeObserver(this);
    }

    @OnLifecycleEvent(Event.ON_ANY)
    void onStateChange(@SuppressWarnings("unused") LifecycleOwner owner, Event event) {
      if (isDisposed()) {
        return;
      }
      // Due to the INITIALIZED->ON_CREATE mapping trick we do in backfill(),
      // the subject is updated conditionally to avoid duplicate CREATE events.
      final boolean duplicateCreate = event == ON_CREATE && eventsObservable.getValue() == event;
      if (!duplicateCreate) {
        eventsObservable.onNext(event);
      }
      // Important, used later: every lifecycle change is pushed to the downstream observer
      // (the chain that is subscribed at analysis point 8).
      observer.onNext(event);
    }
  }
}

分析点6:resolveScopeFromLifecycle()传入的是分析点4创建的AndroidLifecycleScopeProvider

 /**
  * Returns the {@code CompletableSource} resolved from this provider by
  * {@code LifecycleScopes.resolveScopeFromLifecycle}.
  */
 @Override
  public CompletableSource requestScope() {
    // Analysis point 6: pass this provider to the scope resolver.
    return LifecycleScopes.resolveScopeFromLifecycle(this);
  }
 // Here provider is the AndroidLifecycleScopeProvider.
 // NOTE(review): requestScope() calls a one-argument overload that is not shown in this
 // excerpt; presumably it delegates to this method - confirm against the library source.
 public static <E> CompletableSource resolveScopeFromLifecycle(
      final LifecycleScopeProvider<E> provider, final boolean checkEndBoundary)
      throws OutsideScopeException {
      // Read the most recent lifecycle event.
    E lastEvent = provider.peekLifecycle();
    CorrespondingEventsFunction<E> eventsFunction = provider.correspondingEvents();
    // No event yet means the lifecycle has not started.
    if (lastEvent == null) {
      throw new LifecycleNotStartedException();
    }
    E endEvent;
    try {
     // eventsFunction is DEFAULT_CORRESPONDING_EVENTS when the default provider is used
     // (see analysis points 2 and 4).
     // Look up the lifecycle event at which to dispose.
      endEvent = eventsFunction.apply(lastEvent);
    } catch (Exception e) {
      // NOTE(review): handler body elided in the article; checkEndBoundary is presumably
      // consulted here - confirm against the library source.
      。。。。。。。
    }
    // Pass on the LifecycleEventsObservable (see analysis point 4) and the disposal event.
    return resolveScopeFromLifecycle(provider.lifecycle(), endEvent);
  }
  
// Final overload: receives the LifecycleEventsObservable, the end event and an optional
// comparator.
public static <E> CompletableSource resolveScopeFromLifecycle(
      Observable<E> lifecycle, final E endEvent, @Nullable final Comparator<E> comparator) {
    // Without a comparator the end is reached on equality; with one, as soon as an event
    // compares greater than or equal to the end event.
    final Predicate<E> reachedEnd =
        comparator == null
            ? event -> event.equals(endEvent)
            : event -> comparator.compare(event, endEvent) >= 0;
    // Important, used later: this only builds the CompletableSource. Nothing subscribes to it
    // here; the subscription happens at analysis point 8.
    // skip(1) drops the first emission, takeUntil(...) completes once the end condition holds,
    // and ignoreElements() discards the events so that only onComplete/onError reach the
    // subscriber.
    return lifecycle.skip(1).takeUntil(reachedEnd).ignoreElements();
  }
}

分析点3:provider为AndroidLifecycleScopeProvider(参考分析点2)

 //分析点3,分析比较靠后
    return autoDisposable(completableOf(provider));

 /**
  * Wraps a {@code ScopeProvider} in a {@code Completable} created with
  * {@code Completable.defer}, so {@code requestScope()} is called from the deferred supplier
  * rather than at the time this method is called.
  *
  * @param scopeProvider the provider whose scope is requested
  * @return the deferred {@code Completable}
  */
 public static Completable completableOf(ScopeProvider scopeProvider) {
    return Completable.defer(
        () -> {
          try {
            // Returns the CompletableSource built at analysis point 6.
            return scopeProvider.requestScope();
          } catch (OutsideScopeException e) {
         // NOTE(review): handler body elided in the article.
         。。。。。。。。。
        });
  }

// Receives the scope; on the path analysed here it is the Completable from completableOf(),
// which defers to the CompletableSource built at analysis point 6.
// NOTE(review): several members of the anonymous classes are elided in this excerpt.
public static <T> AutoDisposeConverter<T> autoDisposable(final CompletableSource scope) {
    checkNotNull(scope, "scope == null");
    return new AutoDisposeConverter<T>() {
    。。。。。。。。。。。。。。
      @Override
      // RxJava's as() operator calls this apply(); it returns an ObservableSubscribeProxy.
      public ObservableSubscribeProxy<T> apply(final Observable<T> upstream) {
        // When proxies are not hidden, the AutoDisposeObservable itself is returned.
        if (!AutoDisposePlugins.hideProxies) {
          return new AutoDisposeObservable<>(upstream, scope);
        }
        return new ObservableSubscribeProxy<T>() {
       。。。。。。。。。。。。
          @Override
          public Disposable subscribe(
              Consumer<? super T> onNext, Consumer<? super Throwable> onError) {
             // On subscription, wrap the upstream Observable and the scope in an
             // AutoDisposeObservable and subscribe to that instead.
             // Analysis point 7.
            return new AutoDisposeObservable<>(upstream, scope).subscribe(onNext, onError);
          }

       。。。。。。。。。。。。。
      }
      。。。。。。。。。。。。。。。。。。。
  }

分析点7:生成封装的AutoDisposeObservable(上游的Observable,分析点6中生成的 CompletableSource ),并执行subscribe,实际上就是执行了AutoDisposeObservable的subscribeActual()

   //分析点7
  return new AutoDisposeObservable<>(upstream, scope).subscribe(onNext, onError);

/**
 * Observable that pairs an upstream source with a scope {@code CompletableSource}. When it is
 * subscribed, the upstream is subscribed with an {@code AutoDisposingObserverImpl} wrapping the
 * real observer.
 */
final class AutoDisposeObservable<T> extends Observable<T> implements ObservableSubscribeProxy<T> {
  /** The upstream Observable. */
  private final ObservableSource<T> source;
  /** The scope (the CompletableSource produced at analysis point 6). */
  private final CompletableSource scope;

  AutoDisposeObservable(ObservableSource<T> source, CompletableSource scope) {
    this.source = source;
    this.scope = scope;
  }

  @Override
  protected void subscribeActual(Observer<? super T> observer) {
    // Analysis point 8: the upstream is subscribed by a wrapper around the given observer.
    final AutoDisposingObserverImpl<T> scopedObserver =
        new AutoDisposingObserverImpl<>(scope, observer);
    source.subscribe(scopedObserver);
  }
}

分析点8:上面分析了那么多,这里才是真正实现的地方。
上游的Observable被AutoDisposingObserverImpl(对原有观察者的封装)订阅,重点在onSubscribe()的回调处理

/**
 * Observer wrapper that forwards upstream signals to a delegate observer and, in
 * {@code onSubscribe}, also subscribes to a scope {@code CompletableSource}. When the scope
 * completes, the upstream subscription is disposed.
 *
 * <p>NOTE(review): the field declarations ({@code scope}, {@code delegate},
 * {@code scopeDisposable}, {@code mainDisposable}, {@code error}) are elided in this excerpt.
 */
final class AutoDisposingObserverImpl<T> extends AtomicInteger implements AutoDisposingObserver<T> {

。。。。。。。。。。。。。。。。。
  AutoDisposingObserverImpl(CompletableSource scope, Observer<? super T> delegate) {
    // The scope (the CompletableSource produced at analysis point 6).
    this.scope = scope;
    // The original observer.
    this.delegate = delegate;
  }

  /** Returns the wrapped (original) observer. */
  @Override
  public Observer<? super T> delegateObserver() {
    return delegate;
  }

  /**
   * Wires up both subscriptions: the scope is subscribed with an internal completable
   * observer, and the upstream disposable {@code d} is stored in {@code mainDisposable}.
   */
  @Override
  public void onSubscribe(final Disposable d) {
    // A DisposableCompletableObserver only has onError and onComplete callbacks, which
    // matches the ignoreElements() applied when the scope was built (analysis point 6).
    DisposableCompletableObserver o =
        new DisposableCompletableObserver() {
          // Scope failed: mark the scope side as disposed and route the error through
          // the outer onError.
          @Override
          public void onError(Throwable e) {
            scopeDisposable.lazySet(AutoDisposableHelper.DISPOSED);
            AutoDisposingObserverImpl.this.onError(e);
          }

          // Scope completed: mark the scope side as disposed and dispose the upstream
          // subscription.
          @Override
          public void onComplete() {
            scopeDisposable.lazySet(AutoDisposableHelper.DISPOSED);
            AutoDisposableHelper.dispose(mainDisposable);
          }
        };
    if (AutoDisposeEndConsumerHelper.setOnce(scopeDisposable, o, getClass())) {
      delegate.onSubscribe(this);
      // Until this point the CompletableSource from analysis point 6 had no observer.
      // With the Android lifecycle scope, subscribing here reaches
      // LifecycleEventsObservable.subscribeActual() (analysis point 5), which registers the
      // lifecycle observer; each lifecycle change is then passed down via observer.onNext(event).
      // In effect this line is:
      // LifecycleEventsObservable.skip(1).takeUntil(<end event reached>).ignoreElements().subscribe(o)
      scope.subscribe(o);
      AutoDisposeEndConsumerHelper.setOnce(mainDisposable, d, getClass());
    }
  }

  /** Reports disposed once {@code mainDisposable} holds the DISPOSED marker. */
  @Override
  public boolean isDisposed() {
    return mainDisposable.get() == AutoDisposableHelper.DISPOSED;
  }

  /** Disposes both the scope subscription and the upstream subscription. */
  @Override
  public void dispose() {
    AutoDisposableHelper.dispose(scopeDisposable);
    AutoDisposableHelper.dispose(mainDisposable);
  }

  /** Forwards a value to the delegate unless already disposed. */
  @Override
  public void onNext(T value) {
    if (!isDisposed()) {
      if (HalfSerializer.onNext(delegate, value, this, error)) {
        // Terminal event occurred and was forwarded to the delegate, so clean up here
        mainDisposable.lazySet(AutoDisposableHelper.DISPOSED);
        AutoDisposableHelper.dispose(scopeDisposable);
      }
    }
  }

  /** Marks the upstream as disposed, disposes the scope, then forwards the error. */
  @Override
  public void onError(Throwable e) {
    if (!isDisposed()) {
      mainDisposable.lazySet(AutoDisposableHelper.DISPOSED);
      AutoDisposableHelper.dispose(scopeDisposable);
      HalfSerializer.onError(delegate, e, this, error);
    }
  }

  /** Marks the upstream as disposed, disposes the scope, then forwards completion. */
  @Override
  public void onComplete() {
    if (!isDisposed()) {
      mainDisposable.lazySet(AutoDisposableHelper.DISPOSED);
      AutoDisposableHelper.dispose(scopeDisposable);
      HalfSerializer.onComplete(delegate, this, error);
    }
  }
}

总结:现在回答上文的问题
AutoDispose主要是通过监听view的生命周期来解决使用Rxjava时的内存泄漏的,那么view的生命周期和Rxjava的链式调用是如何关联的?
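1:AndroidLifecycleScopeProvider.from(this)中获取当前view的lifecycle,并创建一个LifecycleEventsObservable(参考分析点5),在其subscribeActual()中通过lifecycle.addObserver()注册了对View生命周期的监听。
当生命周期发生变化时,ArchLifecycleObserver.onStateChange()会调用下游观察者的onNext(“生命周期事件”),把事件发送给LifecycleEventsObservable的订阅者。
2:Rxjava的as操作符调用AutoDisposeConverter.apply(),返回封装了上游Observable和scope的AutoDisposeObservable(参考分析点7);它在subscribeActual()中用AutoDisposingObserverImpl(参考分析点8)包装原有的观察者,并在AutoDisposingObserverImpl的onSubscribe()中执行scope.subscribe(o)完成对scope的订阅,经过翻译代码如下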

LifecycleEventsObservable.skip(1).takeUntil("取消订阅生命周期").ignoreElements()
.subscribe(DisposableCompletableObserver )
点赞