如何管理Spark Streaming消费Kafka的偏移量(三)

2024-05-15 03:08

本文主要是介绍如何管理Spark Streaming消费Kafka的偏移量(三),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

前面的文章已经介绍了在spark streaming集成kafka时,如何处理其偏移量的问题,由于spark streaming自带的checkpoint弊端非常明显,所以一些对数据一致性要求比较高的项目里面,不建议采用其自带的checkpoint来做故障恢复。

在spark streaming1.3之后的版本支持direct kafka stream,这种策略更加完善,放弃了原来使用Kafka的高级API自动保存数据的偏移量,之后的版本采用Simple API也就是更加偏底层的api,我们既可以用checkpoint来容灾,也可以通过低级api来获取偏移量自己管理偏移量,这样以来无论是程序升级,还是故障重启,在框架端都可以做到Exact One准确一次的语义。

本篇文章,会再介绍下,如何手动管理kafka的offset,并给出具体的代码加以分析:

版本:

apache spark streaming2.1

apache kafka 0.9.0.0

手动管理offset的注意点:

(1)第一次项目启动的时候,因为zk里面没有偏移量,所以使用KafkaUtils直接创建InputStream,默认是从最新的偏移量开始消费,这一点可以控制。

(2)如果非第一次启动,zk里面已经存在偏移量,所以我们读取zk的偏移量,并把它传入到KafkaUtils中,从上次结束时的偏移量开始消费处理。

(3)在foreachRDD里面,对每一个批次的数据处理之后,再次更新存在zk里面的偏移量

注意上面的3个步骤,1和2只会加载一次,第3个步骤是每个批次里面都会执行一次。

下面看第一和第二个步骤的核心代码:

/****
    *
    * @param ssc  StreamingContext
    * @param kafkaParams  配置kafka的参数
    * @param zkClient  zk连接的client
    * @param zkOffsetPath zk里面偏移量的路径
    * @param topics     需要处理的topic
    * @return   InputDStream[(String, String)] 返回输入流
    */def createKafkaStream(ssc: StreamingContext,kafkaParams: Map[String, String],zkClient: ZkClient,zkOffsetPath: String,topics: Set[String]): InputDStream[(String, String)]={//目前仅支持一个topic的偏移量处理,读取zk里面偏移量字符串val zkOffsetData=KafkaOffsetManager.readOffsets(zkClient,zkOffsetPath,topics.last)val kafkaStream = zkOffsetData match {case None =>  //如果从zk里面没有读到偏移量,就说明是系统第一次启动log.info("系统第一次启动,没有读取到偏移量,默认就最新的offset开始消费")//使用最新的偏移量创建DirectStreamKafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)case Some(lastStopOffset) =>log.info("从zk中读取到偏移量,从上次的偏移量开始消费数据......")val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)//使用上次停止时候的偏移量创建DirectStreamKafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, lastStopOffset, messageHandler)}kafkaStream//返回创建的kafkaStream}

主要是针对第一次启动,和非首次启动做了不同的处理。

然后看下第三个步骤的代码:

/***** 保存每个批次的rdd的offset到zk中* @param zkClient zk连接的client* @param zkOffsetPath   偏移量路径* @param rdd     每个批次的rdd*/def saveOffsets(zkClient: ZkClient, zkOffsetPath: String, rdd: RDD[_]): Unit = {//转换rdd为Array[OffsetRange]val offsetsRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges//转换每个OffsetRange为存储到zk时的字符串格式 :  分区序号1:偏移量1,分区序号2:偏移量2,......val offsetsRangesStr = offsetsRanges.map(offsetRange => s"${offsetRange.partition}:${offsetRange.untilOffset}").mkString(",")log.debug(" 保存的偏移量:  "+offsetsRangesStr)//将最终的字符串结果保存到zk里面ZkUtils.updatePersistentPath(zkClient, zkOffsetPath, offsetsRangesStr)}

主要是更新每个批次的偏移量到zk中。

例子已经上传到github中,有兴趣的同学可以参考这个链接:

https://github.com/qindongliang/streaming-offset-to-zk

后续文章会聊一下为了升级应用如何优雅的关闭的流程序,以及在kafka扩展分区时,上面的程序如何自动兼容。

这篇关于如何管理Spark Streaming消费Kafka的偏移量(三)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/990637

相关文章

高效管理你的Linux系统: Debian操作系统常用命令指南

《高效管理你的Linux系统:Debian操作系统常用命令指南》在Debian操作系统中,了解和掌握常用命令对于提高工作效率和系统管理至关重要,本文将详细介绍Debian的常用命令,帮助读者更好地使... Debian是一个流行的linux发行版,它以其稳定性、强大的软件包管理和丰富的社区资源而闻名。在使用

SpringBoot操作spark处理hdfs文件的操作方法

《SpringBoot操作spark处理hdfs文件的操作方法》本文介绍了如何使用SpringBoot操作Spark处理HDFS文件,包括导入依赖、配置Spark信息、编写Controller和Ser... 目录SpringBoot操作spark处理hdfs文件1、导入依赖2、配置spark信息3、cont

如何在一台服务器上使用docker运行kafka集群

《如何在一台服务器上使用docker运行kafka集群》文章详细介绍了如何在一台服务器上使用Docker运行Kafka集群,包括拉取镜像、创建网络、启动Kafka容器、检查运行状态、编写启动和关闭脚本... 目录1.拉取镜像2.创建集群之间通信的网络3.将zookeeper加入到网络中4.启动kafka集群

SpringBoot使用minio进行文件管理的流程步骤

《SpringBoot使用minio进行文件管理的流程步骤》MinIO是一个高性能的对象存储系统,兼容AmazonS3API,该软件设计用于处理非结构化数据,如图片、视频、日志文件以及备份数据等,本文... 目录一、拉取minio镜像二、创建配置文件和上传文件的目录三、启动容器四、浏览器登录 minio五、

IDEA中的Kafka管理神器详解

《IDEA中的Kafka管理神器详解》这款基于IDEA插件实现的Kafka管理工具,能够在本地IDE环境中直接运行,简化了设置流程,为开发者提供了更加紧密集成、高效且直观的Kafka操作体验... 目录免安装:IDEA中的Kafka管理神器!简介安装必要的插件创建 Kafka 连接第一步:创建连接第二步:选

综合安防管理平台LntonAIServer视频监控汇聚抖动检测算法优势

LntonAIServer视频质量诊断功能中的抖动检测是一个专门针对视频稳定性进行分析的功能。抖动通常是指视频帧之间的不必要运动,这种运动可能是由于摄像机的移动、传输中的错误或编解码问题导致的。抖动检测对于确保视频内容的平滑性和观看体验至关重要。 优势 1. 提高图像质量 - 清晰度提升:减少抖动,提高图像的清晰度和细节表现力,使得监控画面更加真实可信。 - 细节增强:在低光条件下,抖

搭建Kafka+zookeeper集群调度

前言 硬件环境 172.18.0.5        kafkazk1        Kafka+zookeeper                Kafka Broker集群 172.18.0.6        kafkazk2        Kafka+zookeeper                Kafka Broker集群 172.18.0.7        kafkazk3

软考系统规划与管理师考试证书含金量高吗?

2024年软考系统规划与管理师考试报名时间节点: 报名时间:2024年上半年软考将于3月中旬陆续开始报名 考试时间:上半年5月25日到28日,下半年11月9日到12日 分数线:所有科目成绩均须达到45分以上(包括45分)方可通过考试 成绩查询:可在“中国计算机技术职业资格网”上查询软考成绩 出成绩时间:预计在11月左右 证书领取时间:一般在考试成绩公布后3~4个月,各地领取时间有所不同

安全管理体系化的智慧油站开源了。

AI视频监控平台简介 AI视频监控平台是一款功能强大且简单易用的实时算法视频监控系统。它的愿景是最底层打通各大芯片厂商相互间的壁垒,省去繁琐重复的适配流程,实现芯片、算法、应用的全流程组合,从而大大减少企业级应用约95%的开发成本。用户只需在界面上进行简单的操作,就可以实现全视频的接入及布控。摄像头管理模块用于多种终端设备、智能设备的接入及管理。平台支持包括摄像头等终端感知设备接入,为整个平台提

从状态管理到性能优化:全面解析 Android Compose

文章目录 引言一、Android Compose基本概念1.1 什么是Android Compose?1.2 Compose的优势1.3 如何在项目中使用Compose 二、Compose中的状态管理2.1 状态管理的重要性2.2 Compose中的状态和数据流2.3 使用State和MutableState处理状态2.4 通过ViewModel进行状态管理 三、Compose中的列表和滚动