手把手教姐姐写消息队列

2023-11-12 01:20
文章标签 手把手 队列 消息 姐姐

本文主要是介绍手把手教姐姐写消息队列,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

前言

这周姐姐入职了新公司,老板想探探他的底,看了一眼他的简历,呦呵,精通kafka,这小姑娘有两下子,既然这样,那你写一个消息队列吧。因为要用go语言写,这可给姐姐愁坏了。赶紧来求助我,我这么坚贞不屈一人,在姐姐的软磨硬泡下还是答应他了,所以接下来我就手把手教姐姐怎么写一个消息队列。下面我们就来看一看我是怎么写的吧~~~。

本代码已上传到我的github:

有需要的小伙伴,可自行下载,顺便给个小星星吧~~~

什么是消息队列

姐姐真是把我愁坏了,自己写的精通kafka,竟然不知道什么是消息队列,于是,一向好脾气的我开始给姐姐讲一讲什么是消息队列。

消息队列,我们一般称它为MQ(Message Queue),两个单词的结合,这两个英文单词想必大家都应该知道吧,其实最熟悉的还是Queue吧,即队列。队列是一种先进先出的数据结构,队列的使用还是比较普遍的,但是已经有队列了,怎么还需要MQ呢?

我:问你呢,姐姐,知道吗?为什么还需要MQ

姐姐:快点讲,想挨打呀?

我:噗。。。算我多嘴,哼~~~

欠欠的我开始了接下来的耐心讲解......

举一个简单的例子,假设现在我们要做一个系统,该登陆系统需要在用户登陆成功后,发送封邮件到用户邮箱进行提醒,需求还是很简单的,我们先看一看没有MQ,我们该怎么实现呢?画一个时序图来看一看:

看这个图,邮件发送在请求登陆时进行,当密码验证成功后,就发送邮件,然后返回登陆成功。这样是可以的,但是他是有缺陷的。这让我们的登陆操作变得复杂了,每次请求登陆都需要进行邮件发送,如果这里出现错误,整个登陆请求也出现了错误,导致登陆不成功;还有一个问题,本来我们登陆请求调用接口仅仅需要100ms,因为中间要做一次发送邮件的等待,那么调用一次登陆接口的时间就要增长,这就是问题所在,一封邮件他的优先级 不是很高的,用户也不需要实时收到这封邮件,所以这时,就体现了消息队列的重要性了,我们用消息队列进行改进一下。

这里我们将发送邮件请求放到Mq中,这样我们就能提高用户体验的吞吐量,这个很重要,顾客就是上帝嘛,毕竟也没有人喜欢用一个很慢很慢的app。

这里只是举了MQ众多应用中的其中一个,即异步应用,MQ还在系统解藕、削峰/限流中有着重要应用,这两个我就不具体讲解了,原理都一样,好好思考一下,你们都能懂得。

channel

好啦,姐姐终于知道什么是消息队列了,但是现在还是没法进行消息队列开发的,因为还差一个知识点,即go语言中的channel。这个很重要,我们还需要靠这个来开发我们的消息队列呢。

因篇幅有限,这里不详细介绍channel,只介绍基本使用方法。

什么是channel

Goroutine 和 Channel 是 Go 语言并发编程的两大基石。Goroutine 用于执行并发任务,Channel 用于 goroutine 之间的同步、通信。Go提倡使用通信的方法代替共享内存,当一个Goroutine需要和其他Goroutine资源共享时,Channel就会在他们之间架起一座桥梁,并提供确保安全同步的机制。channel本质上其实还是一个队列,遵循FIFO原则。具体规则如下:

  • 先从 Channel 读取数据的 Goroutine 会先接收到数据;

  • 先向 Channel 发送数据的 Goroutine 会得到先发送数据的权利;

创建通道

创建通道需要用到关键字 make ,格式如下:

通道实例 := make(chan 数据类型)
  • 数据类型:通道内传输的元素类型。

  • 通道实例:通过make创建的通道句柄。

无缓冲通道的使用

Go语言中无缓冲的通道(unbuffered channel)是指在接收前没有能力保存任何值的通道。这种类型的通道要求发送 goroutine 和接收 goroutine 同时准备好,才能完成发送和接收操作。

无缓冲通道的定义方式如下:

通道实例 := make(chan 通道类型)
  • 通道类型:和无缓冲通道用法一致,影响通道发送和接收的数据类型。

  • 缓冲大小:0

  • 通道实例:被创建出的通道实例。

写个例子来帮助大家理解一下吧:

package mainimport ("sync""time"
)func main() {c := make(chan string)var wg sync.WaitGroupwg.Add(2)go func() {defer wg.Done()c <- `Golang梦工厂`}()go func() {defer wg.Done()time.Sleep(time.Second * 1)println(`Message: `+ <-c)}()wg.Wait()
}

带缓冲的通道的使用

Go语言中有缓冲的通道(buffered channel)是一种在被接收前能存储一个或者多个值的通道。这种类型的通道并不强制要求 goroutine 之间必须同时完成发送和接收。通道会阻塞发送和接收动作的条件也会不同。只有在通道中没有要接收的值时,接收动作才会阻塞。只有在通道没有可用缓冲区容纳被发送的值时,发送动作才会阻塞。

有缓冲通道的定义方式如下:

通道实例 := make(chan 通道类型, 缓冲大小)
  • 通道类型:和无缓冲通道用法一致,影响通道发送和接收的数据类型。

  • 缓冲大小:决定通道最多可以保存的元素数量。

  • 通道实例:被创建出的通道实例。

来写一个例子讲解一下:

package mainimport ("sync""time"
)func main() {c := make(chan string, 2)var wg sync.WaitGroupwg.Add(2)go func() {defer wg.Done()c <- `Golang梦工厂`c <- `asong`}()go func() {defer wg.Done()time.Sleep(time.Second * 1)println(`公众号: `+ <-c)println(`作者: `+ <-c)}()wg.Wait()
}

好啦,通道的概念就介绍到这里了,如果需要,下一篇我出一个channel详细讲解的文章。

消息队列编码实现

准备篇

终于开始进入主题了,姐姐都听的快要睡着了,我轰隆一嗓子,立马精神,但是呢,asong也是挨了一顿小电炮,代价惨痛呀,呜呜呜............

在开始编写代码编写直接,我需要构思我们的整个代码架构,这才是正确的编码方式。我们先来定义一个接口,把我们需要实现的方法先列出来,后期对每一个代码进行实现就可以了。因此可以列出如下方法:

type Broker interface {publish(topic string, msg interface{}) errorsubscribe(topic string) (<-chan interface{}, error)unsubscribe(topic string, sub <-chan interface{}) errorclose()broadcast(msg interface{}, subscribers []chan interface{})setConditions(capacity int)
}
  • publish:进行消息的推送,有两个参数即topicmsg,分别是订阅的主题、要传递的消息

  • subscribe:消息的订阅,传入订阅的主题,即可完成订阅,并返回对应的channel通道用来接收数据

  • unsubscribe:取消订阅,传入订阅的主题和对应的通道

  • close:这个的作用就是很明显了,就是用来关闭消息队列的

  • broadCast:这个属于内部方法,作用是进行广播,对推送的消息进行广播,保证每一个订阅者都可以收到

  • setConditions:这里是用来设置条件,条件就是消息队列的容量,这样我们就可以控制消息队列的大小了

细心的你们有没有发现什么问题,这些代码我都定义的是内部方法,也就是包外不可用。为什么这么做呢,因为这里属于代理要做的事情,我们还需要在封装一层,也就是客户端能直接调用的方法,这样才符合软件架构。因此可以写出如下代码:

package mqtype Client struct {bro *BrokerImpl
}func NewClient() *Client {return &Client{bro: NewBroker(),}
}func (c *Client)SetConditions(capacity int)  {c.bro.setConditions(capacity)
}func (c *Client)Publish(topic string, msg interface{}) error{return c.bro.publish(topic,msg)
}func (c *Client)Subscribe(topic string) (<-chan interface{}, error){return c.bro.subscribe(topic)
}func (c *Client)Unsubscribe(topic string, sub <-chan interface{}) error {return c.bro.unsubscribe(topic,sub)
}func (c *Client)Close()  {c.bro.close()
}func (c *Client)GetPayLoad(sub <-chan interface{})  interface{}{for val:= range sub{if val != nil{return val}}return nil
}

上面只是准好了代码结构,但是消息队列实现的结构我们还没有设计,现在我们就来设计一下。

type BrokerImpl struct {exit chan boolcapacity inttopics map[string][]chan interface{} // key:topic  value :queuesync.RWMutex // 同步锁
}
  • exit:也是一个通道,这个用来做关闭消息队列用的

  • capacity:即用来设置消息队列的容量

  • topics:这里使用一个map结构,key即是topic,其值则是一个切片,chan类型,这里这么做的原因是我们一个topic可以有多个订阅者,所以一个订阅者对应着一个通道

  • sync.RWMutex:读写锁,这里是为了防止并发情况下,数据的推送出现错误,所以采用加锁的方式进行保证

好啦,现在我们已经准备的很充分啦,开始接下来方法填充之旅吧~~~

Publishbroadcast

这里两个合在一起讲的原因是braodcast是属于publish里的。这里的思路很简单,我们只需要把传入的数据进行广播即可了,下面我们来看代码实现:

func (b *BrokerImpl) publish(topic string, pub interface{}) error {select {case <-b.exit:return errors.New("broker closed")default:}b.RLock()subscribers, ok := b.topics[topic]b.RUnlock()if !ok {return nil}b.broadcast(pub, subscribers)return nil
}func (b *BrokerImpl) broadcast(msg interface{}, subscribers []chan interface{}) {count := len(subscribers)concurrency := 1switch {case count > 1000:concurrency = 3case count > 100:concurrency = 2default:concurrency = 1}pub := func(start int) {for j := start; j < count; j += concurrency {select {case subscribers[j] <- msg:case <-time.After(time.Millisecond * 5):case <-b.exit:return}}}for i := 0; i < concurrency; i++ {go pub(i)}
}

publish方法中没有什么好讲的,这里主要说一下broadcast的实现:

这里主要对数据进行广播,所以数据推送出去就可以了,没必要一直等着他推送成功,所以这里我们我们采用goroutine。在推送的时候,当推送失败时,我们也不能一直等待呀,所以这里我们加了一个超时机制,超过5毫秒就停止推送,接着进行下面的推送。

可能你们会有疑惑,上面怎么还有一个switch选项呀,干什么用的呢?考虑这样一个问题,当有大量的订阅者时,,比如10000个,我们一个for循环去做消息的推送,那推送一次就会耗费很多时间,并且不同的消费者之间也会产生延时,,所以采用这种方法进行分解可以降低一定的时间。

subscribeunsubScribe

我们先来看代码:

func (b *BrokerImpl) subscribe(topic string) (<-chan interface{}, error) {select {case <-b.exit:return nil, errors.New("broker closed")default:}ch := make(chan interface{}, b.capacity)b.Lock()b.topics[topic] = append(b.topics[topic], ch)b.Unlock()return ch, nil
}
func (b *BrokerImpl) unsubscribe(topic string, sub <-chan interface{}) error {select {case <-b.exit:return errors.New("broker closed")default:}b.RLock()subscribers, ok := b.topics[topic]b.RUnlock()if !ok {return nil}// delete subscribervar newSubs []chan interface{}for _, subscriber := range subscribers {if subscriber == sub {continue}newSubs = append(newSubs, subscriber)}b.Lock()b.topics[topic] = newSubsb.Unlock()return nil
}

这里其实就很简单了:

  • subscribe:这里的实现则是为订阅的主题创建一个channel,然后将订阅者加入到对应的topic中就可以了,并且返回一个接收channel

  • unsubScribe:这里实现的思路就是将我们刚才添加的channel删除就可以了。

close

func (b *BrokerImpl) close()  {select {case <-b.exit:returndefault:close(b.exit)b.Lock()b.topics = make(map[string][]chan interface{})b.Unlock()}return
}

这里就是为了关闭整个消息队列,这句代码b.topics = make(map[string][]chan interface{})比较重要,这里主要是为了保证下一次使用该消息队列不发生冲突。

setConditions GetPayLoad

还差最后两个方法,一个是设置我们的消息队列容量,另一个是封装一个方法来获取我们订阅的消息:

func (b *BrokerImpl)setConditions(capacity int)  {b.capacity = capacity
}
func (c *Client)GetPayLoad(sub <-chan interface{})  interface{}{for val:= range sub{if val != nil{return val}}return nil
}
测试

好啦,代码这么快就被写完了,接下来我们进行测试一下吧。

单元测试

正式测试之前,我们还是需要先进行一下单元测试,养成好的习惯,只有先自测了,才能有底气说我的代码没问题,要不直接跑程序,会出现很多bug的。

这里我们测试方法如下:我们向不同的topic发送不同的信息,当订阅者收到消息后,就行取消订阅。

func TestClient(t *testing.T) {b := NewClient()b.SetConditions(100)var wg sync.WaitGroupfor i := 0; i < 100; i++ {topic := fmt.Sprintf("Golang梦工厂%d", i)payload := fmt.Sprintf("asong%d", i)ch, err := b.Subscribe(topic)if err != nil {t.Fatal(err)}wg.Add(1)go func() {e := b.GetPayLoad(ch)if e != payload {t.Fatalf("%s expected %s but get %s", topic, payload, e)}if err := b.Unsubscribe(topic, ch); err != nil {t.Fatal(err)}wg.Done()}()if err := b.Publish(topic, payload); err != nil {t.Fatal(err)}}wg.Wait()
}

测试通过,没问题,接下来我们在写几个方法测试一下

测试

这里分为两种方式测试

测试一:使用一个定时器,向一个主题定时推送消息.

// 一个topic 测试
func OnceTopic()  {m := mq.NewClient()m.SetConditions(10)ch,err :=m.Subscribe(topic)if err != nil{fmt.Println("subscribe failed")return}go OncePub(m)OnceSub(ch,m)defer m.Close()
}// 定时推送
func OncePub(c *mq.Client)  {t := time.NewTicker(10 * time.Second)defer t.Stop()for  {select {case <- t.C:err := c.Publish(topic,"asong真帅")if err != nil{fmt.Println("pub message failed")}default:}}
}// 接受订阅消息
func OnceSub(m <-chan interface{},c *mq.Client)  {for  {val := c.GetPayLoad(m)fmt.Printf("get message is %s\n",val)}
}

测试二:使用一个定时器,定时向多个主题发送消息:

//多个topic测试
func ManyTopic()  {m := mq.NewClient()defer m.Close()m.SetConditions(10)top := ""for i:=0;i<10;i++{top = fmt.Sprintf("Golang梦工厂_%02d",i)go Sub(m,top)}ManyPub(m)
}func ManyPub(c *mq.Client)  {t := time.NewTicker(10 * time.Second)defer t.Stop()for  {select {case <- t.C:for i:= 0;i<10;i++{//多个topic 推送不同的消息top := fmt.Sprintf("Golang梦工厂_%02d",i)payload := fmt.Sprintf("asong真帅_%02d",i)err := c.Publish(top,payload)if err != nil{fmt.Println("pub message failed")}}default:}}
}func Sub(c *mq.Client,top string)  {ch,err := c.Subscribe(top)if err != nil{fmt.Printf("sub top:%s failed\n",top)}for  {val := c.GetPayLoad(ch)if val != nil{fmt.Printf("%s get message is %s\n",top,val)}}
}

总结

终于帮助姐姐解决了这个问题,姐姐开心死了,给我一顿亲,啊不对,是一顿夸,夸的人家都不好意思了。

这一篇你学会了吗?没学会不要紧,赶快去把源代码下载下来,好好通读一下,很好理解的~~~。

其实这一篇是为了接下来的kafka学习打基础的,学好了这一篇,接下来学习的kafka就会容易很多啦~~~

github地址:https://github.com/asong2020/Golang_Dream/tree/master/code_demo/queue

如果能给一个小星星就好了~~~

结尾给大家发一个小福利吧,最近我在看[微服务架构设计模式]这一本书,讲的很好,自己也收集了一本PDF,有需要的小伙可以到自行下载。获取方式:关注公众号:[Golang梦工厂],后台回复:[微服务],即可获取。

我翻译了一份GIN中文文档,会定期进行维护,有需要的小伙伴后台回复[gin]即可下载。

我是asong,一名普普通通的程序猿,让我一起慢慢变强吧。我自己建了一个golang交流群,有需要的小伙伴加我vx,我拉你入群。欢迎各位的关注,我们下期见~~~

推荐往期文章:

  • 详解Context包,看这一篇就够了!!!

  • go-ElasticSearch入门看这一篇就够了(一)

  • 面试官:go中for-range使用过吗?这几个问题你能解释一下原因吗

  • 学会wire依赖注入、cron定时任务其实就这么简单!

  • 听说你还不会jwt和swagger-饭我都不吃了带着实践项目我就来了

  • 掌握这些Go语言特性,你的水平将提高N个档次(二)

  • go实现多人聊天室,在这里你想聊什么都可以的啦!!!

  • grpc实践-学会grpc就是这么简单

  • go标准库rpc实践

  • 2020最新Gin框架中文文档 asong又捡起来了英语,用心翻译

  • 基于gin的几种热加载方式

  • boss: 这小子还不会使用validator库进行数据校验,开了~~~

这篇关于手把手教姐姐写消息队列的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/394144

相关文章

hdu1180(广搜+优先队列)

此题要求最少到达目标点T的最短时间,所以我选择了广度优先搜索,并且要用到优先队列。 另外此题注意点较多,比如说可以在某个点停留,我wa了好多两次,就是因为忽略了这一点,然后参考了大神的思想,然后经过反复修改才AC的 这是我的代码 #include<iostream>#include<algorithm>#include<string>#include<stack>#include<

poj 3190 优先队列+贪心

题意: 有n头牛,分别给他们挤奶的时间。 然后每头牛挤奶的时候都要在一个stall里面,并且每个stall每次只能占用一头牛。 问最少需要多少个stall,并输出每头牛所在的stall。 e.g 样例: INPUT: 51 102 43 65 84 7 OUTPUT: 412324 HINT: Explanation of the s

poj 2431 poj 3253 优先队列的运用

poj 2431: 题意: 一条路起点为0, 终点为l。 卡车初始时在0点,并且有p升油,假设油箱无限大。 给n个加油站,每个加油站距离终点 l 距离为 x[i],可以加的油量为fuel[i]。 问最少加几次油可以到达终点,若不能到达,输出-1。 解析: 《挑战程序设计竞赛》: “在卡车开往终点的途中,只有在加油站才可以加油。但是,如果认为“在到达加油站i时,就获得了一

poj3750约瑟夫环,循环队列

Description 有N个小孩围成一圈,给他们从1开始依次编号,现指定从第W个开始报数,报到第S个时,该小孩出列,然后从下一个小孩开始报数,仍是报到S个出列,如此重复下去,直到所有的小孩都出列(总人数不足S个时将循环报数),求小孩出列的顺序。 Input 第一行输入小孩的人数N(N<=64) 接下来每行输入一个小孩的名字(人名不超过15个字符) 最后一行输入W,S (W < N),用

POJ2010 贪心优先队列

c头牛,需要选n头(奇数);学校总共有f的资金, 每头牛分数score和学费cost,问合法招生方案中,中间分数(即排名第(n+1)/2)最高的是多少。 n头牛按照先score后cost从小到大排序; 枚举中间score的牛,  预处理左边与右边的最小花费和。 预处理直接优先队列贪心 public class Main {public static voi

Java并发编程之——BlockingQueue(队列)

一、什么是BlockingQueue BlockingQueue即阻塞队列,从阻塞这个词可以看出,在某些情况下对阻塞队列的访问可能会造成阻塞。被阻塞的情况主要有如下两种: 1. 当队列满了的时候进行入队列操作2. 当队列空了的时候进行出队列操作123 因此,当一个线程试图对一个已经满了的队列进行入队列操作时,它将会被阻塞,除非有另一个线程做了出队列操作;同样,当一个线程试图对一个空

ActiveMQ—消息特性(延迟和定时消息投递)

ActiveMQ消息特性:延迟和定时消息投递(Delay and Schedule Message Delivery) 转自:http://blog.csdn.net/kimmking/article/details/8443872 有时候我们不希望消息马上被broker投递出去,而是想要消息60秒以后发给消费者,或者我们想让消息没隔一定时间投递一次,一共投递指定的次数。。。 类似

FreeRTOS学习笔记(六)队列

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录 前言一、队列的基本内容1.1 队列的引入1.2 FreeRTOS 队列的功能与作用1.3 队列的结构体1.4 队列的使用流程 二、相关API详解2.1 xQueueCreate2.2 xQueueSend2.3 xQueueReceive2.4 xQueueSendFromISR2.5 xQueueRecei

多线程篇(阻塞队列- LinkedBlockingDeque)(持续更新迭代)

目录 一、LinkedBlockingDeque是什么 二、核心属性详解 三、核心方法详解 addFirst(E e) offerFirst(E e) putFirst(E e) removeFirst() pollFirst() takeFirst() 其他 四、总结 一、LinkedBlockingDeque是什么 首先queue是一种数据结构,一个集合中

Java消息队列:RabbitMQ与Kafka的集成与应用

Java消息队列:RabbitMQ与Kafka的集成与应用 大家好,我是微赚淘客返利系统3.0的小编,是个冬天不穿秋裤,天冷也要风度的程序猿! 在现代的分布式系统中,消息队列是实现系统间通信、解耦和提高可扩展性的重要组件。RabbitMQ和Kafka是两个广泛使用的消息队列系统,它们各有特点和优势。本文将介绍如何在Java应用中集成RabbitMQ和Kafka,并展示它们的应用场景。 消息队