Structured Streaming的模型介绍与实际操作

2023-11-30 06:44

本文主要是介绍Structured Streaming的模型介绍与实际操作,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

微批处理(Micro-Batching)

微批处理是 Structured Streaming 默认的处理模型。

微批处理 (Micro-batching):

  • 在微批处理模型中,实时数据流被分割成小的批次。
  • 这些批次按顺序处理,每个批次处理像一个小的批处理作业。
  • 处理完一个批次后,结果被输出,然后处理下一个批次。
  • 这意味着会有一个小的延迟,等于批次的大小,因为系统需要等待整个批次处理完毕才输出结果。
  • 微批处理模型中通常有一个处理周期的概念,即系统以固定的时间间隔处理数据。

优点:

  • 容错性: 基于 Spark 的容错机制,可以容易地恢复状态和输出。
  • 简单性: 开发人员可以使用与批处理相同的API进行流处理,降低了学习曲线。
  • 集成性: 可以与Spark的其他组件(如MLlib、Spark SQL)无缝集成。

缺点:

  • 延迟性: 因为处理是按批次进行的,所以有固有的延迟,通常是秒级。

持续处理(Continuous Processing)

持续处理是 Structured Streaming 在 Spark 2.3 版本中引入的实验性功能。在这种模型中,实时数据流被视为连续的记录流,Spark 引擎以较低的延迟(毫秒级)持续处理每条记录。

持续处理 (Continuous Processing):

  • 持续处理模型中,数据是随着其到达即时处理的。
  • 没有将数据分批处理,而是持续不断地处理流入的数据。
  • 这种模式可以减少延迟,因为数据一到达就开始处理,不必等待。
  • 持续处理模型通常能够提供更低的端到端延迟,但可能需要更复杂的管理状态和容错机制。

优点:

  • 低延迟: 可以实现毫秒级的处理延迟,适用于对延迟敏感的应用。
  • 高吞吐: 由于不需要划分批次,可以连续不断地处理数据,提高了吞吐量。

缺点:

  • 复杂性: 相对于微批处理,需要更复杂的容错机制。
  • 成熟度: 这是一个较新的功能,可能不如微批处理稳定。

这两种模型可以用以下表格进行比较:

特性微批处理持续处理
处理延迟秒级毫秒级
容错性中到高
API一致性与批处理一致与批处理一致
成熟度
吞吐量
复杂性
状态管理容易较复杂
与其他Spark组件集成无缝无缝

在选择模型时,需要根据具体的应用场景、延迟要求和资源情况来决定使用哪种模型。如果应用可以容忍秒级的延迟,微批处理是一个成熟且简单的选择。如果应用需要极低的延迟,可以尝试使用持续处理模型。

​ 微批处理模型中,“写日志”通常是指在处理批次之前记录其信息以便于故障恢复。而在持续处理模型中,“写日志”可能更多地关联于实时记录每个事件或数据项的处理状态。

编写Structured Streaming程序

导入pyspark模块

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
#如果直接使用pyspark交互就不需要导入,但是如果是自己编写python就需要导入模块

创建SparkSession对象:

  • 在任何Spark应用程序中,第一步是创建一个SparkSession对象。这是Structured Streaming编程的入口点。
from pyspark.sql import SparkSessionspark = SparkSession \.builder \.appName("Structured Streaming App") \ .getOrCreate()
spark.sparkContext.setLogLevel("warn")

稍微讲解一下appName得是被唯一标识的,spark.sparkContext.setLogLevel(“warn”)是设置日志显示级别,无关紧要的就不输出

定义输入源:

  • 定义输入数据源以及如何读取数据。Structured Streaming支持多种输入源,如Kafka、文件系统、Socket等。
df = spark \.readStream \.format("kafka") \.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \.option("subscribe", "topic1") \.load()

定义转换:

  • 对数据流进行转换处理,比如选择需要的字段、进行聚合等操作。
  • 这是需要我们自己定义如何操作的
from pyspark.sql.functions import col, windowwords = df.selectExpr("CAST(value AS STRING)")
wordCounts = words.groupBy(window(col("timestamp"), "10 minutes", "5 minutes"),col("word")
).count()

定义输出接收器:

  • 定义如何输出处理后的数据流。输出可以写入到多种外部存储系统中,如文件系统、数据库、控制台等。
query = wordCounts \.writeStream \.outputMode("complete") \.format("console") \.start()

启动流处理:

  • 最后,启动流处理。启动后,Spark会持续运行,处理实时进入的数据。
query.awaitTermination()

监控和异常处理:

  • 你可以监控流处理的进度和性能,以及添加异常处理逻辑。
try:query.awaitTermination()
except KeyboardInterrupt:query.stop()

输入源

file源

Structured Streaming中的文件源允许你监视指定目录中的新文件,并从中读取数据。这里是一些常见的选项:

  1. path: 需要监控的目录路径。
  2. maxFilesPerTrigger: 每个触发器处理的最大文件数。这个选项可以限制在每个触发器批次中应该读取的文件数量,有助于控制流处理的速率。
  3. latestFirst: 是否先处理最新的文件。设置为true会首先处理最新的文件,这可能对某些实时性要求较高的应用程序有用。
  4. fileFormat: 文件的格式,如jsoncsvparquet等。
# 创建一个DataFrame表示从目录`/path/to/directory`中连续读取的数据
csvDF = spark \.readStream \.option("sep", ",") \.schema(userSchema) \  # 可以定义一个schema.csv("/path/to/directory")# 启动流查询,输出模式为追加模式,并将结果输出到控制台
query = csvDF.writeStream \.outputMode("append") \.format("console") \.start()query.awaitTermination()

在这个例子中,我们首先使用readStream来创建一个DataFrame读取流。我们通过.option("sep", ",")指定了CSV值之间的分隔符为逗号。schema(userSchema)部分定义了CSV文件的结构,你需要在代码中提前定义userSchema

然后,我们指定了监视的目录路径。csv("/path/to/directory")表示我们希望读取的文件类型是CSV。

最后,我们定义了一个查询,该查询将输出模式设置为append,这意味着仅将新的数据行附加到结果中。我们使用.format("console")将输出结果发送到控制台,这对于调试和开发是很有用的。start()方法启动流查询,而awaitTermination()方法则是让应用程序等待流处理的终止,以进行长时间运行。

kafka源

在Structured Streaming中,Kafka源允许你从Kafka主题读取数据流。这里是一些常见的Kafka源选项:

  1. kafka.bootstrap.servers: Kafka集群的地址列表,通常是"host1:port1,host2:port2"的形式。
  2. subscribe: 要订阅的Kafka主题的名称或用逗号分隔的多个主题的列表。
  3. startingOffsets: 指定流应从何处开始读取。它可以是"latest"(默认值),“earliest”,或是JSON字符串指定每个主题的分区起始偏移量。
  4. endingOffsets: 流查询终止时的偏移量。仅在批处理查询中使用。
  5. failOnDataLoss: 是否在数据丢失(例如,Kafka主题被删除)时使查询失败。默认为true

下面是一个Kafka源的操作示例,它从Kafka主题读取数据流,并将其加载为DataFrame:

# 创建一个DataFrame表示从名为"updates"的Kafka主题读取的数据
kafkaDF = spark \.readStream \.format("kafka") \.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \.option("subscribe", "updates") \.option("startingOffsets", "latest") \.load()# 选择我们需要的字段并将value字段从字节转换为字符串
selectedDF = kafkaDF.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")# 启动流查询,将结果输出到控制台
query = selectedDF.writeStream \.outputMode("append") \.format("console") \.start()query.awaitTermination()

在这个例子中,我们使用.format("kafka")来告诉Spark我们正在使用Kafka源,并通过option来设置Kafka的相关参数。.load()方法加载Kafka主题为DataFrame。

以下是.option()方法设置的参数的解释:

  1. kafka.bootstrap.servers: 指定Kafka集群的服务器地址及其端口。这个设置是必须的,因为它告诉Spark Streaming在哪里可以找到Kafka集群。格式是一个逗号分隔的主机和端口对的列表(例如:“host1:port1,host2:port2”)。这里的hostport分别对应Kafka服务器的IP地址和监听端口。
  2. subscribe: 这个选项用于指定一个或多个Kafka主题来订阅。只要这些主题有数据写入,Spark Streaming就会读取这些数据。在这个例子中,"updates"是你想要订阅的Kafka主题的名称。
  3. startingOffsets: 定义当你的Spark应用第一次启动并且没有设置偏移量的时候,它应该从Kafka主题的哪里开始读取数据。"latest"表示只读取启动应用程序后生成的数据,而"earliest"表示从可用的最早的数据开始读取。你还可以指定一个JSON字符串来表示每个主题的每个分区的确切开始偏移量。

selectExpr是一个转换操作,它允许你运行SQL表达式。在这里,我们将keyvalue列从字节转换为字符串类型,便于阅读和处理。

查询的其余部分和之前的例子类似,这次我们也是以追加模式输出到控制台,并启动流查询。

请注意,你需要有一个运行中的Kafka集群,并已经创建了相关的主题,以及在Spark集群上配置了适当的Kafka依赖。

socket源

在Structured Streaming中,使用socket源意味着数据流将来自于一个TCP套接字连接。这是最简单的流式数据源之一,通常用于测试和原型设计阶段。它允许您从通过TCP连接发送的数据流中读取文本数据。

以下是如何在Structured Streaming中设置socket源:

from pyspark.sql import SparkSession# 初始化SparkSession
spark = SparkSession.builder \.appName("StructuredSocketRead") \.getOrCreate()# 创建流式DataFrame,连接到指定的socket
lines = spark.readStream \.format("socket") \.option("host", "localhost") \.option("port", 9999) \.load()# 使用DataFrame API进行数据处理
# ...# 启动流查询
query = lines.writeStream \.outputMode("append") \.format("console") \.start()query.awaitTermination()

上面的代码片段执行了以下操作:

  1. 通过SparkSession.builder初始化了一个SparkSession。
  2. 使用readStream方法创建了一个流式DataFrame,它将会连接到在localhost上的9999端口监听的TCP套接字。
  3. 通过format("socket")指定了数据源格式为socket。
  4. 使用.option("host", "localhost").option("port", 9999)设置了监听的主机地址和端口号。
  5. .load()方法触发了对socket源的连接。
  6. 之后可以在lines DataFrame上应用各种转换操作,如过滤、选择、聚合等。
  7. writeStream定义了如何输出处理后的流数据,这里通过.format("console")指定了输出到控制台。
  8. .start()开始接收数据并处理,.awaitTermination()方法让程序持续运行,直到手动停止或者遇到错误。

使用socket源进行Structured Streaming是一个好方法,可以实时测试您的流处理逻辑,因为您可以很容易地通过如netcat之类的工具来发送数据。

然后在另外一个打开虚拟机另外窗口输入

nc -lk 你的端口号

rate源

在Structured Streaming中,rate源每秒生成特定的数据行,两个列的数据流:timestampvalue。这个源非常适合生成简单的数据流进行测试和调试。

每个输出行包含一个timestampvaluetimestamp是每个触发器触发时的当前时间戳,而value是从程序开始以来的触发器触发的次数。

下面是如何在Structured Streaming中设置rate源的例子:

from pyspark.sql import SparkSession# 初始化SparkSession
spark = SparkSession.builder \.appName("StructuredRateRead") \.getOrCreate()# 创建流式DataFrame,用rate源
df = spark.readStream \.format("rate") \.option("rowsPerSecond", "1") \.load()# 使用DataFrame API进行数据处理
# ...# 启动流查询,输出到控制台
query = df.writeStream \.outputMode("append") \.format("console") \.start()query.awaitTermination()

上面的代码片段执行了以下操作:

  1. 使用SparkSession.builder初始化了一个SparkSession。
  2. 使用readStream方法创建了一个流式DataFrame,它将会使用rate源。
  3. 通过.option("rowsPerSecond", "1")设置每秒生成的行数。
  4. .load()方法触发了对rate源的连接。
  5. 使用.writeStream定义了如何输出处理后的流数据。
  6. 通过.format("console")指定了输出到控制台。
  7. .start()开始接收数据并处理,.awaitTermination()方法让程序持续运行,直到手动停止或者遇到错误。

rate源非常适合开发和测试时生成连续的、预测的数据流,但它不适用于生产环境,因为它不是从实际的数据源读取数据。

可以使用option方法来配置这个源的行为。以下是rate源的一些常见选项和它们的功能:

  1. rowsPerSecond:指定每秒生成的行数。这个选项可以帮助你控制数据生成的速率。

    示例:.option("rowsPerSecond", "10") 表示每秒生成10行数据。

  2. rampUpTime:在指定时间内逐渐增加到rowsPerSecond指定的速率。这通常用于模拟数据源在启动时从没有数据到达指定速率的过渡过程。

    示例:.option("rampUpTime", "1m") 在1分钟内逐渐增加生成的数据行数。

  3. numPartitions:指定生成的数据将在多少个分区中分布。这可以帮助模拟并行数据流的情况。

    示例:.option("numPartitions", "2") 表示数据将分布在两个分区中。

使用这些选项的例子:

df = spark.readStream \.format("rate") \.option("rowsPerSecond", "100") \.option("rampUpTime", "5s") \.option("numPartitions", "2") \.load()

这将会创建一个数据流,初始时每秒100行数据,5秒内逐渐增加到这个速率,并且数据在两个分区中分布。

接收器

Structured Streaming中的输出接收器(Sink)是指数据流最终输出到的地方。Spark Structured Streaming提供了多种不同的输出接收器,以支持将数据流输出到各种外部系统和格式。以下是一些常见的输出接收器:

  1. 文件接收器(File Sink):输出数据到文件系统。支持的格式包括文本、JSON、CSV、Parquet等。可以指定文件输出目录和文件格式。

  2. Kafka接收器(Kafka Sink):输出数据到Kafka主题。可以指定Kafka服务器的地址和要写入的主题。

  3. 控制台接收器(Console Sink):将数据输出到控制台,主要用于调试和开发。

  4. 内存接收器(Memory Sink):输出数据到内存表中,允许在内存中查询数据。这主要用于快速测试和原型开发。

  5. Foreach接收器:提供了一个通用接口,允许你对数据流中的每个记录执行任意操作。这可以用于实现自定义的输出逻辑,例如写入自定义外部存储或调用外部API。

使用Structured Streaming的输出接收器时,你需要指定输出模式(如"append"、“complete"或"update”),输出接收器类型(如"console"、“kafka”、"file"等),以及任何必要的配置选项。

例如,将数据流输出到控制台的代码示例:

query = df.writeStream \.outputMode("append") \.format("console") \.start()

将数据流输出到Kafka的代码示例:

query = df.writeStream \.outputMode("update") \.format("kafka") \.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \.option("topic", "updates") \.start()

这些代码示例展示了如何将数据流输出到不同类型的接收器。每种类型的接收器可能有不同的配置选项和限制,所以在使用时需要查阅具体的文档。

这篇关于Structured Streaming的模型介绍与实际操作的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/436019

相关文章

揭秘未来艺术:AI绘画工具全面介绍

📑前言 随着科技的飞速发展,人工智能(AI)已经逐渐渗透到我们生活的方方面面。在艺术创作领域,AI技术同样展现出了其独特的魅力。今天,我们就来一起探索这个神秘而引人入胜的领域,深入了解AI绘画工具的奥秘及其为艺术创作带来的革命性变革。 一、AI绘画工具的崛起 1.1 颠覆传统绘画模式 在过去,绘画是艺术家们通过手中的画笔,蘸取颜料,在画布上自由挥洒的创造性过程。然而,随着AI绘画工

一份LLM资源清单围观技术大佬的日常;手把手教你在美国搭建「百万卡」AI数据中心;为啥大模型做不好简单的数学计算? | ShowMeAI日报

👀日报&周刊合集 | 🎡ShowMeAI官网 | 🧡 点赞关注评论拜托啦! 1. 为啥大模型做不好简单的数学计算?从大模型高考数学成绩不及格说起 司南评测体系 OpenCompass 选取 7 个大模型 (6 个开源模型+ GPT-4o),组织参与了 2024 年高考「新课标I卷」的语文、数学、英语考试,然后由经验丰富的判卷老师评判得分。 结果如上图所

20.Spring5注解介绍

1.配置组件 Configure Components 注解名称说明@Configuration把一个类作为一个loC容 器 ,它的某个方法头上如果注册7@Bean , 就会作为这个Spring容器中的Bean@ComponentScan在配置类上添加@ComponentScan注解。该注解默认会扫描该类所在的包下所有的配置类,相当于之前的 <context:component-scan>@Sc

大语言模型(LLMs)能够进行推理和规划吗?

大语言模型(LLMs),基本上是经过强化训练的 n-gram 模型,它们在网络规模的语言语料库(实际上,可以说是我们文明的知识库)上进行了训练,展现出了一种超乎预期的语言行为,引发了我们的广泛关注。从训练和操作的角度来看,LLMs 可以被认为是一种巨大的、非真实的记忆库,相当于为我们所有人提供了一个外部的系统 1(见图 1)。然而,它们表面上的多功能性让许多研究者好奇,这些模型是否也能在通常需要系

人工和AI大语言模型成本对比 ai语音模型

这里既有AI,又有生活大道理,无数渺小的思考填满了一生。 上一专题搭建了一套GMM-HMM系统,来识别连续0123456789的英文语音。 但若不是仅针对数字,而是所有普通词汇,可能达到十几万个词,解码过程将非常复杂,识别结果组合太多,识别结果不会理想。因此只有声学模型是完全不够的,需要引入语言模型来约束识别结果。让“今天天气很好”的概率高于“今天天汽很好”的概率,得到声学模型概率高,又符合表达

智能客服到个人助理,国内AI大模型如何改变我们的生活?

引言 随着人工智能(AI)技术的高速发展,AI大模型越来越多地出现在我们的日常生活和工作中。国内的AI大模型在过去几年里取得了显著的进展,不少独创的技术点和实际应用令人瞩目。 那么,国内的AI大模型有哪些独创的技术点?它们在实际应用中又有哪些出色表现呢?此外,普通人又该如何利用这些大模型提升工作和生活的质量和效率呢?本文将为你一一解析。 一、国内AI大模型的独创技术点 多模态学习 多

OpenCompass:大模型测评工具

大模型相关目录 大模型,包括部署微调prompt/Agent应用开发、知识库增强、数据库增强、知识图谱增强、自然语言处理、多模态等大模型应用开发内容 从0起步,扬帆起航。 大模型应用向开发路径:AI代理工作流大模型应用开发实用开源项目汇总大模型问答项目问答性能评估方法大模型数据侧总结大模型token等基本概念及参数和内存的关系大模型应用开发-华为大模型生态规划从零开始的LLaMA-Factor

模型压缩综述

https://www.cnblogs.com/shixiangwan/p/9015010.html

AI赋能天气:微软研究院发布首个大规模大气基础模型Aurora

编者按:气候变化日益加剧,高温、洪水、干旱,频率和强度不断增加的全球极端天气给整个人类社会都带来了难以估计的影响。这给现有的天气预测模型提出了更高的要求——这些模型要更准确地预测极端天气变化,为政府、企业和公众提供更可靠的信息,以便做出及时的准备和响应。为了应对这一挑战,微软研究院开发了首个大规模大气基础模型 Aurora,其超高的预测准确率、效率及计算速度,实现了目前最先进天气预测系统性能的显著

PyTorch模型_trace实战:深入理解与应用

pytorch使用trace模型 1、使用trace生成torchscript模型2、使用trace的模型预测 1、使用trace生成torchscript模型 def save_trace(model, input, save_path):traced_script_model = torch.jit.trace(model, input)<