在Go中迅速使用RabbitMQ

2024-09-05 19:44
文章标签 go 使用 rabbitmq 迅速

本文主要是介绍在Go中迅速使用RabbitMQ,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

  • 1 认识
    • 1.1 MQ分类
    • 1.2 安装
    • 1.3 基本流程
  • 2 [Work模型](https://www.rabbitmq.com/tutorials/tutorial-two-go#preparation)
  • 3 交换机
    • 3.1 fanout
    • 3.2 direct
    • 3.3 [topic](https://www.rabbitmq.com/tutorials/tutorial-five-go)
  • 4 Golang创建交换机/队列/Publish/Consume/Bind
  • 5 可靠性
    • 5.1 生产者可靠性
    • 5.2 MQ可靠性
      • 5.2.1 Lazy Queue
    • 5.3 消费者可靠性
    • 5.4 业务幂等性
    • 5.4 Golang实现可靠性
      • 1. 确保消息生产者的可靠性
      • 2. 确保消息队列的可靠性
      • 3. 确保消息消费者的可靠性
      • 4. 容错处理
  • 6 延迟消息
    • 6.1 死信交换机
    • 6.2 延迟消息插件
      • 6.2.1 安装
      • 6.2.2 使用
      • 6.2.3 应用场景

  • 为什么要使用消息队列

image-20240903160417835

1 认识

1.1 MQ分类

  • 有Broker

    • 重Topic —— 在整个broker中,依据topic来进行消息中转。在重topic的MQ中必然需要topic —— kafka
    • 轻Topic —— topic只是一种中转模式 —— rabbitMQ
  • 无Broker

1.2 安装

# latest RabbitMQ 3.13
docker run \-e RABBITMQ_DEFAULT_USER=dusong \  #默认账号和密码均为:guest-e RABBITMQ_DEFAULT_PASS=123123 \-d \  #detached mode-v mq-plugins:/plugins \   #插件挂载--rm \--name rabbitmq \-p 5672:5672 \    #消息通信端口-p 15672:15672 \  #管理界面端口rabbitmq:3.13-management

1.3 基本流程

image-20240904110326213

  • exchange只能转发消息,不能存储消息
  • 通过bind将queue绑定到exchange

2 Work模型

  • 多个消费者绑定到一个队列

  • 同一个消息只会被一个消费者处理

  • 通过设置prefetch来控制消费者预取的消息数量(不设置默认平均平均分配)

    image-20240904144344585

    err = ch.Qos(1,     // prefetch count0,     // prefetch sizefalse, // global
    )
    

3 交换机

3.1 fanout

fanout类型的交换机会将消息转发给所有绑定到改交换机的队列

3.2 direct

image-20240904151234823

err = ch.ExchangeDeclare("logs_direct", // name"direct",      // typetrue,          // durablefalse,         // auto-deletedfalse,         // internalfalse,         // no-waitnil,           // arguments
)
failOnError(err, "Failed to declare an exchange")ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()body := bodyFrom(os.Args)
err = ch.PublishWithContext(ctx,"logs_direct",         // exchange"log", // routing keyfalse, // mandatoryfalse, // immediateamqp.Publishing{ContentType: "text/plain",Body:        []byte(body),
})

3.3 topic

image-20240904151944421

4 Golang创建交换机/队列/Publish/Consume/Bind

  • 创建交换机

    err = ch.ExchangeDeclare("logs_direct", // name"direct",      // typetrue,          // durablefalse,         // auto-deletedfalse,         // internalfalse,         // no-waitnil,           // arguments
    )
    
  • 创建队列

    q, err := ch.QueueDeclare("hello", // namefalse,   // durable(是否持久化)false,   // delete when unusedfalse,   // exclusivefalse,   // no-waitnil,     // arguments
    )
    
  • 绑定

    err = ch.QueueBind(q.Name,        // queue name"log",             // routing key"logs_direct", // exchangefalse,nil
    )
    
  • 发送

    body := "this is log"
    err = ch.PublishWithContext(ctx,"logs_direct",         // exchange"log", // routing keyfalse,                 // mandatoryfalse,                 // immediateamqp.Publishing{ContentType: "text/plain",Body:        []byte(body),})
    
  • 接收

    msgs, err := ch.Consume(q.Name, // queue"",     // consumertrue,   // auto ackfalse,  // exclusivefalse,  // no localfalse,  // no waitnil,    // args
    )
    

5 可靠性

5.1 生产者可靠性

  • 生产者重连

  • 生产者确认(ack)

5.2 MQ可靠性

  • 交换机/队列持久化
  • 消息持久化

5.2.1 Lazy Queue

image-20240904172117264

image-20240904163818387

5.3 消费者可靠性

  • 消费者确认机制

    image-20240904172521990

5.4 业务幂等性

  • 消费者因为保证可靠性可能消费业务多次,因此需要保证业务幂等性
  1. 给消息加上uuid
  2. 在业务逻辑上做修改

5.4 Golang实现可靠性

在使用 RabbitMQ 的 Go 应用程序中,要确保消息的可靠性,通常需要从以下几个方面入手:

1. 确保消息生产者的可靠性

  • 消息确认(Publisher Confirms): 开启 RabbitMQ 的发布确认模式。通过调用 Channel.Confirm() 方法,让 RabbitMQ 服务器在成功接收并持久化消息后向生产者发送确认。这样可以确保生产者知道消息已被可靠接收。

    ch.Confirm(false) // 启用发布确认模式
    confirm := ch.NotifyPublish(make(chan amqp.Confirmation, 1))// 发布消息
    err = ch.Publish(exchange, routingKey, mandatory, immediate, msg)
    if err != nil {// 处理发布失败的情况
    }select {
    case confirmed := <-confirm:if confirmed.Ack {fmt.Println("消息已确认")} else {fmt.Println("消息未确认")}
    case <-time.After(time.Second * 5):fmt.Println("消息确认超时")
    }
    
  • 消息持久化(Message Durability): 将消息标记为持久化,以确保即使 RabbitMQ 服务器重启,消息也不会丢失。通过设置 DeliveryModeamqp.Persistent 来实现:

    msg := amqp.Publishing{DeliveryMode: amqp.Persistent,ContentType:  "text/plain",Body:         []byte("Hello, RabbitMQ!"),
    }
    

2. 确保消息队列的可靠性

  • 队列持久化(Queue Durability): 创建队列时,将其声明为持久化队列。这样即使 RabbitMQ 服务器重启,队列依然存在。

    _, err = ch.QueueDeclare("my_queue",  // 队列名true,        // 是否持久化false,       // 是否自动删除false,       // 是否排他false,       // 是否阻塞nil,         // 其他参数
    )
    if err != nil {log.Fatalf("Failed to declare a queue: %s", err)
    }
    

3. 确保消息消费者的可靠性

  • 手动确认(Manual Acknowledgment): 消费者手动确认接收到的消息。这样只有在消息成功处理后,RabbitMQ 才会将其从队列中移除。如果消费者没有确认消息且发生故障,RabbitMQ 会将消息重新投递。

    msgs, err := ch.Consume("my_queue", // 队列名"",         // 消费者标识false,      // 自动确认false,      // 是否排他false,      // 是否阻塞false,      // 是否在同一个连接上消费nil,        // 其他参数
    )
    if err != nil {log.Fatalf("Failed to register a consumer: %s", err)
    }for d := range msgs {// 处理消息fmt.Printf("Received a message: %s", d.Body)// 手动确认d.Ack(false)
    }
    
  • QoS(Quality of Service): 设置消费者的 QoS 参数,例如 prefetch_count,确保消费者不会一次处理太多消息,从而导致过载。

    err = ch.Qos(1,    // 每次处理一条消息0,    // 消息大小限制(不限制)false, // 是否应用于整个通道
    )
    if err != nil {log.Fatalf("Failed to set QoS: %s", err)
    }
    

4. 容错处理

  • 重试机制: 在生产者和消费者中实现重试机制,例如使用带有指数回退的重试逻辑,以应对 RabbitMQ 不可用或网络波动的情况。

  • 死信队列(DLX): 配置死信队列,将处理失败的消息路由到指定的死信队列,方便后续分析和处理。

通过这些措施,可以有效提高使用 RabbitMQ 时的消息可靠性。

6 延迟消息

6.1 死信交换机

image-20240905145924200

6.2 延迟消息插件

6.2.1 安装

  1. https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.13.0/rabbitmq_delayed_message_exchange-3.13.0.ez

  2. 将插件放在该目录

    image-20240905153455222

  3. docker exec -it rabbitmq rabbitmq-plugins enable rabbitmq-delayed-message-exchange

6.2.2 使用

 // 3. 声明延迟交换机err = ch.ExchangeDeclare("delay_exchange",               // 交换机名称"x-delayed-message",            // 交换机类型true,                           // 是否持久化false,                          // 是否自动删除false,                          // 是否内部使用false,                          // 是否等待amqp.Table{"x-delayed-type": "direct"}, // 交换机类型的设置)failOnError(err, "Failed to declare an exchange")// 4. 发送消息body := "Hello World with delay"err = ch.Publish("delay_exchange", // 交换机名称"routing_key",    // 路由键false,            // 是否强制发送false,            // 是否立即发送amqp.Publishing{ContentType: "text/plain",Body:        []byte(body),Headers: amqp.Table{"x-delay": int32(5000), // 延迟时间,单位为毫秒 (5秒延迟)},})

6.2.3 应用场景

  • 消息内部维护一个计时器,延迟消息对CPU的消耗较高,适用于延迟时间较短的场景

image-20240905155732697
false, // 是否立即发送
amqp.Publishing{
ContentType: “text/plain”,
Body: []byte(body),
Headers: amqp.Table{
“x-delay”: int32(5000), // 延迟时间,单位为毫秒 (5秒延迟)
},
})


### 6.2.3 应用场景- 消息内部维护一个计时器,延迟消息对CPU的消耗较高,适用于延迟时间较短的场景[外链图片转存中...(img-eA0QMPnx-1725527666228)]

这篇关于在Go中迅速使用RabbitMQ的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1139893

相关文章

中文分词jieba库的使用与实景应用(一)

知识星球:https://articles.zsxq.com/id_fxvgc803qmr2.html 目录 一.定义: 精确模式(默认模式): 全模式: 搜索引擎模式: paddle 模式(基于深度学习的分词模式): 二 自定义词典 三.文本解析   调整词出现的频率 四. 关键词提取 A. 基于TF-IDF算法的关键词提取 B. 基于TextRank算法的关键词提取

使用SecondaryNameNode恢复NameNode的数据

1)需求: NameNode进程挂了并且存储的数据也丢失了,如何恢复NameNode 此种方式恢复的数据可能存在小部分数据的丢失。 2)故障模拟 (1)kill -9 NameNode进程 [lytfly@hadoop102 current]$ kill -9 19886 (2)删除NameNode存储的数据(/opt/module/hadoop-3.1.4/data/tmp/dfs/na

Hadoop数据压缩使用介绍

一、压缩原则 (1)运算密集型的Job,少用压缩 (2)IO密集型的Job,多用压缩 二、压缩算法比较 三、压缩位置选择 四、压缩参数配置 1)为了支持多种压缩/解压缩算法,Hadoop引入了编码/解码器 2)要在Hadoop中启用压缩,可以配置如下参数

Makefile简明使用教程

文章目录 规则makefile文件的基本语法:加在命令前的特殊符号:.PHONY伪目标: Makefilev1 直观写法v2 加上中间过程v3 伪目标v4 变量 make 选项-f-n-C Make 是一种流行的构建工具,常用于将源代码转换成可执行文件或者其他形式的输出文件(如库文件、文档等)。Make 可以自动化地执行编译、链接等一系列操作。 规则 makefile文件

使用opencv优化图片(画面变清晰)

文章目录 需求影响照片清晰度的因素 实现降噪测试代码 锐化空间锐化Unsharp Masking频率域锐化对比测试 对比度增强常用算法对比测试 需求 对图像进行优化,使其看起来更清晰,同时保持尺寸不变,通常涉及到图像处理技术如锐化、降噪、对比度增强等 影响照片清晰度的因素 影响照片清晰度的因素有很多,主要可以从以下几个方面来分析 1. 拍摄设备 相机传感器:相机传

pdfmake生成pdf的使用

实际项目中有时会有根据填写的表单数据或者其他格式的数据,将数据自动填充到pdf文件中根据固定模板生成pdf文件的需求 文章目录 利用pdfmake生成pdf文件1.下载安装pdfmake第三方包2.封装生成pdf文件的共用配置3.生成pdf文件的文件模板内容4.调用方法生成pdf 利用pdfmake生成pdf文件 1.下载安装pdfmake第三方包 npm i pdfma

零基础学习Redis(10) -- zset类型命令使用

zset是有序集合,内部除了存储元素外,还会存储一个score,存储在zset中的元素会按照score的大小升序排列,不同元素的score可以重复,score相同的元素会按照元素的字典序排列。 1. zset常用命令 1.1 zadd  zadd key [NX | XX] [GT | LT]   [CH] [INCR] score member [score member ...]

git使用的说明总结

Git使用说明 下载安装(下载地址) macOS: Git - Downloading macOS Windows: Git - Downloading Windows Linux/Unix: Git (git-scm.com) 创建新仓库 本地创建新仓库:创建新文件夹,进入文件夹目录,执行指令 git init ,用以创建新的git 克隆仓库 执行指令用以创建一个本地仓库的

【北交大信息所AI-Max2】使用方法

BJTU信息所集群AI_MAX2使用方法 使用的前提是预约到相应的算力卡,拥有登录权限的账号密码,一般为导师组共用一个。 有浏览器、ssh工具就可以。 1.新建集群Terminal 浏览器登陆10.126.62.75 (如果是1集群把75改成66) 交互式开发 执行器选Terminal 密码随便设一个(需记住) 工作空间:私有数据、全部文件 加速器选GeForce_RTX_2080_Ti

【Linux 从基础到进阶】Ansible自动化运维工具使用

Ansible自动化运维工具使用 Ansible 是一款开源的自动化运维工具,采用无代理架构(agentless),基于 SSH 连接进行管理,具有简单易用、灵活强大、可扩展性高等特点。它广泛用于服务器管理、应用部署、配置管理等任务。本文将介绍 Ansible 的安装、基本使用方法及一些实际运维场景中的应用,旨在帮助运维人员快速上手并熟练运用 Ansible。 1. Ansible的核心概念