go-zero微服务到k8s部署应有尽有系列(八)消息、延迟、定时队列

本文主要是介绍go-zero微服务到k8s部署应有尽有系列(八)消息、延迟、定时队列,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

八、消息、延迟、定时队列

本项目地址 : https://github.com/Mikaelemmmm/go-zero-looklook

1、概述

消息队列有很多种,有rabbitmq、rocketmq、kafka等常用的,其中go-queue(https://github.com/zeromicro/go-queue)是go-zero官方开发的消息队列组件,其中分为2类,一种是kq、一种是dq,kq是基于kafka的消息队列,dq是基于beanstalkd的延迟队列,但是go-queue不支持定时任务。具体想更多了解go-queue的我之前也写过一篇教程可以去看一下这里不细说了。

本项目采用的是go-queue做消息队列,asynq做延迟队列、定时队列

为什么使用asynq的几个原因

  • 直接基于redis,一般项目都有redis,而asynq本身就是基于redis所以可以少维护一个中间件
  • 支持消息队列、延迟队列、定时任务调度 , 因为希望项目支持定时任务而asynq直接就支持
  • 有webui界面,每个任务都可以暂停、归档、通过ui界面查看成功失败、监控

为什么asynq支持消息队列还在使用go-queue?

  • kafka的吞吐是业绩出名的,如果前期量不大可以直接用asynq
  • 没啥目的,就是想给你们演示一下go-queue

在我们使用go-zero的时候,goctl给我们带了很大的便利,但是目前go-zero只有生成api、rpc,很多同学在群里问定时任务、延迟队列、消息队列如何生成,目录结构该怎样做,其实go-zero是为我们设计好了的,就是serviceGroup,使用serviceGroup管理你的服务。

2、如何使用

在前面订单、消息等场景我们其实已经演示过了,这里在额外单独补充一次

我们还是拿order-mq来举例子,显然使用goctl生成api、rpc不是我们想要的,那我们就自己使用serviceGroup改造,目录结构还是延续api的基本差不多,只是将handler改成了listen , 将logic换成了mqs 。

2.1 在main中代码如下
var configFile = flag.String("f", "etc/order.yaml", "Specify the config file")func main() {flag.Parse()var c config.Configconf.MustLoad(*configFile, &c)// log、prometheus、trace、metricsUrl.if err := c.SetUp(); err != nil {panic(err)}serviceGroup := service.NewServiceGroup()defer serviceGroup.Stop()for _, mq := range listen.Mqs(c) {serviceGroup.Add(mq)}serviceGroup.Start()
}
  • 首先我们要定义配置以及解析配置。

  • 其次为什么我们要在这里加SetUp而api、rpc不需要呢?因为api、rpc都是在MustNewServer中已经框架写的,但是我们用serviceGroup管理没有,可以手动点进去SetUp看看,这个方法中包含了log、prometheus、trace、metricsUrl的定义,一个方法可以省很多事情,这样我们直接修改配置文件就可以实现日志、监控、链路追踪了。

  • 接下来就是go-zero的serivceGroup管理服务了,serviceGroup是用来管理一组service的,那service其实就是一个接口,代码如下

    Service (代码在go-zero/core/service/servicegroup.go)

    // Service is the interface that groups Start and Stop methods.Service interface {Starter //StartStopper //Stop}
    

    所以,只要你的服务实现了这2个接口,就可以加入到serviceGroup统一管理

    那可以看到我们把所有的mq都实现这个接口,然后统一放到都 list.Mqs中,在启动服务即可

2.2 mq分类管理

go-zero-looklook/app/order/cmd/mq/internal/listen目录下代码

该目录下代码是统一管理不同类型mq,因为我们要管理kq、asynq可能后续还有rabbitmq、rocketmq等等,所以在这里做了分类方便维护

统一管理在go-zero-looklook/app/order/cmd/mq/internal/listen/listen.go,然后在main中调用listen.Mqs可以获取所有mq一起start

//返回所有消费者
func Mqs(c config.Config) []service.Service {svcContext := svc.NewServiceContext(c)ctx := context.Background()var services []service.Service//kq :消息队列.services = append(services, KqMqs(c, ctx, svcContext)...)//asynq : 延迟队列、定时任务services = append(services, AsynqMqs(c, ctx, svcContext)...)//other mq ....return services
}

go-zero-looklook/app/order/cmd/mq/internal/listen/asynqMqs.go就是定义的asynq

//asynq
//定时任务、延迟任务
func AsynqMqs(c config.Config, ctx context.Context, svcContext *svc.ServiceContext) []service.Service {return []service.Service{//监听延迟队列deferMq.NewAsynqTask(ctx, svcContext),//监听定时任务}}

go-zero-looklook/app/order/cmd/mq/internal/listen/asynqMqs.go就是定义的kq (go-queue的kafka)

//kq
//消息队列
func KqMqs(c config.Config, ctx context.Context, svcContext *svc.ServiceContext) []service.Service {return []service.Service{//监听消费流水状态变更kq.MustNewQueue(c.PaymentUpdateStatusConf, kqMq.NewPaymentUpdateStatusMq(ctx, svcContext)),//.....}}
2.3 实际业务

编写实际业务,我们就在go-zero-looklook/app/order/cmd/mq/internal/listen/mqs下,这里为了方便维护,也是做了分类

  • deferMq : 延迟队列
  • kq:消息队列
2.3.1 延迟队列
/**
监听关闭订单
*/
type AsynqTask struct {ctx    context.ContextsvcCtx *svc.ServiceContext
}func NewAsynqTask(ctx context.Context, svcCtx *svc.ServiceContext) *AsynqTask {return &AsynqTask{ctx:    ctx,svcCtx: svcCtx,}
}func (l *AsynqTask) Start() {fmt.Println("AsynqTask start ")srv := asynq.NewServer(asynq.RedisClientOpt{Addr: l.svcCtx.Config.Redis.Host, Password: l.svcCtx.Config.Redis.Pass},asynq.Config{Concurrency: 10,Queues: map[string]int{"critical": 6,"default":  3,"low":      1,},},)mux := asynq.NewServeMux()//关闭民宿订单任务mux.HandleFunc(asynqmq.TypeHomestayOrderCloseDelivery, l.closeHomestayOrderStateMqHandler)if err := srv.Run(mux); err != nil {log.Fatalf("could not run server: %v", err)}
}func (l *AsynqTask) Stop() {fmt.Println("AsynqTask stop")
}

因为asynq 要先启动,然后定义路由任务,所有我们在asynqTask.go中做了统一的路由管理,之后我们每个业务都单独的在deferMq的文件夹下面定义一个文件(如“延迟关闭订单:closeHomestayOrderState.go”),这样每个业务一个文件,跟go-zero的api、rpc的logic一样,维护很方便

closeHomestayOrderState.go 关闭订单逻辑

package deferMqimport ("context""encoding/json""looklook/app/order/cmd/rpc/order""looklook/app/order/model""looklook/common/asynqmq""looklook/common/xerr""github.com/hibiken/asynq""github.com/pkg/errors"
)func (l *AsynqTask) closeHomestayOrderStateMqHandler(ctx context.Context, t *asynq.Task) error {var p asynqmq.HomestayOrderCloseTaskPayloadif err := json.Unmarshal(t.Payload(), &p); err != nil {return errors.Wrapf(xerr.NewErrMsg("解析asynq task payload err"), "closeHomestayOrderStateMqHandler payload err:%v, payLoad:%+v", err, t.Payload())}resp, err := l.svcCtx.OrderRpc.HomestayOrderDetail(ctx, &order.HomestayOrderDetailReq{Sn: p.Sn,})if err != nil || resp.HomestayOrder == nil {return errors.Wrapf(xerr.NewErrMsg("获取订单失败"), "closeHomestayOrderStateMqHandler 获取订单失败 or 订单不存在 err:%v, sn:%s ,HomestayOrder : %+v", err, p.Sn, resp.HomestayOrder)}if resp.HomestayOrder.TradeState == model.HomestayOrderTradeStateWaitPay {_, err := l.svcCtx.OrderRpc.UpdateHomestayOrderTradeState(ctx, &order.UpdateHomestayOrderTradeStateReq{Sn:         p.Sn,TradeState: model.HomestayOrderTradeStateCancel,})if err != nil {return errors.Wrapf(xerr.NewErrMsg("关闭订单失败"), "closeHomestayOrderStateMqHandler 关闭订单失败  err:%v, sn:%s ", err, p.Sn)}}return nil
}
2.3.2 kq消息队列

看go-zero-looklook/app/order/cmd/mq/internal/mqs/kq文件夹下,因为kq跟asynq不太一样,它本身就是使用go-zero的Service管理的,已经实现了starter、stopper接口了,所以我们在/Users/seven/Developer/goenv/go-zero-looklook/app/order/cmd/mq/internal/listen/kqMqs.go中直接定义好一个go-queue业务扔给serviceGroup,去交给main启动就好了 , 我们的业务代码只需要实现go-queue的Consumer直接写我们自己业务即可。

1)/Users/seven/Developer/goenv/go-zero-looklook/app/order/cmd/mq/internal/listen/kqMqs.go

func KqMqs(c config.Config, ctx context.Context, svcContext *svc.ServiceContext) []service.Service {return []service.Service{//监听消费流水状态变更kq.MustNewQueue(c.PaymentUpdateStatusConf, kqMq.NewPaymentUpdateStatusMq(ctx, svcContext)),//.....}
}

可以看到kq.MustNewQueue本身返回就是 queue.MessageQueue , queue.MessageQueue又实现了Start、Stop

2)业务中

/Users/seven/Developer/goenv/go-zero-looklook/app/order/cmd/mq/internal/mqs/kq/paymentUpdateStatus.go

func (l *PaymentUpdateStatusMq) Consume(_, val string) error {	fmt.Printf(" PaymentUpdateStatusMq Consume val : %s \n", val)	//解析数据	var message kqueue.ThirdPaymentUpdatePayStatusNotifyMessage	if err := json.Unmarshal([]byte(val), &message); err != nil {		logx.WithContext(l.ctx).Error("PaymentUpdateStatusMq->Consume Unmarshal err : %v , val : %s", err, val)		return err	}	//执行业务..	if err := l.execService(message); err != nil {		logx.WithContext(l.ctx).Error("PaymentUpdateStatusMq->execService  err : %v , val : %s , message:%+v", err, val, message)		return err	}	return nil}

我们在paymentUpdateStatus.go中只需要实现接口Consume 就可以接受来自kq传过来的kafka的消息了,我们只管在我们Consumer中处理我们业务即可

3、定时任务

关于定时任务,目前go-zero-looklook没有使用,这里我也说明一下

  • 如果你想简单一点直接使用cron(裸机、k8s都有),
  • 如果稍微复杂一点可以使用https://github.com/robfig/cron包,在代码中定义时间
  • 使用xxl-job、gocron分布式定时任务系统接入
  • asynq的shedule

这里因为项目用的asynq,我就演示一下asynq的shedule吧

分为client与server , client用来定义调度时间,server是到了时间接受client的消息触发来执行我们写的业务的,实际业务我们应该写在server,client用来定义业务调度时间的

asynqtest/docker-compose.yml

version: '3'services:  #asynqmon asynq延迟队列、定时队列的webui  asynqmon:    image: hibiken/asynqmon:latest    container_name: asynqmon_asynq    ports:      - 8980:8080    command:      - '--redis-addr=redis:6379'      - '--redis-password=G62m50oigInC30sf'    restart: always    networks:      - asynqtest_net    depends_on:      - redis    #redis容器  redis:    image: redis:6.2.5    container_name: redis_asynq    ports:      - 63779:6379    environment:      # 时区上海      TZ: Asia/Shanghai    volumes:      # 数据文件      - ./data/redis/data:/data:rw    command: "redis-server --requirepass G62m50oigInC30sf  --appendonly yes"    privileged: true    restart: always    networks:      - asynqtest_netnetworks:  asynqtest_net:    driver: bridge    ipam:      config:        - subnet: 172.22.0.0/16

asynqtest/shedule/client/client.go

package mainimport (	"asynqtest/tpl"	"encoding/json"	"log"	"github.com/hibiken/asynq")const redisAddr = "127.0.0.1:63779"const redisPwd = "G62m50oigInC30sf"func main() {	// 周期性任务	scheduler := asynq.NewScheduler(		asynq.RedisClientOpt{			Addr:     redisAddr,			Password: redisPwd,		}, nil)	payload, err := json.Marshal(tpl.EmailPayload{Email: "546630576@qq.com", Content: "发邮件呀"})	if err != nil {		log.Fatal(err)	}	task := asynq.NewTask(tpl.EMAIL_TPL, payload)	// 每隔1分钟同步一次	entryID, err := scheduler.Register("*/1 * * * *", task)	if err != nil {		log.Fatal(err)	}	log.Printf("registered an entry: %q\n", entryID)	if err := scheduler.Run(); err != nil {		log.Fatal(err)	}}

asynqtest/shedule/server/server.go

package mainimport (	"context"	"encoding/json"	"fmt"	"log"	"asynqtest/tpl"	"github.com/hibiken/asynq")func main() {	srv := asynq.NewServer(		asynq.RedisClientOpt{Addr: "127.0.0.1:63779", Password: "G62m50oigInC30sf"},		asynq.Config{			Concurrency: 10,			Queues: map[string]int{				"critical": 6,				"default":  3,				"low":      1,			},		},	)	mux := asynq.NewServeMux()	//关闭民宿订单任务	mux.HandleFunc(tpl.EMAIL_TPL, emailMqHandler)	if err := srv.Run(mux); err != nil {		log.Fatalf("could not run server: %v", err)	}}func emailMqHandler(ctx context.Context, t *asynq.Task) error {	var p tpl.EmailPayload	if err := json.Unmarshal(t.Payload(), &p); err != nil {		return fmt.Errorf("emailMqHandler err:%+v", err)	}	fmt.Printf("p : %+v \n", p)	return nil}

asynqtest/tpl/tpl.go

package tplconst EMAIL_TPL = "schedule:email"type EmailPayload struct {	Email   string	Content string}

启动server.go、client.go

浏览器输入http://127.0.0.1:8980/schedulers这里 可以看到所有client定义的任务

在这里插入图片描述

浏览器输入http://127.0.0.1:8990/这里可以看到我们的server消费请

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-cCrA6TOc-1645068713854)(./images/8/image-20220128113651818.png)]控制台消费情况
在这里插入图片描述

在这里插入图片描述

说一下asynq的shedule在集成到项目中的思路,可以单独启动一个服务作为调度client定义系统的定时任务调度管理,将server定义在每个业务自己的mq的asynq一起即可。

4、结尾

在这一节中,我们学会使用了消息队列、延迟队列 ,kafka可以通过管理工具去查看,至于asynq查看webui在go-zero-looklook/docker-compose-env.yml中我们已经启动好了asynqmon,直接使用http://127.0.0.1:8980 即可查看

这篇关于go-zero微服务到k8s部署应有尽有系列(八)消息、延迟、定时队列的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/714600

相关文章

ElasticSearch+Kibana通过Docker部署到Linux服务器中操作方法

《ElasticSearch+Kibana通过Docker部署到Linux服务器中操作方法》本文介绍了Elasticsearch的基本概念,包括文档和字段、索引和映射,还详细描述了如何通过Docker... 目录1、ElasticSearch概念2、ElasticSearch、Kibana和IK分词器部署

部署Vue项目到服务器后404错误的原因及解决方案

《部署Vue项目到服务器后404错误的原因及解决方案》文章介绍了Vue项目部署步骤以及404错误的解决方案,部署步骤包括构建项目、上传文件、配置Web服务器、重启Nginx和访问域名,404错误通常是... 目录一、vue项目部署步骤二、404错误原因及解决方案错误场景原因分析解决方案一、Vue项目部署步骤

Linux流媒体服务器部署流程

《Linux流媒体服务器部署流程》文章详细介绍了流媒体服务器的部署步骤,包括更新系统、安装依赖组件、编译安装Nginx和RTMP模块、配置Nginx和FFmpeg,以及测试流媒体服务器的搭建... 目录流媒体服务器部署部署安装1.更新系统2.安装依赖组件3.解压4.编译安装(添加RTMP和openssl模块

Android 悬浮窗开发示例((动态权限请求 | 前台服务和通知 | 悬浮窗创建 )

《Android悬浮窗开发示例((动态权限请求|前台服务和通知|悬浮窗创建)》本文介绍了Android悬浮窗的实现效果,包括动态权限请求、前台服务和通知的使用,悬浮窗权限需要动态申请并引导... 目录一、悬浮窗 动态权限请求1、动态请求权限2、悬浮窗权限说明3、检查动态权限4、申请动态权限5、权限设置完毕后

如何通过Python实现一个消息队列

《如何通过Python实现一个消息队列》这篇文章主要为大家详细介绍了如何通过Python实现一个简单的消息队列,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录如何通过 python 实现消息队列如何把 http 请求放在队列中执行1. 使用 queue.Queue 和 reque

0基础租个硬件玩deepseek,蓝耘元生代智算云|本地部署DeepSeek R1模型的操作流程

《0基础租个硬件玩deepseek,蓝耘元生代智算云|本地部署DeepSeekR1模型的操作流程》DeepSeekR1模型凭借其强大的自然语言处理能力,在未来具有广阔的应用前景,有望在多个领域发... 目录0基础租个硬件玩deepseek,蓝耘元生代智算云|本地部署DeepSeek R1模型,3步搞定一个应

redis群集简单部署过程

《redis群集简单部署过程》文章介绍了Redis,一个高性能的键值存储系统,其支持多种数据结构和命令,它还讨论了Redis的服务器端架构、数据存储和获取、协议和命令、高可用性方案、缓存机制以及监控和... 目录Redis介绍1. 基本概念2. 服务器端3. 存储和获取数据4. 协议和命令5. 高可用性6.

Go路由注册方法详解

《Go路由注册方法详解》Go语言中,http.NewServeMux()和http.HandleFunc()是两种不同的路由注册方式,前者创建独立的ServeMux实例,适合模块化和分层路由,灵活性高... 目录Go路由注册方法1. 路由注册的方式2. 路由器的独立性3. 灵活性4. 启动服务器的方式5.

TP-Link PDDNS服将于务6月30日正式停运:用户需转向第三方DDNS服务

《TP-LinkPDDNS服将于务6月30日正式停运:用户需转向第三方DDNS服务》近期,路由器制造巨头普联(TP-Link)在用户群体中引发了一系列重要变动,上个月,公司发出了一则通知,明确要求所... 路由器厂商普联(TP-Link)上个月发布公告要求所有用户必须完成实名认证后才能继续使用普联提供的 D

Deepseek R1模型本地化部署+API接口调用详细教程(释放AI生产力)

《DeepseekR1模型本地化部署+API接口调用详细教程(释放AI生产力)》本文介绍了本地部署DeepSeekR1模型和通过API调用将其集成到VSCode中的过程,作者详细步骤展示了如何下载和... 目录前言一、deepseek R1模型与chatGPT o1系列模型对比二、本地部署步骤1.安装oll