Flink 流转表,表转流,watermark设置

2024-08-23 08:44

本文主要是介绍Flink 流转表,表转流,watermark设置,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

流转表

首先创建一个流


@Data
@AllArgsConstructor
@NoArgsConstructor
public static class Nan {private String xing;private String name;private Long ts;
}StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tenv = StreamTableEnvironment.create(env);DataStreamSource<String> sourceNan = env.socketTextStream("hdp01", 1111);
DataStreamSource<String> sourceNv = env.socketTextStream("hdp01", 2222);System.setProperty("java.net.preferIPv4Stack", "true");SingleOutputStreamOperator<Nan> beanNan = sourceNan.map(new MapFunction<String, Nan>() {@Overridepublic Nan map(String s) throws Exception {try {String[] split = s.split(",");return new Nan(split[0].substring(0, 1), split[1], Long.parseLong(split[2]));} catch (Exception e) {return null;}}
}).filter(Objects::nonNull).assignTimestampsAndWatermarks(WatermarkStrategy.<Nan>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<Nan>() {@Overridepublic long extractTimestamp(Nan nan, long l) {return nan.getTs();}
})).returns(TypeInformation.of(Nan.class));

创建watermark

流转表的时候有一个点要注意,watermark必须要重新指定,否则会丢失,常用的方式如下
创建watermark,有两步,
第一步:必须要依据一个字段来创建watermark,这个字段必须是timestamp_ltz(3)的类型。
第二步:根据时间戳字段生成watermark
时间戳字段有两种获取方式
1、根据一个bigint字段进行转换
2、在流转表,且流上设置了watermark的情况下,根据内置属性rowtime创建,这个rowtime是流转表时暴露出来的事件时间
watermark也有两种获取方式
1、根据时间戳字段重新创建watermark
2、在流转表,且流上设置了watermark的情况下,沿用流上的watermark

下面是两种场景,只要记住第一种就行了,其实第二种没什么用。

1、 根据一个bigint字段进行创建时间戳字段,然后重新创建watermark

tenv.createTemporaryView("nan", beanNan, Schema.newBuilder().column("xing", DataTypes.STRING()).column("name", DataTypes.STRING()).column("ts", DataTypes.BIGINT()).columnByExpression("rt", "to_timestamp_ltz(ts,3)") // 根据一个bigint字段进行转换.watermark("rt", "rt - interval '1' second ") // 重新创建watermark.build());

2、根据内置属性rowtime创建时间戳字段,然后沿用流上的watermark

tenv.createTemporaryView("nan1", beanNan, Schema.newBuilder().column("xing", DataTypes.STRING()).column("name", DataTypes.STRING()).column("ts", DataTypes.BIGINT()).columnByMetadata("rt", DataTypes.TIMESTAMP_LTZ(3),"rowtime") // 根据内置属性rowtime创建.watermark("rt", "source_watermark()") // 沿用流的watermark “source_watermark 等于 rt - interval '1' second”.build());
TableResult tableResult = tenv.executeSql("select *,current_watermark(rt) from nan");
tableResult.print();

表转流

首先创建一个表

 String source = "CREATE TABLE person (  " +"  xing STRING,  " +"  name STRING,  " +"  ts BIGINT,  " +"  rt as to_timestamp_ltz(ts,3),  " +"  watermark for rt as rt - interval '1' second  " +") WITH (  " +" 'connector' = 'kafka',  " +" 'topic' = 'flink_topic',  " +" 'properties.bootstrap.servers' = '172.16.10.139:9092',  " +" 'properties.group.id' = 'testGroup',  " +" 'scan.startup.mode' = 'latest-offset', " +" 'format' = 'json'  " +")";tenv.executeSql(source);

创建watermark

表转流,可以沿用流上的watermark,不需要额外声明

DataStream<Row> dataStream = tenv.toDataStream(table);dataStream.process(new ProcessFunction<Row, Object>() {@Overridepublic void processElement(Row value, ProcessFunction<Row, Object>.Context ctx, Collector<Object> out) throws Exception {System.out.println(value+" watermark=>"+ctx.timerService().currentWatermark());}
});
env.execute();

这篇关于Flink 流转表,表转流,watermark设置的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1098903

相关文章

Android实现任意版本设置默认的锁屏壁纸和桌面壁纸(两张壁纸可不一致)

客户有些需求需要设置默认壁纸和锁屏壁纸  在默认情况下 这两个壁纸是相同的  如果需要默认的锁屏壁纸和桌面壁纸不一样 需要额外修改 Android13实现 替换默认桌面壁纸: 将图片文件替换frameworks/base/core/res/res/drawable-nodpi/default_wallpaper.*  (注意不能是bmp格式) 替换默认锁屏壁纸: 将图片资源放入vendo

uniapp设置微信小程序的交互反馈

链接:uni.showToast(OBJECT) | uni-app官网 (dcloud.net.cn) 设置操作成功的弹窗: title是我们弹窗提示的文字 showToast是我们在加载的时候进入就会弹出的提示。 2.设置失败的提示窗口和标签 icon:'error'是设置我们失败的logo 设置的文字上限是7个文字,如果需要设置的提示文字过长就需要设置icon并给

Tomcat性能参数设置

转自:http://blog.csdn.net/chinadeng/article/details/6591542 Tomcat性能参数设置 2010 - 12 - 27 Tomcat性能参数设置 博客分类: Java Linux Tomcat 网络应用 多线程 Socket 默认参数不适合生产环境使用,因此需要修改一些参数   1、修改启动时内存参数、并指定J

Flink任务重启策略

概述 Flink支持不同的重启策略,以在故障发生时控制作业如何重启集群在启动时会伴随一个默认的重启策略,在没有定义具体重启策略时会使用该默认策略。如果在工作提交时指定了一个重启策略,该策略会覆盖集群的默认策略默认的重启策略可以通过 Flink 的配置文件 flink-conf.yaml 指定。配置参数 restart-strategy 定义了哪个策略被使用。常用的重启策略: 固定间隔 (Fixe

linux下非标准波特率的设置和使用

通常,在linux下面,设置串口使用终端IO的相关函数设置,如tcsetattr等函数,linux内部有一个对常用波特率列表的索引,根据设置的波特率用底层驱动来设置异步通信芯片的寄存器 对于非标准的任意波特率需要用ioctl(fd, TIOCGSERIAL, p)和ioctl(fd, TIOCSSERIAL, p)的配合,ioctl的最后一个参数是struct serial_struct *

win7如何设置SATA硬盘

Win7在安装时设置的是IDE,安装完后需要在注册表中设置为SATA,否则直接设BIOS会不认硬盘,具体如下 注册表子项:HKEY_LOCAL_MACHINE/System/CurrentControlSet/Services/Msahci 找到Start键,将值0改为3

centOS7.0设置默认进入字符界面

刚装的,带有x window桌面,每次都是进的桌面,想改成自动进命令行的。记得以前是修改 /etc/inittab 但是这个版本inittab里的内容不一样了没有id:x:initdefault这一行而且我手动加上也不管用,这个centos 7下 /etc/inittab 的内容 Targets systemd uses targets which serve a simil

设置zookeeper开机自启动/服务化

设置启动zk的用户为zookeeper 设置启动zk的用户为zookeeper用户,而非root用户,这样比较安全。 可以使用root用户进行zookeeper的管理(启动、停止…),但对于追求卓越和安全的的人来说,采用新非root用户管理zookeeper更好。 步骤: 1. 创建用户和用户组 2. 相关目录设置用户和用户组属性 3. 采用zookeeper用户启动进程 设置z

如何设置好看的电脑屏保?电脑屏保设置教程

如何设置好看的电脑屏保?电脑屏保设置教程。大家好,今天小编给大家带来了好看的电脑屏保,教大家如何设置一个好看的电脑屏保。屏保软件很多,今天我们介绍一款比较有特殊的屁屏保软件:芝麻时钟(芝麻时钟 桌面时钟软件 桌面日历 时钟屏保 世界时钟软件下载芝麻时钟是很有个性的时钟软件,支持桌面时钟,任务栏时钟美化,世界时钟,桌面日历,桌面天气,记事便签,时钟屏保。把时钟放到桌面,选择自己喜欢的主题修改任务栏时

jqgrid设置单元格可编辑

1 在单元格的属性列设置为editable。 2 点击编辑按钮的时候,触发某一行设置为edit的状态。 jQuery("#rowed4").jqGrid({url:'server.php?q=2',datatype: "json",colNames:['Inv No','Date', 'Client', 'Amount','Tax','Total','Notes'],colModel