【源码编译】Apache SeaTunnel-Web 适配最新2.3.4版本教程

2024-03-12 12:44

本文主要是介绍【源码编译】Apache SeaTunnel-Web 适配最新2.3.4版本教程,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Apache SeaTunnel新版本已经发布,感兴趣的小伙伴可以看之前版本发布的文章

file

本文主要给大家介绍为使用2.3.4版本的新特性,需要对Apache SeaTunnel-Web依赖的版本进行升级,而SeaTunnel2.3.4版本部分API跟之前版本不兼容,所以需要对 SeaTunnel-Web的源码进行修改适配。

源码修改编译

克隆SeaYunnel-Web源码到本地

  git  clone https://github.com/apache/seatunnel-web.git

在idea中打开项目

升级Pom中的SeaTunnel版本到2.3.4并重新导入依赖

  <seatunnel-framework.version>2.3.3</seatunnel-framework.version>改为<seatunnel-framework.version>2.3.4</seatunnel-framework.version>

因为大部分用户使用SeaTunnel Web都是基于SeaTunnel-2.3.3 版本做的适配,而最新发布的SeaTunnel2.3.4 部分API发生了改动导致直接升级的过程中会出现API不兼容的问题,所以本篇文章重点来了:我们需要对调用SeaTunnel API的SeaTunnel Web源码部分进行修改,修改完之后,我们就能完全适配2.3.4最新版本。

社区推出了2.3.X及Web系列专属的社群,感兴趣的小伙伴可以加社区小助手进群。

org.apache.dolphinscheduler.api.dto.seatunnel.bean.engine.EngineDataType

public static class SeaTunnelDataTypeConvertorimplements DataTypeConvertor<SeaTunnelDataType<?>> {@Overridepublic SeaTunnelDataType<?> toSeaTunnelType(String engineDataType) {return DATA_TYPE_MAP.get(engineDataType.toLowerCase(Locale.ROOT)).getRawType();}@Overridepublic SeaTunnelDataType<?> toSeaTunnelType(SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object> map)throws DataTypeConvertException {return seaTunnelDataType;}@Overridepublic SeaTunnelDataType<?> toConnectorType(SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object> map)throws DataTypeConvertException {return seaTunnelDataType;}@Overridepublic String getIdentity() {return "EngineDataTypeConvertor";}
}
// 改为
public static class SeaTunnelDataTypeConvertorimplements DataTypeConvertor<SeaTunnelDataType<?>> {@Overridepublic SeaTunnelDataType<?> toSeaTunnelType(String s, String s1) {return DATA_TYPE_MAP.get(s.toLowerCase(Locale.ROOT)).getRawType();}@Overridepublic SeaTunnelDataType<?> toSeaTunnelType(String s, SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object> map) {return seaTunnelDataType;}@Overridepublic SeaTunnelDataType<?> toConnectorType(String s, SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object> map) {return seaTunnelDataType;}@Overridepublic String getIdentity() {return "EngineDataTypeConvertor";}}

org.apache.seatunnel.app.service.impl.TableSchemaServiceImpl

public TableSchemaServiceImpl() throws IOException {Common.setStarter(true);Set<PluginIdentifier> pluginIdentifiers =SeaTunnelSinkPluginDiscovery.getAllSupportedPlugins(PluginType.SINK).keySet();ArrayList<PluginIdentifier> pluginIdentifiersList = new ArrayList<>();pluginIdentifiersList.addAll(pluginIdentifiers);List<URL> pluginJarPaths =new SeaTunnelSinkPluginDiscovery().getPluginJarPaths(pluginIdentifiersList);//        Path path = new SeaTunnelSinkPluginDiscovery().getPluginDir();if (!pluginJarPaths.isEmpty()) {//            List<URL> files = FileUtils.searchJarFiles(path);pluginJarPaths.addAll(FileUtils.searchJarFiles(Common.pluginRootDir()));factory =new DataTypeConvertorFactory(new URLClassLoader(pluginJarPaths.toArray(new URL[0])));} else {factory = new DataTypeConvertorFactory();}
}
// 改为public TableSchemaServiceImpl() throws IOException {Common.setStarter(true);Set<PluginIdentifier> pluginIdentifiers =SeaTunnelSinkPluginDiscovery.getAllSupportedPlugins(PluginType.SINK).keySet();ArrayList<PluginIdentifier> pluginIdentifiersList = new ArrayList<>();pluginIdentifiersList.addAll(pluginIdentifiers);List<URL> pluginJarPaths =new SeaTunnelSinkPluginDiscovery().getPluginJarPaths(pluginIdentifiersList);//        Path path = new SeaTunnelSinkPluginDiscovery().getPluginDir();if (!pluginJarPaths.isEmpty()) {//            List<URL> files = FileUtils.searchJarFiles(path);pluginJarPaths.addAll(FileUtils.searchJarFiles(Common.pluginRootDir()));factory =new DataTypeConvertorFactory(new URLClassLoader(pluginJarPaths.toArray(new URL[0])));} else {factory = new DataTypeConvertorFactory();}}SeaTunnelDataType<?> dataType = convertor.toSeaTunnelType(field.getType());
// 改为
SeaTunnelDataType<?> dataType =convertor.toSeaTunnelType(field.getName(), field.getType());

org.apache.seatunnel.app.service.impl.JobExecutorServiceImpl.executeJobBySeaTunnel()

 public Long executeJobBySeaTunnel(Integer userId, String filePath, Long jobInstanceId) {Common.setDeployMode(DeployMode.CLIENT);JobConfig jobConfig = new JobConfig();jobConfig.setName(jobInstanceId + "_job");try {SeaTunnelConfig seaTunnelConfig = new YamlSeaTunnelConfigBuilder().build();SeaTunnelClient seaTunnelClient = createSeaTunnelClient();ClientJobExecutionEnvironment jobExecutionEnv =seaTunnelClient.createExecutionContext(filePath, jobConfig, seaTunnelConfig);final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();JobInstance jobInstance = jobInstanceDao.getJobInstance(jobInstanceId);jobInstance.setJobEngineId(Long.toString(clientJobProxy.getJobId()));jobInstanceDao.update(jobInstance);CompletableFuture.runAsync(() -> {waitJobFinish(clientJobProxy,userId,jobInstanceId,Long.toString(clientJobProxy.getJobId()),seaTunnelClient);});} catch (ExecutionException | InterruptedException e) {ExceptionUtils.getMessage(e);throw new RuntimeException(e);}return jobInstanceId;}

org.apache.seatunnel.app.service.impl.JobInstanceServiceImpl

else if (statusList.contains("CANCELLING")) {jobStatus = JobStatus.CANCELLING.name();
// 改为
else if (statusList.contains("CANCELING")) {jobStatus = JobStatus.CANCELING.name();

org.apache.seatunnel.app.service.impl.SchemaDerivationServiceImpl

TableFactoryContext context =new TableFactoryContext(Collections.singletonList(table),ReadonlyConfig.fromMap(config),Thread.currentThread().getContextClassLoader());
// 改为
TableTransformFactoryContext context =new TableTransformFactoryContext(Collections.singletonList(table),ReadonlyConfig.fromMap(config),Thread.currentThread().getContextClassLoader());

org.apache.seatunnel.app.thirdparty.engine.SeaTunnelEngineProxy

public void restoreJob(@NonNull String filePath, @NonNull Long jobInstanceId, @NonNull Long jobEngineId) {SeaTunnelClient seaTunnelClient = new SeaTunnelClient(clientConfig);JobConfig jobConfig = new JobConfig();jobConfig.setName(jobInstanceId + "_job");try {seaTunnelClient.restoreExecutionContext(filePath, jobConfig, jobEngineId).execute();} catch (ExecutionException e) {throw new RuntimeException(e);} catch (InterruptedException e) {throw new RuntimeException(e);}
}
// 改为
public void restoreJob(@NonNull String filePath, @NonNull Long jobInstanceId, @NonNull Long jobEngineId) {SeaTunnelClient seaTunnelClient = new SeaTunnelClient(clientConfig);JobConfig jobConfig = new JobConfig();jobConfig.setName(jobInstanceId + "_job");SeaTunnelConfig seaTunnelConfig = new YamlSeaTunnelConfigBuilder().build();try {seaTunnelClient.restoreExecutionContext(filePath, jobConfig, seaTunnelConfig, jobEngineId).execute();} catch (ExecutionException e) {throw new RuntimeException(e);} catch (InterruptedException e) {throw new RuntimeException(e);}}

org.apache.seatunnel.app.thirdparty.framework.PluginDiscoveryUtil

public static Map<PluginIdentifier, ConnectorFeature> getConnectorFeatures(PluginType pluginType) throws IOException {Common.setStarter(true);if (!pluginType.equals(PluginType.SOURCE)) {throw new UnsupportedOperationException("ONLY support plugin type source");}Path path = new SeaTunnelSinkPluginDiscovery().getPluginDir();List<Factory> factories;if (path.toFile().exists()) {List<URL> files = FileUtils.searchJarFiles(path);factories =FactoryUtil.discoverFactories(new URLClassLoader(files.toArray(new URL[0])));} else {factories =FactoryUtil.discoverFactories(Thread.currentThread().getContextClassLoader());}Map<PluginIdentifier, ConnectorFeature> featureMap = new ConcurrentHashMap<>();factories.forEach(plugin -> {if (TableSourceFactory.class.isAssignableFrom(plugin.getClass())) {TableSourceFactory tableSourceFactory = (TableSourceFactory) plugin;PluginIdentifier info =PluginIdentifier.of("seatunnel",PluginType.SOURCE.getType(),plugin.factoryIdentifier());featureMap.put(info,new ConnectorFeature(SupportColumnProjection.class.isAssignableFrom(tableSourceFactory.getSourceClass())));}});return featureMap;
}
// 改为public static Map<PluginIdentifier, ConnectorFeature> getConnectorFeatures(PluginType pluginType) {Common.setStarter(true);if (!pluginType.equals(PluginType.SOURCE)) {throw new UnsupportedOperationException("ONLY support plugin type source");}ArrayList<PluginIdentifier> pluginIdentifiers = new ArrayList<>();pluginIdentifiers.addAll(SeaTunnelSinkPluginDiscovery.getAllSupportedPlugins(PluginType.SOURCE).keySet());List<URL> pluginJarPaths =new SeaTunnelSinkPluginDiscovery().getPluginJarPaths(pluginIdentifiers);List<Factory> factories;if (!pluginJarPaths.isEmpty()) {factories =FactoryUtil.discoverFactories(new URLClassLoader(pluginJarPaths.toArray(new URL[0])));} else {factories =FactoryUtil.discoverFactories(Thread.currentThread().getContextClassLoader());}Map<PluginIdentifier, ConnectorFeature> featureMap = new ConcurrentHashMap<>();factories.forEach(plugin -> {if (TableSourceFactory.class.isAssignableFrom(plugin.getClass())) {TableSourceFactory tableSourceFactory = (TableSourceFactory) plugin;PluginIdentifier info =PluginIdentifier.of("seatunnel",PluginType.SOURCE.getType(),plugin.factoryIdentifier());featureMap.put(info,new ConnectorFeature(SupportColumnProjection.class.isAssignableFrom(tableSourceFactory.getSourceClass())));}});return featureMap;

代码格式化

mvn spotless:apply

编译打包

mvn clean package -DskipTests

至此,seatunnel web 适配 seatunnel2.3.4版本完成,对应的安装包会在 seatunnel-web-dist/target目录下生成

Linux部署测试

这里具体请参考之前社区其他老师发布的文章Apache SeaTunnel Web部署指南

重要的配置项

1、seatunnel-web数据库相关配置(application.yml) 
用来web服务中的数据持久化2、SEATUNNEL_HOME(环境变量)
seatunnel-web调用seaunnel的插件获取的API,扫描connector相关的连接器3、ST_WEB_HOME(环境变量)
seatunnel-web会加载seatunnel-web/datasource下的插件包,这里决定了seatunnel-web支持哪些数据源的任务定义4、重要的配置文件:
connector-datasource-mapper.yaml 
该配置文件配置了支持的数据源类型以及该数据源支持的数据同步方式等信息(比如是否支持多表同步、是否支持cdc等)
hazelcast-client.yaml 
seatunnel-web服务通过seatunnel-api的方式与seatunnel集群进行交互,该配置文件配置了集群节点等相关信息

感谢大家的阅读,希望对各位兄弟有所帮助,如果有任何疑问,欢迎来社区找我交流!

本文由 白鲸开源科技 提供发布支持!

这篇关于【源码编译】Apache SeaTunnel-Web 适配最新2.3.4版本教程的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/801285

相关文章

Golang的CSP模型简介(最新推荐)

《Golang的CSP模型简介(最新推荐)》Golang采用了CSP(CommunicatingSequentialProcesses,通信顺序进程)并发模型,通过goroutine和channe... 目录前言一、介绍1. 什么是 CSP 模型2. Goroutine3. Channel4. Channe

IDEA如何切换数据库版本mysql5或mysql8

《IDEA如何切换数据库版本mysql5或mysql8》本文介绍了如何将IntelliJIDEA从MySQL5切换到MySQL8的详细步骤,包括下载MySQL8、安装、配置、停止旧服务、启动新服务以及... 目录问题描述解决方案第一步第二步第三步第四步第五步总结问题描述最近想开发一个新应用,想使用mysq

java脚本使用不同版本jdk的说明介绍

《java脚本使用不同版本jdk的说明介绍》本文介绍了在Java中执行JavaScript脚本的几种方式,包括使用ScriptEngine、Nashorn和GraalVM,ScriptEngine适用... 目录Java脚本使用不同版本jdk的说明1.使用ScriptEngine执行javascript2.

Spring常见错误之Web嵌套对象校验失效解决办法

《Spring常见错误之Web嵌套对象校验失效解决办法》:本文主要介绍Spring常见错误之Web嵌套对象校验失效解决的相关资料,通过在Phone对象上添加@Valid注解,问题得以解决,需要的朋... 目录问题复现案例解析问题修正总结  问题复现当开发一个学籍管理系统时,我们会提供了一个 API 接口去

龙蜥操作系统Anolis OS-23.x安装配置图解教程(保姆级)

《龙蜥操作系统AnolisOS-23.x安装配置图解教程(保姆级)》:本文主要介绍了安装和配置AnolisOS23.2系统,包括分区、软件选择、设置root密码、网络配置、主机名设置和禁用SELinux的步骤,详细内容请阅读本文,希望能对你有所帮助... ‌AnolisOS‌是由阿里云推出的开源操作系统,旨

PyTorch使用教程之Tensor包详解

《PyTorch使用教程之Tensor包详解》这篇文章介绍了PyTorch中的张量(Tensor)数据结构,包括张量的数据类型、初始化、常用操作、属性等,张量是PyTorch框架中的核心数据结构,支持... 目录1、张量Tensor2、数据类型3、初始化(构造张量)4、常用操作5、常用属性5.1 存储(st

Java操作PDF文件实现签订电子合同详细教程

《Java操作PDF文件实现签订电子合同详细教程》:本文主要介绍如何在PDF中加入电子签章与电子签名的过程,包括编写Word文件、生成PDF、为PDF格式做表单、为表单赋值、生成文档以及上传到OB... 目录前言:先看效果:1.编写word文件1.2然后生成PDF格式进行保存1.3我这里是将文件保存到本地后

windows系统下shutdown重启关机命令超详细教程

《windows系统下shutdown重启关机命令超详细教程》shutdown命令是一个强大的工具,允许你通过命令行快速完成关机、重启或注销操作,本文将为你详细解析shutdown命令的使用方法,并提... 目录一、shutdown 命令简介二、shutdown 命令的基本用法三、远程关机与重启四、实际应用

Debian如何查看系统版本? 7种轻松查看Debian版本信息的实用方法

《Debian如何查看系统版本?7种轻松查看Debian版本信息的实用方法》Debian是一个广泛使用的Linux发行版,用户有时需要查看其版本信息以进行系统管理、故障排除或兼容性检查,在Debia... 作为最受欢迎的 linux 发行版之一,Debian 的版本信息在日常使用和系统维护中起着至关重要的作

python库fire使用教程

《python库fire使用教程》本文主要介绍了python库fire使用教程,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧... 目录1.简介2. fire安装3. fire使用示例1.简介目前python命令行解析库用过的有:ar