trino-435:dynamic catalog

2023-12-29 01:36
文章标签 dynamic catalog trino 435

本文主要是介绍trino-435:dynamic catalog,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一、背景:

trino对于数据源的注册方式为静态注册,在服务启动前需要配置好相关数据源的信息,当添加新的数据源时需要停止服务进行数据源的静态注册然后在重启服务;由于该操作可能会中断正在执行的任务,因此生产环境中这种方式是不可取的。为此再生产环境中需要进行数据源的动态配置,以满足生产环境的需求。在github trino issue Dynamic Catalogs #12709(https://github.com/trinodb/trino/issues/12709)中有相关trino动态数据源的相关推送,我们也依次为依据进行trino动态数据源的调研。

二、动态数据源存储核心接口

trino435版本有关数据源的存储分为静态存储和动态存储,其中动态存储的核心接口为:CatalogStore,包含如下方法:

方法名返回值类型形参描述
getCatalogsCollection<StoredCatalog>获取全部的catalog
createCatalogPropertiesCatalogPropertiesString catalogName, ConnectorName connectorName, Map<String, String> properties从原始的properties创建catalog properties
addOrReplaceCatalogvoidCatalogProperties catalogProperties新增或更新catalog properties
removeCatalogvoidString catalogName如果存在删除catalog

该接口中还包含一个内部接口StoredCatalog,有如下方法:

方法名返回值类型形参描述
getNameCollection<StoredCatalog>获取全部的catalog
loadPropertiesCatalogProperties从原始的properties创建catalog properties。服务启动时会触发调用,触发入口:Server.class中调用io.trino.connector.ConnectorServicesProvider#loadInitialCatalogs
1、数据库存储

为了实现catalogs的持久化,我们需要实现catalog数据库存储方便我们检索和项目上的使用。当我们要加入新的catalogs存储方式时,需要实现这两个接口。下面我们就讲解下将catalogs存储到数据库中的实现方式(以mysql存储为例)。

我们要实现catalogs的数据库的存储的一个前提是服务需要知道要存储数据库的url、用户名、密码等连接信息,因此需要声明对应的配置类(JdbcCatalogStoreConfig),并声明如下变量:

catalog.config-db-url
catalog.config-db-user
catalog.config-db-password

声明JdbcCatalogStore数据存储类实现了CatalogStore接口,完成一下流程改造:

  • 并在构造方法中实现从数据库中加载catalogs信息,catalogs存储的表结构(动态数据源存储表结构)如下:
CREATE TABLE IF NOT EXISTS `catalogs` (`id` bigint(20) NOT NULL AUTO_INCREMENT,`name` varchar(255) NOT NULL COMMENT 'catalog名称',`properties` text,`create_time` timestamp(0) NOT NULL DEFAULT CURRENT_TIMESTAMP(0) COMMENT '创建时间',`update_time` timestamp(0) NOT NULL DEFAULT CURRENT_TIMESTAMP(0) ON UPDATE CURRENT_TIMESTAMP(0) COMMENT '更新时间',PRIMARY KEY (`id`),UNIQUE KEY index_name (`name`)
)
  • 由于需要和mysql数据库交互,因此需要引入mysql驱动jar,并加载驱动
# 引入jar
<dependency><groupId>com.mysql</groupId><artifactId>mysql-connector-j</artifactId><version>8.2.0</version><scope>runtime</scope>
</dependency># 加载驱动
loaderJdbcDriver(this.getClass().getClassLoader(), "com.mysql.cj.jdbc.Driver", catalogsUrl);private static void loaderJdbcDriver(ClassLoader classLoader, String driverClassName, String catalogUrl) {try {final Class<?> clazz = Class.forName(driverClassName, true, classLoader);final Driver driver = (Driver) clazz.newInstance();if (!driver.acceptsURL(catalogUrl)) {log.error("Jdbc driver loading error. Driver {} cannot accept url.", driverClassName);throw new RuntimeException("Jdbc driver loading error.");}} catch (final Exception e) {throw new RuntimeException("Jdbc driver loading error.", e);}
}
  • 实现loadProperties方法,完成String到CatalogProperties的转变。
  • 其他方法的实现可参考InMemoryCatalogStore和FileCatalogStore这两个类,需要注意的是在实现addOrReplaceCatalog方法时,需要实现Properties转String来存储到数据库,然而trino-server自带的JSONObject类转String时存在对“/”转义的情况,因此引入阿里巴巴的JSON转换工具,如下:
<dependency><groupId>com.alibaba.fastjson2</groupId><artifactId>fastjson2</artifactId><version>2.0.23</version>
</dependency>
2、动态数据源的restful API

主要完成catalog的注册与删除接口开发,下面分别对catalog注册和删除结构实现方式进行阐述。
(1)catalog的注册接口

  • 将catalog保存到数据库后如何使其在trino服务中生效
  • 如何将catalog注册信息有coordinator同步给worker节点

对于核心问题1,对io.trino.server.Server类的分析,新增catalog需要改造该类中的私有方法io.trino.server.Server#updateConnectorIds,实现将新增的catalog相关信息添加到Announcer类中。

对于核心问题2,对于coordinator节点提供了io.trino.failuredetector.HeartbeatFailureDetector类,可以获取worker节点地址信息,因此可以通过发送http请求向每个worker节点注册catalog(注意:对于worker节点catalog的注册不需要持久化操作,仅仅在内存中保存即可)

(2)catalog的删除接口
该接口主要完成catalog从内存和数据库中删除,并更新Announcer类。另外该接口并不需要和worker节点通信,因为在435版本中提供了catalog的修剪功能,该功能的实现类为io.trino.connector.CatalogPruneTask#CatalogPruneTask

有待优化点

  • 数据源注册完成后并没有立即生效,需要服务的需要对worker节点的探活完成后才生效
  • 在并发操作的情况下存在新加入的worker节点的数据源与coordinator不持一致的情况。
  • 数据源注册完成后coordinator节点通过failureDetector.getStats().values()的形式获取worker节点的uri存在重复的情况,需要优化。

这篇关于trino-435:dynamic catalog的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/547951

相关文章

C# dynamic类型使用详解

《C#dynamic类型使用详解》C#中的dynamic类型允许在运行时确定对象的类型和成员,跳过编译时类型检查,适用于处理未知类型的对象或与动态语言互操作,dynamic支持动态成员解析、添加和删... 目录简介dynamic 的定义dynamic 的使用动态类型赋值访问成员动态方法调用dynamic 的

论文精读-Supervised Raw Video Denoising with a Benchmark Dataset on Dynamic Scenes

论文精读-Supervised Raw Video Denoising with a Benchmark Dataset on Dynamic Scenes 优势 1、构建了一个用于监督原始视频去噪的基准数据集。为了多次捕捉瞬间,我们手动为对象s创建运动。在高ISO模式下捕获每一时刻的噪声帧,并通过对多个噪声帧进行平均得到相应的干净帧。 2、有效的原始视频去噪网络(RViDeNet),通过探

兼容Trino Connector,扩展Apache Doris数据源接入能力|Lakehouse 使用手册(四)

Apache Doris 内置支持包括 Hive、Iceberg、Hudi、Paimon、LakeSoul、JDBC 在内的多种 Catalog,并为其提供原生高性能且稳定的访问能力,以满足与数据湖的集成需求。而随着 Apache Doris 用户的增加,新的数据源连接需求也随之增加。因此,从 3.0 版本开始,Apache Doris 引入了 Trino Connector 兼容框架。 Tri

【硬刚ES】ES基础(十三)Dynamic Template和Index Template

本文是对《【硬刚大数据之学习路线篇】从零到大数据专家的学习指南(全面升级版)》的ES部分补充。

Spark动态资源分配-Dynamic Resource Allocation

关键字:spark、资源分配、dynamic resource allocation Spark中,所谓资源单位一般指的是executors,和Yarn中的Containers一样,在Spark On Yarn模式下,通常使用–num-executors来指定Application使用的executors数量,而–executor-memory和–executor-cores分别用来指定每个ex

强化学习实践(二):Dynamic Programming(Value \ Policy Iteration)

强化学习实践(二):Dynamic Programming(Value \ Policy Iteration) 伪代码Value IterationPolicy IterationTruncated Policy Iteration 代码项目地址 伪代码 具体的理解可以看理论学习篇,以及代码中的注释,以及赵老师原著 Value Iteration Policy Itera

Trino大量查询会导致HDFS namenode主备频繁切换吗?

会,且肯定会 一、背景 今天还没起床就被智能运维叫醒了,说通过namenode审计日志查看访问源ip有我们的trino集群,并且访问量比较大,起床气范了,这不很正常吗,早上一般都是跑批高峰,也不一定是我们trino的问题,必须按时上班。 到了工位联系运维,被告知也不一定是我们的trino引起的namenode主备节点切换,因为那个时间段,有很多系统会访问大数据平台,不管怎样,既然有警告就得排查,

【Kubernetes】持久卷的动态供给 Dynamic Provisioning

《持久化存储》系列,共包含以下文章: K8s 持久化存储方式持久卷 PV持久卷声明 PVC持久卷的动态供给 Dynamic Provisioning 😊 如果您觉得这篇文章有用 ✔️ 的话,请给博主一个一键三连 🚀🚀🚀 吧 (点赞 🧡、关注 💛、收藏 💚)!!!您的支持 💖💖💖 将激励 🔥 博主输出更多优质内容!!! 持久卷的动态供给 Dynamic Pr

算法训练第26天|452. 用最少数量的箭引爆气球|435. 无重叠区间|763.划分字母区间

LeetCode 452. 用最少数量的箭引爆气球 题目链接:452. 用最少数量的箭引爆气球 题目讲解:代码随想录 func findMinArrowShots(points [][]int) int {var res int = 1// 先按照第一位排序sort.Slice(points, func(i, j int)bool{return points[i][0] < points

Design Pattern—— Dynamic Proxy Pattern(三) 动态代理模式

done by myself 通过下面例子,进一步演示 动态代理 的强大特性,将会体会到 相对于静态static代理,动态可实现 一个proxy代理不同的真实对象。灵活多变。只需要在Client类 调整要代理的角色,无需改动其他部分。使代理角色、真实角色的对应关系更松散了。 抽象角色: package com.proxy.severalCase;public interfa