This article introduces Flink DDL with Kafka and MySQL. We hope it offers a useful reference for developers tackling similar problems; follow along to learn how it works.
Required JARs
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-sql-connector-kafka_2.12</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-jdbc_2.12</artifactId>
    <version>${flink.version}</version>
</dependency>
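Note: flink-jdbc ships only the connector itself; the actual MySQL driver must also be on the classpath. A minimal sketch of the extra dependency, assuming a MySQL 5.x server (the version below is an assumption; match it to your own server):

<!-- Assumed addition: MySQL JDBC driver, required at runtime by flink-jdbc -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.48</version> <!-- assumed version; align with your MySQL server -->
</dependency>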
SQL
CREATE TABLE sourceTable (
    userId VARCHAR,
    eventType VARCHAR
) WITH (
    'connector.type' = 'kafka',
    'connector.version' = 'universal',
    'connector.startup-mode' = 'earliest-offset',
    'connector.topic' = 'browTopic',
    'connector.properties.group.id' = 'testGroup',
    'connector.properties.zookeeper.connect' = 'localhost:2181',
    'connector.properties.bootstrap.servers' = 'localhost:9092',
    'update-mode' = 'append',
    'format.type' = 'json',
    'format.derive-schema' = 'true'
);

CREATE TABLE sinkTable (
    userId VARCHAR,
    eventType VARCHAR
) WITH (
    'connector.type' = 'jdbc',
    'connector.url' = 'jdbc:mysql://localhost:3306/flink_test?autoReconnect=true&failOverReadOnly=false&useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=GMT%2B8',
    'connector.table' = 'sinkTable',
    'connector.username' = 'root',
    'connector.password' = '123456',
    'connector.write.flush.max-rows' = '1'
);
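The JDBC sink does not create the target table, so flink_test.sinkTable must already exist in MySQL before the job runs. A minimal sketch of a matching table (only the column names come from the DDL above; the VARCHAR lengths, engine, and charset are assumptions):

CREATE TABLE `sinkTable` (
    `userId`    VARCHAR(255),
    `eventType` VARCHAR(255)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;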
package org.fuwushe.sql;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class KafkaSql {

    public static void main(String[] args) throws Exception {
        // Set up the execution environment with the Blink planner in streaming mode
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, settings);
        streamEnv.setParallelism(1);

        // Kafka source table DDL
        String sourceDDL = "CREATE TABLE sourceTable (userId VARCHAR, eventType VARCHAR) WITH (\n"
                + "\t'connector.type' = 'kafka',\n"
                + "\t'connector.version' = 'universal',\n"
                + "\t'connector.startup-mode' = 'earliest-offset',\n"
                + "\t'connector.topic' = 'browTopic',\n"
                + "\t'connector.properties.group.id' = 'testGroup',\n"
                + "\t'connector.properties.zookeeper.connect' = 'localhost:2181',\n"
                + "\t'connector.properties.bootstrap.servers' = 'localhost:9092',\n"
                + "\t'update-mode' = 'append',\n"
                + "\t'format.type' = 'json',\n"
                + "\t'format.derive-schema' = 'true'\n"
                + ")";
        System.out.println(sourceDDL);

        // MySQL (JDBC) sink table DDL
        String sinkDDL = "CREATE TABLE sinkTable (\n"
                + "  userId VARCHAR,\n"
                + "  eventType VARCHAR\n"
                + ") WITH (\n"
                + "  'connector.type' = 'jdbc',\n"
                + "  'connector.url' = 'jdbc:mysql://localhost:3306/flink_test?autoReconnect=true&failOverReadOnly=false&useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=GMT%2B8',\n"
                + "  'connector.table' = 'sinkTable',\n"
                + "  'connector.username' = 'root',\n"
                + "  'connector.password' = '123456',\n"
                + "  'connector.write.flush.max-rows' = '1'\n"
                + ")";

        // Pipe every record from the Kafka source into the MySQL sink
        String sinkSql = "insert into sinkTable select * from sourceTable";

        tableEnv.sqlUpdate(sourceDDL);
        tableEnv.sqlUpdate(sinkDDL);
        tableEnv.sqlUpdate(sinkSql);
        streamEnv.execute();
    }
}
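To smoke-test the job, you can push a JSON record into browTopic with Kafka's console producer. The command below is a sketch assuming a local Kafka installation run from the Kafka home directory; the field values are made up, and only the field names userId and eventType come from the source DDL:

# assumes a local Kafka broker on localhost:9092
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic browTopic
> {"userId": "u001", "eventType": "click"}

Because 'format.derive-schema' is set to 'true', the JSON fields are mapped to the table columns by name, so each record like this becomes one row in the MySQL sinkTable.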
That concludes this article on Flink DDL with Kafka and MySQL. We hope it proves helpful to fellow developers!