This article walks through submitting a Hive/Iceberg job with the Flink SQL Client; hopefully it serves as a useful reference for developers tackling the same task.
Submitting Hive/Iceberg Jobs with the Flink SQL Client
- Environment
- Preparation before starting the SQL Client
- Starting the SQL Client
- init.sql
- insert.sql
Environment
| Component | Version |
|---|---|
| Flink client | 1.14.4-2.12 |
| Hadoop cluster | 3.1.4 |
| Hive client | 3.1.2 |
| Iceberg Flink runtime | iceberg-flink-runtime-1.14-0.13.2.jar |
| Iceberg Hive dependency | iceberg-hive-runtime-0.13.2.jar |
Preparation before starting the SQL Client
The SQL Client can submit jobs in two modes: per-job and session.
Session mode requires starting a YARN session first (-t ships the plugin directory to the cluster, -s sets the task slots per TaskManager, -jm/-tm set JobManager/TaskManager memory, -qu the YARN queue, -nm the application name, and -d runs detached):
```shell
/home/hadoop/flink/bin/yarn-session.sh \
  -t /home/hadoop/flink/sqlplugins \
  -s 2 -jm 5120 -tm 5120 -qu default -nm iceberg_test1 -d
```
Per-job mode instead requires adding the following parameter to the flink-conf.yaml file of the Flink client:
```yaml
execution.target: yarn-per-job
```
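Since the SQL Client can SET any Flink configuration option for the current session, the same switch can also be made interactively instead of editing flink-conf.yaml globally; a minimal sketch, worth verifying against your Flink version:

```sql
-- Equivalent per-session setting, run inside sql-client:
SET 'execution.target' = 'yarn-per-job';
```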
Note: the flink-conf.yaml file also contains the following settings:

```yaml
classloader.resolve-order: parent-first
classloader.check-leaked-classloader: false
# Kerberos-related settings
security.kerberos.login.use-ticket-cache: true
security.kerberos.login.keytab: /bigdata/apps/test/core.keytab
security.kerberos.login.principal: hadoop
security.kerberos.login.contexts: Client
```
Starting the SQL Client
```shell
# YARN session mode
/home/hadoop/flink/bin/sql-client.sh embedded \
  -s appId \
  -l /home/hadoop/flink/sqlplugins \
  -i /home/hadoop/flink/script/init.sql \
  -f /home/hadoop/flink/script/insert.sql
```

```shell
# YARN per-job mode
/home/hadoop/flink/bin/sql-client.sh embedded \
  -l /home/hadoop/flink/sqlplugins \
  -i /home/hadoop/flink/script/init.sql \
  -f /home/hadoop/flink/script/insert.sql
```

Here -s sets the session identifier, -l loads every jar in the plugin directory (the Iceberg runtime jars listed above), -i runs an initialization script at startup, and -f executes the SQL file.
init.sql
```sql
set 'sql-client.verbose' = 'true';
SET 'execution.checkpointing.interval' = '60s';

CREATE CATALOG ice_catalog WITH (
  'type' = 'iceberg',
  'catalog-type' = 'hive',
  'uri' = 'thrift://hdp02.bonc.com:9083',
  'warehouse' = 'hdfs://beh001/tmp/',
  'hive-conf-dir' = '/home/hadoop/flink/confdir',
  'hadoop-conf-dir' = '/home/hadoop/flink/confdir'
);

CREATE DATABASE IF NOT EXISTS ice_catalog.ice_db;

CREATE TABLE IF NOT EXISTS ice_catalog.ice_db.ice_tb (
  deal_date string,
  chnl_id string,
  chnl_name string,
  region_code string,
  city_code string,
  chnl_third_class string,
  chnl_second_class string,
  chnl_first_class string,
  chnl_area_class string,
  chnl_eff_flag string,
  oper_id string,
  oper_name string,
  self_term_code string,
  air_term_code string,
  oper_eff_flag string,
  item_cls_type string,
  item_cls_desc string,
  item_grp_type string,
  item_grp_desc string,
  user_chnl_id string,
  user_chnl_name string,
  user_region_code string,
  user_city_code string,
  item_value1 decimal(14,2),
  item_value2 decimal(14,2),
  PRIMARY KEY (chnl_id, oper_id) NOT ENFORCED
) WITH (
  'write.upsert.enabled' = 'true',
  'write.metadata.previous-versions-max' = '10',
  'write.metadata.delete-after-commit.enabled' = 'true',
  'commit.manifest.min-count-to-merge' = '1',
  'engine.hive.enabled' = 'true',
  'table.dynamic-table-options.enabled' = 'true',
  'format-version' = '2'
);

CREATE TABLE csvSource (
  deal_date string COMMENT 'processing date',
  chnl_id string COMMENT 'channel ID',
  chnl_name string COMMENT 'channel name',
  region_code string COMMENT 'home city code',
  city_code string COMMENT 'home district code',
  chnl_third_class string COMMENT 'channel level-3 type',
  chnl_second_class string COMMENT 'channel level-2 type',
  chnl_first_class string COMMENT 'channel level-1 type',
  chnl_area_class string COMMENT 'channel region attribute',
  chnl_eff_flag string COMMENT 'channel valid flag',
  oper_id string COMMENT 'operator ID',
  oper_name string COMMENT 'operator name',
  self_term_code string COMMENT 'self-service terminal flag',
  air_term_code string COMMENT 'over-the-air recharge flag',
  oper_eff_flag string COMMENT 'operator valid flag',
  item_cls_type string COMMENT 'metric category code',
  item_cls_desc string COMMENT 'metric category name',
  item_grp_type string COMMENT 'metric item code',
  item_grp_desc string COMMENT 'metric item name',
  user_chnl_id string COMMENT 'user channel ID',
  user_chnl_name string COMMENT 'user channel name',
  user_region_code string COMMENT 'user home city code',
  user_city_code string COMMENT 'user home district code',
  item_value1 decimal(14,2) COMMENT 'metric value 1',
  item_value2 decimal(14,2) COMMENT 'metric value 2'
) WITH (
  'connector' = 'filesystem',
  'path' = 'hdfs://beh001/tmp/originData/csvSource.txt',
  'format' = 'csv',
  'csv.field-delimiter' = ','
);
```
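Once init.sql has run, the catalog and tables can be checked interactively from the SQL Client prompt. A quick sanity check using standard Flink SQL statements and the names defined above:

```sql
SHOW CATALOGS;        -- ice_catalog should be listed
USE CATALOG ice_catalog;
USE ice_db;
SHOW TABLES;          -- ice_tb should be listed
DESCRIBE ice_tb;      -- verify the schema and the NOT ENFORCED primary key
```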
insert.sql
```sql
INSERT INTO ice_catalog.ice_db.ice_tb
SELECT
  deal_date, chnl_id, chnl_name, region_code, city_code,
  chnl_third_class, chnl_second_class, chnl_first_class,
  chnl_area_class, chnl_eff_flag,
  oper_id, oper_name, self_term_code, air_term_code, oper_eff_flag,
  item_cls_type, item_cls_desc, item_grp_type, item_grp_desc,
  user_chnl_id, user_chnl_name, user_region_code, user_city_code,
  item_value1, item_value2
FROM csvSource;
```
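After the job has committed a snapshot (i.e. after a checkpoint completes), the data can be read back from the Iceberg table. A minimal sketch of both a batch read and a streaming incremental read; the /*+ OPTIONS(...) */ hint depends on 'table.dynamic-table-options.enabled' set above, and the monitor-interval value here is an arbitrary choice for illustration:

```sql
-- Batch read-back check:
SELECT count(*) FROM ice_catalog.ice_db.ice_tb;

-- Streaming incremental read of newly committed snapshots:
SELECT * FROM ice_catalog.ice_db.ice_tb
/*+ OPTIONS('streaming'='true', 'monitor-interval'='10s') */;
```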
That wraps up this walkthrough of submitting Hive/Iceberg jobs with the Flink SQL Client; hopefully it is of some help.