RocketMQ 集群部署与使用
Last updated: Oct 219, 21029
概念
RocketMQ 由四部分组成:
生产者 - Producer
消费者 - Consumer
协调中心 - NameServer
暂存及传输组件 - Broker
消息类型 - Topic
消息队列 - Message Queue
部署
目的:为了消除单点故障、增加可靠性或者增大吞吐量,可以在多台机器上部署多个 NameServer 和 Broker,为每个 Broker 部署一个或多个 Slave
准备
搭建出双主、双从、无单点故障的高可用 RocketMQ 集群
操作系统:Red Hat 4.8.5-16
主机 A:10.10.60.49
主机 B:10.10.60.36
启动 NameServer
在主机 A 和主机 B 上分别进入 RocketMQ 的文件目录
执行
nohup sh bin/mqnamesrv &
使用
netstat -nlp | grep 9876
可以看到 NameServer 进程启动了
启动 Broker
配置文件
进入主机 A 的目录
conf/2m-2s-sync
修改文件 broker-a.properties 的内容如下:
namesrvAddr=10.10.60.49:9876;10.10.60.36:9876
brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=SYNC_MASTER
flushDiskType=ASYNC_FLUSH
listenPort=10911
storePathRootDir=/home/rocketmq/store-a
进入主机 B 的目录
conf/2m-2s-sync
修改文件 broker-b.properties 的内容如下:
namesrvAddr=10.10.60.49:9876; 10.10.60.36:9876
brokerClusterName=DefaultCluster
brokerName=broker-b
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=SYNC_MASTER
flushDiskType=ASYNC_FLUSH
listenPort=10911
storePathRootDir=/home/rocketmq/store-b
截止目前为止,Master Broker 的配置文件都配置完毕
进入主机 A 的目录
conf/2m-2s-sync
修改文件 broker-b-s.properties 的内容如下:
namesrvAddr=10.10.60.49:9876; 10.10.60.36:9876
brokerClusterName=DefaultCluster
brokerName=broker-b
brokerId=1
deleteWhen=04
fileReservedTime=48
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
listenPort=11011
storePathRootDir=/home/rocketmq/store-b
进入主机 B 的目录
conf/2m-2s-sync
修改文件 broker-a-s.properties 的内容如下:
namesrvAddr=10.10.60.49:9876; 10.10.60.36:9876
brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=1
deleteWhen=04
fileReservedTime=48
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
listenPort=11011
storePathRootDir=/home/rocketmq/store-a
截止目前为止,Slave Broker 的配置文件都配置完毕
启动
使用命令
nohup sh bin/mqbroker -c config_file &
启动双主双从 broker。
比如:
nohup sh bin/mqbroker -c broker-a.properties &
注意点
默认情况下,RocketMQ 的 nameserver 和 broker 文件启动项里配置的内存太大,极易出现 Cannot allocate memory 的情况。
此时,需要修改 bin/runserver.sh
和 bin/runbroker.sh
的文件内容,将最大堆、最小堆和年轻代的值调小,
如下:
示例
发送与接收消息示例:
添加依赖
添加 Maven 依赖:
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.3.0</version>
</dependency>
生产者
public class SyncProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("unique_group_name");
producer.setNamesrvAddr("10.10.60.49:9876");
producer.start();
for (int i = 0; i < 100; i++) {
Message msg = new Message("TopicTest", "TagA", ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
producer.shutdown();
}
}
消费者
public class Consumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("unique_group_name");
consumer.setNamesrvAddr("10.10.60.49:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
}