activemq延迟消费_浅析延迟消息的实现方案

发布时间:2021-10-27 07:25:02

生产者把消息发送到消息队列中以后,并不期望被立即消费,而是等待指定时间后才可以被消费者消费,这类消息通常被称为延迟消息。延迟消息的应用场景其实是非常的广泛,比如以下的场景:


? 网上直播授课时,在课程开始前15分钟通知所有学生准备上课。
? 订单提交成功后1个小时内未支付,订单需要及时关闭并且释放对应商品的库存。
? 用户超过15天未登录时,给该用户发送召回推送。
? 工单提交后超过24小时未处理,向相关责任人发送催促处理的提醒。
针对延迟消息,本文向大家分享五种实现方案,下面我们就来逐一讨论各种方案的大致实现和优缺点。
1 Redis
在Redis中,有一种有序集合(Sorted Set)的数据结构,在有序集合中,所有元素是按照其 Score 进行排序的。
我们可以把消息被消费的预期时间戳作为Score,定时任务不断读取Score大于当前时间的元素即可。
基本流程如下:
1. 调用API,传入执行时间、消息体等数据。
2. 生成唯一key,把消息体数据序列化后存入Redis的String结构中。
3. 把key和执行时间的时间戳存入Redis的有序集合结构中,有序集合中不存储具体的消息体数据,而是存储唯一的key。
4. 定时任务不断读取时间戳最小的消息。
5. 如果时间戳小于当前时间,将key放入作为队列的Redis的List结构中。
6. 另外一个定时任务不断从队列中读取需要消费的消息的key。
7. 根据key获取消息体数据,对消息进行消费。
8. 如果消费消息成功,删除key对应的消息体数据。
9. 如果消费消息失败,重新存入key和时间戳(加60秒)。
具体方案如下图:

为了避免一个有序集合中存储过多的延时消息,存入操作以及查询操作速度变慢的问题,可以建立多个有序集合,通过哈希算法把消息路由到不同的有序集合中去。
优点
简单实用,快速落地。
缺点
? 单个有序集合无法支持太大的数据量。
? 定时任务不断读取可能造成不必要的请求。
所以,Redis方案并不是一个十分成熟的方案,只是一个支持小消息量可以快速落地的方案。2 RabbitMQ
RabbitMQ本身是不支持延迟消息功能的,一般的做法,是通过
最大生存时间
(Time-To-Live)和
死信交换机
(Dead Letter Exchanges)两个特性模拟出延迟消息的功能。消息超过最大生存时间没有被消费就变成一条死信,便会被重新投递到死信交换机,然后死信交换机根据绑定规则转发到对应的死信队列上,监听该队列就可以让消息被重新消费。

不过,在RabbitMQ的3.5.8版本以后,我们就可以使用官方推荐的
rabbitmq delayed message exchange
插件很方便地实现延迟消息的功能。
安装插件
首先去官方插件列表页面
?https://www.rabbitmq.com/community-plugins.html
下载rabbitmq_delayed_message_exchang插件,然后复制到RabbitMQ每个节点的plugins目录中。使用命令启用插件:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange
一旦插件被启用,我们就可以开始使用它了。
使用示例
安装该插件后会生成支持延迟投递机制的Exchange类型:
x-delayed-message
。接收到该类型的消息后不会立即将消息投递至目标队列中,而是存储在mnesia表中,检测消息达到可投递时间时再投递到目标队列。
使用延迟消息时,需要先声明一个
x-delayed-message
类型的交换器机:

Map args = new HashMap<>();
SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss.SSS");// 实例化消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("OneMoreGroup");// 设置NameServer的地址
consumer.setNamesrvAddr("localhost:9876");// 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息
consumer.subscribe("OneMoreTopic", "*");// 注册回调实现类来处理从broker拉取回来的消息
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
System.out.printf("%s %s Receive New Messages:%n"
, sdf.format(new Date())
, Thread.currentThread().getName());for (MessageExt msg : msgs) {
System.out.printf(" Msg Id: %s%n", msg.getMsgId());
System.out.printf(" Body: %s%n", new String(msg.getBody()));
}// 标记该消息已经被成功消费return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});// 启动消费者实例
consumer.start();
System.out.println("Consumer Started.");
}
SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss.SSS");// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("OneMoreGroup");// 设置NameServer的地址
producer.setNamesrvAddr("localhost:9876");// 启动Producer实例
producer.start();
Message msg = new Message("OneMoreTopic"
, "DelayMessage", "This is a delay message.".getBytes());//"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"//设置消息延迟级别为3,也就是延迟10s。
msg.setDelayTimeLevel(3);// 发送消息到一个Broker
SendResult sendResult = producer.send(msg);// 通过sendResult返回消息是否成功送达
System.out.printf("%s Send Status: %s, Msg Id: %s %n"
, sdf.format(new Date())
, sendResult.getSendStatus()
, sendResult.getMsgId());// 如果不再发送消息,关闭Producer实例。
producer.shutdown();
}
.getScheduleMessageService().getMaxDelayLevel()) {
msg.setDelayTimeLevel(this.defaultMessageStore
.getScheduleMessageService().getMaxDelayLevel());
}// 获取延迟消息的主题,// 其中RMQ_SYS_SCHEDULE_TOPIC的值为SCHEDULE_TOPIC_XXXX
topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;// 根据延迟级别获取延迟消息的队列Id,// 队列Id其实就是延迟级别减1
queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());// 备份真正的主题和队列Id
MessageAccessor.putProperty(msg
, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg
, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));// 设置延时消息的主题和队列Id
msg.setTopic(topic);
msg.setQueueId(queueId);
Integer level = entry.getKey();// value为延迟级别对应的毫秒数
Long timeDelay = entry.getValue();// 根据延迟级别获得对应队列的偏移量
Long offset = this.offsetTable.get(level);// 如果偏移量为null,则设置为0if (null == offset) {
offset = 0L;
}if (timeDelay != null) {// 为每个延迟级别创建定时任务,// 第一次启动任务延迟为FIRST_DELAY_TIME,也就是1秒this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
}
}// 延迟10秒后每隔flushDelayOffsetInterval执行一次任务,// 其中,flushDelayOffsetInterval默认配置也为10秒this.timer.scheduleAtFixedRate(new TimerTask() {@Overridepublic void run() {try {// 持久化每个队列消费的偏移量if (started.get()) ScheduleMessageService.this.persist();
} catch (Throwable e) {
log.error("scheduleAtFixedRate flush exception", e);
}
}
}, 10000, this.defaultMessageStore
.getMessageStoreConfig().getFlushDelayOffsetInterval());
}
ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(
TopicValidator.RMQ_SYS_SCHEDULE_TOPIC
, delayLevel2QueueId(delayLevel));

如果没有获取到对应的消息队列,则在
DELAY_FOR_A_WHILE
(默认为100)毫秒后再执行任务。如果获取到了,就继续执行下面操作:

// 根据消费偏移量从消息队列中获取所有有效消息
nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE)long countdown = deliverTimestamp - now;// 省略部分代码...
ScheduleMessageService.this.writeMessageStore
.putMessage(msgInner);

清除了消息的延迟级别,并且恢复了真正的消息主题和队列Id,重新把消息发送到真正的消息队列上以后,消费者就可以立即消费了。
由于篇幅限制,其中源码的细节不做过多展开,有兴趣的小伙伴可以去GitHub
https://github.com/apache/rocketmq
上下载源码仔细阅读。
定制化方案
经过以上对源码的分析,可以总结出延迟消息的实现步骤:
1. 如果消息的延迟级别大于0,则表示该消息为延迟消息,修改该消息的主题为
SCHEDULE_TOPIC_XXXX
,队列Id为延迟级别减1。
2. 消息进入
SCHEDULE_TOPIC_XXXX
的队列中。
3. 定时任务根据上次拉取的偏移量不断从队列中取出所有消息。
4. 根据消息的物理偏移量和大小再次获取消息。
5. 根据消息属性重新创建消息,清除延迟级别,恢复原主题和队列Id。
6. 重新发送消息到原主题的队列中,供消费者进行消费。
概括起来如下图:

在CommitLog中,我们可以根据自定义的延迟时间选择一个最大的延迟级别,比如:延迟15分钟消费的消息,那么最大的延迟级别就是10分钟。在ScheduleMessageService中,判断消息是否真的到了消费的时间,如果已到了消费的时间,则恢复原主题和队列Id;如果未到消费的时间,则选择最大延迟级别重新修改主题和队列ID。如下图:

消息主体以及元数据都存储在CommitLog中,队列中只存放了在CommitLog中的起始物理偏移量、消息大小和消息Tag的哈希值,虽然需要重新把消息放入队列中,但空间浪费还是比较有限的。
优点
分布式、高吞吐量、高性能、高可靠,支持自定义延时时间。
缺点
定制RocketMQ不易维护,无法升级新版本。总结
从延迟消息的概念和应用场景出发,我们逐一讨论了五种不同的实现方案,分别是:
? 使用Redis的Sorted Set结构。
? 使用RabbitMQ的rabbitmq delayed message exchange插件。
? 使用ActiveMQ的5.4及以*姹镜难映傧⒐δ堋
? 使用RocketMQ仅支持特定级别的延迟消息。
? 定制RocketMQ,以重新计算延迟级别的方式实现自定义延时。
以上每个方案都是各自的优点和缺点,所以说延迟消息没有一个放之四海而皆准的方案,需要根据数据规模和业务需求的实际情况才能确定最适合的方案。

相关文档

  • 显示器太大怎么缩小显示画面
  • 励志的早安心语摘抄
  • 当手掌
  • 高科技公司合作协议书范本
  • 上海高二有机化学知识点复习
  • 重置手机和恢复出厂设置一样么
  • 某某同志德能勤绩廉考察材料
  • 南京市劳动合同范本专业版
  • wps演示如何制作组织结构图
  • Linux 下 rz/sz命令安装及使用
  • 独具慧眼(转载)
  • 【Activiti工作流】(二)Activiti工作流初体验
  • 关于 List<Map<String, Object>>存值的问题
  • 【经济社会调研报告论文】经济社会发展情况调研报告
  • 第八篇论文读后总结-使用合成梯度的解耦神经接口
  • python3对MySQL的数据备份的详细介绍
  • 如何做清蒸鱼?各种清蒸鱼的做法
  • 幼儿园小班上学期科学教案《有趣的海绵》含反思
  • 0pp手机没有计算机怎么办
  • Shell 循环语句、函数及数组
  • V-rep使用手册目录
  • 简爱读书笔记800字摘抄
  • python写斗地主游戏_python斗地主
  • 学员兵是什么兵
  • 女装服装广告词语
  • 崇武古城怎么去_崇武古城交通信息
  • 励志故事四分钟
  • 食用玛咖有哪些相关功效
  • 异步SOCKET编程-发送和接收数据[转] 异步SOCKET编程-发送和接收数据[转]
  • 引以为豪的豪是什么意思自豪的豪是什么意思谢谢了
  • 猜你喜欢

    电脑版