RocketMQ
操作系统版本:linux7.6
软件名称:RocketMQ
软件版本:4.8.0
原理:
RocketMQ具有以下特点:
o 是一个队列模型的消息中间件,具有高性能、高可靠、高实时、分布式特点。
o Producer、Consumer、队列都可以分布式。
o Producer向一些队列轮流发送消息,队列集合称为Topic,Consumer如果做广播消费,则一个consumer实例消费这个Topic对应的所有队列,如果做集群消费,则多个Consumer实例平均消费这个topic对应的队列集合。
o 能够保证严格的消息顺序
o 提供丰富的消息拉取模式
o 高效的订阅者水平扩展能力
o 实时的消息订阅机制
o 亿级消息堆积能力
o 较少的依赖
如上图所示, RocketMQ的部署结构有以下特点:
o Name Server是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。
o Broker部署相对复杂,Broker分为Master与Slave,一个Master可以对应多个Slave,但是一个Slave只能对应一个Master,Master与Slave的对应关系通过指定相同的BrokerName,不同的BrokerId来定义,BrokerId为0表示Master,非0表示Slave。Master也可以部署多个。每个Broker与Name Server集群中的所有节点建立长连接,定时注册Topic信息到所有Name Server。
o Producer与Name Server集群中的其中一个节点(随机选择)建立长连接,定期从Name Server取Topic路由信息,并向提供Topic服务的Master建立长连接,且定时向Master发送心跳。Producer完全无状态,可集群部署。
o Consumer与Name Server集群中的其中一个节点(随机选择)建立长连接,定期从Name Server取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。Consumer既可以从Master订阅消息,也可以从Slave订阅消息,订阅规则由Broker配置决定。
安装部署:
服务器:
master > slave
192.168.105.10 > 192.168.105.35
192.168.105.35 > 192.168.105.39
192.168.105.39 > 192.168.105.41
192.168.105.41 > 192.168.105.105
192.168.105.105 > 192.168.105.106
192.168.105.106 > 192.168.105.107
192.168.105.107 > 192.168.105.108
192.168.105.108 > 192.168.105.10
master端口:29701,slave端口:29501,namesrv端口:29876
1.官 下载软件包 https://downloads.apache.org/rocketmq/4.8.0/rocketmq-all-4.8.0-source-release.zip
2.解压到指定目录unzip -d /data/middleware/rockermq-4.8.0 rocketmq-all-4.8.0-source-release.zip
3.解压后目录结构如下
配置:
1.Broker节点的配置文件在/rocketmq/conf下面,有三种配置方式:
2m-noslave:多Master模式
2m-2s-sync:多Master多Slave模式,同步双写
2m-2s-async:多Master多Slave模式,异步复制
2.我们这里使用的是namesrv五节点配置,异步复制方式,所以配置的时候主要使用的是/data/middleware/rockermq-4.8.0/conf/2m-2s-async目录
注意:八个master的listenport一致,八个slave的listenport一致,不是主从listenport一致
3.删除/data/middleware/rockermq-4.8.0/conf/2m-2s-async目录下配置文件,并创建broker-m.properties和broker-s.properties文件
4.broker-m.properties(master节点配置文件)
brokerClusterName=Cluster-RocketMq
brokerName=broker-105105
#不同服务器需要修改
brokerIP1=192.168.105.105
#服务器IP(不同服务器需要修改)
brokerIP2=10.0.22.225
#内 IP(BrokerHAIP地址,供slave同步消息的地址,不同服务器需要修改)
listenPort=29701
#master端口
brokerId=0
#brokerid=0为master
brokerRole=ASYNC_MASTER
#broker的角色
namesrvAddr=192.168.105.41:29876;192.168.105.105:29876;192.168.105.106:29876;192.168.105.107:29876;192.168.105.108:29876
#namesrv的节点
rocketmqHome=/data/middleware/rocketmq-4.8.0
#RockerMQ主目录
storePathRootDir=/data/middleware/rocketmq-4.8.0/rootdir-m
#broker存储目录
storePathCommitLog=/data/middleware/rocketmq-4.8.0/commitlog-m
#Commitlog存储目录默认为${storePathRootDir}/commitlog
serverSelectorThreads=3
#IO线程池线程个数,主要是NameServer.broker端解析请求,返回相应的线程个数,这类线程主要是处理 络请求的,解析请求包。然后转发到各个业务线程池完成具体的业务无操作,然后将结果在返回调用方
serverSocketRcvBufSize=131072
#netty 络socket接收缓存区大小16MB
osPageCacheBusyTimeOutMills=1000
#putMessage锁占用超过该时间,表示 PageCache忙
shortPollingTimeMills=1000
#短轮询等待时间
clientSocketRcvBufSize=131072
#客户端socket接收缓冲区大小
clusterTopicEnable=true
brokerTopicEnable=true
autoCreateTopicEnable=false
maxErrorRateOfBloomFilter=20
maxMsgsNumBatch=64
cleanResourceInterval=10000
commercialBaseCount=1
maxTransferCountOnMessageInMemory=2000
brokerFastFailureEnable=true
flushDiskType=ASYNC_FLUSH
commercialBigCount=1
mappedFileSizeConsumeQueue=6000000
consumerFallbehindThreshold=17179869184
#消息消费堆积阈值默认16GB在disableConsumeifConsumeIfConsumerReadSlowly为true时生效
autoCreateSubscriptionGroup=true
transientStorePoolEnable=false
flushConsumerOffsetInterval=5000
#持久化消息消费进度 consumerOffse.json文件的频率ms
waitTimeMillsInHeartbeatQueue=31000
#清理broker心跳线程等待时间
diskMaxUsedSpaceRatio=75
#commitlog目录所在分区的最大使用比例,如果commitlog目录所在的分区使用比例大于该值,则触发过期文件删除
flushCommitLogLeastPages=4
#一次刷盘至少需要脏页的数量,针对commitlog文件
cleanFileForciblyEnable=true
#是否支持强行删除过期文件
slaveReadEnable=true
#从节点是否可读
msgTraceTopicName=RMQ_SYS_TRACE_TOPIC
expectConsumerNumUseFilter=32
traceTopicEnable=true
useEpollNativeSelector=false
enablePropertyFilter=true
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 1d 2d
deleteCommitLogFilesInterval=100
maxTransferBytesOnMessageInDisk=65536
flushConsumeQueueLeastPages=2
pullMessageThreadPoolNums=80
useReentrantLockWhenPutMessage=false
flushIntervalConsumeQueue=1000
sendThreadPoolQueueCapacity=10000
debugLockEnable=false
haHousekeepingInterval=20000
diskFallRecorded=true
messageIndexEnable=true
clientAsyncSemaphoreValue=65535
clientCallbackExecutorThreads=32
putMsgIndexHightWater=600000
sendMessageThreadPoolNums=32
clientManagerThreadPoolQueueCapacity=1000000
serverSocketSndBufSize=131072
maxDelayTime=40
clientSocketSndBufSize=131072
commercialEnable=true
maxHashSlotNum=5000000
heartbeatThreadPoolNums=32
transactionTimeOut=6000
maxMessageSize=6291456
adminBrokerThreadPoolNums=16
defaultQueryMaxNum=32
maxTransferBytesOnMessageInMemory=262144
forceRegister=true
enableConsumeQueueExt=false
longPollingEnable=true
serverWorkerThreads=8
messageIndexSafe=false
deleteConsumeQueueFilesInterval=100
haSlaveFallbehindMax=268435456
serverCallbackExecutorThreads=0
flushCommitLogThoroughInterval=10000
commercialTimerCount=1
enableDLegerCommitLog=false
useTLS=false
redeleteHangedFileInterval=120000
flushIntervalCommitLog=500
queryMessageThreadPoolNums=40
messageStorePlugIn=
serverChannelMaxIdleTimeSeconds=120
maxIndexNum=20000000
filterDataCleanTimeSpan=86400000
filterServerNums=0
commitCommitLogLeastPages=4
waitTimeMillsInPullQueue=5000
haSendHeartbeatInterval=5000
processReplyMessageThreadPoolNums=80
clientChannelMaxIdleTimeSeconds=120
filterSupportRetry=false
flushDelayOffsetInterval=10000
duplicationEnable=false
replyThreadPoolQueueCapacity=10000
offsetCheckInSlave=false
clientCloseSocketIfTimeout=false
transientStorePoolSize=5
waitTimeMillsInSendQueue=500
warmMapedFileEnable=false
endTransactionThreadPoolNums=72
flushCommitLogTimed=false
flushLeastPagesWhenWarmMapedFile=4096
clientWorkerThreads=4
endTransactionPoolQueueCapacity=100000
registerNameServerPeriod=30000
registerBrokerTimeoutMills=6000
accessMessageInMemoryMaxRatio=40
highSpeedMode=false
transactionCheckMax=15
checkCRCOnRecover=true
destroyMapedFileIntervalForcibly=120000
commitIntervalCommitLog=200
clientOnewaySemaphoreValue=65535
storeReplyMessageEnable=true
traceOn=true
clientManageThreadPoolNums=32
channelNotActiveInterval=60000
mappedFileSizeConsumeQueueExt=50331648
consumerManagerThreadPoolQueueCapacity=1000000
serverOnewaySemaphoreValue=256
haListenPort=19702
enableCalcFilterBitMap=false
clientPooledByteBufAllocatorEnable=false
aclEnable=false
syncFlushTimeout=5000
rejectTransactionMessage=false
commitCommitLogThoroughInterval=200
connectTimeoutMillis=3000
queryThreadPoolQueueCapacity=20000
regionId=DefaultRegion
consumerManageThreadPoolNums=32
disableConsumeIfConsumerReadSlowly=false
flushConsumerOffsetHistoryInterval=60000
fetchNamesrvAddrByAddressServer=false
haTransferBatchSize=32768
compressedRegister=false
commercialTransCount=1
transactionCheckInterval=60000
mappedFileSizeCommitLog=1073741824
startAcceptSendRequestTimeStamp=0
serverPooledByteBufAllocatorEnable=true
serverAsyncSemaphoreValue=64
waitTimeMillsInTransactionQueue=3000
heartbeatThreadPoolQueueCapacity=50000
deleteWhen=04
bitMapLengthConsumeQueueExt=112
fastFailIfNoBufferInStorePool=false
defaultTopicQueueNums=4
flushConsumeQueueThoroughInterval=60000
notifyConsumerIdsChangedEnable=true
brokerPermission=6
fileReservedTime=72
transferMsgByHeap=true
pullThreadPoolQueueCapacity=100000
maxTransferCountOnMessageInDisk=2000
5.broker-s.properties(slave节点配置文件)
brokerClusterName=DefaultCluster
brokerName=broker-105041
#该配置文件IP192.168.105.105,slave的配置文件brokername要写master节点配置文件的brokername(不同服务器需要修改)
namesrvAddr=192.168.105.41:29876;192.168.105.105:29876;192.168.105.106:29876;192.168.105.107:29876;192.168.105.108:29876
#namesrv的节点
brokerId=1
#brokerid=0为master,非0为slave
listenPort=29501
#slave端口
brokerIP1=192.168.105.105
deleteWhen=04
fileReservedTime=72
brokerRole=SLAVE
storePathRootDir=/data/middleware/rocketmq-4.8.0/rootdir-s
#broker存储目录
storePathCommitLog=/data/middleware/rocketmq-4.8.0/commitlog-s
#Commitlog存储目录默认为${storePathRootDir}/commitlog
flushDiskType=ASYNC_FLUSH
#刷盘方式,默认为 ASYNC_FLUSH(异步刷盘),可选值SYNC_FLUSH(同步刷盘)
useReentrantLockWhenPutMessage=true
#从节点是否可读, 默认false
slaveReadEnable=true
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic
autoCreateTopicEnable=false
#是否允许 Broker 自动创建订阅组
autoCreateSubscriptionGroup=true
#支持消息轨迹
traceTopicEnable=true
#在发送线程池任务队列的最大等待时间, 超过时间任务会被移除, 默认200ms
waitTimeMillsInSendQueue=500
#服务端发送线程个数,建议配置成Cpu核数
sendMessageThreadPoolNums=32
#消息存储到commitlog文件时获取锁类型,如果为true使用ReentrantLock否则使用自旋锁
useReentrantLockWhenPutMessage=false
#一次服务消息拉取,消息在内存中传输运行的最大消息条数,默认为32条
maxTransferCountOnMessageInMemory=2000
#一次消息服务端消息拉取,消息在磁盘中传输允许的最大条数,默认为8条
maxTransferCountOnMessageInDisk=2000
#允许的最大消息体, 默认4M
maxMessageSize=6291456
#延迟队列等级
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 1d 2d
6.namesrv配置文件
运维:
-
启动:
namesrv:
nohup /bin/sh $ROCKETMQ_HOME/bin/mqnamesrv -c $ROCKETMQ_HOME/conf/namesrv.properties $ROCKETMQ_HOME/logs/mqnamesrv.log 2>&1 &
master:
nohup /bin/sh $ROCKETMQ_HOME/bin/mqbroker -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-m.properties > $ROCKETMQ_HOME/logs/broker-m.log 2>&1 &
slave:
nohup /bin/sh $ROCKETMQ_HOME/bin/mqbroker -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-s.properties > $ROCKETMQ_HOME/logs/broker-s.log 2>&1 &
查看集群状态:
sh bin/mqadmin clusterList -n 10.0.0.0:29876 (namesrvIP及namesrv端口)
rocketmq web:
nohup java -jar jar包名 &
vim jar包后修改里面配置文件的namesrvaddr地址即可启动
文章知识点与官方知识档案匹配,可进一步学习相关知识云原生入门技能树持续集成和部署(Jenkins)使用helm安装Jenkins8746 人正在系统学习中
声明:本站部分文章及图片源自用户投稿,如本站任何资料有侵权请您尽早请联系jinwei@zod.com.cn进行处理,非常感谢!