小点知识——观察者模式,由浅谈到理解

观察者模式

自己对观察者模式的理解:
定义:Define a one-to-many dependency between objects so that when one object changes state, all its dependents aer notified and updated automatically.

意图:定义对象间的一种一对多的依赖关系,当一个对象改变状态时,则所有依赖于它的对象都会得到通知并被自动更新。

主要解决:一个对象状态改变给其他对象通知的问题,而且要考虑到易用和低耦合,保证高度的协作。

使用场景:

1、有多个子类共有的方法,且逻辑相同。
2、重要的、复杂的方法,可以考虑作为模板方法。

优缺点:

优点:

1、观察者和被观察者是抽象耦合的。
2、建立一套触发机制。

缺点:

1、如果一个被观察者对象有很多的直接和间接的观察者的话,将所有的观察者都通知到会花费很多时间。
2、如果在观察者和观察目标之间有循环依赖的话,观察目标会触发它们之间进行循环调用,可能导致系统崩溃。
3、观察者模式没有相应的机制让观察者知道所观察的目标对象是怎么发生变化的,而仅仅只是知道观察目标发生了变化。

注意事项:

  1. JAVA 中已经有了对观察者模式的支持类。
  2. 避免循环引用。
  3. 如果顺序执行,某一观察者错误会导致系统卡壳,一般采用异步方式。

观察者模式的组成:

  1. 观察者,我们称它为Observer,有时候我们也称它为订阅者,即Subscriber。
  2. 被观察者,我们称它为Observable,即可以被观察的东西,有时候还会称之为主题,即Subject。

撸起袖子,为了验证我们的观察者模式,一波观察者模式的场景,首先定义一个Theif为观察目标。


    public class Theif {
        //描述小偷形象
        private String description;
    
        public Theif(String description){
            this.description = description;
        }
    
        public String getDescription() {
            return description;
        }
    
        public void setDescription(String description) {
            this.description = description;
        }
    
        @Override
        public String toString() {
            return "一名"+description+"小偷在进行扒窃";
        }
    }

定义我们的被观察者,我们希望能够通用,所以定义成泛型。泛型指具体的被观察者,内部暴露出register和unRegister方法供观察者订阅和取消订阅,至于观察者们的保存,我们用ArrayList集合储存即可,另外,当小偷发生变化的时候,需要通知观察者来做出响应,还需要一个notifyObservers方法。

public class Observable<T> {
    //|存放观察者们
    List<Observer<T>> mObservers = new ArrayList<>();
    //供观察者订阅 传入观察者即可
    public void register(Observer<T> observer) {    
        if (observer == null) {    
            throw new NullPointerException("observer == null");    
        }
        //多线程 添加同步锁
        // 如果线程A进入了该代码,线程B 在等待,这是A线程创建完一个实例出来后,线程B 获得锁进入同步代码,
        // 实例已经存在,木有必要再添加一个
        synchronized (this) {    
            if (!mObservers.contains(observer))    
                mObservers.add(observer);    
        }    
    }
    //根据传入的观察者 供观察者取消订阅
    public synchronized void unregister(Observer<T> observer) {    
        mObservers.remove(observer);    
    }    
    //通知观察者们
    public void notifyObservers(T data) {    
        for (Observer<T> observer : mObservers) {    
            observer.onFindThief(this, data);
        }    
    }    

}    

定义Observer接口为观察者,接收到消息后,即进行更新操作,对接收到的信息进行处理。观察者们 只需要实现Observer接口,该接口也是泛型的

public interface Observer<T> {
    //观察者发现目标,通过调用这个方法,并携带被观察者和数据 进行更新
    void onFindThief(Observable<T> observable,T data);
}

一旦订阅,发现小偷有变化,就会调用onFindThief接口。一定要用register方法来注册,否则观察者收不到变化的信息,而一旦不感兴趣,就可以调用unregister方法。

public class TestJava {

    public static void main(String agr[]){

        //一旦订阅,发现小偷有变化,就会调用onFindThief接口
        //Observable被观察者 被观察者的对象是小偷(Theif)
        Observable<Theif> observable=new Observable<>();
        //为了简便直接在这里创建一个观察者对象。观察者1  对小偷进行观察,一旦发现小偷就打印出目标
        Observer<Theif> observer1 = new Observer<Theif>() {
            @Override
            public void onFindThief(Observable<Theif> observable, Theif data) {
                System.out.println("观察者1——发现目标:"+data.toString());
            }
        };
        //观察者2  对小偷进行观察,一旦发现小偷就打印出目标
        Observer<Theif> observer2 = new Observer<Theif>() {
            @Override
            public void onFindThief(Observable<Theif> observable, Theif data) {
                System.out.println("观察者2——发现目标:"+data.toString());
            }
        };
        //一定要用register方法来注册,否则观察者收不到变化的信息,而一旦不感兴趣,就可以调用unregister方法
        observable.register(observer1);
        observable.register(observer2);

        //目标
        Theif theif = new Theif("穿红色外套");
        //被观察的目标有变化,通知观察者们
        observable.notifyObservers(theif);
        //观察者1 吃饭时间到,不再观察了 调用unregister方法注销被观察对象,其他观察者还有观察
        observable.unregister(observer1);

        //目标
        Theif theif2 = new Theif("中年男子,染黄头发");
        //被观察的目标有变化,通知观察者们
        observable.notifyObservers(theif2);

    }
}

输出结果为:

观察者1——发现目标:一名穿红色外套小偷在进行扒窃
观察者2——发现目标:一名穿红色外套小偷在进行扒窃
观察者2——发现目标:一名中年男子,染黄头发小偷在进行扒窃

一个简单的观察者模式就出来了,当然上面的代码还可以优化。有兴趣的可以自己进一步优化。

小结

观察者模式主要包括两个部分:

  • Subject被观察者。是一个接口或者是抽象类,定义被观察者必须实现的职责,它必须能偶动态地增加、取消观察者,管理观察者并通知观察者。
  • Observer观察者。观察者接收到消息后,即进行更新操作,对接收到的信息进行处理。
  1. ConcreteSubject具体的被观察者。定义被观察者自己的业务逻辑。
  2. ConcreteObserver具体观察者。每个观察者在接收到信息后处理的方式不同,各个观察者有自己的处理逻辑。
  3. 上面的例子Observable为被观察者 被观察者的对象是小偷(Theif),也即是ConcreteSubject具体的被观察者
  4. 上面的例子:可以自己定义一个类并实现Observer接口,那么这个类就是ConcreteObserver具体观察者。

开源框架EventBus、Rxjava等也是基于观察者模式的,观察者模式的注册,取消,发送事件三个典型方法都有。以EventBus3.0作为例子,分析其源码。

什么是EventBus

EventBus是一个消息总线,以观察者模式实现,用于简化程序的组件、线程通信,可以轻易切换线程、开辟线程。EventBus3.0跟先前版本的区别在于加入了annotation @Subscribe,取代了以前约定命名的方式

优点

开销小,代码优雅。将发送者和接受者解耦。

EventBus的用法——常用的几个方法

EventBus.getDefault().register(Object subscriber);    
EventBus.getDefault().unregister(Object subscriber);    
EventBus.getDefault().post(Object event);  
EventBus.getDefault().postSticky(Object event);  

当我们要调用EventBus的功能时,比如注册或者发送事件等,总会调用EventBus.getDefault()来获取EventBus实例

//源码
static volatile EventBus defaultInstance;
/** Convenience singleton for apps using a process-wide EventBus instance. */
public static EventBus getDefault() {
    if (defaultInstance == null) {
        synchronized (EventBus.class) {
            if (defaultInstance == null) {
                defaultInstance = new EventBus();
            }
        }
    }
    return defaultInstance;
}
public EventBus() {
  this(DEFAULT_BUILDER);
}

从源码看出很明显这是一个单例模式。如果对单例模式感兴趣可以去研究一下单例模式。一看构造函数this(DEFAULT_BUILDER)做了些什么。这里DEFAULT_BUILDER是默认的EventBusBuilder用来构造EventBus:

private static final EventBusBuilder DEFAULT_BUILDER = new EventBusBuilder();

进入EventBusBuilder类看看,这么多个可选的配置属性,这里变量含义大家直接看我的注释,就不多作解释了。我们主要看最终的建造方法:

/**
 * 根据参数创建对象,并赋值给EventBus.defaultInstance, 必须在默认的eventbus对象使用以前调用
 *
 * @throws EventBusException if there's already a default EventBus instance in place
 */
public EventBus installDefaultEventBus() {
    synchronized (EventBus.class) {
        if (EventBus.defaultInstance != null) {
            throw new EventBusException("Default instance already exists." +
                    " It may be only set once before it's used the first time to ensure " +
                    "consistent behavior.");
        }
        EventBus.defaultInstance = build();
        return EventBus.defaultInstance;
    }
}

/**
 * 根据参数创建对象
 */
public EventBus build() {
    return new EventBus(this);
}

EventBusBuilder类提供了两种建造方法,还记得之前的getDefault()方法吗,维护了一个单例对象,installDefaultEventBus() 方法建造的EventBus对象最终会赋值给那个单例对象,但是有一个前提就是我们之前并没有创建过那个单例对象.

为什么如果EventBus.defaultInstance不为null程序要抛出异常?先把这个问题留着,继续往下看。

回到EventBus的构造方法中,this(DEFAULT_BUILDER)是指EventBus一个带实参的构造函数:

EventBus(EventBusBuilder builder) {
    subscriptionsByEventType = new HashMap<>();
    typesBySubscriber = new HashMap<>();
    stickyEvents = new ConcurrentHashMap<>();
    mainThreadPoster = new HandlerPoster(this, Looper.getMainLooper(), 10);
    backgroundPoster = new BackgroundPoster(this);
    asyncPoster = new AsyncPoster(this);
    indexCount = builder.subscriberInfoIndexes != null ? builder.subscriberInfoIndexes.size() : 0;
    subscriberMethodFinder = new SubscriberMethodFinder(builder.subscriberInfoIndexes,
            builder.strictMethodVerification, builder.ignoreGeneratedIndex);
    logSubscriberExceptions = builder.logSubscriberExceptions;
    logNoSubscriberMessages = builder.logNoSubscriberMessages;
    sendSubscriberExceptionEvent = builder.sendSubscriberExceptionEvent;
    sendNoSubscriberEvent = builder.sendNoSubscriberEvent;
    throwSubscriberException = builder.throwSubscriberException;
    eventInheritance = builder.eventInheritance;
    executorService = builder.executorService;
}

EventBusBuilder类提供了这么多个可选的配置属性,这里Map变量含义大家直接看我的注释。EventBus构造函数也涉及到了构造模式,感兴趣的可以查阅一下,这里就不多说了。

这里说说EventBus中的三个Poster类(HandlerPoster、BackgroundPoster、AsyncPoster)。三个Poster类的作用是什么?Poster应该是发布时候会涉及到,不过我们之后看代码再作解答,先解决这三个类,看代码。

private final HandlerPoster mainThreadPoster; //前台发送者
private final BackgroundPoster backgroundPoster; //后台发送者
private final AsyncPoster asyncPoster;   //后台发送者(只让队列第一个待订阅者去响应)

这几个Poster的设计可以说是整个EventBus的一个经典部分。每个Poster中都有一个发送任务队列,PendingPostQueue queue。进到队列PendingPostQueue里面再看。

private PendingPost head; //待发送对象队列头节点
private PendingPost tail;//待发送对象队列尾节点

接着我们再看这个PendingPost类的实现:

//单例池,复用对象
private final static List<PendingPost> pendingPostPool = new ArrayList<PendingPost>();

Object event; //事件类型
Subscription subscription; //订阅者
PendingPost next; //队列下一个待发送对象

首先是提供了一个池的设计,类似于我们的线程池,目的是为了减少对象创建的开销,当一个对象不用了,我们可以留着它,下次再需要的时候返回这个保留的而不是再去创建。接着看最后的变量,PendingPost next 非常典型的队列设计,队列中每个节点都有一个指向下一个节点的指针。

/**
 * 首先检查复用池中是否有可用,如果有则返回复用,否则返回一个新的
 *
 * @param subscription 订阅者
 * @param event        订阅事件
 * @return 待发送对象
 */
static PendingPost obtainPendingPost(Subscription subscription, Object event) {
    synchronized (pendingPostPool) {
        int size = pendingPostPool.size();
        if (size > 0) {
            PendingPost pendingPost = pendingPostPool.remove(size - 1);
            pendingPost.event = event;
            pendingPost.subscription = subscription;
            pendingPost.next = null;
            return pendingPost;
        }
    }
    return new PendingPost(event, subscription);
}

/**
 * 回收一个待发送对象,并加入复用池
 *
 * @param pendingPost 待回收的待发送对象
 */
static void releasePendingPost(PendingPost pendingPost) {
    pendingPost.event = null;
    pendingPost.subscription = null;
    pendingPost.next = null;
    synchronized (pendingPostPool) {
        // 防止池无限增长
        if (pendingPostPool.size() < 10000) {
            pendingPostPool.add(pendingPost);
        }
    }
}

obtainPendingPost()方法中对池复用的实现,每次新创建的节点尾指针都为 null 。细心的人还看到了一个关键字synchronized。Synchronized是Java并发编程中最常用的用于保证线程安全的方式,其使用相对也比较简单。(但是如果能够深入了解其原理,对监视器锁等底层知识有所了解,一方面可以帮助我们正确的使用Synchronized关键字,另一方面也能够帮助我们更好的理解并发编程机制,有助我们在不同的情况下选择更优的并发策略来完成任务。对平时遇到的各种并发问题,也能够从容的应对。不过这都是题个话了,有兴趣可以多了解一下。)

releasePendingPost(),回收pendingPost对象,既然可以从池中取,当然需要可以存。非常细心的可以注意到池的大小小于10000,不禁会想这是为什么呢,想表达什么呢?,if (pendingPostPool.size() < 10000) 其实我觉得10000都很大了,1000就够了,我们一次只可能创建一个pendingPost,如果ArrayList里面存了上千条都没有取走,那么肯定是使用出错了。当然ArrayList里不会存放那么多池,我只能猜测他们也只是预防一下,但也不是不可能不存在这样的情况。

三个Poster类的最底层都是PendingPost类,PendingPost的代码我们就看完了。那么继续往上一级PendingPostQueue类了解。PendingPostQueue这是一个队列的设计。那么我们首先看看PendingPostQueue的入队方法:

synchronized void enqueue(PendingPost pendingPost) {
    if (pendingPost == null) {
        throw new NullPointerException("null cannot be enqueued");
    }
    if (tail != null) {
        tail.next = pendingPost;
        tail = pendingPost;
    } else if (head == null) {
        head = tail = pendingPost;
    } else {
        throw new IllegalStateException("Head present, but no tail");
    }
    notifyAll();
}

从源码可以看出首先来个判空处理。接着将当前节点的上一个节点(入队前整个队列的最后一个节点)的尾指针指向当期正在入队的节点(传入的参数pendingPost),并将队列的尾指针指向自己(自己变成队列的最后一个节点),这样就完成了入队。如果是队列的第一个元素(队列之前是空的),那么直接将队列的头尾两个指针都指向自身就行了。

接着看看PendingPostQueue的出队也是类似的队列指针操作:

synchronized PendingPost poll() {
    PendingPost pendingPost = head;
    if (head != null) {
        head = head.next;
        if (head == null) {
            tail = null;
        }
    }
    return pendingPost;
}

首先将出队前的头节点保留一个临时变量(它就是要出队的节点),拿到这个将要出队的临时变量的下一个节点指针,将出队前的第二个元素(出队后的第一个元素)的赋值为现在队列的头节点,出队完成。

非常细心的人会发现 PendingPostQueue的所有方法都声明了synchronized。这意味着在多线程下它依旧可以正常工作。如果对synchronized还是不太了解的可以查阅资料。

既然所有方法都声明了synchronized,那就必定有他的道理。那么我们往上一级看看就知道方法前添加synchronized关键字是不是存在的意义。接着看HandlerPoster的入队方法enqueue():

/**
 * @param subscription 订阅者
 * @param event        订阅事件
 */
void enqueue(Subscription subscription, Object event) {
    PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
    synchronized (this) {
        queue.enqueue(pendingPost);
        if (!handlerActive) {
            handlerActive = true;
            if (!sendMessage(obtainMessage())) {
                throw new EventBusException("Could not send handler message");
            }
        }
    }
}

入队方法会根据参数创建,待发送对象pendingPost并加入队列,从代码上可以看出验证了synchronized关键字是有存在的意义。 handleMessage()没有在运行中,则发送一条空消息让handleMessage响应,那继续看看handleMessage()方法里做了些什么:

@Override
public void handleMessage(Message msg) {
    boolean rescheduled = false;
    try {
        long started = SystemClock.uptimeMillis();
        while (true) {
            PendingPost pendingPost = queue.poll();
            if (pendingPost == null) {
                synchronized (this) {
                    // 双重校验,类似单例中的实现
                    pendingPost = queue.poll();
                    if (pendingPost == null) {
                        handlerActive = false;
                        return;
                    }
                }
            }
            //如果订阅者没有取消注册,则分发消息
            eventBus.invokeSubscriber(pendingPost);
            
            //如果在一定时间内仍然没有发完队列中所有的待发送者,则退出
            long timeInMethod = SystemClock.uptimeMillis() - started;
            if (timeInMethod >= maxMillisInsideHandleMessage) {
                if (!sendMessage(obtainMessage())) {
                    throw new EventBusException("Could not send handler message");
                }
                rescheduled = true;
                return;
            }
        }
    } finally {
        handlerActive = rescheduled;
    }
}

handleMessage()不停的在待发送队列queue中去取消息。直得注意的是在循环之外有个临时boolean变量rescheduled,最后是通过这个值去修改了handlerActive。而 handlerActive 是用来判断当前queue中是否有正在发送对象的任务,看到上面的入队方法enqueue(),如果已经有任务在跑着了,就不需要再去sendMessage()唤起我们的handleMessage()。最终通过eventBus对象的invokeSubscriber()最终发送出去,并回收这个pendingPost。那么这里有个问题了,什么时候才会通过eventBus对象的invokeSubscriber()发送出去?HandlePoster类已经看完了,其实三个Poster类里面的实现都很相似易懂的,以HandlePoster类来介绍,另外两个类(BackgroundPoster、AsyncPoster)异步的发送者实现代码也差不多,唯一的区别就是另外两个是工作在异步,实现的Runnable接口。

整理一下上面出现和问题:

  1. 为什么如果EventBus.defaultInstance不为null程序要抛出异常?
  2. 三个Poster类(HandlerPoster、BackgroundPoster、AsyncPoster)的作用是什么?
  3. invokeSubscriber()什么时候才触发发送出去?

回顾一下Poster、PendingPostQueue、PendingPost这三个类,是不是有种似曾相识的感觉。哈哈,没有错。那是Handler、Message、Looper的工作原理。如果不了解Handler可以查阅一下资料。

EventBus注册

获取到EventBus后,可以将订阅者注册到EventBus中,下面来看一下register方法:

public void register(Object subscriber) {
    Class<?> subscriberClass = subscriber.getClass();
    // 用 subscriberMethodFinder 提供的方法,找到在 subscriber 这个类里面订阅的内容。
    List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
    synchronized (this) {
        for (SubscriberMethod subscriberMethod : subscriberMethods) {
            subscribe(subscriber, subscriberMethod);
        }
    }
}

通过SubscriberMethodFinder#findSubscriberMethods方法找出一个SubscriberMethod的集合,也就是传进来的订阅者所有的订阅的方法,接下来遍历订阅者的订阅方法来完成订阅者的订阅操作。对于SubscriberMethod(订阅方法)类中,主要就是用保存订阅方法的Method对象、线程模式、事件类型、优先级、是否是粘性事件等属性,就不贴代码了。那说说SubscriberMethodFinder类中的findSubscriberMethods方法吧,SubscriberMethodFinder从字面理解,就是订阅者方法发现者。

List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
   //从缓存中获取SubscriberMethod集合
   List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
   if (subscriberMethods != null) {
       return subscriberMethods;
   }
   //ignoreGeneratedIndex属性表示是否忽略注解器生成的MyEventBusIndex
   if (ignoreGeneratedIndex) {
    //通过反射获取subscriberMethods
       subscriberMethods = findUsingReflection(subscriberClass);
   } else {
       subscriberMethods = findUsingInfo(subscriberClass);
   }
   //在获得subscriberMethods以后,如果订阅者中不存在@Subscribe注解并且为public的订阅方法,则会抛出异常。
   if (subscriberMethods.isEmpty()) {
       throw new EventBusException("Subscriber " + subscriberClass
               + " and its super classes have no public methods with the @Subscribe annotation");
   } else {
       METHOD_CACHE.put(subscriberClass, subscriberMethods);
       return subscriberMethods;
   }
}

首先从缓存中查找,如果找到了就返回subscriberMethods。如果缓存中没有的话,则根据 ignoreGeneratedIndex选择如何查找订阅方法,ignoreGeneratedIndex属性表示是否忽略注解器生成的MyEventBusIndex。ignoreGeneratedIndex 默认就是false,如果ignoreGeneratedIndex是true 那么通反射获取subscriberMethods,可以通过EventBusBuilder来设置它的值。最后,找到订阅方法后,放入缓存,以免下次继续查找。

可以看到其中findUsingInfo()方法就是去索引中查找订阅者的回调方法,我们戳进去看看这个方法的实现:

private List<SubscriberMethod> findUsingInfo(Class<?> subscriberClass) {
    // 最新版的EventBus3中,寻找方法时所需的临时变量都被封装到了FindState这个静态内部类中
    FindState findState = prepareFindState(); // 到对象池中取得上下文,避免频繁创造对象,这个设计很赞
    findState.initForSubscriber(subscriberClass); // 初始化寻找方法的上下文
    while (findState.clazz != null) { // 子类找完了,会继续去父类中找
        findState.subscriberInfo = getSubscriberInfo(findState); // 获得订阅者类的相关信息
        if (findState.subscriberInfo != null) { // 上一步能拿到相关信息的话,就开始把方法数组封装成List
            SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods();
            for (SubscriberMethod subscriberMethod : array) {
                if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) {
                    // checkAdd是为了避免在父类中找到的方法是被子类重写的,此时应该保证回调时执行子类的方法
                    findState.subscriberMethods.add(subscriberMethod);
                }
            }
        } else { // 索引中找不到,降级成运行时通过注解和反射去找
            findUsingReflectionInSingleClass(findState);
        }
        findState.moveToSuperclass(); // 上下文切换成父类
    }
    return getMethodsAndRelease(findState); // 找完后,释放FindState进对象池,并返回找到的回调方法
}

通过prepareFindState()找到所需的临时变量都被封装到了FindState这个静态内部类中,初始化寻找方法的上下文,子类找完了,会继续去父类中找。然后getSubscriberInfo方法来获取订阅者信息。如果获得到订阅者类的相关信息,不仅会去遍历父类,而且还会避免因为重写方法导致执行多次回调,过虑了重重保险关卡,然后subscriberMethods添加subscriberMethod。如果没有获得到订阅者类的相关信息,便会执行findUsingReflectionInSingleClass方法,将订阅方法保存到findState中。最后再通过getMethodsAndRelease方法对findState做回收处理并反回订阅方法的List集合。其中需要关心的是getSubscriberInfo()是如何返回索引数据的。

private SubscriberInfo getSubscriberInfo(FindState findState) {
    if (findState.subscriberInfo != null && findState.subscriberInfo.getSuperSubscriberInfo() != null) { // subscriberInfo已有实例,证明本次查找需要查找上次找过的类的父类
        SubscriberInfo superclassInfo = findState.subscriberInfo.getSuperSubscriberInfo();
        if (findState.clazz == superclassInfo.getSubscriberClass()) { // 确定是所需查找的类
            return superclassInfo;
        }
    }
    if (subscriberInfoIndexes != null) { // 从我们传进来的subscriberInfoIndexes中获取相应的订阅者信息
        for (SubscriberInfoIndex index : subscriberInfoIndexes) {
            SubscriberInfo info = index.getSubscriberInfo(findState.clazz);
            if (info != null) { return info; }
        }
    }
    return null;
}

在getSubscriberInfo方法中,传入的实参如果已有实例,证明本次查找需要查找上次找过的类的父类。如果没有实例,应用到了我们生成的索引,避免我们需要在findSubscriberMethods时去调用耗时的findUsingReflection方法。关于注解为我们生成的索引这里先不解答,后面会有代码解说(在我们开始查找订阅方法的时候并没有忽略注解器为我们生成的索引MyEventBusIndex,如果我们通过EventBusBuilder配置了MyEventBusIndex,便会获取到subscriberInfo,调用subscriberInfo的getSubscriberMethods方法便可以得到订阅方法相关的信息,这个时候就不在需要通过注解进行获取订阅方法。如果没有配置MyEventBusIndex,便会执行findUsingReflectionInSingleClass方法,将订阅方法保存到findState中。)

下面就来看一下findUsingReflectionInSingleClass的执行过程:

private void findUsingReflectionInSingleClass(FindState findState) {
    Method[] methods;
    try {
        // This is faster than getMethods, especially when subscribers are fat classes like Activities
        methods = findState.clazz.getDeclaredMethods();
    } catch (Throwable th) {
        // Workaround for java.lang.NoClassDefFoundError, see https://github.com/greenrobot/EventBus/issues/149
        methods = findState.clazz.getMethods();
        findState.skipSuperClasses = true;
    }
    for (Method method : methods) {
        int modifiers = method.getModifiers();
        if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
            Class<?>[] parameterTypes = method.getParameterTypes();
            if (parameterTypes.length == 1) {
                Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
                if (subscribeAnnotation != null) {
                    Class<?> eventType = parameterTypes[0];
                    if (findState.checkAdd(method, eventType)) {
                        ThreadMode threadMode = subscribeAnnotation.threadMode();
                        findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
                                subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
                    }
                }
            } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
                String methodName = method.getDeclaringClass().getName() + "." + method.getName();
                throw new EventBusException("@Subscribe method " + methodName +
                        "must have exactly 1 parameter but has " + parameterTypes.length);
            }
        } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
            String methodName = method.getDeclaringClass().getName() + "." + method.getName();
            throw new EventBusException(methodName +
                    " is a illegal @Subscribe method: must be public, non-static, and non-abstract");
        }
    }
}

在这里主要是使用了Java的反射和对注解的解析。首先通过反射来获取订阅者中所有的方法。并根据方法的类型,参数和注解来找到订阅方法。找到订阅方法后将订阅方法相关信息保存到FindState当中。

当获取到了subscriberMethods数据集,接着开始对subscriberMethods数据集遍历,在订阅方法里进行注册:

 private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
    ////根据传入的响应方法名获取到响应事件(参数类型)
    Class<?> eventType = subscriberMethod.eventType;
      //根据订阅者和订阅方法构造一个订阅事件
    Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
      //获取当前订阅事件中Subscription的List集合
        //通过响应事件作为key,并取得这个事件类型将会响应的全部订阅者
        //没个订阅者至少会订阅一个事件,多个订阅者可能订阅同一个事件(多对多)
        //key:订阅的事件,value:订阅这个事件的所有订阅者集合
    CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
     //该事件对应的Subscription的List集合不存在,则重新创建并保存在subscriptionsByEventType中
    if (subscriptions == null) {
        subscriptions = new CopyOnWriteArrayList<>();
        subscriptionsByEventType.put(eventType, subscriptions);
    } else {
    //订阅者已经注册则抛出EventBusException
        if (subscriptions.contains(newSubscription)) {
            throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
                    + eventType);
        }
    }
    //遍历订阅事件,找到比subscriptions中订阅事件小的位置,优先级插入到订阅者集合中
    int size = subscriptions.size();
    for (int i = 0; i <= size; i++) {
        if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
            subscriptions.add(i, newSubscription);
            break;
        }
    }
     //通过订阅者获取该订阅者所订阅事件的集合(当前订阅者订阅了哪些事件)
    List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
    if (subscribedEvents == null) {
        subscribedEvents = new ArrayList<>();
        typesBySubscriber.put(subscriber, subscribedEvents);
    }
      //将当前的订阅事件添加到subscribedEvents中
    subscribedEvents.add(eventType);
    if (subscriberMethod.sticky) {
        if (eventInheritance) {
            // Existing sticky events of all subclasses of eventType have to be considered.
            // Note: Iterating over all events may be inefficient with lots of sticky events,
            // thus data structure should be changed to allow a more efficient lookup
            // (e.g. an additional map storing sub classes of super classes: Class -> List<Class>).
            //粘性事件的处理
            Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
            for (Map.Entry<Class<?>, Object> entry : entries) {
                Class<?> candidateEventType = entry.getKey();
                //如果eventtype是candidateEventType同一个类或是其子类
                if (eventType.isAssignableFrom(candidateEventType)) {
                    Object stickyEvent = entry.getValue();
                    //调用checkPostStickyEventToSubscription()做一次安全判断,就调用
                    checkPostStickyEventToSubscription(newSubscription, stickyEvent);
                }
            }
        } else {
            Object stickyEvent = stickyEvents.get(eventType);
            checkPostStickyEventToSubscription(newSubscription, stickyEvent);
        }
    }
}

checkPostStickyEventToSubscription()#postToSubscription(…)

private void postToSubscription(...) {
    switch (threadMode) {
        case PostThread:
            //直接调用响应方法
            invokeSubscriber(subscription, event);
            break;
        case MainThread:
            //如果是主线程则直接调用响应事件,否则使用handle去在主线程响应事件
            if (isMainThread) {
                invokeSubscriber(subscription, event);
            } else {
                mainThreadPoster.enqueue(subscription, event);
            }
            break;
            //。。。
    }
}

订阅的代码主要就做了两件事,第一件事是将我们的订阅方法和订阅者封装到subscriptionsByEventType和typesBySubscriber中,subscriptionsByEventType是我们投递订阅事件的时候,就是根据我们的EventType找到我们的订阅事件,从而去分发事件,处理事件的;typesBySubscriber在调用unregister(this)的时候,根据订阅者找到EventType,又根据EventType找到订阅事件,从而对订阅者进行解绑。第二件事,如果是粘性事件的话,就立马投递、执行。

我们就知道了 EventBus 中那几个map的全部含义。同时也回答了上面的第一个问题:为什么如果EventBus.defaultInstance不为null以后程序要抛出异常,就是因为这几个Map集合不同了。Map变了以后,订阅的事件就全部变为另一个EventBus对象的了,就没办法响应之前那个EventBus对象的订阅方法了。

如果不是sticky粘性事件都直接不执行了,还怎么响应。如果是sticky粘性事件,最后是调用checkPostStickyEventToSubscription()做一次安全判断,就调用postToSubscription()发送事件了。这里就关联到了我们之前讲的Poster类的作用了。也回答了上面的第二个问题:三个Poster类的作用,Poster类就是只负责粘滞事件的代码。同时也回答了上面的第三个问题:invokeSubscriber(。。。)什么时候触发的。之前我们不知道subscriberMethod是什么,现在我们能看懂了,就是通过反射调用订阅者类subscriber的订阅方法onEventXXX(),并将event作为参数传递进去。源码如下:

void invokeSubscriber(Subscription subscription, Object event) {
    try {
        subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
    } catch (InvocationTargetException e) {
        handleSubscriberException(subscription, event, e.getCause());
    } catch (IllegalAccessException e) {
        throw new IllegalStateException("Unexpected exception", e);
    }
}

前面提到注解器为我们生成的索引MyEventBusIndex。索引是在初始化EventBus时通过EventBusBuilder.addIndex(SubscriberInfoIndex index)方法传进来的。下面就说说添加事件总线注释预处理器生成的索引。

//源码
/** Adds an index generated by EventBus' annotation preprocessor. */
public EventBusBuilder addIndex(SubscriberInfoIndex index) {
    if(subscriberInfoIndexes == null) {
        subscriberInfoIndexes = new ArrayList<>();
    }
    subscriberInfoIndexes.add(index);
    return this;
}

可以看到,传进来的索引信息会保存在subscriberInfoIndexes这个List中,后续会通过EventBusBuilder传到相应EventBus的SubscriberMethodFinder实例中。我们先来分析SubscriberInfoIndex这个参数:

public interface SubscriberInfoIndex {
    SubscriberInfo getSubscriberInfo(Class<?> subscriberClass);
}

可见索引只需要做一件事情——就是能拿到订阅者的信息。而实现这个接口的类如果我们没有编译过,是找不到的。

EventBus3.X新特性Subscriber Index,在默认属性中有一个属性为ignoreGeneratedIndex,在后面的源码
分析中会介绍到这个属性为true时会使用反射方法获取订阅者的事件处理函数,为false时会使用subscriber Index生成的SubscriberInfo来获取订阅者的事件处理函数,具体内容会在源码分析中介绍
首先,Subscriber Index会在编译期间生成SubscriberInfo,然后在运行时使用SubscriberInfo中保存的事件处理函数处理事件,减少了反射时需要是耗时,会有运行速度上的提升,但是用起来会比较麻烦。

注意事项:只有用注解@Subscriber描述的public方法才能被索引,并且由于Java的特性匿名内部类就算用@Subscriber描述也不能被索引,不过别担心,当EventBus不能使用索引时会自动在运行时使用反射方法获取事件处理函数。

要添加事件处理函数到索引需要借助EventBus的注解处理器,添加EventBus的注解处理器到Module的Gradle文件中:

buildscript {
    dependencies {
        classpath 'com.neenbedankt.gradle.plugins:android-apt:1.8'
    }
}
apply plugin: 'com.neenbedankt.android-apt'
dependencies {
    compile 'org.greenrobot:eventbus:3.0.0'
    apt 'org.greenrobot:eventbus-annotation-processor:3.0.1'
}
apt {
    arguments {
        eventBusIndex "包名.MyEventBusIndex"//生成索引的名称
    }
}

在导入这些库没有问题后,Gradle之后build项目。这个时候就可以初始化EventBus,上面讲过,自定义EventBus的两种方式。

我们可以自定义设置自己的EventBus来为其添加MyEventBusIndex对象。代码如下所示:

EventBus eventBus = EventBus.builder().addIndex(new MyEventBusIndex()).build();

我们也能够将MyEventBusIndex对象安装在默认的EventBus对象当中。代码如下所示:

EventBus.builder().addIndex(new MyEventBusIndex()).installDefaultEventBus();
// Now the default instance uses the given index. Use it like this:
EventBus eventBus = EventBus.getDefault();

如果有多个索引文件还可以使用下面的代码:

EventBus eventBus = EventBus.builder()
    .addIndex(new MyEventBusAppIndex())
    .addIndex(new MyEventBusLibIndex()).build();

顺便说一句,MyEventBusIndex的路径为:项目根目录\build\generated\source\apt\debug\包名\MyEventBusIndex.java

EventBus 事件的发送

一、post方式

在获取到EventBus对象以后,可以通过post方法将给定事件发布到事件总线。

public void post(Object event) {
     //PostingThreadState保存着事件队列和线程状态信息
      PostingThreadState postingState = currentPostingThreadState.get();
     //获取事件队列,并将当前事插入到事件队列中
      List<Object> eventQueue = postingState.eventQueue;
      eventQueue.add(event);
      if (!postingState.isPosting) {
          postingState.isMainThread = Looper.getMainLooper() == Looper.myLooper();
          postingState.isPosting = true;
          if (postingState.canceled) {
              throw new EventBusException("Internal error. Abort state was not reset");
          }
          try {
            //处理队列中的所有事件
              while (!eventQueue.isEmpty()) {
                  postSingleEvent(eventQueue.remove(0), postingState);
              }
          } finally {
              postingState.isPosting = false;
              postingState.isMainThread = false;
          }
      }
  }

我们知道,对应的值,是通过post去完成的。这里用ThreadLocal的线程包装类,去完成对应的线程记录和操作
对应的泛型PostingThreadState对象,记录对应的信息,将对应的线程放入一个List中,通过对应的状态,判断Looper.getMainLooper() == Looper.myLooper() 是否是主线程等操作, 将状态存入PostingThreadState的属性中,遍历容器中的Event,最后通过存储的Subscription中的subscriberMethod的method,反射去invoke调用方法。

先从PostingThreadState对象中取出事件队列,然后再将当前的事件插入到事件队列当中。如果postingState.isPosting为true说明正在发布,则不做处理。如果不是正在发布,则最后将队列中的事件依次交由postSingleEvent方法进行处理,并移除该事件。这里也涉及到了Handler、looper、ThreadLocal就不进行解答了。

进入postSingleEvent方法中了解其操作过程:

private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
       Class<?> eventClass = event.getClass();
       boolean subscriptionFound = false;
       //eventInheritance表示是否向上查找事件的父类,默认为true
       if (eventInheritance) {
           List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
           int countTypes = eventTypes.size();
           for (int h = 0; h < countTypes; h++) {
               Class<?> clazz = eventTypes.get(h);
               subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
           }
       } else {
           subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
       }
         //找不到该事件时的异常处理
       if (!subscriptionFound) {
           if (logNoSubscriberMessages) {
               Log.d(TAG, "No subscribers registered for event " + eventClass);
           }
           if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
                   eventClass != SubscriberExceptionEvent.class) {
               post(new NoSubscriberEvent(this, event));
           }
       }
   }

eventInheritance表示是否向上查找事件的父类,它的默认值为true,可以通过在EventBusBuilder中来进行配置。当eventInheritance为true时,则通过lookupAllEventTypes找到所有的父类事件并存在List中,然后通过postSingleEventForEventType方法对事件逐一处理。

进入postSingleEventForEventType方法看看是怎么进一步对事件处理:

private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
      CopyOnWriteArrayList<Subscription> subscriptions;
         //取出该事件对应的Subscription集合
      synchronized (this) {
          subscriptions = subscriptionsByEventType.get(eventClass);
      }
      if (subscriptions != null && !subscriptions.isEmpty()) {
      //将该事件的event和对应的Subscription中的信息(包扩订阅者类和订阅方法)传递给postingState
          for (Subscription subscription : subscriptions) {
              postingState.event = event;
              postingState.subscription = subscription;
              boolean aborted = false;
              try {
                 //对事件进行处理
                  postToSubscription(subscription, event, postingState.isMainThread);
                  aborted = postingState.canceled;
              } finally {
                  postingState.event = null;
                  postingState.subscription = null;
                  postingState.canceled = false;
              }
              if (aborted) {
                  break;
              }
          }
          return true;
      }
      return false;
  }

同步取出该事件对应的Subscription集合并遍历该集合将事件event和对应Subscription传递给postingState并调用postToSubscription方法对事件进行处理。

进一步进入postToSubscription方法,了解内部是怎么处理的:

private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
    switch (subscription.subscriberMethod.threadMode) {
        case POSTING:
            //直接调用响应方式
            invokeSubscriber(subscription, event);
            break;
        case MAIN:
            //如果是主线程则直接调用响应事件,否则使用handle去在主线程响应事件
            if (isMainThread) {
                invokeSubscriber(subscription, event);
            } else {
                mainThreadPoster.enqueue(subscription, event);
            }
            break;
        case BACKGROUND:
            if (isMainThread) {
                backgroundPoster.enqueue(subscription, event);
            } else {
                invokeSubscriber(subscription, event);
            }
            break;
        case ASYNC:
            asyncPoster.enqueue(subscription, event);
            break;
        default:
            throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
    }
}

首先获取threadMode,即订阅方法运行的线程模式,如果是POSTING,那么直接调用invokeSubscriber()方法即可,如果是MAIN,则要判断当前线程是否是MAIN线程,如果是也是直接调用invokeSubscriber()方法,否则会交给mainThreadPoster来处理,其他情况相类似。举个例子,如果线程模式是MAIN,提交事件的线程是主线程的话则通过反射,直接运行订阅的方法,如果不是主线程,我们需要mainThreadPoster将我们的订阅事件入队列,mainThreadPoster是HandlerPoster类型的继承自Handler,通过Handler将订阅方法切换到主线程执行。

二、postSticky方式

粘性事件发布

/**
 * Posts the given event to the event bus and holds on to the event (because it is sticky). The most recent sticky
 * event of an event's type is kept in memory for future access by subscribers using {@link Subscribe#sticky()}.
 */
public void postSticky(Object event) {
    synchronized (stickyEvents) {
        stickyEvents.put(event.getClass(), event);
    }
    // Should be posted after it is putted, in case the subscriber wants to remove immediately
    post(event);
}

将给定事件发布到事件总线并保存到事件(因为它是粘性的)。最近的粘性事件类型的事件被保存在内存中以供订阅者使用。用stickyEvents来存放粘性事件,并且从这个put我们可以知道stickyEvents的key是event的Class对象,value是event,然后进行Post流程。我们可以去验证一下,在一个对象如Activity中先调用EventBus.getDefault().postSticky(new MessageEvent(“Hello everyone!”));,再调用EventBus.register,结果如何呢?从结果可以预想在register(更准确的说应该是subscribe)中肯定会有post的调用。带着这个设想,我们继续看subcribe方法,里面就有粘性事件的判断。

private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
     
    。。。。

      //将当前的订阅事件添加到subscribedEvents中
    subscribedEvents.add(eventType);
    if (subscriberMethod.sticky) {
        if (eventInheritance) {
            // Existing sticky events of all subclasses of eventType have to be considered.
            // Note: Iterating over all events may be inefficient with lots of sticky events,
            // thus data structure should be changed to allow a more efficient lookup
            // (e.g. an additional map storing sub classes of super classes: Class -> List<Class>).
            //粘性事件的处理
            Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
            for (Map.Entry<Class<?>, Object> entry : entries) {
                Class<?> candidateEventType = entry.getKey();
                if (eventType.isAssignableFrom(candidateEventType)) {
                    Object stickyEvent = entry.getValue();
                    checkPostStickyEventToSubscription(newSubscription, stickyEvent);
                }
            }
        } else {
            Object stickyEvent = stickyEvents.get(eventType);
            checkPostStickyEventToSubscription(newSubscription, stickyEvent);
        }
    }
}

在代码端,接收事件时这样。

@Subscribe(threadMode = ThreadMode.MAIN)
public void onJump(MsgEvent msg){
    
}

//粘性事件
@Subscribe(threadMode = ThreadMode.MAIN,sticky = true)
public void onJump(MsgEvent msg){
    
}   

注解@Subscribe

ThreadMode 是enum(枚举)类型,threadMode默认值是POSTING。ThreadMode有四种模式:

  1. POSTING :Subscriber会在post event的所在线程回调,故它不需要切换线程来分发事件,因此开销最小。它要求task完成的要快,不能请求MainThread,适用简单的task。

  2. MAIN :Subscriber会在主线程(有时候也被叫做UI线程)回调,如果post event所在线程是MainThread,则可直接回调。注意不能阻塞主线程。

  3. BACKGROUND :Subscriber会在后台线程中回调。如果post event所在线程不是MainThread,那么可直接回调;如果是MainThread,EventBus会用单例background thread来有序地分发事件。注意不能阻塞background thread。

  4. ASYNC:当处理事件的Method是耗时的,需要使用此模式。尽量避免同时触发大量的耗时较长的异步操作,EventBus使用线程池高效的复用已经完成异步操作的线程。

再回到@Subscribe,还有两个参数,sticky(粘性)默认值是false,如果是true,那么可以通过EventBus的postSticky方法分发最近的粘性事件给该订阅者(前提是该事件可获得)。

EventBus 取消注册

public synchronized void unregister(Object subscriber) {
    List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);
    if (subscribedTypes != null) {
        for (Class<?> eventType : subscribedTypes) {
            unsubscribeByEventType(subscriber, eventType);
        }
        typesBySubscriber.remove(subscriber);
    } else {
        Log.w(TAG, "Subscriber to unregister was not registered before: " + subscriber.getClass());
    }
}


private void unsubscribeByEventType(Object subscriber, Class<?> eventType) {
        List<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
        if (subscriptions != null) {
            int size = subscriptions.size();
            for (int i = 0; i < size; i++) {
                Subscription subscription = subscriptions.get(i);
                if (subscription.subscriber == subscriber) {
                    subscription.active = false;
                    subscriptions.remove(i);
                    i--;
                    size--;
                }
            }
        }
    }

unsubscribeByEventType方法中获取相应事件中的相应列表删除掉该订阅者。typesBySubscriber我们在订阅者注册的过程中讲到过这个属性,他根据订阅者找到EventType,然后根据EventType和订阅者来得到订阅事件来对订阅者进行解绑。

对EventBus小结

弄清楚上面的代码之后,我们一起来梳理一下设计者是如何完成观察者模式中的订阅和发送事件以及取消订阅。

  • 我们先看subscriber。看到注册方法register(Object subscriber),传入的就是一个Objec对象。一个subscriber可以订阅多个类型的Event,而且对于同一种类型的Event可以有多个method进行处理(尽管我们一般不会这样做)。

  • 在subscribe方法中,引入Subscription对subscriber、event进行了封装。经过判断之后,把“合格”的subscription加入subscriptionsByEventType中。

  • 当我们分发事件时,也就是post(Object event)时,他会间接调用postSingleEventForEventType这个方法,通过传入的参数event,在subscriptionsByEventType找到event对应的value,再继续相应的操作。

  • 我们取消一个subscriber的订阅是,也就是unregister(Object subscriber).我们会在typesBySubscriber中找到该subsriber对应的evnt。然后再由此event去subscriptionsByEventType找到一系列的subscription,并把他们remove。

  • EventBus订阅了,post发布了事件,就会触发invokeSubscriber()方法,订阅者类subscriber的订阅方法onEventXXX(),并将event作为参数传递进去。即调用subscription.subscriberMethod.method.invoke(subscription.subscriber, event)方法去响应。那么订阅者类就会收到信息

    原文作者:安仔夏天勤奋
    原文地址: https://www.jianshu.com/p/a26b7325e8e8
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞