本文主要是介绍大数据-玩转数据-Spark-Structured Streaming 简述及编程初步(python版),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
大数据-玩转数据-Spark-Structured Streaming 简述及编程初步(python版)
一、简述:
Structured Streaming 是基于Spark SQL引擎构建的、可扩展且容错性高的流处理引擎。它以检查点和预写日志记录每个触发时间正处理数据的偏移范围,保证端到端数据的一致性。Spark2.3.0版本引入持续流失处理模型后,可将数据延迟降低到毫秒级。Structured Streaming默认处理模型是微批处理模型,它是将当前一批作业处理完成后,记录日志偏移量后才启动下一批作业,延迟超过100毫秒;持续处理模型将每个任务输入流进行标记记录,遇到任务标记后将偏移量异步报告给引擎,可实现流计算的毫秒级延迟。但持续处理只能做到“至少一次”的一致性。Spark Streaming 采用的数据抽象是DStream(一系列的RDD),Structured Streaming 采用的数据抽象是DataFrame,Spark SQL 只能处理静态数据,而Structured Streaming可处理结构化的流数据,它是Spark Streaming 和 Spark SQL的结合体。
二、Structured Streaming 程序
数据源端,模拟数据发送
[root@hadoop1 ~]# nc -lk 9999
hadoop spark
spark hive
hive spark
流计算端,编写程序
[root@hadoop1 temp]# vi sparksstructtreamwordcount.py
from pyspark.sql.functions import split
from pyspark.sql.functions import explode
from pyspark.sql import SparkSessionif __name__ == "__main__":
#独立运行该程序,防止别的程序导入spark = SparkSession.builder.appName("structurestreamwordcount").getOrCreate()#遵循工厂设计模式,利用该统一接口创建一系列对象,appName是应用名称,唯一标识应用,getOrCreate检查是否有SparkSession,否则建立一个SparkSession,并设置SparkSession为全局默认的SparkSessionspark.sparkContext.setLogLevel('WARN')#设置日志级别,排除日志查看干扰lines = spark.readStream.format("socket").option("host","192.168.80.2").option("port",9999).load()#创建输入数据源,模式,地址,端口和载入数据words = lines.select(explode(split(lines.value," ")).alias("word"))wordsCounts = words.groupBy("word").count()#定义流计算过程query = wordsCounts.writeStream.outputMode("complete").format("console").trigger(processingTime="5 seconds").start()#启动流计算并输出结果query.awaitTermination()#使得查询在后台持续运行,直到受到用户退出的指令
执行程序
[root@hadoop1 temp]# /home/hadoop/spark/bin/spark-submit /home/hadoop/temp/sparksstructtreamwordcount.py
结果显示
这篇关于大数据-玩转数据-Spark-Structured Streaming 简述及编程初步(python版)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!