RabbitMQ核心概念及AMQP协议

2023-12-16 18:32

本文主要是介绍RabbitMQ核心概念及AMQP协议,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

RabbitMQ核心概念及AMQP协议

前言

版本说明

rabbitmq=3.8.4

相关链接

  • RabbitMQ 官网文档:https://www.rabbitmq.com/getstarted.html
  • SpringBoot 整合 RabbitMQ maven 官网地址:https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-amqp
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-amqp -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

AMQP

AMQP(Advanced Message Queuing Protocol) :高级消息队列协议,是具有现代特征的二进制协议。是一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。

现代特征特点:多信道、协商式、异步、安全、可扩展、中立、高效。

AMQP 协议模型

在这里插入图片描述

AMQP 核心概念

  • Server :又称 Broker ,接收客户端的连接,实现 AMQP 实体服务;
  • Connection :连接,应用程序与 Broker 的网络连接;
  • Channel :网络信道,几乎所有的操作都在 Channel 中进行,Channel 是进行消息读写的通道。客户端可以建立多个 Channel ,每个 Channel 代表一个会话任务;
  • Message :消息,服务器和应用程序之间传送的数据,由 PropertiesBody 组成。Properties 可以对消息进行修饰,比如消息的优先级、延迟等高级特性;Body 则就是消息体的内容;
  • Virtual Host :虚拟地址,用于进行逻辑隔离,最上层的消息路由。一个 Virtual Host 里面可以有若干个 ExchangeQueue ,同一个 Virtual Host 里面不能有相同名称的 Exchange 或者 Queue
  • Exchange :交换机,接收消息,根据路由键转发消息到绑定的队列中;
  • BindingExchangeQueue 之间的虚拟连接,Binding 中可以包含 routing key
  • Routing Key :一个路由规则,虚拟机可以用它来确定如何路由一个特定消息;
  • Queue :也称为 Message Queue ,消息队列,保存消息并将它们转发给消费者;

RabbitMQ

RabbitMQ 优势

  • SpringAMQP 完美的整合,API 丰富;
  • 集群模式丰富,表达式配置,HA 模式,镜像队列模型;
  • 保证数据不丢失的前提做到高可靠性、高可用性;

RabbitMQ 高性能原因

  • Erlang 语言最初在与交换机领域的架构模式,这样使得 RabbitMQBroker 之间进行数据交互的性能是非常优秀的;
  • Erlang 语言有着和原生 Socket 一样的延迟;

RabbitMQ 整体架构

在这里插入图片描述

RabbitMQ 消息流转

在这里插入图片描述

RabbitMQ 角色说明

  • 超级管理员(Administrator):可登录管理控制台,并且可以对用户,策略进行操作;
  • 监控者(Monitoring):可登录管理控制台,同时可以查看节点的相关信息(进程数、内存使用情况、磁盘使用情况等);
  • 策略制定者(Policymaker):可登录管理控制台,同时可以对策略进行管理,但无法查看节点相关信息;
  • 普通管理员(Management):仅可以登录管理控制台,无法看到节点相关信息,也无法对策略进行管理;
  • 其他:无法登录管理控制台,通常就是普通的生产者和消费者;

命令行与管控台操作

基础操作

# 关闭应用
rabbitmqctl stop_app
# 启动应用
rabbitmqctl start_app
# 节点状态
rabbitmqctl status# 添加用户
rabbitmqctl add_user username password
# 列出所有用户
rabbitmqctl list_users
# 删除用户
rabbitmqctl delete_user username
# 清除用户权限
rabbitmqctl clear_permissions -p vhostpath username
# 列出用户权限
rabbitmqctl list_user_permissions username
# 修改密码
rabbitmqctl change_password username password
# 设置用户权限
rabbitmqctl set_permissions -p vhostpath username ".*" ".*" ".*" # 创建虚拟主机
rabbitmqctl add_vhost vhostpath
# 列出所有的虚拟主机
rabbitmqctl list_vhosts
# 列出虚拟主机上所有权限
rabbitmqctl list_permissions -p vhostpath
# 删除虚拟主机
rabbitmqctl delete_vhost vhostpath# 查看所有队列信息
rabbitmqctl list_queues
# 清除队列里的消息
rabbitmqctl -p vhostpath purge_queue blue

高级操作

# 移除所有数据,要在 rabbitmqctl stop_app 之后使用 
rabbitmqctl reset
# 组成集群命令
rabbitmqctl join_cluster <clusternode> 
# 查看集群状态
rabbitmqctl cluster_status
# 修改集群节点的存储形式
rabbitmqctl change_cluster_node_type disc|ram
# 忘记节点(摘除节点)
rabbitmqctl forget_cluster_node [--offline]
# 修改节点名称
rabbitmqctl rename_cluster_node oldnode1 newnode1 oldnode2 newnode2...

RabbitMQ 工作模式

  • P:生产者
  • C:消费者
  • Queue:红色部分
  • X:exchange 交换机,深蓝色部分。交换机一方面,接收生产者发送的消息;另一方面,知道如何处理消息,例如如何递交给某个特定的队列、递交给所有的队列、或者是将消息丢弃。Exchange只负责转发消息,不具备存储消息的能力; Exchange 常见类型:
    1. Fanout :广播,将消息交给所有绑定到交换机的队列;
    2. Direct:定向,把消息交给符合指定 routing key 规则的队列;
    3. Topic:通配符,把消息交给符合 routing pattern (路由模式)的队列;
    4. headers:首部交换机

SimpleQueue 简单模式

原理

The simplest thing that does something

在这里插入图片描述

生产者代码
package top.simba1949.simple;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author Theodore* @date 2020/6/15 17:52*/
public class Producer {public static final String SIMPLE_QUEUE_NAME = "simple_queue";public static void main(String[] args) throws IOException, TimeoutException {// 1.创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.8.9");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");// 2.通过连接工厂创建连接Connection connection = connectionFactory.newConnection();// 3.通过 connection 创建 channelChannel channel = connection.createChannel();// 4.声明队列// 参数一:队列名称// 参数二:是否定义持久化队列// 参数三:是否独占本次连接// 参数四:是否在不使用的使用自动删除队列// 参数五:队列其他参数channel.queueDeclare(SIMPLE_QUEUE_NAME, true, false, false, null);// 发送消息String msg = "Hello RabbitMQ";// 参数一:交换机名称,如果没有指定则使用 Default Exchange;// 参数二:路由 key,如果是简单模式,可以直接传递队列名称// 参数三:消息其他属性// 参数四:消息内容channel.basicPublish("", SIMPLE_QUEUE_NAME, null, msg.getBytes());// 释放资源channel.close();connection.close();}
}
消费者代码
package top.simba1949.simple;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author Theodore* @date 2020/6/15 17:49*/
public class Consumer {public static void main(String[] args) throws IOException, TimeoutException {// 1.创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.8.9");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");// 2.通过连接工厂创建连接Connection connection = connectionFactory.newConnection();// 3.通过 connection 创建 channelChannel channel = connection.createChannel();// 4.声明一个队列// 参数一:队列名称// 参数二:是否定义持久化队列// 参数三:是否独占本次连接// 参数四:是否在不使用的使用自动删除队列// 参数五:队列其他参数channel.queueDeclare(Producer.SIMPLE_QUEUE_NAME, true, false, false, null);// 5.监听消息DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {/*** @param consumerTag 消费标签,在 channel.basicConsume 时候可以指定* @param envelope 消息包内容,可从中获取消息id、消息routingKey、交换机和重转标记(收到消息失败后是否需要重新发送该消息)* @param properties 消息属性* @param body 消息* @throws IOException*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {super.handleDelivery(consumerTag, envelope, properties, body);System.out.println("路由 key :" + envelope.getRoutingKey());System.out.println("交换机 :" + envelope.getExchange());System.out.println("消息id :" + envelope.getDeliveryTag());System.out.println("接收到的消息 :" + new String(body, "UTF-8"));System.out.println("-----------------");}};// 监听消息// 参数一:队列名称// 参数二:是否自动确认,设置为true表示消息接收到自动向mq回复接收到了,mq接收到消息之后会删除消息,设置为false则需要手动确认channel.basicConsume(Producer.SIMPLE_QUEUE_NAME, true, defaultConsumer);// 不关闭资源,会一直监听// channel.close();// connection.close();}
}

WorkQueues 工作队列模式

原理

Distributing tasks among workers (the competing consumers pattern)

在这里插入图片描述

工具类
package top.simba1949;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author Theodore* @date 2020/6/16 16:20*/
public class RabbitMQUtil {/*** 获取连接* @return* @throws IOException* @throws TimeoutException*/public static Connection getConnection() throws IOException, TimeoutException {// 1.创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.8.9");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");// 2.通过连接工厂创建连接Connection connection = connectionFactory.newConnection();return connection;}/*** 关闭连接* @param channel* @param connection* @throws IOException* @throws TimeoutException*/public static void close(Channel channel, Connection connection) throws IOException, TimeoutException {channel.close();connection.close();}
}
生产者代码
package top.simba1949.work;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import top.simba1949.RabbitMQUtil;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author Theodore* @date 2020/6/15 17:52*/
public class Producer {public static final String QUEUE_NAME = "work_queue";public static void main(String[] args) throws IOException, TimeoutException {Connection connection = RabbitMQUtil.getConnection();// 通过 connection 创建 channelChannel channel = connection.createChannel();// 声明队列// 参数一:队列名称// 参数二:是否定义持久化队列// 参数三:是否独占本次连接// 参数四:是否在不使用的使用自动删除队列// 参数五:队列其他参数channel.queueDeclare(QUEUE_NAME, true, false, false, null);// 发送消息for (int i = 1; i <= 30; i++){String msg = "你好:小兔子! work 模式————" + i;// 参数一:交换机名称,如果没有指定则使用 Default Exchange;// 参数二:路由 key,简单模式可以传递队列名称// 参数三:消息其他属性// 参数四:消息内容channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());}// 释放资源RabbitMQUtil.close(channel, connection);}
}
消费者1代码
package top.simba1949.work;import com.rabbitmq.client.*;
import lombok.SneakyThrows;
import top.simba1949.RabbitMQUtil;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author Theodore* @date 2020/6/15 17:49*/
public class Consumer1 {public static void main(String[] args) throws IOException, TimeoutException {// 创建连接Connection connection = RabbitMQUtil.getConnection();// 创建信道Channel channel = connection.createChannel();// 声明一个队列// 参数一:队列名称// 参数二:是否定义持久化队列// 参数三:是否独占本次连接// 参数四:是否在不使用的使用自动删除队列// 参数五:队列其他参数channel.queueDeclare(Producer.QUEUE_NAME, true, false, false, null);// 监听消息DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {/*** @param consumerTag 消费标签,在 channel.basicConsume 时候可以指定* @param envelope 消息包内容,可从中获取消息id、消息routingKey、交换机和重转标记(收到消息失败后是否需要重新发送该消息)* @param properties 消息属性* @param body 消息* @throws IOException*/@SneakyThrows@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {super.handleDelivery(consumerTag, envelope, properties, body);System.out.println("============消费者1 start =============");System.out.println("路由 key :" + envelope.getRoutingKey());System.out.println("交换机 :" + envelope.getExchange());System.out.println("消息id :" + envelope.getDeliveryTag());System.out.println("接收到的消息 :" + new String(body, "UTF-8"));Thread.sleep(1000L);// 客户端自己确认消息收到ackchannel.basicAck(envelope.getDeliveryTag(), false);System.out.println("============消费者1 end =============");}};// 监听消息// 参数一:队列名称// 参数二:是否自动确认,设置为true表示消息接收到自动向mq回复接收到了,mq接收到消息之后会删除消息,设置为false则需要手动确认channel.basicConsume(Producer.QUEUE_NAME, false, defaultConsumer);// 不关闭资源,会一直监听// channel.close();// connection.close();}
}
消费者2代码
package top.simba1949.work;import com.rabbitmq.client.*;
import lombok.SneakyThrows;
import top.simba1949.RabbitMQUtil;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author Theodore* @date 2020/6/15 17:49*/
public class Consumer2 {public static void main(String[] args) throws IOException, TimeoutException {// 创建连接Connection connection = RabbitMQUtil.getConnection();// 创建信道Channel channel = connection.createChannel();// 声明一个队列// 参数一:队列名称// 参数二:是否定义持久化队列// 参数三:是否独占本次连接// 参数四:是否在不使用的使用自动删除队列// 参数五:队列其他参数channel.queueDeclare(Producer.QUEUE_NAME, true, false, false, null);// 监听消息DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {/*** @param consumerTag 消费标签,在 channel.basicConsume 时候可以指定* @param envelope 消息包内容,可从中获取消息id、消息routingKey、交换机和重转标记(收到消息失败后是否需要重新发送该消息)* @param properties 消息属性* @param body 消息* @throws IOException*/@SneakyThrows@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {super.handleDelivery(consumerTag, envelope, properties, body);System.out.println("============消费者2 start =============");System.out.println("路由 key :" + envelope.getRoutingKey());System.out.println("交换机 :" + envelope.getExchange());System.out.println("消息id :" + envelope.getDeliveryTag());System.out.println("接收到的消息 :" + new String(body, "UTF-8"));Thread.sleep(1000L);// 客户端自己确认消息收到ackchannel.basicAck(envelope.getDeliveryTag(), false);System.out.println("============消费者2 end =============");}};// 监听消息// 参数一:队列名称// 参数二:是否自动确认,设置为true表示消息接收到自动向mq回复接收到了,mq接收到消息之后会删除消息,设置为false则需要手动确认channel.basicConsume(Producer.QUEUE_NAME, false, defaultConsumer);// 不关闭资源,会一直监听// channel.close();// connection.close();}
}

Publish/Subscribe

原理

Sending messages to many consumers at once

在这里插入图片描述

工具类代码
package top.simba1949;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author Theodore* @date 2020/6/16 16:20*/
public class RabbitMQUtil {/*** 获取连接* @return* @throws IOException* @throws TimeoutException*/public static Connection getConnection() throws IOException, TimeoutException {// 1.创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.8.9");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");// 2.通过连接工厂创建连接Connection connection = connectionFactory.newConnection();return connection;}/*** 关闭连接* @param channel* @param connection* @throws IOException* @throws TimeoutException*/public static void close(Channel channel, Connection connection) throws IOException, TimeoutException {channel.close();connection.close();}
}
生产者代码
package top.simba1949.publish.subscribe;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import top.simba1949.RabbitMQUtil;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author Theodore* @date 2020/6/15 17:52*/
public class Producer {public static final String FANOUT_EXCHANGE = "fanout_exchange";public static final String FANOUT_QUEUE_1 = "fanout_queue_1";public static final String FANOUT_QUEUE_2 = "fanout_queue_2";public static void main(String[] args) throws IOException, TimeoutException {Connection connection = RabbitMQUtil.getConnection();// 通过 connection 创建 channelChannel channel = connection.createChannel();// 声明交换机// 参数一:交换机名称// 参数二:交换机类型: DIRECT("direct"), FANOUT("fanout"), TOPIC("topic"), HEADERS("headers");channel.exchangeDeclare(FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT);// 声明队列// 参数一:队列名称// 参数二:是否定义持久化队列// 参数三:是否独占本次连接// 参数四:是否在不使用的使用自动删除队列// 参数五:队列其他参数channel.queueDeclare(FANOUT_QUEUE_1, true, false, false, null);channel.queueDeclare(FANOUT_QUEUE_2, true, false, false, null);// 队列绑定交换机// 参数一:队列名称// 参数二:交换机名称// 参数三:路由keychannel.queueBind(FANOUT_QUEUE_1, FANOUT_EXCHANGE, "");channel.queueBind(FANOUT_QUEUE_2, FANOUT_EXCHANGE, "");// 发送消息for (int i = 1; i <= 30; i++){String msg = "你好:小兔子! Publish/Subscribe模式(" + i;// 参数一:交换机名称,如果没有指定则使用 Default Exchange;// 参数二:路由 key,如果是简单模式,可以直接传递队列名称// 参数三:消息其他属性// 参数四:消息内容channel.basicPublish(FANOUT_EXCHANGE, "", null, msg.getBytes());}// 释放资源RabbitMQUtil.close(channel, connection);}
}
消费者1订阅队列1代码
package top.simba1949.publish.subscribe;import com.rabbitmq.client.*;
import lombok.SneakyThrows;
import top.simba1949.RabbitMQUtil;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author Theodore* @date 2020/6/15 17:49*/
public class Consumer1 {public static void main(String[] args) throws IOException, TimeoutException {// 创建连接Connection connection = RabbitMQUtil.getConnection();// 创建信道Channel channel = connection.createChannel();// 声明一个队列// 参数一:队列名称// 参数二:是否定义持久化队列// 参数三:是否独占本次连接// 参数四:是否在不使用的使用自动删除队列// 参数五:队列其他参数channel.queueDeclare(Producer.FANOUT_QUEUE_1, true, false, false, null);// 监听消息DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {/*** @param consumerTag 消费标签,在 channel.basicConsume 时候可以指定* @param envelope 消息包内容,可从中获取消息id、消息routingKey、交换机和重转标记(收到消息失败后是否需要重新发送该消息)* @param properties 消息属性* @param body 消息* @throws IOException*/@SneakyThrows@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {super.handleDelivery(consumerTag, envelope, properties, body);System.out.println("============Publish/Subscribe 消费者1 start =============");System.out.println("路由 key :" + envelope.getRoutingKey());System.out.println("交换机 :" + envelope.getExchange());System.out.println("消息id :" + envelope.getDeliveryTag());System.out.println("接收到的消息 :" + new String(body, "UTF-8"));Thread.sleep(1000L);// 客户端自己确认消息收到ackchannel.basicAck(envelope.getDeliveryTag(), false);System.out.println("============Publish/Subscribe 消费者1 end =============");}};// 监听消息// 参数一:队列名称// 参数二:是否自动确认,设置为true表示消息接收到自动向mq回复接收到了,mq接收到消息之后会删除消息,设置为false则需要手动确认channel.basicConsume(Producer.FANOUT_QUEUE_1, false, defaultConsumer);// 不关闭资源,会一直监听// channel.close();// connection.close();}
}
消费者2订阅队列2代码
package top.simba1949.publish.subscribe;import com.rabbitmq.client.*;
import lombok.SneakyThrows;
import top.simba1949.RabbitMQUtil;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author Theodore* @date 2020/6/15 17:49*/
public class Consumer2 {public static void main(String[] args) throws IOException, TimeoutException {// 创建连接Connection connection = RabbitMQUtil.getConnection();// 创建信道Channel channel = connection.createChannel();// 4.声明一个队列channel.queueDeclare(Producer.FANOUT_QUEUE_2, true, false, false, null);// 5.监听消息DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {/*** @param consumerTag 消费标签,在 channel.basicConsume 时候可以指定* @param envelope 消息包内容,可从中获取消息id、消息routingKey、交换机和重转标记(收到消息失败后是否需要重新发送该消息)* @param properties 消息属性* @param body 消息* @throws IOException*/@SneakyThrows@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {super.handleDelivery(consumerTag, envelope, properties, body);System.out.println("============Publish/Subscribe 消费者2 start =============");System.out.println("路由 key :" + envelope.getRoutingKey());System.out.println("交换机 :" + envelope.getExchange());System.out.println("消息id :" + envelope.getDeliveryTag());System.out.println("接收到的消息 :" + new String(body, "UTF-8"));Thread.sleep(1000L);// 客户端自己确认消息收到ackchannel.basicAck(envelope.getDeliveryTag(), false);System.out.println("============ Publish/Subscribe消费者2 end =============");}};// 监听消息// 参数一:队列名称// 参数二:是否自动确认,设置为true表示消息接收到自动向mq回复接收到了,mq接收到消息之后会删除消息,设置为false则需要手动确认channel.basicConsume(Producer.FANOUT_QUEUE_2, false, defaultConsumer);// 不关闭资源,会一直监听// channel.close();// connection.close();}
}

Routing

原理

Receiving messages selectively

在这里插入图片描述

路由模式特点

  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个 Routing Key
  • 消息的发送方在向 Exchange 发送消息时,也必须指定消息的 Routing Key
  • Exchange 不再把消息交给每一个绑定的队列,而是根据消息的 Routing Key 进行判断,只有队列的 Routing Key 与消息 Routing Key 完全一致,才会接收到消息;
工具类代码
package top.simba1949;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author Theodore* @date 2020/6/16 16:20*/
public class RabbitMQUtil {/*** 获取连接* @return* @throws IOException* @throws TimeoutException*/public static Connection getConnection() throws IOException, TimeoutException {// 1.创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.8.9");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");// 2.通过连接工厂创建连接Connection connection = connectionFactory.newConnection();return connection;}/*** 关闭连接* @param channel* @param connection* @throws IOException* @throws TimeoutException*/public static void close(Channel channel, Connection connection) throws IOException, TimeoutException {channel.close();connection.close();}
}
生产者代码
package top.simba1949.routing;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import top.simba1949.RabbitMQUtil;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** 路由模式的交换机类型为:direct* @author Theodore* @date 2020/6/15 17:52*/
public class Producer {public static final String DIRECT_EXCHANGE = "direct_exchange";public static final String DIRECT_QUEUE_INSERT = "direct_queue_insert";public static final String DIRECT_QUEUE_UPDATE = "direct_queue_update";public static final String ROUTING_KEY_INSERT = "insert";public static final String ROUTING_KEY_UPDATE = "update";public static void main(String[] args) throws IOException, TimeoutException {Connection connection = RabbitMQUtil.getConnection();// 通过 connection 创建 channelChannel channel = connection.createChannel();// 声明交换机// 参数一:交换机名称// 参数二:交换机类型: DIRECT("direct"), FANOUT("fanout"), TOPIC("topic"), HEADERS("headers");channel.exchangeDeclare(DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT);// 声明队列// 参数一:队列名称// 参数二:是否定义持久化队列// 参数三:是否独占本次连接// 参数四:是否在不使用的使用自动删除队列// 参数五:队列其他参数channel.queueDeclare(DIRECT_QUEUE_INSERT, true, false, false, null);channel.queueDeclare(DIRECT_QUEUE_UPDATE, true, false, false, null);// 队列绑定交换机// 参数一:队列名称// 参数二:交换机名称// 参数三:路由keychannel.queueBind(DIRECT_QUEUE_INSERT, DIRECT_EXCHANGE, ROUTING_KEY_INSERT);channel.queueBind(DIRECT_QUEUE_UPDATE, DIRECT_EXCHANGE, ROUTING_KEY_UPDATE);// 发送消息String msgInsert = "新增了商品,路由模式:routing key 为 insert";// 参数一:交换机名称,如果没有指定则使用 Default Exchange;// 参数二:路由 key,如果是简单模式,可以直接传递队列名称// 参数三:消息其他属性// 参数四:消息内容channel.basicPublish(DIRECT_EXCHANGE, ROUTING_KEY_INSERT, null, msgInsert.getBytes());// 发送消息String msgUpdate = "新增了商品,路由模式:routing key 为 update";// 参数一:交换机名称,如果没有指定则使用 Default Exchange;// 参数二:路由 key,如果是简单模式,可以直接传递队列名称// 参数三:消息其他属性// 参数四:消息内容channel.basicPublish(DIRECT_EXCHANGE, ROUTING_KEY_UPDATE, null, msgUpdate.getBytes());// 释放资源RabbitMQUtil.close(channel, connection);}
}
消费者 insert 代码
package top.simba1949.routing;import com.rabbitmq.client.*;
import lombok.SneakyThrows;
import top.simba1949.RabbitMQUtil;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author Theodore* @date 2020/6/15 17:49*/
public class ConsumerInsert {public static void main(String[] args) throws IOException, TimeoutException {// 创建连接Connection connection = RabbitMQUtil.getConnection();// 创建信道Channel channel = connection.createChannel();// 声明一个队列// 参数一:队列名称// 参数二:是否定义持久化队列// 参数三:是否独占本次连接// 参数四:是否在不使用的使用自动删除队列// 参数五:队列其他参数channel.queueDeclare(Producer.DIRECT_QUEUE_INSERT, true, false, false, null);// 监听消息DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {/*** @param consumerTag 消费标签,在 channel.basicConsume 时候可以指定* @param envelope 消息包内容,可从中获取消息id、消息routingKey、交换机和重转标记(收到消息失败后是否需要重新发送该消息)* @param properties 消息属性* @param body 消息* @throws IOException*/@SneakyThrows@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {super.handleDelivery(consumerTag, envelope, properties, body);System.out.println("============Routing insert start =============");System.out.println("路由 key :" + envelope.getRoutingKey());System.out.println("交换机 :" + envelope.getExchange());System.out.println("消息id :" + envelope.getDeliveryTag());System.out.println("接收到的消息 :" + new String(body, "UTF-8"));Thread.sleep(1000L);// 客户端自己确认消息收到ackchannel.basicAck(envelope.getDeliveryTag(), false);System.out.println("============Routing insert end =============");}};// 监听消息// 参数一:队列名称// 参数二:是否自动确认,设置为true表示消息接收到自动向mq回复接收到了,mq接收到消息之后会删除消息,设置为false则需要手动确认channel.basicConsume(Producer.DIRECT_QUEUE_INSERT, false, defaultConsumer);// 不关闭资源,会一直监听// channel.close();// connection.close();}
}
消费者 update 代码
package top.simba1949.routing;import com.rabbitmq.client.*;
import lombok.SneakyThrows;
import top.simba1949.RabbitMQUtil;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author Theodore* @date 2020/6/15 17:49*/
public class ConsumerUpdate {public static void main(String[] args) throws IOException, TimeoutException {// 创建连接Connection connection = RabbitMQUtil.getConnection();// 创建信道Channel channel = connection.createChannel();// 4.声明一个队列channel.queueDeclare(Producer.DIRECT_QUEUE_UPDATE, true, false, false, null);// 5.监听消息DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {/*** @param consumerTag 消费标签,在 channel.basicConsume 时候可以指定* @param envelope 消息包内容,可从中获取消息id、消息routingKey、交换机和重转标记(收到消息失败后是否需要重新发送该消息)* @param properties 消息属性* @param body 消息* @throws IOException*/@SneakyThrows@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {super.handleDelivery(consumerTag, envelope, properties, body);System.out.println("============Routing update start =============");System.out.println("路由 key :" + envelope.getRoutingKey());System.out.println("交换机 :" + envelope.getExchange());System.out.println("消息id :" + envelope.getDeliveryTag());System.out.println("接收到的消息 :" + new String(body, "UTF-8"));Thread.sleep(1000L);// 客户端自己确认消息收到ackchannel.basicAck(envelope.getDeliveryTag(), false);System.out.println("===========Routing update end =============");}};// 监听消息// 参数一:队列名称// 参数二:是否自动确认,设置为true表示消息接收到自动向mq回复接收到了,mq接收到消息之后会删除消息,设置为false则需要手动确认channel.basicConsume(Producer.DIRECT_QUEUE_UPDATE, false, defaultConsumer);// 不关闭资源,会一直监听// channel.close();// connection.close();}
}

Topics 通配符模式

原理

Receiving messages based on a pattern (topics)

在这里插入图片描述

Topic 类型与 Direct 相比,都是可以根据 Routing Key 把消息路由到不同的队列。只不过 Topic 类型的 Exchange 可以让队列在绑定 Routing Key 的时候可以使用通配符

Routing Key 一般都是有一个或者多个单词组成,多个单词以“.” 分割,例如:item.insert

通配符规则

  • # :匹配一个或者多个词;eg :item.# 能够匹配 item.insert 或者 item.insert.abc ;
  • * :匹配一个词;item.* 能够匹配 item.insert;
工具类代码
package top.simba1949;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author Theodore* @date 2020/6/16 16:20*/
public class RabbitMQUtil {/*** 获取连接* @return* @throws IOException* @throws TimeoutException*/public static Connection getConnection() throws IOException, TimeoutException {// 1.创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.8.9");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");// 2.通过连接工厂创建连接Connection connection = connectionFactory.newConnection();return connection;}/*** 关闭连接* @param channel* @param connection* @throws IOException* @throws TimeoutException*/public static void close(Channel channel, Connection connection) throws IOException, TimeoutException {channel.close();connection.close();}
}
生产者代码
package top.simba1949.topic;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import top.simba1949.RabbitMQUtil;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** 路由模式的交换机类型为:topic* @author Theodore* @date 2020/6/15 17:52*/
public class Producer {public static final String TOPIC_EXCHANGE = "topic_exchange";public static final String TOPIC_QUEUE_1 = "topic_queue_1";public static final String TOPIC_QUEUE_2 = "topic_queue_2";public static final String ROUTING_KEY_TOPIC_1 = "item.#";public static final String ROUTING_KEY_TOPIC_2 = "item.*";public static final String ROUTING_KEY_INSERT = "item.insert";public static final String ROUTING_KEY_UPDATE = "item.update";public static final String ROUTING_KEY_DELETE = "item.delete.abc";public static void main(String[] args) throws IOException, TimeoutException {Connection connection = RabbitMQUtil.getConnection();// 通过 connection 创建 channelChannel channel = connection.createChannel();// 声明交换机// 参数一:交换机名称// 参数二:交换机类型: DIRECT("direct"), FANOUT("fanout"), TOPIC("topic"), HEADERS("headers");channel.exchangeDeclare(TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC);// 声明队列// 参数一:队列名称// 参数二:是否定义持久化队列// 参数三:是否独占本次连接// 参数四:是否在不使用的使用自动删除队列// 参数五:队列其他参数channel.queueDeclare(TOPIC_QUEUE_1, true, false, false, null);channel.queueDeclare(TOPIC_QUEUE_2, true, false, false, null);// 队列绑定交换机// 参数一:队列名称// 参数二:交换机名称// 参数三:路由keychannel.queueBind(TOPIC_QUEUE_1, TOPIC_EXCHANGE, ROUTING_KEY_TOPIC_1);channel.queueBind(TOPIC_QUEUE_2, TOPIC_EXCHANGE, ROUTING_KEY_TOPIC_2);// 发送消息String msgInsert = "新增了商品,Topic模式:routing key 为 item.insert";// 参数一:交换机名称,如果没有指定则使用 Default Exchange;// 参数二:路由 key,如果是简单模式,可以直接传递队列名称// 参数三:消息其他属性// 参数四:消息内容channel.basicPublish(TOPIC_EXCHANGE, ROUTING_KEY_INSERT, null, msgInsert.getBytes());// 发送消息String msgUpdate = "更新了商品,Topic模式:routing key 为 item.update";// 参数一:交换机名称,如果没有指定则使用 Default Exchange;// 参数二:路由 key,如果是简单模式,可以直接传递队列名称// 参数三:消息其他属性// 参数四:消息内容channel.basicPublish(TOPIC_EXCHANGE, ROUTING_KEY_UPDATE, null, msgUpdate.getBytes());// 发送消息String msgDelete = "删除了商品,Topic模式:routing key 为 item.delete.abc";// 参数一:交换机名称,如果没有指定则使用 Default Exchange;// 参数二:路由 key,如果是简单模式,可以直接传递队列名称// 参数三:消息其他属性// 参数四:消息内容channel.basicPublish(TOPIC_EXCHANGE, ROUTING_KEY_DELETE, null, msgDelete.getBytes());// 释放资源RabbitMQUtil.close(channel, connection);}
}
消费者1代码
package top.simba1949.topic;import com.rabbitmq.client.*;
import lombok.SneakyThrows;
import top.simba1949.RabbitMQUtil;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author Theodore* @date 2020/6/15 17:49*/
public class Consumer1 {public static void main(String[] args) throws IOException, TimeoutException {// 创建连接Connection connection = RabbitMQUtil.getConnection();// 创建信道Channel channel = connection.createChannel();// 声明一个队列// 参数一:队列名称// 参数二:是否定义持久化队列// 参数三:是否独占本次连接// 参数四:是否在不使用的使用自动删除队列// 参数五:队列其他参数channel.queueDeclare(Producer.TOPIC_QUEUE_1, true, false, false, null);// 监听消息DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {/*** @param consumerTag 消费标签,在 channel.basicConsume 时候可以指定* @param envelope 消息包内容,可从中获取消息id、消息routingKey、交换机和重转标记(收到消息失败后是否需要重新发送该消息)* @param properties 消息属性* @param body 消息* @throws IOException*/@SneakyThrows@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {super.handleDelivery(consumerTag, envelope, properties, body);System.out.println("============start =============");System.out.println("路由 key :" + envelope.getRoutingKey());System.out.println("交换机 :" + envelope.getExchange());System.out.println("消息id :" + envelope.getDeliveryTag());System.out.println("接收到的消息 :" + new String(body, "UTF-8"));Thread.sleep(1000L);// 客户端自己确认消息收到ackchannel.basicAck(envelope.getDeliveryTag(), false);System.out.println("============end =============");}};// 监听消息// 参数一:队列名称// 参数二:是否自动确认,设置为true表示消息接收到自动向mq回复接收到了,mq接收到消息之后会删除消息,设置为false则需要手动确认channel.basicConsume(Producer.TOPIC_QUEUE_1, false, defaultConsumer);// 不关闭资源,会一直监听// channel.close();// connection.close();}
}
消费者2代码
package top.simba1949.topic;import com.rabbitmq.client.*;
import lombok.SneakyThrows;
import top.simba1949.RabbitMQUtil;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author Theodore* @date 2020/6/15 17:49*/
public class Consumer2 {public static void main(String[] args) throws IOException, TimeoutException {// 创建连接Connection connection = RabbitMQUtil.getConnection();// 创建信道Channel channel = connection.createChannel();// 4.声明一个队列channel.queueDeclare(Producer.TOPIC_QUEUE_2, true, false, false, null);// 5.监听消息DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {/*** @param consumerTag 消费标签,在 channel.basicConsume 时候可以指定* @param envelope 消息包内容,可从中获取消息id、消息routingKey、交换机和重转标记(收到消息失败后是否需要重新发送该消息)* @param properties 消息属性* @param body 消息* @throws IOException*/@SneakyThrows@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {super.handleDelivery(consumerTag, envelope, properties, body);System.out.println("============start =============");System.out.println("路由 key :" + envelope.getRoutingKey());System.out.println("交换机 :" + envelope.getExchange());System.out.println("消息id :" + envelope.getDeliveryTag());System.out.println("接收到的消息 :" + new String(body, "UTF-8"));Thread.sleep(1000L);// 客户端自己确认消息收到ackchannel.basicAck(envelope.getDeliveryTag(), false);System.out.println("===========end =============");}};// 监听消息// 参数一:队列名称// 参数二:是否自动确认,设置为true表示消息接收到自动向mq回复接收到了,mq接收到消息之后会删除消息,设置为false则需要手动确认channel.basicConsume(Producer.TOPIC_QUEUE_2, false, defaultConsumer);// 不关闭资源,会一直监听// channel.close();// connection.close();}
}

RabbitMQ 高级

过期时间TTL

过期时间TTL表示可以对消息设置预期的时间,在这个时间内都可以被消费者接受获取;过了之后消息将自动被删除。

RabbitMQ 可以对消息和队列设置 TTL。

  1. 通过队列属性设置 TTL,队列中所有消息都有相同的过期时间;
  2. 对消息进行单独设置,每条消息 TTL 可以不同;

如果上述两种方法同时使用,则消息的过期时间以两者之间 TTL 较小的那个数值为准。消息在队列的生存时间一旦超过设置的 TTL 值,就称为 dead message 被投递到死信队列,消费者将无法接收到消息。

死信队列

DLX,全称为 Dead-Letter-Exchange ,可以称之为死信交换机。当消息在一个队列中变成死信之后,它能被重新发送到另一个交换机中,这个交换机就是 DLX,绑定 DLX 的队列就称之为死信队列;

消息变成死信,可能是由于以下的原因:

  • 消息被拒绝
  • 消息过期
  • 队列达到最大长度

DLX 也是一个正常的交换机,和一般的交换机没有区别,它能在任何的队列上被指定,实际上就是设置某一个队列的属性。当这个队列中存在死信时,RabbitMQ 就会自动的将这个消息重新发布到设置的 DLX 上去,进而被路由到另一个队列,即死信队列。

要想使用死信队列,只需要在定义队列的时候设置队列参数 x-dead-letter-exchange 指定交换机即可;

在这里插入图片描述

延迟队列

延迟队列存储的对象是对应的延迟消息;所谓“延迟消息”是指当消息被发送以后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费;

在 RabbitMQ 中延迟队列可以通过 过期时间和死信队列 来实现;

消息确认机制

确认并且保证消息被送达,提供两种方式:发布确认和事务;(两者不可同时使用)在 channel 为事务时,不可引入确认模式;通用 channel 为确认模式下,不可使用事务。

发布确认

有两种方式:消息发送成功确认和消息发送失败回调;

事务支持

场景:业务处理伴随着消息的发送,业务处理失败(事务回滚)后要求消息不发送。RabbitMQ 使用调用者的外部事务,通常是首选,因为它是非入侵的(低耦合);

<!--定义rabbitTemplate对象操作可以在代码中方便发送消息-->
<!--confirm-callback="confirmCallback"表示消息发送失败回调-->
<!--return-callback="returnCallback"表示消息失败回调,同时需要配置mandatory="true",否则消息则丢失-->
<!--channel-transacted="true"表示支持事务操作-->
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"confirm-callback="confirmCallback"return-callback="returnCallback"mandatory="true"channel-transacted="true"/>
<!--平台事务管理器-->
<bean id="rabbitTransactionManager" class="org.springframework.amqp.rabbit.transaction.RabbitTransactionManager"><property name="connectionFactory" ref="connectionFactory"/>
</bean>

消息追踪

消息中心的消息追踪需要使用 Trace 实现,Trace 是 RabbitMQ 用于记录每次发送的消息,方便使用 RabbitMQ 的开发者调试、排错。可通过插件形式提供可视化界面。Trace 启动后会自动创建系统 Exchange:amq.rabbitmq.trace,每一个队列会自动绑定该 Exchange ,绑定后发送到该队列的消息会记录到 Trace 日志。

命令集描述
rabbitmq-plugins list查看插件列表
rabbitmq-plugins enable rabbitmq_tracingrabbitmq启动trace插件
rabbitmqctl trace_on打开 trace 开关
rabbitmqctl trace_on vhost-path打开 trace 的开关,追踪 vhost-path 日志
rabbitmqctl trace_off关闭 trace 的开关
rabbitmqctl-plugins disable rabbitmq_tracingrabbitmq 关闭 trace 插件
rabbitmqctl set_user_tags 用户名 administrator只有 administrator 的角色才能查看日志界面

安装插件并开启 trace_on 之后,会发现多了个 exchange : amq.rabbitmq.trace,类型为 topic

RabbitMQ 高可用集群

  1. 主备模式:用来实现 RabbitMQ 的高可用集群,一般是在并发和数据不是特别多的时候使用,当主节点挂点以后会从备份节点中选择一个节点出来作为主节点对外提供服务;消费者通过 HAProxy 访问;HAProxy 可以切换主备节点;
  2. 远程模式: 主要用来实现双活,简称为 “Shovel模式” ,就是把消息复制到不同的数据中心,让两个跨地域的集群互联;
  3. 镜像队列模式: 也别称为 Mirror 队列,主要是用来保证 MQ 消息的可靠性,通过消息复制的方式能够保证消息 100% 不丢失,同时该集群模式也是企业中使用最多的模式;
  4. 多活模式: 主要用来实现异地数据复制,Shovel 模式其实也可以实现,但是Shovel配置比较繁琐,而且还受版本限制;使用多活模式需要借助 federation 插件来实现集群与集群之间或者节点与节点之间的消息复制。

列会自动绑定该 Exchange ,绑定后发送到该队列的消息会记录到 Trace 日志。

命令集描述
rabbitmq-plugins list查看插件列表
rabbitmq-plugins enable rabbitmq_tracingrabbitmq启动trace插件
rabbitmqctl trace_on打开 trace 开关
rabbitmqctl trace_on vhost-path打开 trace 的开关,追踪 vhost-path 日志
rabbitmqctl trace_off关闭 trace 的开关
rabbitmqctl-plugins disable rabbitmq_tracingrabbitmq 关闭 trace 插件
rabbitmqctl set_user_tags 用户名 administrator只有 administrator 的角色才能查看日志界面

安装插件并开启 trace_on 之后,会发现多了个 exchange : amq.rabbitmq.trace,类型为 topic

RabbitMQ 高可用集群

  1. 主备模式:用来实现 RabbitMQ 的高可用集群,一般是在并发和数据不是特别多的时候使用,当主节点挂点以后会从备份节点中选择一个节点出来作为主节点对外提供服务;消费者通过 HAProxy 访问;HAProxy 可以切换主备节点;
  2. 远程模式: 主要用来实现双活,简称为 “Shovel模式” ,就是把消息复制到不同的数据中心,让两个跨地域的集群互联;
  3. 镜像队列模式: 也别称为 Mirror 队列,主要是用来保证 MQ 消息的可靠性,通过消息复制的方式能够保证消息 100% 不丢失,同时该集群模式也是企业中使用最多的模式;
  4. 多活模式: 主要用来实现异地数据复制,Shovel 模式其实也可以实现,但是Shovel配置比较繁琐,而且还受版本限制;使用多活模式需要借助 federation 插件来实现集群与集群之间或者节点与节点之间的消息复制。

这篇关于RabbitMQ核心概念及AMQP协议的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/501472

相关文章

Springboot使用RabbitMQ实现关闭超时订单(示例详解)

《Springboot使用RabbitMQ实现关闭超时订单(示例详解)》介绍了如何在SpringBoot项目中使用RabbitMQ实现订单的延时处理和超时关闭,通过配置RabbitMQ的交换机、队列和... 目录1.maven中引入rabbitmq的依赖:2.application.yml中进行rabbit

Java如何接收并解析HL7协议数据

《Java如何接收并解析HL7协议数据》文章主要介绍了HL7协议及其在医疗行业中的应用,详细描述了如何配置环境、接收和解析数据,以及与前端进行交互的实现方法,文章还分享了使用7Edit工具进行调试的经... 目录一、前言二、正文1、环境配置2、数据接收:HL7Monitor3、数据解析:HL7Busines

SpringBoot整合Canal+RabbitMQ监听数据变更详解

《SpringBoot整合Canal+RabbitMQ监听数据变更详解》在现代分布式系统中,实时获取数据库的变更信息是一个常见的需求,本文将介绍SpringBoot如何通过整合Canal和Rabbit... 目录需求步骤环境搭建整合SpringBoot与Canal实现客户端Canal整合RabbitMQSp

Andrej Karpathy最新采访:认知核心模型10亿参数就够了,AI会打破教育不公的僵局

夕小瑶科技说 原创  作者 | 海野 AI圈子的红人,AI大神Andrej Karpathy,曾是OpenAI联合创始人之一,特斯拉AI总监。上一次的动态是官宣创办一家名为 Eureka Labs 的人工智能+教育公司 ,宣布将长期致力于AI原生教育。 近日,Andrej Karpathy接受了No Priors(投资博客)的采访,与硅谷知名投资人 Sara Guo 和 Elad G

RabbitMQ练习(AMQP 0-9-1 Overview)

1、What is AMQP 0-9-1 AMQP 0-9-1(高级消息队列协议)是一种网络协议,它允许遵从该协议的客户端(Publisher或者Consumer)应用程序与遵从该协议的消息中间件代理(Broker,如RabbitMQ)进行通信。 AMQP 0-9-1模型的核心概念包括消息发布者(producers/publisher)、消息(messages)、交换机(exchanges)、

【Linux】应用层http协议

一、HTTP协议 1.1 简要介绍一下HTTP        我们在网络的应用层中可以自己定义协议,但是,已经有大佬定义了一些现成的,非常好用的应用层协议,供我们直接使用,HTTP(超文本传输协议)就是其中之一。        在互联网世界中,HTTP(超文本传输协议)是一个至关重要的协议,他定义了客户端(如浏览器)与服务器之间如何进行通信,以交换或者传输超文本(比如HTML文档)。

【VUE】跨域问题的概念,以及解决方法。

目录 1.跨域概念 2.解决方法 2.1 配置网络请求代理 2.2 使用@CrossOrigin 注解 2.3 通过配置文件实现跨域 2.4 添加 CorsWebFilter 来解决跨域问题 1.跨域概念 跨域问题是由于浏览器实施了同源策略,该策略要求请求的域名、协议和端口必须与提供资源的服务相同。如果不相同,则需要服务器显式地允许这种跨域请求。一般在springbo

PostgreSQL核心功能特性与使用领域及场景分析

PostgreSQL有什么优点? 开源和免费 PostgreSQL是一个开源的数据库管理系统,可以免费使用和修改。这降低了企业的成本,并为开发者提供了一个活跃的社区和丰富的资源。 高度兼容 PostgreSQL支持多种操作系统(如Linux、Windows、macOS等)和编程语言(如C、C++、Java、Python、Ruby等),并提供了多种接口(如JDBC、ODBC、ADO.NET等

【Go】go连接clickhouse使用TCP协议

离开你是傻是对是错 是看破是软弱 这结果是爱是恨或者是什么 如果是种解脱 怎么会还有眷恋在我心窝 那么爱你为什么                      🎵 黄品源/莫文蔚《那么爱你为什么》 package mainimport ("context""fmt""log""time""github.com/ClickHouse/clickhouse-go/v2")func main(

RabbitMQ使用及与spring boot整合

1.MQ   消息队列(Message Queue,简称MQ)——应用程序和应用程序之间的通信方法   应用:不同进程Process/线程Thread之间通信   比较流行的中间件:     ActiveMQ     RabbitMQ(非常重量级,更适合于企业级的开发)     Kafka(高吞吐量的分布式发布订阅消息系统)     RocketMQ   在高并发、可靠性、成熟度等