最佳实践:如何优雅地提交一个 Amazon EMR Serverless 作业?

2023-11-23 06:50

本文主要是介绍最佳实践:如何优雅地提交一个 Amazon EMR Serverless 作业?,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

《大数据平台架构与原型实现:数据中台建设实战》博主历时三年精心创作的《大数据平台架构与原型实现:数据中台建设实战》一书现已由知名IT图书品牌电子工业出版社博文视点出版发行,点击《重磅推荐:建大数据平台太难了!给我发个工程原型吧!》了解图书详情,京东购书链接:https://item.jd.com/12677623.html,扫描左侧二维码进入京东手机购书页面。

自Amazon EMR推出Serverlesss形态以来,得益于开箱即用和零运维的优质特性,越来越多的EMR用户开始尝试EMR Serverless。在使用过程中,一个常被提及的问题是:我们应该如何在EMR Serverless上提交Spark/Hive作业?本文我们将分享一些这方面的最佳实践,帮助大家以一种更优雅的方式使用这项服务。

一份通俗易懂的讲解最好配一个形象生动的例子,本文选择《CDC一键入湖:当 Apache Hudi DeltaStreamer 遇见 Serverless Spark》一文介绍的DeltaStreamer作业作为讲解示例,因为这个作业既有一定的通用性又足够复杂,可以涵盖大多数EMR Serverless作业遇到的场景,更重要的是,该作业的提交方式遵循了本文要介绍的各项最佳实践(本文是其姊妹篇)。不了解Apache Hudi的读者不必担心,本文的关注点在于如何提交EMR Serverless作业本身,而非DeltaStreamer的技术细节,所以不会影响到您阅读此文。

参考范本

首先,我们整理一下提交DeltaStreamer CDC作业的几项关键操作,下文会以这些脚本为例,介绍蕴含其中的各项最佳实践。

1. 导出环境相关变量

export APP_NAME='apache-hudi-delta-streamer'
export APP_S3_HOME='s3://apache-hudi-delta-streamer'
export APP_LOCAL_HOME='/home/ec2-user/apache-hudi-delta-streamer'
export EMR_SERVERLESS_APP_ID='00fbfel40ee59k09'
export EMR_SERVERLESS_EXECUTION_ROLE_ARN='arn:aws:iam::123456789000:role/EMR_SERVERLESS_ADMIN'

2. 创建作业专属工作目录和S3存储桶

mkdir -p $APP_LOCAL_HOME
aws s3 mb $APP_S3_HOME

3. 准备作业描述文件

cat << EOF > $APP_LOCAL_HOME/start-job-run.json
{"name":"apache-hudi-delta-streamer","applicationId":"$EMR_SERVERLESS_APP_ID","executionRoleArn":"$EMR_SERVERLESS_EXECUTION_ROLE_ARN","jobDriver":{"sparkSubmit":{"entryPoint":"/usr/lib/hudi/hudi-utilities-bundle.jar","entryPointArguments":["--continuous","--enable-sync","--table-type", "COPY_ON_WRITE","--op", "UPSERT","--target-base-path", "$APP_S3_HOME/data/mysql-server-3/inventory/orders","--target-table", "orders","--min-sync-interval-seconds", "60","--source-class", "org.apache.hudi.utilities.sources.debezium.MysqlDebeziumSource","--source-ordering-field", "_event_origin_ts_ms","--payload-class", "org.apache.hudi.common.model.debezium.MySqlDebeziumAvroPayload","--hoodie-conf", "bootstrap.servers=$KAFKA_BOOTSTRAP_SERVERS","--hoodie-conf", "schema.registry.url=$SCHEMA_REGISTRY_URL","--hoodie-conf", "hoodie.deltastreamer.schemaprovider.registry.url=${SCHEMA_REGISTRY_URL}/subjects/osci.mysql-server-3.inventory.orders-value/versions/latest","--hoodie-conf", "hoodie.deltastreamer.source.kafka.value.deserializer.class=io.confluent.kafka.serializers.KafkaAvroDeserializer","--hoodie-conf", "hoodie.deltastreamer.source.kafka.topic=osci.mysql-server-3.inventory.orders","--hoodie-conf", "auto.offset.reset=earliest","--hoodie-conf", "hoodie.datasource.write.recordkey.field=order_number","--hoodie-conf", "hoodie.datasource.write.partitionpath.field=order_date","--hoodie-conf", "hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.MultiPartKeysValueExtractor","--hoodie-conf", "hoodie.datasource.write.hive_style_partitioning=true","--hoodie-conf", "hoodie.datasource.hive_sync.database=inventory","--hoodie-conf", "hoodie.datasource.hive_sync.table=orders","--hoodie-conf", "hoodie.datasource.hive_sync.partition_fields=order_date"],"sparkSubmitParameters":"--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer --conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory --conf spark.jars=$(aws s3 ls $APP_S3_HOME/jars/ | grep -o '\S*\.jar$'| awk '{print "'"$APP_S3_HOME/jars/"'"$1","}' | tr -d '\n' | sed 's/,$//')"}},"configurationOverrides":{"monitoringConfiguration":{"s3MonitoringConfiguration":{"logUri":"$APP_S3_HOME/logs"}}}
}
EOF
jq . $APP_LOCAL_HOME/start-job-run.json

4. 提交作业

export EMR_SERVERLESS_JOB_RUN_ID=$(aws emr-serverless start-job-run \--no-paginate --no-cli-pager --output text \--name apache-hudi-delta-streamer \--application-id $EMR_SERVERLESS_APP_ID \--execution-role-arn $EMR_SERVERLESS_EXECUTION_ROLE_ARN \--execution-timeout-minutes 0 \--cli-input-json file://$APP_LOCAL_HOME/start-job-run.json \--query jobRunId)

5. 监控作业

now=$(date +%s)sec
while true; dojobStatus=$(aws emr-serverless get-job-run \--no-paginate --no-cli-pager --output text \--application-id $EMR_SERVERLESS_APP_ID \--job-run-id $EMR_SERVERLESS_JOB_RUN_ID \--query jobRun.state)if [ "$jobStatus" = "PENDING" ] || [ "$jobStatus" = "SCHEDULED" ] || [ "$jobStatus" = "RUNNING" ]; thenfor i in {0..5}; doecho -ne "\E[33;5m>>> The job [ $EMR_SERVERLESS_JOB_RUN_ID ] state is [ $jobStatus ], duration [ $(date -u --date now-$now +%H:%M:%S) ] ....\r\E[0m"sleep 1doneelseecho -ne "The job [ $EMR_SERVERLESS_JOB_RUN_ID ] is [ $jobStatus ]\n\n"breakfi
done

6. 检查错误

JOB_LOG_HOME=$APP_LOCAL_HOME/log/$EMR_SERVERLESS_JOB_RUN_ID
rm -rf $JOB_LOG_HOME && mkdir -p $JOB_LOG_HOME
aws s3 cp --recursive $APP_S3_HOME/logs/applications/$EMR_SERVERLESS_APP_ID/jobs/$EMR_SERVERLESS_JOB_RUN_ID/ $JOB_LOG_HOME >& /dev/null
gzip -d -r -f $JOB_LOG_HOME >& /dev/null
grep --color=always -r -i -E 'error|failed|exception' $JOB_LOG_HOME

最佳实践 (1):提取环境相关信息集中配置,提升脚本可移植性

※ 此项最佳实践参考《参考范本:1. 导出环境相关变量》

在EMR Serverless的作业脚本中经常会出现与AWS账号和本地环境有关的信息,例如资源的ARN,各种路径等,当我们要在不同环境(如开发、测试或生产)中提交作业时,就需要查找和替换这些环境相关的信息。为了让脚本具备良好的可移植性,推荐的做法是将这些信息抽离出来,以全局变量的形式集中配置,这样,当在一个新环境(新的AWS账号或服务器)中提交作业时,只需修改这些变量即可,而不是具体的脚本。

最佳实践 (2):为作业创建专用的工作目录和S3存储桶

※ 此项最佳实践参考《参考范本:2. 创建作业专属工作目录和S3存储桶》

为一个作业或应用程序创建专用的工作目录和S3存储桶是一个良好的规范和习惯。一方面,将本作业/应用的所有“资源”,包括:脚本、配置文件、依赖包、日志以及产生的数据统一存放在有利于集中管理和维护,如果要在Linux和S3上给作业赋予读写权限,操作起来了也会简单一些。

最佳实践 (3):使用作业描述文件规避字符转义问题

※ 此项最佳实践参考《参考范本:3. 准备作业描述文件》

我们通常见到的EMR Serverless作业提交示例是将作业描述以字符串参数形式传递给命令行的,就像下面这样:

aws emr-serverless start-job-run \--application-id $EMR_SERVERLESS_APP_ID \--execution-role-arn $EMR_SERVERLESS_EXECUTION_ROLE_ARN \--job-driver '{"sparkSubmit": {"entryPoint": "s3://us-east-1.elasticmapreduce/emr-containers/samples/wordcount/scripts/wordcount.py","entryPointArguments": ["s3://my-job-bucket/output"]}}'

这种方式只能应对简单的作业提交,当作业中包含大量参数和变量时,很容易出现单引号、双引号、美元符等特殊字符的转义问题,由于这里牵涉shell字符串和json字符串的双重嵌套和解析,所以会非常麻烦。此时在命令行中给出作业描述是很不明智的,更好的做法是:使用cat命令联合heredoc来创建作业描述文件,然后在命令行中以--cli-input-json file://xxx.json形式将作业描述传递给命令行:

# 生成作业描述文件
cat << EOF > xxx.json... ...... ...... ...   
EOF
# 使用作业描述文件提交作业
aws emr-serverless start-job-run ... --cli-input-json file://xxx.json ...

这是一个非常重要的技巧,使用这种形式提交作业有如下两个好处:

  • 在cat + heredoc中编辑的文本为原生字符串,不用考虑字符转义问题

  • 在cat + heredoc中可嵌入shell变量、函数调用和if…else等结构体,实现“动态”构建作业描述文件

最佳实践 (4):在作业描述文件中嵌入shell变量和脚本片段,实现“动态”构建

※ 此项最佳实践参考《参考范本:3. 准备作业描述文件》

如上所述,采用cat + heredoc编辑作业描述文件后,可以在编辑文件的过程中嵌入shell变量、函数调用和if...else...等复合结构体,使得我们可以动态构建作业描述文件,这是非常重要的一个能力。在《参考范本:3. 准备作业描述文件》中有一个很好的例证,就是“动态拼接依赖Jar包的路径”:

--conf spark.jars=$(aws s3 ls $APP_S3_HOME/jars/ | grep -o '\S*\.jar$'| awk '{print "'"$APP_S3_HOME/jars/"'"$1","}' | tr -d '\n' | sed 's/,$//')

这是在构建作业描述文件start-job-run.json的过程中通过$(....)嵌入的一段shell脚本,这段脚本遍历了指定目录下的jar文件并拼接成一个字符串输出出来,而输出的字符串会在嵌入脚本的地方变成文本的一部分,我们还可以在编辑文本时调用shell函数,嵌入if...else...whilecase等多重复合逻辑结构,让作业描述文件可以根据不同的参数和条件动态生成期望的内容,这种灵活性足以让开发者应对任何复杂的情况。

最佳实践 (5):使用jq校验并格式化作业描述文件

※ 此项最佳实践参考《参考范本:3. 准备作业描述文件》

jq是一个处理json文件的命令行工具,对于AWS CLI来说,jq可以说是一个“最佳伴侣”。原因是使用AWS CLI创建资源时,除了传入常规参数之外,还可以通过--cli-input-json参数传入一个json文件来描述所要创建的资源。当创建的资源配置过于复杂时,json文件的优势就会凸显出来,就像我们参考范本中的这个EMR Serverless Job一样。所以,使用AWS CLI时经常有编辑和操作json文件的需求,此时jq就成为了一个强有力的辅助工具。在参考范本中,我们仅仅使用jq打印了一下生成的作业描述文件:

jq . $APP_LOCAL_HOME/start-job-run.json

这一步操作有两个作用:一是利用jq校验了json文件,这能帮助排查文件中的json格式错误,二是jq输出的json经过了格式化和语法着色,更加易读。

其实jq在AWS CLI上还有更多高级应用,只是在我们的参考范本中并没有体现出来。在某些情况下,我们可以通过jq直接检索和编辑作业描述文件,将jq和使用cat + heredoc的json编辑方式结合起来,可以创建更加复杂和动态化的作业描述文件。

最佳实践 (6):可复用的依赖Jar包路径拼接脚本

※ 此项最佳实践参考《参考范本:3. 准备作业描述文件》

拼接依赖Jar包路径几乎是每个作业都要解决的问题,手动拼接虽然可行,但费力且容易出错。过去在本地环境中,我们可以使用:--jars $(echo /path/*.jar | tr ' ' ',')这种简洁而优雅的方式拼接Jar包路径。但是EMR Serverless作业的依赖Jar包是存放在S3上的,这此,我们针对性地编写了一段可复用的脚本来拼接位于S3指定目录下的Jar包路径,供大家参考(请注意替换脚本中出现的两处文件夹路径):

aws s3 ls $APP_S3_HOME/jars/ | grep -o '\S*\.jar$'| awk '{print "'"$APP_S3_HOME/jars/"'"$1","}' | tr -d '\n' | sed 's/,$//'

最佳实践 (7):可复用的作业监控脚本

※ 此项最佳实践参考《参考范本:5. 监控作业》

使用命令行提交EMR Serverless作业后,用户可以转到AWS控制台上查看作业的状态,但是对开发者来说,这种切换会分散注意力,最完美的方式莫过于提交作业后继续在命令行窗口监控作业状态,直到其失败或成功运行。为此,《参考范本:5. 监控作业》给出了一种实现,可复用于所有EMR Serverless作业,供大家参考。

最佳实践 (8):可复用的日志错误信息检索脚本

※ 此项最佳实践参考《参考范本:6. 检查错误》

在日常开发中,“提交作业报错 -> 查看日志中的报错信息 -> 修改代码重新提交”是一个反复迭代的过程,在EMR Serverless中,用户需要切换到AWS控制台查看错误日志,并且有时日志量会非常大,在控制台上查看效率很低。一种更高效的做法是:将存放于S3上的日志文件统一下载到本地并解压,然后使用grep命令快速检索日志中含有error, failed, exception等关键字的行,然后再打开具体文件仔细查看。将这些动作脚本化后,我们就能得到一段可复用的日志错误信息检索脚本,对于调试和排查错误有很大的帮助。为此,《参考范本:6. 检查错误》给出了一种实现,可复用于所有EMR Serverless作业,供大家参考。

这篇关于最佳实践:如何优雅地提交一个 Amazon EMR Serverless 作业?的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/416143

相关文章

Oracle查询优化之高效实现仅查询前10条记录的方法与实践

《Oracle查询优化之高效实现仅查询前10条记录的方法与实践》:本文主要介绍Oracle查询优化之高效实现仅查询前10条记录的相关资料,包括使用ROWNUM、ROW_NUMBER()函数、FET... 目录1. 使用 ROWNUM 查询2. 使用 ROW_NUMBER() 函数3. 使用 FETCH FI

在C#中获取端口号与系统信息的高效实践

《在C#中获取端口号与系统信息的高效实践》在现代软件开发中,尤其是系统管理、运维、监控和性能优化等场景中,了解计算机硬件和网络的状态至关重要,C#作为一种广泛应用的编程语言,提供了丰富的API来帮助开... 目录引言1. 获取端口号信息1.1 获取活动的 TCP 和 UDP 连接说明:应用场景:2. 获取硬

Java内存泄漏问题的排查、优化与最佳实践

《Java内存泄漏问题的排查、优化与最佳实践》在Java开发中,内存泄漏是一个常见且令人头疼的问题,内存泄漏指的是程序在运行过程中,已经不再使用的对象没有被及时释放,从而导致内存占用不断增加,最终... 目录引言1. 什么是内存泄漏?常见的内存泄漏情况2. 如何排查 Java 中的内存泄漏?2.1 使用 J

Linux中Curl参数详解实践应用

《Linux中Curl参数详解实践应用》在现代网络开发和运维工作中,curl命令是一个不可或缺的工具,它是一个利用URL语法在命令行下工作的文件传输工具,支持多种协议,如HTTP、HTTPS、FTP等... 目录引言一、基础请求参数1. -X 或 --request2. -d 或 --data3. -H 或

Docker集成CI/CD的项目实践

《Docker集成CI/CD的项目实践》本文主要介绍了Docker集成CI/CD的项目实践,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学... 目录一、引言1.1 什么是 CI/CD?1.2 docker 在 CI/CD 中的作用二、Docke

轻松掌握python的dataclass让你的代码更简洁优雅

《轻松掌握python的dataclass让你的代码更简洁优雅》本文总结了几个我在使用Python的dataclass时常用的技巧,dataclass装饰器可以帮助我们简化数据类的定义过程,包括设置默... 目录1. 传统的类定义方式2. dataclass装饰器定义类2.1. 默认值2.2. 隐藏敏感信息

Go信号处理如何优雅地关闭你的应用

《Go信号处理如何优雅地关闭你的应用》Go中的优雅关闭机制使得在应用程序接收到终止信号时,能够进行平滑的资源清理,通过使用context来管理goroutine的生命周期,结合signal... 目录1. 什么是信号处理?2. 如何优雅地关闭 Go 应用?3. 代码实现3.1 基本的信号捕获和优雅关闭3.2

C#如何优雅地取消进程的执行之Cancellation详解

《C#如何优雅地取消进程的执行之Cancellation详解》本文介绍了.NET框架中的取消协作模型,包括CancellationToken的使用、取消请求的发送和接收、以及如何处理取消事件... 目录概述与取消线程相关的类型代码举例操作取消vs对象取消监听并响应取消请求轮询监听通过回调注册进行监听使用Wa

基于MySQL Binlog的Elasticsearch数据同步实践

一、为什么要做 随着马蜂窝的逐渐发展,我们的业务数据越来越多,单纯使用 MySQL 已经不能满足我们的数据查询需求,例如对于商品、订单等数据的多维度检索。 使用 Elasticsearch 存储业务数据可以很好的解决我们业务中的搜索需求。而数据进行异构存储后,随之而来的就是数据同步的问题。 二、现有方法及问题 对于数据同步,我们目前的解决方案是建立数据中间表。把需要检索的业务数据,统一放到一张M

作业提交过程之HDFSMapReduce

作业提交全过程详解 (1)作业提交 第1步:Client调用job.waitForCompletion方法,向整个集群提交MapReduce作业。 第2步:Client向RM申请一个作业id。 第3步:RM给Client返回该job资源的提交路径和作业id。 第4步:Client提交jar包、切片信息和配置文件到指定的资源提交路径。 第5步:Client提交完资源后,向RM申请运行MrAp