大数据-玩转数据-Spark-Structured Streaming 简述及编程初步(python版)

2024-03-09 17:58

本文主要是介绍大数据-玩转数据-Spark-Structured Streaming 简述及编程初步(python版),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

大数据-玩转数据-Spark-Structured Streaming 简述及编程初步(python版)

一、简述:
Structured Streaming 是基于Spark SQL引擎构建的、可扩展且容错性高的流处理引擎。它以检查点和预写日志记录每个触发时间正处理数据的偏移范围,保证端到端数据的一致性。Spark2.3.0版本引入持续流失处理模型后,可将数据延迟降低到毫秒级。Structured Streaming默认处理模型是微批处理模型,它是将当前一批作业处理完成后,记录日志偏移量后才启动下一批作业,延迟超过100毫秒;持续处理模型将每个任务输入流进行标记记录,遇到任务标记后将偏移量异步报告给引擎,可实现流计算的毫秒级延迟。但持续处理只能做到“至少一次”的一致性。Spark Streaming 采用的数据抽象是DStream(一系列的RDD),Structured Streaming 采用的数据抽象是DataFrame,Spark SQL 只能处理静态数据,而Structured Streaming可处理结构化的流数据,它是Spark Streaming 和 Spark SQL的结合体。
二、Structured Streaming 程序

数据源端,模拟数据发送

[root@hadoop1 ~]# nc -lk 9999
hadoop spark
spark hive
hive spark

流计算端,编写程序

[root@hadoop1 temp]# vi sparksstructtreamwordcount.py
from pyspark.sql.functions import split
from pyspark.sql.functions import explode
from pyspark.sql import SparkSessionif __name__ == "__main__":
#独立运行该程序,防止别的程序导入spark = SparkSession.builder.appName("structurestreamwordcount").getOrCreate()#遵循工厂设计模式,利用该统一接口创建一系列对象,appName是应用名称,唯一标识应用,getOrCreate检查是否有SparkSession,否则建立一个SparkSession,并设置SparkSession为全局默认的SparkSessionspark.sparkContext.setLogLevel('WARN')#设置日志级别,排除日志查看干扰lines = spark.readStream.format("socket").option("host","192.168.80.2").option("port",9999).load()#创建输入数据源,模式,地址,端口和载入数据words = lines.select(explode(split(lines.value," ")).alias("word"))wordsCounts = words.groupBy("word").count()#定义流计算过程query = wordsCounts.writeStream.outputMode("complete").format("console").trigger(processingTime="5 seconds").start()#启动流计算并输出结果query.awaitTermination()#使得查询在后台持续运行,直到受到用户退出的指令

执行程序

[root@hadoop1 temp]# /home/hadoop/spark/bin/spark-submit /home/hadoop/temp/sparksstructtreamwordcount.py

结果显示
在这里插入图片描述

这篇关于大数据-玩转数据-Spark-Structured Streaming 简述及编程初步(python版)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/791482

相关文章

零基础STM32单片机编程入门(一)初识STM32单片机

文章目录 一.概要二.单片机型号命名规则三.STM32F103系统架构四.STM32F103C8T6单片机启动流程五.STM32F103C8T6单片机主要外设资源六.编程过程中芯片数据手册的作用1.单片机外设资源情况2.STM32单片机内部框图3.STM32单片机管脚图4.STM32单片机每个管脚可配功能5.单片机功耗数据6.FALSH编程时间,擦写次数7.I/O高低电平电压表格8.外设接口

16.Spring前世今生与Spring编程思想

1.1.课程目标 1、通过对本章内容的学习,可以掌握Spring的基本架构及各子模块之间的依赖关系。 2、 了解Spring的发展历史,启发思维。 3、 对 Spring形成一个整体的认识,为之后的深入学习做铺垫。 4、 通过对本章内容的学习,可以了解Spring版本升级的规律,从而应用到自己的系统升级版本命名。 5、Spring编程思想总结。 1.2.内容定位 Spring使用经验

Python 字符串占位

在Python中,可以使用字符串的格式化方法来实现字符串的占位。常见的方法有百分号操作符 % 以及 str.format() 方法 百分号操作符 % name = "张三"age = 20message = "我叫%s,今年%d岁。" % (name, age)print(message) # 我叫张三,今年20岁。 str.format() 方法 name = "张三"age

【服务器运维】MySQL数据存储至数据盘

查看磁盘及分区 [root@MySQL tmp]# fdisk -lDisk /dev/sda: 21.5 GB, 21474836480 bytes255 heads, 63 sectors/track, 2610 cylindersUnits = cylinders of 16065 * 512 = 8225280 bytesSector size (logical/physical)

一道经典Python程序样例带你飞速掌握Python的字典和列表

Python中的列表(list)和字典(dict)是两种常用的数据结构,它们在数据组织和存储方面有很大的不同。 列表(List) 列表是Python中的一种有序集合,可以随时添加和删除其中的元素。列表中的元素可以是任何数据类型,包括数字、字符串、其他列表等。列表使用方括号[]表示,元素之间用逗号,分隔。 定义和使用 # 定义一个列表 fruits = ['apple', 'banana

Python应用开发——30天学习Streamlit Python包进行APP的构建(9)

st.area_chart 显示区域图。 这是围绕 st.altair_chart 的语法糖。主要区别在于该命令使用数据自身的列和指数来计算图表的 Altair 规格。因此,在许多 "只需绘制此图 "的情况下,该命令更易于使用,但可定制性较差。 如果 st.area_chart 无法正确猜测数据规格,请尝试使用 st.altair_chart 指定所需的图表。 Function signa

SQL Server中,查询数据库中有多少个表,以及数据库其余类型数据统计查询

sqlserver查询数据库中有多少个表 sql server 数表:select count(1) from sysobjects where xtype='U'数视图:select count(1) from sysobjects where xtype='V'数存储过程select count(1) from sysobjects where xtype='P' SE

python实现最简单循环神经网络(RNNs)

Recurrent Neural Networks(RNNs) 的模型: 上图中红色部分是输入向量。文本、单词、数据都是输入,在网络里都以向量的形式进行表示。 绿色部分是隐藏向量。是加工处理过程。 蓝色部分是输出向量。 python代码表示如下: rnn = RNN()y = rnn.step(x) # x为输入向量,y为输出向量 RNNs神经网络由神经元组成, python

python 喷泉码

因为要完成毕业设计,毕业设计做的是数据分发与传输的东西。在网络中数据容易丢失,所以我用fountain code做所发送数据包的数据恢复。fountain code属于有限域编码的一部分,有很广泛的应用。 我们日常生活中使用的二维码,就用到foutain code做数据恢复。你遮住二维码的四分之一,用手机的相机也照样能识别。你遮住的四分之一就相当于丢失的数据包。 为了实现并理解foutain

python 点滴学

1 python 里面tuple是无法改变的 tuple = (1,),计算tuple里面只有一个元素,也要加上逗号 2  1 毕业论文改 2 leetcode第一题做出来