【Consul】基于Golang实现Consul服务的注册、注销、修改、监控注册的服务变化、实时同步服务信息机制

本文主要是介绍【Consul】基于Golang实现Consul服务的注册、注销、修改、监控注册的服务变化、实时同步服务信息机制,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

【Consul】基于Go实现Consul服务的注册、注销、修改、监控注册的服务变化、实时同步服务信息机制


大家好 我是寸铁👊
总结了一篇【Consul】基于Go实现Consul服务的注册、注销、修改、监控注册的服务变化、实时同步服务信息机制✨
这应该是目前全网最全的使用golang手搓Consul服务信息机制✨
喜欢的小伙伴可以点点关注 💝


前言

consul常常被用来作服务注册与服务发现,而它的watch机制则可被用来监控一些数据的更新,包括:nodes, KV pairs, health checks等。另外,在监控到数据变化后,还可以调用外部处理程序,此处理程序可以是任何可执行文件或HTTP调用,具体说明可见官网。


介绍

consul中的watch可以监听servicek-vcheckevent等事件的变化,实时获取最新的数据。
consul支持以下watch类型:

key 监听一个consul kv中的key
keyprefix 监听consul kv中的key的前缀
services 监听有效服务的变化
nodes 监听节点的变化
service 监听服务的变化
checks 监听check的变化
event 监听自定义事件的变化

从以上可以看出consul提供非常丰富的监听类型,通过这些类型我们可以实时观测到consul整个集群中的变化,从而实现一些特别的需求,比如:服务告警,配置实时更新等功能。


启动与后台展示

Windows启动命令如下:

consul agent -dev

在这里插入图片描述


后台运行结果展示如下:

在这里插入图片描述


UI界面展示

在这里插入图片描述

在这里插入图片描述

Consul服务

AgentService结构体字段具体信息

1.Kind:服务类型,是一个自定义类型 ServiceKind。
2.ID:服务的唯一标识符。(一个ID只能对应一个服务)
3.Service:服务的名称(一个服务可以创建多个ID)
4.Tags:服务的标签,以字符串数组形式存储。
5.Meta:服务的元数据,以键值对的形式存储。
6.Port:服务的端口号。
7.Address:服务的地址。
8.SocketPath:服务的套接字路径,可选的,以字符串形式存储。
9.TaggedAddresses:带标签的地址,以键值对的形式存储,键是标签,值是 ServiceAddress 类型。
10.Weights:服务的权重,类型为 AgentWeights。
11.EnableTagOverride:是否启用标签覆盖。
12.CreateIndex:创建索引,无符号整数类型,用于 JSON 序列化时忽略。
13.ModifyIndex:修改索引,无符号整数类型,用于 JSON 序列化时忽略。
14.ContentHash:内容哈希,字符串类型,用于 JSON 序列化时忽略。
15.Proxy:代理配置,类型为 AgentServiceConnectProxyConfig,可选的。
16.Connect:连接信息,类型为 AgentServiceConnect,可选的。
17.PeerName:对等名称,字符串类型,可选的。
18.Namespace:命名空间,字符串类型,用于 JSON 序列化时忽略。
19.Partition:分区,字符串类型,用于 JSON 序列化时忽略。
20.Datacenter:数据中心,字符串类型,用于 JSON 序列化时忽略。
21.Locality:地点信息,类型为 Locality,用于 JSON 序列化时忽略。    

打印的结果如下:

服务的类型Kind:
服务的ID:  him-service-1
服务的名字Service:  him-service
服务的标签Tags:  [tag1 tag2]
服务的元数据Meta:  map[]
服务的端口Port:  8082
服务的地址Address:  127.0.0.4
服务的套接字路径SocketPath:
服务的带标签的地址TaggedAddresses:  map[lan_ipv4:{127.0.0.4 8082} wan_ipv4:{127.0.0.4 8082}]
服务的权重Weights:  {1 1}
服务是否启用标签覆盖EnableTagOverride:  false
服务创建的索引CreateIndex:  5235
服务修改的索引ModifyIndex:  5235
服务内容哈希ContentHash:
服务代理配置Proxy:  &{[]    0   <nil> map[] [] {} {false []} <nil>}
服务连接信息Connect:  &{false <nil>}
服务对等名称PeerName:
服务的命名空间Namespace:
服务的数据中心Datacenter:
服务的分区Partition:
服务的地点信息Locality:  <nil>

同下:

在这里插入图片描述


注册Consul服务

package mainimport ("fmt""github.com/hashicorp/consul/api"
)func main() {//写api的配置信息config := api.DefaultConfig()//注册到consul上的地址config.Address = "127.0.0.1:8500" // Consul 服务器地址//将config注册到客户端,由客户端实现client, err := api.NewClient(config)if err != nil {panic(err)}// 创建一个新的服务条目registration := new(api.AgentServiceRegistration)registration.ID = "my-service-3"registration.Name = "my-service"registration.Port = 8083registration.Address = "127.0.0.1"registration.Tags = []string{"tag1", "tag2"}reg := &api.AgentServiceRegistration{Name:    registration.Name,    // 服务名称ID:      registration.ID,      // 服务 ID,必须唯一Address: registration.Address, //服务的地址Port:    registration.Port,    // 服务端口 服务所在的监听端口Tags:    registration.Tags,    // 可选:服务标签}// 将服务注册到 Consulerr = client.Agent().ServiceRegister(reg)if err != nil {panic(err)}fmt.Println("Service registered successfully")}

注销Consul服务

package mainimport ("fmt""log""github.com/hashicorp/consul/api"
)func main() {// 创建Consul客户端config := api.DefaultConfig()client, err := api.NewClient(config)if err != nil {log.Fatal(err)}// 创建注销的服务IDserviceID := "my-service-2"// 注销服务agent := client.Agent()if err := agent.ServiceDeregister(serviceID); err != nil {log.Fatal(err)}fmt.Println("Service deregistered successfully")
}

实时同步更新Consul服务的信息

package mainimport ("fmt""github.com/hashicorp/consul/api""time"
)type Service struct {Name    stringID      stringAddress stringPort    intTags    []string
}var serviceMap = map[string][]map[string]Service{}func main() {serviceMap = make(map[string][]map[string]Service)// 创建Consul客户端config := api.DefaultConfig()client, err := api.NewClient(config)if err != nil {panic(err)}for {// 查询Consul客户端的服务目录,得到所有的服务名称。catalog := client.Catalog()//得到consul上的所有服务servicesallServices, _, err := catalog.Services(nil)if err != nil {panic(err)}fmt.Println("所有服务services的名称:", allServices)//创建当前的ServiceMap记录当前的Service 出现则标记为truevar currentServiceMap map[string]boolcurrentServiceMap = make(map[string]bool, 0)//遍历所有的Consul服务,将所有的Consul服务的信息存入map中。//如服务: my-service first-servicefor serviceName, _ := range allServices {currentServiceMap[serviceName] = true// 通过服务名称查询服务实例 如my-service下有两个服务实例: my-service1 my-service2// services是每个服务下的服务实例集合services, _, err := client.Health().Service(serviceName, "", true, nil)if err != nil {panic(err)}var servcieSlice []map[string]ServiceservcieSlice = make([]map[string]Service, 0)// 遍历服务实例集合for _, service := range services {fmt.Printf("Service %s:%d\n", service.Service.Service, service.Service.Port)//map先定义var instanceMap map[string]Service//定义后用make进行创建instanceMap = make(map[string]Service)//每一个服务的实例ID对应该服务的信息instanceMap[service.Service.ID] = Service{Name:    service.Service.Service,ID:      service.Service.ID,Address: service.Service.Address,Port:    service.Service.Port,Tags:    service.Service.Tags,}fmt.Println(instanceMap)servcieSlice = append(servcieSlice, instanceMap)将serviceMap全局map之前出现过的service.Service.Service不存在则赋值为nil//if _, ok := serviceMap[service.Service.Service]; !ok {//	serviceMap[service.Service.Service] = nil//}//service.Service.Service作为serviceMap的键,map[string]Service{}作为值存储入map中。//这里最后一个值会覆盖掉同一个键前面多个值,采用一个值对应一个map的数组serviceMap[service.Service.Service] = servcieSlice}fmt.Println(serviceMap)}//serviceMap为全局的Map,最后遍历一遍serviceMap,看一下里面的serviceName在当前的currentServiceMap中能否找到。//如果说找不到,则把serviceMap中这个serviceName服务给删除掉。for serviceName := range serviceMap {//遍历一遍全局的serviceMap,把不存在的服务删除掉。//如果说currentServiceMap不存在serviceName,则把serviceName从serviceMap中移除。if _, ok := currentServiceMap[serviceName]; !ok {delete(serviceMap, serviceName)}}time.Sleep(10 * time.Second)}
}

长轮询方式监控服务

package mainimport ("fmt""github.com/hashicorp/consul/api"
)func main() {// 创建Consul客户端配置config := api.DefaultConfig()client, err := api.NewClient(config)if err != nil {panic(err)}// 创建WatchParamsparams := &api.QueryOptions{WaitIndex: 0,    // 初始的索引,设置为0表示从最新的变更开始监听WaitTime:  1000, // 设置长轮询的等待时间,单位为秒}// 循环监听服务变化for {// 查询Consul客户端的服务目录,得到所有的服务名称catalog := client.Catalog()allServices, _, err := catalog.Services(params)if err != nil {panic(err)}fmt.Println("所有服务的名称:", allServices)// 查询服务健康状态//for serviceName := range allServices {services, _, err := client.Health().Service("my-service", "", true, params)if err != nil {panic(err)}for _, service := range services {fmt.Printf("服务: %s, 端口号: %v\n", service.Service.Service, service.Service.Port)}//}// 更新WaitIndex,以便下次长轮询从更新后的索引开始params.WaitIndex = 0 // 使用长轮询时,将WaitIndex设置为0,以获取最新的变更}
}

Watch机制监控注册的服务变化

consul官方提供了Golang版的watch包。其实际上也是对watch机制进行了一层封装,最终代码实现的还是对consul HTTP API 的 endpoints的使用。 文章开始说过,“在监控到数据变化后,还可以调用外部处理程序”。是了,数据变化后调用外部处理程序才是有意义的,Golang的watch包中对应的外部处理程序是一个函数handler。因为业务的关系,这里只实现了watch对service的变化的监控,其主要创建了一个plan 来对整个服务的变化做一个监控,以及再为每个服务创建一个 plan,对单个服务变化作监控。话不多说,上代码:

//Watch机制同步
package mainimport ("fmt""github.com/hashicorp/consul/api""github.com/hashicorp/consul/api/watch""log"
)var serviceMap map[string][]map[string]Servicetype Service struct {Name    stringID      stringAddress stringPort    intTags    []string
}func main() {serviceMap = make(map[string][]map[string]Service)// 创建Consul客户端config := api.DefaultConfig()client, err := api.NewClient(config)if err != nil {panic(err)}// 初始化监视器计划params := map[string]interface{}{"type": "services"}plan, err := watch.Parse(params)if err != nil {log.Fatal(err)}// 设置监视器的处理函数plan.Handler = func(idx uint64, data interface{}) {services, ok := data.(map[string][]string)if !ok {log.Println("Error: Data format unexpected")return}// 重置服务映射serviceMap = make(map[string][]map[string]Service)// 在这里阻塞住了// 遍历服务列表for serviceName := range services {// 查询服务实例instances, _, err := client.Health().Service(serviceName, "", true, nil)if err != nil {log.Printf("Error retrieving instances for service %s: %v\n", serviceName, err)continue}// 创建服务实例切片var serviceInstances []map[string]Service// 遍历服务实例for _, instance := range instances {service := Service{Name:    instance.Service.Service,ID:      instance.Service.ID,Address: instance.Service.Address,Port:    instance.Service.Port,Tags:    instance.Service.Tags,}instanceMap := map[string]Service{instance.Service.ID: service}serviceInstances = append(serviceInstances, instanceMap)}// 更新服务映射serviceMap[serviceName] = serviceInstances}// 输出服务映射fmt.Println("Updated Service Map:")for serviceName, instances := range serviceMap {fmt.Println("Service:", serviceName)for _, instance := range instances {for id, service := range instance {fmt.Printf("Instance ID: %s, Address: %s, Port: %d, Tags: %v\n", id, service.Address, service.Port, service.Tags)}}}}// 启动监视器plan.Run("http://localhost:8500")// 保持程序运行,直到手动中断select {}
}

长时间没有响应,则进程结束。
在这里插入图片描述


采用goroutinr消息型监控服务


package mainimport ("context""fmt""github.com/hashicorp/consul/api"
)// 定义服务信息
type Service struct {Name    stringID      stringAddress stringPort    intTags    []string
}// 全部服务信息的字典
var servicesMap = map[string]map[string]Service{}// 记录本次目录存在的服务的字典,布尔型,用于和lastServiceMap进行判断
var currentServicesMap = map[string]bool{}// 记录上次目录存在的服务的字典,布尔型,用于和currentServicesMap进行判断
var lastServiceMap = map[string]bool{}// 取消服务的协程的字典,存的是服务名和上下文cancel方法的映射,用于删除指定的goroutine
var withCancelMap = map[string]context.CancelFunc{}func main() {//初始化mapcurrentServicesMap = make(map[string]bool)lastServiceMap = make(map[string]bool)servicesMap = make(map[string]map[string]Service)withCancelMap = make(map[string]context.CancelFunc)// 创建Consul客户端config := api.DefaultConfig()client, err := api.NewClient(config)if err != nil {panic(err)}// 创建WatchParams//params := &api.QueryOptions{//	WaitIndex: 0, // 初始的索引,设置为0表示从最新的变更开始监听//}//配置参数放在外面,则达到获取目录信息阻塞的效果queryOptions := &api.QueryOptions{WaitIndex: 0, // 初始索引}for {catalog := client.Catalog()// 查询Consul客户端的服务目录,得到所有的服务名称。//得到consul上的所有服务services//目录发生变化再更新,不发生变化则不更新,需要在这里阻塞。allServices, meta, err := catalog.Services(queryOptions)fmt.Println(err)if err != nil {panic(err)}//currentServicesMap记录allServices出现过的服务for name := range allServices {currentServicesMap[name] = true}//遍历一遍上次服务的哈希表lastServiceMapfor lastServiceName := range lastServiceMap {//如果说上次服务的哈希表中存在这个服务,现在遍历目录没有这个服务if allServices[lastServiceName] == nil {//将lastServiceName从当前的currentServicesMap移除delete(currentServicesMap, lastServiceName)}}//如果没有一个检查机制的话,这里相当于一直去读取目录,读完目录后再不断启协程//这里就会造成不断的for死循环,所以需要一个检查机制,控制目录的更新。//当目录没发生更新的时候则阻塞,目录发生更新了则进行检查。//这样就确保了你目录没更新时,我启动的go func就一直在监听服务的变化即可fmt.Println("所有服务services的名称:", allServices)//如果说协程中不启动go routine则相当于每次遍历服务时都去创建go routine 导致启动的go routine数量比较多//每次目录发生变化后,这里就会创建新的协程,就会导致多了几个协程。//正确的话,这里应该是先阻塞,然后如果说哪个协程监听到变化,则这个协程发消息即可//其他的协程不用动,需要编写一个服务的检查机制,让之前启动的goroutine去监控对应的服务即可,其他不变化的go routine不用动。fmt.Println("lastServiceMap:", lastServiceMap)fmt.Println("currentServicesMap:", currentServicesMap)for serviceName := range allServices {if currentServicesMap[serviceName] && lastServiceMap[serviceName] {//如果说上次和这次都存在该服务,说明之前已经创建过了,则跳过fmt.Println("跳过……", serviceName)continue} else if currentServicesMap[serviceName] && !lastServiceMap[serviceName] {//如果说上次不存在,这次存在则说明要进行创建//如果说目录多增加了一个服务,则启动多一个服务。fmt.Println("启动协程……", serviceName)//使用上下文进行协程的注销ctx, cancel := context.WithCancel(context.Background())//存储一个serviceName为键、cancel()方法为值的withCancelMapwithCancelMap[serviceName] = cancelgo syncInfo(ctx, serviceName, client, queryOptions)//range上次的lastServiceMap}}//比较上次目录的服务信息和这次目录的服务信息//执行删除指定go routine的操作for lastName := range lastServiceMap {if !currentServicesMap[lastName] && lastServiceMap[lastName] {//如果说上次该服务存在,这次该服务不存在则说明要进行销毁fmt.Println("进入删除协程函数……", lastName)//当走到这里时,执行serviceName对应的cancel方法cacnelService := withCancelMap[lastName]cacnelService() //取消掉该服务的协程fmt.Println("删除协程……", lastName)//由于该服务lastName已经取消了,则从serviceMap中移除掉delete(servicesMap, lastName)//由于该服务lastName已经取消了,则从lastServiceMap中移除掉delete(lastServiceMap, lastName)//由于该服务lastName已经取消了,则从withCancelMap中移除掉delete(withCancelMap, lastName)}}// 赋值 currentServicesMap 的值给 lastServiceMap,用于下一次的目录信息检查。for key, value := range currentServicesMap {lastServiceMap[key] = value}//fmt.Println(serviceMap)// 更新长轮询参数中的索引queryOptions.WaitIndex = meta.LastIndex}
}/*
将服务的各个实例的信息同步更新到全部服务信息的字典中,对开启服务的协程持续监控服务的信息变化。
当服务不存在时,执行取消服务所在的协程的操作。
*/
func syncInfo(ctx context.Context, serviceName string, client *api.Client, params *api.QueryOptions) {for {select {//执行ctx绑定cancel//执行cancel时,绑定对应的ctx执行Done()方法,取消掉协程。case <-ctx.Done(): //会有一点延迟fmt.Println("协程取消了……")returndefault://params要作为参数传入函数,这样params := &api.QueryOptions{WaitIndex: 0, // 初始的索引,设置为0表示从最新的变更开始监听}fmt.Println("1111111111", serviceName)services, meta, err := client.Health().Service(serviceName, "", true, params)fmt.Println("2222222222", serviceName)if err != nil {panic(err)}//var servcieSlice []map[string]Service//servcieSlice = make([]map[string]Service, 0)// 遍历服务实例集合//map先定义var instanceMap map[string]Service//定义后用make进行创建instanceMap = make(map[string]Service)for _, service := range services {//fmt.Printf("Service %s:%d\n", service.Service.Service, service.Service.Port)//每一个服务的实例ID对应该服务的信息instanceMap[service.Service.ID] = Service{Name:    service.Service.Service,ID:      service.Service.ID,Address: service.Service.Address,Port:    service.Service.Port,Tags:    service.Service.Tags,}//fmt.Println(instanceMap)打印出服务中每个实例的信息servicesMap[service.Service.Service] = instanceMap}fmt.Println(instanceMap)params.WaitIndex = meta.LastIndex // 更新版本,根据和上次的不同进行变化。}}
}

参考网址

https://vearne.cc/archives/13983
https://juejin.cn/post/6984378158347157512
https://juejin.cn/post/6883095345623597064
https://zhuanlan.zhihu.com/p/111673886
https://developer.hashicorp.com/consul/api-docs/catalog


看到这里的小伙伴,恭喜你又掌握了一个技能👊
希望大家能取得胜利,坚持就是胜利💪
我是寸铁!我们下期再见💕


往期好文💕

保姆级教程

【保姆级教程】Windows11下go-zero的etcd安装与初步使用

【保姆级教程】Windows11安装go-zero代码生成工具goctl、protoc、go-zero

【Go-Zero】手把手带你在goland中创建api文件并设置高亮


报错解决

【Go-Zero】Error: user.api 27:9 syntax error: expected ‘:‘ | ‘IDENT‘ | ‘INT‘, got ‘(‘ 报错解决方案及api路由注意事项

【Go-Zero】Error: only one service expected goctl一键转换生成rpc服务错误解决方案

【Go-Zero】【error】 failed to initialize database, got error Error 1045 (28000):报错解决方案

【Go-Zero】Error 1045 (28000): Access denied for user ‘root‘@‘localhost‘ (using password: YES)报错解决方案

【Go-Zero】type mismatch for field “Auth.AccessSecret“, expect “string“, actual “number“报错解决方案

【Go-Zero】Error: user.api 30:2 syntax error: expected ‘)‘ | ‘KEY‘, got ‘IDENT‘报错解决方案

【Go-Zero】Windows启动rpc服务报错panic:context deadline exceeded解决方案


Go面试向

【Go面试向】defer与time.sleep初探

【Go面试向】defer与return的执行顺序初探

【Go面试向】Go程序的执行顺序

【Go面试向】rune和byte类型的认识与使用

【Go面试向】实现map稳定的有序遍历的方式

这篇关于【Consul】基于Golang实现Consul服务的注册、注销、修改、监控注册的服务变化、实时同步服务信息机制的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/880700

相关文章

详谈redis跟数据库的数据同步问题

《详谈redis跟数据库的数据同步问题》文章讨论了在Redis和数据库数据一致性问题上的解决方案,主要比较了先更新Redis缓存再更新数据库和先更新数据库再更新Redis缓存两种方案,文章指出,删除R... 目录一、Redis 数据库数据一致性的解决方案1.1、更新Redis缓存、删除Redis缓存的区别二

python使用watchdog实现文件资源监控

《python使用watchdog实现文件资源监控》watchdog支持跨平台文件资源监控,可以检测指定文件夹下文件及文件夹变动,下面我们来看看Python如何使用watchdog实现文件资源监控吧... python文件监控库watchdogs简介随着Python在各种应用领域中的广泛使用,其生态环境也

el-select下拉选择缓存的实现

《el-select下拉选择缓存的实现》本文主要介绍了在使用el-select实现下拉选择缓存时遇到的问题及解决方案,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的... 目录项目场景:问题描述解决方案:项目场景:从左侧列表中选取字段填入右侧下拉多选框,用户可以对右侧

Golang使用minio替代文件系统的实战教程

《Golang使用minio替代文件系统的实战教程》本文讨论项目开发中直接文件系统的限制或不足,接着介绍Minio对象存储的优势,同时给出Golang的实际示例代码,包括初始化客户端、读取minio对... 目录文件系统 vs Minio文件系统不足:对象存储:miniogolang连接Minio配置Min

Python pyinstaller实现图形化打包工具

《Pythonpyinstaller实现图形化打包工具》:本文主要介绍一个使用PythonPYQT5制作的关于pyinstaller打包工具,代替传统的cmd黑窗口模式打包页面,实现更快捷方便的... 目录1.简介2.运行效果3.相关源码1.简介一个使用python PYQT5制作的关于pyinstall

使用Python实现大文件切片上传及断点续传的方法

《使用Python实现大文件切片上传及断点续传的方法》本文介绍了使用Python实现大文件切片上传及断点续传的方法,包括功能模块划分(获取上传文件接口状态、临时文件夹状态信息、切片上传、切片合并)、整... 目录概要整体架构流程技术细节获取上传文件状态接口获取临时文件夹状态信息接口切片上传功能文件合并功能小

Golang使用etcd构建分布式锁的示例分享

《Golang使用etcd构建分布式锁的示例分享》在本教程中,我们将学习如何使用Go和etcd构建分布式锁系统,分布式锁系统对于管理对分布式系统中共享资源的并发访问至关重要,它有助于维护一致性,防止竞... 目录引言环境准备新建Go项目实现加锁和解锁功能测试分布式锁重构实现失败重试总结引言我们将使用Go作

python实现自动登录12306自动抢票功能

《python实现自动登录12306自动抢票功能》随着互联网技术的发展,越来越多的人选择通过网络平台购票,特别是在中国,12306作为官方火车票预订平台,承担了巨大的访问量,对于热门线路或者节假日出行... 目录一、遇到的问题?二、改进三、进阶–展望总结一、遇到的问题?1.url-正确的表头:就是首先ur

C#实现文件读写到SQLite数据库

《C#实现文件读写到SQLite数据库》这篇文章主要为大家详细介绍了使用C#将文件读写到SQLite数据库的几种方法,文中的示例代码讲解详细,感兴趣的小伙伴可以参考一下... 目录1. 使用 BLOB 存储文件2. 存储文件路径3. 分块存储文件《文件读写到SQLite数据库China编程的方法》博客中,介绍了文

Redis主从复制实现原理分析

《Redis主从复制实现原理分析》Redis主从复制通过Sync和CommandPropagate阶段实现数据同步,2.8版本后引入Psync指令,根据复制偏移量进行全量或部分同步,优化了数据传输效率... 目录Redis主DodMIK从复制实现原理实现原理Psync: 2.8版本后总结Redis主从复制实