This article describes how to define custom timestamps and watermarks for a Flink source stream.
To work with Event Time, streaming programs need to set the time characteristic accordingly.
First, set the time characteristic to Event Time:
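```java
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Use the timestamps carried by the records (event time) instead of processing time
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
```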
Assigning Timestamps
Next, we need to define how the event time is extracted and how watermarks are generated.
One way is to hard-code both inside the source:
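```java
// Fragment of a custom SourceFunction<MyType>; getNext() and the MyType accessors are placeholders
private volatile boolean isRunning = true;

@Override
public void run(SourceContext<MyType> ctx) throws Exception {
    while (isRunning) {
        MyType next = getNext();
        // Emit the element together with its event-time timestamp
        ctx.collectWithTimestamp(next, next.getEventTimestamp());
        // Emit a watermark whenever the element says one is due
        if (next.hasWatermarkTime()) {
            ctx.emitWatermark(new Watermark(next.getWatermarkTime()));
        }
    }
}

@Override
public void cancel() {
    isRunning = false;
}
```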
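This approach is rather crude and inconvenient, and any timestamps and watermarks emitted this way are overwritten if a TimestampAssigner is applied to the stream later.
So let's look at the second approach.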
Timestamp Assigners / Watermark Generators
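Usually a few map or filter operations follow the source to do some initialization or formatting.
Then, before any operation that relies on event time, such as a window, assign the timestamps and watermarks.
For example: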
```java
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

DataStream<MyEvent> stream = env.addSource(
        new FlinkKafkaConsumer09<MyEvent>(topic, schema, props));

// Timestamps and watermarks are assigned after the filter and before the event-time window
DataStream<MyEvent> withTimestampsAndWatermarks = stream
        .filter( event -> event.severity() == WARNING )
        .assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks());

withTimestampsAndWatermarks
        .keyBy( (event) -> event.getGroup() )
        .timeWindow(Time.seconds(10))
        .reduce( (a, b) -> a.add(b) )
        .addSink(...);
```
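To make the role of event time in this pipeline concrete, here is a plain-Java sketch (no Flink dependency) of how a 10-second tumbling window such as timeWindow(Time.seconds(10)) could place an element and when that window may fire. It assumes a zero window offset and assumes the window fires once the watermark reaches the window's last millisecond, which is my understanding of Flink's default event-time trigger; TumblingWindowSketch and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/** Plain-Java sketch of event-time tumbling windows; not Flink's implementation. */
final class TumblingWindowSketch {

    private TumblingWindowSketch() {}

    /** Start of the zero-offset tumbling window [start, start + size) containing the timestamp. */
    static long windowStart(long timestamp, long windowSize) {
        // floorMod keeps negative timestamps in the correct (earlier) window as well
        return timestamp - Math.floorMod(timestamp, windowSize);
    }

    /** Assumption: a window is complete once the watermark reaches its last millisecond. */
    static boolean canFire(long windowStart, long windowSize, long watermark) {
        return watermark >= windowStart + windowSize - 1;
    }

    /** Distinct window starts, in ascending order, that may fire for the given watermark. */
    static List<Long> firedWindows(long[] timestamps, long windowSize, long watermark) {
        TreeSet<Long> starts = new TreeSet<>();
        for (long ts : timestamps) {
            long start = windowStart(ts, windowSize);
            if (canFire(start, windowSize, watermark)) {
                starts.add(start);
            }
        }
        return new ArrayList<>(starts);
    }
}
```

For events at 1 000, 12 000 and 25 000 ms and a watermark of 19 999 ms, the windows starting at 0 and 10 000 may fire, while the window starting at 20 000 keeps waiting; without watermarks no window would ever be considered complete.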
So how is a timestamp assigner, such as MyTimestampsAndWatermarks in the example, implemented?
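There are three kinds: ascending timestamps, periodic watermarks, and punctuated watermarks.
With Ascending Timestamps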
```java
DataStream<MyEvent> stream = ...

DataStream<MyEvent> withTimestampsAndWatermarks =
    stream.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<MyEvent>() {
        @Override
        public long extractAscendingTimestamp(MyEvent element) {
            return element.getCreationTime();
        }
    });
```
I doubt anyone really uses this one; you might as well use processing time directly.
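A plain-Java sketch of what the ascending-timestamp strategy boils down to, assuming (as I understand Flink's AscendingTimestampExtractor) that the watermark trails the latest timestamp by 1 ms. It also shows why the approach only suits strictly ordered data: an element older than its predecessor cannot move the watermark back and is merely counted here as a violation. AscendingWatermarkSketch is a hypothetical name and no Flink classes are used.

```java
/** Plain-Java sketch of watermarks for strictly ascending timestamps. */
final class AscendingWatermarkSketch {

    private long currentTimestamp = Long.MIN_VALUE;
    private int violations = 0;

    /** Records an element's timestamp; out-of-order timestamps are only counted in this sketch. */
    long extract(long timestamp) {
        if (timestamp >= currentTimestamp) {
            currentTimestamp = timestamp;
        } else {
            violations++; // assumption: a real extractor might log or fail here instead
        }
        return timestamp;
    }

    /** Watermark = latest timestamp - 1, or Long.MIN_VALUE before any element has been seen. */
    long currentWatermark() {
        return currentTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : currentTimestamp - 1;
    }

    int violations() {
        return violations;
    }
}
```

After elements at 1 000 and 2 000 ms the watermark is 1 999 ms; a late element at 1 500 ms leaves it unchanged and is recorded as a violation.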
With Periodic Watermarks
Here watermarks are generated periodically; the interval (in milliseconds) is set with ExecutionConfig.setAutoWatermarkInterval(...).
```java
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

// Two separate classes: in a real project each public class goes in its own file.

/**
 * This generator generates watermarks assuming that elements arrive out of order,
 * but only to a certain degree. The latest elements for a certain timestamp t will
 * arrive at most n milliseconds after the earliest elements for timestamp t.
 */
public class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWatermarks<MyEvent> {

    private final long maxOutOfOrderness = 3500; // 3.5 seconds

    private long currentMaxTimestamp;

    @Override
    public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
        long timestamp = element.getCreationTime();
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        return timestamp;
    }

    @Override
    public Watermark getCurrentWatermark() {
        // return the watermark as current highest timestamp minus the out-of-orderness bound
        return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
    }
}

/**
 * This generator generates watermarks that are lagging behind processing time by a certain amount.
 * It assumes that elements arrive in Flink after at most a certain time.
 */
public class TimeLagWatermarkGenerator implements AssignerWithPeriodicWatermarks<MyEvent> {

    private final long maxTimeLag = 5000; // 5 seconds

    @Override
    public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
        return element.getCreationTime();
    }

    @Override
    public Watermark getCurrentWatermark() {
        // return the watermark as current time minus the maximum time lag
        return new Watermark(System.currentTimeMillis() - maxTimeLag);
    }
}
```
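The two cases above differ as follows: the first derives the watermark from the maximum event time seen so far (minus the allowed out-of-orderness),
while the second derives it from the current processing time (minus the maximum time lag).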
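The difference can be checked without a cluster. The plain-Java sketch below reimplements both formulas; the injectable LongSupplier clock in place of System.currentTimeMillis() is an assumption made so the result is deterministic, and all class names are hypothetical. It also starts currentMaxTimestamp at Long.MIN_VALUE + bound so the first watermark cannot underflow, a small departure from the example, which starts at 0.

```java
import java.util.function.LongSupplier;

/** Plain-Java sketch of the two periodic watermark formulas; no Flink dependency. */
final class PeriodicWatermarkSketch {

    private PeriodicWatermarkSketch() {}

    /** Watermark = highest event time seen so far - out-of-orderness bound. */
    static final class BoundedOutOfOrderness {
        private final long maxOutOfOrderness;
        private long currentMaxTimestamp;

        BoundedOutOfOrderness(long maxOutOfOrderness) {
            this.maxOutOfOrderness = maxOutOfOrderness;
            // Chosen so that currentWatermark() starts at exactly Long.MIN_VALUE
            this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrderness;
        }

        long extract(long eventTime) {
            currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTime);
            return eventTime;
        }

        long currentWatermark() {
            return currentMaxTimestamp - maxOutOfOrderness;
        }
    }

    /** Watermark = current processing time - maximum lag, independent of the events. */
    static final class TimeLag {
        private final long maxTimeLag;
        private final LongSupplier clock; // assumption: stands in for System.currentTimeMillis()

        TimeLag(long maxTimeLag, LongSupplier clock) {
            this.maxTimeLag = maxTimeLag;
            this.clock = clock;
        }

        long currentWatermark() {
            return clock.getAsLong() - maxTimeLag;
        }
    }
}
```

With a 3.5 s bound, events at 10 000 and 8 000 ms give a watermark of 6 500 ms no matter when they arrive, whereas the time-lag watermark keeps advancing with the clock even when no events come in at all.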
With Punctuated Watermarks
To generate watermarks whenever a certain event indicates that a new watermark can be generated, use AssignerWithPunctuatedWatermarks. For this class, Flink will first call the extractTimestamp(...) method to assign the element a timestamp, and then immediately call the checkAndGetNextWatermark(...) method for that element.

The checkAndGetNextWatermark(...) method gets the timestamp that was assigned in the extractTimestamp(...) method and can decide whether it wants to generate a watermark. Whenever checkAndGetNextWatermark(...) returns a non-null Watermark that is larger than the latest previous watermark, that new watermark will be emitted.
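In other words, the watermark here is triggered not by time but by specific events: a watermark is generated only when certain special events or messages arrive.
That is why the method is called checkAndGetNextWatermark:
it has to check first.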
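```java
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

public class PunctuatedAssigner implements AssignerWithPunctuatedWatermarks<MyEvent> {

    @Override
    public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
        return element.getCreationTime();
    }

    @Override
    public Watermark checkAndGetNextWatermark(MyEvent lastElement, long extractedTimestamp) {
        // Only elements carrying a marker produce a watermark; null means "no watermark now"
        return lastElement.hasWatermarkMarker() ? new Watermark(extractedTimestamp) : null;
    }
}
```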
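The extract-then-check sequence, including the rule that only a larger watermark is emitted, can be simulated in plain Java. This is a sketch: Event and its fields are hypothetical stand-ins for MyEvent, and no Flink classes are involved.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java simulation of punctuated watermark generation; no Flink dependency. */
final class PunctuatedWatermarkSketch {

    private PunctuatedWatermarkSketch() {}

    /** Hypothetical event: a creation time plus a flag saying "a watermark may follow". */
    static final class Event {
        final long creationTime;
        final boolean hasWatermarkMarker;

        Event(long creationTime, boolean hasWatermarkMarker) {
            this.creationTime = creationTime;
            this.hasWatermarkMarker = hasWatermarkMarker;
        }
    }

    /** Returns the watermarks that would actually be emitted, in order. */
    static List<Long> emittedWatermarks(List<Event> events) {
        List<Long> emitted = new ArrayList<>();
        long lastWatermark = Long.MIN_VALUE;
        for (Event e : events) {
            long ts = e.creationTime;                              // step 1: extract the timestamp
            Long candidate = e.hasWatermarkMarker ? ts : null;     // step 2: check for a marker
            if (candidate != null && candidate > lastWatermark) {  // only advancing watermarks
                emitted.add(candidate);
                lastWatermark = candidate;
            }
        }
        return emitted;
    }
}
```

For events (1 000, no marker), (2 000, marker), (1 500, marker) and (3 000, marker) the result is 2 000 and 3 000; the marker at 1 500 is dropped because it would move the watermark backwards.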