## 集群名 brokerClusterName = RaftCluster ## broker组名,同一个RaftClusterGroup内,brokerName名要一样 brokerName=RaftNode00 ## 监听的端口 listenPort=30911 ## 你设置的NameServer地址和端口 namesrvAddr=172.30.34.10:9876;172.30.35.37:9876;172.30.35.30:9876 storePathRootDir=/tmp/rmqstore/node00 storePathCommitLog=/tmp/rmqstore/node00/commitlog enableDLegerCommitLog=true dLegerGroup=RaftNode00 ## 例如:dLegerPeers=n0-服务器1的IP:40911;n1-服务器2的IP:40912;n2-服务器3的IP:40913 dLegerPeers=n0-172.30.34.10:40911;n1-172.30.35.37:40912;n2-172.30.35.30:40913 ## must be unique ## n0 n1 n2 分别是broker1,broker2,broker3 的 dLegerSelfId dLegerSelfId=n0 sendMessageThreadPoolNums=16 #服务器的ip brokerIP1=172.30.34.10
配置成功,执行以下命令:
nohup ./bin/mqbroker -c conf/dledger/broker-n0.conf & tail -f ~/logs/rocketmqlogs/broker.log
出现以下结果,说明这台服务器的broker启动成功
启动 Slave01 broker
登录到172.30.35.37服务器上,执行以下命令:
cd /usr/rocketmq/rocketmq-all-4.7.1-bin-release vim conf/dledger/broker-n1.conf
和上述配置文件区别不大,重点关注:dLegerSelfId 和 brokerIP1不一致
brokerClusterName = RaftCluster brokerName=RaftNode00 listenPort=30921 namesrvAddr=172.30.34.10:9876;172.30.35.37:9876;172.30.35.30:9876 storePathRootDir=/tmp/rmqstore/node01 storePathCommitLog=/tmp/rmqstore/node01/commitlog enableDLegerCommitLog=true dLegerGroup=RaftNode00 dLegerPeers=n0-172.30.34.10:40911;n1-172.30.35.37:40912;n2-172.30.35.30:40913 ## must be unique dLegerSelfId=n1 sendMessageThreadPoolNums=16 brokerIP1=172.30.35.37
配置成功,执行以下命令:
nohup ./bin/mqbroker -c conf/dledger/broker-n1.conf & tail -f ~/logs/rocketmqlogs/broker.log
启动 Slave02 broker
登录到172.30.35.30服务器上,执行以下命令:
cd /usr/rocketmq/rocketmq-all-4.7.1-bin-release vim conf/dledger/broker-n2.conf
具体配置文件如下:
brokerClusterName = RaftCluster brokerName=RaftNode00 listenPort=30931 namesrvAddr=172.30.34.10:9876;172.30.35.37:9876;172.30.35.30:9876 storePathRootDir=/tmp/rmqstore/node02 storePathCommitLog=/tmp/rmqstore/node02/commitlog enableDLegerCommitLog=true dLegerGroup=RaftNode00 dLegerPeers=n0-172.30.34.10:40911;n1-172.30.35.37:40912;n2-172.30.35.30:40913 ## must be unique dLegerSelfId=n2 sendMessageThreadPoolNums=16 brokerIP1=172.30.35.30
配置成功,执行以下命令:
nohup ./bin/mqbroker -c conf/dledger/broker-n2.conf & tail -f ~/logs/rocketmqlogs/broker.log
三台服务器执行jps,若出现以下图片红圈内容,说明启动成功
至此三台broker 启动完毕!
搭建可视化平台
linux 服务器管理这些集群信息,不是很方便,有没有更方便的类似管理平台那种呢?
有的,RocketMQ有一个可视化的的dashboard,通过该控制台可以直观的看到很多数据,
下载地址:
https://github.com/apache/rocketmq-externals/releases
下载成功解压,java idea 导入这个项目,如截图:
考虑到github下载项目比较慢,目前我已经把这个项目上传到gitee上,地址如下:
https://gitee.com/lv1792017548/rocketmq-console.git
下载成功以后需要修改其src/main/resources中的application.properties配置文件
其中rocketmq.config.namesrvAddr,是你配置的nameserver地址
在rocketmq-console目录下执行打包命令
mvn clean package -Dmaven.test.skip=true
执行完,再将target 目录下rocketmq-console-ng-1.0.0.jar 包,上传至172.30.34.10机器上,执行以下命令
java -jar rocketmq-console-ng-1.0.0.jar
浏览器输入链接:
http://172.30.34.10:7000/
出现以下页面,说明可视化管理平台搭建成功了
编写生产者和消费者
用springboot框架编写的生产者和消费者,已上传至gitee
https://gitee.com/lv1792017548/rocketmq-demo.git
主要有以下几个类
- RocketMQApplication springboot 框架启动类
- ProducerController 生产者,对外暴露一个get请求接口
- ConsumeController 消费者,消费消息
生产者发送消息
@RestController @RequestMapping("/mq") public class ProducerController { // 自动注入 RocketMQTemplate模板类,用于生产消息 @Autowired private RocketMQTemplate mqTemplate; // 提供接口路由,ip:8080/mq/send?msg=4563464563456 get请求 @RequestMapping("/send") public String testSend(String msg) { try { mqTemplate.convertAndSend("TopicTest", msg); return "success"; } catch (Exception e) { e.printStackTrace(); return "fail"; } } }
消费者消费消息
@Component //mq的监听器,指定topic是TopicTest,消费者组consumerGroupTest @RocketMQMessageListener(topic = "TopicTest", consumerGroup = "consumerGroupTest") @Slf4j public class ConsumeController implements RocketMQListener<String> { @Override public void onMessage(String message) { log.info("------- Consumer: {}", message); } }
总结
本文主要分享了RocketMq的原理、搭建步骤和生产者消费者编写demo,下一篇文章主要讲述RocketMq一些常见的测试场景。