aws emr启动standalone的flink集群

2024-05-28 03:52

本文主要是介绍aws emr启动standalone的flink集群,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

关键组件

  • Client,代码由客户端获取并做转换,之后提交给JobManger
  • JobManager,对作业进行中央调度管理,获取到要执行的作业后,会进一步处理转换,然后分发任务给众多的TaskManager。
  • TaskManager,数据的处理操作
image-20240105171119363

在emr上自建standalone集群,文件分发脚本xsync

#!/bin/bash
if [ $# -lt 1 ]
thenecho Not Enough Arguement!exit;
fi
nodelist=$(yarn node -list 2> /dev/null | awk 'NR > 2 {print $1}' | awk -F: '{print $1}')
for host in $nodelist
doecho ==================== $host ====================for file in $@doif [ -e $file ]thenpdir=$(cd -P $(dirname $file); pwd)fname=$(basename $file)ssh $host "mkdir -p $pdir"rsync -av $pdir/$fname $host:$pdirelseecho $file does not exists!fidone
done

先进入节点中创建flink文件(root用户)

#!/bin/bash
nodelist=$(yarn node -list 2> /dev/null | awk 'NR > 2 {print $1}' | awk -F: '{print $1}')
for node in $nodelist; doecho "Executing commands on $node"ssh -o StrictHostKeyChecking=no $node "sudo mkdir -p /usr/lib/flink/sudo chown -R hadoop:hadoop /usr/lib/flink"
done

将/usr/lib/flink/同步到其他节点上

xsync /usr/lib/flink
xsync /etc/flink/conf.dist

重新软连接

#!/bin/bash
nodelist=$(yarn node -list 2> /dev/null | awk 'NR > 2 {print $1}' | awk -F: '{print $1}')
for node in $nodelist; doecho "Executing commands on $node"ssh -o StrictHostKeyChecking=no $node "sudo rm /usr/lib/flink/confsudo ln -s /etc/flink/conf.dist/ /usr/lib/flink/conf"
done

直接启动失败,需要指定workfile

No workers file. Please specify workers in 'conf/workers'.

root用户下写入workers配置

yarn node -list 2> /dev/null | awk 'NR > 2 {print $1}' | awk -F: '{print $1}' > /usr/lib/flink/conf/workersecho $(hostname).cn-north-1.compute.internal:8081 > /usr/lib/flink/conf/masters

jobmanager配置

# JobManager 节点地址.
jobmanager.rpc.address: $(hostname).cn-north-1.compute.internal
jobmanager.bind-host: 0.0.0.0 # default
rest.address: 0.0.0.0 # webui
rest.bind-address: 0.0.0.0

taskmanager配置

echo taskmanager.host: $(hostname).cn-north-1.compute.internal >> /usr/lib/flink/conf/flink-conf.yaml
echo taskmanager.bind-host: 0.0.0.0 >> /usr/lib/flink/conf/flink-conf.yaml

修改权限

sudo chown -R hadoop:hadoop /var/lib/flink

启动集群

  • 不知道为什么worker节点始终无法启动taskmanagerrunner,最终发现没有权限

  • 这个路径是提供webui上传jar文件用的

    mkdir: cannot create directory ‘/var/run/flink’: Permission denied
    /usr/lib/flink/bin/flink-daemon.sh: line 82: /var/run/flink: No such file or directory
    flock: 200: Bad file descriptor
    Starting taskexecutor daemon on host ip-192-168-28-247.
    /usr/lib/flink/bin/flink-daemon.sh: line 145: /var/run/flink/flink-hadoop-taskexecutor.pid: No such file or directory
    
/usr/lib/flink/bin/start-cluster.sh

在master上查看日志

在这里插入图片描述

最终taskmanager成功注册

image-20240105201137482

关闭集群

/usr/lib/flink/bin/stop-cluster.sh

jpsall脚本

#!/bin/bash
masternode=$(hostname).cn-north-1.compute.internal
nodelist=$(yarn node -list 2> /dev/null | awk 'NR > 2 {print $1}' | awk -F: '{print $1}')
for host in $masternode $nodelist
doecho =============== $host ===============ssh $host jps
done

在yarn模式下,提交 JAR 文件后,它就会变成由 Flink JobManager 管理的作业。

  • JobManager 位于托管 Flink 会话 Application Master 进程守护程序的 YARN 节点上

集群生命周期大于job,因此实际上对应session模式

flink run --jobmanager localhost:8081 /usr/lib/flink/examples/streaming/WordCount.jar --input s3://zhaojiew-tmp/shakespeare/ --output s3://zhaojiew-tmp/flinkoutput flink run --jobmanager localhost:8081 -c org.example.wc.WordCountBatch flinkall-1.0.0.jar

部署模式

session模式

启动flink session

  • 5.5.0 版本中添加了 flink-yarn-session 命令作为 yarn-session.sh 脚本的包装程序以简化执行
flink-yarn-session -d

image-20240105110801556

启动session后提交任务

flink run --jobmanager yarn-cluster -yid application_1704427099392_0001  /usr/lib/flink/examples/streaming/WordCount.jar --input s3://zhaojiew-tmp/shakespeare/ --output s3://zhaojiew-tmp/flinkoutput Caused by: software.amazon.awssdk.services.s3.model.S3Exception: null (Service: S3, Status Code: 400, Request ID: 8XKQF6W01QDQCRTD, Extended Request ID: Bsi6AK3alrxV5OYL5aZhu05h/RusTGUBm9P9hRu5dFu0whCv68DKpvFjf8CYL9Wc5zSEoaL759M=)

可以设置region

  • 看起来是6.15版本的emr_flink存在问题,默认region不对
-Dfs.s3a.bucket.endpoint.region=cn-north-1

在resourcemanager的Tracking UI会跳转到flink jobmanager

但是flink的taskmanager日志并没有汇聚到yarn历史服务器中

查看session fluster的ip和端口号

image-20240105222430792

在这里插入图片描述

或者直接在提交jar界面看ip和端口地址,监听的是内网ip

  • 找到后可以在idea中注册

per-job模式

直接运行batch任务

flink run -m yarn-cluster -Dexecution.runtime-mode=BATCH flinktutorial17-1.0.jar

不同类型任务的name

image-20240105155322448

application模式

提交任务

  • 将jar拷贝到/usr/lib/flink/lib

  • 指定作业入口类,脚本会到 lib 目录扫描所有的 jar 包

/usr/lib/flink/bin/standalone-job.sh start --job-classname org.example.wc.WordCountBatchStarting standalonejob daemon on host ip-192-168-30-184.

启动taskmanager

/usr/lib/flink/bin/taskmanager.sh start

在application模式下,yarn中同样没有记录

查看jps进程

image-20240105220805065

在master上查看日志

image-20240105220010908

yarn运行模式

yarn运行模式下同样可以使用三种部署模式

session模式

flink-yarn-session -d命令参数

  • Flink1.11.0 版本不再使用-n 参数和-s 参数分别指定 TaskManager 数量和 slot 数量,YARN 会按照需求动态分配 TaskManager 和 slot
-d:分离模式
-jm(--jobManagerMemory):配置 JobManager 所需内存,默认单位 MB
-nm(--name):配置在 YARN UI 界面上显示的任务名
-qu(--queue):指定 YARN 队列名
-tm(--taskManager):配置每个 TaskManager 所使用内存。

per-job模式

提交任务

/usr/lib/flink/bin/flink run -t yarn-per-job -c org.example.wc.WordCountBatch flinkall-1.0.0.jar-t,--target <arg>     The deployment target for the given application,which is equivalent to the "execution.target" configoption. For the "run" action the currently availabletargets are: "remote", "local", "kubernetes-session","yarn-per-job" (deprecated), "yarn-session". For the"run-application" action the currently availabletargets are: "kubernetes-application","yarn-application".

如下报错的解决

  • 在 flink 的/opt/module/flink-1.17.0/conf/flink-conf.yaml 配置文件中设置classloader.check-leaked-classloader: false

image-20240105223157373

application模式

提交任务

/usr/lib/flink/bin/flink run-application -t yarn-application -c org.example.wc.WordCountBatch flinkall-1.0.0.jar/usr/lib/flink/bin/flink run-application -t yarn-application s3://zhaojiew-bigdata/app/WordCount.jar --input s3://zhaojiew-tmp/shakespeare/ --output s3://zhaojiew-tmp/flinkoutput 

可以看到print输出此时在jobmanager中

image-20240105223507271

但是看起来并不支持将jar存储在s3?已知问题,怀疑和flink本身有关,因为使用s3作为input和output是没有问题的

在这里插入图片描述

可以在提交任务时指定依赖,并非任务jar

bin/flink run-application -t yarnapplication -Dyarn.provided.lib.dirs="hdfs://hadoop102:8020/flinkdist" -c com.atguigu.wc.SocketStreamWordCount
hdfs://hadoop102:8020/flink-jars/FlinkTutorial-1.0-SNAPSHOT.jar

这篇关于aws emr启动standalone的flink集群的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1009468

相关文章

服务器集群同步时间手记

1.时间服务器配置(必须root用户) (1)检查ntp是否安装 [root@node1 桌面]# rpm -qa|grep ntpntp-4.2.6p5-10.el6.centos.x86_64fontpackages-filesystem-1.41-1.1.el6.noarchntpdate-4.2.6p5-10.el6.centos.x86_64 (2)修改ntp配置文件 [r

HDFS—集群扩容及缩容

白名单:表示在白名单的主机IP地址可以,用来存储数据。 配置白名单步骤如下: 1)在NameNode节点的/opt/module/hadoop-3.1.4/etc/hadoop目录下分别创建whitelist 和blacklist文件 (1)创建白名单 [lytfly@hadoop102 hadoop]$ vim whitelist 在whitelist中添加如下主机名称,假如集群正常工作的节

Hadoop集群数据均衡之磁盘间数据均衡

生产环境,由于硬盘空间不足,往往需要增加一块硬盘。刚加载的硬盘没有数据时,可以执行磁盘数据均衡命令。(Hadoop3.x新特性) plan后面带的节点的名字必须是已经存在的,并且是需要均衡的节点。 如果节点不存在,会报如下错误: 如果节点只有一个硬盘的话,不会创建均衡计划: (1)生成均衡计划 hdfs diskbalancer -plan hadoop102 (2)执行均衡计划 hd

MySQL数据库宕机,启动不起来,教你一招搞定!

作者介绍:老苏,10余年DBA工作运维经验,擅长Oracle、MySQL、PG、Mongodb数据库运维(如安装迁移,性能优化、故障应急处理等)公众号:老苏畅谈运维欢迎关注本人公众号,更多精彩与您分享。 MySQL数据库宕机,数据页损坏问题,启动不起来,该如何排查和解决,本文将为你说明具体的排查过程。 查看MySQL error日志 查看 MySQL error日志,排查哪个表(表空间

高效+灵活,万博智云全球发布AWS无代理跨云容灾方案!

摘要 近日,万博智云推出了基于AWS的无代理跨云容灾解决方案,并与拉丁美洲,中东,亚洲的合作伙伴面向全球开展了联合发布。这一方案以AWS应用环境为基础,将HyperBDR平台的高效、灵活和成本效益优势与无代理功能相结合,为全球企业带来实现了更便捷、经济的数据保护。 一、全球联合发布 9月2日,万博智云CEO Michael Wong在线上平台发布AWS无代理跨云容灾解决方案的阐述视频,介绍了

springboot3打包成war包,用tomcat8启动

1、在pom中,将打包类型改为war <packaging>war</packaging> 2、pom中排除SpringBoot内置的Tomcat容器并添加Tomcat依赖,用于编译和测试,         *依赖时一定设置 scope 为 provided (相当于 tomcat 依赖只在本地运行和测试的时候有效,         打包的时候会排除这个依赖)<scope>provided

内核启动时减少log的方式

内核引导选项 内核引导选项大体上可以分为两类:一类与设备无关、另一类与设备有关。与设备有关的引导选项多如牛毛,需要你自己阅读内核中的相应驱动程序源码以获取其能够接受的引导选项。比如,如果你想知道可以向 AHA1542 SCSI 驱动程序传递哪些引导选项,那么就查看 drivers/scsi/aha1542.c 文件,一般在前面 100 行注释里就可以找到所接受的引导选项说明。大多数选项是通过"_

搭建Kafka+zookeeper集群调度

前言 硬件环境 172.18.0.5        kafkazk1        Kafka+zookeeper                Kafka Broker集群 172.18.0.6        kafkazk2        Kafka+zookeeper                Kafka Broker集群 172.18.0.7        kafkazk3

用命令行的方式启动.netcore webapi

用命令行的方式启动.netcore web项目 进入指定的项目文件夹,比如我发布后的代码放在下面文件夹中 在此地址栏中输入“cmd”,打开命令提示符,进入到发布代码目录 命令行启动.netcore项目的命令为:  dotnet 项目启动文件.dll --urls="http://*:对外端口" --ip="本机ip" --port=项目内部端口 例: dotnet Imagine.M

Linux服务器Java启动脚本

Linux服务器Java启动脚本 1、初版2、优化版本3、常用脚本仓库 本文章介绍了如何在Linux服务器上执行Java并启动jar包, 通常我们会使用nohup直接启动,但是还是需要手动停止然后再次启动, 那如何更优雅的在服务器上启动jar包呢,让我们一起探讨一下吧。 1、初版 第一个版本是常用的做法,直接使用nohup后台启动jar包, 并将日志输出到当前文件夹n