Apache Beam 快速入门(Python 版)

2023-12-11 03:10
文章标签 python 入门 快速 apache beam

本文主要是介绍Apache Beam 快速入门(Python 版),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Apache Beam 是一种大数据处理标准,由谷歌于 2016 年创建。它提供了一套统一的 DSL 用以处理离线和实时数据,并能在目前主流的大数据处理平台上使用,包括 Spark、Flink、以及谷歌自身的商业套件 Dataflow。Beam 的数据模型基于过去的几项研究成果:FlumeJava、Millwheel,适用场景包括 ETL、统计分析、实时计算等。目前,Beam 提供了两种语言的 SDK:Java、Python。本文将讲述如何使用 Python 编写 Beam 应用程序。

Apache Beam PipelineApache Beam Pipeline

安装 Apache Beam

Apache Beam Python SDK 必须使用 Python 2.7.x 版本,你可以安装 pyenv 来管理不同版本的 Python,或者直接从源代码编译安装(需要支持 SSL)。之后,你便可以在 Python 虚拟环境中安装 Beam SDK 了:

     
1
2
3
     
$ virtualenv venv --distribute
$ source venv/bin/activate
(venv) $ pip install apache-beam

Wordcount 示例

Wordcount 是大数据领域的 Hello World,我们来看如何使用 Beam 实现:

     
1
2
3
4
5
6
7
8
9
10
11
12
13
     
from __future__ import print_function
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
with beam.Pipeline(options=PipelineOptions()) as p:
lines = p | 'Create' >> beam.Create(['cat dog', 'snake cat', 'dog'])
counts = (
lines
| 'Split' >> (beam.FlatMap(lambda x: x.split(' '))
.with_output_types(unicode))
| 'PairWithOne' >> beam.Map(lambda x: (x, 1))
| 'GroupAndSum' >> beam.CombinePerKey(sum)
)
counts | 'Print' >> beam.ParDo(lambda (w, c): print('%s: %s' % (w, c)))

运行脚本,我们便可得到每个单词出现的次数:

     
1
2
3
4
     
(venv) $ python wordcount.py
cat: 2
snake: 1
dog: 2

Apache Beam 有三个重要的基本概念:Pipeline、PCollection、以及 Transform。

  • Pipeline (管道)用以构建数据集和处理过程的 DAG(有向无环图)。我们可以将它看成 MapReduce 中的 Job 或是 Storm 的 Topology
  • PCollection 是一种数据结构,我们可以对其进行各类转换操作,如解析、过滤、聚合等。它和 Spark 中的 RDD 概念类似。
  • Transform (转换)则用于编写业务逻辑。通过它,我们可以将一个 PCollection 转换成另一个 PCollection。Beam 提供了许多内置的转换函数,我们将在下文讨论。

在本例中,Pipeline 和 PipelineOptions 用来创建一个管道。通过 with 关键字,上下文管理器会自动调用 Pipeline.run 和 wait_until_finish 方法。

     
1
     
[Output PCollection] = [Input PCollection] | [Label] >> [Transform]

| 是 Beam 引入的新操作符,用来添加一个转换。每次转换都可以定义一个唯一的标签,默认由 Beam 自动生成。转换能够串联,我们可以构建出不同形态的转换流程,它们在运行时会表示为一个 DAG。

beam.Create 用来从内存数据创建出一个 PCollection,主要用于测试和演示。Beam 提供了多种内置的输入源(Source)和输出目标(Sink),可以接收和写入有界(Bounded)或无界(Unbounded)的数据,并且能进行自定义。

beam.Map 是一种 一对一 的转换,本例中我们将一个个单词转换成形如 (word, 1) 的元组。beam.FlatMap 则是 Map 和 Flatten 的结合体,通过它,我们将包含多个单词的数组合并成一个一维的数组。

CombinePerKey 的输入源是一系列的二元组(2-element tuple)。这个操作会将元素的第一个元素作为键进行分组,并将相同键的值(第二个元素)组成一个列表。最后,我们使用 beam.ParDo 输出统计结果。这个转换函数比较底层,我们会在下文详述。

输入与输出

目前,Beam Python SDK 对输入输出的支持十分有限。下表列出了现阶段支持的数据源(资料来源):

语言 文件系统 消息队列 数据库
Java HDFS
TextIO
XML
AMQP
Kafka
JMS
Hive
Solr
JDBC
Python textio
avroio
tfrecordio
- Google Big Query
Google Cloud Datastore

这段代码演示了如何使用 textio 对文本文件进行读写:

     
1
2
     
lines = p | 'Read' >> beam.io.ReadFromText('/path/to/input-*.csv')
lines | 'Write' >> beam.io.WriteToText('/path/to/output', file_name_suffix='.csv')

通过使用通配符,textio 可以读取多个文件。我们还可以从不同的数据源中读取文件,并用 Flatten 方法将多个 PCollection 合并成一个。输出文件默认也会是多个,因为 Beam Pipeline 是并发执行的,不同的进程会写入独立的文件。

转换函数

Beam 中提供了基础和上层的转换函数。通常我们更偏向于使用上层函数,这样就可以将精力聚焦在实现业务逻辑上。下表列出了常用的上层转换函数:

转换函数 功能含义
Create(value) 基于内存中的集合数据生成一个 PCollection。
Filter(fn) 使用 fn 函数过滤 PCollection 中的元素。
Map(fn) 使用 fn 函数做一对一的转换处理。
FlatMap(fn) 功能和 Map 类似,但是 fn 需要返回一个集合,里面包含零个或多个元素,最终 FlatMap 会将这些集合合并成一个 PCollection。
Flatten() 合并多个 PCollection。
Partition(fn) 将一个 PCollection 切分成多个分区。fn 可以是 PartitionFn 或一个普通函数,能够接受两个参数:elementnum_partitions
GroupByKey() 输入源必须是使用二元组表示的键值对,该方法会按键进行分组,并返回一个 (key, iter<value>) 的序列。
CoGroupByKey() 对多个二元组 PCollection 按相同键进行合并,如输入的是 (k, v) 和 (k, w),则输出 (k, (iter<v>, iter<w>))
RemoveDuplicates() 对 PCollection 的元素进行去重。
CombinePerKey(fn) 功能和 GroupByKey 类似,但会进一步使用 fn 对值列表进行合并。fn 可以是一个 CombineFn,或是一个普通函数,接收序列并返回结果,如 summax 函数等。
CombineGlobally(fn) 使用 fn 将整个 PCollection 合并计算成单个值。

Callable, DoFn, ParDo

可以看到,多数转换函数都会接收另一个函数(Callable)做为参数。在 Python 中,Callable 可以是一个函数、类方法、Lambda 表达式、或是任何包含 __call__ 方法的对象实例。Beam 会将这些函数包装成一个 DoFn 类,所有转换函数最终都会调用最基础的 ParDo 函数,并将 DoFn 传递给它。

我们可以尝试将 lambda x: x.split(' ') 这个表达式转换成 DoFn 类:

     
1
2
3
4
5
     
class SplitFn(beam.DoFn):
def process(self, element):
return element.split(' ')
lines | beam.ParDo(SplitFn())

ParDo 转换和 FlatMap 的功能类似,只是它的 fn 参数必须是一个 DoFn。除了使用 return,我们还可以用 yield 语句来返回结果:

     
1
2
3
4
     
class SplitAndPairWithOneFn(beam.DoFn):
def process(self, element):
for word in element.split(' '):
yield (word, 1)

合并函数

合并函数(CombineFn)用来将集合数据合并计算成单个值。我们既可以对整个 PCollection 做合并(CombineGlobally),也可以计算每个键的合并结果(CombinePerKey)。Beam 会将普通函数(Callable)包装成 CombineFn,这些函数需要接收一个集合,并返回单个结果。需要注意的是,Beam 会将计算过程分发到多台服务器上,合并函数会被多次调用来计算中间结果,因此需要满足交换律和结合律。summinmax 是符合这样的要求的。

Beam 提供了许多内置的合并函数,如计数、求平均值、排序等。以计数为例,下面两种写法都可以用来统计整个 PCollection 中元素的个数:

     
1
2
     
lines | beam.combiners.Count.Globally()
lines | beam.CombineGlobally(beam.combiners.CountCombineFn())

其他合并函数可以参考 Python SDK 的官方文档(链接)。我们也可以自行实现合并函数,只需继承 CombineFn,并实现四个方法。我们以内置的 Mean 平均值合并函数的源码为例:

apache_beam/transforms/combiners.py

     
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
     
class MeanCombineFn(core.CombineFn):
def create_accumulator(self):
"""创建一个“本地”的中间结果,记录合计值和记录数。"""
return (0, 0)
def add_input(self, (sum_, count), element):
"""处理新接收到的值。"""
return sum_ + element, count + 1
def merge_accumulators(self, accumulators):
"""合并多个中间结果。"""
sums, counts = zip(*accumulators)
return sum(sums), sum(counts)
def extract_output(self, (sum_, count)):
"""计算平均值。"""
if count == 0:
return float('NaN')
return sum_ / float(count)

复合转换函数

我们简单看一下上文中使用到的 beam.combiners.Count.Globally 的源码(链接),它继承了 PTransform 类,并在 expand方法中对 PCollection 应用了转换函数。这会形成一个小型的有向无环图,并合并到最终的 DAG 中。我们称其为复合转换函数,主要用于将相关的转换逻辑整合起来,便于理解和管理。

     
1
2
3
4
     
class Count(object):
class Globally(ptransform.PTransform):
def expand(self, pcoll):
return pcoll | core.CombineGlobally(CountCombineFn())

更多内置的复合转换函数如下表所示:

复合转换函数 功能含义
Count.Globally() 计算元素总数。
Count.PerKey() 计算每个键的元素数。
Count.PerElement() 计算每个元素出现的次数,类似 Wordcount。
Mean.Globally() 计算所有元素的平均值。
Mean.PerKey() 计算每个键的元素平均值。
Top.Of(n, reverse) 获取 PCollection 中最大或最小的 n 个元素,另有 Top.Largest(n), Top.Smallest(n).
Top.PerKey(n, reverse) 获取每个键的值列表中最大或最小的 n 个元素,另有 Top.LargestPerKey(n), Top.SmallestPerKey(n)
Sample.FixedSizeGlobally(n) 随机获取 n 个元素。
Sample.FixedSizePerKey(n) 随机获取每个键下的 n 个元素。
ToList() 将 PCollection 合并成一个列表。
ToDict() 将 PCollection 合并成一个哈希表,输入数据需要是二元组集合。

时间窗口

在处理事件数据时,如访问日志、用户点击流,每条数据都会有一个 事件时间 属性,而通常我们会按事件时间对数据进行分组统计,这些分组即时间窗口。在 Beam 中,我们可以定义不同的时间窗口类型,能够支持有界和无界数据。由于 Python SDK 暂时只支持有界数据,我们就以一个离线访问日志文件作为输入源,统计每个时间窗口的记录条数。对于无界数据,概念和处理流程也是类似的。

     
1
2
3
4
5
     
64.242.88.10 - - [07/Mar/2004:16:05:49 -0800] "GET /edit HTTP/1.1" 401 12846
64.242.88.10 - - [07/Mar/2004:16:06:51 -0800] "GET /rdiff HTTP/1.1" 200 4523
64.242.88.10 - - [07/Mar/2004:16:10:02 -0800] "GET /hsdivision HTTP/1.1" 200 6291
64.242.88.10 - - [07/Mar/2004:16:11:58 -0800] "GET /view HTTP/1.1" 200 7352
64.242.88.10 - - [07/Mar/2004:16:20:55 -0800] "GET /view HTTP/1.1" 200 5253

logmining.py 的完整源码可以在 GitHub(链接)中找到:

     
1
2
3
4
5
6
7
8
9
10
     
lines = p | 'Create' >> beam.io.ReadFromText('access.log')
windowed_counts = (
lines
| 'Timestamp' >> beam.Map(lambda x: beam.window.TimestampedValue(
x, extract_timestamp(x)))
| 'Window' >> beam.WindowInto(beam.window.SlidingWindows(600, 300))
| 'Count' >> (beam.CombineGlobally(beam.combiners.CountCombineFn())
.without_defaults())
)
windowed_counts = windowed_counts | beam.ParDo(PrintWindowFn())

首先,我们需要为每一条记录附加上时间戳。自定义函数 extract_timestamp 用以将日志中的时间 [07/Mar/2004:16:05:49 -0800] 转换成 Unix 时间戳,TimestampedValue 则会将这个时间戳和对应记录关联起来。之后,我们定义了一个大小为 10 分钟,间隔为 5 分钟 的滑动窗口(Sliding Window)。从零点开始,第一个窗口的范围是 [00:00, 00:10),第二个窗口的范围是 [00:05, 00:15),以此类推。所有窗口的长度都是 10 分钟,相邻两个窗口之间相隔 5 分钟。滑动窗口和固定窗口(Fixed Window)不同,因为相同的元素可能会落入不同的窗口中参与计算。最后,我们使用一个合并函数计算每个窗口中的记录数。通过这个方法得到前五条记录的计算结果为:

     
1
2
3
4
5
     
[2004-03-08T00:00:00Z, 2004-03-08T00:10:00Z) @ 2
[2004-03-08T00:05:00Z, 2004-03-08T00:15:00Z) @ 4
[2004-03-08T00:10:00Z, 2004-03-08T00:20:00Z) @ 2
[2004-03-08T00:15:00Z, 2004-03-08T00:25:00Z) @ 1
[2004-03-08T00:20:00Z, 2004-03-08T00:30:00Z) @ 1

在无界数据的实时计算过程中,事件数据的接收顺序是不固定的,因此需要利用 Beam 的水位线和触发器机制来处理延迟数据(Late Data)。这个话题比较复杂,而且 Python SDK 尚未支持这些特性,感兴趣的读者可以参考 Stream 101 和 102 这两篇文章。

Pipeline 运行时

上文中提到,Apache Beam 是一个数据处理标准,只提供了 SDK 和 API,因而必须使用 Spark、Flink 这样的计算引擎来运行它。下表列出了当前支持 Beam Model 的引擎,以及他们的兼容程度:

Beam 运行时能力矩阵Beam 运行时能力矩阵

这篇关于Apache Beam 快速入门(Python 版)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/479266

相关文章

Python函数作用域示例详解

《Python函数作用域示例详解》本文介绍了Python中的LEGB作用域规则,详细解析了变量查找的四个层级,通过具体代码示例,展示了各层级的变量访问规则和特性,对python函数作用域相关知识感兴趣... 目录一、LEGB 规则二、作用域实例2.1 局部作用域(Local)2.2 闭包作用域(Enclos

Python实现对阿里云OSS对象存储的操作详解

《Python实现对阿里云OSS对象存储的操作详解》这篇文章主要为大家详细介绍了Python实现对阿里云OSS对象存储的操作相关知识,包括连接,上传,下载,列举等功能,感兴趣的小伙伴可以了解下... 目录一、直接使用代码二、详细使用1. 环境准备2. 初始化配置3. bucket配置创建4. 文件上传到os

使用Python实现可恢复式多线程下载器

《使用Python实现可恢复式多线程下载器》在数字时代,大文件下载已成为日常操作,本文将手把手教你用Python打造专业级下载器,实现断点续传,多线程加速,速度限制等功能,感兴趣的小伙伴可以了解下... 目录一、智能续传:从崩溃边缘抢救进度二、多线程加速:榨干网络带宽三、速度控制:做网络的好邻居四、终端交互

Python中注释使用方法举例详解

《Python中注释使用方法举例详解》在Python编程语言中注释是必不可少的一部分,它有助于提高代码的可读性和维护性,:本文主要介绍Python中注释使用方法的相关资料,需要的朋友可以参考下... 目录一、前言二、什么是注释?示例:三、单行注释语法:以 China编程# 开头,后面的内容为注释内容示例:示例:四

Python中win32包的安装及常见用途介绍

《Python中win32包的安装及常见用途介绍》在Windows环境下,PythonWin32模块通常随Python安装包一起安装,:本文主要介绍Python中win32包的安装及常见用途的相关... 目录前言主要组件安装方法常见用途1. 操作Windows注册表2. 操作Windows服务3. 窗口操作

Python中re模块结合正则表达式的实际应用案例

《Python中re模块结合正则表达式的实际应用案例》Python中的re模块是用于处理正则表达式的强大工具,正则表达式是一种用来匹配字符串的模式,它可以在文本中搜索和匹配特定的字符串模式,这篇文章主... 目录前言re模块常用函数一、查看文本中是否包含 A 或 B 字符串二、替换多个关键词为统一格式三、提

从入门到精通C++11 <chrono> 库特性

《从入门到精通C++11<chrono>库特性》chrono库是C++11中一个非常强大和实用的库,它为时间处理提供了丰富的功能和类型安全的接口,通过本文的介绍,我们了解了chrono库的基本概念... 目录一、引言1.1 为什么需要<chrono>库1.2<chrono>库的基本概念二、时间段(Durat

python常用的正则表达式及作用

《python常用的正则表达式及作用》正则表达式是处理字符串的强大工具,Python通过re模块提供正则表达式支持,本文给大家介绍python常用的正则表达式及作用详解,感兴趣的朋友跟随小编一起看看吧... 目录python常用正则表达式及作用基本匹配模式常用正则表达式示例常用量词边界匹配分组和捕获常用re

python实现对数据公钥加密与私钥解密

《python实现对数据公钥加密与私钥解密》这篇文章主要为大家详细介绍了如何使用python实现对数据公钥加密与私钥解密,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录公钥私钥的生成使用公钥加密使用私钥解密公钥私钥的生成这一部分,使用python生成公钥与私钥,然后保存在两个文

python删除xml中的w:ascii属性的步骤

《python删除xml中的w:ascii属性的步骤》使用xml.etree.ElementTree删除WordXML中w:ascii属性,需注册命名空间并定位rFonts元素,通过del操作删除属... 可以使用python的XML.etree.ElementTree模块通过以下步骤删除XML中的w:as