消息总线重构之简化客户端

文章目录

这段时间对消息总线进行了再次重构。本次重构主要针对消息总线的pubsub组件以及对client的简化,同时谈谈对消息总线的一些想法。

##简化client的复杂度
之前的client需要同时连接两个分布式组件。消息总线的访问需要用户提供pubsuberHostpubsuberPort参数,因此它首先连接的就是pubsuber。而消息总线是基于RabbitMQ构建的,因此它必然还需要连接RabbitMQ。而之所以没有需要用户程序提供RabbitMQ Server的地址信息,是因为它是通过pubsuber间接获得的。

当时的想法是出于安全的角度考虑,不让用户直面MQ Server,而MQ的选择理论上可以有多种,这些对用户都是透明的。但作为一个后端组件,安全问题本不是最重要的关注目标,而替换MQ这样的成本无异于重写消息总线,这样的可能性也不大。除此之外这还带来了额外的复杂度与高失败率(当pubsuber与RabbitMQ两者中有其一失败,消息总线就将陷入混乱),结合到消息总线较多的长连接场景(比如,push模式的consume),一旦一个组件失效就可能导致客户程序的重启(为了重新初始化连接)。

让消息总线客户端只连接单一的RabbitMQ组件,可以大大降低失效的概率,而且RabbitMQ官方client提供的失效重试机制也能更好得发挥作用。

###用RPC获取授权信息
因为之前的pubsuber部分承担了授权信息数据源的角色,移除之前的pubsuber组件,那么就需要重新设计获取远程授权信息的方案。因为RabbitMQ正好提供了基于JSON的轻量级RPC机制,我们就可以通过RPC从后端获取授权信息,而让后端去跟pubsuber交互。之前就曾有过这个想法,后来在使用HBase时,发现其java client内部也有通过RPC跟Master节点交互,于是这次就确定用这种方式来实现。其实采用RPC的形式可以大大简化客户端的逻辑实现,而且也大大降低了升级成本。

##修改broadcast和pubsub的实现
之前pubsuber在客户端还起到了两个作用:实现广播机制实现实时控制。所以,如果要将pubsuber从client里移除,就要重新实现这两个功能。也就是说,要找到另一种支持实时push 的机制,考虑到其实RabbitMQ本身就可以实现长连接的即时消费功能,这里选择直接基于RabbitMQ本身来实现。

我们新建了一个内部使用的exchange来实现消息路由。跟其他topic类型的exchange不同,这里我们采用并不常见的headers 类型的exchange。

inner-headers-exchange

header类型:根据消息的消息头里包含特殊的key-value对来进行路由

考虑到我们需要重新实现上面两个功能,所以,我们将消息分成两类:eventnotice

  • event:内部控制消息
  • notice:广播消息

在发送这两个消息时,只需要在消息头中指定对应的header的key-value即可实现自动路由。这两种消息类型分别对应的是绑定在inner exchange后的队列的类型。

###如何节省RabbitMQ Server的资源
考虑到几乎每个client都有接受这两种类型的消息的需求,而我们为每个客户端在该exchange下创建两个queue多少有些过于浪费,最好的做法当然是在客户端使用的会话周期内建立两个临时队列,等客户端使用结束就可立即销毁队列回收资源。

得益于RabbitMQ丰富的特性,我们可以很容易做到这一点。当我们实例化客户端的时候,我们在内部创建两个临时且排他的队列。所谓临时且排他即一种特殊的队列,它只对创建它的连接可见,当创建它的连接断开或者消费者个数从大于零降到零时,该队列就会被删除,具备这种属性的队列几乎是为一次会话而创建的。

临时且排他属性是通过在创建队列时,指定队列的auto-delete以及exclusive属性同时为true来实现的。

这两个队列被创建后,当前客户端会作为消费者立即挂载在队列上等待eventnotice

发送端无需知晓上述两个队列的具体名称,它只需知道代理exchange以及inner exchange的routing-key即可,然后在发送消息的消息头中指定需要发送的是event还是notice

代码片段:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
InnerEventEntity eventEntity = new InnerEventEntity();
eventEntity.setIdentifier(channel);
eventEntity.setValue(new String(data));
eventEntity.setType("event");
String jsonObjStr = GSON.toJson(eventEntity);
Message eventMsg = MessageFactory.createMessage(MessageType.QueueMessage);
Map<String, Object> map = new HashMap<String, Object>(1);
map.put("type", "event");
eventMsg.setHeaders(map);
eventMsg.setContent(jsonObjStr.getBytes());
AMQP.BasicProperties properties = MessageHeaderTransfer.box(eventMsg);
ProxyProducer.produce(Constants.PROXY_EXCHANGE_NAME,
mqChannel,
EVENT_ROUTING_KEY_NAME,
eventMsg.getContent(),
properties);

##去除pubsuber的封装

###为什么要封装
在去除之前,我想谈谈当初为什么要封装。在最初封装消息总线的时候,我对redis和zookeeper都有所了解,它们都有一些共同的特性,比如:

  • 都能够以key-value的形式存取少量数据
  • 都能提供Pub/Sub的实时push变更功能

这是消息总线客户端的pubsuber需要的,但为了提供可选性,我在这两个特性上做了一层封装,可以使得这种配置变更组件无论选择哪一个,无需修改代码,两者都可适配。这是当初封装的目的。

###为什么要去除封装
首先,去除封装是回到了zookeeper而排除了redis。这么除了发现太多的开源软件都在使用zookeeper来实现这个需求,除了发现这是zookeeper的专长,而redis只是能提供这些功能而已。除了这些,最关键的问题是我发现当涉及到命名服务的特性时,redis将变得不再适合。

在分布式的服务中,很可能会存在多个组件,而这些组件跟应用之间会存在一些逻辑关系,而不都是简单的扁平关系。很多情况下,我们需要将一些关系构建成树状结构。比如,现在消息总线只变成了平台中的一个组件,我们需要在配置上体现这种关系,所以可能会由原先的扁平关系修改为如下图这样的形式:

zk-name-service

在这种类似于文件系统的树状结构下,要实现诸如获取子节点的变更事件这样的联动行为redis将无能为力。这是因为redis的pubsub功能,只提供在key-value(String)类型上。也就是说,它的value只能是一级关系。当然,为了表示多层关系,你是可以在key里以“.”进行区分,比如”app1”,”app1.message”,虽然你能知道他们之间的关系,但它们在技术层面上是一样的,无法产生联动变更功能。所以在一些场景下,zookeeper是无法取代的。

##一些思考

###拓扑的权衡
消息总线最初的目标主要偏向消息传递。但在实现中的添加了一些额外的特性,比如之前的RPC功能。其实,如果单从技术层面上来看消息总线就是收发消息。但如果你将收发消息的主体包含进来(也就是发送者消费者)会有一些新的定位。如果有一些消费者做的事情是很通用的基础的很多人都需要的或者纯技术性的。那么处理这类消息的消费者就是在提供服务。比如,下面这些:

  • 将数据存入ElasticSearch
  • 发送短信
  • 发送邮件
  • 往移动端推送消息

消息总线可以直接提供这些服务以供第三方申请使用,当然如果带上语义来看,RPC的服务端也是一种服务(只不过是同步的服务而已),其他队列也可能在提供某种服务,只是它们的专有性更强,所以消息总线也具有提供服务的能力以及构建服务的基础。

所以我一直在考虑,整个路由图应该是这样构建:

now_router_topology

还是这样来构建:

thinking_router_topology

###消息租赁
因为消息的通信模型生来就具有异步性。那么消息的消费时机,对于消息总线本身而言是无法知晓的,这就产生了消息长期堆积压垮消息总线的可能。所以,可能会考虑将消息的永久驻留改为按序驻留。这取决于业务,有些业务消息是具有时效性的,这样的消息如果隔个几周还没有被消费掉,那么它存在的意义几乎没有了,而它却白白占据着总线服务器的内存或磁盘资源,甚至这些消息将永远得不到消费也有可能。

所谓消息租赁,其实是将现在的永久留存的模式改为临时驻留队列的模式,具体消息能够存活多久,这取决于给消息设置的TTL(time to live)时间,而对于TTL的评估来自于队列申请方根据自身的业务特点而定。当然,TTL可以也可以设置为永久,这需要接收审核。

###proxy的必要性
对于一个组件的扩展有很多种模式,比如proxysmart clientplugin。消息总线封装自RabbitMQ,其实RabbitMQ官方是带plugin模式的扩展机制的,无奈语言所限,力所不及。

而对于proxysmart client的两种模式对比来看,可算是各有所长,优势互补。比如在侵入性上proxy侵入性更小,在掌控性上smart client掌控力更强。这里就不再过多比较。

现在对RabbitMQ的扩展采用的是smart client的形式。但这种方式总有它的受限之处,当你处于一个分布式的环境中,服务器上的资源在很多时候是被共享的(比如RabbitMQ里的队列,它可以同时被多个client消费),你可以将它看成是多个分支河流汇聚的场景,分支是没办法掌控全部的,你只有依靠Proxy Server。

我曾看到携程开源的消息系统是在kafkamysql前端做了一个proxy(它们称之为broker)。会不会构建一个proxy,看进度吧。但proxy的存在缺失能带来非常多的好处。