RabbitMQ二、RabbitMQ的六种模式

2024-06-01 14:44
文章标签 模式 rabbitmq 六种

本文主要是介绍RabbitMQ二、RabbitMQ的六种模式,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一、RabbitMQ的六种模式

  1. RabbitMQ共有六种工作模式:
    • 简单模式(Simple)
    • 工作队列模式(Work Queue)
    • 发布订阅模式(Publish/Subscribe)
    • 路由模式(Routing)
    • 通配符模式(Topics)
    • 远程调用模式(RPC,不常用,不对此模式进行讲解)

1、RabbitMQ的简单模式

在这里插入图片描述

  1. rabbitmq简单模式的特点:
      1. 一个生产者对应一个消费者,通过队列进行消息传递。
      1. 该模式使用direct交换机,direct交换机是RabbitMQ默认交换机

JMS

  1. 由于MQ产品很多,操作方式各有不同,于是JAVA提供了一套规则——JMS,用于操作消息中间件。
  2. JMS即Java消息服务(JavaMessage Service)应用程序接口,是一个Java平台中关于面向消息中间件的API。
  3. JMS是JavaEE规范中的一种,类比JDBC。很多MQ产品都实现了JMS规范,例如ActiveMQ等产品。
    • RabbitMQ官方并没有实现JMS规范,但是开源社区有JMS的实现包。

java操作rabbitmq的简单模式

  1. 操作之前记得启动rabbitmq服务(rabbitmq的客户端不用开启也行)
docker start rabbitmq
  1. 注意:启动rabbitmq的web客户端是为了能访问它的客户端界面
docker exec -it rabbitmq rabbitmq-plugins enable rabbitmq_management
第一步:创建项目(使用简单的maven项目即可)并添加RabbitMQ依赖

在这里插入图片描述

  • 依赖
<dependencies><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.14.0</version></dependency>
</dependencies>
第二步:生产者和消费者代码的编写
  • 创建生产者(producer)代码
package com.knife.demo01.simple;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;// 生产者
public class Producer {public static void main(String[] args) throws IOException, TimeoutException {// 1.创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.70.130"); // 虚拟机ipconnectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");connectionFactory.setVirtualHost("/");// 2.创建连接Connection connection = connectionFactory.newConnection();// 3.建立信道Channel channel = connection.createChannel();// 4.创建队列,如果队列已存在,则使用该队列/*** 参数1:队列名* 参数2:是否持久化,true表示MQ重启后队列还在。* 参数3:是否私有化,false表示所有消费者都可以访问,true表示只有第一次拥有它的消费者才能访问* 参数4:是否自动删除,true表示不再使用队列时自动删除队列* 参数5:其他额外参数*/channel.queueDeclare("simple_queue",false,false,false,null);// 5.发送消息String message = "hello!rabbitmq!";/*** 参数1:交换机名,""表示默认交换机direct* 参数2:路由键,简单模式就是队列名* 参数3:其他额外参数* 参数4:要传递的消息字节数组*/channel.basicPublish("","simple_queue",null,message.getBytes());// 6.关闭信道和连接channel.close();connection.close();System.out.println("===发送成功===");}
}

运行生产者代码结果:
在这里插入图片描述
在这里插入图片描述

  • 创建消费者(consumer)代码
package com.knife.demo01.simple;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;// 消费者
public class Consumer {public static void main(String[] args) throws IOException, TimeoutException {// 1.创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.70.130"); // 虚拟机ipconnectionFactory.setPort(5672); // 端口号connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");connectionFactory.setVirtualHost("/");// 2.创建连接Connection connection = connectionFactory.newConnection();// 3.建立信道Channel channel = connection.createChannel();// 4.监听队列/*** 参数1:监听的队列名* 参数2:是否自动签收,如果设置为false,则需要手动确认消息已收到,否则MQ会一直发送消息* 参数3:Consumer的实现类,重写该类方法表示接受到消息后如何消费*/channel.basicConsume("simple_queue",true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//                接收消息String message = new String(body, "UTF-8");System.out.println("接受消息,消息为:"+message);}});}
}

运行消费者代码结果:
在这里插入图片描述

2、RabbitMQ的工作队列模式(相比简单模式,处理消息的消费者增多了)

在这里插入图片描述

  1. 工作队列模式(Work Queue)与简单模式相比,多了一些消费者,该模式也使用direct交换机(默认使用的交换机),应用于处理消息较多的情况。特点如下:

      1. 一个队列对应多个消费者。
      1. 一条消息只会被一个消费者消费。
      1. 消息队列默认采用轮询的方式将消息平均发送给消费者。
      1. 在一个队列中如果有多个消费者,那么消费者之间对于同一个消息的关系是竞争的关系
  2. Work Queues 对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。例如:短信服务部署多个,只需要有一个节点成功发送即可
    在这里插入图片描述

java操作rabbitmq的工作队列模式

  1. Work Queues 的入门程序与上面简单模式的代码几乎是一样的。可以完全复制,只是比简单模式的消费者多几个而已,可以让多个消费者同时对消费消息的测试。
  2. 在简单模式的基础下编写代码(直接用同一个项目)
    在这里插入图片描述
  • 消息生产者代码:
public class Producer {public static void main(String[] args) throws IOException, TimeoutException {
//        创建工厂连接ConnectionFactory cf = new ConnectionFactory();cf.setHost("192.168.70.130"); //虚拟机地址cf.setPort(5672); //rabbitmq的端口号cf.setUsername("admin"); // 用户名cf.setPassword("admin"); // 密码cf.setVirtualHost("/"); //        创建连接Connection connection = cf.newConnection();//        创建信道Channel channel = connection.createChannel();//        创建队列/*** 参数1:队列名称* 参数二:是否持久化* 参数三:是否私有化* 参数4:队列使用完毕后是否自动删除*/channel.queueDeclare("work_queue",true,false,false,null);//        发送消息/*** 参数1:交换机名,""表示默认交换机direct* 参数2:路由键,简单模式就是队列名* 参数3:其他额外参数* 参数4:要传递的消息字节数组*/for (int i = 1; i <= 10; i++) {channel.basicPublish("","work_queue", MessageProperties.PERSISTENT_TEXT_PLAIN,("你好,这是今天的第"+i+"条消息").getBytes());}//        关闭资源channel.close();connection.close();System.out.println("消息发送成功====");}
}
  • 消息消费者代码(多个消费者可以代码复用,复用下面的代码,多创几个类)
public class Consumer {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工程ConnectionFactory cf = new ConnectionFactory();cf.setHost("192.168.70.130");cf.setPort(5672);cf.setUsername("admin");cf.setPassword("admin");cf.setVirtualHost("/");//2.创建连接Connection conn = cf.newConnection();//3.创建信道Channel channel = conn.createChannel();//        4. 接收消息channel.basicConsume("work_queue",true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg = new String(body, StandardCharsets.UTF_8);System.out.println("消费者1 = " + msg);}});channel.close();conn.close();}}

代码运行的效果和上面简单模式的类似。

3、RabbitMQ的发布订阅模式(相比工作队列模式,交换机使用的类型是Fanout(广播类型))

在这里插入图片描述

  • 图解说明
    • P:生产者,向 Exchange 发送消息
    • X:Exchange(交换机),接收生产者的消息,然后把消息递交给与交换机绑定的队列
    • C1、C2:消费者,其所在队列要与交换机进行绑定
  1. 在开发过程中,有一些消息需要不同的消费者进行不同的处理
    • 如电商网站的同一条促销信息需要短信发送邮件发送站内信发送等。此时可以使用发布订阅模式(Publish/Subscribe)
  2. 在订阅模型中,多了一个Exchange角色,而且过程略有变化:
    • Producer:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
    • Consumer:消费者,消息的接收者,会一直等待消息到来
    • Queue:消息队列,接收消息、缓存消息
    • Exchange:交换机(X)。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型:
      • Fanout:广播类型,将消息交给所有绑定到交换机的队列
      • Direct:定向类型,把消息交给符合指定routing key 的队列
      • Topic:通配符类型,把消息交给符合routing pattern(路由模式) 的队列,Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与 Exchange 绑定,或者没有符合路由规则的队列,那么消息会丢失!
  3. 特点:
      1. 生产者(Producer)将消息发送给交换机,交换机将消息转发到绑定此交换机的每个队列中(即可以转发到多个队列中)。
      1. 工作队列模式的交换机只能将消息发送给一个队列,发布订阅模式的交换机能将消息发送给多个队列。发布订阅模式使用fanout交换机

java操作rabbitmq的发布订阅模式

  1. 在简单模式的基础下编写代码(直接用同一个项目)
    在这里插入图片描述
    其实代码还是差不多的,只是使用到的交换机不一样了而已。
  • Producer
public class Producer {// 生产者public static void main(String[] args) throws IOException, TimeoutException {// 1.创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.70.130");connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");connectionFactory.setVirtualHost("/");// 2.创建连接Connection connection = connectionFactory.newConnection();// 3.建立信道Channel channel = connection.createChannel();// 4.创建交换机/*** 参数1:交换机名* 参数2:交换机类型* 参数3:交换机持久化*/
//        BuiltinExchangeType.FANOUT:表示广播模式channel.exchangeDeclare("exchange_fanout", BuiltinExchangeType.FANOUT, true);// 5.创建队列
//        队列1channel.queueDeclare("SEND_MAIL", true, false, false, null);
//        队列2channel.queueDeclare("SEND_MESSAGE", true, false, false, null);
//        队列3channel.queueDeclare("SEND_STATION", true, false, false, null);// 6.交换机绑定队列/*** 参数1:队列的名称* 参数2:交换机名* 参数3:路由关键字,发布订阅模式写""即可*/channel.queueBind("SEND_MAIL", "exchange_fanout", "");channel.queueBind("SEND_MESSAGE", "exchange_fanout", "");channel.queueBind("SEND_STATION", "exchange_fanout", "");// 7.发送消息channel.basicPublish("exchange_fanout", "", null,("618商品开抢了!").getBytes(StandardCharsets.UTF_8));
//        for (int i = 1; i <= 10; i++) {
//            channel.basicPublish("exchange_fanout", "", null,
//                    ("你好,尊敬的用户,秒杀商品开抢了!" + i).getBytes(StandardCharsets.UTF_8));
//        }// 8.关闭资源channel.close();connection.close();}
}
  • ConsumerMail(消费者代码都类似)
public class ConsumerMail {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工程ConnectionFactory cf = new ConnectionFactory();cf.setHost("192.168.70.130");cf.setPort(5672);cf.setUsername("admin");cf.setPassword("admin");cf.setVirtualHost("/");//2.创建连接Connection conn = cf.newConnection();//3.创建信道Channel channel = conn.createChannel();//4.监听队列channel.basicConsume("SEND_MAIL",true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg = new String(body, StandardCharsets.UTF_8);System.out.println("发送邮件消息 = " + msg);}});}
}
  • ConsumerMesage
public class ConsumerMessage {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工程ConnectionFactory cf = new ConnectionFactory();cf.setHost("192.168.70.130");cf.setPort(5672);cf.setUsername("admin");cf.setPassword("admin");cf.setVirtualHost("/");//2.创建连接Connection conn = cf.newConnection();//3.创建信道Channel channel = conn.createChannel();//4.监听队列channel.basicConsume("SEND_MESSAGE",true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg = new String(body, StandardCharsets.UTF_8);System.out.println("发送短信消息 = " + msg);}});}
}
  • ConsumerStation
public class ConsumerStation {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工程ConnectionFactory cf = new ConnectionFactory();cf.setHost("192.168.70.130");cf.setPort(5672);cf.setUsername("admin");cf.setPassword("admin");cf.setVirtualHost("/");//2.创建连接Connection conn = cf.newConnection();//3.创建信道Channel channel = conn.createChannel();//4.监听队列channel.basicConsume("SEND_STATION",true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg = new String(body, StandardCharsets.UTF_8);System.out.println("发送站內信 = " + msg);}});}
}

4、RabbitMQ的路由模式(相比发布订阅模式,多了个Route Key)

  1. 使用发布订阅模式时,所有消息都会发送到绑定的队列中,但很多时候,不是所有消息都无差别的发布到所有队列中。比如电商网站的促销活动,双十一大促可能会发布到所有队列;而一些小的促销活动为了节约成本,只发布到站内信队列。此时需要使用路由模式(Routing)完成这一需求。
    在这里插入图片描述
  • 图解说明
    • P:生产者,向 Exchange 发送消息,发送消息时,会指定一个routing key
    • X:Exchange(交换机),接收生产者的消息,然后把消息递交给与 routing key 完全匹配的队列
    • C1:消费者,其所在队列指定了需要 routing key 为 error 的消息
    • C2:消费者,其所在队列指定了需要 routing key 为 info、error、warning 的消息
  1. 队列与交换机的绑定,不能是任意绑定了,而是要指定一个 RoutingKey(路由key),消息的发送方在向Exchange发送消息时,也必须指定消息的 RoutingKey,Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey 与消息的 Routing key 完全一致,才会接收到消息。

java操作rabbitmq的路由模式

  1. 在简单模式的基础下编写代码(直接用同一个项目)
    在这里插入图片描述

其实代码还是差不多的,只是使用到的交换机不一样了,多了一个route Key。

  • Producer
public class Producer {// 生产者public static void main(String[] args) throws IOException, TimeoutException {// 1.创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.70.130");connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");connectionFactory.setVirtualHost("/");// 2.创建连接Connection connection = connectionFactory.newConnection();// 3.建立信道Channel channel = connection.createChannel();// 4.创建交换机/*** 参数1:交换机名* 参数2:交换机类型* 参数3:交换机持久化*/
//      BuiltinExchangeType.DIRECTchannel.exchangeDeclare("routing_exchange", BuiltinExchangeType.DIRECT, true);// 5.创建队列
//        队列1channel.queueDeclare("SEND_MAILRoute", true, false, false, null);
//        队列2channel.queueDeclare("SEND_MESSAGERoute", true, false, false, null);
//        队列3channel.queueDeclare("SEND_STATIONRoute", true, false, false, null);// 6.交换机绑定队列/*** 参数1:队列的名称* 参数2:交换机名* 参数3:路由关键字(路由名称)*/channel.queueBind("SEND_MAILRoute", "routing_exchange", "import");channel.queueBind("SEND_MESSAGERoute", "routing_exchange", "import");channel.queueBind("SEND_STATIONRoute", "routing_exchange", "import");channel.queueBind("SEND_STATIONRoute", "routing_exchange", "normal");// 7.发送消息channel.basicPublish("routing_exchange", "import", null,("618商品开抢了!").getBytes(StandardCharsets.UTF_8));channel.basicPublish("routing_exchange", "normal", null,("normal路由的息").getBytes(StandardCharsets.UTF_8));
//        for (int i = 1; i <= 10; i++) {
//            channel.basicPublish("exchange_fanout", "", null,
//                    ("你好,尊敬的用户,秒杀商品开抢了!" + i).getBytes(StandardCharsets.UTF_8));
//        }// 8.关闭资源channel.close();connection.close();}
}
  • ConsumerMail(消费者代码都类似)
public class ConsumerMail {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工程ConnectionFactory cf = new ConnectionFactory();cf.setHost("192.168.70.130");cf.setPort(5672);cf.setUsername("admin");cf.setPassword("admin");cf.setVirtualHost("/");//2.创建连接Connection conn = cf.newConnection();//3.创建信道Channel channel = conn.createChannel();//4.监听队列channel.basicConsume("SEND_MAILRoute",true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg = new String(body, StandardCharsets.UTF_8);System.out.println("发送邮件消息 = " + msg);}});}
}
  • ConsumerMesage
public class ConsumerMessage {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工程ConnectionFactory cf = new ConnectionFactory();cf.setHost("192.168.70.130");cf.setPort(5672);cf.setUsername("admin");cf.setPassword("admin");cf.setVirtualHost("/");//2.创建连接Connection conn = cf.newConnection();//3.创建信道Channel channel = conn.createChannel();//4.监听队列channel.basicConsume("SEND_MESSAGERoute",true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg = new String(body, StandardCharsets.UTF_8);System.out.println("发送短信消息 = " + msg);}});}
}
  • ConsumerStation
public class ConsumerStation {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工程ConnectionFactory cf = new ConnectionFactory();cf.setHost("192.168.70.130");cf.setPort(5672);cf.setUsername("admin");cf.setPassword("admin");cf.setVirtualHost("/");//2.创建连接Connection conn = cf.newConnection();//3.创建信道Channel channel = conn.createChannel();//4.监听队列channel.basicConsume("SEND_STATIONRoute",true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg = new String(body, StandardCharsets.UTF_8);System.out.println("发送站內信 = " + msg);}});}
}

5、RabbitMQ的通配符模式(相比路由模式,路由的组成中带上了通配符)

在这里插入图片描述

  1. 通配符模式(Topic)是在路由模式的基础上,给队列绑定带通配符的路由关键字,只要消息的RoutingKey能实现通配符匹配,就会将消息转发到该队列。通配符模式比路由模式更灵活,使用topic交换机
  2. 通配符规则:
      1. 消息设置RoutingKey时,RoutingKey由多个单词构成,中间以.分割。
      1. 队列设置RoutingKey时,#可以匹配任意多个单词,*可以匹配任意一个单词。
        在这里插入图片描述
  • 红色 Queue:绑定的是 usa.# ,因此凡是以 usa. 开头的 routing key 都会被匹配到
  • 黄色 Queue:绑定的是 #.news ,因此凡是以 .news 结尾的 routing key 都会被匹配

java操作rabbitmq的通配符模式

  1. 在简单模式的基础下编写代码(直接用同一个项目)
    在这里插入图片描述
    其实代码还是差不多的,只是route Key的组成多了通配符。
  • Producer
public class Producer {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工厂ConnectionFactory cf = new ConnectionFactory();cf.setHost("192.168.126.10");cf.setPort(5672);cf.setUsername("admin");cf.setPassword("admin");cf.setVirtualHost("/");//2.创建连接Connection conn = cf.newConnection();//3.创建信道Channel channel = conn.createChannel();//4.创建交换机/*** 参数1:交换机名* 参数2:交换机类型* 参数3:交换机持久化*/channel.exchangeDeclare("exchange_topic", BuiltinExchangeType.TOPIC, true);/*** 参数1:队列名* 参数2:是否持久化,true表示MQ重启后队列还在。* 参数3:是否私有化,false表示所有消费者都可以访问,true表示只有第一次拥有它的消费者才能访问* 参数4:是否自动删除,true表示不再使用队列时自动删除队列* 参数5:其他额外参数*///5.创建队列(举例订单给手机发送,邮箱,站内)发送消息,3个队列channel.queueDeclare("SEND_MAIL3", true, false, false, null);channel.queueDeclare("SEND_MESSAGE3", true, false, false, null);channel.queueDeclare("SEND_STATION3", true, false, false, null);//6.将队列和交换机绑定/*** 参数1:队列名* 参数2:交换机名* 参数3:路由关键字,发布订阅模式写""即可*/channel.queueBind("SEND_MAIL3", "exchange_topic", "#.mail.#");channel.queueBind("SEND_MESSAGE3", "exchange_topic", "#.message.#");channel.queueBind("SEND_STATION3", "exchange_topic", "#.station.#");//8.发送消息channel.basicPublish("exchange_topic", "mail.message.station", null, "618大促销活动".getBytes(StandardCharsets.UTF_8));channel.basicPublish("exchange_topic", "station", null, "618小促销活动".getBytes(StandardCharsets.UTF_8));//9.关闭资源channel.close();conn.close();System.out.println("发送消息成功");}
}
  • ConsumerMail(消费者代码都类似)
public class ConsumerMail {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工程ConnectionFactory cf = new ConnectionFactory();cf.setHost("192.168.126.10");cf.setPort(5672);cf.setUsername("admin");cf.setPassword("admin");cf.setVirtualHost("/");//2.创建连接Connection conn = cf.newConnection();//3.创建信道Channel channel = conn.createChannel();//4.监听队列channel.basicConsume("SEND_MAIL3",true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg = new String(body, StandardCharsets.UTF_8);System.out.println("发送邮件消息 = " + msg);}});}
}
  • ConsumerMesage
public class ConsumerMessage {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工程ConnectionFactory cf = new ConnectionFactory();cf.setHost("192.168.126.10");cf.setPort(5672);cf.setUsername("admin");cf.setPassword("admin");cf.setVirtualHost("/");//2.创建连接Connection conn = cf.newConnection();//3.创建信道Channel channel = conn.createChannel();//4.监听队列channel.basicConsume("SEND_MESSAGE3",true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg = new String(body, StandardCharsets.UTF_8);System.out.println("发送短信消息 = " + msg);}});}
}
  • ConsumerStation
public class ConsumerStation {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工程ConnectionFactory cf = new ConnectionFactory();cf.setHost("192.168.126.10");cf.setPort(5672);cf.setUsername("admin");cf.setPassword("admin");cf.setVirtualHost("/");//2.创建连接Connection conn = cf.newConnection();//3.创建信道Channel channel = conn.createChannel();//4.监听队列channel.basicConsume("SEND_STATION3",true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg = new String(body, StandardCharsets.UTF_8);System.out.println("发送站內信 = " + msg);}});}
}

RabbitMQ一、RabbitMQ的介绍与安装(docker)
RabbitMQ三、springboot整合rabbitmq(消息可靠性、高级特性)

这篇关于RabbitMQ二、RabbitMQ的六种模式的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1021299

相关文章

Vue中组件之间传值的六种方式(完整版)

《Vue中组件之间传值的六种方式(完整版)》组件是vue.js最强大的功能之一,而组件实例的作用域是相互独立的,这就意味着不同组件之间的数据无法相互引用,针对不同的使用场景,如何选择行之有效的通信方式... 目录前言方法一、props/$emit1.父组件向子组件传值2.子组件向父组件传值(通过事件形式)方

SpringBoot如何通过Map实现策略模式

《SpringBoot如何通过Map实现策略模式》策略模式是一种行为设计模式,它允许在运行时选择算法的行为,在Spring框架中,我们可以利用@Resource注解和Map集合来优雅地实现策略模式,这... 目录前言底层机制解析Spring的集合类型自动装配@Resource注解的行为实现原理使用直接使用M

C#原型模式之如何通过克隆对象来优化创建过程

《C#原型模式之如何通过克隆对象来优化创建过程》原型模式是一种创建型设计模式,通过克隆现有对象来创建新对象,避免重复的创建成本和复杂的初始化过程,它适用于对象创建过程复杂、需要大量相似对象或避免重复初... 目录什么是原型模式?原型模式的工作原理C#中如何实现原型模式?1. 定义原型接口2. 实现原型接口3

大数据spark3.5安装部署之local模式详解

《大数据spark3.5安装部署之local模式详解》本文介绍了如何在本地模式下安装和配置Spark,并展示了如何使用SparkShell进行基本的数据处理操作,同时,还介绍了如何通过Spark-su... 目录下载上传解压配置jdk解压配置环境变量启动查看交互操作命令行提交应用spark,一个数据处理框架

Spring Boot整合消息队列RabbitMQ的实现示例

《SpringBoot整合消息队列RabbitMQ的实现示例》本文主要介绍了SpringBoot整合消息队列RabbitMQ的实现示例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的... 目录RabbitMQ 简介与安装1. RabbitMQ 简介2. RabbitMQ 安装Spring

nginx upstream六种方式分配小结

《nginxupstream六种方式分配小结》本文主要介绍了nginxupstream六种方式分配小结,包括轮询、加权轮询、IP哈希、公平轮询、URL哈希和备份服务器,具有一定的参考价格,感兴趣的可... 目录1 轮询(默认)2 weight3 ip_hash4 fair(第三方)5 url_hash(第三

SpringBoot中整合RabbitMQ(测试+部署上线最新完整)的过程

《SpringBoot中整合RabbitMQ(测试+部署上线最新完整)的过程》本文详细介绍了如何在虚拟机和宝塔面板中安装RabbitMQ,并使用Java代码实现消息的发送和接收,通过异步通讯,可以优化... 目录一、RabbitMQ安装二、启动RabbitMQ三、javascript编写Java代码1、引入

Java实现状态模式的示例代码

《Java实现状态模式的示例代码》状态模式是一种行为型设计模式,允许对象根据其内部状态改变行为,本文主要介绍了Java实现状态模式的示例代码,文中通过示例代码介绍的非常详细,需要的朋友们下面随着小编来... 目录一、简介1、定义2、状态模式的结构二、Java实现案例1、电灯开关状态案例2、番茄工作法状态案例

微服务架构之使用RabbitMQ进行异步处理方式

《微服务架构之使用RabbitMQ进行异步处理方式》本文介绍了RabbitMQ的基本概念、异步调用处理逻辑、RabbitMQ的基本使用方法以及在SpringBoot项目中使用RabbitMQ解决高并发... 目录一.什么是RabbitMQ?二.异步调用处理逻辑:三.RabbitMQ的基本使用1.安装2.架构

Python中lambda排序的六种方法

《Python中lambda排序的六种方法》本文主要介绍了Python中使用lambda函数进行排序的六种方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们... 目录1.对单个变量进行排序2. 对多个变量进行排序3. 降序排列4. 单独降序1.对单个变量进行排序