flink ddl kafka mysql

2024-08-28 14:58
文章标签 mysql ddl flink kafka database

本文主要是介绍flink ddl kafka mysql,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

需要的jar

   <dependency><groupId>org.apache.flink</groupId><artifactId>flink-sql-connector-kafka_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-jdbc_2.12</artifactId><version>${flink.version}</version></dependency>

sql

CREATE TABLE sourceTable (userId VARCHAR, eventType VARCHAR) WITH ('connector.type' = 'kafka','connector.version' = 'universal','connector.startup-mode' = 'earliest-offset','connector.topic' = 'browTopic','connector.properties.group.id' = 'testGroup','connector.properties.zookeeper.connect' = 'localhost:2181','connector.properties.bootstrap.servers' = 'localhost:9092','update-mode' = 'append','format.type' = 'json','format.derive-schema' = 'true'
)CREATE TABLE sinkTable (userId VARCHAR,eventType VARCHAR
) WITH ('connector.type' = 'jdbc','connector.url' = 'jdbc:mysql://localhost:3306/flink_test?autoReconnect=true&failOverReadOnly=false&useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=GMT%2B8','connector.table' = 'sinkTable','connector.username' = 'root','connector.password' = '123456','connector.write.flush.max-rows' = '1'
)
package org.fuwushe.sql;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;public class KafkaSql {public static void main(String[] args) throws Exception {//2、设置运行环境StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, settings);streamEnv.setParallelism(1);String sourceDDL = "CREATE TABLE sourceTable (userId VARCHAR, eventType VARCHAR) WITH (\n"+ "\t'connector.type' = 'kafka',\n" + "\t'connector.version' = 'universal',\n"+ "\t'connector.startup-mode' = 'earliest-offset',\n" + "\t'connector.topic' = 'browTopic',\n"+ "\t  'connector.properties.group.id' = 'testGroup',\n"+ "\t'connector.properties.zookeeper.connect' = 'localhost:2181',\n"+ "\t'connector.properties.bootstrap.servers' = 'localhost:9092',\n" + "\t'update-mode' = 'append',\n"+ "\t'format.type' = 'json',\n" + "\t'format.derive-schema' = 'true'\n" + ")";System.out.println(sourceDDL);String sinkDDL =" CREATE TABLE sinkTable (\n" + "    userId VARCHAR,\n" + "    eventType VARCHAR\n" + ") WITH (\n"+ "    'connector.type' = 'jdbc',\n"+ "    'connector.url' = 'jdbc:mysql://localhost:3306/flink_test?autoReconnect=true&failOverReadOnly=false&useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=GMT%2B8',\n"+ "    'connector.table' = 'sinkTable',\n" + "    'connector.username' = 'root',\n"+ "    'connector.password' = '123456',\n" + "    'connector.write.flush.max-rows' = '1'\n"+ ") ";String sinkSql = "insert into sinkTable select * from sourceTable";tableEnv.sqlUpdate(sourceDDL);tableEnv.sqlUpdate(sinkDDL);tableEnv.sqlUpdate(sinkSql);streamEnv.execute();}
}

这篇关于flink ddl kafka mysql的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1115094

相关文章

浅谈mysql的sql_mode可能会限制你的查询

《浅谈mysql的sql_mode可能会限制你的查询》本文主要介绍了浅谈mysql的sql_mode可能会限制你的查询,这个问题主要说明的是,我们写的sql查询语句违背了聚合函数groupby的规则... 目录场景:问题描述原因分析:解决方案:第一种:修改后,只有当前生效,若是mysql服务重启,就会失效;

MySQL多列IN查询的实现

《MySQL多列IN查询的实现》多列IN查询是一种强大的筛选工具,它允许通过多字段组合快速过滤数据,本文主要介绍了MySQL多列IN查询的实现,具有一定的参考价值,感兴趣的可以了解一下... 目录一、基础语法:多列 IN 的两种写法1. 直接值列表2. 子查询二、对比传统 OR 的写法三、性能分析与优化1.

MySQL新增字段后Java实体未更新的潜在问题与解决方案

《MySQL新增字段后Java实体未更新的潜在问题与解决方案》在Java+MySQL的开发中,我们通常使用ORM框架来映射数据库表与Java对象,但有时候,数据库表结构变更(如新增字段)后,开发人员可... 目录引言1. 问题背景:数据库与 Java 实体不同步1.1 常见场景1.2 示例代码2. 不同操作

mysql如何查看当前连接数

《mysql如何查看当前连接数》:本文主要介绍mysql如何查看当前连接数问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录mysql查看当前连接数查看mysql数据库允许最大连接数总结mysql查看当前连接数查看当前连接数SHOW STATUS LIKE

通过Docker Compose部署MySQL的详细教程

《通过DockerCompose部署MySQL的详细教程》DockerCompose作为Docker官方的容器编排工具,为MySQL数据库部署带来了显著优势,下面小编就来为大家详细介绍一... 目录一、docker Compose 部署 mysql 的优势二、环境准备与基础配置2.1 项目目录结构2.2 基

Linux安装MySQL的教程

《Linux安装MySQL的教程》:本文主要介绍Linux安装MySQL的教程,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录linux安装mysql1.Mysql官网2.我的存放路径3.解压mysql文件到当前目录4.重命名一下5.创建mysql用户组和用户并修

MySQL中慢SQL优化的不同方式介绍

《MySQL中慢SQL优化的不同方式介绍》慢SQL的优化,主要从两个方面考虑,SQL语句本身的优化,以及数据库设计的优化,下面小编就来给大家介绍一下有哪些方式可以优化慢SQL吧... 目录避免不必要的列分页优化索引优化JOIN 的优化排序优化UNION 优化慢 SQL 的优化,主要从两个方面考虑,SQL 语

Python实现将MySQL中所有表的数据都导出为CSV文件并压缩

《Python实现将MySQL中所有表的数据都导出为CSV文件并压缩》这篇文章主要为大家详细介绍了如何使用Python将MySQL数据库中所有表的数据都导出为CSV文件到一个目录,并压缩为zip文件到... python将mysql数据库中所有表的数据都导出为CSV文件到一个目录,并压缩为zip文件到另一个

如何解决mysql出现Incorrect string value for column ‘表项‘ at row 1错误问题

《如何解决mysql出现Incorrectstringvalueforcolumn‘表项‘atrow1错误问题》:本文主要介绍如何解决mysql出现Incorrectstringv... 目录mysql出现Incorrect string value for column ‘表项‘ at row 1错误报错

Mysql中深分页的五种常用方法整理

《Mysql中深分页的五种常用方法整理》在数据量非常大的情况下,深分页查询则变得很常见,这篇文章为大家整理了5个常用的方法,文中的示例代码讲解详细,大家可以根据自己的需求进行选择... 目录方案一:延迟关联 (Deferred Join)方案二:有序唯一键分页 (Cursor-based Paginatio