Skip to content

Menu
  • Home
  • 文章
    • 监控
      • 自己开发的监控服务
    • MQ
      • rabbitmq
    • Database
      • Mysql
      • redis
  • 常用脚本
    • shell
      • 系统初始化
      • jdk安装
  • 资源
    • 豆包
    • ChatGPT
    • 哔哩哔哩
    • CSDN
    • 百度
    • Github
    • 阿里镜像源
    • 腾讯云
  • 联系方式
  • 关于本站
Menu

Rabbitmq效果演示与集群配置

Posted on 2025年11月21日2025年11月22日 by rangrang

前言:rabbitmq安装参考rabbitmq基础配置

这里采用python脚本方式进行演示

Consumer下载
get-pip下载
Product下载
#安装pip3
python3 get-pip.py
#安装python相关包
pip3 install pika
#修改其他两个py文件
connection = pika.BlockingConnection(
    pika.ConnectionParameters(
        host='192.168.66.221', 修改rabbitmq服务器IP
        credentials=pika.PlainCredentials('admin', '123') 修改登录账户密码
    )
)
#对admin账户配置相应权限
rabbitmqctl set_permissions -p / admin '.*' '.*' '.*'
[root@rocky221 ~]# python3 Product.py 100
准备发送 100 条消息到 'test_queue'...
100 条消息已全部发送完毕。
#执行消费者脚本 接收消息
[root@rocky221 ~]# python3 Consumer.py 
等待接收消息...
收到消息: Hello RabbitMQ! 这是第 1 条消息。
收到消息: Hello RabbitMQ! 这是第 2 条消息。
收到消息: Hello RabbitMQ! 这是第 3 条消息。
收到消息: Hello RabbitMQ! 这是第 4 条消息。
收到消息: Hello RabbitMQ! 这是第 5 条消息。
收到消息: Hello RabbitMQ! 这是第 6 条消息。
收到消息: Hello RabbitMQ! 这是第 7 条消息。
收到消息: Hello RabbitMQ! 这是第 8 条消息。
收到消息: Hello RabbitMQ! 这是第 9 条消息。
收到消息: Hello RabbitMQ! 这是第 10 条消息。
收到消息: Hello RabbitMQ! 这是第 11 条消息。
收到消息: Hello RabbitMQ! 这是第 12 条消息。
收到消息: Hello RabbitMQ! 这是第 13 条消息。
收到消息: Hello RabbitMQ! 这是第 14 条消息。
收到消息: Hello RabbitMQ! 这是第 15 条消息。
收到消息: Hello RabbitMQ! 这是第 16 条消息。
收到消息: Hello RabbitMQ! 这是第 17 条消息。
收到消息: Hello RabbitMQ! 这是第 18 条消息。
收到消息: Hello RabbitMQ! 这是第 19 条消息。
收到消息: Hello RabbitMQ! 这是第 20 条消息。
收到消息: Hello RabbitMQ! 这是第 21 条消息。
......

rabbitmq集群

任何一个服务,如果仅仅是单机部署,那么性能总是有上喂的,Rabbitm也不例外,当单台 Rabbitm服务处理消息的能力到达瓶颈时,可以通过集群来实现高可用和负载均衡。
通常情况下,在集群中我们把每一个服务称之为一个节点,在 Rabbitmo 集群中,节点类型可以分为两种。
内存节点:元数据存放于内存中。为了重启后能同步数据,内存节点会将磁盘节点的地址存放于磁盘之中,除此之外,如果消息被持久化了也会存放于磁盘之中,因为内存节点读写速度快,一般客户端会连接内存节点。
磁盘节点:元数据存放于磁盘中(默认节点类型),需要保证至少一个磁盘节点,否则一旦宕机,无法恢复数据,从而也就无法达到集群的高可用目的。
PS:元数据,指的是包括队列名字属性、交换机的类型名字属性、绑定信息、vhost 等基础信息,不包括队列中的消息数据。
Rabbitme 中的集群主要有两种模式:普通集群模式和镜像队列模式。

普通集群模式

在普通集群模式下,集群中各个节点之间只会相互同步元数据,也就是说,消息数据不会被同步。那么问题就来了,假如我们连接到 A 节点,但是消息又存储在 B 节点又怎么办呢?
无论是生产者还是消费者,假如连接到的节点上没有存储队列数据,那么内部会将其转发到存储队列数据的节点上进行存储。虽然说内部可以实现转发,但是因为消息仅仅只是存储在一个节点,那么假如这节点挂了,消息是不是就没有了?这个问题确实存在,所以这种普通集群模式并没有达到高可用的目的。

部署普通集群

所有节点添加hosts
cat /etc/hosts
127.0.0.1   localhost localhost.localdomain localhost4 localhost4.localdomain4
::1         localhost localhost.localdomain localhost6 localhost6.localdomain6
192.168.66.221 rocky221
192.168.66.222 rocky222
192.168.66.223 rocky223
安装并启动rabbitmq
yum install -y rabbitmq-server
systemctl start rabbitmq-server
[root@rocky221 ~]# rabbitmqctl list_users
Listing users ...
user	tags
admin	[administrator]
guest	[administrator]
[root@rocky222 ~]# rabbitmqctl list_users
Listing users ...
user	tags
guest	[administrator]
[root@rocky223 ~]# rabbitmqctl list_users
Listing users ...
user	tags
guest	[administrator]
目前只有221节点有创建的用户,222、223节点没有新创建的用户
在node1上查看集群信息、只返回一个节点(目前三个节点没有产生联系)
[root@rocky221 ~]# rabbitmqctl cluster_status
Cluster status of node rabbit@rocky221 ...
Basics

Cluster name: rabbit@rocky221
Total CPU cores available cluster-wide: 2

Disk Nodes

rabbit@rocky221

Running Nodes

rabbit@rocky221

Versions

rabbit@rocky221: RabbitMQ 3.13.7 on Erlang 26.2.5.15

CPU Cores

Node: rabbit@rocky221, available CPU cores: 2

Maintenance status

Node: rabbit@rocky221, status: not under maintenance

确保所有节点上.erlang.cookie文件内容一致。通过Erlang的分布式特性(通过magic cookie认证节点)进行RabbitMQ集群,各RabbitMQ服务为对等节点,即每个节点都提供服务给客户端连接,进行消息发送与接收:
[root@rocky221 ~]# scp /var/lib/rabbitmq/.erlang.cookie rocky222:/var/lib/rabbitmq/
[root@rocky221 ~]# scp /var/lib/rabbitmq/.erlang.cookie rocky223:/var/lib/rabbitmq/
[root@rocky221 ~]# cat /var/lib/rabbitmq/.erlang.cookie 
TIUDEICBQYWLRTHDXNSL
将node2和node3加入到集群当中: 

# 将node2加入集群,在node2上操作
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster --ram rabbit@rocky221//--ram 表示这是一个内存节点
rabbitmqctl start_app

# 在node3上执行加入集群操作, 将node3添加为一个磁盘节点
rabbitmqctl stop_app
rabbitmqctl  reset
rabbitmqctl join_cluster --disc rabbit@rocky221 //--disc表示磁盘节点(默认也是磁盘节点)
rabbitmqctl start_app
#查看集群状态
[root@rocky221 ~]# rabbitmqctl cluster_status
Cluster status of node rabbit@rocky221 ...
Basics

Cluster name: rabbit@rocky221
Total CPU cores available cluster-wide: 6

Disk Nodes

rabbit@rocky221
rabbit@rocky223

RAM Nodes

rabbit@rocky222

Running Nodes

rabbit@rocky221
rabbit@rocky222
rabbit@rocky223

Versions

rabbit@rocky221: RabbitMQ 3.13.7 on Erlang 26.2.5.15
rabbit@rocky223: RabbitMQ 3.13.7 on Erlang 26.2.5.15
rabbit@rocky222: RabbitMQ 3.13.7 on Erlang 26.2.5.15

CPU Cores

Node: rabbit@rocky221, available CPU cores: 2
Node: rabbit@rocky223, available CPU cores: 2
Node: rabbit@rocky222, available CPU cores: 2

Maintenance status

Node: rabbit@rocky221, status: not under maintenance
Node: rabbit@rocky223, status: not under maintenance
Node: rabbit@rocky222, status: not under maintenance

测试普通集群,验证效果

[root@rocky222 ~]# rabbitmqctl list_users
Listing users ...
user	tags
admin	[administrator]
guest	[administrator]
现在查看222节点,元数据也已经同步成功

现在演示往test_queue里写入数据,从222节点读取数据,看是否成功
注意:test_queue是在product.py脚本里设置的,在221节点上,所以测试数据是存放在221节点
[root@rocky221 ~]# cat Product.py |grep 223
        host='192.168.66.223',
[root@rocky221 ~]# python3 Product.py 100
准备发送 100 条消息到 'test_queue'...
100 条消息已全部发送完毕。
[root@rocky221 ~]# cat Consumer.py |grep 222
        host='192.168.66.222',
[root@rocky221 ~]# python3 Consumer.py 
等待接收消息...
收到消息: Hello RabbitMQ! 这是第 1 条消息。
收到消息: Hello RabbitMQ! 这是第 2 条消息。
收到消息: Hello RabbitMQ! 这是第 3 条消息。
收到消息: Hello RabbitMQ! 这是第 4 条消息。
收到消息: Hello RabbitMQ! 这是第 5 条消息。
收到消息: Hello RabbitMQ! 这是第 6 条消息。
收到消息: Hello RabbitMQ! 这是第 7 条消息。
收到消息: Hello RabbitMQ! 这是第 8 条消息。
收到消息: Hello RabbitMQ! 这是第 9 条消息。
收到消息: Hello RabbitMQ! 这是第 10 条消息。
收到消息: Hello RabbitMQ! 这是第 11 条消息。
收到消息: Hello RabbitMQ! 这是第 12 条消息。
收到消息: Hello RabbitMQ! 这是第 13 条消息。
收到消息: Hello RabbitMQ! 这是第 14 条消息。
收到消息: Hello RabbitMQ! 这是第 15 条消息。
收到消息: Hello RabbitMQ! 这是第 16 条消息。
收到消息: Hello RabbitMQ! 这是第 17 条消息。
...... 读取成功  
显示是成功的,但如果往test_queue里写入数据,但写入数据之后,221节点宕机了呢,还能读取到数据嘛
现在来演示一下
注意:test_queue是在product.py脚本里设置的,在221节点上,所以测试数据是存放在221节点
[root@rocky221 ~]# python3 Product.py 100
准备发送 100 条消息到 'test_queue'...
100 条消息已全部发送完毕。
[root@rocky221 ~]# systemctl stop rabbitmq-server.service 
[root@rocky221 ~]# python3 Consumer.py 
Traceback (most recent call last):
  File "/root/Consumer.py", line 14, in <module>
    channel.queue_declare(queue='test_queue', durable=True)
  File "/usr/local/lib/python3.9/site-packages/pika/adapters/blocking_connection.py", line 2524, in queue_declare
    self._flush_output(declare_ok_result.is_ready)
  File "/usr/local/lib/python3.9/site-packages/pika/adapters/blocking_connection.py", line 1357, in _flush_output
    raise self._closing_reason  # pylint: disable=E0702
pika.exceptions.ChannelClosedByBroker: (404, "NOT_FOUND - queue 'test_queue' in vhost '/' process is stopped by supervisor")
读取失败
这是因为普通集群,只会同步元数据,不会同步数据的原因

配置镜像队列模式

(3.x版本需要手动设置镜像队列才能同步)而最新4.x版本默认普通集群自动会将数据同步至各个节点
镜像队列模式本质上,就是在普通集群模式的基础上配置队列的同步

如果需要将所有队列都设置为镜像模式,可在任意节点执行如下操作
[root@rocky221 ~]# rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}'
现在再重新演示一下 刚刚普通集群的弊端
[root@rocky221 ~]# python3 Product.py 10
准备发送 10 条消息到 'test_queue'...
10 条消息已全部发送完毕。
[root@rocky221 ~]# systemctl stop rabbitmq-server
[root@rocky221 ~]# python3 Consumer.py 
等待接收消息...
收到消息: Hello RabbitMQ! 这是第 1 条消息。
收到消息: Hello RabbitMQ! 这是第 2 条消息。
收到消息: Hello RabbitMQ! 这是第 3 条消息。
收到消息: Hello RabbitMQ! 这是第 4 条消息。
收到消息: Hello RabbitMQ! 这是第 5 条消息。
收到消息: Hello RabbitMQ! 这是第 6 条消息。
收到消息: Hello RabbitMQ! 这是第 7 条消息。
收到消息: Hello RabbitMQ! 这是第 8 条消息。
收到消息: Hello RabbitMQ! 这是第 9 条消息。
收到消息: Hello RabbitMQ! 这是第 10 条消息。
显示虽然221节点宕机了,但数据依然可以访问,这是因为配置了镜像队列模式,不但同步元数据,连数据也会同步

如果需要将所有队列都设置为镜像模式,可在任意节点执行如下操作:
rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}'

将名称以ha开头的队列同步至所有节点:
rabbitmqctl set_policy ha-all "^ha.*" '{"ha-mode": "all"}'

将所有队列至少同步至两个节点:
rabbitmqctl set_policy ha-two "^.*" '{"ha-mode": "exactly","ha-params": 2,"ha-sync-mode": "automatic"   }'

在任意节点查看相应策略
[root@rocky222 ~]# rabbitmqctl list_policies
Listing policies for vhost "/" ...
vhost	name	pattern	apply-to	definition	priority
/	ha-all	^	all	{"ha-mode":"all"}	0

删除策略
[root@rocky222 ~]# rabbitmqctl clear_policy -p / ha-all
Clearing policy "ha-all" on vhost "/" ...

集群管理操作

切换节点类型
将221切换成内存节点
[root@rocky221 ~]# rabbitmqctl stop_app
Stopping rabbit application on node rabbit@rocky221 ...
[root@rocky221 ~]# rabbitmqctl change_cluster_node_type ram
Turning rabbit@rocky221 into a ram node
[root@rocky221 ~]# rabbitmqctl start_app
Starting node rabbit@rocky221 ...
[root@rocky221 ~]# rabbitmqctl cluster_status
Cluster status of node rabbit@rocky221 ...
Basics

Cluster name: rabbit@rocky221
Total CPU cores available cluster-wide: 6

Disk Nodes

rabbit@rocky223

RAM Nodes

rabbit@rocky221
rabbit@rocky222

从集群中摘除节点
[root@rocky222 ~]# rabbitmqctl stop_app
Stopping rabbit application on node rabbit@rocky222 ...
[root@rocky222 ~]# rabbitmqctl reset
Resetting node rabbit@rocky222 ...
[root@rocky222 ~]# rabbitmqctl start_app
Starting node rabbit@rocky222 ...
[root@rocky222 ~]# rabbitmqctl cluster_status
Cluster status of node rabbit@rocky222 ...
Basics

Cluster name: rabbit@rocky222
Total CPU cores available cluster-wide: 2

Disk Nodes

rabbit@rocky222

Running Nodes

rabbit@rocky222

Versions

rabbit@rocky222: RabbitMQ 3.13.7 on Erlang 26.2.5.15


重启集群
集群重启时,最后一个挂掉的节点应该第一个重启,如果因特殊原因(比如同时断电),而不知道哪个节点最后一个挂掉。可用以下方法重启:
# 在任意一个节点上执行强制启动
rabbitmqctl force_boot
systemctl start rabbitmq-server

# 其他节点正常启动
rabbitmqctl start rabbitmq-server

1 thought on “Rabbitmq效果演示与集群配置”

  1. 王畅说道:
    2025年11月22日 上午1:57

    666666

    回复

发表回复 取消回复

您的邮箱地址不会被公开。 必填项已用 * 标注

网站统计

本站已运行:5 天

文章总数:9 篇

分类:6 个 · 标签:6 个

最后更新:2025-11-22

一句话

记录每一次踩坑,帮后来人少踩一个坑。

持续学习 · 持续输出 · 持续复盘。

友情链接

  • 新华社
  • 人民网
  • 中国网
© 2025 | Powered by Superbs Personal Blog theme