Mq消息乱序如何测试?

为了方便大家理解,简单举例,实际情况肯定比我举例更复杂。

例如,有3笔采购单,id分别是是 12,13,14,并且有3个队列,0,1,2。那12对3取模是0,发往第一个队列;13对3取模是1,发往第二个队列;14对3取模是2,那就发往第三个队列,这样以来,由于采购单id,全局独一无二,如果用采购单id做hashkey,那么同一采购单的所有消息都会发往同一个队列里,生产者发送消息就保证了有序性。

当然了,生产者发送消息保证了有序性,还并不能完全保证消息全局有序性,消费者消费也要保证有序性。

消费者消费有序性

废话不多说,直接上代码:

@Component
//mq的监听器,指定topic是TopicTest,消费者组consumerGroupTest
@RocketMQMessageListener(topic = "fourbrothertopic", consumerGroup = "consumerGroupTest", consumeMode = ConsumeMode.ORDERLY)
@Slf4
public class ConsumeController implements RocketMQListener<String> {
    @Autowired
    private WmsPurchaseMapper wmsPurchaseMapper;
    @SneakyThrows
    @Override
    public void onMessage(String message) {
        log.info("------- Consumer: {}", message);
        //将message消息映射成WmsPurchase实体
        WmsPurchase wmsPurchase = JSONObject.parseObject(message, WmsPurchase.class);
        if (wmsPurchase.getStatus() == 1) {
            wmsPurchaseMapper.insert(wmsPurchase);
        } else {
            wmsPurchaseMapper.updateByPurchaseId(wmsPurchase);
        }
只看RocketMQMessageListener(topic = "fourbrothertopic", consumerGroup = "consumerGroupTest", consumeMode = ConsumeMode.ORDERLY)这行代码,
其中RocketMQMessageListener有个属性consumeMode,默认是ConsumeMode.CONCURRENTLY ,多线程消费消息,我们要改成ConsumeMode.ORDERLY,单线程顺序接收消息;
这样就很好的保证了消费信息的有序性。

运行结果如下:

Mq消息乱序如何测试?

左上角是生产者,循环10000次,每循环一次就往wms发送两条消息。先发送已入库状态的采购单(新增采购单),再发送已废弃状态的采购单。

左下角是消费者,添加了ConsumeMode.ORDERLY属性,单线程消费消息,42行会对消息的状态进行判断,如果是1(已入库),就执行落库操作,如果不是,那就执行修改操作,根据采购单id,将采购单状态置为2(已废弃)。

预期情况下,wms_purchase会有10000条采购单数据,由于先发送已入库状态的采购单(新增采购单),再发送已废弃状态的采购单(修改采购单状态),那么这10000条采购单最终状态应该是已废弃状态。

由于生产者和消费者做好有序性防护措施,数据库中已废弃状态采购单有10000条,有效的保证了消息的有序性,符合预期结果,如右上角数据库查询结果。

总结

消息的有序性测试和解决方案,就分享到这里,大家一定要结合代码去理解,大部分的mq中间件原理都差不多,学会了rocketmq,再看看其他消息中间件,这样就很容易了。

上一页12下一页


留言