RocketMQ 下载安装,修改运行内存,配置单主

讲解目录

1:RocketMQ 下载安装启动测试,调整 namesrv 和 broker 启动内存等等,

2:SpringBoot 集成 RocketMQ,

3:RocketMQ配置双主模式 和 双主双从模式,

4:RocketMQ配置ACL权限,

5:RocketMQ配置RocketMQ-Console,

6:RocketMQ配置console管理后台的登录账户密码,

7:RocketMQ在ACL权限下Console管理后台如何配置,

RocketMQ 下载安装

1、打开apache官 ,Apache官 ,下拉到底部,找到RocketMQ ,点击进去到主页,或者直接访问 RocketMQ 主页

可以看到最新版本为:release-notes-4.8.0 ,点击进去进行下载,如下图所示:

RocketMQ 最新版

演示系统:linux CentOS系统

下载:

如果想看源码的同学们,可以去github上clone一份

地址:
https://github.com/apache/rocketmq

这里我们只演示编译后的下载安装方式

我们先进行下载,使用指令:

> cd /usr/local> wget https://mirrors.bfsu.edu.cn/apache/rocketmq/4.8.0/rocketmq-all-4.8.0-bin-release.zip

如下所示:

下载操作截图

下载完成后,使用unzip进行解压,修改文件名称(博主嫌弃它太长)

# 解压> unzip rocketmq-all-4.8.0-bin-release.zip# 修改名称> mv rocketmq-all-4.8.0-bin-release rocketmq-4.8.0# 删除刚下载的压缩文件> rm -rf rocketmq-all-4.8.0-bin-release.zip

都准备完毕了,我们使用指令查看一下目录文件详情:

安装完的目录详情

下边我们的操作根目录都在 /usr/local/rocketmq-4.8.0 目录下。

修改 namesrv 和 broker 启动内存:

namesrv JVM 初始内存分配

仅一个namesrv就给了4G内存,太大了,namesrv没必要给这么大,我觉得给个 512m 就足够了。

我们进行调整修改成如下所示:

JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"

然后保存退出。

再来看一下broker的内存分配情况:

broker JVM 初始内存分配

broker 分配内存为8G,我们调整成1G。

我们进行调整修改成如下所示:

JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m"

然后保存退出。

启动单主模式:

namesrv启动:

# 建议新手先使用第一种启动语法,可以看下控制台输出信息,成功后,再使用ctrl+c退出,使用第二种语法,后台启动 # 1、启动 namesrv 语法,退出控制台就退出了> sh bin/mqnamesrv   # 2、后台启动 namesrv 语法> nohup sh bin/mqnamesrv &

第 1 种语法:如下所示,表示启动成功:

启动成功

第 2 种语法:

后台启动方式

broker启动:

# 建议新手先使用第一种启动语法,可以看下控制台输出信息,成功后,再使用ctrl+c退出,使用第二种语法,后台启动 # 1、启动 broker 语法,退出控制台就退出了> sh bin/mqbroker -c conf/broker.conf   # 2、后台启动 broker 语法> nohup sh bin/mqbroker -c conf/broker.conf &  # 或者增加namesrv地址(如果broker启动后,没有和namesrv链接成功),例如:> nohup sh bin/mqbroker -c conf/broker.conf -n 10.211.55.11:9876 &

broker的启动我们先演示一张未启动成功的图片,未修改内存之前的启动:提示内存不足,相信不少人第一次下载启动的时候会遇到这种问题,因为官方默认的内存分配有点大。

内存不足,启动失败

正确的启动图展示:

broker 启动成功

RocketMQ namesrv和broker 停止语法

# namesrv 停止语法> sh bin/mqshutdown namesrv# 输出如下内容,表示停止成功The mqnamesrv(16200) is running...Send shutdown request to mqnamesrv(16200) OK # ========华丽短分割线======== # broker 停止语法> sh bin/mqshutdown broker# 输出如下内存,表示停止成功The mqbroker(16447) is running...Send shutdown request to mqbroker(16447) OK # ========华丽短分割线======== # 查看broker启动后是否和namesrv链接成功> sh bin/mqadmin clusterList -n 10.211.55.11:9876#输出如下内容则表示成功,如果没有,则重启broker,带上-n ip:端口 :RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).RocketMQLog:WARN Please initialize the logger system properly.#Cluster Name     #Broker Name            #BID  #Addr                  #Version                #InTPS(LOAD)       #OutTPS(LOAD) #PCWait(ms) #Hour #SPACEDefaultCluster    broker-a                0     10.211.55.11:10911     V4_8_0                   0.00(0,0ms)         0.00(0,0ms)          0 449820.93 0.1126

重复一遍:我们使用后台启动方式将namesrv和broker都启动起来后,就可以正常使用了。

# 后台启动namesrv> nohup sh bin/mqnamesrv &# 后台启动broker> nohup sh bin/mqbroker -c conf/broker.conf & #查看下进程是否正常,如下所示:> ps -ef|grep mq

启动成功后的进程图

测试消息发送和接收

### 运行示例程序,发送消息:# 我们设置一下namesrv_addr地址> export NAMESRV_ADDR=localhost:9876# 设置成功后,我们执行生产者进行消息生产> sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer # 接下来我们会看到很多生产信息SendResult [sendStatus=SEND OK , msgid= ......

测试启动生产者

我们再开一个窗口,也定位该目录。

### 运行示例程序,接收消息:# 我们设置一下namesrv_addr地址> export NAMESRV_ADDR=localhost:9876# 设置成功后,我们执行消费者进行消息消息> sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer # 接下来我们会看到很多消费信息ConsumeMessageThread_2 Receive New Messages: [MessageExt [brokerName=broke......

以上流程都测试完毕了,如果没有问题,那么你所安装的RocketMQ已经具备生产消费的基本功能了,你也可以编写程序进行测试。

下边我们给出两个Java类,来测试生产消息和消费消息。

Producer 生产者:

/** *                     .::::. *                   .::::::::. *                  :::::::::::    佛主保佑、永无Bug *              ..:::::::::::' *            '::::::::::::' *              .:::::::::: *         '::::::::::::::.. *              ..::::::::::::. *            ``:::::::::::::::: *             ::::``:::::::::'        .:::. *            ::::'   ':::::'       .::::::::. *          .::::'      ::::     .:::::::'::::. *         .:::'       :::::  .:::::::::' ':::::. *        .::'        :::::.:::::::::'      ':::::. *       .::'         ::::::::::::::'         ``::::. *   ...:::           ::::::::::::'              ``::. *  ```` ':.          ':::::::::'                  ::::.. *                     '.:::::'                    ':'````.. */ import com.alibaba.fastjson.JSONObject;import com.sunjs.rocketmq.model.Users;import org.apache.rocketmq.client.producer.DefaultMQProducer;import org.apache.rocketmq.client.producer.SendResult;import org.apache.rocketmq.common.message.Message; import java.util.Date;import java.util.UUID; /** * @Date 2021-04-20 15:40 */public class Producer {     public static void main(String[] args) throws Exception {        DefaultMQProducer producer = new DefaultMQProducer("default_producter_group");        producer.setNamesrvAddr("10.211.55.11:9876");        producer.start();         String body = "你好 RocketMQ";        Message message = new Message("TopicTest", null, body.getBytes());        message.setKeys(UUID.randomUUID().toString());        SendResult sendResult = producer.send(message);        System.out.printf("发送结果:%s%n", sendResult);         producer.shutdown();    } }
控制台:RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).RocketMQLog:WARN Please initialize the logger system properly.发送结果:SendResult [sendStatus=SEND_OK, msgId=7F0000018D2618B4AAC27E7E1CEF0000, offsetMsgId=0AD3370B00002A9F00000000000F1275, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=1], queueOffset=817]

Consumer 消费者:

/**                     .::::.                   .::::::::.                  :::::::::::    佛主保佑、永无Bug              ..:::::::::::'            '::::::::::::'              .::::::::::         '::::::::::::::..              ..::::::::::::.            ``::::::::::::::::             ::::``:::::::::'        .:::.            ::::'   ':::::'       .::::::::.          .::::'      ::::     .:::::::'::::.         .:::'       :::::  .:::::::::' ':::::.        .::'        :::::.:::::::::'      ':::::.       .::'         ::::::::::::::'         ``::::.   ...:::           ::::::::::::'              ``::.  ```` ':.          ':::::::::'                  ::::..                     '.:::::'                    ':'````..      */  import com.alibaba.fastjson.JSON;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;import org.apache.rocketmq.common.consumer.ConsumeFromWhere;import org.apache.rocketmq.common.message.MessageExt; import java.util.List; /** * @Date 2021-04-20 15:47 */public class SyncConsumer {     public static void main(String[] args) throws Exception {        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("default_consumer_group");        consumer.setNamesrvAddr("10.211.55.11:9876");        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);         consumer.subscribe("TopicTest", "*");//订阅topic和tag设置        consumer.registerMessageListener(new MessageListenerConcurrently() {            @Override            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {                String body = new String(msgs.get(0).getBody());                System.out.printf("消息内容:%s%n", body);                System.out.printf("消息完整数据包:%s%n", msgs);                 System.out.println(Thread.currentThread().getName()+" = "+msgs);                System.out.println(Thread.currentThread().getName()+" = "+body);                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;            }        });        consumer.start();    } }
控制台:消息内容:你好 RocketMQ消息完整数据包:[MessageExt [brokerName=broker-a, queueId=1, storeSize=232, queueOffset=817, sysFlag=0, bornTimestamp=1619328594160, bornHost=/10.211.55.2:60811, storeTimestamp=1619328594172, storeHost=/10.211.55.11:10911, msgId=0AD3370B00002A9F00000000000F1275, commitLogOffset=987765, bodyCRC=2021450339, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=818, KEYS=77002c48-4c90-4695-9b10-057bc9979b62, CONSUME_START_TIME=1619328594187, UNIQ_KEY=7F0000018D2618B4AAC27E7E1CEF0000, CLUSTER=DefaultCluster, WAIT=true}, body=[-28, -67, -96, -27, -91, -67, 32, 82, 111, 99, 107, 101, 116, 77, 81], transactionId='null'}]]ConsumeMessageThread_1 = [MessageExt [brokerName=broker-a, queueId=1, storeSize=232, queueOffset=817, sysFlag=0, bornTimestamp=1619328594160, bornHost=/10.211.55.2:60811, storeTimestamp=1619328594172, storeHost=/10.211.55.11:10911, msgId=0AD3370B00002A9F00000000000F1275, commitLogOffset=987765, bodyCRC=2021450339, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=818, KEYS=77002c48-4c90-4695-9b10-057bc9979b62, CONSUME_START_TIME=1619328594187, UNIQ_KEY=7F0000018D2618B4AAC27E7E1CEF0000, CLUSTER=DefaultCluster, WAIT=true}, body=[-28, -67, -96, -27, -91, -67, 32, 82, 111, 99, 107, 101, 116, 77, 81], transactionId='null'}]]ConsumeMessageThread_1 = 你好 RocketMQ

因文章内容字数有限,特将文章进行拆分。

下一篇: SoringBoot 集成 RocketMQ,
rocketmq-spring-boot-starter

声明:本站部分文章及图片源自用户投稿,如本站任何资料有侵权请您尽早请联系jinwei@zod.com.cn进行处理,非常感谢!

上一篇 2021年3月15日
下一篇 2021年3月15日

相关推荐