go与kafka(github.com/Shopify/sarama)

2023-12-21 00:08
文章标签 go com kafka github shopify sarama

本文主要是介绍go与kafka(github.com/Shopify/sarama),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

启动kafka

分别命令行启动zookeeper和kafka

.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
.\bin\windows\kafka-server-start.bat .\config\server.properties

 producer

import ("fmt""github.com/Shopify/sarama"
)func main() {config := sarama.NewConfig()config.Producer.RequiredAcks = sarama.WaitForAll  //赋值为-1:这意味着producer在follower副本确认接收到数据后才算一次发送完成。config.Producer.Partitioner = sarama.NewRandomPartitioner  //写到随机分区中,默认设置8个分区config.Producer.Return.Successes = truemsg := &sarama.ProducerMessage{}msg.Topic = `nginx_log`msg.Value = sarama.StringEncoder("this is a good test")client,err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"},config)if err != nil{fmt.Println("producer close err, ",err)return}defer client.Close()pid, offset, err := client.SendMessage(msg)if err != nil{fmt.Println("send message failed, ", err)return}fmt.Printf("分区ID:%v, offset:%v \n", pid, offset)}

consumer

func main() {var wg sync.WaitGroupconsumer, err := sarama.NewConsumer([]string{"127.0.0.1:9092"}, nil)if err != nil{fmt.Println("Failed to start consumer: %s", err)return}partitionList, err := consumer.Partitions("nginx_log")  //获得该topic所有的分区if err != nil{fmt.Println("Failed to get the list of partition:, ",err)return}fmt.Println(partitionList)for partition := range partitionList{pc, err := consumer.ConsumePartition("nginx_log", int32(partition), sarama.OffsetNewest)if err != nil{fmt.Println("Failed to start consumer for partition %d: %s\n", partition, err)return}wg.Add(1)go func(sarama.PartitionConsumer) { //为每个分区开一个go协程去取值for msg := range pc.Messages(){  //阻塞直到有值发送过来,然后再继续等待fmt.Printf("Partition:%d, Offset:%d, key:%s, value:%s\n", msg.Partition, msg.Offset, string(msg.Key),string(msg.Value))}defer pc.AsyncClose()wg.Done()}(pc)}wg.Wait()consumer.Close()
}

 

这篇关于go与kafka(github.com/Shopify/sarama)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/518022

相关文章

Go标准库常见错误分析和解决办法

《Go标准库常见错误分析和解决办法》Go语言的标准库为开发者提供了丰富且高效的工具,涵盖了从网络编程到文件操作等各个方面,然而,标准库虽好,使用不当却可能适得其反,正所谓工欲善其事,必先利其器,本文将... 目录1. 使用了错误的time.Duration2. time.After导致的内存泄漏3. jsO

go中空接口的具体使用

《go中空接口的具体使用》空接口是一种特殊的接口类型,它不包含任何方法,本文主要介绍了go中空接口的具体使用,具有一定的参考价值,感兴趣的可以了解一下... 目录接口-空接口1. 什么是空接口?2. 如何使用空接口?第一,第二,第三,3. 空接口几个要注意的坑坑1:坑2:坑3:接口-空接口1. 什么是空接

利用Go语言开发文件操作工具轻松处理所有文件

《利用Go语言开发文件操作工具轻松处理所有文件》在后端开发中,文件操作是一个非常常见但又容易出错的场景,本文小编要向大家介绍一个强大的Go语言文件操作工具库,它能帮你轻松处理各种文件操作场景... 目录为什么需要这个工具?核心功能详解1. 文件/目录存javascript在性检查2. 批量创建目录3. 文件

Go语言中最便捷的http请求包resty的使用详解

《Go语言中最便捷的http请求包resty的使用详解》go语言虽然自身就有net/http包,但是说实话用起来没那么好用,resty包是go语言中一个非常受欢迎的http请求处理包,下面我们一起来学... 目录安装一、一个简单的get二、带查询参数三、设置请求头、body四、设置表单数据五、处理响应六、超

一文详解kafka开启kerberos认证的完整步骤

《一文详解kafka开启kerberos认证的完整步骤》这篇文章主要为大家详细介绍了kafka开启kerberos认证的完整步骤,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录一、kerberos安装部署二、准备机器三、Kerberos Server 安装1、配置krb5.con

Golang基于内存的键值存储缓存库go-cache

《Golang基于内存的键值存储缓存库go-cache》go-cache是一个内存中的key:valuestore/cache库,适用于单机应用程序,本文主要介绍了Golang基于内存的键值存储缓存库... 目录文档安装方法示例1示例2使用注意点优点缺点go-cache 和 Redis 缓存对比1)功能特性

Go 1.23中Timer无buffer的实现方式详解

《Go1.23中Timer无buffer的实现方式详解》在Go1.23中,Timer的实现通常是通过time包提供的time.Timer类型来实现的,本文主要介绍了Go1.23中Timer无buff... 目录Timer 的基本实现无缓冲区的实现自定义无缓冲 Timer 实现更复杂的 Timer 实现总结在

Go使用pprof进行CPU,内存和阻塞情况分析

《Go使用pprof进行CPU,内存和阻塞情况分析》Go语言提供了强大的pprof工具,用于分析CPU、内存、Goroutine阻塞等性能问题,帮助开发者优化程序,提高运行效率,下面我们就来深入了解下... 目录1. pprof 介绍2. 快速上手:启用 pprof3. CPU Profiling:分析 C

使用Go语言开发一个命令行文件管理工具

《使用Go语言开发一个命令行文件管理工具》这篇文章主要为大家详细介绍了如何使用Go语言开发一款命令行文件管理工具,支持批量重命名,删除,创建,移动文件,需要的小伙伴可以了解下... 目录一、工具功能一览二、核心代码解析1. 主程序结构2. 批量重命名3. 批量删除4. 创建文件/目录5. 批量移动三、如何安

Debezium 与 Apache Kafka 的集成方式步骤详解

《Debezium与ApacheKafka的集成方式步骤详解》本文详细介绍了如何将Debezium与ApacheKafka集成,包括集成概述、步骤、注意事项等,通过KafkaConnect,D... 目录一、集成概述二、集成步骤1. 准备 Kafka 环境2. 配置 Kafka Connect3. 安装 D