Flinkx启动流程-整体理解

2023-11-11 21:58

本文主要是介绍Flinkx启动流程-整体理解,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1. 先看启动脚本

在bin/flinkx的内容

set -e
export FLINKX_HOME="$(cd "`dirname "$0"`"/..; pwd)"
# Find the java binary
if [ -n "${JAVA_HOME}" ]; thenJAVA_RUN="${JAVA_HOME}/bin/java"
elseif [ `command -v java` ]; thenJAVA_RUN="java"elseecho "JAVA_HOME is not set" >&2exit 1fi
fi
JAR_DIR=$FLINKX_HOME/lib/*
CLASS_NAME=com.dtstack.flinkx.launcher.Launcher
# 参数1.java的命令
# -cp就是classpath :cp解释https://zhuanlan.zhihu.com/p/214093661
# -cp $JAR_DIR $CLASS_NAME 就是指定运行哪个jar下的哪个主类
# $@为传递的参数
# &为后端运行
echo "flinkx starting ..."
nohup $JAVA_RUN -cp $JAR_DIR $CLASS_NAME $@ &
  1. 先导入flinkx的环境
  2. 然后是java的二进制文件
  3. 指定flinkx相关jar包和启动类
  4. nohup $JAVA_RUN -cp $JAR_DIR $CLASS_NAME $@ &执行

2. 在看Launcher启动类

定位到flinkx-1.8.5\flinkx-launcher\src\main\java\com\dtstack\flinkx\launcher\Launcher.java的main方法,第95行,查看本地模式

if(mode.equals(ClusterMode.local.name())) {String[] localArgs = argList.toArray(new String[argList.size()]);com.dtstack.flinkx.Main.main(localArgs);
}

flinkx本地模式启动实际调用的是 com.dtstack.flinkx.Main.main方法,进去看看

public static void main(String[] args) throws Exception {//  获取传递的参数com.dtstack.flinkx.options.Options options = new OptionParser(args).getOptions();String job = options.getJob();  // -jobString jobIdString = options.getJobid(); // Flink JobString monitor = options.getMonitor();String pluginRoot = options.getPluginRoot(); // -pluginRoot String savepointPath = options.getS();Properties confProperties = parseConf(options.getConfProp()); // -flinkconf// 解析jobPath指定的任务配置json文件DataTransferConfig config = DataTransferConfig.parse(job);speedTest(config);if(StringUtils.isNotEmpty(monitor)) {config.setMonitorUrls(monitor);}if(StringUtils.isNotEmpty(pluginRoot)) {config.setPluginRoot(pluginRoot);}StreamExecutionEnvironment env = (StringUtils.isNotBlank(monitor)) ?StreamExecutionEnvironment.getExecutionEnvironment() :new MyLocalStreamEnvironment();env = openCheckpointConf(env, confProperties);configRestartStrategy(env, config);SpeedConfig speedConfig = config.getJob().getSetting().getSpeed();env.setParallelism(speedConfig.getChannel());env.setRestartStrategy(RestartStrategies.noRestart());// 得到具体的reader类名BaseDataReader dataReader = DataReaderFactory.getDataReader(config, env);DataStream<Row> dataStream = dataReader.readData();dataStream = ((DataStreamSource<Row>) dataStream).setParallelism(speedConfig.getReaderChannel());if (speedConfig.isRebalance()) {dataStream = dataStream.rebalance();}// 得到具体的writer类名BaseDataWriter dataWriter = DataWriterFactory.getDataWriter(config);dataWriter.writeData(dataStream).setParallelism(speedConfig.getWriterChannel());if(env instanceof MyLocalStreamEnvironment) {if(StringUtils.isNotEmpty(savepointPath)){((MyLocalStreamEnvironment) env).setSettings(SavepointRestoreSettings.forPath(savepointPath));}}addEnvClassPath(env, ClassLoaderManager.getClassPath());// 得到执行的结果JobExecutionResult result = env.execute(jobIdString);if(env instanceof MyLocalStreamEnvironment){ResultPrintUtil.printResult(result);}
}

3. 在结合start.sh脚本

结合start.sh脚本,就清楚了

D:/Projects/flinkx-1.8.5/bin/flinkx \
-mode "local"  \
-job D:/Projects/flinkx-1.8.5/job/ftp2stream.json \
-pluginRoot "D:/Projects/flinkx-1.8.5/plugins" \
-flinkconf "D:/Projects/flinkx-1.8.5/flinkconf"  \
-confProp "{\"flink.checkpoint.interval\":60000}"

flinkx之后执行执行的语句是这样的

nohup java -cp \
D:\Projects\flinkx-1.8.5\lib\flinkx-launcher-1.6.jar \
com.dtstack.flinkx.launcher.Launcher \
-mode "local"  \
-job D:/Projects/flinkx-1.8.5/job/ftp2stream.json \
-pluginRoot "D:/Projects/flinkx-1.8.5/plugins" \
-flinkconf "D:/Projects/flinkx-1.8.5/flinkconf"  \
-confProp "{\"flink.checkpoint.interval\":60000}" &

Launcher类中通过获取mode,pluginRoot ,flinkconf加载环境,通过json文件得知reader和writer的类型和同步策略

这篇关于Flinkx启动流程-整体理解的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/393066

相关文章

C#提取PDF表单数据的实现流程

《C#提取PDF表单数据的实现流程》PDF表单是一种常见的数据收集工具,广泛应用于调查问卷、业务合同等场景,凭借出色的跨平台兼容性和标准化特点,PDF表单在各行各业中得到了广泛应用,本文将探讨如何使用... 目录引言使用工具C# 提取多个PDF表单域的数据C# 提取特定PDF表单域的数据引言PDF表单是一

PyCharm接入DeepSeek实现AI编程的操作流程

《PyCharm接入DeepSeek实现AI编程的操作流程》DeepSeek是一家专注于人工智能技术研发的公司,致力于开发高性能、低成本的AI模型,接下来,我们把DeepSeek接入到PyCharm中... 目录引言效果演示创建API key在PyCharm中下载Continue插件配置Continue引言

使用MongoDB进行数据存储的操作流程

《使用MongoDB进行数据存储的操作流程》在现代应用开发中,数据存储是一个至关重要的部分,随着数据量的增大和复杂性的增加,传统的关系型数据库有时难以应对高并发和大数据量的处理需求,MongoDB作为... 目录什么是MongoDB?MongoDB的优势使用MongoDB进行数据存储1. 安装MongoDB

一文带你理解Python中import机制与importlib的妙用

《一文带你理解Python中import机制与importlib的妙用》在Python编程的世界里,import语句是开发者最常用的工具之一,它就像一把钥匙,打开了通往各种功能和库的大门,下面就跟随小... 目录一、python import机制概述1.1 import语句的基本用法1.2 模块缓存机制1.

深入理解C语言的void*

《深入理解C语言的void*》本文主要介绍了C语言的void*,包括它的任意性、编译器对void*的类型检查以及需要显式类型转换的规则,具有一定的参考价值,感兴趣的可以了解一下... 目录一、void* 的类型任意性二、编译器对 void* 的类型检查三、需要显式类型转换占用的字节四、总结一、void* 的

深入理解Redis大key的危害及解决方案

《深入理解Redis大key的危害及解决方案》本文主要介绍了深入理解Redis大key的危害及解决方案,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着... 目录一、背景二、什么是大key三、大key评价标准四、大key 产生的原因与场景五、大key影响与危

SpringBoot项目启动后自动加载系统配置的多种实现方式

《SpringBoot项目启动后自动加载系统配置的多种实现方式》:本文主要介绍SpringBoot项目启动后自动加载系统配置的多种实现方式,并通过代码示例讲解的非常详细,对大家的学习或工作有一定的... 目录1. 使用 CommandLineRunner实现方式:2. 使用 ApplicationRunne

Python实现NLP的完整流程介绍

《Python实现NLP的完整流程介绍》这篇文章主要为大家详细介绍了Python实现NLP的完整流程,文中的示例代码讲解详细,具有一定的借鉴价值,感兴趣的小伙伴可以跟随小编一起学习一下... 目录1. 编程安装和导入必要的库2. 文本数据准备3. 文本预处理3.1 小写化3.2 分词(Tokenizatio

深入理解C++ 空类大小

《深入理解C++空类大小》本文主要介绍了C++空类大小,规定空类大小为1字节,主要是为了保证对象的唯一性和可区分性,满足数组元素地址连续的要求,下面就来了解一下... 目录1. 保证对象的唯一性和可区分性2. 满足数组元素地址连续的要求3. 与C++的对象模型和内存管理机制相适配查看类对象内存在C++中,规

SpringBoot使用minio进行文件管理的流程步骤

《SpringBoot使用minio进行文件管理的流程步骤》MinIO是一个高性能的对象存储系统,兼容AmazonS3API,该软件设计用于处理非结构化数据,如图片、视频、日志文件以及备份数据等,本文... 目录一、拉取minio镜像二、创建配置文件和上传文件的目录三、启动容器四、浏览器登录 minio五、