Spark Streaming 的 Receiver和 Direct模式

2024-06-10 04:18

本文主要是介绍Spark Streaming 的 Receiver和 Direct模式,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1 两种模式的原理和区别

Receiver模式

1. Receiver模式下的运行架构

1)InputDStream: 从流数据源接收的输入数据。

2)Receiver:负责接收数据流,并将数据写到本地。

3)Streaming Context:代表SparkStreaming,负责Streaming层面的任务调度,生成jobs发送到Spark engine处理。

4)Spark Context: 代表Spark Core,负责批处理层面的任务调度,真正执行job的Spark engine。

2. Receiver从kafka拉取数据的过程

1)在executor上会有receiver从kafka接收数据并存储在Spark executor中,在到了batch时间后触发job去处理接收到的数据,1个receiver占用1个core;

2)为了不丢数据需要开启WAL机制,这会将receiver接收到的数据写一份备份到第三方系统上(如:HDFS);

3)receiver内部使用kafka High Level API去消费数据及自动更新offset。

Direct模式

1. Direct模式下的运行架构

与receiver模式类似,不同在于executor中没有receiver组件,从kafka拉去数据的方式不同。

2. Direct从kafka拉取数据的过程

该模式下:

1)没有receiver,无需额外的core用于不停地接收数据,而是定期查询kafka中的每个partition的最新的offset,每个批次拉取上次处理的offset和当前查询的offset的范围的数据进行处理;

2)为了不丢数据,无需将数据备份落地,而只需要手动保存offset即可

3)内部使用kafka simple Level API去消费数据, 需要手动维护offset,kafka zk上不会自动更新offset

Receiver与Direct模式的区别

1.前者在executor中有Receiver接受数据,并且1个Receiver占用一个core;而后者无Receiver,所以不会暂用core;

2.前者InputDStream的分区是 num_receiver *batchInterval /  blockInteral,后者的分区数是kafka topic partition的数量。Receiver模式下num_receiver的设置不合理会影响性能或造成资源浪费;如果设置太小,并行度不够,整个链路上接收数据将是瓶颈;如果设置太多,则会浪费资源;

3.前者使用zookeeper来维护consumer的偏移量,而后者需要自己维护偏移量;

4.为了保证不丢失数据,前者需要开启WAL机制,而后者不需要,只需要在程序中成功消费完数据后再更新偏移量即可。

2 Receiver改造成Direct模式

个推使用Spark Streaming做实时处理kafka数据,先前使用的是receiver模式;

receiver有以下特点 

1.receiver模式下,每个receiver需要单独占用一个core;

2.为了保证不丢失数据,需要开启WAL机制,使用checkpoint保存状态;

3.当receiver接受数据速率大于处理数据速率,导致数据积压,最终可能会导致程序挂掉。

由于以上特点,receiver模式下会造成一定的资源浪费;使用checkpoint保存状态, 如果需要升级程序,则会导致checkpoint无法使用;第3点receiver模式下会导致程序不太稳定;并且如果设置receiver数量不合理也会造成性能瓶颈在receiver。为了优化资源和程序稳定性,应将receiver模式改造成direct模式。

修改方式如下:

1. 修改InputDStream的创建

将receiver的:

  1. val kafkaStream = KafkaUtils.createStream(
  2. streamingContext,
  3. [ZK quorum],
  4. [consumer group id],
  5. [per-topic number of Kafka partitions to consume]
  6. )

改成direct的:

  1. val directKafkaStream = KafkaUtils.createDirectStream
  2. [
  3. [key class],
  4. [value class],
  5. [key decoder class],
  6. [value decoder class]
  7. ]
  8. (
  9. streamingContext,
  10. [map of Kafka parameters],
  11. [set of topics to consume]

2. 手动维护offset

receiver模式代码: 
(receiver模式不需要手动维护offset,而是内部通过kafka consumer high level API 提交到kafka/zk保存)

  1. kafkaStream.map {
  2. ...
  3. }.foreachRDD { rdd =>
  4. // 数据处理
  5. doCompute(rdd)
  6. }
  7.  

direct模式代码:

  1. directKafkaStream.map {
  2. ...
  3. }.foreachRDD { rdd =>
  4. // 获取当前rdd数据对应的offset
  5. val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  6. // 数据处理
  7. doCompute(rdd)
  8. // 自己实现保存offset
  9. commitOffsets(offsetRanges)
  10. }

3 其他优化点

1. 在receiver模式下 

1)拆分InputDStream,增加Receiver,从而增加接收数据的并行度;

2)调整blockInterval,适当减小,增加task数量,从而增加并行度(在core的数量>task数量的情况下);

3)如果开启了WAL机制,数据的存储级别设置为MOMERY_AND_DISK_SER。

2.数据序列化 使用Kryoserializationl ,相比Java serializationl 更快,序列化后的数据更小;

3.建议 使用CMS垃圾回收器 降低GC开销;

4. 选择高性能的算子 (mapPartitions, foreachPartitions, aggregateByKey等);

5. repartition的使用: 在streaming程序中因为batch时间特别短,所以数据量一般较小,所以repartition的时间短,可以解决一些因为topicpartition中数据分配不均匀导致的数据倾斜问题;

6.因为SparkStreaming生产的job最终都是在sparkcore上运行的,所以 sparkCore的优化 也很重要;

7. BackPressure流控

1)为什么引入Backpressure? 
当batch processing time>batchinterval 这种情况持续过长的时间,会造成数据在内存中堆积,导致Receiver所在Executor内存溢出等问题;

2)Backpressure:根据JobScheduler反馈作业的执行信息来动态调整数据接收率;

3)配置使用:

  1.  
  2. spark.streaming.backpressure.enabled
  3. 含义: 是否启用 SparkStreaming内部的backpressure机制,
  4. 默认值:false ,表示禁用
  5. spark.streaming.backpressure.initialRate
  6. 含义: receiver 为第一个batch接收数据时的比率
  7. spark.streaming.receiver.maxRate
  8. 含义: receiver接收数据的最大比率,如果设置值<=0, 则receiver接收数据比率不受限制
  9. spark.streaming.kafka.maxRatePerPartition
  10. 含义: 从每个kafka partition中读取数据的最大比率
  11.  

8.speculation机制

spark内置speculation机制,推测job中的运行特别慢的task,将这些task kill,并重新调度这些task执行。 
默认speculation机制是关闭的,通过以下配置参数开启:

  1.  
  2. spark.speculation=true
  3.  

注意 :在有些情况下,开启speculation反而效果不好,比如:streaming程序消费多个topic时,从kafka读取数据直接处理,没有重新分区,这时如果多个topic的partition的数据量相差较大那么可能会导致正常执行更大数据量的task会被认为执行缓慢,而被中途kill掉,这种情况下可能导致batch的处理时间反而变长;可以通过repartition来解决这个问题,但是要衡量repartition的时间;而在streaming程序中因为batch时间特别短,所以数据量一般较小,所以repartition的时间短,不像spark_batch一次处理大量数据一旦repartition则会特别久,所以最终还是要根据具体情况测试来决定。

4 总结

将Receiver模式改成Direct模式,实现了资源优化,提升了程序的稳定性,缺点是需要自己管理offset,操作相对复杂。 未来,个推将不断探索和优化Spark Streaming技术,发挥其强大的数据处理能力,为建设实时数仓提供保障。

 

 

 

 

 

 

这篇关于Spark Streaming 的 Receiver和 Direct模式的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1047194

相关文章

SpringBoot操作spark处理hdfs文件的操作方法

《SpringBoot操作spark处理hdfs文件的操作方法》本文介绍了如何使用SpringBoot操作Spark处理HDFS文件,包括导入依赖、配置Spark信息、编写Controller和Ser... 目录SpringBoot操作spark处理hdfs文件1、导入依赖2、配置spark信息3、cont

在JS中的设计模式的单例模式、策略模式、代理模式、原型模式浅讲

1. 单例模式(Singleton Pattern) 确保一个类只有一个实例,并提供一个全局访问点。 示例代码: class Singleton {constructor() {if (Singleton.instance) {return Singleton.instance;}Singleton.instance = this;this.data = [];}addData(value)

模版方法模式template method

学习笔记,原文链接 https://refactoringguru.cn/design-patterns/template-method 超类中定义了一个算法的框架, 允许子类在不修改结构的情况下重写算法的特定步骤。 上层接口有默认实现的方法和子类需要自己实现的方法

【iOS】MVC模式

MVC模式 MVC模式MVC模式demo MVC模式 MVC模式全称为model(模型)view(视图)controller(控制器),他分为三个不同的层分别负责不同的职责。 View:该层用于存放视图,该层中我们可以对页面及控件进行布局。Model:模型一般都拥有很好的可复用性,在该层中,我们可以统一管理一些数据。Controlller:该层充当一个CPU的功能,即该应用程序

迭代器模式iterator

学习笔记,原文链接 https://refactoringguru.cn/design-patterns/iterator 不暴露集合底层表现形式 (列表、 栈和树等) 的情况下遍历集合中所有的元素

《x86汇编语言:从实模式到保护模式》视频来了

《x86汇编语言:从实模式到保护模式》视频来了 很多朋友留言,说我的专栏《x86汇编语言:从实模式到保护模式》写得很详细,还有的朋友希望我能写得更细,最好是覆盖全书的所有章节。 毕竟我不是作者,只有作者的解读才是最权威的。 当初我学习这本书的时候,只能靠自己摸索,网上搜不到什么好资源。 如果你正在学这本书或者汇编语言,那你有福气了。 本书作者李忠老师,以此书为蓝本,录制了全套视频。 试

利用命令模式构建高效的手游后端架构

在现代手游开发中,后端架构的设计对于支持高并发、快速迭代和复杂游戏逻辑至关重要。命令模式作为一种行为设计模式,可以有效地解耦请求的发起者与接收者,提升系统的可维护性和扩展性。本文将深入探讨如何利用命令模式构建一个强大且灵活的手游后端架构。 1. 命令模式的概念与优势 命令模式通过将请求封装为对象,使得请求的发起者和接收者之间的耦合度降低。这种模式的主要优势包括: 解耦请求发起者与处理者

springboot实战学习(1)(开发模式与环境)

目录 一、实战学习的引言 (1)前后端的大致学习模块 (2)后端 (3)前端 二、开发模式 一、实战学习的引言 (1)前后端的大致学习模块 (2)后端 Validation:做参数校验Mybatis:做数据库的操作Redis:做缓存Junit:单元测试项目部署:springboot项目部署相关的知识 (3)前端 Vite:Vue项目的脚手架Router:路由Pina:状态管理Eleme

状态模式state

学习笔记,原文链接 https://refactoringguru.cn/design-patterns/state 在一个对象的内部状态变化时改变其行为, 使其看上去就像改变了自身所属的类一样。 在状态模式中,player.getState()获取的是player的当前状态,通常是一个实现了状态接口的对象。 onPlay()是状态模式中定义的一个方法,不同状态下(例如“正在播放”、“暂停

Spark MLlib模型训练—聚类算法 PIC(Power Iteration Clustering)

Spark MLlib模型训练—聚类算法 PIC(Power Iteration Clustering) Power Iteration Clustering (PIC) 是一种基于图的聚类算法,用于在大规模数据集上进行高效的社区检测。PIC 算法的核心思想是通过迭代图的幂运算来发现数据中的潜在簇。该算法适用于处理大规模图数据,特别是在社交网络分析、推荐系统和生物信息学等领域具有广泛应用。Spa