LegendShop是采用插件的形式进行编写,每个插件的功能是相互独立的,插件工程可以独立于主工程运行。每个插件工程之间的关系根据业务关系是有可能是相互依赖,也有可能是相互独立的。那插件工程和主工程、插件工程和插件工程之间是怎么通信的呢?答案是采用类似Spring的Event机制,LegendShop的自有一套消息通信的机制,采用监听者(Observer)模式来实现系统之间的解耦和异步调用。
参照spring的事件机制:
ApplicationContext基于Observer模式(java.util包中有对应实现),提供了针对Bean的事件传播功能。通过Application. publishEvent方法,我们可以将事件通知系统内所有的
ApplicationListener。事件传播的一个典型应用是,当Bean中的操作发生异常(如数据库连接失败),则通过事件传播机制通知异常监听器进行处理。
Spring的事件传播接口如下:
体系架构图如下:
消息:两台计算机之间传送的数据单位,例如字符串、文本等
消息队列:消息的容器,用于在消息传递的过程中保存消息的容器,充当消息源和目标之间的中间桥梁。队列的只要目的就在于提供路由保证消息的传递。
消息队列网络:是指能够相互之间发送消息的一组计算机。网络中不同的计算机在消息处理过程中扮演者不同的角色,有的是发送者,有的是接受者。
消息中间件
上面对消息队列有了一定的了解,那么消息中间件利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,可以再分布式环境下扩展进程间的通信。
那么对于消息中间件,常见的角色大致也就有
1. Producer(生产者)
2. Consumer(消费者)
3. Broker(中转角色)
有这么几个主要的角色,那么消息中间件能为我们带来那些功能呢?
1.Message Priority
Producer把消息发送给Broker来存储,那么我们就可以再消息队列中对我们的消息来进行排序,实现不同的优先级。从而满足我们复杂的业务需求。
2.Message Order
消息排序,有的消息的处理是需要按照一定的顺序进行处理的,比如用户的创建订单、订单付款、订单完成。那么对于消费者也需要按照这个流程来消费,否则就没有意义了。
3.Message Filter
在消息对立中,也可以对我们的消息进行过滤,比如按照消息类型等条件来过滤
4.Message Persistence
消息的持久化,一般有以下几种方式
(1)持久化到数据库,比如MySQL
(2)持久哈到KV存储,比如Redis
(3)文件形式持久化
消息的持久化,防止了系统挂掉后,仍然能够从之前备份中恢复出来。
5.事务的支持
正如上面所谈到的订单的操作,因此消息中间件中也会提供对分布式事务的支持。
通过对目前市场上常用的消息中间件比较,LegendShop最终采用了RocketMQ作为系统的消息中间件。
LegendShop定义了一个事件接口 BaseEventId,每个Id代表一个事件类型。
根据事件的类型,我们又分为3种类型的事件:
3.1 事件初始化
所有的事件是以spring bean的方式进行定义的,在系统启动的瞬间系统会将所有的事件收集并初始化,见EventHome类的initBaseEventListener方法:
/** KEY:事件类的ID,值:所有监听此事件的处理类实例 */
private static Map
其中参数名为EventId,参数值为EventListener的列表,如果没有EventListener则发出去的事件没有人响应,如果列表有多个EventListener,则会按照配置中的顺序进行,每个EventListener定义如下:
3.2 事件发布
事件的封装类是SystemEvent,其中包括了BaseEventId和一个泛型的source,source可以是任意类型的参数。
发布事件的例子如下:
创建一个带有参数的Event,并传给EventHome的publishEvent方法。
3.3 事件处理方式
事件发出来之后会在事件总线中根据EventId找出对应的EventListener,并循环调用其中的onEvent方法。每个EventListener包括一到多个Processor。
Processor定义如下:
Processor有2个实现
1、ThreadProcessor 采用线程的方式处理,可以处理异步的事情。
2、BaseProcessor 采用同步的方式处理。
针对特定场景如果都用线程的话会导致系统过多线程影响系统性能。可以采用发送MQ消息的方式来实现异步操作。
3.4 MQ事件处理方式
通过对目前市场上常用的消息中间件比较,LegendShop最终采用了RocketMQ作为系统的消息中间件。RocketMQ是阿里巴巴的一个开源项目。
3.4.2 消息发送者
使用一个封装类MessageProducerClient作为消息的发送类。
首先在spring xml中增加如下配置:
其中的配置文件放在properties文件中,其中的配置文件放在properties文件中,
调用消息发送者的配置如下:
把消息转化为消息中间件格式的数据即可发送:
发送代码如下:
3.4.2 消息消费者
每个消息的消费者都监听MessageListenerConcurrently接口,接收方法如下:
从代码中可以得知,当消息到达消息消费者后,调用BaseMessageListenerConsumer的consumeMessage方法进行处理。如下:
最终会交给对应的messageExecutes处理,目前系统支持的messageExecutes有。
以EmailMessageExecute为例,
采用监控者方式处理可以减少代码的耦合程度,对应一些耗时的操作可以采用异步的方式来实现。目前系统支持采用线程的方式来做支持,一般异步使用应用在以下场景:
异步记录用户管理员的登录历史,更新缓存等,入下图的实现类: