本文主要是介绍flink per job on yarn 找不到或无法加载主类org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
前言
最近一直忙着规则引擎项目的开发。目前规则引擎项目基于flink流式计算来开发的。对于用户配置的规则要动态生成flink job 并自动化的发布到yarn上进行执行。考虑到将多个flink job共享一个yarn session势必会造成资源的争夺以及相互影响,觉得发布方式采用单个flink job直接发布到yarn上,作为单独的application在yarn上执行。针对这种方式,目前flink没有对外提供restful方式,所以没办法,只能研究flink 发布到yarn的代码。利用flink的源码来进行改造,对外提供restflu,这样对任务的管理非常方便。目前开发工作正在进行中。对于flink发布到yarn的方式可以查看flink 发布方式
问题:找不到或无法加载主类 org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint
最近通过restful将flink job发布到yarn上报如下错误:
错误: 找不到或无法加载主类 org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint
一看就是classpath的问题,但是我已经在yarn-site.xml中配置yarn.application.classpath的属性,并且我已将相关依赖的jar放入到集群classpath中。折腾了半天还是搞不定,怎么办?那只有一个办法了:根据执行的日志,看源码。只有将hadoop的源码下载下来开始看源码。我本人测试环境使用的hadoop版本是2.4.1.。打开日志,看到产生错误的地方在DefaultContainerExecutor这个类的launchContainer方法中。
2020-04-21 14:02:35,300 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container: Container container_1587448653036_0001_01_000001 transitioned from LOCALIZING to LOCALIZED
2020-04-21 14:02:35,414 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container: Container container_1587448653036_0001_01_000001 transitioned from LOCALIZED to RUNNING
2020-04-21 14:02:35,463 INFO org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor: launchContainer: [nice, -n, 0, bash, /home/hadoop/tmp/nm-local-dir/usercache/dwadmin/appcache/application_1587448653036_0001/container_1587448653036_0001_01_000001/default_container_executor.sh]
2020-04-21 14:02:35,700 WARN org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor: Exit code from container container_1587448653036_0001_01_000001 is : 1
2020-04-21 14:02:35,703 WARN org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor: Exception from container-launch with container ID: container_1587448653036_0001_01_000001 and exit code: 1
org.apache.hadoop.util.Shell$ExitCodeException:at org.apache.hadoop.util.Shell.runCommand(Shell.java:505)at org.apache.hadoop.util.Shell.run(Shell.java:418)at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:650)at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:195)at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:300)at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:81)at java.util.concurrent.FutureTask.run(FutureTask.java:266)at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)at java.lang.Thread.run(Thread.java:748)
2020-04-21 14:02:35,706 INFO org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor:
2020-04-21 14:02:35,710 WARN org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch: Container exited with a non-zero exit code 1
2020-04-21 14:02:35,712 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container: Container container_1587448653036_0001_01_000001 transitioned from RUNNING to EXITED_WITH_FAILURE
DefaultContainerExecutor这个类的launchContainer方法如下:
@Overridepublic int launchContainer(Container container,Path nmPrivateContainerScriptPath, Path nmPrivateTokensPath,String userName, String appId, Path containerWorkDir,List<String> localDirs, List<String> logDirs) throws IOException {LOG.info("launchContainer开始执行,appId =>"+appId+",containerWorkDir =》"+containerWorkDir.toString());FsPermission dirPerm = new FsPermission(APPDIR_PERM);ContainerId containerId = container.getContainerId();// 获取ContainerIdString containerIdStr = ConverterUtils.toString(containerId);//获取应用IDString appIdStr =ConverterUtils.toString(containerId.getApplicationAttemptId().getApplicationId());//循环创建本地资源工作目录for (String sLocalDir : localDirs) {Path usersdir = new Path(sLocalDir, ContainerLocalizer.USERCACHE);Path userdir = new Path(usersdir, userName);Path appCacheDir = new Path(userdir, ContainerLocalizer.APPCACHE);Path appDir = new Path(appCacheDir, appIdStr);Path containerDir = new Path(appDir, containerIdStr);createDir(containerDir, dirPerm, true);}//创建container的日志目录createContainerLogDirs(appIdStr, containerIdStr, logDirs);Path tmpDir = new Path(containerWorkDir,YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR);createDir(tmpDir, dirPerm, false);// copy launch script to work dirPath launchDst =new Path(containerWorkDir, ContainerLaunch.CONTAINER_SCRIPT);lfs.util().copy(nmPrivateContainerScriptPath, launchDst);// copy container tokens to work dirPath tokenDst =new Path(containerWorkDir, ContainerLaunch.FINAL_CONTAINER_TOKENS_FILE);lfs.util().copy(nmPrivateTokensPath, tokenDst);// Create new local launch wrapper scriptLocalWrapperScriptBuilder sb = Shell.WINDOWS ?new WindowsLocalWrapperScriptBuilder(containerIdStr, containerWorkDir) :new UnixLocalWrapperScriptBuilder(containerWorkDir);// Fail fast if attempting to launch the wrapper script would fail due to// Windows path length limitation.if (Shell.WINDOWS &&sb.getWrapperScriptPath().toString().length() > WIN_MAX_PATH) {throw new IOException(String.format("Cannot launch container using script at path %s, because it exceeds " +"the maximum supported path length of %d characters. Consider " +"configuring shorter directories in %s.", sb.getWrapperScriptPath(),WIN_MAX_PATH, YarnConfiguration.NM_LOCAL_DIRS));}Path pidFile = getPidFilePath(containerId);if (pidFile != null) {sb.writeLocalWrapperScript(launchDst, pidFile);} else {LOG.info("Container " + containerIdStr+ " was marked as inactive. Returning terminated error");return ExitCode.TERMINATED.getExitCode();}File sourceFile = new File(launchDst.toUri().getPath().toString());File directFile = new File("/tmp/hadoop/"+appId+"/"+sourceFile.getParent());if(!directFile.exists()){directFile.mkdirs();}File dst = new File(directFile.getPath()+"/launch_container.sh");Files.copy(sourceFile,dst);LOG.info("launch_container脚本拷贝完成");// create log dir under app// fork scriptShellCommandExecutor shExec = null;try {lfs.setPermission(launchDst,ContainerExecutor.TASK_LAUNCH_SCRIPT_PERMISSION);lfs.setPermission(sb.getWrapperScriptPath(),ContainerExecutor.TASK_LAUNCH_SCRIPT_PERMISSION);// Setup command to runString[] command = getRunCommand(sb.getWrapperScriptPath().toString(),containerIdStr, this.getConf());LOG.info("launchContainer: " + Arrays.toString(command));String copyPathForShellScript = "/tmp/hadoop/"+appId;LOG.info("执行脚本拷贝后的路径为:"+copyPathForShellScript);File newpaths = new File(copyPathForShellScript);if(!newpaths.exists()){newpaths.mkdirs();}File shellFile = new File(copyPathForShellScript+"/default_container_executor.sh");File oldPaths = new File(command[command.length-1]);Files.copy(oldPaths,shellFile);if(container.getLaunchContext().getEnvironment() != null){LOG.info("执行脚本中的环境变量如下:");for (Map.Entry<String, String> stringStringEntry : container.getLaunchContext().getEnvironment().entrySet()) {LOG.info("===变量名称=》"+stringStringEntry.getKey()+",变量值=》"+stringStringEntry.getValue());}}shExec = new ShellCommandExecutor(command,new File(containerWorkDir.toUri().getPath()),container.getLaunchContext().getEnvironment()); // sanitized envif (isContainerActive(containerId)) {shExec.execute();}else {LOG.info("Container " + containerIdStr +" was marked as inactive. Returning terminated error");return ExitCode.TERMINATED.getExitCode();}} catch (IOException e) {e.printStackTrace(System.out);if (null == shExec) {return -1;}int exitCode = shExec.getExitCode();LOG.warn("Exit code from container " + containerId + " is : " + exitCode);// 143 (SIGTERM) and 137 (SIGKILL) exit codes means the container was// terminated/killed forcefully. In all other cases, log the// container-executor's outputif (exitCode != ExitCode.FORCE_KILLED.getExitCode()&& exitCode != ExitCode.TERMINATED.getExitCode()) {LOG.warn("Exception from container-launch with container ID: "+ containerId + " and exit code: " + exitCode , e);logOutput(shExec.getOutput());String diagnostics = "Exception from container-launch: "+ e + "\n"+ StringUtils.stringifyException(e) + "\n" + shExec.getOutput();container.handle(new ContainerDiagnosticsUpdateEvent(containerId,diagnostics));} else {container.handle(new ContainerDiagnosticsUpdateEvent(containerId,"Container killed on request. Exit code is " + exitCode));}return exitCode;} finally {; //}return 0;}
这个方法干了如下几件事情:
- 创建本节点的资源存放目录(包括日志目录)以及设置对应的工作目录;
- 在工作目录下生成default_container_executor.sh 脚本;
- 在工作目录下生成launch_container.sh脚本;
- 执行default_container_executor.sh 脚本;
- 最后如果执行没有任何异常,则直接返回0;
到这里,我比较好奇的是default_container_executor.sh与launch_container.sh这2个脚本执行的是什么内容,然而集群环境下,任务执行失败之后会将这2个脚本文件进行清除,无法查看里面内容,于是灵机一动,修改源代码对文件进行拷贝,修改完后进行编译打包(hadoop源码打包编译可查看https://blog.csdn.net/impler/article/details/80787156 感觉不错),替换掉hadoop-yarn-server-nodemanager-2.4.1这个包,重启集群,提交job,得到这以上2个脚本的内容,内容如下:
default_container_executor.sh脚本内容如下:
#!/bin/bashecho $$ > /home/hadoop/tmp/nm-local-dir/nmPrivate/container_1587521447973_0002_01_000001.pid.tmp
/bin/mv -f /home/hadoop/tmp/nm-local-dir/nmPrivate/container_1587521447973_0002_01_000001.pid.tmp /home/hadoop/tmp/nm-local-dir/nmPrivate/container_1587521447973_0002_01_000001.pid
exec setsid /bin/bash "/home/hadoop/tmp/nm-local-dir/usercache/dwadmin/appcache/application_1587521447973_0002/container_1587521447973_0002_01_000001/launch_container.sh"
通过查看脚本发现,default_container_executor.sh脚本干了2件事情:将本脚本执行的进程id写入到文件中与调用launch_container.sh脚本,并将pid设置成launch_container.sh执行进程的组ID.
我们再看launch_container.sh这个脚本干了什么事情。脚本如下:
#!/bin/bashexport HADOOP_CONF_DIR="/home/hadoop/hadoop-2.4.1/etc/hadoop"
export MAX_APP_ATTEMPTS="1"
export _SLOTS="8"
export JAVA_HOME="/usr/local/jdk1.8.0_161"
export _CLIENT_HOME_DIR="hdfs://sy-suz-srv116.suiyi.com.cn:8020/user/dwadmin"
export APP_SUBMIT_TIME_ENV="1587521493874"
export NM_HOST="SY-SUZ-SRV119.suiyi.com.cn"
export _APP_ID="application_1587521447973_0002"
export HADOOP_USER_NAME="dwadmin"
export HADOOP_HDFS_HOME="/home/hadoop/hadoop-2.4.1"
export LOGNAME="dwadmin"
export JVM_PID="$$"
export _DETACHED="true"
export PWD="/home/hadoop/tmp/nm-local-dir/usercache/dwadmin/appcache/application_1587521447973_0002/container_1587521447973_0002_01_000001"
export HADOOP_COMMON_HOME="/home/hadoop/hadoop-2.4.1"
export LOCAL_DIRS="/home/hadoop/tmp/nm-local-dir/usercache/dwadmin/appcache/application_1587521447973_0002"
export APPLICATION_WEB_PROXY_BASE="/proxy/application_1587521447973_0002"
export _DYNAMIC_PROPERTIES=""
export NM_HTTP_PORT="8042"
export LOG_DIRS="/home/hadoop/data/log/application_1587521447973_0002/container_1587521447973_0002_01_000001"
export _CLIENT_SHIP_FILES="log4j.properties=hdfs://sy-suz-srv116.suiyi.com.cn:8020/user/dwadmin/.flink/application_1587521447973_0002/log4j.properties,lib\flink-shaded-hadoop-2-uber-2.8.3-10.0.jar=hdfs://sy-suz-srv116.suiyi.com.cn:8020/user/dwadmin/.flink/application_1587521447973_0002/lib/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar,lib\log4j-1.2.17.jar=hdfs://sy-suz-srv116.suiyi.com.cn:8020/user/dwadmin/.flink/application_1587521447973_0002/lib/log4j-1.2.17.jar,lib\my-flink-dist_2.12-1.8.0.jar=hdfs://sy-suz-srv116.suiyi.com.cn:8020/user/dwadmin/.flink/application_1587521447973_0002/lib/my-flink-dist_2.12-1.8.0.jar,lib\slf4j-log4j12-1.7.15.jar=hdfs://sy-suz-srv116.suiyi.com.cn:8020/user/dwadmin/.flink/application_1587521447973_0002/lib/slf4j-log4j12-1.7.15.jar,1f218-202004151215.jar=hdfs://sy-suz-srv116.suiyi.com.cn:8020/user/dwadmin/.flink/application_1587521447973_0002/1f218-202004151215.jar,flink-conf.yaml=hdfs://sy-suz-srv116.suiyi.com.cn:8020/user/dwadmin/.flink/application_1587521447973_0002/application_1587521447973_0002-flink-conf.yaml6999094645340378629.tmp,"
export NM_PORT="49470"
export USER="dwadmin"
export HADOOP_YARN_HOME="/home/hadoop/hadoop-2.4.1"
export CLASSPATH="1f218-202004151215.jar;lib\flink-shaded-hadoop-2-uber-2.8.3-10.0.jar;lib\log4j-1.2.17.jar;lib\my-flink-dist_2.12-1.8.0.jar;lib\slf4j-log4j12-1.7.15.jar;log4j.properties;flink.jar;flink-conf.yaml;job.graph;;${HADOOP_HOME}/etc/hadoop;${HADOOP_HOME}/share/hadoop/common/lib/*;${HADOOP_HOME}/share/hadoop/common/*;${HADOOP_HOME}/share/hadoop/hdfs/*;${HADOOP_HOME}/share/hadoop/hdfs/lib/*;${HADOOP_HOME}/share/hadoop/yarn/lib/*;${HADOOP_HOME}/share/hadoop/yarn/*;${HADOOP_HOME}/share/hadoop/mapreduce/lib/*;${HADOOP_HOME}/share/hadoop/mapreduce/*;${HADOOP_HOME}/contrib/capacity-scheduler/*"
export _FLINK_YARN_FILES="hdfs://sy-suz-srv116.suiyi.com.cn:8020/user/dwadmin/.flink/application_1587521447973_0002"
export _ZOOKEEPER_NAMESPACE="application_1587521447973_0002"
export HADOOP_TOKEN_FILE_LOCATION="/home/hadoop/tmp/nm-local-dir/usercache/dwadmin/appcache/application_1587521447973_0002/container_1587521447973_0002_01_000001/container_tokens"
export _FLINK_JAR_PATH="hdfs://sy-suz-srv116.suiyi.com.cn:8020/user/dwadmin/.flink/application_1587521447973_0002/flink-yarn_2.11-1.8.3.jar"
export _FLINK_CLASSPATH="1f218-202004151215.jar;lib\flink-shaded-hadoop-2-uber-2.8.3-10.0.jar;lib\log4j-1.2.17.jar;lib\my-flink-dist_2.12-1.8.0.jar;lib\slf4j-log4j12-1.7.15.jar;log4j.properties;flink.jar;flink-conf.yaml;job.graph;"
export _CLIENT_TM_COUNT="1"
export _CLIENT_TM_MEMORY="2048"
export HOME="/home/"
export CONTAINER_ID="container_1587521447973_0002_01_000001"
export MALLOC_ARENA_MAX="4"
ln -sf "/home/hadoop/tmp/nm-local-dir/usercache/dwadmin/appcache/application_1587521447973_0002/filecache/13/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar" "lib\flink-shaded-hadoop-2-uber-2.8.3-10.0.jar"
ln -sf "/home/hadoop/tmp/nm-local-dir/usercache/dwadmin/appcache/application_1587521447973_0002/filecache/14/log4j-1.2.17.jar" "lib\log4j-1.2.17.jar"
ln -sf "/home/hadoop/tmp/nm-local-dir/usercache/dwadmin/appcache/application_1587521447973_0002/filecache/11/application_1587521447973_0002-flink-conf.yaml6999094645340378629.tmp" "flink-conf.yaml"
ln -sf "/home/hadoop/tmp/nm-local-dir/usercache/dwadmin/appcache/application_1587521447973_0002/filecache/16/application_1587521447973_00024033991992159005903.tmp" "job.graph"
ln -sf "/home/hadoop/tmp/nm-local-dir/usercache/dwadmin/appcache/application_1587521447973_0002/filecache/12/my-flink-dist_2.12-1.8.0.jar" "lib\my-flink-dist_2.12-1.8.0.jar"
ln -sf "/home/hadoop/tmp/nm-local-dir/usercache/dwadmin/appcache/application_1587521447973_0002/filecache/10/slf4j-log4j12-1.7.15.jar" "lib\slf4j-log4j12-1.7.15.jar"
ln -sf "/home/hadoop/tmp/nm-local-dir/usercache/dwadmin/appcache/application_1587521447973_0002/filecache/17/log4j.properties" "log4j.properties"
ln -sf "/home/hadoop/tmp/nm-local-dir/usercache/dwadmin/appcache/application_1587521447973_0002/filecache/15/1f218-202004151215.jar" "1f218-202004151215.jar"
ln -sf "/home/hadoop/tmp/nm-local-dir/usercache/dwadmin/appcache/application_1587521447973_0002/filecache/18/flink-yarn_2.11-1.8.3.jar" "flink.jar"
exec /bin/bash -c "$JAVA_HOME/bin/java -Xms424m -Xmx424m -Dlog.file="/home/hadoop/data/log/application_1587521447973_0002/container_1587521447973_0002_01_000001/jobmanager.log" -Dlog4j.configuration=file:log4j.properties org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint 1> /home/hadoop/data/log/application_1587521447973_0002/container_1587521447973_0002_01_000001/jobmanager.out 2> /home/hadoop/data/log/application_1587521447973_0002/container_1587521447973_0002_01_000001/jobmanager.err"
看了脚本之后,launch_container.sh干了以下几件事情:
- 设置环境变量;
- 为资源文件创建软连接;
- 启动java进程并设置相关参数;
看到脚本的最后一行,我心中暗暗窃喜,原来nodemanger是通过sh脚本启动一个AM,并且是通过java命令的方式。对于开头提到的问题,突然有了思路:能否在脚本的最后一行添加指定的classpath呢?答案是肯定的,但是怎么去设置呢?因为脚本是动态生成。脚本生成的源材料就是我的客户端传递过去的。好办,修改我的client。我们先看client的关键点:ContainerLaunchContext这个类,之前配置了启动参数class值为org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint,我只有修改class的值就可以了,最后将class的值修改为:
startCommandValues.put("class", "-classpath '.:${HADOOP_HOME}/etc/hadoop/*:${HADOOP_HOME}/share/hadoop/yarn/*:${HADOOP_HOME}/share/hadoop/hdfs/*:${HADOOP_HOME}/share/hadoop/hdfs/lib/*:${HADOOP_HOME}/share/hadoop/common/*:${HADOOP_HOME}/share/hadoop/common/lib/*:${HADOOP_HOME}/share/hadoop/mapreduce/*:${HADOOP_HOME}/share/hadoop/mapreduce/lib/*:${HADOOP_HOME}/share/hadoop/tools/lib/*:${HADOOP_HOME}/share/hadoop/yarn/*:${HADOOP_HOME}/share/hadoop/yarn/lib/*' "+yarnClusterEntrypoint);
重新启动客户端,重新提交任务,搞定!!!!折腾了一天的问题终于得到了解决!!!
(ps:对于如何写Yarn Application 可以查看这里)
如果有问题,欢迎加我微信,一起讨论。
只有一起交流再会有所收获,只有一起探讨,才会走得更远!!
这篇关于flink per job on yarn 找不到或无法加载主类org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!