59、Flink CEP - Flink的复杂事件处理介绍及示例(4)- 延迟数据处理和三个实际应用示例

本文主要是介绍59、Flink CEP - Flink的复杂事件处理介绍及示例(4)- 延迟数据处理和三个实际应用示例,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Flink 系列文章

一、Flink 专栏

Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。

  • 1、Flink 部署系列
    本部分介绍Flink的部署、配置相关基础内容。

  • 2、Flink基础系列
    本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。

  • 3、Flik Table API和SQL基础系列
    本部分介绍Flink Table Api和SQL的基本用法,比如Table API和SQL创建库、表用法、查询、窗口函数、catalog等等内容。

  • 4、Flik Table API和SQL提高与应用系列
    本部分是table api 和sql的应用部分,和实际的生产应用联系更为密切,以及有一定开发难度的内容。

  • 5、Flink 监控系列
    本部分和实际的运维、监控工作相关。

二、Flink 示例专栏

Flink 示例专栏是 Flink 专栏的辅助说明,一般不会介绍知识点的信息,更多的是提供一个一个可以具体使用的示例。本专栏不再分目录,通过链接即可看出介绍的内容。

两专栏的所有文章入口点击:Flink 系列文章汇总索引


文章目录

  • Flink 系列文章
  • 四、CEP库中的时间
    • 1、按照事件时间处理迟到事件
    • 2、时间上下文
  • 五、可选的参数设置
  • 六、示例
    • 1、maven依赖
    • 2、示例:输出每个用户连续三次登录失败的信息,允许数据延迟10s
    • 3、示例:查找 同一个ip在10秒内访问同一个链接超过3次的ip,可以不连续
    • 4、示例:监测服务器的温度并告警


本文介绍了Flink 的类库CEP的时间处理、可选的参数以及在实际工作中非常有用的三个示例。

如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。

本文除了maven依赖外,没有其他依赖。

本专题分为以下几篇介绍:
59、Flink CEP - Flink的复杂事件处理介绍及示例(1)-入门
59、Flink CEP - Flink的复杂事件处理介绍及示例(2)- 模式API
59、Flink CEP - Flink的复杂事件处理介绍及示例(3)- 模式选取及超时处理
59、Flink CEP - Flink的复杂事件处理介绍及示例(4)- 延迟数据处理和三个实际应用示例
59、Flink CEP - Flink的复杂事件处理介绍及示例(完整版)

四、CEP库中的时间

1、按照事件时间处理迟到事件

在CEP中,事件的处理顺序很重要。在使用事件时间时,为了保证事件按照正确的顺序被处理,一个事件到来后会先被放到一个缓冲区中, 在缓冲区里事件都按照时间戳从小到大排序,当水位线到达后,缓冲区中所有小于水位线的事件被处理。这意味着水位线之间的数据都按照时间戳被顺序处理。

这个库假定按照事件时间时水位线一定是正确的。
为了保证跨水位线的事件按照事件时间处理,Flink CEP库假定水位线一定是正确的,并且把时间戳小于最新水位线的事件看作是晚到的。 晚到的事件不会被处理。你也可以指定一个侧输出标志来收集比最新水位线晚到的事件,你可以这样做:


PatternStream<Event> patternStream = CEP.pattern(input, pattern);OutputTag<String> lateDataOutputTag = new OutputTag<String>("late-data"){};SingleOutputStreamOperator<ComplexEvent> result = patternStream.sideOutputLateData(lateDataOutputTag).select(new PatternSelectFunction<Event, ComplexEvent>() {...});DataStream<String> lateData = result.getSideOutput(lateDataOutputTag);

2、时间上下文

在PatternProcessFunction中,用户可以和IterativeCondition中 一样按照下面的方法使用实现了TimeContext的上下文:


/*** 支持获取事件属性比如当前处理事件或当前正处理的事件的时间。* 用在{@link PatternProcessFunction}和{@link org.apache.flink.cep.pattern.conditions.IterativeCondition}中*/
@PublicEvolving
public interface TimeContext {/*** 当前正处理的事件的时间戳。** <p>如果是{@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime},这个值会被设置为事件进入CEP算子的时间。*/long timestamp();/** 返回当前的处理时间。 */long currentProcessingTime();
}

这个上下文让用户可以获取处理的事件(在IterativeCondition时候是进来的记录,在PatternProcessFunction是匹配的结果)的时间属性。 调用TimeContext#currentProcessingTime总是返回当前的处理时间,而且尽量去调用这个函数而不是调用其它的比如说System.currentTimeMillis()。

使用EventTime时,TimeContext#timestamp()返回的值等于分配的时间戳。 使用ProcessingTime时,这个值等于事件进入CEP算子的时间点(在PatternProcessFunction中是匹配产生的时间)。 这意味着多次调用这个方法得到的值是一致的。

五、可选的参数设置

用于配置 Flink CEP 的 SharedBuffer 缓存容量的选项。它可以加快 CEP 算子的处理速度,并限制内存中缓存的元素数量。

仅当 state.backend.type 设置为 rocksdb 时限制内存使用才有效,这会将超过缓存数量的元素传输到 rocksdb 状态存储而不是内存状态存储。当 state.backend.type 设置为 rocksdb 时,这些配置项有助于限制内存。相比之下,当 state.backend 设置为非 rocksdb 时,缓存会导致性能下降。与使用 Map 实现的旧缓存相比,状态部分将包含更多从 guava-cache 换出的元素,这将使得 copy on write 时的状态处理增加一些开销。

在这里插入图片描述

六、示例

本部分通过几个示例展示CEP的使用方式,有些是工作中实际的例子简化版,有些是为了演示CEP功能构造的例子,其运行结果均在代码的注释中。

1、maven依赖

本文示例均使用此处的依赖。

<properties><encoding>UTF-8</encoding><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target><java.version>1.8</java.version><scala.version>2.12</scala.version><flink.version>1.17.0</flink.version>
</properties>
<dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version><!-- <scope>provided</scope> --></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-csv</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.2</version><!-- <scope>provided</scope> --></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-cep</artifactId><version>1.17.2</version></dependency>
</dependencies>

2、示例:输出每个用户连续三次登录失败的信息,允许数据延迟10s

import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Map;import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;/** @Author: alanchan* @LastEditors: alanchan* @Description: 输出每个用户连续三次登录失败的信息,允许数据延迟10s*/
public class TestLoginFailDemo {@Data@NoArgsConstructor@AllArgsConstructorstatic class LoginEvent {private Integer userId;private String ip;private String status;private Long timestamp;@Overridepublic boolean equals(Object obj) {if (obj instanceof LoginEvent) {LoginEvent loginEvent = (LoginEvent) obj;return this.userId == loginEvent.getUserId() && this.ip.equals(loginEvent.ip)&& this.status.equals(loginEvent.getStatus()) && this.timestamp == loginEvent.getTimestamp();} else {return false;}}@Overridepublic int hashCode() {return super.hashCode() + Long.hashCode(timestamp);}}final static List<LoginEvent> loginEventList = Arrays.asList(new LoginEvent(1001, "192.168.10.1", "F", 2L),new LoginEvent(1001, "192.168.10.2", "F", 3L),new LoginEvent(1002, "192.168.10.8", "F", 4L),new LoginEvent(1001, "192.168.10.6", "F", 5L),new LoginEvent(1002, "192.168.10.8", "F", 7L),new LoginEvent(1002, "192.168.10.8", "F", 8L),new LoginEvent(1002, "192.168.10.8", "S", 6L));public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 按用户id分组DataStream<LoginEvent> loginEventDS = env.fromCollection(loginEventList).assignTimestampsAndWatermarks(WatermarkStrategy.<LoginEvent>forBoundedOutOfOrderness(Duration.ofSeconds(10)).withTimestampAssigner((loginEvent, rs) -> loginEvent.getTimestamp())).keyBy(loginEvent -> loginEvent.getUserId());// 定义模式,连续的三个登录失败事件Pattern<LoginEvent, ?> loginEventPattern = Pattern.<LoginEvent>begin("first").where(new SimpleCondition<LoginEvent>() {@Overridepublic boolean filter(LoginEvent value) throws Exception {return value.getStatus().equals("F");}}).next("second").where(new SimpleCondition<LoginEvent>() {@Overridepublic boolean filter(LoginEvent value) throws Exception {return value.getStatus().equals("F");}}).next("third").where(new SimpleCondition<LoginEvent>() {@Overridepublic boolean filter(LoginEvent value) throws Exception {return value.getStatus().equals("F");}});// 将Pattern应用到流上,检测匹配的复杂事件,得到一个PatternStreamPatternStream<LoginEvent> patternStream = CEP.pattern(loginEventDS, loginEventPattern);// 将匹配到的流选择出来输出patternStream.select(new PatternSelectFunction<LoginEvent, String>() {@Overridepublic String select(Map<String, List<LoginEvent>> map) throws Exception {// 个体模式是单例的,List 中只有一个元素LoginEvent first = map.get("first").get(0);LoginEvent second = map.get("second").get(0);LoginEvent third = map.get("third").get(0);// return first.toString() + "\n" + second.toString() + "\n" + third.toString();return map.get("first").toString() + " \n" + map.get("second").toString() + " \n"+ map.get("third").toString();}}).print("连续三次登录失败用户信息:\n");// 连续三次登录失败用户信息:// :9> TestLoginFailDemo.LoginEvent(userId=1001, ip=192.168.10.1, status=F,timestamp=2)// TestLoginFailDemo.LoginEvent(userId=1001, ip=192.168.10.2, status=F, timestamp=3)// TestLoginFailDemo.LoginEvent(userId=1001, ip=192.168.10.6, status=F, timestamp=5)env.execute();}}

3、示例:查找 同一个ip在10秒内访问同一个链接超过3次的ip,可以不连续

可以监控接口是否被攻击,应用应该比较广泛。

import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;/** @Author: alanchan* @LastEditors: alanchan* @Description: 查找 同一个ip在10秒内访问同一个链接超过3次的ip,可以不连续*/
public class TestRepeatAccessDemo {@Data@NoArgsConstructor@AllArgsConstructorstatic class LogMessage {private String ip;private String url;private Long timestamp;@Overridepublic boolean equals(Object obj) {if (obj instanceof LogMessage) {LogMessage logMessage = (LogMessage) obj;return this.ip.equals(logMessage.getIp()) && this.url.equals(logMessage.getUrl())&& this.timestamp == logMessage.getTimestamp();} else {return false;}}@Overridepublic int hashCode() {return super.hashCode() + Long.hashCode(timestamp);}}final static List<LogMessage> logMessageList = Arrays.asList(new LogMessage("192.168.10.1", "URL1", 2L),new LogMessage("192.168.10.1", "URL1", 3L),new LogMessage("192.168.10.1", "URL2", 4L),new LogMessage("192.168.10.1", "URL2", 5L),new LogMessage("192.168.10.8", "URL1", 6L),new LogMessage("192.168.10.1", "URL1", 7L));@Data@NoArgsConstructor@AllArgsConstructorstatic class RiskLogList extends LogMessage {private int count;}static void test1() throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 按用户id分组DataStream<LogMessage> logMessageDS = env.fromCollection(logMessageList).assignTimestampsAndWatermarks(WatermarkStrategy.<LogMessage>forBoundedOutOfOrderness(Duration.ofSeconds(10)).withTimestampAssigner((logMessage, rs) -> logMessage.getTimestamp())).keyBy(logMessage -> logMessage.getIp() + logMessage.getUrl()); // 根据ip和url分组// 定义模式Pattern<LogMessage, ?> logMessagePattern = Pattern.<LogMessage>begin("first").followedBy("second").times(2).within(Time.seconds(10));// 将Pattern应用到流上,检测匹配的复杂事件,得到一个PatternStreamPatternStream<LogMessage> patternStream = CEP.pattern(logMessageDS, logMessagePattern);// 将匹配到的流选择出来输出patternStream.process(new PatternProcessFunction<LogMessage, String>() {@Overridepublic void processMatch(Map<String, List<LogMessage>> match, Context ctx, Collector<String> out)throws Exception {LogMessage logMessage1 = match.get("first").get(0);LogMessage logMessage2 = match.get("second").get(0);LogMessage logMessage3 = match.get("second").get(1);boolean flag = logMessage1.getUrl().equals(logMessage2.getUrl())&& logMessage1.getUrl().equals(logMessage3.getUrl());if (flag) {out.collect(logMessage1.getIp() + "  url:" + logMessage1.getUrl() + "   timestamp:"+ logMessage1.getTimestamp() + "  timestamp2:" + logMessage2.getTimestamp()+ "  timestamp3:" + logMessage3.getTimestamp());}}}).print("输出信息:\n");// 控制台输出:// 输出信息::1> 192.168.10.1 url:URL1 timestamp:2 timestamp2:3 timestamp3:7env.execute();}public static void main(String[] args) throws Exception {test1();}
}

4、示例:监测服务器的温度并告警

监测 机器温度一小时内三次大于设定温度进行风险记录,将风险记录中的数据上一次大于平均值进行报警。

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;/** @Author: alanchan* @LastEditors: alanchan* @Description: 监测 机器温度一小时内三次大于设定温度进行风险记录,将风险记录中的数据上一次大于平均值进行报警*/
public class TestMachineMonitoring {// 机器的基本信息@Data@NoArgsConstructor@AllArgsConstructorstatic class MechineInfo {private int mechineId;private String mechineName;private int temperature;private Long timestamp;@Overridepublic boolean equals(Object obj) {if (obj instanceof MechineInfo) {MechineInfo mechineInfo = (MechineInfo) obj;return this.mechineId == mechineInfo.getMechineId()&& this.mechineName.equals(mechineInfo.getMechineName())&& this.timestamp == mechineInfo.getTimestamp()&& this.temperature == mechineInfo.getTemperature();} else {return false;}}@Overridepublic int hashCode() {return super.hashCode() + Long.hashCode(timestamp);}}// 机器的三次平均温度@Data@NoArgsConstructor@AllArgsConstructorstatic class MechineRiskInfo {private int mechineId;private double avgTemperature;private Long timestamp;@Overridepublic boolean equals(Object obj) {if (obj instanceof MechineRiskInfo) {MechineRiskInfo mechineRiskInfo = (MechineRiskInfo) obj;return this.mechineId == mechineRiskInfo.getMechineId()&& this.avgTemperature == mechineRiskInfo.getAvgTemperature()&& this.timestamp == mechineRiskInfo.getTimestamp();} else {return false;}}@Overridepublic int hashCode() {return super.hashCode() + Long.hashCode(timestamp);}}// 预警通知信息@Data@NoArgsConstructor@AllArgsConstructorstatic class MechineAlertInfo {private int mechineId;private String email;private double avgTemperature;private Long timestamp;@Overridepublic boolean equals(Object obj) {if (obj instanceof MechineAlertInfo) {MechineAlertInfo mechineAlertInfo = (MechineAlertInfo) obj;return this.mechineId == mechineAlertInfo.getMechineId() && this.email == mechineAlertInfo.getEmail()&& this.avgTemperature == mechineAlertInfo.getAvgTemperature()&& this.timestamp == mechineAlertInfo.getTimestamp();} else {return false;}}@Overridepublic int hashCode() {return super.hashCode() + Long.hashCode(timestamp);}}// 初始化流数据static List<MechineInfo> mechineInfoList = Arrays.asList(new MechineInfo(1, "m1", 331, 2L),new MechineInfo(1, "m1", 321, 4L),new MechineInfo(1, "m1", 311, 5L),new MechineInfo(1, "m1", 361, 7L),new MechineInfo(1, "m1", 351, 9L),new MechineInfo(1, "m1", 341, 11L),new MechineInfo(2, "m11", 121, 3L),new MechineInfo(3, "m21", 101, 4L),new MechineInfo(4, "m31", 98, 5L),new MechineInfo(5, "m41", 123, 6L));// 风险数据集合// static List<MechineRiskInfo> mechineRiskInfoList = new ArrayList();// 预警数据集合// static Map<String, MechineAlertInfo> mechineAlertInfoMap = new HashMap<String, MechineAlertInfo>();// 预警温度private static final double TEMPERATURE_SETTING = 100;// 超时数据static void test1() throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 按用户id分组DataStream<MechineInfo> mechineInfoStream = env.fromCollection(mechineInfoList).assignTimestampsAndWatermarks(WatermarkStrategy.<MechineInfo>forBoundedOutOfOrderness(Duration.ofSeconds(10)).withTimestampAssigner((logMessage, rs) -> logMessage.getTimestamp())).keyBy(mechineInfo -> mechineInfo.getMechineId()); // 根据ip和url分组// 定义模式1,过滤温度大于设置温度Pattern<MechineInfo, ?> mechineInfoPattern = Pattern.<MechineInfo>begin("first").where(new SimpleCondition<MechineInfo>() {@Overridepublic boolean filter(MechineInfo value) throws Exception {return value.getTemperature() >= TEMPERATURE_SETTING;}}).followedBy("second").where(new SimpleCondition<MechineInfo>() {@Overridepublic boolean filter(MechineInfo value) throws Exception {return value.getTemperature() >= TEMPERATURE_SETTING;}}).times(2).within(Time.minutes(10));PatternStream<MechineInfo> patternStream = CEP.pattern(mechineInfoStream, mechineInfoPattern);// 筛选,并计算 三次温度平均值DataStream<MechineRiskInfo> mechineRiskInfoStream = patternStream.process(new PatternProcessFunction<TestMachineMonitoring.MechineInfo, MechineRiskInfo>() {@Overridepublic void processMatch(Map<String, List<MechineInfo>> match, Context ctx,Collector<MechineRiskInfo> out)throws Exception {MechineInfo firstMechineInfo = match.get("first").get(0);MechineInfo secondMechineInfo1 = match.get("second").get(0);MechineInfo secondMechineInfo2 = match.get("second").get(1);// System.out.printf("mechineInfo:id=%s,name=%s,t=%s,ts=%s",//         firstMechineInfo.getMechineId(),//         firstMechineInfo.getMechineName(), firstMechineInfo.getTemperature(),//         firstMechineInfo.getTimestamp() + "\n");// System.out.printf("secondMechineInfo1:id=%s,name=%s,t=%s,ts=%s",//         secondMechineInfo1.getMechineId(),//         secondMechineInfo1.getMechineName(), secondMechineInfo1.getTemperature(),//         secondMechineInfo1.getTimestamp() + "\n");// System.out.printf("secondMechineInfo2:id=%s,name=%s,t=%s,ts=%s",//         secondMechineInfo2.getMechineId(),//         secondMechineInfo2.getMechineName(), secondMechineInfo2.getTemperature(),//         secondMechineInfo2.getTimestamp() + "\n");out.collect(new MechineRiskInfo(firstMechineInfo.getMechineId(), (firstMechineInfo.getTemperature()+ secondMechineInfo1.getTemperature() + secondMechineInfo2.getTemperature())/ 3,ctx.timestamp()));}}).keyBy(mechineRiskInfo -> mechineRiskInfo.getMechineId());mechineRiskInfoStream.print("mechineRiskInfoStream:");// 定义模式2,比较风险数据的前后两条,如果是上升的趋势,则报警,并设置报警联系人Pattern<MechineRiskInfo, ?> mechineRiskInfoPattern = Pattern.<MechineRiskInfo>begin("step1").next("step2").within(Time.hours(1));PatternStream<MechineRiskInfo> patternStream2 = CEP.pattern(mechineRiskInfoStream, mechineRiskInfoPattern);// 筛选 警告信息,并设置发送邮箱DataStream<MechineAlertInfo> mechineAlertInfoList = patternStream2.process(new PatternProcessFunction<TestMachineMonitoring.MechineRiskInfo, MechineAlertInfo>() {@Overridepublic void processMatch(Map<String, List<MechineRiskInfo>> match, Context ctx,Collector<MechineAlertInfo> out) throws Exception {MechineRiskInfo mechineRiskInfo1 = match.get("step1").get(0);MechineRiskInfo mechineRiskInfo2 = match.get("step2").get(0);MechineAlertInfo MechineAlertInfo = null;if (mechineRiskInfo1.getAvgTemperature() <= mechineRiskInfo2.getAvgTemperature()) {MechineAlertInfo = new MechineAlertInfo(mechineRiskInfo1.getMechineId(),"alan.chan.chn@163.com",mechineRiskInfo2.getAvgTemperature(), ctx.currentProcessingTime());out.collect(MechineAlertInfo);}}});mechineAlertInfoList.print("mechineAlertInfoList:");// mechineAlertInfoList::11> TestMachineMonitoring.MechineAlertInfo(mechineId=1, email=alan.chan.chn@163.com, avgTemperature=331.0, timestamp=1705366481553)// mechineAlertInfoList::11> TestMachineMonitoring.MechineAlertInfo(mechineId=1, email=alan.chan.chn@163.com, avgTemperature=341.0, timestamp=1705366481566)// mechineAlertInfoList::11> TestMachineMonitoring.MechineAlertInfo(mechineId=1, email=alan.chan.chn@163.com, avgTemperature=351.0, timestamp=1705366481567)env.execute();}public static void main(String[] args) throws Exception {test1();}
}

以上,本文介绍了Flink 的类库CEP的时间处理、可选的参数以及在实际工作中非常有用的三个示例。

本专题分为以下几篇介绍:
59、Flink CEP - Flink的复杂事件处理介绍及示例(1)-入门
59、Flink CEP - Flink的复杂事件处理介绍及示例(2)- 模式API
59、Flink CEP - Flink的复杂事件处理介绍及示例(3)- 模式选取及超时处理
59、Flink CEP - Flink的复杂事件处理介绍及示例(4)- 延迟数据处理和三个实际应用示例
59、Flink CEP - Flink的复杂事件处理介绍及示例(完整版)

这篇关于59、Flink CEP - Flink的复杂事件处理介绍及示例(4)- 延迟数据处理和三个实际应用示例的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/639419

相关文章

性能测试介绍

性能测试是一种测试方法,旨在评估系统、应用程序或组件在现实场景中的性能表现和可靠性。它通常用于衡量系统在不同负载条件下的响应时间、吞吐量、资源利用率、稳定性和可扩展性等关键指标。 为什么要进行性能测试 通过性能测试,可以确定系统是否能够满足预期的性能要求,找出性能瓶颈和潜在的问题,并进行优化和调整。 发现性能瓶颈:性能测试可以帮助发现系统的性能瓶颈,即系统在高负载或高并发情况下可能出现的问题

中文分词jieba库的使用与实景应用(一)

知识星球:https://articles.zsxq.com/id_fxvgc803qmr2.html 目录 一.定义: 精确模式(默认模式): 全模式: 搜索引擎模式: paddle 模式(基于深度学习的分词模式): 二 自定义词典 三.文本解析   调整词出现的频率 四. 关键词提取 A. 基于TF-IDF算法的关键词提取 B. 基于TextRank算法的关键词提取

水位雨量在线监测系统概述及应用介绍

在当今社会,随着科技的飞速发展,各种智能监测系统已成为保障公共安全、促进资源管理和环境保护的重要工具。其中,水位雨量在线监测系统作为自然灾害预警、水资源管理及水利工程运行的关键技术,其重要性不言而喻。 一、水位雨量在线监测系统的基本原理 水位雨量在线监测系统主要由数据采集单元、数据传输网络、数据处理中心及用户终端四大部分构成,形成了一个完整的闭环系统。 数据采集单元:这是系统的“眼睛”,

Hadoop数据压缩使用介绍

一、压缩原则 (1)运算密集型的Job,少用压缩 (2)IO密集型的Job,多用压缩 二、压缩算法比较 三、压缩位置选择 四、压缩参数配置 1)为了支持多种压缩/解压缩算法,Hadoop引入了编码/解码器 2)要在Hadoop中启用压缩,可以配置如下参数

csu 1446 Problem J Modified LCS (扩展欧几里得算法的简单应用)

这是一道扩展欧几里得算法的简单应用题,这题是在湖南多校训练赛中队友ac的一道题,在比赛之后请教了队友,然后自己把它a掉 这也是自己独自做扩展欧几里得算法的题目 题意:把题意转变下就变成了:求d1*x - d2*y = f2 - f1的解,很明显用exgcd来解 下面介绍一下exgcd的一些知识点:求ax + by = c的解 一、首先求ax + by = gcd(a,b)的解 这个

hdu1394(线段树点更新的应用)

题意:求一个序列经过一定的操作得到的序列的最小逆序数 这题会用到逆序数的一个性质,在0到n-1这些数字组成的乱序排列,将第一个数字A移到最后一位,得到的逆序数为res-a+(n-a-1) 知道上面的知识点后,可以用暴力来解 代码如下: #include<iostream>#include<algorithm>#include<cstring>#include<stack>#in

zoj3820(树的直径的应用)

题意:在一颗树上找两个点,使得所有点到选择与其更近的一个点的距离的最大值最小。 思路:如果是选择一个点的话,那么点就是直径的中点。现在考虑两个点的情况,先求树的直径,再把直径最中间的边去掉,再求剩下的两个子树中直径的中点。 代码如下: #include <stdio.h>#include <string.h>#include <algorithm>#include <map>#

【区块链 + 人才服务】可信教育区块链治理系统 | FISCO BCOS应用案例

伴随着区块链技术的不断完善,其在教育信息化中的应用也在持续发展。利用区块链数据共识、不可篡改的特性, 将与教育相关的数据要素在区块链上进行存证确权,在确保数据可信的前提下,促进教育的公平、透明、开放,为教育教学质量提升赋能,实现教育数据的安全共享、高等教育体系的智慧治理。 可信教育区块链治理系统的顶层治理架构由教育部、高校、企业、学生等多方角色共同参与建设、维护,支撑教育资源共享、教学质量评估、

图神经网络模型介绍(1)

我们将图神经网络分为基于谱域的模型和基于空域的模型,并按照发展顺序详解每个类别中的重要模型。 1.1基于谱域的图神经网络         谱域上的图卷积在图学习迈向深度学习的发展历程中起到了关键的作用。本节主要介绍三个具有代表性的谱域图神经网络:谱图卷积网络、切比雪夫网络和图卷积网络。 (1)谱图卷积网络 卷积定理:函数卷积的傅里叶变换是函数傅里叶变换的乘积,即F{f*g}

AI行业应用(不定期更新)

ChatPDF 可以让你上传一个 PDF 文件,然后针对这个 PDF 进行小结和提问。你可以把各种各样你要研究的分析报告交给它,快速获取到想要知道的信息。https://www.chatpdf.com/