前言:rabbitmq安装参考rabbitmq基础配置
这里采用python脚本方式进行演示
#安装pip3
python3 get-pip.py
#安装python相关包
pip3 install pika
#修改其他两个py文件
connection = pika.BlockingConnection(
pika.ConnectionParameters(
host='192.168.66.221', 修改rabbitmq服务器IP
credentials=pika.PlainCredentials('admin', '123') 修改登录账户密码
)
)
#对admin账户配置相应权限
rabbitmqctl set_permissions -p / admin '.*' '.*' '.*'
[root@rocky221 ~]# python3 Product.py 100
准备发送 100 条消息到 'test_queue'...
100 条消息已全部发送完毕。

#执行消费者脚本 接收消息
[root@rocky221 ~]# python3 Consumer.py
等待接收消息...
收到消息: Hello RabbitMQ! 这是第 1 条消息。
收到消息: Hello RabbitMQ! 这是第 2 条消息。
收到消息: Hello RabbitMQ! 这是第 3 条消息。
收到消息: Hello RabbitMQ! 这是第 4 条消息。
收到消息: Hello RabbitMQ! 这是第 5 条消息。
收到消息: Hello RabbitMQ! 这是第 6 条消息。
收到消息: Hello RabbitMQ! 这是第 7 条消息。
收到消息: Hello RabbitMQ! 这是第 8 条消息。
收到消息: Hello RabbitMQ! 这是第 9 条消息。
收到消息: Hello RabbitMQ! 这是第 10 条消息。
收到消息: Hello RabbitMQ! 这是第 11 条消息。
收到消息: Hello RabbitMQ! 这是第 12 条消息。
收到消息: Hello RabbitMQ! 这是第 13 条消息。
收到消息: Hello RabbitMQ! 这是第 14 条消息。
收到消息: Hello RabbitMQ! 这是第 15 条消息。
收到消息: Hello RabbitMQ! 这是第 16 条消息。
收到消息: Hello RabbitMQ! 这是第 17 条消息。
收到消息: Hello RabbitMQ! 这是第 18 条消息。
收到消息: Hello RabbitMQ! 这是第 19 条消息。
收到消息: Hello RabbitMQ! 这是第 20 条消息。
收到消息: Hello RabbitMQ! 这是第 21 条消息。
......
rabbitmq集群
任何一个服务,如果仅仅是单机部署,那么性能总是有上喂的,Rabbitm也不例外,当单台 Rabbitm服务处理消息的能力到达瓶颈时,可以通过集群来实现高可用和负载均衡。
通常情况下,在集群中我们把每一个服务称之为一个节点,在 Rabbitmo 集群中,节点类型可以分为两种。
内存节点:元数据存放于内存中。为了重启后能同步数据,内存节点会将磁盘节点的地址存放于磁盘之中,除此之外,如果消息被持久化了也会存放于磁盘之中,因为内存节点读写速度快,一般客户端会连接内存节点。
磁盘节点:元数据存放于磁盘中(默认节点类型),需要保证至少一个磁盘节点,否则一旦宕机,无法恢复数据,从而也就无法达到集群的高可用目的。
PS:元数据,指的是包括队列名字属性、交换机的类型名字属性、绑定信息、vhost 等基础信息,不包括队列中的消息数据。
Rabbitme 中的集群主要有两种模式:普通集群模式和镜像队列模式。
普通集群模式
在普通集群模式下,集群中各个节点之间只会相互同步元数据,也就是说,消息数据不会被同步。那么问题就来了,假如我们连接到 A 节点,但是消息又存储在 B 节点又怎么办呢?
无论是生产者还是消费者,假如连接到的节点上没有存储队列数据,那么内部会将其转发到存储队列数据的节点上进行存储。虽然说内部可以实现转发,但是因为消息仅仅只是存储在一个节点,那么假如这节点挂了,消息是不是就没有了?这个问题确实存在,所以这种普通集群模式并没有达到高可用的目的。
部署普通集群
所有节点添加hosts
cat /etc/hosts
127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4
::1 localhost localhost.localdomain localhost6 localhost6.localdomain6
192.168.66.221 rocky221
192.168.66.222 rocky222
192.168.66.223 rocky223
安装并启动rabbitmq
yum install -y rabbitmq-server
systemctl start rabbitmq-server
[root@rocky221 ~]# rabbitmqctl list_users
Listing users ...
user tags
admin [administrator]
guest [administrator]
[root@rocky222 ~]# rabbitmqctl list_users
Listing users ...
user tags
guest [administrator]
[root@rocky223 ~]# rabbitmqctl list_users
Listing users ...
user tags
guest [administrator]
目前只有221节点有创建的用户,222、223节点没有新创建的用户
在node1上查看集群信息、只返回一个节点(目前三个节点没有产生联系)
[root@rocky221 ~]# rabbitmqctl cluster_status
Cluster status of node rabbit@rocky221 ...
Basics
Cluster name: rabbit@rocky221
Total CPU cores available cluster-wide: 2
Disk Nodes
rabbit@rocky221
Running Nodes
rabbit@rocky221
Versions
rabbit@rocky221: RabbitMQ 3.13.7 on Erlang 26.2.5.15
CPU Cores
Node: rabbit@rocky221, available CPU cores: 2
Maintenance status
Node: rabbit@rocky221, status: not under maintenance
确保所有节点上.erlang.cookie文件内容一致。通过Erlang的分布式特性(通过magic cookie认证节点)进行RabbitMQ集群,各RabbitMQ服务为对等节点,即每个节点都提供服务给客户端连接,进行消息发送与接收:
[root@rocky221 ~]# scp /var/lib/rabbitmq/.erlang.cookie rocky222:/var/lib/rabbitmq/
[root@rocky221 ~]# scp /var/lib/rabbitmq/.erlang.cookie rocky223:/var/lib/rabbitmq/
[root@rocky221 ~]# cat /var/lib/rabbitmq/.erlang.cookie
TIUDEICBQYWLRTHDXNSL
将node2和node3加入到集群当中:
# 将node2加入集群,在node2上操作
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster --ram rabbit@rocky221//--ram 表示这是一个内存节点
rabbitmqctl start_app
# 在node3上执行加入集群操作, 将node3添加为一个磁盘节点
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster --disc rabbit@rocky221 //--disc表示磁盘节点(默认也是磁盘节点)
rabbitmqctl start_app
#查看集群状态
[root@rocky221 ~]# rabbitmqctl cluster_status
Cluster status of node rabbit@rocky221 ...
Basics
Cluster name: rabbit@rocky221
Total CPU cores available cluster-wide: 6
Disk Nodes
rabbit@rocky221
rabbit@rocky223
RAM Nodes
rabbit@rocky222
Running Nodes
rabbit@rocky221
rabbit@rocky222
rabbit@rocky223
Versions
rabbit@rocky221: RabbitMQ 3.13.7 on Erlang 26.2.5.15
rabbit@rocky223: RabbitMQ 3.13.7 on Erlang 26.2.5.15
rabbit@rocky222: RabbitMQ 3.13.7 on Erlang 26.2.5.15
CPU Cores
Node: rabbit@rocky221, available CPU cores: 2
Node: rabbit@rocky223, available CPU cores: 2
Node: rabbit@rocky222, available CPU cores: 2
Maintenance status
Node: rabbit@rocky221, status: not under maintenance
Node: rabbit@rocky223, status: not under maintenance
Node: rabbit@rocky222, status: not under maintenance
测试普通集群,验证效果
[root@rocky222 ~]# rabbitmqctl list_users
Listing users ...
user tags
admin [administrator]
guest [administrator]
现在查看222节点,元数据也已经同步成功
现在演示往test_queue里写入数据,从222节点读取数据,看是否成功
注意:test_queue是在product.py脚本里设置的,在221节点上,所以测试数据是存放在221节点
[root@rocky221 ~]# cat Product.py |grep 223
host='192.168.66.223',
[root@rocky221 ~]# python3 Product.py 100
准备发送 100 条消息到 'test_queue'...
100 条消息已全部发送完毕。
[root@rocky221 ~]# cat Consumer.py |grep 222
host='192.168.66.222',
[root@rocky221 ~]# python3 Consumer.py
等待接收消息...
收到消息: Hello RabbitMQ! 这是第 1 条消息。
收到消息: Hello RabbitMQ! 这是第 2 条消息。
收到消息: Hello RabbitMQ! 这是第 3 条消息。
收到消息: Hello RabbitMQ! 这是第 4 条消息。
收到消息: Hello RabbitMQ! 这是第 5 条消息。
收到消息: Hello RabbitMQ! 这是第 6 条消息。
收到消息: Hello RabbitMQ! 这是第 7 条消息。
收到消息: Hello RabbitMQ! 这是第 8 条消息。
收到消息: Hello RabbitMQ! 这是第 9 条消息。
收到消息: Hello RabbitMQ! 这是第 10 条消息。
收到消息: Hello RabbitMQ! 这是第 11 条消息。
收到消息: Hello RabbitMQ! 这是第 12 条消息。
收到消息: Hello RabbitMQ! 这是第 13 条消息。
收到消息: Hello RabbitMQ! 这是第 14 条消息。
收到消息: Hello RabbitMQ! 这是第 15 条消息。
收到消息: Hello RabbitMQ! 这是第 16 条消息。
收到消息: Hello RabbitMQ! 这是第 17 条消息。
...... 读取成功
显示是成功的,但如果往test_queue里写入数据,但写入数据之后,221节点宕机了呢,还能读取到数据嘛
现在来演示一下
注意:test_queue是在product.py脚本里设置的,在221节点上,所以测试数据是存放在221节点
[root@rocky221 ~]# python3 Product.py 100
准备发送 100 条消息到 'test_queue'...
100 条消息已全部发送完毕。
[root@rocky221 ~]# systemctl stop rabbitmq-server.service
[root@rocky221 ~]# python3 Consumer.py
Traceback (most recent call last):
File "/root/Consumer.py", line 14, in <module>
channel.queue_declare(queue='test_queue', durable=True)
File "/usr/local/lib/python3.9/site-packages/pika/adapters/blocking_connection.py", line 2524, in queue_declare
self._flush_output(declare_ok_result.is_ready)
File "/usr/local/lib/python3.9/site-packages/pika/adapters/blocking_connection.py", line 1357, in _flush_output
raise self._closing_reason # pylint: disable=E0702
pika.exceptions.ChannelClosedByBroker: (404, "NOT_FOUND - queue 'test_queue' in vhost '/' process is stopped by supervisor")
读取失败
这是因为普通集群,只会同步元数据,不会同步数据的原因
配置镜像队列模式
(3.x版本需要手动设置镜像队列才能同步)而最新4.x版本默认普通集群自动会将数据同步至各个节点
镜像队列模式本质上,就是在普通集群模式的基础上配置队列的同步
如果需要将所有队列都设置为镜像模式,可在任意节点执行如下操作
[root@rocky221 ~]# rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}'
现在再重新演示一下 刚刚普通集群的弊端
[root@rocky221 ~]# python3 Product.py 10
准备发送 10 条消息到 'test_queue'...
10 条消息已全部发送完毕。
[root@rocky221 ~]# systemctl stop rabbitmq-server
[root@rocky221 ~]# python3 Consumer.py
等待接收消息...
收到消息: Hello RabbitMQ! 这是第 1 条消息。
收到消息: Hello RabbitMQ! 这是第 2 条消息。
收到消息: Hello RabbitMQ! 这是第 3 条消息。
收到消息: Hello RabbitMQ! 这是第 4 条消息。
收到消息: Hello RabbitMQ! 这是第 5 条消息。
收到消息: Hello RabbitMQ! 这是第 6 条消息。
收到消息: Hello RabbitMQ! 这是第 7 条消息。
收到消息: Hello RabbitMQ! 这是第 8 条消息。
收到消息: Hello RabbitMQ! 这是第 9 条消息。
收到消息: Hello RabbitMQ! 这是第 10 条消息。
显示虽然221节点宕机了,但数据依然可以访问,这是因为配置了镜像队列模式,不但同步元数据,连数据也会同步
如果需要将所有队列都设置为镜像模式,可在任意节点执行如下操作:
rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}'
将名称以ha开头的队列同步至所有节点:
rabbitmqctl set_policy ha-all "^ha.*" '{"ha-mode": "all"}'
将所有队列至少同步至两个节点:
rabbitmqctl set_policy ha-two "^.*" '{"ha-mode": "exactly","ha-params": 2,"ha-sync-mode": "automatic" }'
在任意节点查看相应策略
[root@rocky222 ~]# rabbitmqctl list_policies
Listing policies for vhost "/" ...
vhost name pattern apply-to definition priority
/ ha-all ^ all {"ha-mode":"all"} 0
删除策略
[root@rocky222 ~]# rabbitmqctl clear_policy -p / ha-all
Clearing policy "ha-all" on vhost "/" ...
集群管理操作
切换节点类型
将221切换成内存节点
[root@rocky221 ~]# rabbitmqctl stop_app
Stopping rabbit application on node rabbit@rocky221 ...
[root@rocky221 ~]# rabbitmqctl change_cluster_node_type ram
Turning rabbit@rocky221 into a ram node
[root@rocky221 ~]# rabbitmqctl start_app
Starting node rabbit@rocky221 ...
[root@rocky221 ~]# rabbitmqctl cluster_status
Cluster status of node rabbit@rocky221 ...
Basics
Cluster name: rabbit@rocky221
Total CPU cores available cluster-wide: 6
Disk Nodes
rabbit@rocky223
RAM Nodes
rabbit@rocky221
rabbit@rocky222
从集群中摘除节点
[root@rocky222 ~]# rabbitmqctl stop_app
Stopping rabbit application on node rabbit@rocky222 ...
[root@rocky222 ~]# rabbitmqctl reset
Resetting node rabbit@rocky222 ...
[root@rocky222 ~]# rabbitmqctl start_app
Starting node rabbit@rocky222 ...
[root@rocky222 ~]# rabbitmqctl cluster_status
Cluster status of node rabbit@rocky222 ...
Basics
Cluster name: rabbit@rocky222
Total CPU cores available cluster-wide: 2
Disk Nodes
rabbit@rocky222
Running Nodes
rabbit@rocky222
Versions
rabbit@rocky222: RabbitMQ 3.13.7 on Erlang 26.2.5.15
重启集群
集群重启时,最后一个挂掉的节点应该第一个重启,如果因特殊原因(比如同时断电),而不知道哪个节点最后一个挂掉。可用以下方法重启:
# 在任意一个节点上执行强制启动
rabbitmqctl force_boot
systemctl start rabbitmq-server
# 其他节点正常启动
rabbitmqctl start rabbitmq-server

666666