flink sqlClient提交hiveIceberg

2023-11-22 14:12

本文主要是介绍flink sqlClient提交hiveIceberg,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

flink sqlClient提交hiveIceberg

  • 环境准备
  • sqlclient启动前准备
  • 启动sqlclient
  • init.sql
  • insert.sql

环境准备

组件名版本
flink客户端1.14.4-2.12
hadoop集群3.1.4
hive客户端3.1.2
icebergiceberg-flink-runtime-1.14-0.13.2.jar
iceberg-hive依赖iceberg-hive-runtime-0.13.2.jar

sqlclient启动前准备

sqlclient启动有两种方式,per-job、session。
session模式需先启动一个session,启动方式如下:

/home/hadoop/flink/bin/yarn-session.sh \
-t /home/hadoop/flink/sqlplugins \
-s 2 -jm 5120 -tm 5120 -qu default -nm iceberg_test1 -d

per-job模式需在flink客户端的flink-conf.yaml文件中添加如下参数:
execution.target: yarn-per-job
注意:

flink-conf.yaml文件中还设置了其他内容如下
classloader.resolve-order: parent-firstclassloader.check-leaked-classloader: false#kerberos相关配置
security.kerberos.login.use-ticket-cache: true
security.kerberos.login.keytab: /bigdata/apps/test/core.keytab
security.kerberos.login.principal: hadoop
security.kerberos.login.contexts: Client

启动sqlclient

-- yarn session模式
/home/hadoop/flink/bin/sql-client.sh  embedded \
-s appId \
-l /home/hadoop/flink/sqlplugins \
-i /home/hadoop/flink/script/init.sql \
-f /home/hadoop/flink/script/insert.sql \
shell-- yarn per-job模式
/home/hadoop/flink/bin/sql-client.sh  embedded \
-l /home/hadoop/flink/sqlplugins \
-i /home/hadoop/flink/script/init.sql \
-f /home/hadoop/flink/script/insert.sql \
shell

init.sql

set 'sql-client.verbose'='true';
SET 'execution.checkpointing.interval' = '60s';CREATE CATALOG ice_catalog WITH ('type' = 'iceberg','catalog-type' = 'hive','uri' = 'thrift://hdp02.bonc.com:9083','warehouse' = 'hdfs://beh001/tmp/','hive-conf-dir' = '/home/hadoop/flink/confdir','hadoop-conf-dir' = '/home/hadoop/flink/confdir'
);CREATE DATABASE IF NOT EXISTS ice_catalog.ice_db;CREATE TABLE IF NOT EXISTS ice_catalog.ice_db.ice_tb (deal_date string,chnl_id string,chnl_name string,region_code string,city_code string,chnl_third_class string,chnl_second_class string,chnl_first_class string,chnl_area_class string,chnl_eff_flag string,oper_id string,oper_name string,self_term_code string,air_term_code string,oper_eff_flag string,item_cls_type string,item_cls_desc string,item_grp_type string,item_grp_desc string,user_chnl_id string,user_chnl_name string,user_region_code string,user_city_code string,item_value1 decimal(14,2),item_value2 decimal(14,2),PRIMARY KEY (chnl_id ,oper_id) NOT ENFORCED
) WITH ('write.upsert.enabled' = 'true','write.metadata.previous-versions-max' = '10','write.metadata.delete-after-commit.enabled' = 'true','commit.manifest.min-count-to-merge' = '1','engine.hive.enabled' = 'true','table.dynamic-table-options.enabled' = 'true','format-version' = '2'
);CREATE TABLE csvSource (deal_date string COMMENT '处理日期',               chnl_id string COMMENT '渠道ID',                 chnl_name string COMMENT '渠道名称',               region_code string COMMENT '归属地市代码',           city_code string COMMENT '归属区县代码',             chnl_third_class string COMMENT '渠道三级类型',      chnl_second_class string COMMENT '渠道二级类型',     chnl_first_class string COMMENT '渠道一级类型',      chnl_area_class string COMMENT '渠道地域属性',       chnl_eff_flag string COMMENT '渠道有效标志',         oper_id string COMMENT '工号ID',                 oper_name string COMMENT '工号姓名',               self_term_code string COMMENT '自助终端标志',        air_term_code string COMMENT '空中充值标志',         oper_eff_flag string COMMENT '工号有效标志',         item_cls_type string COMMENT '指标大类代码',         item_cls_desc string COMMENT '指标大类名称',         item_grp_type string COMMENT '指标细项代码',         item_grp_desc string COMMENT '指标细项名称',         user_chnl_id string COMMENT '用户渠道ID',          user_chnl_name string COMMENT '用户渠道名称',        user_region_code string COMMENT '用户归属地市代码',    user_city_code string COMMENT '用户归属区县代码',      item_value1 decimal(14,2) COMMENT '指标值1',      item_value2 decimal(14,2) COMMENT '指标值2'
) WITH ('connector' = 'filesystem','path' = 'hdfs://beh001/tmp/originData/csvSource.txt','format' = 'csv','csv.field-delimiter' = ','
);

insert.sql

insert intoice_catalog.ice_db.ice_tb
selectdeal_date  ,               chnl_id  ,                 chnl_name  ,               region_code  ,           city_code  ,             chnl_third_class  ,      chnl_second_class  ,     chnl_first_class  ,      chnl_area_class  ,       chnl_eff_flag  ,         oper_id  ,                 oper_name  ,               self_term_code  ,        air_term_code  ,         oper_eff_flag  ,         item_cls_type  ,         item_cls_desc  ,         item_grp_type  ,         item_grp_desc  ,         user_chnl_id  ,          user_chnl_name  ,        user_region_code  ,    user_city_code  ,      item_value1,      item_value2
fromcsvSource;

这篇关于flink sqlClient提交hiveIceberg的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/410711

相关文章

作业提交过程之HDFSMapReduce

作业提交全过程详解 (1)作业提交 第1步:Client调用job.waitForCompletion方法,向整个集群提交MapReduce作业。 第2步:Client向RM申请一个作业id。 第3步:RM给Client返回该job资源的提交路径和作业id。 第4步:Client提交jar包、切片信息和配置文件到指定的资源提交路径。 第5步:Client提交完资源后,向RM申请运行MrAp

HTML提交表单给python

python 代码 from flask import Flask, request, render_template, redirect, url_forapp = Flask(__name__)@app.route('/')def form():# 渲染表单页面return render_template('./index.html')@app.route('/submit_form',

Python3 BeautifulSoup爬虫 POJ自动提交

POJ 提交代码采用Base64加密方式 import http.cookiejarimport loggingimport urllib.parseimport urllib.requestimport base64from bs4 import BeautifulSoupfrom submitcode import SubmitCodeclass SubmitPoj():de

查看提交历史 —— Git 学习笔记 11

查看提交历史 查看提交历史 不带任何选项的git log-p选项--stat 选项--pretty=oneline选项--pretty=format选项git log常用选项列表参考资料 在提交了若干更新,又或者克隆了某个项目之后,你也许想回顾下提交历史。 完成这个任务最简单而又有效的 工具是 git log 命令。 接下来的例子会用一个用于演示的 simplegit

form表单提交编码的问题

浏览器在form提交后,会生成一个HTTP的头部信息"content-type",标准规定其形式为Content-type: application/x-www-form-urlencoded; charset=UTF-8        那么我们如果需要修改编码,不使用默认的,那么可以如下这样操作修改编码,来满足需求: hmtl代码:   <meta http-equiv="Conte

js异步提交form表单的解决方案

1.定义异步提交表单的方法 (通用方法) /*** 异步提交form表单* @param options {form:form表单元素,success:执行成功后处理函数}* <span style="color:#ff0000;"><strong>@注意 后台接收参数要解码否则中文会导致乱码 如:URLDecoder.decode(param,"UTF-8")</strong></span>

husky 工具配置代码检查工作流:提交代码至仓库前做代码检查

提示:这篇博客以我前两篇博客作为先修知识,请大家先去看看我前两篇博客 博客指路:前端 ESlint 代码规范及修复代码规范错误-CSDN博客前端 Vue3 项目开发—— ESLint & prettier 配置代码风格-CSDN博客 husky 工具配置代码检查工作流的作用 在工作中,我们经常需要将写好的代码提交至代码仓库 但是由于程序员疏忽而将不规范的代码提交至仓库,显然是不合理的 所

Flink任务重启策略

概述 Flink支持不同的重启策略,以在故障发生时控制作业如何重启集群在启动时会伴随一个默认的重启策略,在没有定义具体重启策略时会使用该默认策略。如果在工作提交时指定了一个重启策略,该策略会覆盖集群的默认策略默认的重启策略可以通过 Flink 的配置文件 flink-conf.yaml 指定。配置参数 restart-strategy 定义了哪个策略被使用。常用的重启策略: 固定间隔 (Fixe

js操作Dom节点拼接表单及ajax提交表单

有时候我们不希望html(jsp、vm)中有创建太多的标签(dom节点),所以这些任务都由js来做,下面提供套完整的表单提交流程,只需要在html中添加两个div其余的都由js来做吧。下面原生代码只需略微修改就能达到你想要的效果。 1、需要创建表单的点击事件 <a href="javascript:void(0);"onclick="changeSettleMoney('$!doctor.do

在幼儿园管理系统中,会议管理申请会议模块:添加会议记录(提交表单)的时候报:404错误!

在幼儿园管理系统(spring MVC)中,会议管理>申请会议模块:添加会议记录的时候报:404错误!不知道为啥找不到,一开始感觉一头雾水,怎么会出现404页面找不到错误那,又检查action,controller等这也没错啊!怎么出现404错误那。经过询问和查找,终于找到原因了。 原因是:添加的有时间字段。 代码: @InitBinder public void in