Flinkx启动流程-整体理解

2023-11-11 21:58

本文主要是介绍Flinkx启动流程-整体理解,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1. 先看启动脚本

在bin/flinkx的内容

set -e
export FLINKX_HOME="$(cd "`dirname "$0"`"/..; pwd)"
# Find the java binary
if [ -n "${JAVA_HOME}" ]; thenJAVA_RUN="${JAVA_HOME}/bin/java"
elseif [ `command -v java` ]; thenJAVA_RUN="java"elseecho "JAVA_HOME is not set" >&2exit 1fi
fi
JAR_DIR=$FLINKX_HOME/lib/*
CLASS_NAME=com.dtstack.flinkx.launcher.Launcher
# 参数1.java的命令
# -cp就是classpath :cp解释https://zhuanlan.zhihu.com/p/214093661
# -cp $JAR_DIR $CLASS_NAME 就是指定运行哪个jar下的哪个主类
# $@为传递的参数
# &为后端运行
echo "flinkx starting ..."
nohup $JAVA_RUN -cp $JAR_DIR $CLASS_NAME $@ &
  1. 先导入flinkx的环境
  2. 然后是java的二进制文件
  3. 指定flinkx相关jar包和启动类
  4. nohup $JAVA_RUN -cp $JAR_DIR $CLASS_NAME $@ &执行

2. 在看Launcher启动类

定位到flinkx-1.8.5\flinkx-launcher\src\main\java\com\dtstack\flinkx\launcher\Launcher.java的main方法,第95行,查看本地模式

if(mode.equals(ClusterMode.local.name())) {String[] localArgs = argList.toArray(new String[argList.size()]);com.dtstack.flinkx.Main.main(localArgs);
}

flinkx本地模式启动实际调用的是 com.dtstack.flinkx.Main.main方法,进去看看

public static void main(String[] args) throws Exception {//  获取传递的参数com.dtstack.flinkx.options.Options options = new OptionParser(args).getOptions();String job = options.getJob();  // -jobString jobIdString = options.getJobid(); // Flink JobString monitor = options.getMonitor();String pluginRoot = options.getPluginRoot(); // -pluginRoot String savepointPath = options.getS();Properties confProperties = parseConf(options.getConfProp()); // -flinkconf// 解析jobPath指定的任务配置json文件DataTransferConfig config = DataTransferConfig.parse(job);speedTest(config);if(StringUtils.isNotEmpty(monitor)) {config.setMonitorUrls(monitor);}if(StringUtils.isNotEmpty(pluginRoot)) {config.setPluginRoot(pluginRoot);}StreamExecutionEnvironment env = (StringUtils.isNotBlank(monitor)) ?StreamExecutionEnvironment.getExecutionEnvironment() :new MyLocalStreamEnvironment();env = openCheckpointConf(env, confProperties);configRestartStrategy(env, config);SpeedConfig speedConfig = config.getJob().getSetting().getSpeed();env.setParallelism(speedConfig.getChannel());env.setRestartStrategy(RestartStrategies.noRestart());// 得到具体的reader类名BaseDataReader dataReader = DataReaderFactory.getDataReader(config, env);DataStream<Row> dataStream = dataReader.readData();dataStream = ((DataStreamSource<Row>) dataStream).setParallelism(speedConfig.getReaderChannel());if (speedConfig.isRebalance()) {dataStream = dataStream.rebalance();}// 得到具体的writer类名BaseDataWriter dataWriter = DataWriterFactory.getDataWriter(config);dataWriter.writeData(dataStream).setParallelism(speedConfig.getWriterChannel());if(env instanceof MyLocalStreamEnvironment) {if(StringUtils.isNotEmpty(savepointPath)){((MyLocalStreamEnvironment) env).setSettings(SavepointRestoreSettings.forPath(savepointPath));}}addEnvClassPath(env, ClassLoaderManager.getClassPath());// 得到执行的结果JobExecutionResult result = env.execute(jobIdString);if(env instanceof MyLocalStreamEnvironment){ResultPrintUtil.printResult(result);}
}

3. 在结合start.sh脚本

结合start.sh脚本,就清楚了

D:/Projects/flinkx-1.8.5/bin/flinkx \
-mode "local"  \
-job D:/Projects/flinkx-1.8.5/job/ftp2stream.json \
-pluginRoot "D:/Projects/flinkx-1.8.5/plugins" \
-flinkconf "D:/Projects/flinkx-1.8.5/flinkconf"  \
-confProp "{\"flink.checkpoint.interval\":60000}"

flinkx之后执行执行的语句是这样的

nohup java -cp \
D:\Projects\flinkx-1.8.5\lib\flinkx-launcher-1.6.jar \
com.dtstack.flinkx.launcher.Launcher \
-mode "local"  \
-job D:/Projects/flinkx-1.8.5/job/ftp2stream.json \
-pluginRoot "D:/Projects/flinkx-1.8.5/plugins" \
-flinkconf "D:/Projects/flinkx-1.8.5/flinkconf"  \
-confProp "{\"flink.checkpoint.interval\":60000}" &

Launcher类中通过获取mode,pluginRoot ,flinkconf加载环境,通过json文件得知reader和writer的类型和同步策略

这篇关于Flinkx启动流程-整体理解的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/393066

相关文章

深入理解Apache Kafka(分布式流处理平台)

《深入理解ApacheKafka(分布式流处理平台)》ApacheKafka作为现代分布式系统中的核心中间件,为构建高吞吐量、低延迟的数据管道提供了强大支持,本文将深入探讨Kafka的核心概念、架构... 目录引言一、Apache Kafka概述1.1 什么是Kafka?1.2 Kafka的核心概念二、Ka

Redis在windows环境下如何启动

《Redis在windows环境下如何启动》:本文主要介绍Redis在windows环境下如何启动的实现方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录Redis在Windows环境下启动1.在redis的安装目录下2.输入·redis-server.exe

解决SpringBoot启动报错:Failed to load property source from location 'classpath:/application.yml'

《解决SpringBoot启动报错:Failedtoloadpropertysourcefromlocationclasspath:/application.yml问题》这篇文章主要介绍... 目录在启动SpringBoot项目时报如下错误原因可能是1.yml中语法错误2.yml文件格式是GBK总结在启动S

SpringBoot启动报错的11个高频问题排查与解决终极指南

《SpringBoot启动报错的11个高频问题排查与解决终极指南》这篇文章主要为大家详细介绍了SpringBoot启动报错的11个高频问题的排查与解决,文中的示例代码讲解详细,感兴趣的小伙伴可以了解一... 目录1. 依赖冲突:NoSuchMethodError 的终极解法2. Bean注入失败:No qu

一文带你了解SpringBoot中启动参数的各种用法

《一文带你了解SpringBoot中启动参数的各种用法》在使用SpringBoot开发应用时,我们通常需要根据不同的环境或特定需求调整启动参数,那么,SpringBoot提供了哪些方式来配置这些启动参... 目录一、启动参数的常见传递方式二、通过命令行参数传递启动参数三、使用 application.pro

SpringBoot项目启动报错"找不到或无法加载主类"的解决方法

《SpringBoot项目启动报错找不到或无法加载主类的解决方法》在使用IntelliJIDEA开发基于SpringBoot框架的Java程序时,可能会出现找不到或无法加载主类com.example.... 目录一、问题描述二、排查过程三、解决方案一、问题描述在使用 IntelliJ IDEA 开发基于

Spring AI ectorStore的使用流程

《SpringAIectorStore的使用流程》SpringAI中的VectorStore是一种用于存储和检索高维向量数据的数据库或存储解决方案,它在AI应用中发挥着至关重要的作用,本文给大家介... 目录一、VectorStore的基本概念二、VectorStore的核心接口三、VectorStore的

python之流程控制语句match-case详解

《python之流程控制语句match-case详解》:本文主要介绍python之流程控制语句match-case使用,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐... 目录match-case 语法详解与实战一、基础值匹配(类似 switch-case)二、数据结构解构匹

SpringBoot项目启动错误:找不到或无法加载主类的几种解决方法

《SpringBoot项目启动错误:找不到或无法加载主类的几种解决方法》本文主要介绍了SpringBoot项目启动错误:找不到或无法加载主类的几种解决方法,具有一定的参考价值,感兴趣的可以了解一下... 目录方法1:更改IDE配置方法2:在Eclipse中清理项目方法3:使用Maven命令行在开发Sprin

在VSCode中本地运行DeepSeek的流程步骤

《在VSCode中本地运行DeepSeek的流程步骤》本文详细介绍了如何在本地VSCode中安装和配置Ollama和CodeGPT,以使用DeepSeek进行AI编码辅助,无需依赖云服务,需要的朋友可... 目录步骤 1:在 VSCode 中安装 Ollama 和 CodeGPT安装Ollama下载Olla