6 Hive引擎集成Apache Paimon

2023-11-10 16:30
文章标签 引擎 集成 apache hive paimon

本文主要是介绍6 Hive引擎集成Apache Paimon,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

更多Paimon数据湖内容请关注:https://edu.51cto.com/course/35051.html

在实际工作中,我们通查会使用Flink计算引擎去读写Paimon,但是在批处理场景中,更多的是使用Hive去读写Paimon,这样操作起来更加方便。

前面我们在Flink代码里面,借助于Hive Catalog,实现了在Flink中创建Paimon表,写入数据,并且把paimon的元数据信息保存在Hive Metastore里面,这样创建的表是可以被Hive识别并且操作的。

但是最直接的肯定是在Hive中直接创建Paimon类型的表,并且读写数据。

Paimon目前可以支持Hive 3.1, 2.3, 2.2, 2.1 and 2.1-cdh-6.3这些版本的操作。

但是需要注意,如果Hive的执行引擎使用的是Tez,那么只能读取Paimon,无法向Paimon中写入数据。如果Hive的执行引擎使用的是MR,那么读写都是支持的。

在Hive中配置Paimon依赖

想要在Hive中操作Paimon,首先需要在Hive中配置Paimon的依赖,此时我们需要用到一个jar包:paimon-hive-connector
我们目前使用的Hive是3.1.2版本的,所以需要下载对应版本的paimon-hive-connector jar包。

https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-hive-connector-3.1/0.5.0-incubating/paimon-hive-connector-3.1-0.5.0-incubating.jar

将这个jar包上传到bigdata04机器(hive客户端机器)的hive安装目录中:

[root@bigdata04 ~]# cd /data/soft/apache-hive-3.1.2-bin/
[root@bigdata04 apache-hive-3.1.2-bin]# mkdir auxlib
[root@bigdata04 apache-hive-3.1.2-bin]# cd auxlib/
[root@bigdata04 auxlib]# ll
total 34128
-rw-r--r--. 1 root root 34945743 Sep 13  2023 paimon-hive-connector-3.1-0.5.0-incubating.jar

注意:需要在hive安装目录中创建auxlib目录,然后把jar包上传到这个目录中,这样会被自动加载。

如果我们在操作Hive的时候使用的是beeline客户端,那么在Hive中配置好Paimon的环境之后,需要重启HiveServer2服务。

在Hive中读写Paimon表

咱们之前在Flink引擎代码中使用Hive Catalog的时候创建了一个表:p_h_t1,这个表的元数据在Hive Metastore也有存储,之前我们其实也能查看到,只是在hive中查询这个表中数据的时候报错了,其实就是因为缺少paimon-hive-connector这个jar包,现在我们再查询就可以了。

在这里我们使用Hive的beeline客户端。
注意:需要先启动HiveServer2服务。

[root@bigdata04 ~]# cd /data/soft/apache-hive-3.1.2-bin/
[root@bigdata04 apache-hive-3.1.2-bin]# bin/hiveserver2

查看hive中目前都有哪些表:

[root@bigdata04 apache-hive-3.1.2-bin]# bin/beeline -u  jdbc:hive2://localhost:10000 -n root
Connecting to jdbc:hive2://localhost:10000
Connected to: Apache Hive (version 3.1.2)
Driver: Hive JDBC (version 3.1.2)
Transaction isolation: TRANSACTION_REPEATABLE_READ
Beeline version 3.1.2 by Apache Hive
0: jdbc:hive2://localhost:10000> SHOW TABLES;
+--------------------+
|      tab_name      |
+--------------------+
| flink_stu          |
| orders             |
| p_h_t1             |
| p_h_par             |
| s1                 |
| student_favors     |
| student_favors_2   |
| student_score      |
| student_score_bak  |
| t1                 |
+--------------------+
9 rows selected (1.663 seconds)

此时可以看到之前通过Hive Catalog写入的表:p_h_t1

查询这个表中的数据:

0: jdbc:hive2://localhost:10000> SELECT * FROM p_h_t1;
+--------------+-------------+
| p_h_t1.name  | p_h_t1.age  |
+--------------+-------------+
| jack         | 18          |
| tom          | 20          |
+--------------+-------------+
2 rows selected (5.853 seconds)

此时就可以正常查询了。

接着我们尝试在Hive中向这个Paimon表中插入一条数据:

0: jdbc:hive2://localhost:10000> INSERT INTO p_h_t1(name,age) VALUES('jessic',19);

重新查询这个表中的最新数据:

0: jdbc:hive2://localhost:10000> SELECT * FROM p_h_t1;
+--------------+-------------+
| p_h_t1.name  | p_h_t1.age  |
+--------------+-------------+
| jack         | 18          |
| jessic       | 19          |
| tom          | 20          |
+--------------+-------------+
3 rows selected (0.737 seconds)

在通过Hive进行查询的时候,默认查询的是表中最新快照的数据,我们也可以通过时间旅行这个特性来控制查询之前的数据。

举个例子,查询指定快照版本中的数据:

0: jdbc:hive2://localhost:10000> SET paimon.scan.snapshot-id=1;
No rows affected (0.011 seconds)
0: jdbc:hive2://localhost:10000> SELECT * FROM p_h_t1;
+--------------+-------------+
| p_h_t1.name  | p_h_t1.age  |
+--------------+-------------+
| jack         | 18          |
| tom          | 20          |
+--------------+-------------+
2 rows selected (0.752 seconds)
0: jdbc:hive2://localhost:10000> SET paimon.scan.snapshot-id=2;
No rows affected (0.009 seconds)
0: jdbc:hive2://localhost:10000> SELECT * FROM p_h_t1;
+--------------+-------------+
| p_h_t1.name  | p_h_t1.age  |
+--------------+-------------+
| jack         | 18          |
| jessic       | 19          |
| tom          | 20          |
+--------------+-------------+
3 rows selected (0.692 seconds)

这样就可以实现查询历史数据的查询了。

在Hive中创建Paimon表

前面我们操作的p_h_t1这个表其实是借助于Flink引擎创建的。
下面我们来看一下在Hive中如何创建Piamon表:

0: jdbc:hive2://localhost:10000> SET hive.metastore.warehouse.dir=hdfs://bigdata01:9000/paimon;
0: jdbc:hive2://localhost:10000> CREATE TABLE IF NOT EXISTS p_h_t2(name STRING,age INT,PRIMARY KEY (name) NOT ENFORCED
)STORED BY 'org.apache.paimon.hive.PaimonStorageHandler';

这样表就创建好了,下面我们可以在Hive中测试一下读写数据:

0: jdbc:hive2://localhost:10000> INSERT INTO p_h_t2(name,age) VALUES('tom',20);
0: jdbc:hive2://localhost:10000> SELECT * FROM p_h_t2;
Error: java.io.IOException: java.lang.RuntimeException: Fails to read snapshot from path hdfs://bigdata01:9000/paimon/default.db/p_h_t2/snapshot/snapshot-2 (state=,code=0)

注意:此时查询报错是因为找不到snapshot-2这份快照数据,目前这个表中只添加了一次数据,所以只有snapshot-1

那为什么会查找snapshot-2呢?

因为我们前面在这个会话中设置了SET paimon.scan.snapshot-id=2;,这个配置在当前会话有效。

正常情况下我们在hive中执行SET paimon.scan.snapshot-id=null;其实就可以了:

0: jdbc:hive2://localhost:10000> SET paimon.scan.snapshot-id=null;
No rows affected (0.008 seconds)
0: jdbc:hive2://localhost:10000> SET paimon.scan.snapshot-id;
+-------------------------------+
|              set              |
+-------------------------------+
| paimon.scan.snapshot-id=null  |
+-------------------------------+
1 row selected (0.009 seconds)
0: jdbc:hive2://localhost:10000> SELECT * FROM p_h_t2;
Error: java.io.IOException: java.lang.RuntimeException: Fails to read snapshot from path hdfs://bigdata01:9000/paimon/default.db/p_h_t2/snapshot/snapshot-2 (state=,code=0)

但是发现他还是会找snapshot-2

我们尝试重新开启一个新的会话查询也不行,就算重启hiveserver2也还是不行。

后来发现这可能是一个bug,当我们在hive会话中设置了paimon.scan.snapshot-id=2,那么之后创建的表默认就只会查询snapshot-2了,那也就意味着建表的时候会把这个参数带过去。

为了验证这个猜想,我们在flink代码中查询这个Paimon表的详细建表语句,不要在hive命令行中查看(在Hive命令行中看不到详细的参数信息)。

创建package:tech.xuwei.paimon.hivepaimon
创建object:FlinkSQLReadFromPaimo

完整代码如下:

package tech.xuwei.paimon.cdcingestionimport org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment/*** 使用FlinkSQL从Paimon表中读取数据* Created by xuwei*/
object FlinkSQLReadFromPaimon {def main(args: Array[String]): Unit = {//创建执行环境val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setRuntimeMode(RuntimeExecutionMode.STREAMING)val tEnv = StreamTableEnvironment.create(env)//创建Paimon类型的CatalogtEnv.executeSql("""|CREATE CATALOG paimon_catalog WITH(|    'type'='paimon',|    'warehouse'='hdfs://bigdata01:9000/paimon'|)|""".stripMargin)tEnv.executeSql("USE CATALOG paimon_catalog")//读取Paimon表中的数据,并且打印输出结果tEnv.executeSql("""|SHOW CREATE TABLE p_h_t2;|""".stripMargin).print()}
}

在idea中执行代码,查看结果:

CREATE TABLE `paimon_catalog`.`default`.`p_h_t2` (`name` VARCHAR(2147483647),`age` INT
) WITH ('path' = 'hdfs://bigdata01:9000/paimon/default.db/p_h_t2','totalSize' = '0','numRows' = '0','rawDataSize' = '0','scan.snapshot-id' = '2','COLUMN_STATS_ACCURATE' = '{"BASIC_STATS":"true","COLUMN_STATS":{"age":"true","name":"true"}}','numFiles' = '0','bucketing_version' = '2','storage_handler' = 'org.apache.paimon.hive.PaimonStorageHandler'
)

在这里发现建表语句中有一个参数:'scan.snapshot-id' = '2',所以它默认会读取第2个快照。

想要解决这个问题,有两个办法。

  • 1:在hive中删除这个表,然后执行SET paimon.scan.snapshot-id=null;,再创建这个表就行了。
  • 2:如果不想删除这个表,可以在Flink代码中修改这个表,移除scan.snapshot-id属性即可,这个功能我们之前讲过。

第一种办法简单粗暴,不再演示,我们来看一下第二种办法:

创建object:FlinkSQLAlterPaimonTable

完整代码如下:

package tech.xuwei.paimon.hivepaimonimport org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment/*** 修改Paimon表属性* Created by xuwei*/
object FlinkSQLAlterPaimonTable {def main(args: Array[String]): Unit = {//创建执行环境val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setRuntimeMode(RuntimeExecutionMode.STREAMING)val tEnv = StreamTableEnvironment.create(env)//创建Paimon类型的CatalogtEnv.executeSql("""|CREATE CATALOG paimon_catalog WITH(|    'type'='paimon',|    'warehouse'='hdfs://bigdata01:9000/paimon'|)|""".stripMargin)tEnv.executeSql("USE CATALOG paimon_catalog")//移除表中的scan.snapshot-id属性tEnv.executeSql("""|ALTER TABLE p_h_t2 RESET ('scan.snapshot-id')|""".stripMargin)//查看最新的表属性信息tEnv.executeSql("""|SHOW CREATE TABLE p_h_t2|""".stripMargin).print()}
}

执行此代码,可以看到如下结果:

CREATE TABLE `paimon_catalog`.`default`.`p_h_t2` (`name` VARCHAR(2147483647),`age` INT
) WITH ('path' = 'hdfs://bigdata01:9000/paimon/default.db/p_h_t2','totalSize' = '0','numRows' = '0','rawDataSize' = '0','COLUMN_STATS_ACCURATE' = '{"BASIC_STATS":"true","COLUMN_STATS":{"age":"true","name":"true"}}','numFiles' = '0','bucketing_version' = '2','storage_handler' = 'org.apache.paimon.hive.PaimonStorageHandler'
)

此时表中就没有scan.snapshot-id属性了。

这个时候我们再回到hive命令行中查询这个表:

0: jdbc:hive2://localhost:10000> SELECT * FROM p_h_t2;
+--------------+-------------+
| p_h_t2.name  | p_h_t2.age  |
+--------------+-------------+
| tom          | 20          |
+--------------+-------------+
1 row selected (0.46 seconds)

这样就可以正常查询了。

注意:如果此时我们在hive中删除这个表,那么对应的paimon中这个表也会被删除。

0: jdbc:hive2://localhost:10000> drop table p_h_t2;
No rows affected (0.33 seconds)

到hdfs中确认一下这个paimon表是否存在:

[root@bigdata04 ~]# hdfs dfs -ls /paimon/default.db/p_h_t2
ls: `/paimon/default.db/p_h_t2': No such file or directory

这样就说明paimon中这个表不存在了。

不过在hdfs中会多一个这个目录,这属于一个临时目录,没什么影响,可以手工删除,不处理也没影响。

[root@bigdata04 ~]# hdfs dfs -ls /paimon/default.db/_tmp.p_h_t2

针对Paimon中已经存在的表,我们想要在hive中进行访问,应该如何实现呢?

此时可以借助于Hive的外部表特性来实现。
相当于是在hive中创建一个外部表,通过location指向paimon表的hdfs路径即可。

我们使用前面cdc数据采集中创建的Paimon表:cdc_chinese_code,在hive中创建一个外部表映射到这个表:

0: jdbc:hive2://localhost:10000> CREATE EXTERNAL TABLE p_h_external
STORED BY 'org.apache.paimon.hive.PaimonStorageHandler'
LOCATION 'hdfs://bigdata01:9000/paimon/default.db/cdc_chinese_code';

然后就可以在hive中查询这个表了:

0: jdbc:hive2://localhost:10000> select * from p_h_external;
+------------------+--------------------+
| p_h_external.id  | p_h_external.name  |
+------------------+--------------------+
| 1                | 张三                 |
| 2                | 李四                 |
+------------------+--------------------+

此时如果我们在hive中删除这个外部表,不会影响paimon中的cdc_chinese_code表。

0: jdbc:hive2://localhost:10000> drop table p_h_external;

到hdfs中确认一下,cdc_chinese_code这个paimon表还是存在的:

[root@bigdata04 ~]# hdfs dfs -ls /paimon/default.db/cdc_chinese_code
Found 4 items
drwxr-xr-x   - root supergroup          0 2029-02-27 11:32 /paimon/default.db/cdc_chinese_code/bucket-0
drwxr-xr-x   - root supergroup          0 2029-02-27 11:32 /paimon/default.db/cdc_chinese_code/manifest
drwxr-xr-x   - root supergroup          0 2029-02-27 11:26 /paimon/default.db/cdc_chinese_code/schema
drwxr-xr-x   - root supergroup          0 2029-02-27 11:32 /paimon/default.db/cdc_chinese_code/snapshot

更多Paimon数据湖内容请关注:https://edu.51cto.com/course/35051.html

这篇关于6 Hive引擎集成Apache Paimon的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/383881

相关文章

Debezium 与 Apache Kafka 的集成方式步骤详解

《Debezium与ApacheKafka的集成方式步骤详解》本文详细介绍了如何将Debezium与ApacheKafka集成,包括集成概述、步骤、注意事项等,通过KafkaConnect,D... 目录一、集成概述二、集成步骤1. 准备 Kafka 环境2. 配置 Kafka Connect3. 安装 D

Spring AI集成DeepSeek的详细步骤

《SpringAI集成DeepSeek的详细步骤》DeepSeek作为一款卓越的国产AI模型,越来越多的公司考虑在自己的应用中集成,对于Java应用来说,我们可以借助SpringAI集成DeepSe... 目录DeepSeek 介绍Spring AI 是什么?1、环境准备2、构建项目2.1、pom依赖2.2

java如何通过Kerberos认证方式连接hive

《java如何通过Kerberos认证方式连接hive》该文主要介绍了如何在数据源管理功能中适配不同数据源(如MySQL、PostgreSQL和Hive),特别是如何在SpringBoot3框架下通过... 目录Java实现Kerberos认证主要方法依赖示例续期连接hive遇到的问题分析解决方式扩展思考总

深入理解Apache Airflow 调度器(最新推荐)

《深入理解ApacheAirflow调度器(最新推荐)》ApacheAirflow调度器是数据管道管理系统的关键组件,负责编排dag中任务的执行,通过理解调度器的角色和工作方式,正确配置调度器,并... 目录什么是Airflow 调度器?Airflow 调度器工作机制配置Airflow调度器调优及优化建议最

Java中Springboot集成Kafka实现消息发送和接收功能

《Java中Springboot集成Kafka实现消息发送和接收功能》Kafka是一个高吞吐量的分布式发布-订阅消息系统,主要用于处理大规模数据流,它由生产者、消费者、主题、分区和代理等组件构成,Ka... 目录一、Kafka 简介二、Kafka 功能三、POM依赖四、配置文件五、生产者六、消费者一、Kaf

SpringBoot使用Apache Tika检测敏感信息

《SpringBoot使用ApacheTika检测敏感信息》ApacheTika是一个功能强大的内容分析工具,它能够从多种文件格式中提取文本、元数据以及其他结构化信息,下面我们来看看如何使用Ap... 目录Tika 主要特性1. 多格式支持2. 自动文件类型检测3. 文本和元数据提取4. 支持 OCR(光学

SpringCloud集成AlloyDB的示例代码

《SpringCloud集成AlloyDB的示例代码》AlloyDB是GoogleCloud提供的一种高度可扩展、强性能的关系型数据库服务,它兼容PostgreSQL,并提供了更快的查询性能... 目录1.AlloyDBjavascript是什么?AlloyDB 的工作原理2.搭建测试环境3.代码工程1.

Apache Tomcat服务器版本号隐藏的几种方法

《ApacheTomcat服务器版本号隐藏的几种方法》本文主要介绍了ApacheTomcat服务器版本号隐藏的几种方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需... 目录1. 隐藏HTTP响应头中的Server信息编辑 server.XML 文件2. 修China编程改错误

SpringBoot使用注解集成Redis缓存的示例代码

《SpringBoot使用注解集成Redis缓存的示例代码》:本文主要介绍在SpringBoot中使用注解集成Redis缓存的步骤,包括添加依赖、创建相关配置类、需要缓存数据的类(Tes... 目录一、创建 Caching 配置类二、创建需要缓存数据的类三、测试方法Spring Boot 熟悉后,集成一个外

Docker集成CI/CD的项目实践

《Docker集成CI/CD的项目实践》本文主要介绍了Docker集成CI/CD的项目实践,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学... 目录一、引言1.1 什么是 CI/CD?1.2 docker 在 CI/CD 中的作用二、Docke