Mq消息重复如何测试?

这样就很好的保证了幂等性,也避免了大量的日志报错。伪代码如下:

@Component
//mq的监听器,指定topic是TopicTest,消费者组consumerGroupTest
@RocketMQMessageListener(topic = "fourbrothertopic", consumerGroup = "consumerGroupTest")
@Slf4j
public class ConsumeController implements RocketMQListener {

    @Autowired
    private WmsPurchaseMapper wmsPurchaseMapper;

    @Autowired
    private RedisTemplate redisTemplate;
    @Override
    public void onMessage(String message) {
        log.info("------- Consumer: {}", message);
        //将message消息映射成WmsPurchase实体
        WmsPurchase wmsPurchase = JSONObject.parseObject(message, WmsPurchase.class);
        //首先判断redis里面是否有这条采购单数据,通过PurchaseId查询,有数据,则直接放过不做处理
       if (redisTemplate.opsForValue().get(wmsPurchase.getPurchaseId().toString())==null){
           //然后再使用PurchaseId查询数据库,有数据,则直接放过不做处理
           if (null == wmsPurchaseMapper.selectByPurchaseId(wmsPurchase.getPurchaseId())){
               //数据库没有数据,就进行插入操作,
               if (wmsPurchaseMapper.insert(wmsPurchase)>0){
                   //插入成功就把purchaseid塞进redis里,过期时间是72小时
                   redisTemplate.opsForValue().set(wmsPurchase.getPurchaseId(),wmsPurchase.toString(),72, TimeUnit.HOURS);
               }
           }else {
               //能走到这个判断分支,说明缓存里的采购单数据已经失效,如果还有消息重复消费
               //那就再放入缓存一次,72h过期
               redisTemplate.opsForValue().set(wmsPurchase.getPurchaseId(),wmsPurchase.toString(),72, TimeUnit.HOURS);
               log.info("数据库已保留该数据");
               // 触发重复消费告警机制
           }
       }else {
           log.info("缓存已保留该数据");
            // 触发重复消费告警机制
       }
    }
}

思路很简单,如代码中注释。当然这种方法也有缺点,就是过于依赖redis,有些系统没有使用redis组件,那么还得维护一套redis组件,并且还得保证redis集群高可用。那项目只有mysql,能不能依靠数据库去维护保证幂等性呢?当然可以!

3、还有一种方法叫去重表+唯一索引,顾名思义就是另外维护一张表,记录已经消费的采购单数据,其实和上述方法差不多,上述方法查询缓存,取重表查询数据库取重表。

伪代码 如下:

@Component
//mq的监听器,指定topic是TopicTest,消费者组consumerGroupTest
@RocketMQMessageListener(topic = "fourbrothertopic", consumerGroup = "consumerGroupTest")
@Slf4j
public class ConsumeController implements RocketMQListener {

    @Autowired
    private WmsPurchaseMapper wmsPurchaseMapper;

    @Autowired
    private UniquePurchaseMapper uniquePurchaseMapper;

    @Autowired
    private RedisTemplate redisTemplate;
    @SneakyThrows
    @Override
    public void onMessage(String message) {
        log.info("------- Consumer: {}", message);
        //将message消息映射成WmsPurchase实体
        WmsPurchase wmsPurchase = JSONObject.parseObject(message, WmsPurchase.class);
        log.info("映射后实体消息"+ JSON.toJSONString(wmsPurchase));
        if (uniquePurchaseMapper.selectByPurchaseId(wmsPurchase.getPurchaseId().intValue())  == null){
            if (null == wmsPurchaseMapper.selectByPurchaseId(wmsPurchase.getPurchaseId())){
                //数据库没有数据,就进行插入操作,
                if (wmsPurchaseMapper.insert(wmsPurchase)>0){
                    //插入成功就把purchaseid塞进unique_purchase
                    UniquePurchase  uniquePurchase =   new UniquePurchase();
                    uniquePurchase.setPurchaseId(wmsPurchase.getPurchaseId().intValue());
                    log.info("插入取重表消息:"+ JSON.toJSONString(uniquePurchase));
                    uniquePurchaseMapper.insert(uniquePurchase);
                }
            }else {
                log.info("数据库已保留该数据");
                //自动触发告警机制
            }
        }else {
            log.info("取重表已有这条采购单数据");
        }
 }

代码已上传至gitee,感兴趣可以自行阅读。

上述方式在查询取重表时,并发不安全,极端情况下还是会触发唯一索引错误,比如说,消费者要消费大量消息(线程),执行上述代码,A线程执行完23行,挂起了,cpu把执行权给了B线程,B执行到25行并插入成功,那么这时A线程被唤起,也执行到了23行,结果触发了唯一索引错误。那怎么避免呢?

我们可以让所有线程别并发执行,串行执行,那就用到redis的分布式锁技术。

4、分布式锁+uniquekey

伪代码如下

@Component
//mq的监听器,指定topic是TopicTest,消费者组consumerGroupTest
@RocketMQMessageListener(topic = "fourbrothertopic", consumerGroup = "consumerGroupTest")
@Slf4j
public class ConsumeController implements RocketMQListener {

    @Autowired
    private WmsPurchaseMapper wmsPurchaseMapper;
    @Autowired
    private RedissonClient redisson;
    @Autowired
    private UniquePurchaseMapper uniquePurchaseMapper;
    @Autowired
    private RedisTemplate redisTemplate;
    @SneakyThrows
    @Override
    public void onMessage(String message) {
        log.info("------- Consumer: {}", message);
        //将message消息映射成WmsPurchase实体
        WmsPurchase wmsPurchase = JSONObject.parseObject(message, WmsPurchase.class);
// 注入redisson
// 获取锁对象
        RLock lock = redisson.getLock("lockName");
        try {
            // 1. 最常见的使用方法
            //lock.lock();
            // 2. 支持过期解锁功能,10秒钟以后自动解锁, 无需调用unlock方法手动解锁
            //lock.lock(10, TimeUnit.SECONDS);
            // 3. 尝试加锁,最多等待2秒,上锁以后8秒自动解锁
            boolean res = lock.tryLock();
            if (res) { //成功
                    //然后再使用PurchaseId查询数据库,有数据,则直接放过不做处理
                    if (null == wmsPurchaseMapper.selectByPurchaseId(wmsPurchase.getPurchaseId())){
                        //数据库没有数据,就进行插入操作,
                        if (wmsPurchaseMapper.insert(wmsPurchase)>0){
                            //插入成功就把purchaseid塞进redis里,过期时间是72小时
                            redisTemplate.opsForValue().set(wmsPurchase.getPurchaseId().toString(),wmsPurchase.toString(),1, TimeUnit.HOURS);
                        }
                    }else {
                        redisTemplate.opsForValue().set(wmsPurchase.getPurchaseId().toString(),wmsPurchase.toString(),1, TimeUnit.HOURS);
                        log.info("数据库已保留该数据");
                        //自动触发告警机制
                    }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            //释放锁
            RLock lockName = redisson.getLock("lockName");
            if (lockName.isLocked()) {
                if (lockName.isHeldByCurrentThread()) {
                    lockName.unlock();
                }
            }
        }
}

这种也是比较常见的一种,缺点也很明显,在高并发,大请求量的场景下,所有线程串行执行,处理效率势必会降低。当然了,技术没有好坏,只有合不合适。如果你的项目并发量一般,可以尝试使用上述方法。

具体代码demo已上传至gitee平台,地址如下:

https://gitee.com/lv1792017548/rocketmq-demo.git

总结

本文主要分享了如何测试mq消息队列重复性消费,以及避免重复消费常见的解决方案,代码已上传至gitee,下一篇文章主要分享怎么测试消息队列的消息无序性。敬请期待!

上一页12下一页


留言