本文主要是介绍flink 1.18 sql demo,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
flink 1.18 sql demo
更换flink-table-planner 为 flink-table-planner-loader pom.xml
<dependencies><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java-uber --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-uber</artifactId><version>1.18.0</version><scope>provided</scope></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-runtime --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-runtime</artifactId><version>1.18.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge</artifactId><version>1.18.0</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>1.18.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-base</artifactId><version>1.18.0</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-csv --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-csv</artifactId><version>1.18.0</version></dependency><!-- 官网给的是flink-connector-kafka 但是flink on k8s 会缺包然后有个sql-connector jar 引入后正常 两个保留一个即可 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-sql-connector-kafka</artifactId><version>3.0.2-1.18</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>3.0.2-1.18</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner -->
<!-- <dependency>-->
<!-- <groupId>org.apache.flink</groupId>-->
<!-- <artifactId>flink-table-planner_2.12</artifactId>-->
<!-- <version>1.18.0</version>-->
<!-- </dependency>-->
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner-loader--><dependency><groupId>org.apache.flink</groupId><artifactId> </artifactId><version>1.18.0</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>1.7.21</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.21</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.1.1</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><artifactSet><excludes><exclude>com.google.code.findbugs:jsr305</exclude></excludes></artifactSet><filters><filter><!-- Do not copy the signatures in the META-INF folder.Otherwise, this might cause SecurityExceptions when using the JAR. --><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformers><transformerimplementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><!-- Replace this with the main class of your job 这里是你的主类地址--><mainClass>com.cn.App</mainClass></transformer><transformerimplementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/></transformers></configuration></execution></executions></plugin></plugins></build>
demo
package com.cn;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.api.Table;import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;/*** @Classname app* @Description TODO* @Date 2024/1/12 11:26* @Created by typezhou*/
public class App {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.enableCheckpointing(1000L);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);String str = "CREATE TABLE KafkaTable (\n" +" `user_id` STRING,\n" +" `ts` TIMESTAMP(3) METADATA FROM 'timestamp'\n" +") WITH (\n" +" 'connector' = 'kafka',\n" +" 'topic' = 'aaaa',\n" +" 'properties.bootstrap.servers' = '172.xx.xx.xx:9092,172.xx.86.xx:9092,172.xx.xx.xx:9092',\n" +" 'properties.group.id' = 'testGrou1p',\n" +" 'scan.startup.mode' = 'latest-offset',\n" +" 'format' = 'csv'\n" +")";tableEnv.executeSql(str);Table tableResult = tableEnv.sqlQuery("SELECT user_id FROM KafkaTable group by user_id");
// DataStream<ResultBean> tuple2DataStream = tableEnv.toDataStream(result, ResultBean.class);
// SingleOutputStreamOperator<ResultBean> map = tuple2DataStream.map(new MapFunction<ResultBean, ResultBean>() {
// @Override
// public ResultBean map(ResultBean s) throws Exception {
// Thread.sleep(3000L);
// return s;
// }
// });
// tuple2DataStream.print();String sqlPri = "CREATE TABLE print_table (\n" +" `user_id` STRING \n" +") WITH (\n" +" 'connector' = 'kafka',\n" +" 'topic' = 'bbbb',\n" +" 'properties.bootstrap.servers' = '172.xx.xx.xx:9092,172.xx.86.xx:9092,172.xx.xx.xx:9092',\n" +" 'format' = 'csv'\n" +")";tableEnv.executeSql(sqlPri);tableEnv.executeSql("insert into print_table SELECT user_id FROM KafkaTable");}}
这篇关于flink 1.18 sql demo的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!