RabbitMQ Deep Dive (by quqi99)

2023-11-11 05:30
文章标签 rabbitmq deep dive quqi99

本文主要是介绍RabbitMQ Deep Dive (by quqi99),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

作者:张华  发表于:2015-06-03
版权声明:可以任意转载,转载时请务必以超链接形式标明文章原始出处和作者信息及本版权声明

( http://blog.csdn.net/quqi99 )

AMQP概念

AMPQ在RPC的基础上引入了中间件机制来解偶client与server来支持同步与异步调用。

通过消息机制,可以实现数据传输,非阻塞型操作,推送通知,发布/订阅,异步处理,work队列。
AMQP当中有四个概念非常重要:虚拟主机(virtual host),交换机(exchange),队列(queue)和绑定(binding)。

  • virutal host相当于namespace,用于不同tenant之间的exchange, queue, binding的隔离。
  •  Queue队列, 每个消息都会被投入到一个或多个队列。消息就一直在里面,直到有客户端(也就是消费者,Consumer)连接到这个队列并且将其取走为止。队列是由消费者(Consumer)通过程序建立的,不是通过配置文件或者命令行工具。 
  • Binding绑定, 它的作用就是把exchange和queue按照路由规则绑定起来。 
  • Routing_Key路由关键字:exchange根据这个关键字进行消息投递。
  • Channele消息通道:在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。 
  • Exchange交换机,对消息进行路由,当收到Publisher传递给它的消息后,Excahnge会根据路由键routing_key决定将消息加入到哪些消息队列中。 

消息的类型:

  • Direct Exchange – 处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。一对一交换类型。
  • Topic Exchange – 将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词。一对多主题多播交换类型。
  • Fanout Exchange – 不处理路由键。你只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。一对多广播交换类型。

AMQP 支持三种调用:

Call: 同步调用,但过程稍微复杂,producer 发送消息后立刻创建一个 direct consumer, 该 direct consumer 阻塞于接收返回值。对端的 consumer 接收并处理 producer 的消息后,创建一个 direct producer,它负责把处理结果发送给 direct consumer,如下图:

Cast: 异步调用,producer 发送消息后继续执行后续步骤,consumer 接收处理消息,如下图:

Fanout: 相当于广播,producer 可把消息发送给多个 consumer,属于异步调用范畴,如下图:

RabbitMQ简介与特点

RabbitMQ是一个开源的AMQP协议的实现,它具有如下特点:可靠性(Reliability), RabbitMQ使用一些机制来保证程序的可靠性,如持久化、传输确认机制、发布确认、高可用性。灵活的路由机制(Flexible Routing), 在消息进入队列之前,通过Exchange来路由消息的。对于典型的路由功能,RabbitMQ已经提供了一些内置的Exchange来实现。针对更复杂的路由功能,可以将多个Exchange绑定在一起,也通过插件机制实现自己的Exchange。消息集群(Clustering)多个RabbitMQ服务器可以组成一个集群,形成单个逻辑Broker。Federation, For servers that need to be more loosely and unreliably connected than clustering allows, RabbitMQ offers a federation model.队列高可用(Highly Available Queues), 队列可以在集群中的机器上进行镜像,以确保在硬件问题下还保证消息安全。多种协议的支持(Multi-protocol), RabbitMQ支持多种消息队列协议。

一个rabbitmq python例子

#coding:utf-8
import sys
from amqplib import client_0_8 as amqp
if __name__ == '__main__':if (len(sys.argv) <= 1):ispublisher = '0'print "Then pls run 'rabbittest 1' to sent message."else:ispublisher = sys.argv[1]conn = amqp.Connection(host="localhost:5672 ", userid="guest", password="password", virtual_host="/", insist=False)# 每个channel都被分配了一个整数标识chan = conn.channel()# 创建一个队列,它是durable的(重启后会重新建立)a# 消费者断开时不会自动删除(auto_delte=False)chan.queue_declare(queue="queue1", durable=True, exclusive=False, auto_delete=False)# 创建交换机,参数意思和上面的队列是一样的,还有一个type类型:fanout, direct, topicchan.exchange_declare(exchange="switch1", type="direct",durable=True, auto_delete=False,)# 绑定交换机和队列chan.queue_bind(queue="queue1", exchange="switch1", routing_key="key1")if (ispublisher == '1'):# 生产者msg = amqp.Message("Test message!")msg.properties["delivery_mode"] = 2chan.basic_publish(msg, exchange="switch1", routing_key="key1")else:# 主动从队列拉消息msg = chan.basic_get("queue1")print msg.bodychan.basic_ack(msg.delivery_tag)# 消息来了通知回调# 如果no_ack=True可以使用chan.basic_ack()人工确认,使用delivery_tag参数def recv_callback(msg):print 'Received: ' + msg.bodychan.basic_consume(queue='queue1', no_ack=False,callback=recv_callback, consumer_tag="testtag")# chan.basic_cancel("testtag") # 取消回调函数while True:chan.wait()  # 等待在队列上,直到下一个消息到达队列。chan.close()conn.close()

RabbitMQ CLI

安装,sudo apt-get install rabbitmq-server
重启,sudo service rabbitmq-server restart
sudo rabbitmqctl list_vhostssudo rabbitmqctl add_vhost demo
sudo rabbitmqctl list_users
sudo rabbitmqctl add_user test password
sudo rabbitmqctl change_password test password
sudo rabbitmqctl clear_password test
sudo rabbitmqctl list_user_permissions test
sudo rabbitmqctl set_permissions -p demo test ".*" ".*" ".*"
sudo rabbitmqctl clear_permissions -p demo test
sudo rabbitmqctl list_queues -p demo name durable auto_delete slave_pids synchronised_slave_pids
sudo rabbitmqadmin delete queue name='queuename'
sudo rabbitmqctl list_exchanges -p demosudo rabbitmqctl list_bindings -p demo
sudo rabbitmqctl list_consumers -p demosudo rabbitmqctl statussudo rabbitmqctl report# fileds can be:  [name, durable, auto_delete, arguments, policy, pid, owner_pid, exclusive, exclusive_consumer_pid, exclusive_consumer_tag, messages_ready, messages_unacknowledged, messages, messages_ready_ram, messages_unacknowledged_ram, messages_ram, messages_persistent, message_bytes, message_bytes_ready, message_bytes_unacknowledged, message_bytes_ram, message_bytes_persistent, head_message_timestamp, disk_reads, disk_writes, consumers, consumer_utilisation, memory, slave_pids, synchronised_slave_pids, state]
sudo rabbitmqctl list_queues name slave_pids synchronised_slave_pids durable -p openstack

RabbitMQ GUI

Enable it, sudo rabbitmq-plugins enable rabbitmq_management
http://localhost:15672  (guest/guest)

RabbitMQ配置文件

http://www.rabbitmq.com/configure.html#configuration-file
sudo find / -name rabbitmq.config*
sudo mv /usr/share/doc/rabbitmq-server/rabbitmq.config.example.gz /etc/rabbitmq/cd /etc/rabbitmq/ && sudo gunzip rabbitmq.config.example.gz
sudo mv rabbitmq.config.example rabbitmq.config

RabbitMQ调优

1, 流控(Flow Control)机制,默认RabbitMQ在使用机器的40%以上的内存时流控机制会起作用block掉所有连接。故确保使用64位操作系统与64位Erlang VM。当RabbitMQ是集群情况下,当其中有一台机器硬盘不足的时候,所有节点的producer链接也会被阻止。

rabbitmqctl  set_vm_memory_high_watermark 0.4
rabbitmqctl set_vm_memory_high_watermark_paging_ratio 0.75
rabbitmqctl status
http://www.rabbitmq.com/memory.html
Max open files,/etc/default/rabbitmq-server
ulimit -n 65535
cat /proc/$RABBITMQ_BEAM_PROCESS_PID/limits

2, Erlang的Hipe优化, 可以设置hipe_compiles设置。可以看到有20-50%的性能优化。而你只需要付出1分钟左右的延迟启动。 HiPE需要你检查是否编译进入你的Erlang安装环境。Ubuntu,需要安装erlang-base-hipe.默认有些平台不支持。如果Erlang VM segfaults,请关闭这个选项。

[{rabbit, [{hipe_compile, true}]}].

RabbitMQ集群

跨三个节点部署 RabbitMQ 集群和镜像消息队列。可以使用 HAProxy 提供负载均衡,或者将 RabbitMQ host list 配置给 OpenStack 组件(使用 rabbit_hosts 和 rabbit_ha_queues 配置项)。

先看第一种方式(采用HAproxy):

# 每个节点上执行下列命令配置RabbitMQ集群
# 根据需要设置当前节点的工作模式(ram/disk),ROOT_NODE_HOSTNAME为集群根节点的主机名,注意在此必须使用主机名

apt-get install rabbitmq-server
rabbitmq-server -detached                   #detached为后台运行别占据终端
echo 'MYRABBITMQCLUSTERABC' > /var/lib/rabbitmq/.erlang.cookie
chown rabbitmq:rabbitmq /var/lib/rabbitmq/.erlang.cookie
chmod 400 /var/lib/rabbitmq/.erlang.cookie
/usr/lib/rabbitmq/bin/rabbitmq-plugins enable rabbitmq_management
/usr/sbin/rabbitmqctl stop_app
/usr/sbin/rabbitmqctl reset
/usr/sbin/rabbitmqctl join_cluster --ram rabbit@${ROOT_NODE_HOSTNAME}
/usr/sbin/rabbitmqctl start_app
service rabbitmq-server restart
# 在主节点上添加用户
/usr/sbin/rabbitmqctl add_user web_admin password
/usr/sbin/rabbitmqctl add_user mgmt_admin password
/usr/sbin/rabbitmqctl set_user_tags username monitoring
/usr/sbin/rabbitmqctl set_user_tags mgmt_admin administrator
/usr/sbin/rabbitmqctl rabbitmqctl list_users
/usr/sbin/rabbitmqctl set_permissions -p / mgmt_admin ".*" ".*" ".*"
# 设置HAProxy, 需要设置成镜像队列,可以访问http://192.168.64.87:8888,用户名web_admin/password进行访问
/usr/sbin/rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}'
修改文件:/etc/haproxy/haproxy.cfg
listen rabbitmq_cluster 0.0.0.0:5672
mode tcp
balance roundrobin
server   node1 192.168.1.1:5672 check inter 2000 rise 2 fall 3 
/usr/sbin/haproxy -f /etc/haproxy/haproxy.cfg -D

第二种使用 rabbit_hosts 和 rabbit_ha_queues 配置项:

rabbit_hosts = rabbit1:5672,rabbit2:5672
rabbit_host = rabbit1
rabbit_ha_queues = true
如果配置了rabbit_hosts,那么nova将会按照顺序连接一个RabbitMQ服务,如果正在使用的MQ服务断开则依次尝试连接下一个,由于所有MQ的消息都是同步的,所以消息不会丢失。
如果配置了rabbit_host,那么需要在集群前架设haproxy,保证集群VIP服务正常。confirm that actual queue is connected and can consume that queue.  sudo rabbitmq-plugins enable rabbitmq_management
wget http://127.0.0.1:15672/cli/rabbitmqadmin && chmod 777 rabbitmqadmi
sudo rabbitmqctl add_user test password
sudo rabbitmqctl set_user_tags test administrator
sudo rabbitmqctl set_permissions -p openstack test ".*" ".*" ".*"
http://10.5.0.6:15672/#/queues/openstack/compute.juju-a09725-xenial-mitaka-7
./rabbitmqadmin publish -V openstack -u test -p password exchange=nova routing_key=compute.juju-a09725-xenial-mitaka-7 payload="test" 
tail -f /var/log/nova/nova-compute.log具体地见:http://m.blog.csdn.net/blog/gtt116/21083533

Debug Hacks

$ tshark -r xxx.pcap |awk '{arr[$5]++}END{for (a in arr) print a, arr[a]}' |sort -n -k 2 -r | head -n 3
10.55.74.103 62756
10.55.74.142 12976
10.55.74.139 12228
juju run -u rabbitmq-server/0 'sudo rabbitmqctl list_queues -p openstack|grep -wv 0'watch -c "sudo rabbitmqctl list_queues -p openstack | grep -E 'log|neutron|agent'" 

Reset rabbitmq slave

1) On juju-3182a3-69-lxd-2, back mnesia, stop the service
$ sudo mv /var/lib/rabbitmq/mnesia /var/lib/rabbitmq/mnesia-back
$ sudo service rabbitmq-server stop
2) Forget the cluster nodes from the rabbit master node
$ sudo rabbitmqctl stop_app
$ sudo rabbitmqctl forget_cluster_node rabbit@juju-3182a3-69-lxd-2
$ sudo rabbitmqctl start_app

如何恢复systemd管理的native mirror rabbitmq cluster

如何恢复systemd管理的native mirror rabbitmq cluster
1, 确保在3个节点上,rabbitmq-server先由systemd启动(随后会由pacemaker接管),这样能可能运行rabbitmqctl cluster_status命令.假设此时3个节点各自为政.
juju run --application=rabbitmq-server 'sudo rabbitmqctl cluster_status'2, juju status看有没有error状态,例如现在看到rabbitmq-server/1因为下列日志为error状态, rabbitmq-server/1上运行:
https://www.jianshu.com/p/498c63e4ace1
https://ywnz.com/linuxyffq/3899.html
2020-02-21 09:24:09 DEBUG config-changed subprocess.CalledProcessError: Command '['timeout', '180', '/usr/sbin/rabbitmqctl', 'wait', '/var/lib/rabbitmq/mnesia/rabbit@juju-cbd760-octavia-10.pid']' returned non-zero exit status 70.
systemctl restart rabbitmq-server
rabbitmqctl status |grep pid   #write pid to /var/lib/rabbitmq/mnesia/rabbit@juju-cbd760-octavia-10.pid
rabbitmqctl wait /var/lib/rabbitmq/mnesia/rabbit@juju-cbd760-octavia-10.pid
juju resolved rabbitmq-server/1 --no-retry
[可选]确保juju status没有rabbitmq untis相关错误之后,触发hooks
juju run --application ha hooks/config-changed
juju run --application rabbitmq-server hooks/ha-relation-joined
juju show-status-log neutron-openvswitch/23, 没有ceph的情况下,代码显示必须有ha-vip-only=true
juju deploy -n 3 rabbitmq-server
juju deploy hacluster ha
juju add-relation rabbitmq-server ha
juju config rabbitmq-server vip=10.5.100.20
juju config rabbitmq-server vip_iface=ens3
juju config ha corosync_bindiface=ens3
juju config rabbitmq-server ha-vip-only=true
juju config ha cluster_count=3
juju config rabbitmq-server min-cluster-size=34, 找到leader,假如leader是rabbitmq-server/1,并在其上找到cluster_name
juju run --application rabbitmq-server "is-leader"   #assue rabbitmq-server/1 is the leader
root@juju-cbd760-octavia-10:~# rabbitmqctl cluster_status |grep cluster{cluster_name,<<"rabbitmq-server@juju-cbd760-octavia-10">>},5, rabbitmq-server/1, 把RABBITMQ_NODENAME由localhost改成juju-cbd760-octavia-10
root@juju-cbd760-octavia-10:~# grep -r 'RABBITMQ_NODENAME' /etc/rabbitmq/rabbitmq-env.conf
RABBITMQ_NODENAME=rabbitmq-server@juju-cbd760-octavia-10
root@juju-cbd760-octavia-10:~# cat /var/lib/rabbitmq/.erlang.cookie
CZIFVOYCELFFGUFWJBZY
systemctl restart rabbitmq-server
rabbitmqctl cluster_status但代码中有这一句会在ha-vip-only=false时将RABBITMQ_NODENAME设为localhost, 所以ha-vip-only应为ha-vip-only,
同时,这里也说对于rabbitmq的hacluster方式的ha已经废弃了
https://github.com/openstack/charm-rabbitmq-server/blob/master/hooks/rabbitmq_context.py#L250# TODO: this is legacy HA and should be removed since it is now# deprecated.if relation_ids('ha'):if not config('ha-vip-only'):# TODO: do we need to remove this setting if it already exists# and the above is false?context['settings']['RABBITMQ_NODENAME'] = \'{}@localhost'.format(service_name())6, 登录到rabbitmq-server/0, 修改hostname并加入集群
root@juju-cbd760-octavia-9:~# grep -r 'RABBITMQ_NODENAME' /etc/rabbitmq/rabbitmq-env.conf
RABBITMQ_NODENAME=rabbitmq-server@juju-cbd760-octavia-9
systemctl restart rabbitmq-server
rabbitmqctl cluster_status
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster rabbitmq-server@juju-cbd760-octavia-10
rabbitmqctl cluster_status7, 登录到rabbitmq-server/2重复上一步,只是hostname不同, 这里为juju-cbd760-octavia-118, 此时.
root@juju-cbd760-octavia-10:~# rabbitmqctl cluster_status
Cluster status of node 'rabbitmq-server@juju-cbd760-octavia-10'
[{nodes,[{disc,['rabbitmq-server@juju-cbd760-octavia-10','rabbitmq-server@juju-cbd760-octavia-11','rabbitmq-server@juju-cbd760-octavia-9']}]},{running_nodes,['rabbitmq-server@juju-cbd760-octavia-10']},{cluster_name,<<"rabbitmq-server@juju-cbd760-octavia-10">>},{partitions,[]},{alarms,[{'rabbitmq-server@juju-cbd760-octavia-10',[]}]}]9, 但上面修复只是systemd管理的native集群.如何被pacemaker管理呢? 在rabbmitmq-server/1上(juju-cbd760-octavia-10)停掉systemd,启动corosync与packemaker,但前提是hacluster charm已经成功为corosync配置了res_rabbitmq_vip resources (当ha-vip-only=true时只有这一个)此时看到crm status没有res_rabbitmq_vip这个resource,
#juju run --application rabbitmq-server hooks/ha-relation-joined
juju run --application ha hooks/config-changed
juju run --unit ha/0 "sudo corosync-quorumtool -s"
juju run --unit ha/0 "sudo crm status"
juju run --unit ha/0 "sudo crm resource restart res_rabbitmq_vip"
juju run --unit ha/0 "sudo crm resource clean res_rabbitmq_vip"
#https://github.com/ClusterLabs/resource-agents/blob/master/heartbeat/rabbitmq-cluster
ls /usr/lib/ocf/resource.d/rabbitmq/rabbitmq-server-ha若运行上面hook无法加入res_rabbitmq_vip的话,检查代码是应该设置ha-vip-only=true,然后使用下面方法添加:
juju remove-relation rabbitmq-server ha
juju add-relation rabbitmq-server ha然后可以看到:
root@juju-cbd760-octavia-11:~# crm status |grep vipres_rabbitmq_vip       (ocf::heartbeat:IPaddr2):       Started juju-cbd760-octavia-10但是仍然看到这种错误:
ubuntu@zhhuabj-bastion:~$ juju status |grep waiting
rabbitmq-server             3.6.10   waiting      3  rabbitmq-server             jujucharms  358  ubuntu  
rabbitmq-server/0                waiting   idle   9        10.5.0.35       5672/tcp                    Unit has peers, but RabbitMQ not clustered
rabbitmq-server/1*               waiting   idle   10       10.5.0.32       5672/tcp                    Unit has peers, but RabbitMQ not clustered
rabbitmq-server/2                waiting   idle   11       10.5.0.28       5672/tcp                    Unit has peers, but RabbitMQ not clustered10, 继续调试, ha relation的数目不对,所以停在' Unit has peers, but RabbitMQ not clustered'
ubuntu@zhhuabj-bastion:~$ juju run --unit rabbitmq-server/0 "relation-ids ha" 
ha:41
ubuntu@zhhuabj-bastion:~$ juju run --unit rabbitmq-server/0 "relation-list -r ha:41" 
ha/3   #because hacluster was named to ha so name is ha/3 now
ubuntu@zhhuabj-bastion:~$ juju run --unit rabbitmq-server/0 "relation-get -r ha:41 - ha/3" 
clustered: "yes"
egress-subnets: 10.5.0.35/32
ingress-address: 10.5.0.35
private-address: 10.5.0.35它在找rabbitmqctl cluster_status中为running_nodes的个数828 @cached     829 def clustered():                                                                                                                                                                  830     ''' Determine whether local rabbitmq-server is clustered '''831     # NOTE: A rabbitmq node can only join a cluster once.832     # Simply checking for more than one running node tells us833     # if this unit is in a cluster.834     if len(running_nodes()) > 1:835         return True836     else:                    837         return False787 @cached                                 788 def running_nodes():789     ''' Determine the current set of running nodes in the RabbitMQ cluster '''790     return nodes(get_running=True)770 @cached 771 def nodes(get_running=False):772     ''' Get list of nodes registered in the RabbitMQ cluster '''773     out = rabbitmqctl_normalized_output('cluster_status')774     cluster_status = {}775     for m in re.finditer("{([^,]+),(?!\[{)\[([^\]]*)", out):776         state = m.group(1)777         items = m.group(2).split(',')778         items = [x.replace("'", '').strip() for x in items]779         cluster_status.update({state: items})780             781     if get_running:782         return cluster_status.get('running_nodes', [])                                                                                                                            783                 784     return cluster_status.get('disc', []) + cluster_status.get('ram', [])

Olso messaging

OpenStack 有三大常用消息中间件,RabbitMQ,QPID 和 ZeroMQ,它们参数和接口各异,不利于直接使用,所以 oslo.messaging 对这三种消息中间件做了抽象和封装,为上层提供统一的接口。

Oslo.messaging 抽象出了两类数据:

  • Transport: 消息中间件的基本参数,如 host, port 等信息。
    • RabbitMQ:host, port, userid, password, virtual_host, durable_queues, ha_queues 等。
    • Qpid: hostname, port, username, password, protocol (tcp or ssl) 等。
    • ZeroMQ: bind_address, host, port, ipc_dir 等。
  • Target: 主要包括 exchange, topic, server (optional), fanout (defaults to False) 等消息通信时用到的参数。

示例如下:

rabbitmqctl add_user hua hua
rabbitmqctl set_user_tags hua administrator
rabbitmqctl set_permissions -p / hua '.*' '.*' '.*'
rabbitmqctl list_users
rabbitmqctl list_user_permissions huaapt install rabbitmq-server python3-oslo.config python3-oslo.messaging -y# python3 consumer.py myserver
# cat consumer.py
#!/usr/bin/python
from oslo_config import cfg
import oslo_messaging
import sys
import timeclass TestEndpoint(object):def test(self, ctx, a,b):print("receive client access")return a+btransport_url = 'rabbit://hua:hua@127.0.0.1:5672/'
server = sys.argv[1]
transport = oslo_messaging.get_transport(cfg.CONF,transport_url)
target = oslo_messaging.Target(topic='test', server=server)
endpoints = [TestEndpoint(),
]
server = oslo_messaging.get_rpc_server(transport, target, endpoints)
try:server.start()while True:time.sleep(1)
except KeyboardInterrupt:print("Stopping server")server.stop()
server.wait()#cat producer.py
#!/usr/bin/pythonfrom oslo_config import cfg
import oslo_messagingtransport_url = 'rabbit://hua:hua@127.0.0.1:5672/'
transport = oslo_messaging.get_transport(cfg.CONF,transport_url)
target = oslo_messaging.Target(topic='test')
client = oslo_messaging.RPCClient(transport, target)
r = client.call({}, 'test',a=2,b=3)
print(r)
print('success')

#cat consumer.py
#-*- coding:utf-8 -*-
from oslo_config import cfg
import  oslo_messaging as messaging
import timeclass ServerControlEndpoint(object):target = messaging.Target(namespace='control', version='2.0')def __init__(self, server):self.server = serverdef stop(self, ctx):print("Call ServerControlEndpoint.stop()")        if self.server is not None:self.server.stop()class TestEndpoint(object):def test(self, ctx, arg):print("Call TestEndpoint.test()")return argmessaging.set_transport_defaults('myexchange')
transport = messaging.get_transport(cfg.CONF,url='rabbit://hua:hua@127.0.0.1:5672/')
target = messaging.Target(topic='testtopic', server='server1')
endpoints = [ServerControlEndpoint(None),TestEndpoint(),
]
server = messaging.get_rpc_server(transport, target, endpoints)
try:server.start()while True:time.sleep(1)
except KeyboardInterrupt:print("Stopping server")
server.stop()
server.wait()
transport.cleanup()#cat producer.py
#-*- coding:utf-8 -*-from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging as messaging
import sys# log initialization
CONF = cfg.CONF
logging.register_options(CONF)
CONF(sys.argv[1:])
logging.setup(CONF, "producer")
#module log init
LOG = logging.getLogger(__name__)
messaging.set_transport_defaults('myexchange')
LOG.info("Create a transport...")
transport= messaging.get_transport(cfg.CONF,url='rabbit://hua:hua@127.0.0.1:5672')
#construct target, topic parameter is essential
LOG.info("Creating target")
target= messaging.Target(topic='testtopic',server='server1')

运行它之后,会看到:

  • 用“rabbitmqctl list_exchanges”看到两个exchanges(testtopic_fanout        fanout  与 myexchange      topic)
  • 用“rabbitmqctl list_queues”看到3个, 其中fanout这个在stop 之后会消失,其他两个topic queue不会, 见:https://review.opendev.org/c/openstack/oslo.messaging/+/243845/5/oslo_messaging/_drivers/impl_rabbit.py#170
# rabbitmqctl list_queues
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name    messages
testtopic       0
testtopic.server1       0
testtopic_fanout_04f2ffd679034508891dc52f20864d37       0

这样,在heat中,重启一次会生成下列4个topic queues,同时加4个fanout queues, fanout queues在一段TTL时间后会消失,但下列4个topic queues不会,所以heat的exchange与queue数目会一直增加,最后提升CPU利用率恶性循环

root@juju-c1fca3-heat-12:~# rabbitmqctl list_queues -p openstack | grep -E 'engine_worker|heat-engine-listener' |grep -E '42cee820-4f0c-4aef-b8b6-705e7db3253a|8c12d70c-b00c-4e9b-b33e-7cbf0cb8c510'
engine_worker.42cee820-4f0c-4aef-b8b6-705e7db3253a 0
engine_worker.8c12d70c-b00c-4e9b-b33e-7cbf0cb8c510 0
heat-engine-listener.42cee820-4f0c-4aef-b8b6-705e7db3253a 0
heat-engine-listener.8c12d70c-b00c-4e9b-b33e-7cbf0cb8c510 0

在service.py#start中传进去的engine_id是每次重启随机的:

        self.worker_service = worker.WorkerService(                                                                                                                                   host=self.host,                                                     topic=rpc_worker_api.TOPIC,                                         engine_id=self.engine_id,                                           thread_group_mgr=self.thread_group_mgr                              )                                                                       self.worker_service.start() 

WorerService的将用engine_id作为server, 所以将产生众多 engine_worker.<engine_id>的topic queue.

    def start(self):                                                            target = oslo_messaging.Target(                                         version=self.RPC_API_VERSION,                                       server=self.engine_id,                                              topic=self.topic)                                                   self.target = target                                                    LOG.info("Starting %(topic)s (%(version)s) in engine %(engine)s.",      {'topic': self.topic,                                          'version': self.RPC_API_VERSION,                              'engine': self.engine_id})                                    self._rpc_server = rpc_messaging.get_rpc_server(target, self)           self._rpc_server.start() 

数据库记录类似:

mysql> select * from service;
...+--------------------------------------+--------------------------------------+------+--------------------+-------------+--------+-----------------+---------------------+---------------------+---------------------+
| id                                   | engine_id                            | host | hostname           | binary      | topic  | report_interval | created_at          | updated_at          | deleted_at          |
+--------------------------------------+--------------------------------------+------+--------------------+-------------+--------+-----------------+---------------------+---------------------+---------------------+
| 21d903ba-205b-45db-a2bf-2399f07f34a0 | 054c180e-88cd-41f3-8c1a-d66f8dba9696 | heat | juju-c1fca3-heat-2 | heat-engine | engine |              60 | 2021-04-15 04:24:58 | 2021-04-15 06:43:58 | NULL

按道理说,是不应该产生这么topic queue (engine_worker.<engine_id>)的名字的,所有heat service共用一个topic queue (engine_worker), 但后每个heat service用engine_id作为routing_id, 但是在oslo_messaging中的routing_key是变量topic

    def declare_topic_consumer(self, exchange_name, topic, callback=None,       queue_name=None):                                """Create a 'topic' consumer."""                                        consumer = Consumer(                                                    exchange_name=exchange_name,                                        queue_name=queue_name or topic,                                     routing_key=topic,                                                  type='topic',                                                       durable=self.amqp_durable_queues,                                   exchange_auto_delete=self.amqp_auto_delete,                         queue_auto_delete=self.amqp_auto_delete,                            callback=callback,                                                  rabbit_ha_queues=self.rabbit_ha_queues,                             enable_cancel_on_failover=self.enable_cancel_on_failover) 

heat#worker.py是这样设置topic的, topic是固定义的:

    def start(self):                                                            target = oslo_messaging.Target(                                         version=self.RPC_API_VERSION,                                       server=self.engine_id,                                              topic=self.topic)                                                   self.target = target                                                    LOG.info("Starting %(topic)s (%(version)s) in engine %(engine)s.",      {'topic': self.topic,                                          'version': self.RPC_API_VERSION,                              'engine': self.engine_id})                                    self._rpc_server = rpc_messaging.get_rpc_server(target, self)           self._rpc_server.start()  

同时heat#work.py传进去的server参数最后用在olso_messaging中的_send方法中:

                if notify:                                                      exchange = self._get_exchange(target)                       LOG.debug(log_msg + "NOTIFY exchange '%(exchange)s'"        " topic '%(topic)s'", {'exchange': exchange,      'topic': target.topic})    conn.notify_send(exchange, target.topic, msg, retry=retry)  elif target.fanout:                                             log_msg += "FANOUT topic '%(topic)s'" % {                   'topic': target.topic}                                  LOG.debug(log_msg)                                          conn.fanout_send(target.topic, msg, retry=retry)            else:                                                           topic = target.topic                                        exchange = self._get_exchange(target)                       if target.server:                                           topic = '%s.%s' % (target.topic, target.server)         LOG.debug(log_msg + "exchange '%(exchange)s'"               " topic '%(topic)s'", {'exchange': exchange,      'topic': topic})           conn.topic_send(exchange_name=exchange, topic=topic,        msg=msg, timeout=timeout, retry=retry,      transport_options=transport_options)def _get_exchange(self, target):                                                                                                                                                  return target.exchange or self._default_exchange 

难道是要这种写法?

$ git diff
diff --git a/heat/engine/service.py b/heat/engine/service.py
index bd3631df7..dccd90836 100644
--- a/heat/engine/service.py
+++ b/heat/engine/service.py
@@ -309,8 +309,8 @@ class EngineListener(object):def start(self):self.target = messaging.Target(
-            server=self.engine_id,
-            topic=rpc_api.LISTENER_TOPIC)
+            topic=self.engine_id,
+            exchange=rpc_api.LISTENER_TOPIC)self._server = rpc_messaging.get_rpc_server(self.target, self)self._server.start()diff --git a/heat/engine/worker.py b/heat/engine/worker.py
index ea8553d75..9a2b771af 100644
--- a/heat/engine/worker.py
+++ b/heat/engine/worker.py
@@ -80,8 +80,8 @@ class WorkerService(object):def start(self):target = oslo_messaging.Target(version=self.RPC_API_VERSION,
-            server=self.engine_id,
-            topic=self.topic)
+            topic=self.engine_id,
+            exchange=self.topic)self.target = targetLOG.info("Starting %(topic)s (%(version)s) in engine %(engine)s.",{'topic': self.topic,

另外一种方式是在charm里去设置TTL:

sudo rabbitmqctl set_policy heat_expiry "heat-engine-listener|engine_worker" "{'expires':3600000}" -p openstack --apply-to queues --priority 1

其余listener的用法可以继续查看: https://blog.csdn.net/jxxiaohou/article/details/78386879

20210906更新 - 一些rabbitmq基本操作

https://bugs.launchpad.net/oslo.messaging/+bug/1789177 

Fix a rabbitmq issue

https://zhhuabj.blog.csdn.net/article/details/105847301

Reference

[1] https://gist.github.com/niedbalski/69a72103adad4f0f9609a0857c9810a4

[2] https://pastebin.ubuntu.com/p/sJ94RmmS5x/

这篇关于RabbitMQ Deep Dive (by quqi99)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/387871

相关文章

RabbitMQ练习(AMQP 0-9-1 Overview)

1、What is AMQP 0-9-1 AMQP 0-9-1(高级消息队列协议)是一种网络协议,它允许遵从该协议的客户端(Publisher或者Consumer)应用程序与遵从该协议的消息中间件代理(Broker,如RabbitMQ)进行通信。 AMQP 0-9-1模型的核心概念包括消息发布者(producers/publisher)、消息(messages)、交换机(exchanges)、

RabbitMQ使用及与spring boot整合

1.MQ   消息队列(Message Queue,简称MQ)——应用程序和应用程序之间的通信方法   应用:不同进程Process/线程Thread之间通信   比较流行的中间件:     ActiveMQ     RabbitMQ(非常重量级,更适合于企业级的开发)     Kafka(高吞吐量的分布式发布订阅消息系统)     RocketMQ   在高并发、可靠性、成熟度等

Java消息队列:RabbitMQ与Kafka的集成与应用

Java消息队列:RabbitMQ与Kafka的集成与应用 大家好,我是微赚淘客返利系统3.0的小编,是个冬天不穿秋裤,天冷也要风度的程序猿! 在现代的分布式系统中,消息队列是实现系统间通信、解耦和提高可扩展性的重要组件。RabbitMQ和Kafka是两个广泛使用的消息队列系统,它们各有特点和优势。本文将介绍如何在Java应用中集成RabbitMQ和Kafka,并展示它们的应用场景。 消息队

springboot(集成篇):RabbitMQ集成详解

RabbitMQ 即一个消息队列,主要是用来实现应用程序的异步和解耦,同时也能起到消息缓冲,消息分发的作用。 消息中间件在互联网公司的使用中越来越多,刚才还看到新闻阿里将RocketMQ捐献给了apache,当然了今天的主角还是讲RabbitMQ。消息中间件最主要的作用是解耦,中间件最标准的用法是生产者生产消息传送到队列,消费者从队列中拿取消息并处理,生产者不用关心是谁来消费,消费者不用关心谁在

spring boot实战(番外篇)整合RabbitMQ

前言 最近几篇文章将围绕消息中间件RabbitMQ展开,对于RabbitMQ基本概念这里不阐述,主要讲解RabbitMQ的基本用法、Java客户端API介绍、spring Boot与RabbitMQ整合、 Spring Boot与RabbitMQ整合源码分析。   RabbitMQ安装   在使用消息中间件RabbitMQ之前就是安装RabbitMQ。   安装erlang:yum i

深入RabbitMQ世界:探索3种队列、4种交换机、7大工作模式及常见概念

文章目录 文章导图RabbitMQ架构及相关概念四大核心概念名词解读 七大工作模式及四大交换机类型0、前置了解-默认交换机DirectExchange1、简单模式(Simple Queue)-默认DirectExchange2、 工作队列模式(Work Queues)-默认DirectExchange3、发布/订阅模式(Publish/Subscribe)-FanoutExchange4、路

Deep Ocr

1.圈出内容,文本那里要有内容.然后你保存,并'导出数据集'. 2.找出deep_ocr_recognition_training_workflow.hdev 文件.修改“DatasetFilename := 'Test.hdict'” 310行 write_deep_ocr (DeepOcrHandle, BestModelDeepOCRFilename) 3.推理test.hdev

浅谈RabbitMQ的基石—高级消息队列协议(AMQP)

点击上方蓝色字体,选择“设为星标” 回复”资源“获取更多资源 大数据技术与架构 点击右侧关注,大数据开发领域最强公众号! 大数据真好玩 点击右侧关注,大数据真好玩!     前言 自从去年做了不少流式系统(Flink也好,Spark Streaming也好)对接RabbitMQ的实时作业。之前一直都在Kafka的领域里摸爬滚打,对RabbitMQ只是有浅薄的了解而已。随着自己逐渐把R

SpringBoot教程(十五) | SpringBoot集成RabbitMq(死信队列、延迟队列)

SpringBoot教程(十五) | SpringBoot集成RabbitMq(死信队列、延迟队列) (一)死信队列使用场景具体用法前提示例: (二)延迟队列使用场景方法一:通过死亡队列实现方法二:通过延迟消息插件(rabbitmq_delayed_message_exchange)实现 (一)死信队列 死信队列是一个重要的概念,用于处理那些因各种原因无法被正常消费的消息。 它

Introduction to Deep Learning with PyTorch

1、Introduction to PyTorch, a Deep Learning Library 1.1、Importing PyTorch and related packages import torch# supports:## image data with torchvision## audio data with torchaudio## text data with t