59、Flink CEP - Flink的复杂事件处理介绍及示例(3)- 模式选取及超时处理

本文主要是介绍59、Flink CEP - Flink的复杂事件处理介绍及示例(3)- 模式选取及超时处理,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Flink 系列文章

一、Flink 专栏

Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。

  • 1、Flink 部署系列
    本部分介绍Flink的部署、配置相关基础内容。

  • 2、Flink基础系列
    本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。

  • 3、Flik Table API和SQL基础系列
    本部分介绍Flink Table Api和SQL的基本用法,比如Table API和SQL创建库、表用法、查询、窗口函数、catalog等等内容。

  • 4、Flik Table API和SQL提高与应用系列
    本部分是table api 和sql的应用部分,和实际的生产应用联系更为密切,以及有一定开发难度的内容。

  • 5、Flink 监控系列
    本部分和实际的运维、监控工作相关。

二、Flink 示例专栏

Flink 示例专栏是 Flink 专栏的辅助说明,一般不会介绍知识点的信息,更多的是提供一个一个可以具体使用的示例。本专栏不再分目录,通过链接即可看出介绍的内容。

两专栏的所有文章入口点击:Flink 系列文章汇总索引


文章目录

  • Flink 系列文章
  • 三、检测模式
    • 1、将模式应用到流上
    • 2、从模式中选取
      • 1)、匹配事件的选择提取(PatternSelectFunction)
      • 2)、匹配事件的选择提取(PatternFlatSelectFunction)
      • 3)、匹配事件的通用处理(PatternProcessFunction)
    • 3、处理超时的部分匹配
      • 1)、使用 PatternProcessFunction 的侧输出流
      • 2)、使用 PatternTimeoutFunction的侧输出流


本文介绍了Flink 的类库CEP的模式检测,主要介绍数据的三种选取方式以及延迟数据的处理。

如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。

本文除了maven依赖外,没有其他依赖。

本专题分为以下几篇介绍:
59、Flink CEP - Flink的复杂事件处理介绍及示例(1)-入门
59、Flink CEP - Flink的复杂事件处理介绍及示例(2)- 模式API
59、Flink CEP - Flink的复杂事件处理介绍及示例(3)- 模式选取及超时处理
59、Flink CEP - Flink的复杂事件处理介绍及示例(4)- 延迟数据处理和三个实际应用示例
59、Flink CEP - Flink的复杂事件处理介绍及示例(完整版)

三、检测模式

1、将模式应用到流上

将模式应用到事件流上只要调用 CEP 类的静态方法.pattern()即可,将数据流(DataStream)和模式(Pattern)作为两个参数传入就可以了。最终得到的是一个 PatternStream:

// 代码片段,完整内容可以参考本文中的其他完整示例
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 按用户id分组
DataStream<LoginEvent> loginEventDS = env.fromCollection(loginEventList).keyBy(loginEvent -> loginEvent.getUserId());// 定义模式
Pattern<LoginEvent, ?> loginEventPattern = Pattern.begin( ... );// 将Pattern应用到流上,检测匹配的复杂事件,得到一个PatternStream
PatternStream<LoginEvent> patternStream = CEP.pattern(loginEventDS, loginEventPattern);

输入流根据你的使用场景可以是keyed或者non-keyed。

在 non-keyed 流上使用模式将会使你的作业并发度被设为1。

这里的 DataStream,也可以通过 keyBy 进行按键分区得到 KeyedStream,接下来对复杂事件的检测就会针对不同的 key 单独进行了。

模式中定义的复杂事件,发生是有先后顺序的,这里“先后”的判断标准取决于具体的时间语义。

默认情况下采用事件时间语义,那么事件会以各自的时间戳进行排序;

如果是处理时间语义,那么所谓先后就是数据到达的顺序。

对于时间戳相同或是同时到达的事件,我们还可以在 CEP.pattern()中传入一个比较器作为第三个参数,用来进行更精确的排序:

// 代码片段
DataStream<Event> input = ...;
Pattern<Event, ?> pattern = ...;
EventComparator<Event> comparator = ...; // 可选的PatternStream<Event> patternStream = CEP.pattern(loginEventDS, loginEventPattern, comparator);

得到 PatternStream 后,接下来要做的就是对匹配事件的检测处理了。

2、从模式中选取

PatternStream 的转换操作分成两种:选择提取(select)操作和处理(process)操作。与 DataStream 的转换类似,在调用API 时传入一个函数类,即选择操作传入的是一个 PatternSelectFunction,处理操作传入PatternProcessFunction。

1)、匹配事件的选择提取(PatternSelectFunction)

处理匹配事件最简单的方式,就是从 PatternStream 中直接把匹配的复杂事件提取出来,包装成想要的信息输出,这个操作就是“选择”(select)。

PatternSelectFunction:代码中基于 PatternStream 直接调用.select()方法,传入一个 PatternSelectFunction 作为参数。

  • 示例
static void test1() throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 按用户id分组DataStream<LoginEvent> loginEventDS = env.fromCollection(loginEventList).assignTimestampsAndWatermarks(WatermarkStrategy.<LoginEvent>forBoundedOutOfOrderness(Duration.ofSeconds(1)).withTimestampAssigner((loginEvent, rs) -> loginEvent.getTimestamp())).keyBy(loginEvent -> loginEvent.getUserId());// 定义模式Pattern<LoginEvent, ?> loginEventPattern = Pattern.<LoginEvent>begin("first").where(new SimpleCondition<LoginEvent>() {@Overridepublic boolean filter(LoginEvent value) throws Exception {return value.getStatus().equals("F");}}).next("second").where(new SimpleCondition<LoginEvent>() {@Overridepublic boolean filter(LoginEvent value) throws Exception {return value.getStatus().equals("F");}}).next("third").where(new SimpleCondition<LoginEvent>() {@Overridepublic boolean filter(LoginEvent value) throws Exception {return value.getStatus().equals("F");}});// 将Pattern应用到流上,检测匹配的复杂事件,得到一个PatternStreamPatternStream<LoginEvent> patternStream = CEP.pattern(loginEventDS, loginEventPattern);// 将匹配到的流选择出来输出patternStream.select(new PatternSelectFunction<LoginEvent, String>() {@Overridepublic String select(Map<String, List<LoginEvent>> map) throws Exception {return map.get("first").toString() + " \n" + map.get("second").toString() + " \n"+ map.get("third").toString();}}).print("输出信息:\n");// 控制台输出:env.execute();}

PatternSelectFunction 是 Flink CEP 提供的一个函数类接口,需要实现一个 select()方法,这个方法每当检测到一组匹配的复杂事件时都会调用一次。它会将检测到的匹配事件保存在一个 Map 里,对应的 key 就是这些事件的名称。这里的“事件名称”就对应着在模式中定义的每个个体模式的名称;而个体模式可以是循环模式,一个名称会对应多个事件,所以最终保存在 Map 里的 value 就是一个事件的列表(List)。

如果个体模式是单例的,那么 List 中只有一个元素,直接调用.get(0)就可以把它取出。

如果个体模式是循环的,List 中就有可能有多个元素了。

可以将匹配到的事件包装成 String 类型输出,代码如下:


static void test2() throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 按用户id分组DataStream<LoginEvent> loginEventDS = env.fromCollection(loginEventList).assignTimestampsAndWatermarks(WatermarkStrategy.<LoginEvent>forBoundedOutOfOrderness(Duration.ofSeconds(1)).withTimestampAssigner((loginEvent, rs) -> loginEvent.getTimestamp())).keyBy(loginEvent -> loginEvent.getUserId());// 定义模式Pattern<LoginEvent, ?> loginEventPattern = Pattern.<LoginEvent>begin("pattern").where(new SimpleCondition<LoginEvent>() {@Overridepublic boolean filter(LoginEvent value) throws Exception {return value.getStatus().equals("F");}}).times(3) // 匹配三次.consecutive();// 将Pattern应用到流上,检测匹配的复杂事件,得到一个PatternStreamPatternStream<LoginEvent> patternStream = CEP.pattern(loginEventDS, loginEventPattern);// 将匹配到的流选择出来输出patternStream.select(new PatternSelectFunction<LoginEvent, String>() {@Overridepublic String select(Map<String, List<LoginEvent>> map) throws Exception {// list中放了一个匹配了3个事件的模式return map.get("pattern").get(0).toString() + " \n" + map.get("pattern").get(1).toString()+ " \n" + map.get("pattern").get(2).toString();}}).print("输出信息:\n");// 控制台输出:env.execute();}

2)、匹配事件的选择提取(PatternFlatSelectFunction)

要实现一个flatSelect()方法,与 select()的不同就在于没有返回值,b并且多了一个收集器(Collector)参数 out,通过调用 out.collet()方法就可以实现多次发送输出数据了。

static void test3() throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 按用户id分组DataStream<LoginEvent> loginEventDS = env.fromCollection(loginEventList).assignTimestampsAndWatermarks(WatermarkStrategy.<LoginEvent>forBoundedOutOfOrderness(Duration.ofSeconds(1)).withTimestampAssigner((loginEvent, rs) -> loginEvent.getTimestamp())).keyBy(loginEvent -> loginEvent.getUserId());// 定义模式Pattern<LoginEvent, ?> loginEventPattern = Pattern.<LoginEvent>begin("pattern").where(new SimpleCondition<LoginEvent>() {@Overridepublic boolean filter(LoginEvent value) throws Exception {return value.getStatus().equals("F");}}).times(3) // 匹配三次.consecutive();// 将Pattern应用到流上,检测匹配的复杂事件,得到一个PatternStreamPatternStream<LoginEvent> patternStream = CEP.pattern(loginEventDS, loginEventPattern);// 将匹配到的流选择出来输出patternStream.flatSelect(new PatternFlatSelectFunction<LoginEvent, String>() {@Overridepublic void flatSelect(Map<String, List<LoginEvent>> map, Collector<String> out)throws Exception {out.collect(// list中放了一个匹配了3个事件的模式map.get("pattern").get(0).toString() + " \n" + map.get("pattern").get(1).toString()+ " \n" + map.get("pattern").get(2).toString());}}).print("输出信息:\n");// 控制台输出:env.execute();}

3)、匹配事件的通用处理(PatternProcessFunction)

在获得到一个PatternStream之后,你可以应用各种转换来发现事件序列。

推荐使用PatternProcessFunction。

PatternProcessFunction有一个processMatch的方法在每找到一个匹配的事件序列时都会被调用。 它按照Map<String, List>的格式接收一个匹配,映射的键是你的模式序列中的每个模式的名称,值是被接受的事件列表(IN是输入事件的类型)。 模式的输入事件按照时间戳进行排序。为每个模式返回一个接受的事件列表的原因是当使用循环模式(比如oneToMany()和times())时, 对一个模式会有不止一个事件被接受。

PatternProcessFunction 中必须实现一个 processMatch()方法;这个方法与之前的 flatSelect()类似,只是多了一个上下文 Context 参数。利用这个上下文可以获取当前的时间信息,比如事件的时间戳(timestamp)或者处理时间(processing time),还可以调用.output()方法将数据输出到侧输出流。在 CEP 中,侧输出流一般被用来处理超时事件。

  • 官方示例

class MyPatternProcessFunction<IN, OUT> extends PatternProcessFunction<IN, OUT> {@Overridepublic void processMatch(Map<String, List<IN>> match, Context ctx, Collector<OUT> out) throws Exception;IN startEvent = match.get("start").get(0);IN endEvent = match.get("end").get(0);out.collect(OUT(startEvent, endEvent));}
}
  • 完整示例

package org.cep;import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Map;import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternFlatSelectFunction;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;/** @Author: alanchan* @LastEditors: alanchan* @Description: */
public class TestCEPDemo {@Data@NoArgsConstructor@AllArgsConstructorstatic class LoginEvent {private Integer userId;private String ip;private String status;private Long timestamp;@Overridepublic boolean equals(Object obj) {if (obj instanceof LoginEvent) {LoginEvent loginEvent = (LoginEvent) obj;return this.userId == loginEvent.getUserId() && this.ip.equals(loginEvent.ip)&& this.status.equals(loginEvent.getStatus()) && this.timestamp == loginEvent.getTimestamp();} else {return false;}}@Overridepublic int hashCode() {return super.hashCode() + Long.hashCode(timestamp);}}final static List<LoginEvent> loginEventList = Arrays.asList(new LoginEvent(1001, "192.168.10.1", "F", 2L),new LoginEvent(1001, "192.168.10.2", "F", 3L),new LoginEvent(1002, "192.168.10.8", "F", 4L),new LoginEvent(1001, "192.168.10.6", "F", 5L),new LoginEvent(1002, "192.168.10.8", "F", 7L),new LoginEvent(1002, "192.168.10.8", "F", 8L),new LoginEvent(1002, "192.168.10.8", "S", 6L),new LoginEvent(1003, "192.168.10.8", "F", 6L),new LoginEvent(1004, "192.168.10.3", "S", 4L));static void testProcess() throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 按用户id分组DataStream<LoginEvent> loginEventDS = env.fromCollection(loginEventList).assignTimestampsAndWatermarks(WatermarkStrategy.<LoginEvent>forBoundedOutOfOrderness(Duration.ofSeconds(10)).withTimestampAssigner((loginEvent, rs) -> loginEvent.getTimestamp())).keyBy(loginEvent -> loginEvent.getUserId());// 定义模式Pattern<LoginEvent, ?> loginEventPattern = Pattern.begin(Pattern.<LoginEvent>begin("first").where(new SimpleCondition<LoginEvent>() {@Overridepublic boolean filter(LoginEvent value) throws Exception {return value.getStatus().equals("F");}}));// 将Pattern应用到流上,检测匹配的复杂事件,得到一个PatternStreamPatternStream<LoginEvent> patternStream = CEP.pattern(loginEventDS, loginEventPattern);// 将匹配到的流选择出来输出patternStream.flatSelect(new PatternFlatSelectFunction<LoginEvent, String>() {@Overridepublic void flatSelect(Map<String, List<LoginEvent>> pattern, Collector<String> out)throws Exception {out.collect(pattern.get("first").toString());}}).print("flatSelect输出信息:\n");patternStream.process(new PatternProcessFunction<LoginEvent, String>() {@Overridepublic void processMatch(Map<String, List<LoginEvent>> match, Context ctx, Collector<String> out)throws Exception {out.collect(match.get("first").toString());}}).print("process输出信息:\n");// 控制台输出:env.execute();}public static void main(String[] args) throws Exception {testProcess();}
}

PatternProcessFunction可以访问Context对象。有了它之后,你可以访问时间属性,比如currentProcessingTime或者当前匹配的timestamp (最新分配到匹配上的事件的时间戳)。

3、处理超时的部分匹配

当一个模式上通过within加上窗口长度后,部分匹配的事件序列就可能因为超过窗口长度而被丢弃。可以使用TimedOutPartialMatchHandler接口 来处理超时的部分匹配。这个接口可以和其它的混合使用。也就是说你可以在自己的PatternProcessFunction里另外实现这个接口。 TimedOutPartialMatchHandler提供了另外的processTimedOutMatch方法,这个方法对每个超时的部分匹配都会调用。

  • 官方示例

class MyPatternProcessFunction<IN, OUT> extends PatternProcessFunction<IN, OUT> implements TimedOutPartialMatchHandler<IN> {@Overridepublic void processMatch(Map<String, List<IN>> match, Context ctx, Collector<OUT> out) throws Exception;...}@Overridepublic void processTimedOutMatch(Map<String, List<IN>> match, Context ctx) throws Exception;IN startEvent = match.get("start").get(0);ctx.output(outputTag, T(startEvent));}
}

processTimedOutMatch不能访问主输出。 但你可以通过Context对象把结果输出到侧输出。

前面提到的PatternProcessFunction是在Flink 1.8之后引入的,从那之后推荐使用这个接口来处理匹配到的结果。 用户仍然可以使用像select/flatSelect这样旧格式的API,它们会在内部被转换为PatternProcessFunction。

PatternStream<Event> patternStream = CEP.pattern(input, pattern);OutputTag<String> outputTag = new OutputTag<String>("side-output"){};SingleOutputStreamOperator<ComplexEvent> flatResult = patternStream.flatSelect(outputTag,new PatternFlatTimeoutFunction<Event, TimeoutEvent>() {public void timeout(Map<String, List<Event>> pattern,long timeoutTimestamp,Collector<TimeoutEvent> out) throws Exception {out.collect(new TimeoutEvent());}},new PatternFlatSelectFunction<Event, ComplexEvent>() {public void flatSelect(Map<String, List<IN>> pattern, Collector<OUT> out) throws Exception {out.collect(new ComplexEvent());}}
);DataStream<TimeoutEvent> timeoutFlatResult = flatResult.getSideOutput(outputTag);

1)、使用 PatternProcessFunction 的侧输出流

在 Flink CEP 中 , 提供了一个专门捕捉超时的部分匹配事件的接口TimedOutPartialMatchHandler。这个接口需要实现一个 processTimedOutMatch()方法,可以将超时的已检测到的部分匹配事件放在一个 Map 中,作为方法的第一个参数;方法的第二个参数则是 PatternProcessFunction 的上下文 Context。这个接口必须与 PatternProcessFunction结合使用,对处理结果的输出则需要利用侧输出流来进行。

官方推荐做法

完整示例如下:


package org.cep;import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Map;import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternFlatSelectFunction;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.PatternTimeoutFunction;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.functions.TimedOutPartialMatchHandler;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;/** @Author: alanchan* @LastEditors: alanchan* @Description: */
public class TestCEPDemo {@Data@NoArgsConstructor@AllArgsConstructorstatic class LoginEvent {private Integer userId;private String ip;private String status;private Long timestamp;@Overridepublic boolean equals(Object obj) {if (obj instanceof LoginEvent) {LoginEvent loginEvent = (LoginEvent) obj;return this.userId == loginEvent.getUserId() && this.ip.equals(loginEvent.ip)&& this.status.equals(loginEvent.getStatus()) && this.timestamp == loginEvent.getTimestamp();} else {return false;}}@Overridepublic int hashCode() {return super.hashCode() + Long.hashCode(timestamp);}}final static List<LoginEvent> loginEventList = Arrays.asList(new LoginEvent(1001, "192.168.10.1", "F", 2L),new LoginEvent(1001, "192.168.10.2", "F", 3L),new LoginEvent(1002, "192.168.10.8", "F", 4L),new LoginEvent(1001, "192.168.10.6", "F", 5L),new LoginEvent(1002, "192.168.10.8", "F", 7L),new LoginEvent(1002, "192.168.10.8", "F", 8L),new LoginEvent(1002, "192.168.10.8", "S", 6L),new LoginEvent(1003, "192.168.10.8", "F", 6L),new LoginEvent(1005, "192.168.10.8", "F", 26L),new LoginEvent(1004, "192.168.10.3", "S", 4L));// 推荐做法static void testProcessTimedOut() throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 按用户id分组DataStream<LoginEvent> loginEventDS = env.fromCollection(loginEventList).assignTimestampsAndWatermarks(WatermarkStrategy.<LoginEvent>forBoundedOutOfOrderness(Duration.ofSeconds(10)).withTimestampAssigner((loginEvent, rs) -> loginEvent.getTimestamp())).keyBy(loginEvent -> loginEvent.getUserId());// 定义模式Pattern<LoginEvent, ?> loginEventPattern = Pattern.begin(Pattern.<LoginEvent>begin("first").where(new SimpleCondition<LoginEvent>() {@Overridepublic boolean filter(LoginEvent value) throws Exception {return value.getStatus().equals("F");}}));// 将Pattern应用到流上,检测匹配的复杂事件,得到一个PatternStreamPatternStream<LoginEvent> patternStream = CEP.pattern(loginEventDS, loginEventPattern);// 将匹配到的流选择出来输出OutputTag<String> outputTag = new OutputTag<String>("alan_ProcessTimedOut", TypeInformation.of(String.class));DataStream<String> resultStream = patternStream.process(new AlanProcessTimedOut(outputTag));// 正常流输出resultStream.print("输出信息:\n");// 超时流输出,通过OutputTag((SingleOutputStreamOperator<String>) resultStream).getSideOutput(outputTag).print("timeout输出信息:\n");// 控制台输出:env.execute();}public static void main(String[] args) throws Exception {testProcessTimedOut();}static class AlanProcessTimedOut extends PatternProcessFunction<LoginEvent, String>implements TimedOutPartialMatchHandler<LoginEvent> {private OutputTag<String> outputTag;public AlanProcessTimedOut(OutputTag<String> outputTag) {this.outputTag = outputTag;}// 超时匹配处理@Overridepublic void processTimedOutMatch(Map<String, List<LoginEvent>> match, Context ctx) throws Exception {// OutputTag<LoginEvent> outputTag = new OutputTag<LoginEvent>("AlanProcessTimedOut");ctx.output(outputTag, match.get("first").toString());}// 正常匹配处理@Overridepublic void processMatch(Map<String, List<LoginEvent>> match, Context ctx, Collector<String> out)throws Exception {out.collect(match.get("first").toString());}}
}

2)、使用 PatternTimeoutFunction的侧输出流

PatternProcessFunction通过实现TimedOutPartialMatchHandler接口扩展出了处理超时事件的能力,这是官方推荐的做法。

Flink CEP 中也保留了简化版的PatternSelectFunction,它无法直接处理超时事件,不过可以通过调用 PatternStream的.select()方法时多传入一个 PatternTimeoutFunction 参数来实现这一点。

PatternTimeoutFunction 是早期版本中用于捕获超时事件的接口。它需要实现一个 timeout()方法,同样会将部分匹配的事件放在一个 Map 中作为参数传入,此外还有一个参数是当前的时间戳。提取部分匹配事件进行处理转换后,可以将通知或报警信息输出。

在调用 PatternStream 的.select()方法时需要传入三个参数:

  • 侧输出流标签(OutputTag)
  • 超时事件处理函数 PatternTimeoutFunction
  • 匹配事件提取函数PatternSelectFunction
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Map;import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternFlatSelectFunction;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.PatternTimeoutFunction;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.functions.TimedOutPartialMatchHandler;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;/** @Author: alanchan* @LastEditors: alanchan* @Description: */
public class TestCEPDemo {@Data@NoArgsConstructor@AllArgsConstructorstatic class LoginEvent {private Integer userId;private String ip;private String status;private Long timestamp;@Overridepublic boolean equals(Object obj) {if (obj instanceof LoginEvent) {LoginEvent loginEvent = (LoginEvent) obj;return this.userId == loginEvent.getUserId() && this.ip.equals(loginEvent.ip)&& this.status.equals(loginEvent.getStatus()) && this.timestamp == loginEvent.getTimestamp();} else {return false;}}@Overridepublic int hashCode() {return super.hashCode() + Long.hashCode(timestamp);}}final static List<LoginEvent> loginEventList = Arrays.asList(new LoginEvent(1001, "192.168.10.1", "F", 2L),new LoginEvent(1001, "192.168.10.2", "F", 3L),new LoginEvent(1002, "192.168.10.8", "F", 4L),new LoginEvent(1001, "192.168.10.6", "F", 5L),new LoginEvent(1002, "192.168.10.8", "F", 7L),new LoginEvent(1002, "192.168.10.8", "F", 8L),new LoginEvent(1002, "192.168.10.8", "S", 6L),new LoginEvent(1003, "192.168.10.8", "F", 6L),new LoginEvent(1005, "192.168.10.8", "F", 26L),new LoginEvent(1004, "192.168.10.3", "S", 4L));static void testProcessTimedOut2() throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 按用户id分组DataStream<LoginEvent> loginEventDS = env.fromCollection(loginEventList).assignTimestampsAndWatermarks(WatermarkStrategy.<LoginEvent>forBoundedOutOfOrderness(Duration.ofSeconds(10)).withTimestampAssigner((loginEvent, rs) -> loginEvent.getTimestamp())).keyBy(loginEvent -> loginEvent.getUserId());// 定义模式Pattern<LoginEvent, ?> loginEventPattern = Pattern.begin(Pattern.<LoginEvent>begin("first").where(new SimpleCondition<LoginEvent>() {@Overridepublic boolean filter(LoginEvent value) throws Exception {return value.getStatus().equals("F");}}));// 将Pattern应用到流上,检测匹配的复杂事件,得到一个PatternStreamPatternStream<LoginEvent> patternStream = CEP.pattern(loginEventDS, loginEventPattern);// 将匹配到的流选择出来输出OutputTag<String> outputTag = new OutputTag<String>("alan_ProcessTimedOut", TypeInformation.of(String.class));SingleOutputStreamOperator<String> resultStream = patternStream.select(outputTag,new PatternTimeoutFunction<LoginEvent, String>() {// 处理超时流@Overridepublic String timeout(Map<String, List<LoginEvent>> pattern, long timeoutTimestamp)throws Exception {return pattern.get("first").toString() + "  timeoutTimestamp:" + timeoutTimestamp;}}, new PatternSelectFunction<LoginEvent, String>() {// 处理正常流@Overridepublic String select(Map<String, List<LoginEvent>> pattern) throws Exception {return pattern.get("first").toString();}});// 正常流输出resultStream.print("输出信息:\n");// 超时流输出,通过OutputTagresultStream.getSideOutput(outputTag).print("timeout输出信息:\n");// 控制台输出:env.execute();}public static void main(String[] args) throws Exception {testProcessTimedOut2();}}

以上,本文介绍了Flink 的类库CEP的模式检测,主要介绍数据的三种选取方式以及延迟数据的处理。

本专题分为以下几篇介绍:
59、Flink CEP - Flink的复杂事件处理介绍及示例(1)-入门
59、Flink CEP - Flink的复杂事件处理介绍及示例(2)- 模式API
59、Flink CEP - Flink的复杂事件处理介绍及示例(3)- 模式选取及超时处理
59、Flink CEP - Flink的复杂事件处理介绍及示例(4)- 延迟数据处理和三个实际应用示例
59、Flink CEP - Flink的复杂事件处理介绍及示例(完整版)

这篇关于59、Flink CEP - Flink的复杂事件处理介绍及示例(3)- 模式选取及超时处理的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/636780

相关文章

如何使用celery进行异步处理和定时任务(django)

《如何使用celery进行异步处理和定时任务(django)》文章介绍了Celery的基本概念、安装方法、如何使用Celery进行异步任务处理以及如何设置定时任务,通过Celery,可以在Web应用中... 目录一、celery的作用二、安装celery三、使用celery 异步执行任务四、使用celery

SpringCloud集成AlloyDB的示例代码

《SpringCloud集成AlloyDB的示例代码》AlloyDB是GoogleCloud提供的一种高度可扩展、强性能的关系型数据库服务,它兼容PostgreSQL,并提供了更快的查询性能... 目录1.AlloyDBjavascript是什么?AlloyDB 的工作原理2.搭建测试环境3.代码工程1.

SpringBoot操作spark处理hdfs文件的操作方法

《SpringBoot操作spark处理hdfs文件的操作方法》本文介绍了如何使用SpringBoot操作Spark处理HDFS文件,包括导入依赖、配置Spark信息、编写Controller和Ser... 目录SpringBoot操作spark处理hdfs文件1、导入依赖2、配置spark信息3、cont

Java中ArrayList的8种浅拷贝方式示例代码

《Java中ArrayList的8种浅拷贝方式示例代码》:本文主要介绍Java中ArrayList的8种浅拷贝方式的相关资料,讲解了Java中ArrayList的浅拷贝概念,并详细分享了八种实现浅... 目录引言什么是浅拷贝?ArrayList 浅拷贝的重要性方法一:使用构造函数方法二:使用 addAll(

Golang使用etcd构建分布式锁的示例分享

《Golang使用etcd构建分布式锁的示例分享》在本教程中,我们将学习如何使用Go和etcd构建分布式锁系统,分布式锁系统对于管理对分布式系统中共享资源的并发访问至关重要,它有助于维护一致性,防止竞... 目录引言环境准备新建Go项目实现加锁和解锁功能测试分布式锁重构实现失败重试总结引言我们将使用Go作

JAVA利用顺序表实现“杨辉三角”的思路及代码示例

《JAVA利用顺序表实现“杨辉三角”的思路及代码示例》杨辉三角形是中国古代数学的杰出研究成果之一,是我国北宋数学家贾宪于1050年首先发现并使用的,:本文主要介绍JAVA利用顺序表实现杨辉三角的思... 目录一:“杨辉三角”题目链接二:题解代码:三:题解思路:总结一:“杨辉三角”题目链接题目链接:点击这里

SpringBoot使用注解集成Redis缓存的示例代码

《SpringBoot使用注解集成Redis缓存的示例代码》:本文主要介绍在SpringBoot中使用注解集成Redis缓存的步骤,包括添加依赖、创建相关配置类、需要缓存数据的类(Tes... 目录一、创建 Caching 配置类二、创建需要缓存数据的类三、测试方法Spring Boot 熟悉后,集成一个外

JavaScript DOM操作与事件处理方法

《JavaScriptDOM操作与事件处理方法》本文通过一系列代码片段,详细介绍了如何使用JavaScript进行DOM操作、事件处理、属性操作、内容操作、尺寸和位置获取,以及实现简单的动画效果,涵... 目录前言1. 类名操作代码片段代码解析2. 属性操作代码片段代码解析3. 内容操作代码片段代码解析4.

Springboot使用RabbitMQ实现关闭超时订单(示例详解)

《Springboot使用RabbitMQ实现关闭超时订单(示例详解)》介绍了如何在SpringBoot项目中使用RabbitMQ实现订单的延时处理和超时关闭,通过配置RabbitMQ的交换机、队列和... 目录1.maven中引入rabbitmq的依赖:2.application.yml中进行rabbit

Python绘制土地利用和土地覆盖类型图示例详解

《Python绘制土地利用和土地覆盖类型图示例详解》本文介绍了如何使用Python绘制土地利用和土地覆盖类型图,并提供了详细的代码示例,通过安装所需的库,准备地理数据,使用geopandas和matp... 目录一、所需库的安装二、数据准备三、绘制土地利用和土地覆盖类型图四、代码解释五、其他可视化形式1.