Mq消息乱序如何测试?

上篇文章主要分享了重复消费的原因,以及如何去测试这个场景,最后告诉大家,目前互联网项目关于如何避免重复消费的解决方案。

本篇文章主要分享消息乱序的原因、测试方法,以及目前互联网项目解决消息无序的常见方案。

为什么要保证消息消费有序?

还是供应链业务,拿采购单服务和仓储服务的业务背景来说:

采购员在采购服务的前端页面进行采购单下单操作,下单成功后,采购服务这边会保留一份采购单数据,状态置为1(待入库),然后发送一条mq给到wms 仓库服务,wms消费到采购单的消息,落入数据库wms_purchase表中,也把状态置为1(待入库)。

那么问题来了,采购员觉得刚才那笔采购单有问题,我想废弃,那么采购员在采购服务的前端页面进行采购单废弃操作,废弃成功后,采购服务这边会把刚才那笔采购单状态置为2(已废弃),然后发送一条mq给到wms 仓库服务,wms消费到采购单的消息,落入数据库wms_purchase表中,也把状态置为2(已废弃)。

在极端情况下,可能已废弃的采购单比已入库状态的采购单消息先到达wms,被wms消费成功,去修改数据库状态,发现这笔采购单没有啊,那就不修改,之后已入库的采购单姗姗来迟,被wms消费成功,落入库中。

但是,同一笔采购单,采购单服务这边状态是已废弃,wms服务却是已入库状态,那这就造成数据不一致了。

话不多说,上具体测试代码:

ProducerController生产者代码

@RequestMapping("/send1")
public String testSend1() {
    try {
        for (int i = 0; i <10000 ; i++) {
            long puuchaseId = new Date().getTime();
            //模拟向wms发送待入库状态的采购单
            WmsPurchaseDto  params = new WmsPurchaseDto(puuchaseId,"测试",1);
            mqTemplate.convertAndSend("fourbrothertopic", params);
            //模拟 向wms发送已废弃状态的采购单
            WmsPurchaseDto  param = new WmsPurchaseDto(puuchaseId,"测试",2);
            mqTemplate.convertAndSend("fourbrothertopic", param);
        }
        return "success";
    } catch (Exception e) {
        e.printStackTrace();
        return "fail";
    }
}

如代码中注释,循环10000次,往wms 服务发送20000条消息。

预期情况下,wms_purchase表中会有10000条采购单数据,由于先发送已入库状态的采购单(新增采购单),再发送已废弃状态的采购单(修改采购单状态),那么这10000条采购单最终状态应该是已废弃状态,但是现实情况就是,由于没有保障消息有序性,导致部分采购单最终状态还是已入库状态。

如下截图:

Mq消息乱序如何测试?

具体什么原因呢?下面我就详细聊聊消息无序的原因。

消息无序的原因是什么?

消息队列的基本数据结构就是队列,类似出水管道一样,队列的特性就是先进先出,最先进入队列的元素,会最先被拿出来,和栈相反。

如果同一个采购单的消息能发到同一个队列里,并且一个队列对应一个消费者中一个线程消费,那么消息将会是有序的。

但是现实情况就是在普通发送消息的模式下,为了提高Mq的tps,往往会生成多个队列,来提高吞吐量。

生产者发送消息无序性

生产者为了保证队列流量的均衡性,一般会采用轮询的方式将消息均匀的分发到不同的队列中,然后被不同的消费者消费,因为一组消息在不同的队列,此时就无法使用 RocketMQ 带来的队列有序特性来保证消息有序性,如截图:

Mq消息乱序如何测试?

再回到上述那个案例,假如此时RocketMq有三个队列,那么同一笔的采购单已入库和已废弃状态的消息,可能不在同一个队列里,由于每个队列消费的速度不一样,那么这就造成已废弃状态的消息先到,被消费者消费,已入库状态的消息却姗姗来迟的情况。

消费者消费消息无序性

上代码:

@Component
//mq的监听器,指定topic是TopicTest,消费者组consumerGroupTest
@RocketMQMessageListener(topic = "fourbrothertopic", consumerGroup = "consumerGroupTest")
@Slf4j
public class ConsumeController implements RocketMQListener<String> {
    @Autowired
    private WmsPurchaseMapper wmsPurchaseMapper;
    @SneakyThrows
    @Override
    public void onMessage(String message) {
        log.info("------- Consumer: {}", message);
        //将message消息映射成WmsPurchase实体
        WmsPurchase wmsPurchase = JSONObject.parseObject(message, WmsPurchase.class);
        if (wmsPurchase.getStatus() == 1) {
            wmsPurchaseMapper.insert(wmsPurchase);
        } else {
            wmsPurchaseMapper.updateByPurchaseId(wmsPurchase);
        }

就是一个普普通通的消费者,14行会对消息的状态进行判断,如果是1(已入库),就执行落库操作,如果不是,那就执行修改操作,根据采购单id,将采购单状态置为2(已废弃)。

但是极端情况下,还是会出现无序情况,比如已入库消息(线程)执行到14行,线程hang住,cpu将执行权交给下一个线程(已废弃消息),当执行到17行代码时,修改无效,因为这笔采购单,还未入库。

之前已入库消息(线程)被唤起,执行第15行代码,进行落库,这样还是会造成双边数据不一致的情况。

有什么方式可以避免呢?

怎么保证消息有序性?

生产者消息发送有序性

一个队列可以,但是这样mq的吞吐量将大打折扣,肯定不行的,所以我们一般都会在生产者发送消息的时候会对某个独一无二的标识进行hash取模,再决定发送到那个队列里。

废话不多说,直接上代码:

 

//生产者有序发送消息
    @RequestMapping("/sendOrder")
    public String sendOrder() {
        try {
            for (int i = 0; i <10000 ; i++) {
                long purchaseId = new Date().getTime();
                //模拟向wms发送待入库状态的采购单
                WmsPurchaseDto  params = new WmsPurchaseDto(purchaseId,"测试",1);
                mqTemplate.syncSendOrderly("fourbrothertopic",params,purchaseId+"");
                //模拟 向wms发送已废弃状态的采购单
                WmsPurchaseDto  param = new WmsPurchaseDto(purchaseId,"测试",2);
                mqTemplate.syncSendOrderly("fourbrothertopic",param,purchaseId+"");
            }
            return "success";
        } catch (Exception e) {
            e.printStackTrace();
            return "fail";
        }
    }
}
看这两行代码:
mqTemplate.syncSendOrderly("fourbrothertopic",params,purchaseId+"");
mqTemplate.syncSendOrderly("fourbrothertopic",param,purchaseId+"");

其中syncSendOrderly 方法中,第一个参数是topic,第二个参数是你的消息体,第三个参数,是你要做hash算法的key,消息发送到broker之前,生产者会更具你设置的hashkey进行hash 得到一个hash值,然后再对配置的队列进行取模。

上一页12下一页


留言