RabbitMQ (消息队列)专题学习04 Publish/Subscribe(发布者/订阅者)

本文主要是介绍RabbitMQ (消息队列)专题学习04 Publish/Subscribe(发布者/订阅者),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

(使用Java客户端)

一、概述

在前面的专题学习中,我们创建了一个工作队列,在工作队列中假如每个任务交给一个确定的工作者,不管是生产者还是消费者都必须知道一个指定的队列名称才能发送和接收消息,而RabbitMQ消息模型的核心思想就是生产者不会将消息直接发送给队列。

因为生产者通常不会知道消息将会被哪些消费者接收,生产者的消息虽然不是直接发送给queue(队列),但是消息会交给exchange(交换机),所以需要定义exchange的消息分发模式来实现消息的分发,这便是这部分专题学习中我们将要学习的发布者/订阅者模式,这样实现了消息生产者和消息消费者之间的解耦。

在前面的专题学习中实现简单消息传递和工作队列中有如下一行代码:

  channel.basicPublish("", queueName, null, msg.getBytes());
在上述代码中第一个是空字符串其实就是exchangeName,这里用了空字符串,就表示消息会交给默认的exchange。

为了说明这种消息分发模型,我们将构建一个简单的日志记录系统,它包括两个程序--第一个程序用来发送日志消息,第二个程序用来接收打印这些日志消息。

在日志记录系统运行的每个接收者都将接收到消息,这样我们可以运行一个接收者将消息输出到控制台。

总的原则:发送的日志消息将被广播到所有的接收者。

二、日志消息系统的实现

2.1、exchange(交换机)

之前发送和接收消息都是通过一个队列来实现,现在是时候介绍下一个完整的RabbitMQ的消息传递模型了。

首先来对之前学习的消息传递加深一下映象

>一个生产者是一个用于发送消息的应用

>一个队列是存储消息的缓冲区

>一个消费者是一个接收消息的应用。

在前面已经提到了RabbitMQ的核心思想是:生产者从来不需要直接发送任何消息到队列中,实际上通常生产者甚至不知道消息江北发送到任何一个队列中。

相反,生产者只能发送消息到一个交换组件中(exchange),exchange是一个很简单的东西,一方面它接收来自生产者的消息,另外一方面它将把来自生产者的消息放入到队列中,exchange必须知道怎么接收一个消息,而且接收的消息应该被添加到一个指定的队列?还是多个队列中,或者接收的消息被丢弃,这个规则被exchange所定义,它的结构如下:

图-1

exchange有如下几种定义类型:direct、topic、headers、fanout,每种类型都自己的实现方式和消息分发机制,在此我们将重点放在最后一种类型:fanout,首先创建一个这种类型的交换。

channel.exchangeDeclare("logs", "fanout");

基于fanout的exchange是非常简单的,正如它的名字一样,我们能猜到它的具体实现,它不仅仅广播各种来着生产者的消息到它所知道的所有队列中,这正是日志记录系统的所需要的。

2.2、交换列表(exchange list)

为了列出服务器中所有的exchanges(交换机),我们通过运行rabbitmqctl来实现,在列出的列表中有一些amp.*changes和没有定义名称的exchange(默认),这些是被服务器默认创建的,但是这些当我们需要使用的时候是不可用的。

在之前不知道关于exchange的任何东西,但是它仍然能够发送消息到队列,这可能因为是使用了默认的exchange,因为我们定义一个空的串("")。

之前发布的消息:

channel.basicPublish("", "hello", null, message.getBytes());
第一个参数就是exchange的名称,空的字符串表示默认或者是无名的exchange,消息被路由到指定的routingKey名称的队名,加入它存在的话。

2.3、临时队列(Temporary queues)

在之前我们使用的队列都是被定义过特殊的名称(hello和task_queue),对于RabbitMQ来说命名一个队列是至关重要的,当你想在生产者和消费者中分享队列的时候,给一个队列的名称是必须的。

但是那些都不是日志记录系统所需要的,我们希望能够获得所有的日志信息,而不只是其中的一部分,而且我们只对当前正在传递的信息感兴趣,对旧的日志信息不感兴趣,要解决这些问题,我们需要分两个步骤:

首先当我们链接到RabbitMQ服务器的时候,需要一个新的、空的队列,为了做到这点,可以创建一个随机名的队列,或者更好的方法就是让服务器选择一个随机的队列名。

其次,当断开与队列的连接时,消费者应该被自动删除掉。

在Java客户端,我们通过一个无参数的queueDeclare()方法为我们创建一个非持久的、唯一的、能自动删除的队列与队列名称

String queueName = channel.queueDeclare().getQueue();
在这一点上queueName包含一个随机队列名称,比如它可能看起来像 amq.gen-JzTY20BRgKO-HjmUJj0wLg. 的随机串

2.4、绑定(bindings)

图-2

我们已经创建了一个fanout exchange和一个队列,现在我们需要告诉exchange去发送消息到队列中,exchange和队列之间的关系被称为一个绑定(binding)。

channel.queueBind(queueName, "logs", "");
从现在开始我们从logs exchange将被添加消息到队列中,使用rabbitmqctl list_bingdins能列出所有的绑定。

2.5、发布者/订阅者实现(putting it all together)

图-3

生产者代码和之前的发送消息的代码并没有太大的区别,最重要的变化是,我们现在要将发布的消息传递给logs exchange来代替无名的exchange(之前的是""),在发送消息时需要提供一个routingKey,它对于fanout exchange是非常重要的,不能被忽视的,这里的EmitLog.java代码如下:

发送

package com.xuz.ps;import java.io.IOException;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class EmitLog {private static final String EXCHANGE_NAME = "logs";public static void main(String[] args) throws IOException {ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1");Connection conn = factory.newConnection();Channel channel = conn.createChannel();/**exchange类型* direct(直接)、topic(主题)、headers(标题)和fanout*/channel.exchangeDeclare(EXCHANGE_NAME, "fanout");String message = getMessage(args);channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());System.out.println("Sent["+message+"]");channel.close();conn.close();}private static String getMessage(String[] strings) {if(strings.length<1){return "info:Hello World!";}return joinStrings(strings,"");}private static String joinStrings(String[] strings, String string) {int len = strings.length;if(len == 0)return "";StringBuilder words = new StringBuilder(strings[0]);for (int i = 0; i < len; i++) {words.append(string).append(strings[i]);}return words.toString();}
}

接收

package com.xuz.ps;
import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.ShutdownSignalException;
public class ReceiveLogs {private static final String EXCHANGE_NAME = "logs";public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1");Connection conn = factory.newConnection();Channel channel = conn.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, "fanout");//获取队列名称String queueName = channel.queueDeclare().getQueue();//绑定队列与exchangechannel.queueBind(queueName, EXCHANGE_NAME, "");System.out.println("ReceiveLogs wait for message .TO exit press CTRL+C");QueueingConsumer consumer = new QueueingConsumer(channel);channel.basicConsume(queueName, true,consumer);while(true){QueueingConsumer.Delivery delivery = consumer.nextDelivery();String message = new String(delivery.getBody());System.out.println("Received [" + message + "]");  }}
}

2.6、测试发布者/订阅者

操作步骤:

1、运行多个ReceiveLogs,分别记为01、02、03、04,首先执行前三个接收者,如下图所示

     

-4

图-5

图-6

2、运行EmitLog.java,此时可以看到上述三个接收者都能接收消息

图-7

3、执行ReceiveLogs04,此时它没有收到消息。

图-8

4、再次执行EmitLog.java,此时可以看到所有的接收者都接收到了消息。

图-9

说明exchange在接收到生产者的消息后,会将消息发送到当前已经与它绑定了的所有的queue中,在接收者完消息之后,RabbitMQ将队列中的消息移除。

源码下载

基于RabbitMQ消息队列的发布者订阅者消息分发模型


这篇关于RabbitMQ (消息队列)专题学习04 Publish/Subscribe(发布者/订阅者)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/757342

相关文章

hdu1180(广搜+优先队列)

此题要求最少到达目标点T的最短时间,所以我选择了广度优先搜索,并且要用到优先队列。 另外此题注意点较多,比如说可以在某个点停留,我wa了好多两次,就是因为忽略了这一点,然后参考了大神的思想,然后经过反复修改才AC的 这是我的代码 #include<iostream>#include<algorithm>#include<string>#include<stack>#include<

poj 3190 优先队列+贪心

题意: 有n头牛,分别给他们挤奶的时间。 然后每头牛挤奶的时候都要在一个stall里面,并且每个stall每次只能占用一头牛。 问最少需要多少个stall,并输出每头牛所在的stall。 e.g 样例: INPUT: 51 102 43 65 84 7 OUTPUT: 412324 HINT: Explanation of the s

poj 2431 poj 3253 优先队列的运用

poj 2431: 题意: 一条路起点为0, 终点为l。 卡车初始时在0点,并且有p升油,假设油箱无限大。 给n个加油站,每个加油站距离终点 l 距离为 x[i],可以加的油量为fuel[i]。 问最少加几次油可以到达终点,若不能到达,输出-1。 解析: 《挑战程序设计竞赛》: “在卡车开往终点的途中,只有在加油站才可以加油。但是,如果认为“在到达加油站i时,就获得了一

RabbitMQ练习(AMQP 0-9-1 Overview)

1、What is AMQP 0-9-1 AMQP 0-9-1(高级消息队列协议)是一种网络协议,它允许遵从该协议的客户端(Publisher或者Consumer)应用程序与遵从该协议的消息中间件代理(Broker,如RabbitMQ)进行通信。 AMQP 0-9-1模型的核心概念包括消息发布者(producers/publisher)、消息(messages)、交换机(exchanges)、

poj3750约瑟夫环,循环队列

Description 有N个小孩围成一圈,给他们从1开始依次编号,现指定从第W个开始报数,报到第S个时,该小孩出列,然后从下一个小孩开始报数,仍是报到S个出列,如此重复下去,直到所有的小孩都出列(总人数不足S个时将循环报数),求小孩出列的顺序。 Input 第一行输入小孩的人数N(N<=64) 接下来每行输入一个小孩的名字(人名不超过15个字符) 最后一行输入W,S (W < N),用

POJ2010 贪心优先队列

c头牛,需要选n头(奇数);学校总共有f的资金, 每头牛分数score和学费cost,问合法招生方案中,中间分数(即排名第(n+1)/2)最高的是多少。 n头牛按照先score后cost从小到大排序; 枚举中间score的牛,  预处理左边与右边的最小花费和。 预处理直接优先队列贪心 public class Main {public static voi

取得 Git 仓库 —— Git 学习笔记 04

取得 Git 仓库 —— Git 学习笔记 04 我认为, Git 的学习分为两大块:一是工作区、索引、本地版本库之间的交互;二是本地版本库和远程版本库之间的交互。第一块是基础,第二块是难点。 下面,我们就围绕着第一部分内容来学习,先不考虑远程仓库,只考虑本地仓库。 怎样取得项目的 Git 仓库? 有两种取得 Git 项目仓库的方法。第一种是在本地创建一个新的仓库,第二种是把其他地方的某个

Java并发编程之——BlockingQueue(队列)

一、什么是BlockingQueue BlockingQueue即阻塞队列,从阻塞这个词可以看出,在某些情况下对阻塞队列的访问可能会造成阻塞。被阻塞的情况主要有如下两种: 1. 当队列满了的时候进行入队列操作2. 当队列空了的时候进行出队列操作123 因此,当一个线程试图对一个已经满了的队列进行入队列操作时,它将会被阻塞,除非有另一个线程做了出队列操作;同样,当一个线程试图对一个空

ActiveMQ—消息特性(延迟和定时消息投递)

ActiveMQ消息特性:延迟和定时消息投递(Delay and Schedule Message Delivery) 转自:http://blog.csdn.net/kimmking/article/details/8443872 有时候我们不希望消息马上被broker投递出去,而是想要消息60秒以后发给消费者,或者我们想让消息没隔一定时间投递一次,一共投递指定的次数。。。 类似

RabbitMQ使用及与spring boot整合

1.MQ   消息队列(Message Queue,简称MQ)——应用程序和应用程序之间的通信方法   应用:不同进程Process/线程Thread之间通信   比较流行的中间件:     ActiveMQ     RabbitMQ(非常重量级,更适合于企业级的开发)     Kafka(高吞吐量的分布式发布订阅消息系统)     RocketMQ   在高并发、可靠性、成熟度等