RabbitMQ
RabbitMQ简介
是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。
消息队列:是一种应用间的通信方式,消息发送后可以立即返回,由消息系统来确保消息的可靠传递。消息发布者只管把消息发布到 MQ 中而不用管谁来取,消息使用者只管从 MQ 中取消息而不管是谁发布的。
消息队列的主要作用:异步、解耦、削峰。
https://www.rabbitmq.com/networking.html RabbitMQ的服务文档信息
RabbitMQ的安装
一、Linux环境下
- 下载:https://www.rabbitmq.com/download.html
- 因为rabbitmq是基于erlang语言开发的,需要安装环境 ,查看环境的版本 https://www.rabbitmq.com/which-erlang.html
- erlang的下载 https://www.erlang-solutions.com/downloads/
- yum install -y erlang 安装erlang
- 安装socat yum install -y socat
- 安装rabbitmq yum install -y rabbitmq-serve
常用命令:systemctl start rabbitmq-serve 启动服务
systemctl enable rabbitmq-serve 开机自启动
7.安装图形化界面 rabbitmq-plugins enable rabbitmq_management
8.rabbitmq授权账户和密码 rabbitmqctl add_user {username} {passwd}
rabbitmqctl add_user root root123
-
设置用户角色 rabbitmqctl set_user_tags {username} {tag}
rabbitmqctl set_user_tags root administrator
docker run -d –name rabbitmq -p 15672:15672 -p 5672:5672 rabbitmq:3-management
RabbitMQ的工作模式
直连模式:一个队列只被一个consumer消费
工作模式:一个队列被多个consumer消费 (包括轮训模式(将channel.basicConsume(…, true, …)autoAck设为true)、公平模式(能者多劳,将autoAck设置为false))
发布订阅模式:Fanout,采用广播的机制
路由模式:direct,采用直连的形式
topic模式:可以通过routingkey将消息发送给复合定义的队列。
参数模式:header
RabbitMQ中各个组件
Producer生产消息(消息分为头和体,头中有许多的信息),发送给服务器端的Exchange
Exchange收到消息,根据routing key,将消息转发给匹配的Queue
Exchange与Queue之间是通过binding来进行绑定的
Queue收到消息,将消息发送给订阅者Consumer
Producer收到消息,发送ACK给队列确认收到消息
Queue收到ACK,删除队列中缓存的此条消息
Connection 连接通道
Virtual Host 虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制
Producer、Consumer与代理服务器(RabbitMQ)之间建立的是长链接,并且在连接中开辟出一个个的通道,来进行消息的传递。可以通过一个个的通道来确定某个服务是否还健在。
RabbitMQ的工作流程图
1、Direct exchange的特性是直接匹配,例:Routing key : name 创建队列name、name1、name2
我们在发送消息时带上我们设置的路由键name,消息会被交换机接受,由交换机通过name来唯一确定的将消息放入名为name的队列。
2、Fanout exchange:以扇形的形式向队列发送消息,例:Routing key : name 创建队列name、name1、name2
我们在发送消息时带上我们设置的路由键name(这里也可以不设置Routing key),消息会被交换机接受,交换机会将此消息保存到每一个队列中
3、Topic exchange:可以在设置Routing key时,依通配符的形式来设置(# *)name.#表示以name开始,后面只能有一个单词。name. *表示后面可以有多个单词。例:Routing key : name# 创建队列name.zs、name.lisi、name2
我们在发送消息时带上我们设置的路由键name.*,消息会被交换机接受,由交换机通过Routing key确定的将消息放入名为name.zs和name.lisi的队列中。
4、Headers exchange 它是根据Message的一些头部信息来分发过滤Message,忽略routing key的属性,如果Header信息和message消息的头信息相匹配,那么这条消息就匹配上了。在绑定Queue与Exchange时指定一组键值对以及x-match参数,x-match参数是字符串类型,可以设置为any或者all。如果设置为any,意思就是只要匹配到了headers表中的任何一对键值即可,all则代表需要全部匹配。
整合SpringBoot
1、导入依赖
引入依赖后RabbitAutoConfiguration就会工作,来进行一些自动配置,
CachingConnectionFactory 相当于是一个工厂,用来创建rabbitmq
RabbitTemplate 相当于是一个实例,用来操作rabbitmq消息
AmqpAdmin 相当于是管理组件,可以用来创建交换机、队列,并管理它们
RabbitMessagingTemplate 实现了RabbitMessageOperations接口
2、还需要进行配置文件的配置
3、@EnableRabbit
消息的发送
消息的接收
RabbitMQ的消息确认机制-可靠抵达
保证消息不丢失,可靠抵达,使用事务消息,性能低下
一、服务收到消息回调
二、消息正确抵达回调
三、消费端确认
publisher: confirmCallback 消息被broker接收就会触发回调
publisher: returnCallback 消息被queue接收就会触发回调
consumer: ack机制
消息端的确认默认是自动确认的,只要消息接受到,服务端默认确认,并且服务端移除消息
我们可以手动进行确认
可靠消费-重试机制
解决消息重试的方案:1、控制重试的次数
2、try catch +手动ack 在catch中要将requeue设置为false
如果为true会一直重试,就算设置了重试次数也没用
3、try catch +手动ack+死信队列
死信队列
死信的概念
无法被消费的消息被称为死信,而为了保证消息的不丢失,就产生了死信队列,将无法消费的消息放入死信队列。例:当用户下完单并没有支付时,可以将其放入死信队列,设置自动过期时间。
- 消息被拒绝(basic.reject/ basic.nack)并且requeue=false
- 消息TTL过期(参考:[RabbitMQ之TTL(Time-To-Live 过期时间)])
- 队列达到最大长度
延迟队列
1、本质是 消息TTL过期 过期时间有生产者设置
2、使用:订单在十分钟内未支付被自动取消
例:延迟队列会出现死信问题,因为其只会检测第一条的过期时间,如果第一条的过期时间太长 ,第二条的过期时间很短,必须得等待第一条执行完,才能执行第二条,第二条就是死信
3、rabbitmq的插件可以解决此问题
插件安装完成后会在图形化界面exchange中新出现一个类型:x-delayed-message
rabbitmq会出现幂等性问题
幂等性就是重复提交表单,造成多次消费。
解决幂等性的方法:
1、唯一id+指纹码机制
2、redis原子性 setnx操作天然原子性
设置队列优先级
rabbitmq中的队列可以设置优先级,来先消费某些消息(0-255数字越大,优先级越高)
惰性队列
消息是被保存在磁盘上的,例:消费者下线,不能消费消息时,为了减少mq的压力,应当将消息磁盘化。
rabbitmq的集群
一、集群的搭建:http://www.rabbitmq.com/install-rpm.html https://www.cnblogs.com/knowledgesea/p/6535766.html
-
RabbitMQ的集群是依赖erlang集群,而erlang集群是通过这个cookie进行通信认证的,因此我们做集群的第一步就是干cookie。必须使集群中也就是F,G这两台机器的.erlang.cookie文件中cookie值一致,且权限为owner只读 (.erlang.cookie 存在于/var/lib/rabbitmq/.erlang.cookie 和~/.erlang.cookie中)
保持服务器中的/var/lib/rabbitmq/.erlang.cookie和~/.erlang.cookie一致
-
后台启动结点 rabbitmq-server -detached
-
查看结点状态 rabbitmqctl status
-
启动第一个服务 rabbitmq-server
-
rabbitmqctl stop_app 先停止应用
rabbitmqctl reset //可以不做
rabbitmqctl join_cluster rabbit@名称1
rabbitmqctl start_app
rabbitmqctl cluster_status 查看集群状态信息
二、镜像队列,保证数据不丢失,进行数据的备份

三、实现高可用的负载均衡
使用Nginx来实现
四、federation exchange 来实现不同地区之间服务器数据的同步 (联盟交换机)是通过交换机之间进行连接
五、federation queue 来实现不同地区之间服务器数据的同步,是通过队列之间进行连接
rabbitmq实现分布式事务
分布式事务会出现数据不一致的情况,两个服务运行在不同的机器上,通过远程调用的方式来调用,因为两个服务都不在一台机器上,所以事务是不能控制的。如:订单服务(加上事务)有自己的数据库,(在数据库的内存中生成一条记录,因为有事务的原因,不会第一时间跟新数据库),向用户服务发起请求,用户服务也有自己的数据库,并在数据库中生成数据,因为某些原因,该服务不可用,订单服务就会出现异常,并回滚事务,那么就会出现数据不一致的情况。
利用RabbitMQ的消息确认机制-可靠抵达作为中间件
文章知识点与官方知识档案匹配,可进一步学习相关知识Java技能树首页概览91390 人正在系统学习中
声明:本站部分文章及图片源自用户投稿,如本站任何资料有侵权请您尽早请联系jinwei@zod.com.cn进行处理,非常感谢!