flink ddl kafka mysql

2024-08-28 14:58
文章标签 mysql ddl flink kafka database

本文主要是介绍flink ddl kafka mysql,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

需要的jar

   <dependency><groupId>org.apache.flink</groupId><artifactId>flink-sql-connector-kafka_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-jdbc_2.12</artifactId><version>${flink.version}</version></dependency>

sql

CREATE TABLE sourceTable (userId VARCHAR, eventType VARCHAR) WITH ('connector.type' = 'kafka','connector.version' = 'universal','connector.startup-mode' = 'earliest-offset','connector.topic' = 'browTopic','connector.properties.group.id' = 'testGroup','connector.properties.zookeeper.connect' = 'localhost:2181','connector.properties.bootstrap.servers' = 'localhost:9092','update-mode' = 'append','format.type' = 'json','format.derive-schema' = 'true'
)CREATE TABLE sinkTable (userId VARCHAR,eventType VARCHAR
) WITH ('connector.type' = 'jdbc','connector.url' = 'jdbc:mysql://localhost:3306/flink_test?autoReconnect=true&failOverReadOnly=false&useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=GMT%2B8','connector.table' = 'sinkTable','connector.username' = 'root','connector.password' = '123456','connector.write.flush.max-rows' = '1'
)
package org.fuwushe.sql;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;public class KafkaSql {public static void main(String[] args) throws Exception {//2、设置运行环境StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, settings);streamEnv.setParallelism(1);String sourceDDL = "CREATE TABLE sourceTable (userId VARCHAR, eventType VARCHAR) WITH (\n"+ "\t'connector.type' = 'kafka',\n" + "\t'connector.version' = 'universal',\n"+ "\t'connector.startup-mode' = 'earliest-offset',\n" + "\t'connector.topic' = 'browTopic',\n"+ "\t  'connector.properties.group.id' = 'testGroup',\n"+ "\t'connector.properties.zookeeper.connect' = 'localhost:2181',\n"+ "\t'connector.properties.bootstrap.servers' = 'localhost:9092',\n" + "\t'update-mode' = 'append',\n"+ "\t'format.type' = 'json',\n" + "\t'format.derive-schema' = 'true'\n" + ")";System.out.println(sourceDDL);String sinkDDL =" CREATE TABLE sinkTable (\n" + "    userId VARCHAR,\n" + "    eventType VARCHAR\n" + ") WITH (\n"+ "    'connector.type' = 'jdbc',\n"+ "    'connector.url' = 'jdbc:mysql://localhost:3306/flink_test?autoReconnect=true&failOverReadOnly=false&useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=GMT%2B8',\n"+ "    'connector.table' = 'sinkTable',\n" + "    'connector.username' = 'root',\n"+ "    'connector.password' = '123456',\n" + "    'connector.write.flush.max-rows' = '1'\n"+ ") ";String sinkSql = "insert into sinkTable select * from sourceTable";tableEnv.sqlUpdate(sourceDDL);tableEnv.sqlUpdate(sinkDDL);tableEnv.sqlUpdate(sinkSql);streamEnv.execute();}
}

这篇关于flink ddl kafka mysql的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1115094

相关文章

浅谈mysql的not exists走不走索引

《浅谈mysql的notexists走不走索引》在MySQL中,​NOTEXISTS子句是否使用索引取决于子查询中关联字段是否建立了合适的索引,下面就来介绍一下mysql的notexists走不走索... 在mysql中,​NOT EXISTS子句是否使用索引取决于子查询中关联字段是否建立了合适的索引。以下

Java通过驱动包(jar包)连接MySQL数据库的步骤总结及验证方式

《Java通过驱动包(jar包)连接MySQL数据库的步骤总结及验证方式》本文详细介绍如何使用Java通过JDBC连接MySQL数据库,包括下载驱动、配置Eclipse环境、检测数据库连接等关键步骤,... 目录一、下载驱动包二、放jar包三、检测数据库连接JavaJava 如何使用 JDBC 连接 mys

SQL中如何添加数据(常见方法及示例)

《SQL中如何添加数据(常见方法及示例)》SQL全称为StructuredQueryLanguage,是一种用于管理关系数据库的标准编程语言,下面给大家介绍SQL中如何添加数据,感兴趣的朋友一起看看吧... 目录在mysql中,有多种方法可以添加数据。以下是一些常见的方法及其示例。1. 使用INSERT I

Qt使用QSqlDatabase连接MySQL实现增删改查功能

《Qt使用QSqlDatabase连接MySQL实现增删改查功能》这篇文章主要为大家详细介绍了Qt如何使用QSqlDatabase连接MySQL实现增删改查功能,文中的示例代码讲解详细,感兴趣的小伙伴... 目录一、创建数据表二、连接mysql数据库三、封装成一个完整的轻量级 ORM 风格类3.1 表结构

MySQL 中的 CAST 函数详解及常见用法

《MySQL中的CAST函数详解及常见用法》CAST函数是MySQL中用于数据类型转换的重要函数,它允许你将一个值从一种数据类型转换为另一种数据类型,本文给大家介绍MySQL中的CAST... 目录mysql 中的 CAST 函数详解一、基本语法二、支持的数据类型三、常见用法示例1. 字符串转数字2. 数字

Mysql实现范围分区表(新增、删除、重组、查看)

《Mysql实现范围分区表(新增、删除、重组、查看)》MySQL分区表的四种类型(范围、哈希、列表、键值),主要介绍了范围分区的创建、查询、添加、删除及重组织操作,具有一定的参考价值,感兴趣的可以了解... 目录一、mysql分区表分类二、范围分区(Range Partitioning1、新建分区表:2、分

MySQL 定时新增分区的实现示例

《MySQL定时新增分区的实现示例》本文主要介绍了通过存储过程和定时任务实现MySQL分区的自动创建,解决大数据量下手动维护的繁琐问题,具有一定的参考价值,感兴趣的可以了解一下... mysql创建好分区之后,有时候会需要自动创建分区。比如,一些表数据量非常大,有些数据是热点数据,按照日期分区MululbU

SQL Server配置管理器无法打开的四种解决方法

《SQLServer配置管理器无法打开的四种解决方法》本文总结了SQLServer配置管理器无法打开的四种解决方法,文中通过图文示例介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的... 目录方法一:桌面图标进入方法二:运行窗口进入检查版本号对照表php方法三:查找文件路径方法四:检查 S

MySQL 删除数据详解(最新整理)

《MySQL删除数据详解(最新整理)》:本文主要介绍MySQL删除数据的相关知识,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录一、前言二、mysql 中的三种删除方式1.DELETE语句✅ 基本语法: 示例:2.TRUNCATE语句✅ 基本语

MySQL中查找重复值的实现

《MySQL中查找重复值的实现》查找重复值是一项常见需求,比如在数据清理、数据分析、数据质量检查等场景下,我们常常需要找出表中某列或多列的重复值,具有一定的参考价值,感兴趣的可以了解一下... 目录技术背景实现步骤方法一:使用GROUP BY和HAVING子句方法二:仅返回重复值方法三:返回完整记录方法四: