大数据-SparkStreaming(九)

2024-02-28 07:59
文章标签 数据 sparkstreaming

本文主要是介绍大数据-SparkStreaming(九),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

                         大数据-SparkStreaming(九)

SparkStreaming调优

  • 调整BlockReceiver的数量

案例演示:

val kafkaStream = {  val sparkStreamingConsumerGroup = "spark-streaming-consumer-group"  val kafkaParams = Map(  "zookeeper.connect" -> "node01:2181,node02:2181,node03:2181",  "group.id" -> "spark-streaming-test")  val inputTopic = "test"  val numPartitionsOfInputTopic = 3  val streams = (1 to numPartitionsOfInputTopic) map  {x =>  KafkaUtils.createStream(ssc, kafkaParams, Map(inputTopic -> 1),      StorageLevel.MEMORY_ONLY_SER).map(_._2)  }  val unifiedStream = ssc.union(streams) 
  •  调整Block的数量

batchInterval : 触发批处理的时间间隔
blockInterval :将接收到的数据生成Block的时间间隔,spark.streaming.blockInterval(默认是200ms),那么,BlockRDD的分区数 = batchInterval / blockInterval,即一个Block就是RDD的一个分区,就是一个task
比如,batchInterval是2秒,而blockInterval是200ms,那么task数为10,如果task的数量太少,比一个executor的core数还少的话,那么可以减少blockInterval,blockInterval最好不要小于50ms,太小的话导致task数太多,那么launch task的时间久多了。

  • 调整Receiver的接受速率

pps:permits per second 每秒允许接受的数据量(QPS -> queries per second)
Spark Streaming默认的PPS是没有限制的,可以通过参数spark.streaming.receiver.maxRate来控制,默认是Long.Maxvalue

  • 调整数据处理的并行度

BlockRDD的分区数

a. 通过Receiver接受数据的特点决定

b. 也可以自己通过repartition设置

ShuffleRDD的分区数

a. 默认的分区数为spark.default.parallelism(core的大小)

b. 通过我们自己设置决定

val wordCounts = words.map(x => (x, 1)).reduceByKey((a: Int, b: Int) => a + b, new HashPartitioner(10))
  • 数据的序列化

SparkStreaming两种需要序列化的数据: a. 输入的数据:默认是以StorageLevel.MEMORY_AND_DISK_SER_2的形式存储在executor上的内存中 b. 缓存的数据:默认是以StorageLevel.MEMORY_ONLY_SER的形式存储的内存中 使用Kryo序列化机制,比Java序列化机制性能好

val conf = new SparkConf().setMaster(...).setAppName(...)
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
val sc = new SparkContext(conf)
  • 内存调优

需要内存大小

和transformation的类型有关,如果使用的是updateStateByKey,Window这样的算子,那么内存就要设置得偏大。

数据存储级别

如果把接收到的数据设置的存储级别是MEMORY_DISK这种级别,也就是说如果内存不够可以把数据存储到磁盘上,其实性能还是不好的,性能最好的就是所有的数据都在内存里面,所以如果在资源允许的情况下,把内存调大一点,让所有的数据都存在内存里面。

  • Output Operations性能

保存结果到外部的存储介质中,比如mysql/hbase数据库,使用高性能的算子操作实现。

  • Backpressure(压力反馈)--->背压机制

 

Feedback Loop : 动态使得Streaming app从unstable状态回到stable状态。

从Spark1.5版本开始:spark.streaming.backpressure.enabled = true

  • Elastic Scaling(资源动态分配)

动态分配资源:

批处理动态的决定这个application中需要多少个Executors:

  1. 当一个Executor空闲的时候,将这个Executor杀掉

  2. 当task太多的时候,动态的启动Executors

Streaming分配Executor的原则是比对 process time / batchInterval 的比率。

如果延迟了,那么就自动增加资源。

 

 

从Spark2.0有这个功能,开启资源动态分配: spark.streaming.dynamicAllocation.enabled = true

 

这篇关于大数据-SparkStreaming(九)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/754881

相关文章

Redis的数据过期策略和数据淘汰策略

《Redis的数据过期策略和数据淘汰策略》本文主要介绍了Redis的数据过期策略和数据淘汰策略,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一... 目录一、数据过期策略1、惰性删除2、定期删除二、数据淘汰策略1、数据淘汰策略概念2、8种数据淘汰策略

轻松上手MYSQL之JSON函数实现高效数据查询与操作

《轻松上手MYSQL之JSON函数实现高效数据查询与操作》:本文主要介绍轻松上手MYSQL之JSON函数实现高效数据查询与操作的相关资料,MySQL提供了多个JSON函数,用于处理和查询JSON数... 目录一、jsON_EXTRACT 提取指定数据二、JSON_UNQUOTE 取消双引号三、JSON_KE

Python给Excel写入数据的四种方法小结

《Python给Excel写入数据的四种方法小结》本文主要介绍了Python给Excel写入数据的四种方法小结,包含openpyxl库、xlsxwriter库、pandas库和win32com库,具有... 目录1. 使用 openpyxl 库2. 使用 xlsxwriter 库3. 使用 pandas 库

SpringBoot定制JSON响应数据的实现

《SpringBoot定制JSON响应数据的实现》本文主要介绍了SpringBoot定制JSON响应数据的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们... 目录前言一、如何使用@jsonView这个注解?二、应用场景三、实战案例注解方式编程方式总结 前言

使用Python在Excel中创建和取消数据分组

《使用Python在Excel中创建和取消数据分组》Excel中的分组是一种通过添加层级结构将相邻行或列组织在一起的功能,当分组完成后,用户可以通过折叠或展开数据组来简化数据视图,这篇博客将介绍如何使... 目录引言使用工具python在Excel中创建行和列分组Python在Excel中创建嵌套分组Pyt

在Rust中要用Struct和Enum组织数据的原因解析

《在Rust中要用Struct和Enum组织数据的原因解析》在Rust中,Struct和Enum是组织数据的核心工具,Struct用于将相关字段封装为单一实体,便于管理和扩展,Enum用于明确定义所有... 目录为什么在Rust中要用Struct和Enum组织数据?一、使用struct组织数据:将相关字段绑

在Mysql环境下对数据进行增删改查的操作方法

《在Mysql环境下对数据进行增删改查的操作方法》本文介绍了在MySQL环境下对数据进行增删改查的基本操作,包括插入数据、修改数据、删除数据、数据查询(基本查询、连接查询、聚合函数查询、子查询)等,并... 目录一、插入数据:二、修改数据:三、删除数据:1、delete from 表名;2、truncate

Java实现Elasticsearch查询当前索引全部数据的完整代码

《Java实现Elasticsearch查询当前索引全部数据的完整代码》:本文主要介绍如何在Java中实现查询Elasticsearch索引中指定条件下的全部数据,通过设置滚动查询参数(scrol... 目录需求背景通常情况Java 实现查询 Elasticsearch 全部数据写在最后需求背景通常情况下

Java中注解与元数据示例详解

《Java中注解与元数据示例详解》Java注解和元数据是编程中重要的概念,用于描述程序元素的属性和用途,:本文主要介绍Java中注解与元数据的相关资料,文中通过代码介绍的非常详细,需要的朋友可以参... 目录一、引言二、元数据的概念2.1 定义2.2 作用三、Java 注解的基础3.1 注解的定义3.2 内

将sqlserver数据迁移到mysql的详细步骤记录

《将sqlserver数据迁移到mysql的详细步骤记录》:本文主要介绍将SQLServer数据迁移到MySQL的步骤,包括导出数据、转换数据格式和导入数据,通过示例和工具说明,帮助大家顺利完成... 目录前言一、导出SQL Server 数据二、转换数据格式为mysql兼容格式三、导入数据到MySQL数据