流批一体计算引擎-10-[Flink]中的常用算子和DataStream转换

2024-06-08 02:52

本文主要是介绍流批一体计算引擎-10-[Flink]中的常用算子和DataStream转换,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

pyflink 处理 kafka数据
在这里插入图片描述

1 DataStream API 示例代码

从非空集合中读取数据,并将结果写入本地文件系统。

from pyflink.common.serialization import Encoder
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import StreamingFileSinkdef tutorial():env = StreamExecutionEnvironment.get_execution_environment()env.set_parallelism(1)ds = env.from_collection(collection=[(1, 'aaa'), (2, 'bbb')],type_info=Types.ROW([Types.INT(), Types.STRING()]))ds.add_sink(StreamingFileSink.for_row_format('output', Encoder.simple_string_encoder()).build())env.execute("tutorial_job")if __name__ == '__main__':tutorial()

(1)DataStream API应用程序首先需要声明一个执行环境
StreamExecutionEnvironment,这是流式程序执行的上下文。
后续将通过它来设置作业的属性(例如默认并发度、重启策略等)、创建源、并最终触发作业的执行。

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)

(2)声明数据源
一旦创建了 StreamExecutionEnvironment 之后,可以使用它来声明数据源。
数据源从外部系统(如Apache Kafka、Rabbit MQ 或 Apache Pulsar)拉取数据到Flink作业里。
为了简单起见,本次使用元素集合作为数据源。
这里从相同类型数据集合中创建数据流(一个带有 INT 和 STRING 类型字段的ROW类型)。

ds = env.from_collection(collection=[(1, 'aaa'), (2, 'bbb')],type_info=Types.ROW([Types.INT(), Types.STRING()]))

(3)转换操作或写入外部系统
现在可以在这个数据流上执行转换操作,或者使用 sink 将数据写入外部系统。
本次使用StreamingFileSink将数据写入output文件目录中。

ds.add_sink(StreamingFileSink.for_row_format('output', Encoder.simple_string_encoder()).build())

(4)执行作业
最后一步是执行真实的 PyFlink DataStream API作业。
PyFlink applications是懒加载的,并且只有在完全构建之后才会提交给集群上执行。
要执行一个应用程序,只需简单地调用env.execute(job_name)。

env.execute("tutorial_job")

在这里插入图片描述

2 自定义转换函数的三种方式

三种方式支持用户自定义函数。

2.1 Lambda函数[简便]

from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironmentenv = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)data_stream = env.from_collection([1, 2, 3, 4, 5], type_info=Types.INT())
mapped_stream = data_stream.map(lambda x: x + 1, output_type=Types.INT())mapped_stream.print()
env.execute("tutorial_job")

2.2 python函数[简便]

from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironmentdef my_map_func(value):return value + 1def main():env = StreamExecutionEnvironment.get_execution_environment()env.set_parallelism(1)data_stream = env.from_collection([1, 2, 3, 4, 5], type_info=Types.INT())mapped_stream = data_stream.map(my_map_func, output_type=Types.INT())mapped_stream.print()env.execute("tutorial_job")if __name__ == '__main__':main()

2.3 接口函数[复杂]

from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment, MapFunctionclass MyMapFunction(MapFunction):def map(self, value):return value + 1def main():env = StreamExecutionEnvironment.get_execution_environment()env.set_parallelism(1)data_stream = env.from_collection([1, 2, 3, 41, 5], type_info=Types.INT())mapped_stream = data_stream.map(MyMapFunction(), output_type=Types.INT())mapped_stream.print()env.execute("tutorial_job")if __name__ == '__main__':main()

3 常用算子

参考官网算子

3.1 map【DataStream->DataStream】

from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironmentenv = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)  # 将输出写入一个文件data_stream = env.from_collection([1, 2, 3, 4, 5], type_info=Types.INT())
mapped_stream = data_stream.map(lambda x: x + 1, output_type=Types.INT())mapped_stream.print()
env.execute("tutorial_job")

在这里插入图片描述

3.2 flat_map【DataStream->DataStream】

from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironmentenv = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)  # 将输出写入一个文件data_stream = env.from_collection(collection=['hello apache flink', 'streaming compute'])
out = data_stream.flat_map(lambda x: x.split(' '), output_type=Types.STRING())out.print()
env.execute("tutorial_job")

在这里插入图片描述

3.3 filter【DataStream->DataStream】

from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironmentenv = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)  # 将输出写入一个文件def my_func(value):if value % 2 == 0:return valuedata_stream = env.from_collection([1, 2, 3, 4, 5], type_info=Types.INT())
filtered_stream = data_stream.filter(my_func)filtered_stream.print()
env.execute("tutorial_job")

3.4 window_all【DataStream->AllWindowedStream】

根据某些特征(例如,最近 100毫秒秒内到达的数据)对所有流事件进行分组。
所有的元素。

data_stream = env.from_collection(collection=[(1, 'm'), (3, 'n'), (2, 'm'), (4,'m')])
all_window_stream = data_stream.window_all(TumblingProcessingTimeWindows.of(Time.milliseconds(100)))

3.4.1 apply【AllWindowedStream->DataStream】

将通用 function 应用于整个窗口。

from typing import Iterablefrom pyflink.common import Time
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import AllWindowFunction
from pyflink.datastream.window import TumblingProcessingTimeWindows, TimeWindowenv = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)  # 将输出写入一个文件class MyAllWindowFunction(AllWindowFunction[tuple, int, TimeWindow]):def apply(self, window: TimeWindow, inputs: Iterable[tuple]) -> Iterable[int]:sum = 0for input in inputs:sum += input[0]yield sumdata_stream = env.from_collection(collection=[(1, 'm'), (3, 'n'), (2, 'm'), (4,'m')])
all_window_stream = data_stream.window_all(TumblingProcessingTimeWindows.of(Time.milliseconds(100)))
out = all_window_stream.apply(MyAllWindowFunction())out.print()
env.execute("tutorial_job")

3.5 key_by【DataStream->KeyedStream】

需要结合reduce或window算子使用。

data_stream = env.from_collection(collection=[(1, 'm'), (3, 'n'), (2, 'm'), (4,'m')])
key_stream = data_stream.key_by(lambda x: x[1], key_type=Types.STRING())

3.6 reduce【KeyedStream->DataStream】增量

from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironmentenv = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)  # 将输出写入一个文件data_stream = env.from_collection(collection=[(1, 'm'), (3, 'n'), (2, 'm'), (4,'m')])
key_stream = data_stream.key_by(lambda x: x[1], key_type=Types.STRING())
out = key_stream.reduce(lambda a, b: (a[0]+b[0], a[1]))out.print()
env.execute("tutorial_job")

在这里插入图片描述
在相同 key 的数据流上“滚动”执行 reduce。
将当前元素与最后一次 reduce 得到的值组合然后输出新值。

3.7 window【KeyedStream->WindowedStream】

在已经分区的 KeyedStreams 上定义 Window。

data_stream = env.from_collection(collection=[(1, 'm'), (3, 'n'), (2, 'm'), (4,'m')])
key_stream = data_stream.key_by(lambda x: x[1], key_type=Types.STRING())
window_stream = key_stream.window(TumblingProcessingTimeWindows.of(Time.milliseconds(100)))

3.7.1 apply【WindowedStream->DataStream】

将通用 function 应用于整个窗口。

from typing import Iterablefrom pyflink.common import Time, Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import WindowFunction
from pyflink.datastream.window import TumblingProcessingTimeWindows, TimeWindowenv = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)  # 将输出写入一个文件class MyWindowFunction(WindowFunction[tuple, int, int, TimeWindow]):def apply(self, key: int, window: TimeWindow, inputs: Iterable[tuple]) -> Iterable[int]:sum = 0for input in inputs:sum += input[0]yield key, sumdata_stream = env.from_collection(collection=[(1, 'm'), (3, 'n'), (2, 'm'), (4,'m')])
key_stream = data_stream.key_by(lambda x: x[1], key_type=Types.STRING())
window_stream = key_stream.window(TumblingProcessingTimeWindows.of(Time.milliseconds(10)))
out = window_stream.apply(MyWindowFunction())out.print()
env.execute("tutorial_job")

在这里插入图片描述

3.7.2 reduce【WindowedStream->DataStream】

from pyflink.common import Time
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.window import TumblingEventTimeWindows,TumblingProcessingTimeWindowsenv = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)  # 将输出写入一个文件data_stream = env.from_collection(collection=[(1, 'm'), (3, 'n'), (2, 'm'), (4,'m')])
key_stream = data_stream.key_by(lambda x: x[1], key_type=Types.STRING())
window_stream = key_stream.window(TumblingProcessingTimeWindows.of(Time.milliseconds(10)))
out = window_stream.reduce(lambda a, b: (a[0]+b[0], a[1]))out.print()
env.execute("tutorial_job")

在这里插入图片描述
方式二

from pyflink.common import Time
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment, ReduceFunction
from pyflink.datastream.window import TumblingProcessingTimeWindowsenv = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)  # 将输出写入一个文件class MyReduceFunction(ReduceFunction):def reduce(self, value1, value2):return value1[0] + value2[0], value1[1]data_stream = env.from_collection(collection=[(1, 'm'), (3, 'n'), (2, 'm'), (4,'m')])
key_stream = data_stream.key_by(lambda x: x[1], key_type=Types.STRING())
window_stream = key_stream.window(TumblingProcessingTimeWindows.of(Time.milliseconds(10)))
out = window_stream.reduce(MyReduceFunction())out.print()
env.execute("tutorial_job")

3.8 union【DataStream*->DataStream】

将两个或多个数据流联合来创建一个包含所有流中数据的新流。
注意:如果一个数据流和自身进行联合,这个流中的每个数据将在合并后的流中出现两次。

from pyflink.datastream import StreamExecutionEnvironment
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)  # 将输出写入一个文件data_stream1 = env.from_collection(collection=[(1, 'm'), (3, 'n'), (2, 'm'), (4,'m')])
data_stream2 = env.from_collection(collection=[(1, 'a'), (3, 'b'), (2, 'a'), (4,'a')])
out = data_stream2.union(data_stream1)out.print()
env.execute("tutorial_job")

在这里插入图片描述

3.9 connect【DataStream,DataStream->ConnectedStream】

stream_1 = ...
stream_2 = ...
connected_streams = stream_1.connect(stream_2)

3.9.1 CoMap【ConnectedStream->DataStream】

from pyflink.datastream import StreamExecutionEnvironment, CoMapFunctionenv = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)  # 将输出写入一个文件data_stream1 = env.from_collection(collection=[(1, 'm'), (3, 'n'), (2, 'm'), (4,'m')])
data_stream2 = env.from_collection(collection=[(1, 'a'), (3, 'b'), (2, 'a'), (4,'a')])
connected_stream = data_stream1.connect(data_stream2)class MyCoMapFunction(CoMapFunction):def map1(self, value):return value[0] *100, value[1]def map2(self, value):return value[0], value[1] + 'flink'out = connected_stream.map(MyCoMapFunction())out.print()
env.execute("tutorial_job")

在这里插入图片描述

3.9.2 CoFlatMap【ConnectedStream->DataStream】

from pyflink.datastream import StreamExecutionEnvironment, CoFlatMapFunctionenv = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)  # 将输出写入一个文件data_stream1 = env.from_collection(collection=[(1, 'm'), (3, 'n'), (2, 'm'), (4,'m')])
data_stream2 = env.from_collection(collection=[(1, 'a'), (3, 'b'), (2, 'a'), (4,'a')])
connected_stream = data_stream1.connect(data_stream2)class MyCoFlatMapFunction(CoFlatMapFunction):def flat_map1(self, value):for i in range(value[0]):yield value[0]*100def flat_map2(self, value):yield value[0] + 10out = connected_stream.flat_map(MyCoFlatMapFunction())out.print()
env.execute("tutorial_job")

在这里插入图片描述

4 对接kafka输入json输出json

输入{“name”:“中文”}
输出{“name”:“中文结果”}

from pyflink.common import SimpleStringSchema, WatermarkStrategy, Types
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
from pyflink.datastream.connectors.kafka import KafkaSource, KafkaOffsetsInitializer, KafkaSink, \KafkaRecordSerializationSchema
import jsonenv = StreamExecutionEnvironment.get_execution_environment()
env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
env.set_parallelism(1)brokers = "IP:9092"# 读取kafka
source = KafkaSource.builder() \.set_bootstrap_servers(brokers) \.set_topics("flink_source") \.set_group_id("my-group") \.set_starting_offsets(KafkaOffsetsInitializer.latest()) \.set_value_only_deserializer(SimpleStringSchema()) \.build()ds1 = env.from_source(source, WatermarkStrategy.no_watermarks(), "Kafka Source")
ds1.print()# 处理流程
def process_fun(line):data_dict = json.loads(line)result_dict = {"result": data_dict.get("name", "无")+"结果"}return json.dumps(result_dict, ensure_ascii=False)ds2 = ds1.map(process_fun, Types.STRING())
ds2.print()

这篇关于流批一体计算引擎-10-[Flink]中的常用算子和DataStream转换的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1041028

相关文章

Python如何计算两个不同类型列表的相似度

《Python如何计算两个不同类型列表的相似度》在编程中,经常需要比较两个列表的相似度,尤其是当这两个列表包含不同类型的元素时,下面小编就来讲讲如何使用Python计算两个不同类型列表的相似度吧... 目录摘要引言数字类型相似度欧几里得距离曼哈顿距离字符串类型相似度Levenshtein距离Jaccard相

Java数字转换工具类NumberUtil的使用

《Java数字转换工具类NumberUtil的使用》NumberUtil是一个功能强大的Java工具类,用于处理数字的各种操作,包括数值运算、格式化、随机数生成和数值判断,下面就来介绍一下Number... 目录一、NumberUtil类概述二、主要功能介绍1. 数值运算2. 格式化3. 数值判断4. 随机

CSS弹性布局常用设置方式

《CSS弹性布局常用设置方式》文章总结了CSS布局与样式的常用属性和技巧,包括视口单位、弹性盒子布局、浮动元素、背景和边框样式、文本和阴影效果、溢出隐藏、定位以及背景渐变等,通过这些技巧,可以实现复杂... 一、单位元素vm 1vm 为视口的1%vh 视口高的1%vmin 参照长边vmax 参照长边re

C语言中自动与强制转换全解析

《C语言中自动与强制转换全解析》在编写C程序时,类型转换是确保数据正确性和一致性的关键环节,无论是隐式转换还是显式转换,都各有特点和应用场景,本文将详细探讨C语言中的类型转换机制,帮助您更好地理解并在... 目录类型转换的重要性自动类型转换(隐式转换)强制类型转换(显式转换)常见错误与注意事项总结与建议类型

Python实现视频转换为音频的方法详解

《Python实现视频转换为音频的方法详解》这篇文章主要为大家详细Python如何将视频转换为音频并将音频文件保存到特定文件夹下,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录1. python需求的任务2. Python代码的实现3. 代码修改的位置4. 运行结果5. 注意事项

使用Python实现图片和base64转换工具

《使用Python实现图片和base64转换工具》这篇文章主要为大家详细介绍了如何使用Python中的base64模块编写一个工具,可以实现图片和Base64编码之间的转换,感兴趣的小伙伴可以了解下... 简介使用python的base64模块来实现图片和Base64编码之间的转换。可以将图片转换为Bas

Python中操作Redis的常用方法小结

《Python中操作Redis的常用方法小结》这篇文章主要为大家详细介绍了Python中操作Redis的常用方法,文中的示例代码简洁易懂,具有一定的借鉴价值,有需要的小伙伴可以了解一下... 目录安装Redis开启、关闭Redisredis数据结构redis-cli操作安装redis-py数据库连接和释放增

一文详解Python中数据清洗与处理的常用方法

《一文详解Python中数据清洗与处理的常用方法》在数据处理与分析过程中,缺失值、重复值、异常值等问题是常见的挑战,本文总结了多种数据清洗与处理方法,文中的示例代码简洁易懂,有需要的小伙伴可以参考下... 目录缺失值处理重复值处理异常值处理数据类型转换文本清洗数据分组统计数据分箱数据标准化在数据处理与分析过

Java中Object类的常用方法小结

《Java中Object类的常用方法小结》JavaObject类是所有类的父类,位于java.lang包中,本文为大家整理了一些Object类的常用方法,感兴趣的小伙伴可以跟随小编一起学习一下... 目录1. public boolean equals(Object obj)2. public int ha

Linux使用dd命令来复制和转换数据的操作方法

《Linux使用dd命令来复制和转换数据的操作方法》Linux中的dd命令是一个功能强大的数据复制和转换实用程序,它以较低级别运行,通常用于创建可启动的USB驱动器、克隆磁盘和生成随机数据等任务,本文... 目录简介功能和能力语法常用选项示例用法基础用法创建可启动www.chinasem.cn的 USB 驱动