RabbitMQ二、RabbitMQ的六种模式

2024-06-01 14:44
文章标签 模式 rabbitmq 六种

本文主要是介绍RabbitMQ二、RabbitMQ的六种模式,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一、RabbitMQ的六种模式

  1. RabbitMQ共有六种工作模式:
    • 简单模式(Simple)
    • 工作队列模式(Work Queue)
    • 发布订阅模式(Publish/Subscribe)
    • 路由模式(Routing)
    • 通配符模式(Topics)
    • 远程调用模式(RPC,不常用,不对此模式进行讲解)

1、RabbitMQ的简单模式

在这里插入图片描述

  1. rabbitmq简单模式的特点:
      1. 一个生产者对应一个消费者,通过队列进行消息传递。
      1. 该模式使用direct交换机,direct交换机是RabbitMQ默认交换机

JMS

  1. 由于MQ产品很多,操作方式各有不同,于是JAVA提供了一套规则——JMS,用于操作消息中间件。
  2. JMS即Java消息服务(JavaMessage Service)应用程序接口,是一个Java平台中关于面向消息中间件的API。
  3. JMS是JavaEE规范中的一种,类比JDBC。很多MQ产品都实现了JMS规范,例如ActiveMQ等产品。
    • RabbitMQ官方并没有实现JMS规范,但是开源社区有JMS的实现包。

java操作rabbitmq的简单模式

  1. 操作之前记得启动rabbitmq服务(rabbitmq的客户端不用开启也行)
docker start rabbitmq
  1. 注意:启动rabbitmq的web客户端是为了能访问它的客户端界面
docker exec -it rabbitmq rabbitmq-plugins enable rabbitmq_management
第一步:创建项目(使用简单的maven项目即可)并添加RabbitMQ依赖

在这里插入图片描述

  • 依赖
<dependencies><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.14.0</version></dependency>
</dependencies>
第二步:生产者和消费者代码的编写
  • 创建生产者(producer)代码
package com.knife.demo01.simple;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;// 生产者
public class Producer {public static void main(String[] args) throws IOException, TimeoutException {// 1.创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.70.130"); // 虚拟机ipconnectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");connectionFactory.setVirtualHost("/");// 2.创建连接Connection connection = connectionFactory.newConnection();// 3.建立信道Channel channel = connection.createChannel();// 4.创建队列,如果队列已存在,则使用该队列/*** 参数1:队列名* 参数2:是否持久化,true表示MQ重启后队列还在。* 参数3:是否私有化,false表示所有消费者都可以访问,true表示只有第一次拥有它的消费者才能访问* 参数4:是否自动删除,true表示不再使用队列时自动删除队列* 参数5:其他额外参数*/channel.queueDeclare("simple_queue",false,false,false,null);// 5.发送消息String message = "hello!rabbitmq!";/*** 参数1:交换机名,""表示默认交换机direct* 参数2:路由键,简单模式就是队列名* 参数3:其他额外参数* 参数4:要传递的消息字节数组*/channel.basicPublish("","simple_queue",null,message.getBytes());// 6.关闭信道和连接channel.close();connection.close();System.out.println("===发送成功===");}
}

运行生产者代码结果:
在这里插入图片描述
在这里插入图片描述

  • 创建消费者(consumer)代码
package com.knife.demo01.simple;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;// 消费者
public class Consumer {public static void main(String[] args) throws IOException, TimeoutException {// 1.创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.70.130"); // 虚拟机ipconnectionFactory.setPort(5672); // 端口号connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");connectionFactory.setVirtualHost("/");// 2.创建连接Connection connection = connectionFactory.newConnection();// 3.建立信道Channel channel = connection.createChannel();// 4.监听队列/*** 参数1:监听的队列名* 参数2:是否自动签收,如果设置为false,则需要手动确认消息已收到,否则MQ会一直发送消息* 参数3:Consumer的实现类,重写该类方法表示接受到消息后如何消费*/channel.basicConsume("simple_queue",true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//                接收消息String message = new String(body, "UTF-8");System.out.println("接受消息,消息为:"+message);}});}
}

运行消费者代码结果:
在这里插入图片描述

2、RabbitMQ的工作队列模式(相比简单模式,处理消息的消费者增多了)

在这里插入图片描述

  1. 工作队列模式(Work Queue)与简单模式相比,多了一些消费者,该模式也使用direct交换机(默认使用的交换机),应用于处理消息较多的情况。特点如下:

      1. 一个队列对应多个消费者。
      1. 一条消息只会被一个消费者消费。
      1. 消息队列默认采用轮询的方式将消息平均发送给消费者。
      1. 在一个队列中如果有多个消费者,那么消费者之间对于同一个消息的关系是竞争的关系
  2. Work Queues 对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。例如:短信服务部署多个,只需要有一个节点成功发送即可
    在这里插入图片描述

java操作rabbitmq的工作队列模式

  1. Work Queues 的入门程序与上面简单模式的代码几乎是一样的。可以完全复制,只是比简单模式的消费者多几个而已,可以让多个消费者同时对消费消息的测试。
  2. 在简单模式的基础下编写代码(直接用同一个项目)
    在这里插入图片描述
  • 消息生产者代码:
public class Producer {public static void main(String[] args) throws IOException, TimeoutException {
//        创建工厂连接ConnectionFactory cf = new ConnectionFactory();cf.setHost("192.168.70.130"); //虚拟机地址cf.setPort(5672); //rabbitmq的端口号cf.setUsername("admin"); // 用户名cf.setPassword("admin"); // 密码cf.setVirtualHost("/"); //        创建连接Connection connection = cf.newConnection();//        创建信道Channel channel = connection.createChannel();//        创建队列/*** 参数1:队列名称* 参数二:是否持久化* 参数三:是否私有化* 参数4:队列使用完毕后是否自动删除*/channel.queueDeclare("work_queue",true,false,false,null);//        发送消息/*** 参数1:交换机名,""表示默认交换机direct* 参数2:路由键,简单模式就是队列名* 参数3:其他额外参数* 参数4:要传递的消息字节数组*/for (int i = 1; i <= 10; i++) {channel.basicPublish("","work_queue", MessageProperties.PERSISTENT_TEXT_PLAIN,("你好,这是今天的第"+i+"条消息").getBytes());}//        关闭资源channel.close();connection.close();System.out.println("消息发送成功====");}
}
  • 消息消费者代码(多个消费者可以代码复用,复用下面的代码,多创几个类)
public class Consumer {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工程ConnectionFactory cf = new ConnectionFactory();cf.setHost("192.168.70.130");cf.setPort(5672);cf.setUsername("admin");cf.setPassword("admin");cf.setVirtualHost("/");//2.创建连接Connection conn = cf.newConnection();//3.创建信道Channel channel = conn.createChannel();//        4. 接收消息channel.basicConsume("work_queue",true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg = new String(body, StandardCharsets.UTF_8);System.out.println("消费者1 = " + msg);}});channel.close();conn.close();}}

代码运行的效果和上面简单模式的类似。

3、RabbitMQ的发布订阅模式(相比工作队列模式,交换机使用的类型是Fanout(广播类型))

在这里插入图片描述

  • 图解说明
    • P:生产者,向 Exchange 发送消息
    • X:Exchange(交换机),接收生产者的消息,然后把消息递交给与交换机绑定的队列
    • C1、C2:消费者,其所在队列要与交换机进行绑定
  1. 在开发过程中,有一些消息需要不同的消费者进行不同的处理
    • 如电商网站的同一条促销信息需要短信发送邮件发送站内信发送等。此时可以使用发布订阅模式(Publish/Subscribe)
  2. 在订阅模型中,多了一个Exchange角色,而且过程略有变化:
    • Producer:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
    • Consumer:消费者,消息的接收者,会一直等待消息到来
    • Queue:消息队列,接收消息、缓存消息
    • Exchange:交换机(X)。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型:
      • Fanout:广播类型,将消息交给所有绑定到交换机的队列
      • Direct:定向类型,把消息交给符合指定routing key 的队列
      • Topic:通配符类型,把消息交给符合routing pattern(路由模式) 的队列,Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与 Exchange 绑定,或者没有符合路由规则的队列,那么消息会丢失!
  3. 特点:
      1. 生产者(Producer)将消息发送给交换机,交换机将消息转发到绑定此交换机的每个队列中(即可以转发到多个队列中)。
      1. 工作队列模式的交换机只能将消息发送给一个队列,发布订阅模式的交换机能将消息发送给多个队列。发布订阅模式使用fanout交换机

java操作rabbitmq的发布订阅模式

  1. 在简单模式的基础下编写代码(直接用同一个项目)
    在这里插入图片描述
    其实代码还是差不多的,只是使用到的交换机不一样了而已。
  • Producer
public class Producer {// 生产者public static void main(String[] args) throws IOException, TimeoutException {// 1.创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.70.130");connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");connectionFactory.setVirtualHost("/");// 2.创建连接Connection connection = connectionFactory.newConnection();// 3.建立信道Channel channel = connection.createChannel();// 4.创建交换机/*** 参数1:交换机名* 参数2:交换机类型* 参数3:交换机持久化*/
//        BuiltinExchangeType.FANOUT:表示广播模式channel.exchangeDeclare("exchange_fanout", BuiltinExchangeType.FANOUT, true);// 5.创建队列
//        队列1channel.queueDeclare("SEND_MAIL", true, false, false, null);
//        队列2channel.queueDeclare("SEND_MESSAGE", true, false, false, null);
//        队列3channel.queueDeclare("SEND_STATION", true, false, false, null);// 6.交换机绑定队列/*** 参数1:队列的名称* 参数2:交换机名* 参数3:路由关键字,发布订阅模式写""即可*/channel.queueBind("SEND_MAIL", "exchange_fanout", "");channel.queueBind("SEND_MESSAGE", "exchange_fanout", "");channel.queueBind("SEND_STATION", "exchange_fanout", "");// 7.发送消息channel.basicPublish("exchange_fanout", "", null,("618商品开抢了!").getBytes(StandardCharsets.UTF_8));
//        for (int i = 1; i <= 10; i++) {
//            channel.basicPublish("exchange_fanout", "", null,
//                    ("你好,尊敬的用户,秒杀商品开抢了!" + i).getBytes(StandardCharsets.UTF_8));
//        }// 8.关闭资源channel.close();connection.close();}
}
  • ConsumerMail(消费者代码都类似)
public class ConsumerMail {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工程ConnectionFactory cf = new ConnectionFactory();cf.setHost("192.168.70.130");cf.setPort(5672);cf.setUsername("admin");cf.setPassword("admin");cf.setVirtualHost("/");//2.创建连接Connection conn = cf.newConnection();//3.创建信道Channel channel = conn.createChannel();//4.监听队列channel.basicConsume("SEND_MAIL",true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg = new String(body, StandardCharsets.UTF_8);System.out.println("发送邮件消息 = " + msg);}});}
}
  • ConsumerMesage
public class ConsumerMessage {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工程ConnectionFactory cf = new ConnectionFactory();cf.setHost("192.168.70.130");cf.setPort(5672);cf.setUsername("admin");cf.setPassword("admin");cf.setVirtualHost("/");//2.创建连接Connection conn = cf.newConnection();//3.创建信道Channel channel = conn.createChannel();//4.监听队列channel.basicConsume("SEND_MESSAGE",true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg = new String(body, StandardCharsets.UTF_8);System.out.println("发送短信消息 = " + msg);}});}
}
  • ConsumerStation
public class ConsumerStation {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工程ConnectionFactory cf = new ConnectionFactory();cf.setHost("192.168.70.130");cf.setPort(5672);cf.setUsername("admin");cf.setPassword("admin");cf.setVirtualHost("/");//2.创建连接Connection conn = cf.newConnection();//3.创建信道Channel channel = conn.createChannel();//4.监听队列channel.basicConsume("SEND_STATION",true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg = new String(body, StandardCharsets.UTF_8);System.out.println("发送站內信 = " + msg);}});}
}

4、RabbitMQ的路由模式(相比发布订阅模式,多了个Route Key)

  1. 使用发布订阅模式时,所有消息都会发送到绑定的队列中,但很多时候,不是所有消息都无差别的发布到所有队列中。比如电商网站的促销活动,双十一大促可能会发布到所有队列;而一些小的促销活动为了节约成本,只发布到站内信队列。此时需要使用路由模式(Routing)完成这一需求。
    在这里插入图片描述
  • 图解说明
    • P:生产者,向 Exchange 发送消息,发送消息时,会指定一个routing key
    • X:Exchange(交换机),接收生产者的消息,然后把消息递交给与 routing key 完全匹配的队列
    • C1:消费者,其所在队列指定了需要 routing key 为 error 的消息
    • C2:消费者,其所在队列指定了需要 routing key 为 info、error、warning 的消息
  1. 队列与交换机的绑定,不能是任意绑定了,而是要指定一个 RoutingKey(路由key),消息的发送方在向Exchange发送消息时,也必须指定消息的 RoutingKey,Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey 与消息的 Routing key 完全一致,才会接收到消息。

java操作rabbitmq的路由模式

  1. 在简单模式的基础下编写代码(直接用同一个项目)
    在这里插入图片描述

其实代码还是差不多的,只是使用到的交换机不一样了,多了一个route Key。

  • Producer
public class Producer {// 生产者public static void main(String[] args) throws IOException, TimeoutException {// 1.创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.70.130");connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");connectionFactory.setVirtualHost("/");// 2.创建连接Connection connection = connectionFactory.newConnection();// 3.建立信道Channel channel = connection.createChannel();// 4.创建交换机/*** 参数1:交换机名* 参数2:交换机类型* 参数3:交换机持久化*/
//      BuiltinExchangeType.DIRECTchannel.exchangeDeclare("routing_exchange", BuiltinExchangeType.DIRECT, true);// 5.创建队列
//        队列1channel.queueDeclare("SEND_MAILRoute", true, false, false, null);
//        队列2channel.queueDeclare("SEND_MESSAGERoute", true, false, false, null);
//        队列3channel.queueDeclare("SEND_STATIONRoute", true, false, false, null);// 6.交换机绑定队列/*** 参数1:队列的名称* 参数2:交换机名* 参数3:路由关键字(路由名称)*/channel.queueBind("SEND_MAILRoute", "routing_exchange", "import");channel.queueBind("SEND_MESSAGERoute", "routing_exchange", "import");channel.queueBind("SEND_STATIONRoute", "routing_exchange", "import");channel.queueBind("SEND_STATIONRoute", "routing_exchange", "normal");// 7.发送消息channel.basicPublish("routing_exchange", "import", null,("618商品开抢了!").getBytes(StandardCharsets.UTF_8));channel.basicPublish("routing_exchange", "normal", null,("normal路由的息").getBytes(StandardCharsets.UTF_8));
//        for (int i = 1; i <= 10; i++) {
//            channel.basicPublish("exchange_fanout", "", null,
//                    ("你好,尊敬的用户,秒杀商品开抢了!" + i).getBytes(StandardCharsets.UTF_8));
//        }// 8.关闭资源channel.close();connection.close();}
}
  • ConsumerMail(消费者代码都类似)
public class ConsumerMail {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工程ConnectionFactory cf = new ConnectionFactory();cf.setHost("192.168.70.130");cf.setPort(5672);cf.setUsername("admin");cf.setPassword("admin");cf.setVirtualHost("/");//2.创建连接Connection conn = cf.newConnection();//3.创建信道Channel channel = conn.createChannel();//4.监听队列channel.basicConsume("SEND_MAILRoute",true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg = new String(body, StandardCharsets.UTF_8);System.out.println("发送邮件消息 = " + msg);}});}
}
  • ConsumerMesage
public class ConsumerMessage {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工程ConnectionFactory cf = new ConnectionFactory();cf.setHost("192.168.70.130");cf.setPort(5672);cf.setUsername("admin");cf.setPassword("admin");cf.setVirtualHost("/");//2.创建连接Connection conn = cf.newConnection();//3.创建信道Channel channel = conn.createChannel();//4.监听队列channel.basicConsume("SEND_MESSAGERoute",true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg = new String(body, StandardCharsets.UTF_8);System.out.println("发送短信消息 = " + msg);}});}
}
  • ConsumerStation
public class ConsumerStation {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工程ConnectionFactory cf = new ConnectionFactory();cf.setHost("192.168.70.130");cf.setPort(5672);cf.setUsername("admin");cf.setPassword("admin");cf.setVirtualHost("/");//2.创建连接Connection conn = cf.newConnection();//3.创建信道Channel channel = conn.createChannel();//4.监听队列channel.basicConsume("SEND_STATIONRoute",true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg = new String(body, StandardCharsets.UTF_8);System.out.println("发送站內信 = " + msg);}});}
}

5、RabbitMQ的通配符模式(相比路由模式,路由的组成中带上了通配符)

在这里插入图片描述

  1. 通配符模式(Topic)是在路由模式的基础上,给队列绑定带通配符的路由关键字,只要消息的RoutingKey能实现通配符匹配,就会将消息转发到该队列。通配符模式比路由模式更灵活,使用topic交换机
  2. 通配符规则:
      1. 消息设置RoutingKey时,RoutingKey由多个单词构成,中间以.分割。
      1. 队列设置RoutingKey时,#可以匹配任意多个单词,*可以匹配任意一个单词。
        在这里插入图片描述
  • 红色 Queue:绑定的是 usa.# ,因此凡是以 usa. 开头的 routing key 都会被匹配到
  • 黄色 Queue:绑定的是 #.news ,因此凡是以 .news 结尾的 routing key 都会被匹配

java操作rabbitmq的通配符模式

  1. 在简单模式的基础下编写代码(直接用同一个项目)
    在这里插入图片描述
    其实代码还是差不多的,只是route Key的组成多了通配符。
  • Producer
public class Producer {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工厂ConnectionFactory cf = new ConnectionFactory();cf.setHost("192.168.126.10");cf.setPort(5672);cf.setUsername("admin");cf.setPassword("admin");cf.setVirtualHost("/");//2.创建连接Connection conn = cf.newConnection();//3.创建信道Channel channel = conn.createChannel();//4.创建交换机/*** 参数1:交换机名* 参数2:交换机类型* 参数3:交换机持久化*/channel.exchangeDeclare("exchange_topic", BuiltinExchangeType.TOPIC, true);/*** 参数1:队列名* 参数2:是否持久化,true表示MQ重启后队列还在。* 参数3:是否私有化,false表示所有消费者都可以访问,true表示只有第一次拥有它的消费者才能访问* 参数4:是否自动删除,true表示不再使用队列时自动删除队列* 参数5:其他额外参数*///5.创建队列(举例订单给手机发送,邮箱,站内)发送消息,3个队列channel.queueDeclare("SEND_MAIL3", true, false, false, null);channel.queueDeclare("SEND_MESSAGE3", true, false, false, null);channel.queueDeclare("SEND_STATION3", true, false, false, null);//6.将队列和交换机绑定/*** 参数1:队列名* 参数2:交换机名* 参数3:路由关键字,发布订阅模式写""即可*/channel.queueBind("SEND_MAIL3", "exchange_topic", "#.mail.#");channel.queueBind("SEND_MESSAGE3", "exchange_topic", "#.message.#");channel.queueBind("SEND_STATION3", "exchange_topic", "#.station.#");//8.发送消息channel.basicPublish("exchange_topic", "mail.message.station", null, "618大促销活动".getBytes(StandardCharsets.UTF_8));channel.basicPublish("exchange_topic", "station", null, "618小促销活动".getBytes(StandardCharsets.UTF_8));//9.关闭资源channel.close();conn.close();System.out.println("发送消息成功");}
}
  • ConsumerMail(消费者代码都类似)
public class ConsumerMail {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工程ConnectionFactory cf = new ConnectionFactory();cf.setHost("192.168.126.10");cf.setPort(5672);cf.setUsername("admin");cf.setPassword("admin");cf.setVirtualHost("/");//2.创建连接Connection conn = cf.newConnection();//3.创建信道Channel channel = conn.createChannel();//4.监听队列channel.basicConsume("SEND_MAIL3",true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg = new String(body, StandardCharsets.UTF_8);System.out.println("发送邮件消息 = " + msg);}});}
}
  • ConsumerMesage
public class ConsumerMessage {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工程ConnectionFactory cf = new ConnectionFactory();cf.setHost("192.168.126.10");cf.setPort(5672);cf.setUsername("admin");cf.setPassword("admin");cf.setVirtualHost("/");//2.创建连接Connection conn = cf.newConnection();//3.创建信道Channel channel = conn.createChannel();//4.监听队列channel.basicConsume("SEND_MESSAGE3",true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg = new String(body, StandardCharsets.UTF_8);System.out.println("发送短信消息 = " + msg);}});}
}
  • ConsumerStation
public class ConsumerStation {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工程ConnectionFactory cf = new ConnectionFactory();cf.setHost("192.168.126.10");cf.setPort(5672);cf.setUsername("admin");cf.setPassword("admin");cf.setVirtualHost("/");//2.创建连接Connection conn = cf.newConnection();//3.创建信道Channel channel = conn.createChannel();//4.监听队列channel.basicConsume("SEND_STATION3",true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg = new String(body, StandardCharsets.UTF_8);System.out.println("发送站內信 = " + msg);}});}
}

RabbitMQ一、RabbitMQ的介绍与安装(docker)
RabbitMQ三、springboot整合rabbitmq(消息可靠性、高级特性)

这篇关于RabbitMQ二、RabbitMQ的六种模式的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1021299

相关文章

Python中lambda排序的六种方法

《Python中lambda排序的六种方法》本文主要介绍了Python中使用lambda函数进行排序的六种方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们... 目录1.对单个变量进行排序2. 对多个变量进行排序3. 降序排列4. 单独降序1.对单个变量进行排序

Springboot使用RabbitMQ实现关闭超时订单(示例详解)

《Springboot使用RabbitMQ实现关闭超时订单(示例详解)》介绍了如何在SpringBoot项目中使用RabbitMQ实现订单的延时处理和超时关闭,通过配置RabbitMQ的交换机、队列和... 目录1.maven中引入rabbitmq的依赖:2.application.yml中进行rabbit

Java解析JSON的六种方案

《Java解析JSON的六种方案》这篇文章介绍了6种JSON解析方案,包括Jackson、Gson、FastJSON、JsonPath、、手动解析,分别阐述了它们的功能特点、代码示例、高级功能、优缺点... 目录前言1. 使用 Jackson:业界标配功能特点代码示例高级功能优缺点2. 使用 Gson:轻量

SpringBoot整合Canal+RabbitMQ监听数据变更详解

《SpringBoot整合Canal+RabbitMQ监听数据变更详解》在现代分布式系统中,实时获取数据库的变更信息是一个常见的需求,本文将介绍SpringBoot如何通过整合Canal和Rabbit... 目录需求步骤环境搭建整合SpringBoot与Canal实现客户端Canal整合RabbitMQSp

在JS中的设计模式的单例模式、策略模式、代理模式、原型模式浅讲

1. 单例模式(Singleton Pattern) 确保一个类只有一个实例,并提供一个全局访问点。 示例代码: class Singleton {constructor() {if (Singleton.instance) {return Singleton.instance;}Singleton.instance = this;this.data = [];}addData(value)

浅谈主机加固,六种有效的主机加固方法

在数字化时代,数据的价值不言而喻,但随之而来的安全威胁也日益严峻。从勒索病毒到内部泄露,企业的数据安全面临着前所未有的挑战。为了应对这些挑战,一种全新的主机加固解决方案应运而生。 MCK主机加固解决方案,采用先进的安全容器中间件技术,构建起一套内核级的纵深立体防护体系。这一体系突破了传统安全防护的局限,即使在管理员权限被恶意利用的情况下,也能确保服务器的安全稳定运行。 普适主机加固措施:

RabbitMQ练习(AMQP 0-9-1 Overview)

1、What is AMQP 0-9-1 AMQP 0-9-1(高级消息队列协议)是一种网络协议,它允许遵从该协议的客户端(Publisher或者Consumer)应用程序与遵从该协议的消息中间件代理(Broker,如RabbitMQ)进行通信。 AMQP 0-9-1模型的核心概念包括消息发布者(producers/publisher)、消息(messages)、交换机(exchanges)、

模版方法模式template method

学习笔记,原文链接 https://refactoringguru.cn/design-patterns/template-method 超类中定义了一个算法的框架, 允许子类在不修改结构的情况下重写算法的特定步骤。 上层接口有默认实现的方法和子类需要自己实现的方法

【iOS】MVC模式

MVC模式 MVC模式MVC模式demo MVC模式 MVC模式全称为model(模型)view(视图)controller(控制器),他分为三个不同的层分别负责不同的职责。 View:该层用于存放视图,该层中我们可以对页面及控件进行布局。Model:模型一般都拥有很好的可复用性,在该层中,我们可以统一管理一些数据。Controlller:该层充当一个CPU的功能,即该应用程序

迭代器模式iterator

学习笔记,原文链接 https://refactoringguru.cn/design-patterns/iterator 不暴露集合底层表现形式 (列表、 栈和树等) 的情况下遍历集合中所有的元素