Flink Source Code Analysis - Configuration File Loading During JobManager Startup in Standalone Mode


Flink version: flink-1.11.2

Code location: org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint#main

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.runtime.entrypoint;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory;
import org.apache.flink.runtime.entrypoint.parser.CommandLineParser;
import org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.runtime.util.JvmShutdownSafeguard;
import org.apache.flink.runtime.util.SignalHandler;

/*************************************************
 * TODO_MA 马中华 https://blog.csdn.net/zhongqi2513
 *  Note: Flink can run applications in three ways: session mode, per-job mode, application mode.
 *  The modes differ mainly in:
 *  1. Cluster lifecycle and resource isolation guarantees
 *  2. Whether the application's main() method runs on the client or on the cluster
 */

/**
 * Entry point for the standalone session cluster.
 */
public class StandaloneSessionClusterEntrypoint extends SessionClusterEntrypoint {

    public StandaloneSessionClusterEntrypoint(Configuration configuration) {
        super(configuration);
    }

    @Override
    protected DefaultDispatcherResourceManagerComponentFactory createDispatcherResourceManagerComponentFactory(Configuration configuration) {
        /*************************************************
         * TODO_MA Note:
         *  1. Argument: a StandaloneResourceManagerFactory instance
         *  2. Return value: a DefaultDispatcherResourceManagerComponentFactory instance
         */
        return DefaultDispatcherResourceManagerComponentFactory.createSessionComponentFactory(StandaloneResourceManagerFactory.getInstance());
    }

    /*************************************************
     * TODO_MA Note: entry point
     */
    public static void main(String[] args) {

        // TODO_MA Note: EnvironmentInformation is a utility class that gives access to the JVM
        // execution environment, such as the executing user (getHadoopUser()), startup options, or the JVM version.
        // startup checks and logging
        EnvironmentInformation.logEnvironmentInfo(LOG, StandaloneSessionClusterEntrypoint.class.getSimpleName(), args);

        // TODO_MA Note: register signal handlers
        SignalHandler.register(LOG);

        // TODO_MA Note: install the safe-shutdown hook.
        // TODO_MA Note: the Flink cluster may receive a shutdown command while it is starting up
        // TODO_MA Note: or at any point after it has started.
        JvmShutdownSafeguard.installAsShutdownHook(LOG);

        EntrypointClusterConfiguration entrypointClusterConfiguration = null;

        // TODO_MA Note: create the command-line parser
        final CommandLineParser<EntrypointClusterConfiguration> commandLineParser = new CommandLineParser<>(new EntrypointClusterConfigurationParserFactory());

        try {
            /*************************************************
             * TODO_MA Note: parse the arguments passed in.
             *  Internally, EntrypointClusterConfigurationParserFactory does the parsing of the arguments;
             *  the returned EntrypointClusterConfiguration is a subclass of ClusterConfiguration.
             */
            entrypointClusterConfiguration = commandLineParser.parse(args);
        } catch (FlinkParseException e) {
            LOG.error("Could not parse command line arguments {}.", args, e);
            commandLineParser.printHelp(StandaloneSessionClusterEntrypoint.class.getSimpleName());
            System.exit(1);
        }

        /*************************************************
         * TODO_MA Note: load the configuration, i.e. parse Flink's configuration file flink-conf.yaml
         */
        Configuration configuration = loadConfiguration(entrypointClusterConfiguration);

        /*************************************************
         * TODO_MA Note: create the StandaloneSessionClusterEntrypoint
         */
        StandaloneSessionClusterEntrypoint entrypoint = new StandaloneSessionClusterEntrypoint(configuration);

        /*************************************************
         * TODO_MA Note: start the cluster entrypoint.
         *  This method accepts the parent class ClusterEntrypoint, so the other startup modes
         *  presumably go through the same method.
         */
        ClusterEntrypoint.runClusterEntrypoint(entrypoint);
    }
}

Loading the configuration happens in two main steps:

1.  Parse the arguments passed on the command line. Core code:

entrypointClusterConfiguration = commandLineParser.parse(args);

For how this works, see:

Flink source code analysis - command-line argument parsing - CommandLineParser (CSDN blog)
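To make the result of step 1 concrete, here is a minimal, self-contained sketch of what the parsing step has to extract: a configuration directory and a set of dynamic `key=value` properties. It does not use Flink's `CommandLineParser`; the class `EntrypointArgsSketch`, the `ParsedArgs` holder, and the `--configDir`/`-c` and `-D` flag spellings are assumptions made for illustration and should be checked against the option definitions in your Flink version.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical, simplified stand-in for the command-line parsing step (not Flink's API). */
public class EntrypointArgsSketch {

    /** Holds the two pieces of information that the configuration loading step needs. */
    public static final class ParsedArgs {
        public String configDir; // value of --configDir / -c (assumed flag names)
        public final Map<String, String> dynamicProperties = new LinkedHashMap<>(); // -Dkey=value pairs
    }

    public static ParsedArgs parse(String[] args) {
        ParsedArgs parsed = new ParsedArgs();
        for (int i = 0; i < args.length; i++) {
            String arg = args[i];
            if (arg.equals("--configDir") || arg.equals("-c")) {
                if (i + 1 >= args.length) {
                    throw new IllegalArgumentException("Missing value for " + arg);
                }
                parsed.configDir = args[++i];
            } else if (arg.startsWith("-D") && arg.contains("=")) {
                // Split only on the first '=', so values may themselves contain '='.
                String[] kv = arg.substring(2).split("=", 2);
                parsed.dynamicProperties.put(kv[0], kv[1]);
            } else {
                throw new IllegalArgumentException("Unrecognized argument: " + arg);
            }
        }
        if (parsed.configDir == null) {
            throw new IllegalArgumentException("A configuration directory is required");
        }
        return parsed;
    }

    public static void main(String[] args) {
        ParsedArgs p = parse(new String[] {"--configDir", "/opt/flink/conf", "-Djobmanager.rpc.port=6123"});
        System.out.println(p.configDir + " " + p.dynamicProperties);
    }
}
```

In the real entrypoint a parse failure is reported through `FlinkParseException`, after which the help text is printed and the process exits with status 1; the sketch simply throws `IllegalArgumentException` instead.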

2. Load the flink-conf.yaml configuration.

Core code:

Configuration configuration = loadConfiguration(entrypointClusterConfiguration);
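The `GlobalConfiguration#loadConfiguration(configDir, dynamicProperties)` source quoted in this article reads the YAML file first and then calls `configuration.addAll(dynamicProperties)`, so a dynamic property wins over a value from the file when both set the same key. A minimal sketch of that precedence, using plain maps instead of Flink's `Configuration` class (the class name `ConfigOverlaySketch` and the `merge` helper are hypothetical):

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical illustration of "file values first, dynamic properties on top". */
public class ConfigOverlaySketch {

    public static Map<String, String> merge(Map<String, String> fromFile, Map<String, String> dynamicProperties) {
        // Start from the values read from the configuration file.
        Map<String, String> merged = new LinkedHashMap<>(fromFile);
        // Mirror the null check in the quoted source: dynamic properties are optional.
        if (dynamicProperties != null) {
            merged.putAll(dynamicProperties); // later writes replace earlier values for the same key
        }
        return merged;
    }

    public static void main(String[] args) {
        Map<String, String> fromFile = new LinkedHashMap<>();
        fromFile.put("jobmanager.rpc.address", "localhost");
        fromFile.put("jobmanager.rpc.port", "6123");

        Map<String, String> dynamic = new LinkedHashMap<>();
        dynamic.put("jobmanager.rpc.port", "7123");

        System.out.println(merge(fromFile, dynamic));
    }
}
```

This ordering is what lets a value passed at startup take effect without editing the configuration file.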

The rest is skipped here; only the most important part, the parsing of the YAML file, is recorded. Pay attention to org.apache.flink.configuration.GlobalConfiguration#loadYAMLResource below:

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.configuration;

import org.apache.flink.annotation.Internal;
import org.apache.flink.util.Preconditions;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;

/**
 * Global configuration object for Flink. Similar to Java properties configuration
 * objects it includes key-value pairs which represent the framework's configuration.
 */
@Internal
public final class GlobalConfiguration {

    private static final Logger LOG = LoggerFactory.getLogger(GlobalConfiguration.class);

    public static final String FLINK_CONF_FILENAME = "flink-conf.yaml";

    // the keys whose values should be hidden
    private static final String[] SENSITIVE_KEYS = new String[] {"password", "secret", "fs.azure.account.key"};

    // the hidden content to be displayed
    public static final String HIDDEN_CONTENT = "******";

    // --------------------------------------------------------------------------------------------

    private GlobalConfiguration() {}

    // --------------------------------------------------------------------------------------------

    /**
     * Loads the global configuration from the environment. Fails if an error occurs during loading. Returns an
     * empty configuration object if the environment variable is not set. In production this variable is set but
     * tests and local execution/debugging don't have this environment variable set. That's why we should fail
     * if it is not set.
     * @return Returns the Configuration
     */
    public static Configuration loadConfiguration() {
        return loadConfiguration(new Configuration());
    }

    /**
     * Loads the global configuration and adds the given dynamic properties
     * configuration.
     *
     * @param dynamicProperties The given dynamic properties
     * @return Returns the loaded global configuration with dynamic properties
     */
    public static Configuration loadConfiguration(Configuration dynamicProperties) {
        final String configDir = System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR);
        if (configDir == null) {
            return new Configuration(dynamicProperties);
        }

        return loadConfiguration(configDir, dynamicProperties);
    }

    /**
     * Loads the configuration files from the specified directory.
     *
     * <p>YAML files are supported as configuration files.
     *
     * @param configDir
     *        the directory which contains the configuration files
     */
    public static Configuration loadConfiguration(final String configDir) {
        // TODO_MA Note: delegate to the two-argument overload without dynamic properties
        return loadConfiguration(configDir, null);
    }

    /**
     * Loads the configuration files from the specified directory. If the dynamic properties
     * configuration is not null, then it is added to the loaded configuration.
     *
     * @param configDir directory to load the configuration from
     * @param dynamicProperties configuration file containing the dynamic properties. Null if none.
     * @return The configuration loaded from the given configuration directory
     */
    public static Configuration loadConfiguration(final String configDir, @Nullable final Configuration dynamicProperties) {

        if (configDir == null) {
            throw new IllegalArgumentException("Given configuration directory is null, cannot load configuration");
        }

        final File confDirFile = new File(configDir);
        if (!(confDirFile.exists())) {
            throw new IllegalConfigurationException(
                "The given configuration directory name '" + configDir +
                    "' (" + confDirFile.getAbsolutePath() + ") does not describe an existing directory.");
        }

        // TODO_MA Note: the Flink configuration file: flink-conf.yaml
        // get Flink yaml configuration file
        final File yamlConfigFile = new File(confDirFile, FLINK_CONF_FILENAME);

        if (!yamlConfigFile.exists()) {
            throw new IllegalConfigurationException(
                "The Flink config file '" + yamlConfigFile +
                    "' (" + confDirFile.getAbsolutePath() + ") does not exist.");
        }

        // TODO_MA Note: read the flink-conf.yaml configuration file
        Configuration configuration = loadYAMLResource(yamlConfigFile);

        if (dynamicProperties != null) {
            configuration.addAll(dynamicProperties);
        }

        return configuration;
    }

    /**
     * Loads a YAML-file of key-value pairs.
     *
     * <p>Colon and whitespace ": " separate key and value (one per line). The hash tag "#" starts a single-line comment.
     *
     * <p>Example:
     *
     * <pre>
     * jobmanager.rpc.address: localhost # network address for communication with the job manager
     * jobmanager.rpc.port   : 6123      # network port to connect to for communication with the job manager
     * taskmanager.rpc.port  : 6122      # network port the task manager expects incoming IPC connections
     * </pre>
     *
     * <p>This does not span the whole YAML specification, but only the *syntax* of simple YAML key-value pairs (see issue
     * #113 on GitHub). If at any point in time, there is a need to go beyond simple key-value pairs syntax
     * compatibility will allow to introduce a YAML parser library.
     *
     * @param file the YAML file to read from
     * @see <a href="http://www.yaml.org/spec/1.2/spec.html">YAML 1.2 specification</a>
     */
    private static Configuration loadYAMLResource(File file) {

        // TODO_MA Note: container that holds the parsed configuration
        final Configuration config = new Configuration();

        /*************************************************
         * TODO_MA 马中华 https://blog.csdn.net/zhongqi2513
         *  Note: read the flink-conf.yaml file
         */
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(file)))) {

            String line;
            int lineNo = 0;

            // TODO_MA Note: read one line at a time
            while ((line = reader.readLine()) != null) {
                lineNo++;

                // 1. check for comments
                /*
                 zhouxianfu 2023-07-30: this guards against lines of the following form
                 yielding a wrong value later on:
                     key: value ## comment
                 Example:
                     high-availability.cluster-id: /flink-1.12.0_cluster_yarn   ## Note: this parameter must not be set in YARN mode; YARN generates it automatically
                 */
                String[] comments = line.split("#", 2);
                String conf = comments[0].trim();

                // 2. get key and value
                if (conf.length() > 0) {
                    String[] kv = conf.split(": ", 2);

                    // skip line with no valid key-value pair
                    if (kv.length == 1) {
                        LOG.warn("Error while trying to split key and value in configuration file " + file + ":" + lineNo + ": \"" + line + "\"");
                        continue;
                    }

                    String key = kv[0].trim();
                    String value = kv[1].trim();

                    // sanity check
                    if (key.length() == 0 || value.length() == 0) {
                        LOG.warn("Error after splitting key and value in configuration file " + file + ":" + lineNo + ": \"" + line + "\"");
                        continue;
                    }

                    LOG.info("Loading configuration property: {}, {}", key, isSensitive(key) ? HIDDEN_CONTENT : value);
                    config.setString(key, value);
                }
            }
        } catch (IOException e) {
            throw new RuntimeException("Error parsing YAML configuration.", e);
        }

        // TODO_MA Note: return the Configuration
        return config;
    }

    /**
     * Check whether the key is a hidden key.
     *
     * @param key the config key
     */
    public static boolean isSensitive(String key) {
        Preconditions.checkNotNull(key, "key is null");
        final String keyInLower = key.toLowerCase();
        for (String hideKey : SENSITIVE_KEYS) {
            if (keyInLower.length() >= hideKey.length()
                && keyInLower.contains(hideKey)) {
                return true;
            }
        }
        return false;
    }
}
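The line-by-line rules in `loadYAMLResource` are easy to try out in isolation. The sketch below re-implements only the splitting logic on a list of strings, without Flink's `Configuration`, logging, or file I/O; the class name `FlatYamlSketch` and the `parse` helper are hypothetical, and invalid lines are silently skipped instead of being logged.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical re-implementation of the key-value splitting rules, for experimentation only. */
public class FlatYamlSketch {

    public static Map<String, String> parse(List<String> lines) {
        Map<String, String> config = new LinkedHashMap<>();
        for (String line : lines) {
            // 1. Everything from the first '#' onward is treated as a comment.
            String conf = line.split("#", 2)[0].trim();
            if (conf.isEmpty()) {
                continue;
            }
            // 2. Key and value are separated by the first ": " (colon followed by a space).
            String[] kv = conf.split(": ", 2);
            if (kv.length == 1) {
                continue; // no valid key-value pair on this line
            }
            String key = kv[0].trim();
            String value = kv[1].trim();
            if (key.isEmpty() || value.isEmpty()) {
                continue; // sanity check, as in the original
            }
            config.put(key, value);
        }
        return config;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
            "# a full-line comment",
            "jobmanager.rpc.address: localhost # inline comment",
            "jobmanager.rpc.port   : 6123",
            "taskmanager.numberOfTaskSlots:4",
            "some.secret: ab#cd");
        System.out.println(parse(lines));
    }
}
```

Two consequences of these rules are worth keeping in mind: a line such as `taskmanager.numberOfTaskSlots:4` is skipped because the colon is not followed by a space, and a value that contains `#` is cut off at that character, since the comment check runs before the key-value split.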

http://www.chinasem.cn/article/451384
