This article walks through writing data into Paimon with Flink on YARN.
Contents
- Overview
- Practice
- paimon
- Conclusion
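Overview
The pipeline is OGG (Oracle GoldenGate) to Kafka to Paimon: OGG publishes change records to Kafka, and a Flink job writes them from Kafka into Paimon.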
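Practice
For the prerequisites, see the following articles:
Article | Link |
---|---|
Hadoop installation (one master, three workers) | Link |
spark on yarn | Link |
Flink yarn-session environment | Link |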
paimon
Goals:
- 1. Sync the table
- 2. Write to Paimon via Kafka
-- Job-level settings
SET 'parallelism.default' = '2';
-- Drop rows that carry NULL in a NOT NULL column (such as a primary key field) instead of failing the job
SET 'table.exec.sink.not-null-enforcer' = 'DROP';
SET 'jobmanager.memory.process.size' = '2048m';
SET 'taskmanager.memory.process.size' = '10240m';
SET 'taskmanager.memory.managed.size' = '128m';

-- Paimon catalog stored on HDFS; ods is the default database
CREATE CATALOG paimon WITH (
    'type' = 'paimon',
    'warehouse' = 'hdfs:///data/paimon',
    'default-database' = 'ods'
);

USE CATALOG paimon;

-- Kafka source table carrying the OGG change records.
-- It is a temporary table, so it is dropped with DROP TEMPORARY TABLE.
DROP TEMPORARY TABLE IF EXISTS xx_REFDES_KAFKA;
CREATE TEMPORARY TABLE xx_REFDES_KAFKA (
    `PCBID` STRING,
    `RID` STRING,
    `REFDES` STRING,
    `BM_CIRCUIT_NO` DECIMAL(38,0),
    `TIMESTAMP` TIMESTAMP,
    `PICKUPSTATUS` STRING,
    `SERIAL_NUMBER` STRING,
    `FLAG` DECIMAL(2,0),
    `KITID` STRING,
    `ID` STRING,
    `CREATEDATE` TIMESTAMP,
    `ETL` STRING COMMENT 'ETL flag',
    `OPT1` STRING COMMENT 'spare 1',
    `OPT2` STRING COMMENT 'spare 2',
    `OPT3` STRING COMMENT 'spare 3',
    `OPT4` STRING COMMENT 'spare 4',
    `OPT5` STRING COMMENT 'spare 5',
    `NOZZLEID` STRING COMMENT 'NOZZLEID for NXT',
    `LANENO` STRING COMMENT 'LANENO for NXT',
    `COMPONENTBARCODE` STRING COMMENT 'componentBarcode for ASM, Part2DCode for NXT',
    `PN` STRING,
    `LOTCODE` STRING,
    `DATECODE` STRING,
    `VERDOR` STRING,
    `WORKORDER` STRING,
    PRIMARY KEY (ID) NOT ENFORCED
) WITH (
    'connector' = 'kafka',
    'topic' = 'TRACE_LOG_REFDES',
    'properties.bootstrap.servers' = '10.xx.xx.30:9092',
    'properties.group.id' = 'xx_REFDES_GROUP',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'ogg-json'
);

-- Paimon sink table, partitioned by day (dt); partitions expire after 730 days
CREATE TABLE IF NOT EXISTS yy_refdes_hive_ro (
    `pcbid` STRING,
    `rid` STRING,
    `refdes` STRING,
    `bm_circuit_no` DECIMAL(38,0),
    `timestamp` STRING COMMENT '{"type":"DATE"}',
    `pickupstatus` STRING,
    `serial_number` STRING,
    `flag` DECIMAL(2,0),
    `kitid` STRING,
    `id` STRING,
    `createdate` STRING COMMENT '{"type":"DATE"}',
    `etl` STRING COMMENT 'ETL flag',
    `opt1` STRING COMMENT 'spare 1',
    `opt2` STRING COMMENT 'spare 2',
    `opt3` STRING COMMENT 'spare 3',
    `opt4` STRING COMMENT 'spare 4',
    `opt5` STRING COMMENT 'spare 5',
    `nozzleid` STRING COMMENT 'nozzleid for NXT',
    `laneno` STRING COMMENT 'laneno for NXT',
    `componentbarcode` STRING COMMENT 'componentbarcode for ASM, part2dcode for NXT',
    `pn` STRING,
    `lotcode` STRING,
    `datecode` STRING,
    `verdor` STRING,
    `workorder` STRING,
    `dt` STRING,
    PRIMARY KEY (id, dt) NOT ENFORCED
) PARTITIONED BY (dt) WITH (
    'connector' = 'paimon',
    'file.format' = 'parquet',
    'metastore.partitioned-table' = 'true',
    'bucket' = '-1',
    'partition.expiration-time' = '730 d',
    'partition.expiration-check-interval' = '1 d',
    'partition.timestamp-formatter' = 'yyyy-MM-dd',
    'partition.timestamp-pattern' = '$dt'
);

-- Continuously copy Kafka records into Paimon; dt is derived from `TIMESTAMP`
INSERT INTO yy_refdes_hive_ro
SELECT
    PCBID,
    RID,
    REFDES,
    BM_CIRCUIT_NO,
    DATE_FORMAT(`TIMESTAMP`, 'yyyy-MM-dd HH:mm:ss'),
    PICKUPSTATUS,
    SERIAL_NUMBER,
    FLAG,
    KITID,
    ID,
    DATE_FORMAT(CREATEDATE, 'yyyy-MM-dd HH:mm:ss'),
    ETL,
    OPT1,
    OPT2,
    OPT3,
    OPT4,
    OPT5,
    NOZZLEID,
    LANENO,
    COMPONENTBARCODE,
    PN,
    LOTCODE,
    DATECODE,
    VERDOR,
    WORKORDER,
    DATE_FORMAT(`TIMESTAMP`, 'yyyy-MM-dd')
FROM xx_REFDES_KAFKA;
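To check that rows are arriving, one option is to query the Paimon table in batch mode from a separate SQL client session. The sketch below is not part of the original setup. It assumes the same warehouse path and table name as the script above, and that checkpointing is enabled on the cluster, since a streaming write to Paimon commits a snapshot only when a checkpoint completes.

```sql
-- Assumption: this runs in a separate SQL client session,
-- so the catalog has to be registered again.
CREATE CATALOG paimon WITH (
    'type' = 'paimon',
    'warehouse' = 'hdfs:///data/paimon',
    'default-database' = 'ods'
);
USE CATALOG paimon;

-- Read the latest snapshot once instead of starting a streaming query
SET 'execution.runtime-mode' = 'batch';
SET 'sql-client.execution.result-mode' = 'tableau';

-- Row count per day partition
SELECT dt, COUNT(*) AS row_cnt
FROM yy_refdes_hive_ro
GROUP BY dt;

-- Snapshots committed so far, read from Paimon's $snapshots system table
SELECT snapshot_id, commit_kind, commit_time
FROM `yy_refdes_hive_ro$snapshots`;
```

If the second query returns no rows, no snapshot has been committed yet, and the first query will come back empty as well.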
Conclusion
This completes writing OGG data into Paimon through Flink.