莫方教程网

专业程序员编程教程与实战案例分享

Spring 嵌入式轻量消息队列

为 Spring-boot 提供消息队列能力的 starter, 并提供了 VM 线程的轻量级实现。

项目地址:
https://github.com/wangyuheng/embedded-mq-spring-boot-starter

什么是消息队列

消息队列是用于存放消息的容器,可供消费者取出消息进行消费。

观察者模式

观察者(Observer)模式的定义:指多个对象间存在一对多的依赖关系,当一个对象的状态发生改变时,所有依赖于它的对象都得到通知并被自动更新。这种模式有时又称作发布-订阅模式、模型-视图模式,它是对象行为型模式。

Observer 本来的意思是观察者,但具体的实现中并不是主动去观察,而是被动的接收来自 Subject 的通知,所以更合适的名字应该是"消息投递"。

而且通知的模式还存在一个弊端: 通知及多个 ConcreteObserver 的消费程序仍在一个同步线程内,所以只是代码结构层面的解耦,底层还是一个事务内。

为了解决这个弊端,将消息的发送及 N 个消费程序拆分为 N+1 个事务,所以引入消息队列用于存储 Subject。

领域模型设计

  • 罗列领域概念
  • 梳理交互关系

代码实现

  1. LinkedBlockingQueue 作为存储 Message 的容器。
  2. Store 用于存储消息。为了兼容多个 Consumer,每个 Consumer 指定一个唯一标识作为 Partition Key,对应唯一的一个 LinkedBlockingQueue。 e.g. Map<Partition, LinkedBlockingQueue<Message>> messageQueueMap = new ConcurrentHashMap<>();
  3. Producer 通过 Transport 将消息发送只多个 Partition Key 的 LinkedBlockingQueue 队列中
  4. 每个 Consumer 开启一个线程,通过轮询方式从 LinkedBlockingQueue 队列中消费消息。

代码片段

  • VmStore
  • Transport
  • ConsumerCluster

使用 LinkedBlockingQueue 却未使用 take 方法的原因是为了灵活控制消费线程的启停。

Spring 集成

为了方便使用,通过 annotation 的形式与 Spring 框架进行集成。

示例

  • Consumer
  • Producer

代码实现

其他

  1. 如何跨应用消费?通过 MySQL、Redis 等公共存储替换 Store 及 Transport 实现。MySQL 需要考虑行锁。
控制面板
您好,欢迎到访网站!
  查看权限
网站分类
最新留言

    滇ICP备2024046894号-1