本文主要是介绍Flinkx如何通过json文件定位读写插件,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
前言
Flinkx作为数据同步工具,它通过json配置文件来确定多源到多源的数据同步和同步策略,这次就来看看Flinkx是如何通过json配置文件来定位reader和writer端的
整体结构
从Flinkx项目里可以看到,它对每个数据源都定义了core,reader输出端,writer输入端。当然也有意外,hive和redis只要输入。
Flinkx是如何通过json配置文件定位这个源呢
看源码
定位到D:\Projects\flinkx-1.8.5\flinkx-core\src\main\java\com\dtstack\flinkx\Main.java
1. 先看下如何定位reader端,在第113行
BaseDataReader dataReader = DataReaderFactory.getDataReader(config, env);
// 进入getDataReader方法:
public static BaseDataReader getDataReader(DataTransferConfig config, StreamExecutionEnvironment env) {try {// 通过配置文件获取插件名String pluginName = config.getJob().getContent().get(0).getReader().getName();// 通过插件名找到插件的类名String pluginClassName = PluginUtil.getPluginClassName(pluginName);// 在获取插件名的文件路径和公共插件文件路径Set<URL> urlList = PluginUtil.getJarFileDirPath(pluginName, config.getPluginRoot());// 通过文件路径加载类,得到构造函数,实例化插件return ClassLoaderManager.newInstance(urlList, cl -> {Class<?> clazz = cl.loadClass(pluginClassName);Constructor constructor = clazz.getConstructor(DataTransferConfig.class, StreamExecutionEnvironment.class);return (BaseDataReader)constructor.newInstance(config, env);});} catch (Exception e) {throw new RuntimeException(e);}}
在每个json配置文件中,都需要配置job.content.reader.name="xxxreader"参数,这个name就是插件名,通过该名字才可以找到相应的插件。实例化插件还需要导入公共的插件plugins/common/flinkx-rdb-**.jar
2. 在看看writer端,在122行
BaseDataWriter dataWriter = DataWriterFactory.getDataWriter(config);
// 进入getDataWriter方法
public static BaseDataWriter getDataWriter(DataTransferConfig config) {try {String pluginName = config.getJob().getContent().get(0).getWriter().getName();String pluginClassName = PluginUtil.getPluginClassName(pluginName);Set<URL> urlList = PluginUtil.getJarFileDirPath(pluginName, config.getPluginRoot());return ClassLoaderManager.newInstance(urlList, cl -> {Class<?> clazz = cl.loadClass(pluginClassName);Constructor constructor = clazz.getConstructor(DataTransferConfig.class);return (BaseDataWriter)constructor.newInstance(config);});} catch (Exception e) {throw new RuntimeException(e);}}
与reader端同理,通过name确定相应的writer端
这篇关于Flinkx如何通过json文件定位读写插件的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!