flink sqlClient提交hiveIceberg

2023-11-22 14:12

本文主要是介绍flink sqlClient提交hiveIceberg,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

flink sqlClient提交hiveIceberg

  • 环境准备
  • sqlclient启动前准备
  • 启动sqlclient
  • init.sql
  • insert.sql

环境准备

组件名版本
flink客户端1.14.4-2.12
hadoop集群3.1.4
hive客户端3.1.2
icebergiceberg-flink-runtime-1.14-0.13.2.jar
iceberg-hive依赖iceberg-hive-runtime-0.13.2.jar

sqlclient启动前准备

sqlclient启动有两种方式,per-job、session。
session模式需先启动一个session,启动方式如下:

/home/hadoop/flink/bin/yarn-session.sh \
-t /home/hadoop/flink/sqlplugins \
-s 2 -jm 5120 -tm 5120 -qu default -nm iceberg_test1 -d

per-job模式需在flink客户端的flink-conf.yaml文件中添加如下参数:
execution.target: yarn-per-job
注意:

flink-conf.yaml文件中还设置了其他内容如下
classloader.resolve-order: parent-firstclassloader.check-leaked-classloader: false#kerberos相关配置
security.kerberos.login.use-ticket-cache: true
security.kerberos.login.keytab: /bigdata/apps/test/core.keytab
security.kerberos.login.principal: hadoop
security.kerberos.login.contexts: Client

启动sqlclient

-- yarn session模式
/home/hadoop/flink/bin/sql-client.sh  embedded \
-s appId \
-l /home/hadoop/flink/sqlplugins \
-i /home/hadoop/flink/script/init.sql \
-f /home/hadoop/flink/script/insert.sql \
shell-- yarn per-job模式
/home/hadoop/flink/bin/sql-client.sh  embedded \
-l /home/hadoop/flink/sqlplugins \
-i /home/hadoop/flink/script/init.sql \
-f /home/hadoop/flink/script/insert.sql \
shell

init.sql

set 'sql-client.verbose'='true';
SET 'execution.checkpointing.interval' = '60s';CREATE CATALOG ice_catalog WITH ('type' = 'iceberg','catalog-type' = 'hive','uri' = 'thrift://hdp02.bonc.com:9083','warehouse' = 'hdfs://beh001/tmp/','hive-conf-dir' = '/home/hadoop/flink/confdir','hadoop-conf-dir' = '/home/hadoop/flink/confdir'
);CREATE DATABASE IF NOT EXISTS ice_catalog.ice_db;CREATE TABLE IF NOT EXISTS ice_catalog.ice_db.ice_tb (deal_date string,chnl_id string,chnl_name string,region_code string,city_code string,chnl_third_class string,chnl_second_class string,chnl_first_class string,chnl_area_class string,chnl_eff_flag string,oper_id string,oper_name string,self_term_code string,air_term_code string,oper_eff_flag string,item_cls_type string,item_cls_desc string,item_grp_type string,item_grp_desc string,user_chnl_id string,user_chnl_name string,user_region_code string,user_city_code string,item_value1 decimal(14,2),item_value2 decimal(14,2),PRIMARY KEY (chnl_id ,oper_id) NOT ENFORCED
) WITH ('write.upsert.enabled' = 'true','write.metadata.previous-versions-max' = '10','write.metadata.delete-after-commit.enabled' = 'true','commit.manifest.min-count-to-merge' = '1','engine.hive.enabled' = 'true','table.dynamic-table-options.enabled' = 'true','format-version' = '2'
);CREATE TABLE csvSource (deal_date string COMMENT '处理日期',               chnl_id string COMMENT '渠道ID',                 chnl_name string COMMENT '渠道名称',               region_code string COMMENT '归属地市代码',           city_code string COMMENT '归属区县代码',             chnl_third_class string COMMENT '渠道三级类型',      chnl_second_class string COMMENT '渠道二级类型',     chnl_first_class string COMMENT '渠道一级类型',      chnl_area_class string COMMENT '渠道地域属性',       chnl_eff_flag string COMMENT '渠道有效标志',         oper_id string COMMENT '工号ID',                 oper_name string COMMENT '工号姓名',               self_term_code string COMMENT '自助终端标志',        air_term_code string COMMENT '空中充值标志',         oper_eff_flag string COMMENT '工号有效标志',         item_cls_type string COMMENT '指标大类代码',         item_cls_desc string COMMENT '指标大类名称',         item_grp_type string COMMENT '指标细项代码',         item_grp_desc string COMMENT '指标细项名称',         user_chnl_id string COMMENT '用户渠道ID',          user_chnl_name string COMMENT '用户渠道名称',        user_region_code string COMMENT '用户归属地市代码',    user_city_code string COMMENT '用户归属区县代码',      item_value1 decimal(14,2) COMMENT '指标值1',      item_value2 decimal(14,2) COMMENT '指标值2'
) WITH ('connector' = 'filesystem','path' = 'hdfs://beh001/tmp/originData/csvSource.txt','format' = 'csv','csv.field-delimiter' = ','
);

insert.sql

insert intoice_catalog.ice_db.ice_tb
selectdeal_date  ,               chnl_id  ,                 chnl_name  ,               region_code  ,           city_code  ,             chnl_third_class  ,      chnl_second_class  ,     chnl_first_class  ,      chnl_area_class  ,       chnl_eff_flag  ,         oper_id  ,                 oper_name  ,               self_term_code  ,        air_term_code  ,         oper_eff_flag  ,         item_cls_type  ,         item_cls_desc  ,         item_grp_type  ,         item_grp_desc  ,         user_chnl_id  ,          user_chnl_name  ,        user_region_code  ,    user_city_code  ,      item_value1,      item_value2
fromcsvSource;

这篇关于flink sqlClient提交hiveIceberg的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/410711

相关文章

将Java项目提交到云服务器的流程步骤

《将Java项目提交到云服务器的流程步骤》所谓将项目提交到云服务器即将你的项目打成一个jar包然后提交到云服务器即可,因此我们需要准备服务器环境为:Linux+JDK+MariDB(MySQL)+Gi... 目录1. 安装 jdk1.1 查看 jdk 版本1.2 下载 jdk2. 安装 mariadb(my

Git如何修改已提交人的用户名和邮箱

《Git如何修改已提交人的用户名和邮箱》文章介绍了如何修改Git已提交人的用户名和邮箱,包括注意事项和具体步骤,确保操作正确无误... 目录git修改已提交人的用户名和邮箱前言第一步第二步总结git修改已提交人的用户名和邮箱前言需注意以下两点内容:需要在顶层目录下(php就是 .git 文件夹所在的目

javaScript在表单提交时获取表单数据的示例代码

《javaScript在表单提交时获取表单数据的示例代码》本文介绍了五种在JavaScript中获取表单数据的方法:使用FormData对象、手动提取表单数据、使用querySelector获取单个字... 方法 1:使用 FormData 对象FormData 是一个方便的内置对象,用于获取表单中的键值

Redis 多规则限流和防重复提交方案实现小结

《Redis多规则限流和防重复提交方案实现小结》本文主要介绍了Redis多规则限流和防重复提交方案实现小结,包括使用String结构和Zset结构来记录用户IP的访问次数,具有一定的参考价值,感兴趣... 目录一:使用 String 结构记录固定时间段内某用户 IP 访问某接口的次数二:使用 Zset 进行

Git提交代码详细流程及问题总结

《Git提交代码详细流程及问题总结》:本文主要介绍Git的三大分区,分别是工作区、暂存区和版本库,并详细描述了提交、推送、拉取代码和合并分支的流程,文中通过代码介绍的非常详解,需要的朋友可以参考下... 目录1.git 三大分区2.Git提交、推送、拉取代码、合并分支详细流程3.问题总结4.git push

作业提交过程之HDFSMapReduce

作业提交全过程详解 (1)作业提交 第1步:Client调用job.waitForCompletion方法,向整个集群提交MapReduce作业。 第2步:Client向RM申请一个作业id。 第3步:RM给Client返回该job资源的提交路径和作业id。 第4步:Client提交jar包、切片信息和配置文件到指定的资源提交路径。 第5步:Client提交完资源后,向RM申请运行MrAp

HTML提交表单给python

python 代码 from flask import Flask, request, render_template, redirect, url_forapp = Flask(__name__)@app.route('/')def form():# 渲染表单页面return render_template('./index.html')@app.route('/submit_form',

Python3 BeautifulSoup爬虫 POJ自动提交

POJ 提交代码采用Base64加密方式 import http.cookiejarimport loggingimport urllib.parseimport urllib.requestimport base64from bs4 import BeautifulSoupfrom submitcode import SubmitCodeclass SubmitPoj():de

查看提交历史 —— Git 学习笔记 11

查看提交历史 查看提交历史 不带任何选项的git log-p选项--stat 选项--pretty=oneline选项--pretty=format选项git log常用选项列表参考资料 在提交了若干更新,又或者克隆了某个项目之后,你也许想回顾下提交历史。 完成这个任务最简单而又有效的 工具是 git log 命令。 接下来的例子会用一个用于演示的 simplegit

form表单提交编码的问题

浏览器在form提交后,会生成一个HTTP的头部信息"content-type",标准规定其形式为Content-type: application/x-www-form-urlencoded; charset=UTF-8        那么我们如果需要修改编码,不使用默认的,那么可以如下这样操作修改编码,来满足需求: hmtl代码:   <meta http-equiv="Conte