59、Flink CEP - Flink的复杂事件处理介绍及示例(3)- 模式选取及超时处理

本文主要是介绍59、Flink CEP - Flink的复杂事件处理介绍及示例(3)- 模式选取及超时处理,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Flink 系列文章

一、Flink 专栏

Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。

  • 1、Flink 部署系列
    本部分介绍Flink的部署、配置相关基础内容。

  • 2、Flink基础系列
    本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。

  • 3、Flik Table API和SQL基础系列
    本部分介绍Flink Table Api和SQL的基本用法,比如Table API和SQL创建库、表用法、查询、窗口函数、catalog等等内容。

  • 4、Flik Table API和SQL提高与应用系列
    本部分是table api 和sql的应用部分,和实际的生产应用联系更为密切,以及有一定开发难度的内容。

  • 5、Flink 监控系列
    本部分和实际的运维、监控工作相关。

二、Flink 示例专栏

Flink 示例专栏是 Flink 专栏的辅助说明,一般不会介绍知识点的信息,更多的是提供一个一个可以具体使用的示例。本专栏不再分目录,通过链接即可看出介绍的内容。

两专栏的所有文章入口点击:Flink 系列文章汇总索引


文章目录

  • Flink 系列文章
  • 三、检测模式
    • 1、将模式应用到流上
    • 2、从模式中选取
      • 1)、匹配事件的选择提取(PatternSelectFunction)
      • 2)、匹配事件的选择提取(PatternFlatSelectFunction)
      • 3)、匹配事件的通用处理(PatternProcessFunction)
    • 3、处理超时的部分匹配
      • 1)、使用 PatternProcessFunction 的侧输出流
      • 2)、使用 PatternTimeoutFunction的侧输出流


本文介绍了Flink 的类库CEP的模式检测,主要介绍数据的三种选取方式以及延迟数据的处理。

如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。

本文除了maven依赖外,没有其他依赖。

本专题分为以下几篇介绍:
59、Flink CEP - Flink的复杂事件处理介绍及示例(1)-入门
59、Flink CEP - Flink的复杂事件处理介绍及示例(2)- 模式API
59、Flink CEP - Flink的复杂事件处理介绍及示例(3)- 模式选取及超时处理
59、Flink CEP - Flink的复杂事件处理介绍及示例(4)- 延迟数据处理和三个实际应用示例
59、Flink CEP - Flink的复杂事件处理介绍及示例(完整版)

三、检测模式

1、将模式应用到流上

将模式应用到事件流上只要调用 CEP 类的静态方法.pattern()即可,将数据流(DataStream)和模式(Pattern)作为两个参数传入就可以了。最终得到的是一个 PatternStream:

// 代码片段,完整内容可以参考本文中的其他完整示例
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 按用户id分组
DataStream<LoginEvent> loginEventDS = env.fromCollection(loginEventList).keyBy(loginEvent -> loginEvent.getUserId());// 定义模式
Pattern<LoginEvent, ?> loginEventPattern = Pattern.begin( ... );// 将Pattern应用到流上,检测匹配的复杂事件,得到一个PatternStream
PatternStream<LoginEvent> patternStream = CEP.pattern(loginEventDS, loginEventPattern);

输入流根据你的使用场景可以是keyed或者non-keyed。

在 non-keyed 流上使用模式将会使你的作业并发度被设为1。

这里的 DataStream,也可以通过 keyBy 进行按键分区得到 KeyedStream,接下来对复杂事件的检测就会针对不同的 key 单独进行了。

模式中定义的复杂事件,发生是有先后顺序的,这里“先后”的判断标准取决于具体的时间语义。

默认情况下采用事件时间语义,那么事件会以各自的时间戳进行排序;

如果是处理时间语义,那么所谓先后就是数据到达的顺序。

对于时间戳相同或是同时到达的事件,我们还可以在 CEP.pattern()中传入一个比较器作为第三个参数,用来进行更精确的排序:

// 代码片段
DataStream<Event> input = ...;
Pattern<Event, ?> pattern = ...;
EventComparator<Event> comparator = ...; // 可选的PatternStream<Event> patternStream = CEP.pattern(loginEventDS, loginEventPattern, comparator);

得到 PatternStream 后,接下来要做的就是对匹配事件的检测处理了。

2、从模式中选取

PatternStream 的转换操作分成两种:选择提取(select)操作和处理(process)操作。与 DataStream 的转换类似,在调用API 时传入一个函数类,即选择操作传入的是一个 PatternSelectFunction,处理操作传入PatternProcessFunction。

1)、匹配事件的选择提取(PatternSelectFunction)

处理匹配事件最简单的方式,就是从 PatternStream 中直接把匹配的复杂事件提取出来,包装成想要的信息输出,这个操作就是“选择”(select)。

PatternSelectFunction:代码中基于 PatternStream 直接调用.select()方法,传入一个 PatternSelectFunction 作为参数。

  • 示例
static void test1() throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 按用户id分组DataStream<LoginEvent> loginEventDS = env.fromCollection(loginEventList).assignTimestampsAndWatermarks(WatermarkStrategy.<LoginEvent>forBoundedOutOfOrderness(Duration.ofSeconds(1)).withTimestampAssigner((loginEvent, rs) -> loginEvent.getTimestamp())).keyBy(loginEvent -> loginEvent.getUserId());// 定义模式Pattern<LoginEvent, ?> loginEventPattern = Pattern.<LoginEvent>begin("first").where(new SimpleCondition<LoginEvent>() {@Overridepublic boolean filter(LoginEvent value) throws Exception {return value.getStatus().equals("F");}}).next("second").where(new SimpleCondition<LoginEvent>() {@Overridepublic boolean filter(LoginEvent value) throws Exception {return value.getStatus().equals("F");}}).next("third").where(new SimpleCondition<LoginEvent>() {@Overridepublic boolean filter(LoginEvent value) throws Exception {return value.getStatus().equals("F");}});// 将Pattern应用到流上,检测匹配的复杂事件,得到一个PatternStreamPatternStream<LoginEvent> patternStream = CEP.pattern(loginEventDS, loginEventPattern);// 将匹配到的流选择出来输出patternStream.select(new PatternSelectFunction<LoginEvent, String>() {@Overridepublic String select(Map<String, List<LoginEvent>> map) throws Exception {return map.get("first").toString() + " \n" + map.get("second").toString() + " \n"+ map.get("third").toString();}}).print("输出信息:\n");// 控制台输出:env.execute();}

PatternSelectFunction 是 Flink CEP 提供的一个函数类接口,需要实现一个 select()方法,这个方法每当检测到一组匹配的复杂事件时都会调用一次。它会将检测到的匹配事件保存在一个 Map 里,对应的 key 就是这些事件的名称。这里的“事件名称”就对应着在模式中定义的每个个体模式的名称;而个体模式可以是循环模式,一个名称会对应多个事件,所以最终保存在 Map 里的 value 就是一个事件的列表(List)。

如果个体模式是单例的,那么 List 中只有一个元素,直接调用.get(0)就可以把它取出。

如果个体模式是循环的,List 中就有可能有多个元素了。

可以将匹配到的事件包装成 String 类型输出,代码如下:


static void test2() throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 按用户id分组DataStream<LoginEvent> loginEventDS = env.fromCollection(loginEventList).assignTimestampsAndWatermarks(WatermarkStrategy.<LoginEvent>forBoundedOutOfOrderness(Duration.ofSeconds(1)).withTimestampAssigner((loginEvent, rs) -> loginEvent.getTimestamp())).keyBy(loginEvent -> loginEvent.getUserId());// 定义模式Pattern<LoginEvent, ?> loginEventPattern = Pattern.<LoginEvent>begin("pattern").where(new SimpleCondition<LoginEvent>() {@Overridepublic boolean filter(LoginEvent value) throws Exception {return value.getStatus().equals("F");}}).times(3) // 匹配三次.consecutive();// 将Pattern应用到流上,检测匹配的复杂事件,得到一个PatternStreamPatternStream<LoginEvent> patternStream = CEP.pattern(loginEventDS, loginEventPattern);// 将匹配到的流选择出来输出patternStream.select(new PatternSelectFunction<LoginEvent, String>() {@Overridepublic String select(Map<String, List<LoginEvent>> map) throws Exception {// list中放了一个匹配了3个事件的模式return map.get("pattern").get(0).toString() + " \n" + map.get("pattern").get(1).toString()+ " \n" + map.get("pattern").get(2).toString();}}).print("输出信息:\n");// 控制台输出:env.execute();}

2)、匹配事件的选择提取(PatternFlatSelectFunction)

要实现一个flatSelect()方法,与 select()的不同就在于没有返回值,b并且多了一个收集器(Collector)参数 out,通过调用 out.collet()方法就可以实现多次发送输出数据了。

static void test3() throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 按用户id分组DataStream<LoginEvent> loginEventDS = env.fromCollection(loginEventList).assignTimestampsAndWatermarks(WatermarkStrategy.<LoginEvent>forBoundedOutOfOrderness(Duration.ofSeconds(1)).withTimestampAssigner((loginEvent, rs) -> loginEvent.getTimestamp())).keyBy(loginEvent -> loginEvent.getUserId());// 定义模式Pattern<LoginEvent, ?> loginEventPattern = Pattern.<LoginEvent>begin("pattern").where(new SimpleCondition<LoginEvent>() {@Overridepublic boolean filter(LoginEvent value) throws Exception {return value.getStatus().equals("F");}}).times(3) // 匹配三次.consecutive();// 将Pattern应用到流上,检测匹配的复杂事件,得到一个PatternStreamPatternStream<LoginEvent> patternStream = CEP.pattern(loginEventDS, loginEventPattern);// 将匹配到的流选择出来输出patternStream.flatSelect(new PatternFlatSelectFunction<LoginEvent, String>() {@Overridepublic void flatSelect(Map<String, List<LoginEvent>> map, Collector<String> out)throws Exception {out.collect(// list中放了一个匹配了3个事件的模式map.get("pattern").get(0).toString() + " \n" + map.get("pattern").get(1).toString()+ " \n" + map.get("pattern").get(2).toString());}}).print("输出信息:\n");// 控制台输出:env.execute();}

3)、匹配事件的通用处理(PatternProcessFunction)

在获得到一个PatternStream之后,你可以应用各种转换来发现事件序列。

推荐使用PatternProcessFunction。

PatternProcessFunction有一个processMatch的方法在每找到一个匹配的事件序列时都会被调用。 它按照Map<String, List>的格式接收一个匹配,映射的键是你的模式序列中的每个模式的名称,值是被接受的事件列表(IN是输入事件的类型)。 模式的输入事件按照时间戳进行排序。为每个模式返回一个接受的事件列表的原因是当使用循环模式(比如oneToMany()和times())时, 对一个模式会有不止一个事件被接受。

PatternProcessFunction 中必须实现一个 processMatch()方法;这个方法与之前的 flatSelect()类似,只是多了一个上下文 Context 参数。利用这个上下文可以获取当前的时间信息,比如事件的时间戳(timestamp)或者处理时间(processing time),还可以调用.output()方法将数据输出到侧输出流。在 CEP 中,侧输出流一般被用来处理超时事件。

  • 官方示例

class MyPatternProcessFunction<IN, OUT> extends PatternProcessFunction<IN, OUT> {@Overridepublic void processMatch(Map<String, List<IN>> match, Context ctx, Collector<OUT> out) throws Exception;IN startEvent = match.get("start").get(0);IN endEvent = match.get("end").get(0);out.collect(OUT(startEvent, endEvent));}
}
  • 完整示例

package org.cep;import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Map;import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternFlatSelectFunction;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;/** @Author: alanchan* @LastEditors: alanchan* @Description: */
public class TestCEPDemo {@Data@NoArgsConstructor@AllArgsConstructorstatic class LoginEvent {private Integer userId;private String ip;private String status;private Long timestamp;@Overridepublic boolean equals(Object obj) {if (obj instanceof LoginEvent) {LoginEvent loginEvent = (LoginEvent) obj;return this.userId == loginEvent.getUserId() && this.ip.equals(loginEvent.ip)&& this.status.equals(loginEvent.getStatus()) && this.timestamp == loginEvent.getTimestamp();} else {return false;}}@Overridepublic int hashCode() {return super.hashCode() + Long.hashCode(timestamp);}}final static List<LoginEvent> loginEventList = Arrays.asList(new LoginEvent(1001, "192.168.10.1", "F", 2L),new LoginEvent(1001, "192.168.10.2", "F", 3L),new LoginEvent(1002, "192.168.10.8", "F", 4L),new LoginEvent(1001, "192.168.10.6", "F", 5L),new LoginEvent(1002, "192.168.10.8", "F", 7L),new LoginEvent(1002, "192.168.10.8", "F", 8L),new LoginEvent(1002, "192.168.10.8", "S", 6L),new LoginEvent(1003, "192.168.10.8", "F", 6L),new LoginEvent(1004, "192.168.10.3", "S", 4L));static void testProcess() throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 按用户id分组DataStream<LoginEvent> loginEventDS = env.fromCollection(loginEventList).assignTimestampsAndWatermarks(WatermarkStrategy.<LoginEvent>forBoundedOutOfOrderness(Duration.ofSeconds(10)).withTimestampAssigner((loginEvent, rs) -> loginEvent.getTimestamp())).keyBy(loginEvent -> loginEvent.getUserId());// 定义模式Pattern<LoginEvent, ?> loginEventPattern = Pattern.begin(Pattern.<LoginEvent>begin("first").where(new SimpleCondition<LoginEvent>() {@Overridepublic boolean filter(LoginEvent value) throws Exception {return value.getStatus().equals("F");}}));// 将Pattern应用到流上,检测匹配的复杂事件,得到一个PatternStreamPatternStream<LoginEvent> patternStream = CEP.pattern(loginEventDS, loginEventPattern);// 将匹配到的流选择出来输出patternStream.flatSelect(new PatternFlatSelectFunction<LoginEvent, String>() {@Overridepublic void flatSelect(Map<String, List<LoginEvent>> pattern, Collector<String> out)throws Exception {out.collect(pattern.get("first").toString());}}).print("flatSelect输出信息:\n");patternStream.process(new PatternProcessFunction<LoginEvent, String>() {@Overridepublic void processMatch(Map<String, List<LoginEvent>> match, Context ctx, Collector<String> out)throws Exception {out.collect(match.get("first").toString());}}).print("process输出信息:\n");// 控制台输出:env.execute();}public static void main(String[] args) throws Exception {testProcess();}
}

PatternProcessFunction可以访问Context对象。有了它之后,你可以访问时间属性,比如currentProcessingTime或者当前匹配的timestamp (最新分配到匹配上的事件的时间戳)。

3、处理超时的部分匹配

当一个模式上通过within加上窗口长度后,部分匹配的事件序列就可能因为超过窗口长度而被丢弃。可以使用TimedOutPartialMatchHandler接口 来处理超时的部分匹配。这个接口可以和其它的混合使用。也就是说你可以在自己的PatternProcessFunction里另外实现这个接口。 TimedOutPartialMatchHandler提供了另外的processTimedOutMatch方法,这个方法对每个超时的部分匹配都会调用。

  • 官方示例

class MyPatternProcessFunction<IN, OUT> extends PatternProcessFunction<IN, OUT> implements TimedOutPartialMatchHandler<IN> {@Overridepublic void processMatch(Map<String, List<IN>> match, Context ctx, Collector<OUT> out) throws Exception;...}@Overridepublic void processTimedOutMatch(Map<String, List<IN>> match, Context ctx) throws Exception;IN startEvent = match.get("start").get(0);ctx.output(outputTag, T(startEvent));}
}

processTimedOutMatch不能访问主输出。 但你可以通过Context对象把结果输出到侧输出。

前面提到的PatternProcessFunction是在Flink 1.8之后引入的,从那之后推荐使用这个接口来处理匹配到的结果。 用户仍然可以使用像select/flatSelect这样旧格式的API,它们会在内部被转换为PatternProcessFunction。

PatternStream<Event> patternStream = CEP.pattern(input, pattern);OutputTag<String> outputTag = new OutputTag<String>("side-output"){};SingleOutputStreamOperator<ComplexEvent> flatResult = patternStream.flatSelect(outputTag,new PatternFlatTimeoutFunction<Event, TimeoutEvent>() {public void timeout(Map<String, List<Event>> pattern,long timeoutTimestamp,Collector<TimeoutEvent> out) throws Exception {out.collect(new TimeoutEvent());}},new PatternFlatSelectFunction<Event, ComplexEvent>() {public void flatSelect(Map<String, List<IN>> pattern, Collector<OUT> out) throws Exception {out.collect(new ComplexEvent());}}
);DataStream<TimeoutEvent> timeoutFlatResult = flatResult.getSideOutput(outputTag);

1)、使用 PatternProcessFunction 的侧输出流

在 Flink CEP 中 , 提供了一个专门捕捉超时的部分匹配事件的接口TimedOutPartialMatchHandler。这个接口需要实现一个 processTimedOutMatch()方法,可以将超时的已检测到的部分匹配事件放在一个 Map 中,作为方法的第一个参数;方法的第二个参数则是 PatternProcessFunction 的上下文 Context。这个接口必须与 PatternProcessFunction结合使用,对处理结果的输出则需要利用侧输出流来进行。

官方推荐做法

完整示例如下:


package org.cep;import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Map;import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternFlatSelectFunction;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.PatternTimeoutFunction;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.functions.TimedOutPartialMatchHandler;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;/** @Author: alanchan* @LastEditors: alanchan* @Description: */
public class TestCEPDemo {@Data@NoArgsConstructor@AllArgsConstructorstatic class LoginEvent {private Integer userId;private String ip;private String status;private Long timestamp;@Overridepublic boolean equals(Object obj) {if (obj instanceof LoginEvent) {LoginEvent loginEvent = (LoginEvent) obj;return this.userId == loginEvent.getUserId() && this.ip.equals(loginEvent.ip)&& this.status.equals(loginEvent.getStatus()) && this.timestamp == loginEvent.getTimestamp();} else {return false;}}@Overridepublic int hashCode() {return super.hashCode() + Long.hashCode(timestamp);}}final static List<LoginEvent> loginEventList = Arrays.asList(new LoginEvent(1001, "192.168.10.1", "F", 2L),new LoginEvent(1001, "192.168.10.2", "F", 3L),new LoginEvent(1002, "192.168.10.8", "F", 4L),new LoginEvent(1001, "192.168.10.6", "F", 5L),new LoginEvent(1002, "192.168.10.8", "F", 7L),new LoginEvent(1002, "192.168.10.8", "F", 8L),new LoginEvent(1002, "192.168.10.8", "S", 6L),new LoginEvent(1003, "192.168.10.8", "F", 6L),new LoginEvent(1005, "192.168.10.8", "F", 26L),new LoginEvent(1004, "192.168.10.3", "S", 4L));// 推荐做法static void testProcessTimedOut() throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 按用户id分组DataStream<LoginEvent> loginEventDS = env.fromCollection(loginEventList).assignTimestampsAndWatermarks(WatermarkStrategy.<LoginEvent>forBoundedOutOfOrderness(Duration.ofSeconds(10)).withTimestampAssigner((loginEvent, rs) -> loginEvent.getTimestamp())).keyBy(loginEvent -> loginEvent.getUserId());// 定义模式Pattern<LoginEvent, ?> loginEventPattern = Pattern.begin(Pattern.<LoginEvent>begin("first").where(new SimpleCondition<LoginEvent>() {@Overridepublic boolean filter(LoginEvent value) throws Exception {return value.getStatus().equals("F");}}));// 将Pattern应用到流上,检测匹配的复杂事件,得到一个PatternStreamPatternStream<LoginEvent> patternStream = CEP.pattern(loginEventDS, loginEventPattern);// 将匹配到的流选择出来输出OutputTag<String> outputTag = new OutputTag<String>("alan_ProcessTimedOut", TypeInformation.of(String.class));DataStream<String> resultStream = patternStream.process(new AlanProcessTimedOut(outputTag));// 正常流输出resultStream.print("输出信息:\n");// 超时流输出,通过OutputTag((SingleOutputStreamOperator<String>) resultStream).getSideOutput(outputTag).print("timeout输出信息:\n");// 控制台输出:env.execute();}public static void main(String[] args) throws Exception {testProcessTimedOut();}static class AlanProcessTimedOut extends PatternProcessFunction<LoginEvent, String>implements TimedOutPartialMatchHandler<LoginEvent> {private OutputTag<String> outputTag;public AlanProcessTimedOut(OutputTag<String> outputTag) {this.outputTag = outputTag;}// 超时匹配处理@Overridepublic void processTimedOutMatch(Map<String, List<LoginEvent>> match, Context ctx) throws Exception {// OutputTag<LoginEvent> outputTag = new OutputTag<LoginEvent>("AlanProcessTimedOut");ctx.output(outputTag, match.get("first").toString());}// 正常匹配处理@Overridepublic void processMatch(Map<String, List<LoginEvent>> match, Context ctx, Collector<String> out)throws Exception {out.collect(match.get("first").toString());}}
}

2)、使用 PatternTimeoutFunction的侧输出流

PatternProcessFunction通过实现TimedOutPartialMatchHandler接口扩展出了处理超时事件的能力,这是官方推荐的做法。

Flink CEP 中也保留了简化版的PatternSelectFunction,它无法直接处理超时事件,不过可以通过调用 PatternStream的.select()方法时多传入一个 PatternTimeoutFunction 参数来实现这一点。

PatternTimeoutFunction 是早期版本中用于捕获超时事件的接口。它需要实现一个 timeout()方法,同样会将部分匹配的事件放在一个 Map 中作为参数传入,此外还有一个参数是当前的时间戳。提取部分匹配事件进行处理转换后,可以将通知或报警信息输出。

在调用 PatternStream 的.select()方法时需要传入三个参数:

  • 侧输出流标签(OutputTag)
  • 超时事件处理函数 PatternTimeoutFunction
  • 匹配事件提取函数PatternSelectFunction
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Map;import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternFlatSelectFunction;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.PatternTimeoutFunction;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.functions.TimedOutPartialMatchHandler;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;/** @Author: alanchan* @LastEditors: alanchan* @Description: */
public class TestCEPDemo {@Data@NoArgsConstructor@AllArgsConstructorstatic class LoginEvent {private Integer userId;private String ip;private String status;private Long timestamp;@Overridepublic boolean equals(Object obj) {if (obj instanceof LoginEvent) {LoginEvent loginEvent = (LoginEvent) obj;return this.userId == loginEvent.getUserId() && this.ip.equals(loginEvent.ip)&& this.status.equals(loginEvent.getStatus()) && this.timestamp == loginEvent.getTimestamp();} else {return false;}}@Overridepublic int hashCode() {return super.hashCode() + Long.hashCode(timestamp);}}final static List<LoginEvent> loginEventList = Arrays.asList(new LoginEvent(1001, "192.168.10.1", "F", 2L),new LoginEvent(1001, "192.168.10.2", "F", 3L),new LoginEvent(1002, "192.168.10.8", "F", 4L),new LoginEvent(1001, "192.168.10.6", "F", 5L),new LoginEvent(1002, "192.168.10.8", "F", 7L),new LoginEvent(1002, "192.168.10.8", "F", 8L),new LoginEvent(1002, "192.168.10.8", "S", 6L),new LoginEvent(1003, "192.168.10.8", "F", 6L),new LoginEvent(1005, "192.168.10.8", "F", 26L),new LoginEvent(1004, "192.168.10.3", "S", 4L));static void testProcessTimedOut2() throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 按用户id分组DataStream<LoginEvent> loginEventDS = env.fromCollection(loginEventList).assignTimestampsAndWatermarks(WatermarkStrategy.<LoginEvent>forBoundedOutOfOrderness(Duration.ofSeconds(10)).withTimestampAssigner((loginEvent, rs) -> loginEvent.getTimestamp())).keyBy(loginEvent -> loginEvent.getUserId());// 定义模式Pattern<LoginEvent, ?> loginEventPattern = Pattern.begin(Pattern.<LoginEvent>begin("first").where(new SimpleCondition<LoginEvent>() {@Overridepublic boolean filter(LoginEvent value) throws Exception {return value.getStatus().equals("F");}}));// 将Pattern应用到流上,检测匹配的复杂事件,得到一个PatternStreamPatternStream<LoginEvent> patternStream = CEP.pattern(loginEventDS, loginEventPattern);// 将匹配到的流选择出来输出OutputTag<String> outputTag = new OutputTag<String>("alan_ProcessTimedOut", TypeInformation.of(String.class));SingleOutputStreamOperator<String> resultStream = patternStream.select(outputTag,new PatternTimeoutFunction<LoginEvent, String>() {// 处理超时流@Overridepublic String timeout(Map<String, List<LoginEvent>> pattern, long timeoutTimestamp)throws Exception {return pattern.get("first").toString() + "  timeoutTimestamp:" + timeoutTimestamp;}}, new PatternSelectFunction<LoginEvent, String>() {// 处理正常流@Overridepublic String select(Map<String, List<LoginEvent>> pattern) throws Exception {return pattern.get("first").toString();}});// 正常流输出resultStream.print("输出信息:\n");// 超时流输出,通过OutputTagresultStream.getSideOutput(outputTag).print("timeout输出信息:\n");// 控制台输出:env.execute();}public static void main(String[] args) throws Exception {testProcessTimedOut2();}}

以上,本文介绍了Flink 的类库CEP的模式检测,主要介绍数据的三种选取方式以及延迟数据的处理。

本专题分为以下几篇介绍:
59、Flink CEP - Flink的复杂事件处理介绍及示例(1)-入门
59、Flink CEP - Flink的复杂事件处理介绍及示例(2)- 模式API
59、Flink CEP - Flink的复杂事件处理介绍及示例(3)- 模式选取及超时处理
59、Flink CEP - Flink的复杂事件处理介绍及示例(4)- 延迟数据处理和三个实际应用示例
59、Flink CEP - Flink的复杂事件处理介绍及示例(完整版)

这篇关于59、Flink CEP - Flink的复杂事件处理介绍及示例(3)- 模式选取及超时处理的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/636780

相关文章

Nginx设置连接超时并进行测试的方法步骤

《Nginx设置连接超时并进行测试的方法步骤》在高并发场景下,如果客户端与服务器的连接长时间未响应,会占用大量的系统资源,影响其他正常请求的处理效率,为了解决这个问题,可以通过设置Nginx的连接... 目录设置连接超时目的操作步骤测试连接超时测试方法:总结:设置连接超时目的设置客户端与服务器之间的连接

Java调用DeepSeek API的最佳实践及详细代码示例

《Java调用DeepSeekAPI的最佳实践及详细代码示例》:本文主要介绍如何使用Java调用DeepSeekAPI,包括获取API密钥、添加HTTP客户端依赖、创建HTTP请求、处理响应、... 目录1. 获取API密钥2. 添加HTTP客户端依赖3. 创建HTTP请求4. 处理响应5. 错误处理6.

Android 悬浮窗开发示例((动态权限请求 | 前台服务和通知 | 悬浮窗创建 )

《Android悬浮窗开发示例((动态权限请求|前台服务和通知|悬浮窗创建)》本文介绍了Android悬浮窗的实现效果,包括动态权限请求、前台服务和通知的使用,悬浮窗权限需要动态申请并引导... 目录一、悬浮窗 动态权限请求1、动态请求权限2、悬浮窗权限说明3、检查动态权限4、申请动态权限5、权限设置完毕后

在 Spring Boot 中使用 @Autowired和 @Bean注解的示例详解

《在SpringBoot中使用@Autowired和@Bean注解的示例详解》本文通过一个示例演示了如何在SpringBoot中使用@Autowired和@Bean注解进行依赖注入和Bean... 目录在 Spring Boot 中使用 @Autowired 和 @Bean 注解示例背景1. 定义 Stud

oracle DBMS_SQL.PARSE的使用方法和示例

《oracleDBMS_SQL.PARSE的使用方法和示例》DBMS_SQL是Oracle数据库中的一个强大包,用于动态构建和执行SQL语句,DBMS_SQL.PARSE过程解析SQL语句或PL/S... 目录语法示例注意事项DBMS_SQL 是 oracle 数据库中的一个强大包,它允许动态地构建和执行

Python中顺序结构和循环结构示例代码

《Python中顺序结构和循环结构示例代码》:本文主要介绍Python中的条件语句和循环语句,条件语句用于根据条件执行不同的代码块,循环语句用于重复执行一段代码,文章还详细说明了range函数的使... 目录一、条件语句(1)条件语句的定义(2)条件语句的语法(a)单分支 if(b)双分支 if-else(

Python中Markdown库的使用示例详解

《Python中Markdown库的使用示例详解》Markdown库是一个用于处理Markdown文本的Python工具,这篇文章主要为大家详细介绍了Markdown库的具体使用,感兴趣的... 目录一、背景二、什么是 Markdown 库三、如何安装这个库四、库函数使用方法1. markdown.mark

MySQL数据库函数之JSON_EXTRACT示例代码

《MySQL数据库函数之JSON_EXTRACT示例代码》:本文主要介绍MySQL数据库函数之JSON_EXTRACT的相关资料,JSON_EXTRACT()函数用于从JSON文档中提取值,支持对... 目录前言基本语法路径表达式示例示例 1: 提取简单值示例 2: 提取嵌套值示例 3: 提取数组中的值注意

CSS3中使用flex和grid实现等高元素布局的示例代码

《CSS3中使用flex和grid实现等高元素布局的示例代码》:本文主要介绍了使用CSS3中的Flexbox和Grid布局实现等高元素布局的方法,通过简单的两列实现、每行放置3列以及全部代码的展示,展示了这两种布局方式的实现细节和效果,详细内容请阅读本文,希望能对你有所帮助... 过往的实现方法是使用浮动加

css渐变色背景|<gradient示例详解

《css渐变色背景|<gradient示例详解》CSS渐变是一种从一种颜色平滑过渡到另一种颜色的效果,可以作为元素的背景,它包括线性渐变、径向渐变和锥形渐变,本文介绍css渐变色背景|<gradien... 使用渐变色作为背景可以直接将渐China编程变色用作元素的背景,可以看做是一种特殊的背景图片。(是作为背