随笔博文

EventBus 源码解析(很细 很长)

2022-12-20 12:31:22 michael007js 86

前言

相信大家都用过EventBus这个开源库,它基于发布/订阅者模式用于组件间的沟通,解耦合避免出现回调地狱,使用起来十分简单也很好用。这样的开源库是很值得我们学习的,今天就来学习一下他的源码与设计思想。 发布订阅模式


使用方法

使用方法很简单,按照官方文档介绍,分为三个步骤。

步骤1:定义事件

public static class MessageEvent { }

步骤2:准备订阅者

定义订阅方法,来处理收到的事件。

@Subscribe(threadMode = ThreadMode.MAIN)  
public void onMessageEvent(MessageEvent event) {/* Do something */};

并在Activity、Fragment中按照其生命周期进行注册与注销。

 @Override
public void onStart() {
    super.onStart();
    EventBus.getDefault().register(this);
}

@Override
public void onStop() {
    super.onStop();
    EventBus.getDefault().unregister(this);
}

步骤3:发送事件

将定义好的事件,发送给订阅者。

 EventBus.getDefault().post(new MessageEvent());


源码解析

使用方法很简洁,下面就根据使用步骤来解析一下源码。

准备订阅者

准备订阅者这一步骤,分为注册 注销以及准备订阅方法两步。

准备订阅方法

通过使用方法也可以看出,订阅方法是通过注解@Subscribe的方式来实现的。

@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
public @interface Subscribe {
   /**
    * 线程模式,默认为 POSTING
    */
   ThreadMode threadMode() default ThreadMode.POSTING;

   /**
    * 是否是粘性事件,默认为 false
    */
   boolean sticky() default false;

   /**
    * 事件订阅的优先级,默认为0。
    * 当订阅者们处于同一线程模式中,优先级才会起作用,优先级高的订阅者将先收到推送过来的事件。
    */
   int priority() default 0;
}

源码中用到了三个元注解,分别是:

  • @Documented:表示使用该注解的元素应被javadoc或类似工具文档化。

  • @Retention(RetentionPolicy.RUNTIME):表示注解会被保留在class文件中,运行期间也会被识别,所以可以使用反射机制获取注解信息。

  • @Target({ElementType.METHOD}):表示使用范围,该注解作用于描述方法。

每个订阅者都会有一个线程模式,线程模式决定其订阅方法运行在哪个线程中,这几种线程模式,分别为:

  • POSTING:默认的线程模式,在哪个线程发送事件就在对应的线程处理事件。

  • MAIN:如果是在主线程发送事件,直接在主线程处理事件。反之,如果在子线程中发送事件,则需要切换到主线程来处理事件。(在Android中使用比较多)

  • MAIN_ORDERED:不管在哪个线程发送事件,都会将事件入队列,在主线程上有序执行。

  • BACKGROUND:如果是在子线程中发送事件,则直接在该子线程中处理事件。反之,如果是在主线程中发送事件,则需要将该事件入消息队列,切换到子线程,用线程池来有序处理该事件。(如果不是Android中使用,总是使用该模式)

  • ASYNC:无论是在哪个线程发送事件,都会将该事件入消息队列,通过线程池在子线程上处理事件。如果订阅者方法的执行可能需要一些时间(如网络访问),则应使用此模式。

注册

如上述使用方法中所介绍,只需要一行即可完成订阅者的注册。

EventBus.getDefault().register(this);

EventBus.getDefault()方法其实就是通过单例模式返回EventBus的实例,我们直接来看看register方法。

public void register(Object subscriber) {
   //获取订阅者类
   Class<?> subscriberClass = subscriber.getClass();
   //根据订阅者类来获取到订阅方法
   List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
   synchronized (this) {
       for (SubscriberMethod subscriberMethod : subscriberMethods) {
           //遍历订阅方法,调用订阅方法
           subscribe(subscriber, subscriberMethod);
     }
 }
}

方法中的参数subscriber就是我们调用方法是传入的this,所以也就是表示ActivityFragment。简单概括一下就是:我们通过获取订阅者的类对象,然后找到其订阅方法,调用subscribe订阅方法进行订阅。 所以重点就要看看他是怎么找到订阅方法以及怎么订阅方法里面做了什么?往下走:

找到订阅方法

/**
* 找到订阅方法
*
* @param subscriberClass 订阅者类
* @return 订阅方法列表
*/
List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
   //先从缓存里找订阅方法
   //METHOD_CACHE -> Map<Class<?>, List<SubscriberMethod>>: key为订阅者类,value为订阅方法列表
   List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
   if (subscriberMethods != null) {
       //如果缓存里有,直接返回使用
       return subscriberMethods;
 }
//是否使用 subscriber index,ignoreGeneratedIndex默认为false
   if (ignoreGeneratedIndex) {
       subscriberMethods = findUsingReflection(subscriberClass);
 } else {
       subscriberMethods = findUsingInfo(subscriberClass);
 }
   if (subscriberMethods.isEmpty()) {
       //如果没有找到任何订阅方法,抛出异常,提醒用户使用 @Subscribe 方法来声明订阅方法
       //也就是说,如果用户register注册了,但是没有任何@Subscribe订阅方法,会抛出异常来提示用户
       throw new EventBusException("Subscriber " + subscriberClass
               + " and its super classes have no public methods with the @Subscribe annotation");
 } else {
       //如果订阅方法不为空,放入缓存中,以方便下次复用,key为订阅类的类名
       METHOD_CACHE.put(subscriberClass, subscriberMethods);
       return subscriberMethods;
 }
}

简单概括一下就是:先根据订阅者类去METHOD_CACHE中查找,找到则直接返回订阅者方法列表,找不到则根据是否使用subscriber index 来决定是否使用 findUsingInfo还是findUsingReflection方法。找到订阅方法列表,加入到METHOD_CACHE中方便下次使用,反之,找不到订阅方法,抛出异常。

接下来看看是怎么找到并返回订阅者列表的,先看看findUsingReflection方法,即直接使用反射,不使用 subscriber index

private List<SubscriberMethod> findUsingReflection(Class<?> subscriberClass) {
    FindState findState = prepareFindState();
    findState.initForSubscriber(subscriberClass);
    while (findState.clazz != null) {
        findUsingReflectionInSingleClass(findState);
        //查找父类,具体执行要看 skipSuperClasses 标志位
        findState.moveToSuperclass();
    }
    //返回订阅方法列表
    return getMethodsAndRelease(findState);
}

/**
 * 通过类的反射提取订阅信息
 *
 * @param findState
 */
private void findUsingReflectionInSingleClass(FindState findState) {
    Method[] methods;
    try {
        // This is faster than getMethods, especially when subscribers are fat classes like Activities
        // getMethods(): 返回由类或接口声明的以及从超类和超接口继承的所有公共方法。
        // getDeclaredMethods(): 返回类声明的方法,包括 public, protected, default (package),但不包括继承的方法
        // 所以,相对比于 getMethods 方法,该方法速度更加快,尤其是在复杂的类中,如 Activity。
        methods = findState.clazz.getDeclaredMethods();
    } catch (Throwable th) {
        // Workaround for java.lang.NoClassDefFoundError, see https://github.com/greenrobot/EventBus/issues/149
        try {
            methods = findState.clazz.getMethods();
        } catch (LinkageError error) { // super class of NoClassDefFoundError to be a bit more broad...
            String msg = "Could not inspect methods of " + findState.clazz.getName();
            if (ignoreGeneratedIndex) {
                //请考虑使用 EventBus annotation processor 来避免反射
                msg += ". Please consider using EventBus annotation processor to avoid reflection.";
            } else {
                msg += ". Please make this class visible to EventBus annotation processor to avoid reflection.";
            }
            //找不到该类中方法,抛出异常
            throw new EventBusException(msg, error);
        }
        // 因为getMethods()方法已经获取了超类的方法,所以这里设置不再去检查超类
        findState.skipSuperClasses = true;
    }
    //遍历找到的方法
    for (Method method : methods) {
        //获取方法修饰符: public->1;private->2;protected->4;static->8;final->16
        int modifiers = method.getModifiers();
        //如果是public,且不是 abstract | static 类的
        if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
            //获取方法参数类型
            Class<?>[] parameterTypes = method.getParameterTypes();
            if (parameterTypes.length == 1) {
                //获取方法的注解 Subscribe
                Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
                if (subscribeAnnotation != null) {
                    //第一个参数就是事件类型
                    Class<?> eventType = parameterTypes[0];
                    //检查是否已经添加了订阅该类型事件的订阅方法,true->没有添加;false->已添加
                    if (findState.checkAdd(method, eventType)) {
                        //没有添加过,根据找到的参数来新建一个订阅方法对象,加入 subscriberMethods 列表中
                        ThreadMode threadMode = subscribeAnnotation.threadMode();
                        findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
                                subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
                    }
                }
            } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
                String methodName = method.getDeclaringClass().getName() + "." + method.getName();
                throw new EventBusException("@Subscribe method " + methodName +
                        "must have exactly 1 parameter but has " + parameterTypes.length);
            }
        } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
            String methodName = method.getDeclaringClass().getName() + "." + method.getName();
            throw new EventBusException(methodName +
                    " is a illegal @Subscribe method: must be public, non-static, and non-abstract");
        }
    }
}

关于 checkAdd方法,进一步深入:

final Map<Class, Object> anyMethodByEventType = new HashMap<>();
final Map<String, Class> subscriberClassByMethodKey = new HashMap<>();

boolean checkAdd(Method method, Class<?> eventType) {
    // 2 level check: 1st level with event type only (fast), 2nd level with complete signature when required.
    // Usually a subscriber doesn't have methods listening to the same event type.
    // 通常情况下,一个订阅者不会有多个订阅方法来订阅同一个类型事件(出现这种情况,不就是用户瞎JB写么)

    // 扩充:HashMap put() 方法返回值说明:
    // 如果已经存在一个相同的key, 则返回的是前一个key对应的value,同时该key的新value覆盖旧value;
    // 如果是新的一个key,则返回的是null;
    Object existing = anyMethodByEventType.put(eventType, method);
    if (existing == null) {
        //表示还没有存在订阅该类型事件的订阅方法
        return true;
    } else {
        //已经存在订阅该类型事件的订阅方法了
        //existing就是先存入anyMethodByEventType的订阅统一类型事件的订阅方法
        if (existing instanceof Method) {
            if (!checkAddWithMethodSignature((Method) existing, eventType)) {
                // Paranoia check
                throw new IllegalStateException();
            }
            // Put any non-Method object to "consume" the existing Method
            anyMethodByEventType.put(eventType, this);
        }
        return checkAddWithMethodSignature(method, eventType);
    }
}

private boolean checkAddWithMethodSignature(Method method, Class<?> eventType) {
    methodKeyBuilder.setLength(0);
    methodKeyBuilder.append(method.getName());
    methodKeyBuilder.append('>').append(eventType.getName());

    // "方法名>事件类型"
    // 意图:这样如果存在在同一个类中,有多个订阅方法订阅了同一个事件,这样当这个事件分发的时候,所有的订阅方法都会收到该事件。
    String methodKey = methodKeyBuilder.toString();
    Class<?> methodClass = method.getDeclaringClass();
    Class<?> methodClassOld = subscriberClassByMethodKey.put(methodKey, methodClass);
    /* 扩充:isAssignableFrom() 方法说明:
     * 当前的Class对象所表示的类,是不是参数中传递的Class对象所表示的类的父类,超接口,或者是相同的类型。
     * 是则返回true,否则返回false。
     */
    //methodClassOld 为空,表示没有添加过,或者是methodClassOld是methodClass的父类
    if (methodClassOld == null || methodClassOld.isAssignableFrom(methodClass)) {
        // Only add if not already found in a sub class
        // 只有在子类中没有找到的情况下才添加
        return true;
    } else {
        // Revert the put, old class is further down the class hierarchy
        subscriberClassByMethodKey.put(methodKey, methodClassOld);
        return false;
    }
}
使用 Subscriber Index

刚刚看了直接调用findUsingReflection方法利用反射来找到订阅方法,接着我们看看如何利用 Subscribe index 来找到订阅方法。

Note: we highly recommend the EventBus annotation processor with its subscriber index. This will avoid some reflection related problems seen in the wild.

正如官网所介绍,EventBus 推荐你使用注解处理器,避免在运行时使用反射来查找订阅方法,而是在编译的时候查找。

使用annotationProcessor来生成index

apply plugin: 'kotlin-kapt' // ensure kapt plugin is applied
 
dependencies {
    def eventbus_version = '3.2.0'
    implementation "org.greenrobot:eventbus:$eventbus_version"
    kapt "org.greenrobot:eventbus-annotation-processor:$eventbus_version"
}
 
kapt {
    arguments {
        arg('eventBusIndex', 'com.example.myapp.MyEventBusIndex')
    }
}

将项目rebuild生成MyEventBusIndex,例如:

/** This class is generated by EventBus, do not edit. */
public class MyEventBusIndex implements SubscriberInfoIndex {
    private static final Map<Class<?>, SubscriberInfo> SUBSCRIBER_INDEX;

    static {
        //key -> 订阅者类对象;value -> 订阅信息
        SUBSCRIBER_INDEX = new HashMap<Class<?>, SubscriberInfo>();

        putIndex(new SimpleSubscriberInfo(MainActivity.class, true, new SubscriberMethodInfo[] {
            new SubscriberMethodInfo("firstOnTestEvent", TestEvent.class, ThreadMode.MAIN),
            new SubscriberMethodInfo("messageEvent", MessageEvent.class, ThreadMode.BACKGROUND, 6, true),
        }));

        putIndex(new SimpleSubscriberInfo(SecondActivity.class, true, new SubscriberMethodInfo[] {
            new SubscriberMethodInfo("onTestEvent", TestEvent.class),
        }));

    }

    private static void putIndex(SubscriberInfo info) {
        SUBSCRIBER_INDEX.put(info.getSubscriberClass(), info);
    }

    @Override
    public SubscriberInfo getSubscriberInfo(Class<?> subscriberClass) {
        SubscriberInfo info = SUBSCRIBER_INDEX.get(subscriberClass);
        if (info != null) {
            return info;
        } else {
            return null;
        }
    }
}

然后将MyEventBusIndex实例传递给EventBus

EventBus eventBus = EventBus.builder().addIndex(new MyEventBusIndex()).build();

接着我们回过头看利用 Subscriber Index查找订阅方法findUsingInfo()

private List<SubscriberMethod> findUsingInfo(Class<?> subscriberClass) {
    //从FindState池中获取一个FindState对象并进行初始化
    FindState findState = prepareFindState();
    findState.initForSubscriber(subscriberClass);
    //这里使用了一个while循环,表示子类查找完了,会去父类继续查找
    while (findState.clazz != null) {
        //去 index 文件中查找订阅信息
        findState.subscriberInfo = getSubscriberInfo(findState);
        if (findState.subscriberInfo != null) {
            SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods();
            //遍历订阅方法
            for (SubscriberMethod subscriberMethod : array) {
                //检查是否已经添加了该订阅方法
                if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) {
                    //未添加,将找到的订阅方法添加到订阅方法列表中
                    findState.subscriberMethods.add(subscriberMethod);
                }
            }
        } else {
            //如果EventBusIndex返回的订阅方法为空,则使用反射方法来查找订阅方法
            findUsingReflectionInSingleClass(findState);
        }
        //查找父类
        findState.moveToSuperclass();
    }
    //返回订阅方法列表
    return getMethodsAndRelease(findState);
}

private SubscriberInfo getSubscriberInfo(FindState findState) {
    //subscriberInfo 不为空,表示已经找到了订阅信息,则这次需要往父类查找
    if (findState.subscriberInfo != null && findState.subscriberInfo.getSuperSubscriberInfo() != null) {
        SubscriberInfo superclassInfo = findState.subscriberInfo.getSuperSubscriberInfo();
        //确定此次查找的正是父类
        if (findState.clazz == superclassInfo.getSubscriberClass()) {
            return superclassInfo;
        }
    }
    //subscriberInfoIndexes 就是 EventBus.addIndex(MyEventBusIndex()) 加进来的
    if (subscriberInfoIndexes != null) {
        for (SubscriberInfoIndex index : subscriberInfoIndexes) {
            //就是执行 MyEventBusIndex 类中的 getSubscriberInfo 方法,来获取订阅信息
            SubscriberInfo info = index.getSubscriberInfo(findState.clazz);
            if (info != null) {
                return info;
            }
        }
    }
    return null;
}

原理也很简单,就是在编译期间生成了index文件,这样我们就不需要在运行时通过反射来查找了,直接通过index文件来查找。另外,通过生成的index文件,我们也可以很清晰的看到我们声明的订阅方法分布情况。

使用 Subscriber Index注意事项:

  • @Subscribe 方法及其类必须是公共的。

  • 事件类必须是公共的。

  • @Subscribe 不能在匿名类中使用。

订阅

上面我们看了是如何找到订阅方法的,接着就是进一步看看订阅方法里面到底是如何实现订阅动作的。

/**
 * 事件类型与订阅对象列表的一个map集合
 *
 * key -> eventType 事件类型
 * value -> Subscription 订阅对象列表,这里 Subscription 是订阅者与订阅方法的一个封装类
 */
private final Map<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType;
/**
 * 记录订阅者与其订阅的所有事件类型列表的一个map集合
 *
 * key -> 订阅者
 * value -> 订阅者订阅的所有事件类型列表
 */
private final Map<Object, List<Class<?>>> typesBySubscriber;
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
    //通过订阅方法获得事件类型参数
    Class<?> eventType = subscriberMethod.eventType;
    //通过订阅者与订阅方法来构造出一个 订阅对象
    Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
    //通过事件类型,找到 订阅对象的集合,这边是以 CopyOnWriteArrayList 的形式
    CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
    if (subscriptions == null) {
        //如果订阅对象集合为空,则表明还没有注册过订阅了该类型事件的订阅方法。
        //新建一个list,然后将 该事件类型与这个新建的list,放入 subscriptionsByEventType Map 中
        subscriptions = new CopyOnWriteArrayList<>();
        subscriptionsByEventType.put(eventType, subscriptions);
    } else {
        //如果这个订阅对象集合中已经包含该newSubscription
        if (subscriptions.contains(newSubscription)) {
            //抛出异常来提示用户,该订阅者已经订阅了这个类型的事件
            throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
                    + eventType);
        }
    }

    //遍历 订阅对象列表
    int size = subscriptions.size();
    for (int i = 0; i <= size; i++) {
        //如果订阅方法中有声明优先级,则根据优先级,将该订阅方法加入到指定位置
        //否则,将该订阅方法加入到订阅对象列表的末尾
        if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
            subscriptions.add(i, newSubscription);
            break;
        }
    }

    //通过订阅者来找到 其订阅的所有事件的类型列表 subscribedEvents
    List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
    //如果该订阅者还没有任何订阅方法,即 subscribedEvents 为空
    if (subscribedEvents == null) {
        //新建一个列表,用来放这个订阅者订阅的所有的事件的类型
        subscribedEvents = new ArrayList<>();
        //并将该订阅者与这个新建的列表,放入到 typesBySubscriber map 中
        typesBySubscriber.put(subscriber, subscribedEvents);
    }
    //将该事件类型加入到 事件类型列表中
    subscribedEvents.add(eventType);

    //如果订阅方法支持粘性事件
    if (subscriberMethod.sticky) {
 //是否考虑事件类的层次结构,默认为true
        if (eventInheritance) {
            // Existing sticky events of all subclasses of eventType have to be considered.
            // Note: Iterating over all events may be inefficient with lots of sticky events,
            // thus data structure should be changed to allow a more efficient lookup
            // (e.g. an additional map storing sub classes of super classes: Class -> List<Class>).
            Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
            for (Map.Entry<Class<?>, Object> entry : entries) {
                Class<?> candidateEventType = entry.getKey();
                if (eventType.isAssignableFrom(candidateEventType)) {
                    Object stickyEvent = entry.getValue();
                    //检查发送粘性事件
                    checkPostStickyEventToSubscription(newSubscription, stickyEvent);
                }
            }
        } else {
         //根据事件类型获取粘性事件
            Object stickyEvent = stickyEvents.get(eventType);
            //检查发送粘性事件
            checkPostStickyEventToSubscription(newSubscription, stickyEvent);
        }
    }
}


注销

看完了注册,我们接着看看注销,注销的方法也很简单,一句代码就可以完成。

EventBus.getDefault().unregister(this);

我们深入看看 unregister方法。

public synchronized void unregister(Object subscriber) {
    //通过订阅者找到其订阅的所有事件类型列表
    List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);
    if (subscribedTypes != null) {
        //遍历事件类型列表
        for (Class<?> eventType : subscribedTypes) {
            //通过事件类型,注销订阅者
            unsubscribeByEventType(subscriber, eventType);
        }
        //将该订阅者从typesBySubscriber map 中移除
        typesBySubscriber.remove(subscriber);
    } else {
        //log提示:在没有注册的前提下执行了注销动作
        logger.log(Level.WARNING, "Subscriber to unregister was not registered before: " + subscriber.getClass());
    }
}

private void unsubscribeByEventType(Object subscriber, Class<?> eventType) {
    //通过事件类型来找到相关的订阅对象列表
    List<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
    if (subscriptions != null) {
        int size = subscriptions.size();
        //遍历订阅对象列表
        for (int i = 0; i < size; i++) {
            Subscription subscription = subscriptions.get(i);
            if (subscription.subscriber == subscriber) {
                //从订阅对象列表中找到该订阅者,将其 active 状态改为 false,并从订阅对象列表中移除
                subscription.active = false;
                subscriptions.remove(i);
                i--;
                size--;
            }
        }
    }
}

发送事件

接着就是发送事件了,发送事件也很简单,同样一句话就可以搞定。

 EventBus.getDefault().post(new MessageEvent());

我们进一步看看源码。

public void post(Object event) {
    //PostingThreadState 是事件与发送状态的封装类
    PostingThreadState postingState = currentPostingThreadState.get();
    List<Object> eventQueue = postingState.eventQueue;
    //将该事件添加到事件队列中
    eventQueue.add(event);

    if (!postingState.isPosting) {
        //检查是否在主线程中
        postingState.isMainThread = isMainThread();
        //设置为正在发送
        postingState.isPosting = true;
        //检查是否取消发送,如果取消发送,将抛出异常
        if (postingState.canceled) {
            throw new EventBusException("Internal error. Abort state was not reset");
        }
        try {
            while (!eventQueue.isEmpty()) {
                //遍历事件队列,将事件逐一发送
                postSingleEvent(eventQueue.remove(0), postingState);
            }
        } finally {
            //重置发送状态
            postingState.isPosting = false;
            postingState.isMainThread = false;
        }
    }
}

private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
    //获取事件的类对象
    Class<?> eventClass = event.getClass();
    boolean subscriptionFound = false;
    //是否考虑事件类的层次结构,默认为true
    if (eventInheritance) {
        //查找超类的所有事件类型
        List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
        int countTypes = eventTypes.size();
        for (int h = 0; h < countTypes; h++) {
            Class<?> clazz = eventTypes.get(h);
            //将事件根据事件类型发送出去
            subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
        }
    } else {
        //将事件根据事件类型发送出去
        subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
    }
    //没有发现订阅该类型事件的订阅对象,也就是没有存在订阅该类型事件的订阅方法
    if (!subscriptionFound) {
        if (logNoSubscriberMessages) {
            logger.log(Level.FINE, "No subscribers registered for event " + eventClass);
        }
        if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
                eventClass != SubscriberExceptionEvent.class) {
            post(new NoSubscriberEvent(this, event));
        }
    }
}

/**
 * 根据事件类型,将事件发送给订阅了该类型事件的订阅方法
 * @param event
 * @param postingState
 * @param eventClass
 * @return true->找到订阅了该类型事件的订阅方法;false->没有订阅该类型事件的订阅方法
 */
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
    //订阅对象列表
    CopyOnWriteArrayList<Subscription> subscriptions;
    synchronized (this) {
        //根据事件类型,查询订阅该类型事件的订阅者列表
        subscriptions = subscriptionsByEventType.get(eventClass);
    }
    //如果订阅对象列表不为空,则将事件逐一发送给这些订阅者
    if (subscriptions != null && !subscriptions.isEmpty()) {
        for (Subscription subscription : subscriptions) {
            postingState.event = event;
            postingState.subscription = subscription;
            boolean aborted;
            try {
                //将事件发送给订阅对象
                postToSubscription(subscription, event, postingState.isMainThread);
                aborted = postingState.canceled;
            } finally {
                //重置 PostingState
                postingState.event = null;
                postingState.subscription = null;
                postingState.canceled = false;
            }
            if (aborted) {
                break;
            }
        }
        return true;
    }
    return false;
}

private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
    //根据订阅者选择的线程模式来选择使用那种线程方式来分发处理该事件
    switch (subscription.subscriberMethod.threadMode) {
        case POSTING:
            //直接利用反射调用订阅方法
            invokeSubscriber(subscription, event);
            break;
        case MAIN:
            if (isMainThread) {
                //如果当前处于主线程,直接反射调用订阅方法
                invokeSubscriber(subscription, event);
            } else {
                //利用Handler切换到主线程,最终还是执行invokeSubscriber
                mainThreadPoster.enqueue(subscription, event);
            }
            break;
        case MAIN_ORDERED:
            if (mainThreadPoster != null) {
                //将事件入队列,在主线程上有序执行
                mainThreadPoster.enqueue(subscription, event);
            } else {
                // temporary: technically not correct as poster not decoupled from subscriber
                invokeSubscriber(subscription, event);
            }
            break;
        case BACKGROUND:
            if (isMainThread) {
                //如果当前处于主线程中,将利用线程池,切换到子线程中处理,最终还是会调用invokeSubscriber
                backgroundPoster.enqueue(subscription, event);
            } else {
                //如果当前处于子线程,则直接在该子线程中处理事件
                invokeSubscriber(subscription, event);
            }
            break;
        case ASYNC:
            //无论处于什么线程,最终都是利用线程池,切换到子线程中处理,最终还是会调用invokeSubscriber
            asyncPoster.enqueue(subscription, event);
            break;
        default:
            throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
    }
}


void invokeSubscriber(Subscription subscription, Object event) {
    try {
        //利用反射调用订阅方法
        subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
    } catch (InvocationTargetException e) {
        handleSubscriberException(subscription, event, e.getCause());
    } catch (IllegalAccessException e) {
        throw new IllegalStateException("Unexpected exception", e);
    }
}

简单的用一句话概括一下:根据事件的类型,找到对应的订阅对象列表并遍历列表,然后根据订阅对象的线程模式来决定是在哪个线程处理该事件。

发送粘性事件

如果你在发送普通事件前没有注册过订阅者,那么这时你发送的事件是不会被接收执行的,这个事件也就被回收了。 而粘性事件就不一样了,你可以在发送粘性事件后,再去注册订阅者,一旦完成订阅,这个订阅者就会接收到这个粘性事件。 让我们从源码中看看,是如何实现的吧!

EventBus.getDefault().postSticky(new MessageEvent("Hello everyone!"));

与发送普通事件不同,粘性事件使用postSticky()方法来发送。

/**
 * 用来存放粘性事件
 *
 * key -> 粘性事件的类对象
 * value -> 粘性事件
 */
private final Map<Class<?>, Object> stickyEvents;
public void postSticky(Object event) {
    //同步锁,将粘性事件存入 stickyEvents
    synchronized (stickyEvents) {
        stickyEvents.put(event.getClass(), event);
    }
    //事件加入后,与普通事件一样,调用 post() 方法发送事件
    post(event);
}

用了一个stickyEvents集合来保存粘性事件,存入后,与普通事件一样同样调用post()方法。 ?? 嗯 ??,这时我就有疑问了,针对上面的使用场景,我先发送粘性事件,然后再去注册订阅,这时执行post方法去发送事件,根本就没有对应的订阅者啊,肯定是发送失败的。所以,细想一下,想达到这样效果,订阅者注册订阅后应该再将这个存入下来的事件发送一下。 回到register -> subscribe方法:

private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
    //通过订阅方法获得事件类型参数
    Class<?> eventType = subscriberMethod.eventType;
    //通过订阅者与订阅方法来构造出一个 订阅对象
    Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
    ...
    .省略部分代码.
 ...
    //如果订阅方法支持粘性事件
    if (subscriberMethod.sticky) {
        //是否考虑事件类的层次结构,默认为true
        if (eventInheritance) {
            Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
            for (Map.Entry<Class<?>, Object> entry : entries) {
                Class<?> candidateEventType = entry.getKey();
                //eventType 是否是 candidateEventType 的父类
                if (eventType.isAssignableFrom(candidateEventType)) {
                    Object stickyEvent = entry.getValue();
                    //检查发送粘性事件
                    checkPostStickyEventToSubscription(newSubscription, stickyEvent);
                }
            }
        } else {
            //根据事件类型获取粘性事件
            Object stickyEvent = stickyEvents.get(eventType);
            //检查发送粘性事件
            checkPostStickyEventToSubscription(newSubscription, stickyEvent);
        }
    }
}

private void checkPostStickyEventToSubscription(Subscription newSubscription, Object stickyEvent) {
    //如果粘性事件不为空,发送事件
    if (stickyEvent != null) {  
        postToSubscription(newSubscription, stickyEvent, isMainThread());
    }
}

果真,订阅者在注册订阅方法中,如果当前订阅方法支持粘性事件,则会去stickyEvents集合中查件是否有对应的粘性事件,如果找到粘性事件,则发送该事件。


思考

设计模式

  • 单例模式:为了避免频繁创建销毁EventBus实例所带来的开销,这里采用DCL的形似来创建单例。

  • 建造者模式:基本上开源库都有很多参数可供用户配置,所以用建造者模式来创建EventBus实例就很合理。

EventBus 2 到 EventBus 3升级之路

EventBus 2 的订阅方法必须是以 onEvent 开头,因为需要通过名字来指定线程模式,如:onEventonEventMainThreadonEventBackgroundonEventAsync

这一点从EventBus V2.4.0源码中也可以很直观的看出:

EventBus V2.4.0源码:

List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
    ...省略部分代码...
 //返回类声明的方法
    Method[] methods = clazz.getDeclaredMethods();
    for (Method method : methods) {
        String methodName = method.getName();
        //找到以 onEvent 开头的方法
        if (methodName.startsWith(ON_EVENT_METHOD_NAME)) {
            int modifiers = method.getModifiers();
            if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
                Class<?>[] parameterTypes = method.getParameterTypes();
                if (parameterTypes.length == 1) {
                    //根据声明的名字来确定线程模式
                    //如:onEventMainThread 就指定为 ThreadMode.MainThread
                    String modifierString = methodName.substring(ON_EVENT_METHOD_NAME.length());
                    ThreadMode threadMode;
                    if (modifierString.length() == 0) {
                        threadMode = ThreadMode.PostThread;
                    } else if (modifierString.equals("MainThread")) {
                        threadMode = ThreadMode.MainThread;
                    } else if (modifierString.equals("BackgroundThread")) {
                        threadMode = ThreadMode.BackgroundThread;
                    } else if (modifierString.equals("Async")) {
                        threadMode = ThreadMode.Async;
                    }
    ...省略部分代码...
}

每个订阅方法都必须是以这样的形式来命名,显示对用于来说很不灵活,所以EventBus 3 则换成了通过注解的形式来指定订阅参数@Subscribe(threadMode = ThreadMode.*MAIN*, sticky = true, priority = 9)

既然换成了注解,但本质上他们都是通过反射来拿到方法名以及方法参数,而Java反射是很低效的。所以,为了提高效率,EventBus 3 还推出了 subscriber index,采用 kpt 技术在编译期间生成 Index文件,然后从Index文件中获取订阅方法信息,这样跟在运行期间采用反射来获取订阅方法相比,大大提升了运行效率。

为了进一步提升运行效率,库中还利用了缓存与对象池来循环利用对象实例,减少反复创建销毁对象带来的开销。 比如:循环利用 FindState 对象 来查找订阅方法。

/**
* 利用数组,循环利用 FindState 对象,来减少反复创建销毁对象带来的开销
*/
private static final FindState[] FIND_STATE_POOL = new FindState[POOL_SIZE];

/**
* 从池中取出 FindState 对象
*/
private FindState prepareFindState() {
   //使用同步锁
   synchronized (FIND_STATE_POOL) {
       for (int i = 0; i < POOL_SIZE; i++) {
           FindState state = FIND_STATE_POOL[i];
           if (state != null) {
               //如果当前位置的对象不为空,return出去,并置空
               FIND_STATE_POOL[i] = null;
               return state;
         }
     }
 }
   //如果池是空的,则新建一个 FindState 对象
   return new FindState();
}


看完了源码,真的太棒了,作者能想到利用反射来查找订阅方法,真的太妙了。同时,随着库的升级,从一开始规定死订阅方法名,到使用注解来指定订阅参数,变得更加灵活好用,再进一步考虑效率,新增缓存以及对象池,推出 subscriber index,一直在改进,真的太棒了。


至此,关于 EventBus 的源码就分析结束了,如果你想查看所有注释,可以点击Github EventBus 所有注释 进行查看。


首页
关于博主
我的博客
搜索