消息总线重构之EventBus



本人博客文章如未特别注明皆为原创!如有转载请注明出处:http://blog.csdn.net/yanghua_kobe/article/details/46699205

最近花了不少时间对消息总线进行了重构。重构的重点是在消息总线中加入了Guava的EventBus,并应用于以下两个场景:

(1)改进广播通知

(2)业务逻辑串联,用事件驱动替代责任链模式



# EventBus简介

EventBus是Google的开源项目Guava里的一个组件,有兴趣的人可以看我前不久的一篇博文解读。总得来说,EventBus是观察者模型的实现,利用它你既可以实现观察者模型的业务场景,还可以基于它的事件驱动机制来实现应用程序内组件之间的解耦与通信。

# 改进广播通知

广播通知是消息总线提供的功能之一。在重构之前,客户端接收广播通知是通过消息总线客户端SDK的一个API来实现的:

public void setNotificationListener(IMessageReceiveListener notificationListener);

但之前的广播通知设计并不合理。它受限于之前的基于RabbitMQ的树形路由拓扑模型:



这个拓扑结构中有些只发送不接受的“虚拟队列”并不是真实存在的队列。这些消息生产者无法接收消息,这是非常大的一个缺陷。我一直在想办法重新设计它,之前的关注点都集中在RabbitMQ上,想在MQ上找到一种解决方案,但这很难,除非摈弃“虚拟队列”的设计。于是,我将关注点转移到消息总线中另一个可以提供pub/sub的组件上(后称之为pubsuber),该组件目前可以是redis也可以是zookeeper。因为每个client(更准确得说是每个创建client的pool)都会以长连接的方式挂在pubsuber上。所以,它本身就是一个很不错的广播渠道,并且因为它脱离RabbitMQ单独实现,跟虚拟队列的设计不相冲突。

上面的思路没有问题,但语义与实现上并不对等。通知的收发从语义上来说应该是Client API级别的。而PubSuber接收到的广播事件却是Pool级别的,并不依赖client(Pool创建PubSuber以及Client)。我们不应该在Pool层面上接收广播事件。因此这里存在一个事件的截获与二次转发的过程。这是我们针对EventBus的第一个应用场景:用它转发PubSuber接收到的广播通知给client。

PubSuber接收到广播消息之后通过EventBus 作二次转发:

    public class NotifyHandler implements IPubSubListener {

@Override
public void onChange(String channel, byte[] data, Map<String, Object> params) {
NotifyEvent notifyEvent = new NotifyEvent();
Message broadcastMsg = pubsuberManager.deserialize(data, Message.class);
if (broadcastMsg != null && broadcastMsg.getMessageType().equals(MessageType.BroadcastMessage)) {
notifyEvent.setMsg(broadcastMsg);
getComponentEventBus().post(notifyEvent);
}
}
}

事件发布完了之后,EventBus会将其分发到该事件的订阅者处理,这里需要注意的是创建的EventBus是一个异步EventBus的实例,它在一个独立的线程上执行事件处理器方法。而所有的事件处理器都需要通过Client进行注册:
    public void registerEventProcessor(Object eventProcessor) {
componentEventBus.register(eventProcessor);
}

以上这一步,就将消息通知跟Client关联起来。而且对多个client注册不同的事件处理器,还可以起到多播的作用(原来在Pool级别是一个事件,现在在Client级别,多个Client可以应对若干个处理器)。

EventBus通过注解来解析事件处理器与事件之间的关联关系,更多的实现细节,请参考之前的文章。下面就是订阅广播通知的方式:

    public static class NotificationEventProcessor {

@Subscribe
public void onNotification(NotifyEvent event) {
logger.info("onNotification");
Message message = event.getMsg();
assertNotNull(message);
assertEquals("test", new String(message.getContent(), Constants.CHARSET_OF_UTF8));
}

}

仅仅需要一个注解即可。当然最后别忘记移除注册,如果你不再希望接收通知的话,整个过程如下:
    public void testBroadcast() throws Exception {
String secret = "kljasdoifqoikjhhhqwhebasdfasdf";

Message msg = MessageFactory.createMessage(MessageType.BroadcastMessage);
msg.setContentType("text/plain");
msg.setContentEncoding("utf-8");

msg.setContent("test".getBytes(Constants.CHARSET_OF_UTF8));

NotificationEventProcessor eventProcessor = new NotificationEventProcessor();
client.registerEventProcessor(eventProcessor);

client.broadcast(secret, new Message[]{msg});

TimeUnit.SECONDS.sleep(10);

client.unregisterEventProcessor(eventProcessor);
}


这样,原先的拓扑结构就不再包含广播通知的实现了:



# 事件驱动替代责任链模式

客户端跟消息总线的一次通信,需要经历多个业务逻辑环节。这些业务逻辑有些有顺序关系,有些没有。我们希望将逻辑进行拆分、自由组合搭配并且能够互不干扰得扩展。在此之前的实现基于责任链模式,有一点问题:当长连接消费时,因为真正的消费通常是chain的最后一个调用(方式是:阻塞,一直等到超过设定的时间),所以整个递归链都阻在最后一个调用。而递归调用的实现是基于栈,因此如果最后一个调用不返回(很多时候这种长连接的生命周期跟应用的生命周期相同),整个调用链以及调用中的局部变量一直都不被释放,某种程度上这有点像内存泄露了。这个问题,我曾在之前的文章中探讨过,但一直没找到太好的解决方法,除非我们放弃使用责任链模式。

但基于同步事件驱动的方式似乎能起到跟责任链模式一样的效果。它通过事件分发来驱动业务逻辑调用。将chain的每一个调用都看做是一个事件处理方法,一个单向通信逻辑(比如produce)对应一个事件处理器(produceEventProcessor)。因为此处的EventBus是同步的(事件处理逻辑在调用线程上执行,执行顺序跟事件发生的顺序相同),所以只要编排好事件顺序,一一触发事件,事件处理器也就会一一按照事件触发的顺序执行。

我们以消息生产者来看一下通过EventBus改造后的业务逻辑是什么样子。

首先我们定义一个生产消息的事件处理器:

public class ProduceEventProcessor extends CommonEventProcessor {

}

为了使得逻辑关系紧凑,我们将事件以内部类的方式定义在生产消息的事件处理器内部:
//region events definition
public static class ValidateEvent extends CarryEvent {
}

public static class PermissionCheckEvent extends CarryEvent {
}

public static class ProduceEvent extends CarryEvent {
}
//endregion

定义每个事件的事件处理方法:
@Subscribe
public void onValidate(ValidateEvent event) {
}

@Subscribe
public void onPermissionCheckEvent(PermissionCheckEvent event) {
}

@Subscribe
public void onProduce(ProduceEvent event) {

}

在client被调用以生产消息时,首先创建该事件处理器的实例,然后向EventBus注册事件处理器:
        EventBus carryEventBus = this.getContext().getCarryEventBus();

//register event processor
ProduceEventProcessor eventProcessor = new ProduceEventProcessor();
carryEventBus.register(eventProcessor);

只有注册了该实例,在发布事件时,才会触发该实例的事件处理方法。注册完成该实例之后,需要初始化事件对象,这里事件之间以及事件处理器之间没有必然联系,我们以一个消息上下文对象的引用来让它们以共享“内存”的方式进行数据交换:
        //init events
ProduceEventProcessor.ValidateEvent validateEvent = new ProduceEventProcessor.ValidateEvent();
ProduceEventProcessor.MsgBodySizeCheckEvent msgBodySizeCheckEvent = new ProduceEventProcessor.MsgBodySizeCheckEvent();
ProduceEventProcessor.PermissionCheckEvent permissionCheckEvent = new ProduceEventProcessor.PermissionCheckEvent();
ProduceEventProcessor.MsgIdGenerateEvent msgIdGenerateEvent = new ProduceEventProcessor.MsgIdGenerateEvent();
ProduceEventProcessor.MsgBodyCompressEvent msgBodyCompressEvent = new ProduceEventProcessor.MsgBodyCompressEvent();
ProduceEventProcessor.ProduceEvent produceEvent = new ProduceEventProcessor.ProduceEvent();

validateEvent.setMessageContext(ctx);
msgBodySizeCheckEvent.setMessageContext(ctx);
permissionCheckEvent.setMessageContext(ctx);
msgIdGenerateEvent.setMessageContext(ctx);
msgBodyCompressEvent.setMessageContext(ctx);
produceEvent.setMessageContext(ctx);

准备工作就绪,现在开始发布事件。这里事件的发布顺序跟执行顺序是一致的,所以我们需要根据业务逻辑来编排事件,以形成原先的串联调用的效果:
//arrange event order and emit!
carryEventBus.post(validateEvent);
carryEventBus.post(msgBodySizeCheckEvent);
carryEventBus.post(permissionCheckEvent);
carryEventBus.post(msgIdGenerateEvent);
carryEventBus.post(msgBodyCompressEvent);
carryEventBus.post(produceEvent);

这就是重构的整个过程。我们发现这里不再存在链式(递归)调用了,各个事件处理器方法之间也没有耦合性,它们通过MessageContext来共享上下文。如果我们要增加新的业务逻辑,如何扩展?四步走:

(1)定义一个新事件对象

(2)定义一个新的事件处理器方法

(3)实例化该事件对象

(4)根据需要插入原先的编排过的事件中去并发布该事件

跟原先的事件没有任何关系。

更多实现,可以查看项目源码:banyan



版权声明:本文为博主原创文章,未经博主允许不得转载。