监听DB配置变更之go-broadcast简单实现

2024-06-10 16:20

本文主要是介绍监听DB配置变更之go-broadcast简单实现,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

  • 1. 前言
  • 2. 分析
  • 3. 实现
  • 4. 问题
  • 5. 小结
  • 6. 参考

1. 前言

之前遇到一个需求,因为配置的查找是基于db的,而db的更改却无法实时通知到具体利用到这条数据的使用方,为了实现db数据变动时,能够尽快让使用方知道这条数据发生了变更,从而进行后续数据变更等相关逻辑的运行,就需要实现db数据变动时的通知。

在观察者模式中,因为观察者模式是一种一对多的关系模式,即多个观察者观察同一个主题对象,当主题对象发生变化时,会通知所有的观察者对象。

2. 分析

使用观察者模式来实现的话,则需要实现如下四个部分的结构:

  1. 抽象主题
  2. 具体主题
  3. 抽象观察者
  4. 具体观察者

举个例子,在我们日常使用微信公众号中,当你关注了一个公众号,这个公众号如果有更新的话,则会推送给每一个关注过这个公众号的用户。此时我们可以将具体的部分的接收映射到微信公众号中,即:

  1. 抽象主题:公众号,具备订阅、取消订阅和发送消息的功能
  2. 具体主题:具体某一个公众号
  3. 抽象观察者:用户(泛指使用微信公众号的用户受众)
  4. 具体观察者:某一个具体的用户

分析了以上四个结构之后,我们需要实现的功能部分就清楚了。即我们需要实现一个抽象主题,这个主题需要有提供注册、取消注册以及提交信息的能力,当提交信息到抽象主题的时候,抽象主题需要将这个消息通知到所有已经注册过的具体观察者。

3. 实现

在明确了需求之后, 就开始进行功能的实现,因为使用的是go语言,则第一时间肯定是希望通过chan这样的功能来实现,因为chan天生具备监听的能力,我们可以通过监听注册到抽象主题的chan,从而实现抽象主题消息的实时监听。

但秉持着“你需要的功能,基本都有人实现过”的方针,第一时间还是上到了github,看看是否有现成的开源方案,经过一番查找,还真发现了一个开源库可以使用,这个库的名称是go-broadcast。

下面就来说下broadcaster是如何实现上面的功能逻辑的,broadcaster这个库的代码很简单,主体实现逻辑只有110行代码左右,但符合我们的功能逻辑实现需要。

type broadcaster struct {input chan interface{}reg   chan chan<- interface{}unreg chan chan<- interface{}outputs map[chan<- interface{}]bool
}// The Broadcaster interface describes the main entry points to
// broadcasters.
type Broadcaster interface {// Register a new channel to receive broadcastsRegister(chan<- interface{})// Unregister a channel so that it no longer receives broadcasts.Unregister(chan<- interface{})// Shut this broadcaster down.Close() error// Submit a new object to all subscribersSubmit(interface{})// Try Submit a new object to all subscribers return false if input chan is fillTrySubmit(interface{}) bool
}

首先定义了一个接口叫做Broadcaster,然后定义了一个broadcaster实现了Broadcaster的所有方法逻辑。

func (b *broadcaster) Register(newch chan<- interface{}) {b.reg <- newch
}func (b *broadcaster) Unregister(newch chan<- interface{}) {b.unreg <- newch
}func (b *broadcaster) Close() error {close(b.reg)close(b.unreg)return nil
}// Submit an item to be broadcast to all listeners.
func (b *broadcaster) Submit(m interface{}) {if b != nil {b.input <- m}
}
  • Register方法主要实现了将注册的chan直接放入到reg这个chan中,用于后续注册
  • Register方法主要实现了将注册的chan直接让如到ureg这个chan中,用于后续注销
  • Close方法主要是关闭reg和ureg两个chan
  • Submit方法主要实现对抽象主题broadcaster发送消息,将消息放入input这个chan中

上面的方法都是基于chan作为通信的,而chan中有了数据,后续需要消费数据。

// NewBroadcaster creates a new broadcaster with the given input
// channel buffer length.
func NewBroadcaster(buflen int) Broadcaster {b := &broadcaster{input:   make(chan interface{}, buflen),reg:     make(chan chan<- interface{}),unreg:   make(chan chan<- interface{}),outputs: make(map[chan<- interface{}]bool),}go b.run()return b
}

这里的run()方法则是消费所有chan数据的地方。

func (b *broadcaster) broadcast(m interface{}) {for ch := range b.outputs { // 遍历所有注册的chan,将消息发送到注册的chan中ch <- m}
}func (b *broadcaster) run() {for {select {case m := <-b.input: // 如果有消息输入,则广播出去b.broadcast(m)case ch, ok := <-b.reg: // 如果有新注册的,则进行output的添加if ok {b.outputs[ch] = true} else {return}case ch := <-b.unreg: // 如果有注销的,则进行output的删除delete(b.outputs, ch)}}
}

整体的运行图如下:

在这里插入图片描述

  • 对应chan通过reg进行注册,注册后的chan记录在outputs中
  • 对应chan通过ureg进行注销,注销后的chan从output中移除
  • 对应的信息通过input输入,输入后的msg通过遍历outputs注册列表,从而通知到每一个注册者

4. 问题

在使用go-broadcast的过程中,看到之前有个pr加了一个TrySubmit的逻辑,这个逻辑主要是解决当input被装满了以后,broadcast会被阻塞,这个时候如果有新的消息进来,如何办呢?

// TrySubmit attempts to submit an item to be broadcast, returning
// true iff it the item was broadcast, else false.
func (b *broadcaster) TrySubmit(m interface{}) bool {if b == nil {return false}select {case b.input <- m:return truedefault:return false}
}

解决办法是采用select的方法尝试去塞入,塞入不成功则意味着消息提交失败,返回false,让使用者根据消息提交的结果进行后续的逻辑处理。

但这里还存在另外一个问题,库中给了一个样本case,这个样本case基于的条件都是消息传递给chan的时候没有阻塞。如下代码所示:

func (b *broadcaster) broadcast(m interface{}) {for ch := range b.outputs {ch <- m}
}

但一旦有注册的chan消费的时候阻塞了,这时候就会产生问题,会导致其它正常消费的chan因为一个异常chan而全部被阻塞住,导致其他chan都无法正常消费。

这个时候就会导致在input没有满的时候,即消息可以放入,但是消息无法被正常的消费,进而又反向导致input逐渐被塞满,最终导致input无法被塞入,消息也无法被发送到对应的chan中,导致run方法逻辑卡在broadcast中,导致整个运行出现问题。

解决办法:

func (b *broadcaster) broadcast(m interface{}) {for ch := range b.outputs {// if exist one output consume the chan message is too slow,// will block other output receive the msg.select {case ch <- m:default:}}
}

但这种虽然解决了一个chan满消费block其他chan的问题,随之也引入了丢消息的问题了,即有些消费慢的chan,由于chan消费慢导致无法接收新的消息,进而导致新消息丢失的问题。

5. 小结

因为需要实时监听db配置的变更,所以去探寻了一下方案,最终采用了go-broadcast的方案,但在使用go-broadcast的过程中,发现在broadcast消息的时候存在阻塞的行为,为了保证整个服务不被某个chan阻塞而停止运行,在broadcast消息的时候添加了select default条件来规避这个问题。

6. 参考

  • go-broadcast

这篇关于监听DB配置变更之go-broadcast简单实现的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1048658

相关文章

SQL中的外键约束

外键约束用于表示两张表中的指标连接关系。外键约束的作用主要有以下三点: 1.确保子表中的某个字段(外键)只能引用父表中的有效记录2.主表中的列被删除时,子表中的关联列也会被删除3.主表中的列更新时,子表中的关联元素也会被更新 子表中的元素指向主表 以下是一个外键约束的实例展示

基于MySQL Binlog的Elasticsearch数据同步实践

一、为什么要做 随着马蜂窝的逐渐发展,我们的业务数据越来越多,单纯使用 MySQL 已经不能满足我们的数据查询需求,例如对于商品、订单等数据的多维度检索。 使用 Elasticsearch 存储业务数据可以很好的解决我们业务中的搜索需求。而数据进行异构存储后,随之而来的就是数据同步的问题。 二、现有方法及问题 对于数据同步,我们目前的解决方案是建立数据中间表。把需要检索的业务数据,统一放到一张M

Zookeeper安装和配置说明

一、Zookeeper的搭建方式 Zookeeper安装方式有三种,单机模式和集群模式以及伪集群模式。 ■ 单机模式:Zookeeper只运行在一台服务器上,适合测试环境; ■ 伪集群模式:就是在一台物理机上运行多个Zookeeper 实例; ■ 集群模式:Zookeeper运行于一个集群上,适合生产环境,这个计算机集群被称为一个“集合体”(ensemble) Zookeeper通过复制来实现

CentOS7安装配置mysql5.7 tar免安装版

一、CentOS7.4系统自带mariadb # 查看系统自带的Mariadb[root@localhost~]# rpm -qa|grep mariadbmariadb-libs-5.5.44-2.el7.centos.x86_64# 卸载系统自带的Mariadb[root@localhost ~]# rpm -e --nodeps mariadb-libs-5.5.44-2.el7

如何去写一手好SQL

MySQL性能 最大数据量 抛开数据量和并发数,谈性能都是耍流氓。MySQL没有限制单表最大记录数,它取决于操作系统对文件大小的限制。 《阿里巴巴Java开发手册》提出单表行数超过500万行或者单表容量超过2GB,才推荐分库分表。性能由综合因素决定,抛开业务复杂度,影响程度依次是硬件配置、MySQL配置、数据表设计、索引优化。500万这个值仅供参考,并非铁律。 博主曾经操作过超过4亿行数据

hadoop开启回收站配置

开启回收站功能,可以将删除的文件在不超时的情况下,恢复原数据,起到防止误删除、备份等作用。 开启回收站功能参数说明 (1)默认值fs.trash.interval = 0,0表示禁用回收站;其他值表示设置文件的存活时间。 (2)默认值fs.trash.checkpoint.interval = 0,检查回收站的间隔时间。如果该值为0,则该值设置和fs.trash.interval的参数值相等。

NameNode内存生产配置

Hadoop2.x 系列,配置 NameNode 内存 NameNode 内存默认 2000m ,如果服务器内存 4G , NameNode 内存可以配置 3g 。在 hadoop-env.sh 文件中配置如下。 HADOOP_NAMENODE_OPTS=-Xmx3072m Hadoop3.x 系列,配置 Nam

hdu1043(八数码问题,广搜 + hash(实现状态压缩) )

利用康拓展开将一个排列映射成一个自然数,然后就变成了普通的广搜题。 #include<iostream>#include<algorithm>#include<string>#include<stack>#include<queue>#include<map>#include<stdio.h>#include<stdlib.h>#include<ctype.h>#inclu

性能分析之MySQL索引实战案例

文章目录 一、前言二、准备三、MySQL索引优化四、MySQL 索引知识回顾五、总结 一、前言 在上一讲性能工具之 JProfiler 简单登录案例分析实战中已经发现SQL没有建立索引问题,本文将一起从代码层去分析为什么没有建立索引? 开源ERP项目地址:https://gitee.com/jishenghua/JSH_ERP 二、准备 打开IDEA找到登录请求资源路径位置

MySQL数据库宕机,启动不起来,教你一招搞定!

作者介绍:老苏,10余年DBA工作运维经验,擅长Oracle、MySQL、PG、Mongodb数据库运维(如安装迁移,性能优化、故障应急处理等)公众号:老苏畅谈运维欢迎关注本人公众号,更多精彩与您分享。 MySQL数据库宕机,数据页损坏问题,启动不起来,该如何排查和解决,本文将为你说明具体的排查过程。 查看MySQL error日志 查看 MySQL error日志,排查哪个表(表空间