大数据-SparkStreaming(七)

2024-02-28 07:59
文章标签 数据 sparkstreaming

本文主要是介绍大数据-SparkStreaming(七),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

                        大数据-SparkStreaming(七)

SparkStreaming语义

  1. At most once  一条记录要么被处理一次,要么没有被处理。
  2. At least once 一条记录可能被处理一次或者多次,可能会重复处理。
  3. Exactly once 一条记录只被处理一次,它最严格,实现起来也是比较困难。数据被处理且只被处理一次。

SparkStreaming与Kafka整合

SparkStreaming整合Kafka官方文档

http://spark.apache.org/docs/2.3.3/streaming-kafka-integration.html

方式一:Receiver-based Approach(不推荐使用)

此方法使用Receiver接收数据。Receiver是使用Kafka高级消费者API实现的。与所有接收器一样,从Kafka通过Receiver接收的数据存储在Spark执行器中,然后由Spark Streaming启动的作业处理数据。但是在默认配置下,此方法可能会在失败时丢失数据(请参阅接收器可靠性。为确保零数据丢失,必须在Spark Streaming中另外启用Write Ahead Logs(在Spark 1.2中引入)。这将同步保存所有收到的Kafka将数据写入分布式文件系统(例如HDFS)上的预写日志,以便在发生故障时可以恢复所有数据,但是性能不好。

  • pom.xml文件添加如下:
<dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-8_2.11</artifactId><version>2.3.3</version>
</dependency>
  • 核心代码:
import org.apache.spark.streaming.kafka._val kafkaStream = KafkaUtils.createStream(streamingContext,[ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])
  • 代码演示:
package com.kaikeba.streaming.kafkaimport org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}/*** sparkStreaming使用kafka 0.8API基于recevier来接受消息*/
object KafkaReceiver08 {def main(args: Array[String]): Unit = {Logger.getLogger("org").setLevel(Level.ERROR)//1、创建StreamingContext对象val sparkConf= new SparkConf().setAppName("KafkaReceiver08").setMaster("local[2]")//开启WAL机制.set("spark.streaming.receiver.writeAheadLog.enable", "true")val ssc = new StreamingContext(sparkConf,Seconds(2))//需要设置checkpoint,将接受到的数据持久化写入到hdfs上ssc.checkpoint("hdfs://node01:8020/wal")//2、接受kafka数据val zkQuorum="node01:2181,node02:2181,node03:2181"val groupid="KafkaReceiver08"val topics=Map("test" ->1)//(String, String) 元组的第一位是消息的key,第二位表示消息的valueval receiverDstream: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc,zkQuorum,groupid,topics)//3、获取kafka的topic数据val data: DStream[String] = receiverDstream.map(_._2)//4、单词计数val result: DStream[(String, Int)] = data.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)//5、打印结果result.print()//6、开启流式计算ssc.start()ssc.awaitTermination()}
}

方式二: Direct Approach (No Receivers)

这种新的不基于Receiver的直接方式,是在Spark 1.3中引入的,从而能够确保更加健壮的机制。替代掉使用Receiver来接收数据后,这种方式会周期性地查询Kafka,来获得每个topic+partition的最新的offset,从而定义每个batch的offset的范围。当处理数据的job启动时,就会使用Kafka的简单consumer api来获取Kafka指定offset范围的数据。 这种方式有如下优点:

  • 1、简化并行读取:

如果要读取多个partition,不需要创建多个输入DStream然后对它们进行union操作。Spark会创建跟Kafka partition一样多的RDD partition,并且会并行从Kafka中读取数据。所以在Kafka partition和RDD partition之间,有一个一对一的映射关系。

  • 2、高性能

如果要保证零数据丢失,在基于receiver的方式中,需要开启WAL机制。这种方式其实效率低下,因为数据实际上被复制了两份,Kafka自己本身就有高可靠的机制,会对数据复制一份,而这里又会复制一份到WAL中。而基于direct的方式,不依赖Receiver,不需要开启WAL机制,只要Kafka中作了数据的复制,那么就可以通过Kafka的副本进行恢复。

  • 3、一次且仅一次的事务机制

基于receiver的方式,是使用Kafka的高阶API来在ZooKeeper中保存消费过的offset的。这是消费Kafka数据的传统方式。这种方式配合着WAL机制可以保证数据零丢失的高可靠性,但是却无法保证数据被处理一次且仅一次,可能会处理两次。因为Spark和ZooKeeper之间可能是不同步的。

  • 4、降低资源

Direct不需要Receivers,其申请的Executors全部参与到计算任务中;而Receiver-based则需要专门的Receivers来读取Kafka数据且不参与计算。因此相同的资源申请,Direct 能够支持更大的业务。

  • 5、降低内存

Receiver-based的Receiver与其他Exectuor是异步的,并持续不断接收数据,对于小业务量的场景还好,如果遇到大业务量时,需要提高Receiver的内存,但是参与计算的Executor并无需那么多的内存。而Direct 因为没有Receiver,而是在计算时读取数据,然后直接计算,所以对内存的要求很低。实际应用中我们可以把原先的10G降至现在的2-4G左右。

  • 6、鲁棒性更好

Receiver-based方法需要Receivers来异步持续不断的读取数据,因此遇到网络、存储负载等因素,导致实时任务出现堆积,但Receivers却还在持续读取数据,此种情况很容易导致计算崩溃。Direct 则没有这种顾虑,其Driver在触发batch计算任务时,才会读取数据并计算。队列出现堆积并不会引起程序的失败。

SparkStreaming与Kafka-0-8整合

  • 支持0.8版本,或者更高的版本
  • pom.xml文件添加内容如下:
<dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-8_2.11</artifactId><version>2.3.3</version>
</dependency>
  • 代码演示:
package com.kaikeba.streaming.kafkaimport kafka.serializer.StringDecoder
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, InputDStream, ReceiverInputDStream}
import org.apache.spark.streaming.kafka.KafkaUtils/*** sparkStreaming使用kafka 0.8API基于Direct直连来接受消息* spark direct API接收kafka消息,从而不需要经过zookeeper,直接从broker上获取信息。*/
object KafkaDirect08 {def main(args: Array[String]): Unit = {Logger.getLogger("org").setLevel(Level.ERROR)//1、创建StreamingContext对象val sparkConf= new SparkConf().setAppName("KafkaDirect08").setMaster("local[2]")val ssc = new StreamingContext(sparkConf,Seconds(2))//2、接受kafka数据val  kafkaParams=Map("metadata.broker.list"->"node01:9092,node02:9092,node03:9092","group.id" -> "KafkaDirect08")val topics=Set("test")//使用direct直连的方式接受数据val kafkaDstream: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkaParams,topics)//3、获取kafka的topic数据val data: DStream[String] = kafkaDstream.map(_._2)//4、单词计数val result: DStream[(String, Int)] = data.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)//5、打印结果result.print()//6、开启流式计算ssc.start()ssc.awaitTermination()}
}

要想保证数据不丢失,最简单的就是靠checkpoint的机制,但是checkpoint机制有个特点,如果代码升级了,checkpoint机制就失效了。所以如果想实现数据不丢失,那么就需要自己管理offset。

SparkStreaming与Kafka-0-10整合

  • 支持0.10版本,或者更高的版本(推荐使用这个版本)
  • pom.xml文件添加内容如下:
<dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-10_2.11</artifactId><version>2.3.3</version>
</dependency>

代码演示:

package com.kaikeba.streaming.kafkaimport org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}object KafkaDirect10 {def main(args: Array[String]): Unit = {Logger.getLogger("org").setLevel(Level.ERROR)//1、创建StreamingContext对象val sparkConf= new SparkConf().setAppName("KafkaDirect10").setMaster("local[2]")val ssc = new StreamingContext(sparkConf,Seconds(2))//2、使用direct接受kafka数据//准备配置val topic =Set("test")val kafkaParams=Map("bootstrap.servers" ->"node01:9092,node02:9092,node03:9092","group.id" -> "KafkaDirect10","key.deserializer" -> classOf[StringDeserializer],"value.deserializer" -> classOf[StringDeserializer],"enable.auto.commit" -> "false")val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = 	 KafkaUtils.createDirectStream[String, String](ssc,//数据本地性策略LocationStrategies.PreferConsistent, //指定要订阅的topicConsumerStrategies.Subscribe[String, String](topic, kafkaParams))//3、对数据进行处理//如果你想获取到消息消费的偏移,这里需要拿到最开始的这个Dstream进行操作//如果你对该DStream进行了其他的转换之后,生成了新的DStream,新的DStream不在保存对应的消息的偏移量kafkaDStream.foreachRDD(rdd =>{//获取消息内容val dataRDD: RDD[String] = rdd.map(_.value())//打印dataRDD.foreach(line =>{println(line)})//4、提交偏移量信息,把偏移量信息添加到kafka中val offsetRanges: Array[OffsetRange] =   rdd.asInstanceOf[HasOffsetRanges].offsetRangeskafkaDStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)})//5、开启流式计算ssc.start()ssc.awaitTermination()}}

解决SparkStreaming与Kafka0.8版本整合数据不丢失方案

方案设计如下:

一般企业来说无论你是使用哪一套api去消费kafka中的数据,都是设置手动提交偏移量。

如果是自动提交偏移量(默认60s提交一次)这里可能会出现问题?
(1)数据处理失败了,自动提交了偏移量。会出现数据的丢失。
(2)数据处理成功了,自动提交偏移量成功(比较理想),但是有可能出现自动提交偏移量失败。
    会出现把之前消费过的数据再次消费,这里就出现了数据的重复处理。
    自动提交偏移量风险比较高,可能会出现数据丢失或者数据被重复处理,一般来说就手动去提交偏移量,这里我们是可以去操作什么时候去提交偏移量,把偏移量的提交通过消费者程序自己去维护。

代码开发,偏移量存入Zookeeper

package com.kaikeba.streaming.kafkaimport kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
import org.I0Itec.zkclient.ZkClient
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}
import org.apache.spark.streaming.{Seconds, StreamingContext}/*** 使用直连方式 SparkStreaming连接kafka0.8获取数据* 手动将偏移量数据保存到zookeeper中*/
object KafkaManagerOffset08 {def main(args: Array[String]): Unit = {//todo:1、创建SparkConf 提交到集群中运行 不要设置master参数val conf = new SparkConf().setAppName("KafkaManagerOffset08").setMaster("local[4]")//todo: 2、设置SparkStreaming,并设定间隔时间val ssc = new StreamingContext(conf, Seconds(5))//todo:3、指定相关参数//指定组名val groupID = "consumer-kaikeba"//指定消费者的topic名字val topic = "wordcount"//指定kafka的broker地址val brokerList = "node01:9092,node02:9092,node03:9092"//指定zookeeper的地址,用来存放数据偏移量数据,也可以使用Redis MySQL等val zkQuorum = "node01:2181,node02:2181,node03:2181"//创建Stream时使用的topic名字集合,SparkStreaming可同时消费多个topicval topics: Set[String] = Set(topic)//创建一个 ZKGroupTopicDirs 对象,就是用来指定在zk中的存储目录,用来保存数据偏移量val topicDirs = new ZKGroupTopicDirs(groupID, topic)//获取 zookeeper 中的路径 "/consumers/consumer-kaikeba/offsets/wordcount"val zkTopicPath = topicDirs.consumerOffsetDir//构造一个zookeeper的客户端 用来读写偏移量数据val zkClient = new ZkClient(zkQuorum)//准备kafka的参数val kafkaParams = Map("metadata.broker.list" -> brokerList,"group.id" -> groupID,"enable.auto.commit" -> "false")//todo:4、定义kafkaStream流var kafkaStream: InputDStream[(String, String)] = null//todo:5、获取指定的zk节点的子节点个数val childrenNum = getZkChildrenNum(zkClient,zkTopicPath)//todo:6、判断是否保存过数据 根据子节点的数量是否为0if (childrenNum > 0) {//构造一个map集合用来存放数据偏移量信息var fromOffsets: Map[TopicAndPartition, Long] = Map()//遍历子节点for (i <- 0 until childrenNum) {//获取子节点  /consumers/consumer-kaikeba/offsets/wordcount/0val partitionOffset: String = zkClient.readData[String](s"$zkTopicPath/$i")// /wordcount-----0val tp = TopicAndPartition(topic, i)//获取数据偏移量  将不同分区内的数据偏移量保存到map集合中//  wordcount/0 -> 1001fromOffsets += (tp -> partitionOffset.toLong)}// 泛型中 key:kafka中的key   value:hello tom hello jerry//创建函数 解析数据 转换为(topic_name, message)的元组val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.topic, mmd.message())//todo:7、利用底层的API创建DStream 采用直连的方式(之前已经消费了,从指定的位置消费)kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets, messageHandler)} else {//todo:7、利用底层的API创建DStream 采用直连的方式(之前没有消费,这是第一次读取数据)//zk中没有子节点数据 就是第一次读取数据 直接创建直连对象kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)}//todo:8、直接操作kafkaStream//依次迭代DStream中的kafkaRDD 只有kafkaRDD才可以强转为HasOffsetRanges  从中获取数据偏移量信息//之后是操作的RDD 不能够直接操作DStream 因为调用Transformation方法之后就不是kafkaRDD了获取不了偏移量信息kafkaStream.foreachRDD(kafkaRDD => {//强转为HasOffsetRanges 获取offset偏移量数据val offsetRanges = kafkaRDD.asInstanceOf[HasOffsetRanges].offsetRanges//获取数据val lines: RDD[String] =kafkaRDD.map(_._2)//todo:9、接下来就是对RDD进行操作 触发actionlines.foreachPartition(patition => {patition.foreach(x => println(x))})//todo: 10、手动提交偏移量到zk集群上for (o <- offsetRanges) {//拼接zk路径   /consumers/consumer-kaikeba/offsets/wordcount/0val zkPath = s"${topicDirs.consumerOffsetDir}/${o.partition}"//将 partition 的偏移量数据 offset 保存到zookeeper中ZkUtils.updatePersistentPath(zkClient, zkPath, o.untilOffset.toString)}})//开启SparkStreaming 并等待退出ssc.start()ssc.awaitTermination()}/*** 获取zk节点上的子节点的个数* @param zkClient* @param zkTopicPath* @return*/def getZkChildrenNum(zkClient:ZkClient,zkTopicPath:String):Int ={//查询该路径下是否有子节点,即是否有分区读取数据记录的读取的偏移量// /consumers/consumer-kaikeba/offsets/wordcount/0// /consumers/consumer-kaikeba/offsets/wordcount/1// /consumers/consumer-kaikeba/offsets/wordcount/2//子节点的个数val childrenNum: Int = zkClient.countChildren(zkTopicPath)childrenNum}
}

 

 

这篇关于大数据-SparkStreaming(七)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/754880

相关文章

大模型研发全揭秘:客服工单数据标注的完整攻略

在人工智能(AI)领域,数据标注是模型训练过程中至关重要的一步。无论你是新手还是有经验的从业者,掌握数据标注的技术细节和常见问题的解决方案都能为你的AI项目增添不少价值。在电信运营商的客服系统中,工单数据是客户问题和解决方案的重要记录。通过对这些工单数据进行有效标注,不仅能够帮助提升客服自动化系统的智能化水平,还能优化客户服务流程,提高客户满意度。本文将详细介绍如何在电信运营商客服工单的背景下进行

基于MySQL Binlog的Elasticsearch数据同步实践

一、为什么要做 随着马蜂窝的逐渐发展,我们的业务数据越来越多,单纯使用 MySQL 已经不能满足我们的数据查询需求,例如对于商品、订单等数据的多维度检索。 使用 Elasticsearch 存储业务数据可以很好的解决我们业务中的搜索需求。而数据进行异构存储后,随之而来的就是数据同步的问题。 二、现有方法及问题 对于数据同步,我们目前的解决方案是建立数据中间表。把需要检索的业务数据,统一放到一张M

关于数据埋点,你需要了解这些基本知识

产品汪每天都在和数据打交道,你知道数据来自哪里吗? 移动app端内的用户行为数据大多来自埋点,了解一些埋点知识,能和数据分析师、技术侃大山,参与到前期的数据采集,更重要是让最终的埋点数据能为我所用,否则可怜巴巴等上几个月是常有的事。   埋点类型 根据埋点方式,可以区分为: 手动埋点半自动埋点全自动埋点 秉承“任何事物都有两面性”的道理:自动程度高的,能解决通用统计,便于统一化管理,但个性化定

使用SecondaryNameNode恢复NameNode的数据

1)需求: NameNode进程挂了并且存储的数据也丢失了,如何恢复NameNode 此种方式恢复的数据可能存在小部分数据的丢失。 2)故障模拟 (1)kill -9 NameNode进程 [lytfly@hadoop102 current]$ kill -9 19886 (2)删除NameNode存储的数据(/opt/module/hadoop-3.1.4/data/tmp/dfs/na

异构存储(冷热数据分离)

异构存储主要解决不同的数据,存储在不同类型的硬盘中,达到最佳性能的问题。 异构存储Shell操作 (1)查看当前有哪些存储策略可以用 [lytfly@hadoop102 hadoop-3.1.4]$ hdfs storagepolicies -listPolicies (2)为指定路径(数据存储目录)设置指定的存储策略 hdfs storagepolicies -setStoragePo

Hadoop集群数据均衡之磁盘间数据均衡

生产环境,由于硬盘空间不足,往往需要增加一块硬盘。刚加载的硬盘没有数据时,可以执行磁盘数据均衡命令。(Hadoop3.x新特性) plan后面带的节点的名字必须是已经存在的,并且是需要均衡的节点。 如果节点不存在,会报如下错误: 如果节点只有一个硬盘的话,不会创建均衡计划: (1)生成均衡计划 hdfs diskbalancer -plan hadoop102 (2)执行均衡计划 hd

【Prometheus】PromQL向量匹配实现不同标签的向量数据进行运算

✨✨ 欢迎大家来到景天科技苑✨✨ 🎈🎈 养成好习惯,先赞后看哦~🎈🎈 🏆 作者简介:景天科技苑 🏆《头衔》:大厂架构师,华为云开发者社区专家博主,阿里云开发者社区专家博主,CSDN全栈领域优质创作者,掘金优秀博主,51CTO博客专家等。 🏆《博客》:Python全栈,前后端开发,小程序开发,人工智能,js逆向,App逆向,网络系统安全,数据分析,Django,fastapi

烟火目标检测数据集 7800张 烟火检测 带标注 voc yolo

一个包含7800张带标注图像的数据集,专门用于烟火目标检测,是一个非常有价值的资源,尤其对于那些致力于公共安全、事件管理和烟花表演监控等领域的人士而言。下面是对此数据集的一个详细介绍: 数据集名称:烟火目标检测数据集 数据集规模: 图片数量:7800张类别:主要包含烟火类目标,可能还包括其他相关类别,如烟火发射装置、背景等。格式:图像文件通常为JPEG或PNG格式;标注文件可能为X

pandas数据过滤

Pandas 数据过滤方法 Pandas 提供了多种方法来过滤数据,可以根据不同的条件进行筛选。以下是一些常见的 Pandas 数据过滤方法,结合实例进行讲解,希望能帮你快速理解。 1. 基于条件筛选行 可以使用布尔索引来根据条件过滤行。 import pandas as pd# 创建示例数据data = {'Name': ['Alice', 'Bob', 'Charlie', 'Dav

SWAP作物生长模型安装教程、数据制备、敏感性分析、气候变化影响、R模型敏感性分析与贝叶斯优化、Fortran源代码分析、气候数据降尺度与变化影响分析

查看原文>>>全流程SWAP农业模型数据制备、敏感性分析及气候变化影响实践技术应用 SWAP模型是由荷兰瓦赫宁根大学开发的先进农作物模型,它综合考虑了土壤-水分-大气以及植被间的相互作用;是一种描述作物生长过程的一种机理性作物生长模型。它不但运用Richard方程,使其能够精确的模拟土壤中水分的运动,而且耦合了WOFOST作物模型使作物的生长描述更为科学。 本文让更多的科研人员和农业工作者