大数据-玩转数据-Spark-Structured Streaming 容错(python版)

2024-03-09 17:58

本文主要是介绍大数据-玩转数据-Spark-Structured Streaming 容错(python版),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

大数据-玩转数据-Spark-Structured Streaming 容错(python版)

说明:
由于网络问题,链路中断,系统崩溃,JVM故障都会导致数据流的运行结果出现错误,Spark设计了输入源,执行引擎和接收器多个松散耦合组件隔离故障。

输入源通过位置偏移量来记录目前所处位置,引擎通过检查点保存中间状态,接收器使用“幂等”的接收器来保障输出的稳定性。

我们希望数据是它产生的时间,而不是到达的时间,Spark模型当中,事件时间是数据中的一列,为了避免存储空间无限扩大,同时还引入“水印”机制,将超过时间阈值的数据抛弃掉。

代码举例

#!/usr/bin/env python3
# -*- coding: utf-8 -*-# 导入需要用到的模块
import os
import shutil
from functools import partialfrom pyspark.sql import SparkSession
from pyspark.sql.functions import window
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import TimestampType, StringType# 定义CSV文件的路径常量
TEST_DATA_DIR = '/tmp/testdata/'
TEST_DATA_DIR_SPARK = 'file:///tmp/testdata/'# 测试的环境搭建,判断CSV文件夹是否存在,如果存在则删除旧数据,并建立文件夹
def test_setUp():if os.path.exists(TEST_DATA_DIR):shutil.rmtree(TEST_DATA_DIR, ignore_errors=True)os.mkdir(TEST_DATA_DIR)# 测试环境的恢复,对CSV文件夹进行清理
def test_tearDown():if os.path.exists(TEST_DATA_DIR):shutil.rmtree(TEST_DATA_DIR, ignore_errors=True)# 写模拟输入的函数,传入CSV文件名和数据。注意写入应当是原子性的
# 如果写入时间较长,应当先写到临时文件在移动到CSV目录内。
# 这里采取直接写入的方式。
def write_to_cvs(filename, data):with open(TEST_DATA_DIR + filename, "wt", encoding="utf-8") as f:f.write(data)if __name__ == "__main__":test_setUp()# 定义模式,为字符串类型的word和时间戳类型的eventTime两个列组成schema = StructType([StructField("word", StringType(), True),StructField("eventTime", TimestampType(), True)])spark = SparkSession \.builder \.appName("StructuredNetworkWordCountWindowedDelay") \.getOrCreate()spark.sparkContext.setLogLevel('WARN')lines = spark \.readStream \.format('csv') \.schema(schema) \.option("sep", ";") \.option("header", "false") \.load(TEST_DATA_DIR_SPARK)# 定义窗口windowDuration = '1 hour'windowedCounts = lines \.withWatermark("eventTime", "1 hour") \.groupBy('word', window('eventTime', windowDuration)) \.count()query = windowedCounts \.writeStream \.outputMode("update") \.format("console") \.option('truncate', 'false') \.trigger(processingTime="8 seconds") \.start()# 写入测试文件file1.cvswrite_to_cvs('file1.cvs', """
正常;2018-10-01 08:00:00
正常;2018-10-01 08:10:00
正常;2018-10-01 08:20:00
""")# 处理当前数据query.processAllAvailable()# 这时候事件时间更新到上次看到的最大的2018-10-01 08:20:00write_to_cvs('file2.cvs', """
正常;2018-10-01 20:00:00
一小时以内延迟到达;2018-10-01 10:00:00
一小时以内延迟到达;2018-10-01 10:50:00
""")# 处理当前数据query.processAllAvailable()# 这时候事件时间更新到上次看到的最大的2018-10-01 20:00:00write_to_cvs('file3.cvs', """
正常;2018-10-01 20:00:00
一小时外延迟到达;2018-10-01 10:00:00
一小时外延迟到达;2018-10-01 10:50:00
一小时以内延迟到达;2018-10-01 19:00:00
""")# 处理当前数据query.processAllAvailable()query.stop()test_tearDown()

这篇关于大数据-玩转数据-Spark-Structured Streaming 容错(python版)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/791484

相关文章

Python 字符串占位

在Python中,可以使用字符串的格式化方法来实现字符串的占位。常见的方法有百分号操作符 % 以及 str.format() 方法 百分号操作符 % name = "张三"age = 20message = "我叫%s,今年%d岁。" % (name, age)print(message) # 我叫张三,今年20岁。 str.format() 方法 name = "张三"age

【服务器运维】MySQL数据存储至数据盘

查看磁盘及分区 [root@MySQL tmp]# fdisk -lDisk /dev/sda: 21.5 GB, 21474836480 bytes255 heads, 63 sectors/track, 2610 cylindersUnits = cylinders of 16065 * 512 = 8225280 bytesSector size (logical/physical)

一道经典Python程序样例带你飞速掌握Python的字典和列表

Python中的列表(list)和字典(dict)是两种常用的数据结构,它们在数据组织和存储方面有很大的不同。 列表(List) 列表是Python中的一种有序集合,可以随时添加和删除其中的元素。列表中的元素可以是任何数据类型,包括数字、字符串、其他列表等。列表使用方括号[]表示,元素之间用逗号,分隔。 定义和使用 # 定义一个列表 fruits = ['apple', 'banana

Python应用开发——30天学习Streamlit Python包进行APP的构建(9)

st.area_chart 显示区域图。 这是围绕 st.altair_chart 的语法糖。主要区别在于该命令使用数据自身的列和指数来计算图表的 Altair 规格。因此,在许多 "只需绘制此图 "的情况下,该命令更易于使用,但可定制性较差。 如果 st.area_chart 无法正确猜测数据规格,请尝试使用 st.altair_chart 指定所需的图表。 Function signa

SQL Server中,查询数据库中有多少个表,以及数据库其余类型数据统计查询

sqlserver查询数据库中有多少个表 sql server 数表:select count(1) from sysobjects where xtype='U'数视图:select count(1) from sysobjects where xtype='V'数存储过程select count(1) from sysobjects where xtype='P' SE

python实现最简单循环神经网络(RNNs)

Recurrent Neural Networks(RNNs) 的模型: 上图中红色部分是输入向量。文本、单词、数据都是输入,在网络里都以向量的形式进行表示。 绿色部分是隐藏向量。是加工处理过程。 蓝色部分是输出向量。 python代码表示如下: rnn = RNN()y = rnn.step(x) # x为输入向量,y为输出向量 RNNs神经网络由神经元组成, python

python 喷泉码

因为要完成毕业设计,毕业设计做的是数据分发与传输的东西。在网络中数据容易丢失,所以我用fountain code做所发送数据包的数据恢复。fountain code属于有限域编码的一部分,有很广泛的应用。 我们日常生活中使用的二维码,就用到foutain code做数据恢复。你遮住二维码的四分之一,用手机的相机也照样能识别。你遮住的四分之一就相当于丢失的数据包。 为了实现并理解foutain

python 点滴学

1 python 里面tuple是无法改变的 tuple = (1,),计算tuple里面只有一个元素,也要加上逗号 2  1 毕业论文改 2 leetcode第一题做出来

Python爬虫-贝壳新房

前言 本文是该专栏的第32篇,后面会持续分享python爬虫干货知识,记得关注。 本文以某房网为例,如下图所示,采集对应城市的新房房源数据。具体实现思路和详细逻辑,笔者将在正文结合完整代码进行详细介绍。接下来,跟着笔者直接往下看正文详细内容。(附带完整代码) 正文 地址:aHR0cHM6Ly93aC5mYW5nLmtlLmNvbS9sb3VwYW4v 目标:采集对应城市的

python 在pycharm下能导入外面的模块,到terminal下就不能导入

项目结构如下,在ic2ctw.py 中导入util,在pycharm下不报错,但是到terminal下运行报错  File "deal_data/ic2ctw.py", line 3, in <module>     import util 解决方案: 暂时方案:在终端下:export PYTHONPATH=/Users/fujingling/PycharmProjects/PSENe