Flink之复杂事件处理CEP

2023-12-06 00:44
文章标签 事件处理 flink 复杂 cep

本文主要是介绍Flink之复杂事件处理CEP,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

复杂事件处理CEP

  • Flink CEP
  • 基本使用
    • 添加依赖
    • 定义匹配模式
    • 定义匹配结果
    • 验证
  • 模式Pattern API
    • 单个模式
      • 量词
      • 条件
    • 组合模式
    • 跳过策略
    • 模式组
    • 匹配结果
  • 应用示例
    • 自定义消息事件
    • 自定义Pattern
    • 测试

Flink CEP

Flink的CEP (Complex Event Processing) 是指Flink提供的一种用于处理复杂事件序列的库。复杂事件通常由多个简单事件组成,这些简单事件在特定的时间窗口内以特定的顺序发生。CEP可以用于检测和识别这些复杂事件,并根据预定义的模式进行操作和处理。

Flink的CEP库提供了一个灵活而强大的编程模型,使用户能够指定不同事件之间的关系模式,并定义事件触发的条件。它能够处理基于时间、顺序和其他属性的复杂事件模式,并支持流式处理和实时数据。CEP可以用于构建基于事件的应用程序,例如金融交易监控、网络流量分析、IoT数据处理等。

应用场景

Flink CEP(Complex Event Processing)是针对处理数据流中复杂事件模式的技术,适用于多种实时数据处理场景,其中包括:

金融交易监控:实时监控金融交易数据流,以识别潜在的欺诈行为,例如检测异常的交易序列或者异常的资金流动模式。网络安全分析:对实时网络日志进行分析,以检测网络攻击、异常行为或者安全威胁,例如识别特定攻击模式或异常的网络通信序列。物联网(IoT)数据处理:处理来自传感器和设备的实时数据,以识别设备故障、异常事件或者预测维护需求,例如发现特定的设备状态序列暗示了潜在的问题。市场营销和个性化推荐:分析客户实时行为数据,识别特定的购买模式或者行为序列,以提供个性化的产品推荐或市场营销策略。生产流程监控:监控工业生产线上的传感器和生产数据,以检测生产异常、预测设备故障或者优化生产调度。医疗健康监控:实时监控病人健康数据或医疗设备数据,以检测潜在的健康危机、预测病情变化或者提供实时的健康监控服务。

基本使用

添加依赖

将Flink CEP依赖项添加到pom.xml中

   <dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-cep</artifactId><version>${flink.version}</version></dependency>

定义匹配模式

DataStream要应用模式匹配的 事件必须实现正确的equals()和hash Code()方法,因为 Flink CEP 使用它们来比较和匹配事件。

    public static void main(String[] args) throws Exception {// 设置执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 准备事件流DataStream<Tuple2<String, Integer>> inputEventStream = env.fromElements(new Tuple2<>("event", 1),new Tuple2<>("event", 2),new Tuple2<>("event", 3),new Tuple2<>("event", 4),new Tuple2<>("event", 5),new Tuple2<>("event", 6),new Tuple2<>("event", 7),new Tuple2<>("event", 8)).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Integer>>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Integer>>() {@Overridepublic long extractTimestamp(Tuple2<String, Integer> event, long recordTimestamp) {return event.f1 * 1000;}})).keyBy(event -> event.f0);/*** 定义复杂事件处理模式* 先匹配元素是偶数的事件,然后匹配元素>3的事件,然后继续匹配元素是8的元素*/// 声明并初始化一个模式,用于表示要在事件流中检测的模式。这个模式匹配的是一个包含String和Integer类型元素的元组. begin("start") 来定义模式的起始点Pattern<Tuple2<String, Integer>, ?> pattern = Pattern.<Tuple2<String, Integer>>begin("start")// 对模式的起始点应用条件,条件是一个简单的过滤条件: 事件的第二个元素是偶数。.where(new SimpleCondition<Tuple2<String, Integer>>() {@Overridepublic boolean filter(Tuple2<String, Integer> event) {return event.f1 % 2 == 0;}})// 定义了紧接在起始点后的第一个元素命名为 "middle".next("middle")// 这个元素是一个Tuple2类型的子类型.subtype(Tuple2.class)// 对第二个元素应用了迭代条件,这里使用了一个迭代条件(IterativeCondition),来检查第二个元素是否为奇数.where(new SimpleCondition<Tuple2>() {@Overridepublic boolean filter(Tuple2 event) {// return (Integer) event.f1 > 5;return (Integer) event.f1 > 3;}})// 规定了前面定义的模式必须发生N次.times(2)// 定义了这N次发生必须是连续的.consecutive()// 定义了在之后紧跟的元素命名为 "end",用于表示模式的结束.followedBy("end")// 对模式的结束点应用了一个简单的条件,确保事件的第二个元素等于8.where(SimpleCondition.of((Tuple2<String, Integer> event) -> event.f1 == 8)).within(Time.seconds(5));// 在事件流上应用模式PatternStream<Tuple2<String, Integer>> patternStream = CEP.pattern(inputEventStream.keyBy(event -> event.f0), pattern);// 选择匹配结果并输出// DataStream<String> result = patternStream.select(new MyPatternSelectFunction());DataStream<String> result = patternStream.process(new MyPatternProcessFunction());result.print();// 执行任务env.execute("CEP Example");}

定义匹配结果

 /*** PatternSelectFunction定义匹配结果的处理函数*/public static class MyPatternSelectFunction implements PatternSelectFunction<Tuple2<String, Integer>, String> {@Overridepublic String select(Map<String, List<Tuple2<String, Integer>>> pattern) throws Exception {StringBuilder builder = new StringBuilder();builder.append("找到匹配项: ");pattern.forEach((key, value) -> builder.append(key).append(" => ").append(value).append("; "));return builder.toString();}}/*** PatternProcessFunction定义匹配结果的处理函数*/public static class MyPatternProcessFunction extends PatternProcessFunction<Tuple2<String, Integer>, String> {@Overridepublic void processMatch(Map<String, List<Tuple2<String, Integer>>> pattern, Context context, Collector<String> collector) throws Exception {StringBuilder builder = new StringBuilder();builder.append("找到匹配项: ");pattern.forEach((key, value) -> builder.append(key).append(" => ").append(value).append("; "));collector.collect(builder.toString());}}

验证

1> 找到匹配项: start => [(event,4)]; middle => [(event,5), (event,6)]; end => [(event,8)]; 

模式Pattern API

模式 API允许定义要从输入流中提取的复杂模式序列

每个复杂模式序列由多个简单模式组成,即寻找具有相同属性的单个事件的模式每个模式必须有一个唯一的名称,可以使用该名称来标识匹配的事件模式名称不能包含字符":"

单个模式

复杂规则中的每一个单独的模式定义,就是个体模式。我们既可以定义一个给定事件出现的次数(量词),也可以定义一个条件来决定一个进来的事件是否被接受进入这个模式(条件)。

量词

默认情况下,模式是单例模式,可以使用量词将其转换为循环模式。

API说明
pattern.oneOrMore()模式发生1次或N次
pattern.times(#ofTimes)发生一次或多次的模式
pattern.times(#fromTimes, #toTimes)出现特定次数的模式
pattern.greedy()模式变得贪婪,匹配越多越好
pattern.optional()模式可以不匹配

使用示例:

// 期望出现4次
pattern.times(4);// 期望出现0次或者4次
pattern.times(4).optional();// 期望出现2次、3次或者4次
pattern.times(2, 4);// 期望出现2次、3次或者4次,尽可能多地重复
pattern.times(2, 4).greedy();// 期望出现0次、2次、3次或者4次
pattern.times(2, 4).optional();// 期望出现0次、2次、3次或者4次,尽可能多地重复
pattern.times(2, 4).optional().greedy();// 期望出现1次或者更多次
pattern.oneOrMore();// 期望出现1次或者更多次,尽可能多地重复
pattern.oneOrMore().greedy();// 期望出现0次或者更多次
pattern.oneOrMore().optional();// 期望出现0次或者更多次,尽可能多地重复
pattern.oneOrMore().optional().greedy();// 期望出现2次或者更多次
pattern.timesOrMore(2);// 期望出现2次或者更多次,尽可能多地重复
pattern.timesOrMore(2).greedy();// 期望出现0次、2次或者更多次
pattern.timesOrMore(2).optional()// 期望出现0次、2次或者更多次,尽可能多地重复
pattern.timesOrMore(2).optional().greedy();

条件

对于每个模式,可以指定传入事件必须满足的条件才能被“接受”到模式中

API描述示例说明
pattern.where()定义当前模式的条件。为了匹配模式,事件必须满足条件。多个连续的 where() 子句会导致它们的条件被AND编辑pattern.where(SimpleCondition.of((Tuple2<String, Integer> event) -> event.f1 == 8))匹配f1==8
pattern.or()添加一个与现有条件相结合的新条件OR。仅当事件至少满足其中一个条件时,它才能与模式匹配pattern.where(SimpleCondition.of((Tuple2<String, Integer> event) -> event.f1 ==1)).or(SimpleCondition.of((Tuple2<String, Integer> event) -> event.f1 == 2))匹配f1 == 1 或者 f1==2
pattern.until()指定循环模式的停止条件。如果发生与给定条件匹配的事件,则该模式将不再接受任何事件。仅适用于结合oneOrMore() NOTE:它允许在基于事件的条件下清除相应模式的状态pattern.until(SimpleCondition.of((Tuple2<String, Integer> event) -> event.f1 == 2))匹配1次或多次,直到f1==2

如果名称以foo开头,则接受名为middle的模式的下一个事件,并且如果该模式先前接受的事件的价格之和加上当前事件的价格事件不超过5.0的值

middle.oneOrMore().subtype(SubEvent.class).where(new IterativeCondition<SubEvent>() {@Overridepublic boolean filter(SubEvent value, Context<SubEvent> ctx) throws Exception {if (!value.getName().startsWith("foo")) {return false;}double sum = value.getPrice();for (Event event : ctx.getEventsForPattern("middle")) {sum += event.getPrice();}return Double.compare(sum, 5.0) < 0;}});

组合模式

把很多单个模式组合起来,就形成了组合模式。Flink CEP支持事件之间如下形式的连续策略:

严格连续性:期望所有匹配事件严格一个接一个地出现,中间没有任何不匹配的事件。宽松连续性:忽略匹配事件之间出现的不匹配事件。非确定性宽松连续性:进一步放松连续性,允许忽略某些匹配事件的其他匹配。

要在连续模式之间应用它们,可以使用:

next():对于严格的
followedBy():对于宽松的
followedByAny():对于非确定性松弛连续性
notNext():如果您不希望某个事件类型直接跟随另一个事件类型
notFollowedBy():如果您不希望某个事件类型介于其他两个事件类型之间

模式序列必须以初始模式开始

Pattern<Event, ?> start = Pattern.<Event>begin("start");Pattern<Event, ?> start = Pattern.<Event>begin(Pattern.<Event>begin("start").where(...).followedBy("middle").where(...)
);
API说明示例
begin(#name)定义起始模式Pattern<Event, ?> start = Pattern.begin(“start”);
begin(#pattern_sequence)定义起始模式Pattern.begin(Pattern.begin(“start”).where(…).followedBy(“middle”).where(…));
next(#name)附加新模式。匹配事件必须直接继承前一个匹配事件(严格连续性)Pattern<Event, ?> next = start.next(“middle”)
next(#pattern_sequence)附加新模式。一系列匹配事件必须直接接续前一个匹配事件(严格连续性)start.next(Pattern.begin(“start”).where(…).followedBy(“middle”).where(…));
followedBy(#name)附加新模式。其他事件可以发生在匹配事件和前一个匹配事件之间(宽松的连续性)Pattern<Event, ?> followedBy = start.followedBy(“middle”);
followedBy(#pattern_sequence)附加新模式。其他事件可以发生在匹配事件和前一个匹配事件之间(宽松的连续性)start.followedBy(Pattern.begin(“start”).where(…).followedBy(“middle”).where(…));
followedByAny(#name)附加新模式。其他事件可以发生在匹配事件和前一个匹配事件之间,并且将为每个替代匹配事件呈现替代匹配(非确定性宽松连续性)Pattern<Event, ?> followedByAny = start.followedByAny(“middle”);
followedByAny(#pattern_sequence)附加新模式。其他事件可以发生在匹配事件和前一个匹配事件之间,并且将为每个替代匹配事件呈现替代匹配(非确定性宽松连续性)start.next(Pattern.begin(“start”).where(…).followedBy(“middle”).where(…));
notNext()附加新的否定模式。匹配(否定)事件必须直接继承前一个匹配事件(严格连续性),才能丢弃部分匹配Pattern<Event, ?> notNext = start.notNext(“not”);
notFollowedBy()附加新的否定模式。即使匹配(负)事件和前一个匹配事件(宽松连续性)之间发生其他事件,部分匹配的事件序列也将被丢弃Pattern<Event, ?> notFollowedBy = start.notFollowedBy(“not”);
within(time)定义事件序列与模式匹配的最大时间间隔。如果未完成的事件序列超过此时间,则将其丢弃pattern.within(Time.seconds(10));

使用示例:

// 严格的连续性模式
Pattern<Event, ?> strict = start.next("middle").where(...);// 宽松的连续性模式
Pattern<Event, ?> relaxed = start.followedBy("middle").where(...);// 非确定性的宽松连续性模式
Pattern<Event, ?> nonDetermin = start.followedByAny("middle").where(...);// 使用严格连续性的NOT模式
Pattern<Event, ?> strictNot = start.notNext("not").where(...);// 使用宽松连续性的NOT模式
Pattern<Event, ?> relaxedNot = start.notFollowedBy("not").where(...);

跳过策略

对于给定的模式,同一事件可以分配给多个成功的匹配。要控制一个事件将分配多少个匹配项,需要指定跳过策略AfterMatchSkipStrategy

跳跃策略有五种类型

API说明
AfterMatchSkipStrategy.noSkip()创建NO_SKIP跳过策略
AfterMatchSkipStrategy.skipToNext()创建SKIP_TO_NEXT跳过策略
AfterMatchSkipStrategy.skipPastLastEvent()创建SKIP_PAST_LAST_EVENT跳过策略
AfterMatchSkipStrategy.skipToFirst(patternName)使用引用的模式名称patternName创建SKIP_TO_FIRST跳过策略
AfterMatchSkipStrategy.skipToLast(patternName)使用引用的模式名称patternName创建SKIP_TO_LAST跳过策略

注意:

当使用SKIP_TO_FIRST和SKIP_TO_LAST跳过策略时,还应指定有效的PatternName

SkipToFirstStrategy skipToFirstStrategy = AfterMatchSkipStrategy.skipToFirst("patternName");
Pattern.begin("patternName", skipToFirstStrategy);

模式组

将一个模式作为条件嵌套在单个模式里,就是模式组。

Pattern<Event, ?> start = Pattern.begin(
Pattern.begin("start").where(...).followedBy("start_middle").where(...)
);// 严格的连续性模式
Pattern<Event, ?> strict = start.next(
Pattern.begin("next_start").where(...).followedBy("next_middle").where(...)
).times(3);// 宽松的连续性模式
Pattern<Event, ?> relaxed = start.followedBy(
Pattern.begin("followedby_start").where(...).followedBy("followedby_middle").where(...)
).oneOrMore();// 非确定性的宽松连续性模式
Pattern<Event, ?> nonDetermin = start.followedByAny(
Pattern.begin("followedbyany_start").where(...).followedBy("followedbyany_middle").where(...)
).optional();

匹配结果

指定要查找的模式序列后,就可以将其应用到输入流以检测潜在的匹配项。

要针对模式序列运行事件流,必须创建一个PatternStream. 给定一个输入流input、一个模式pattern和一个可选的比较器,comparator用于对具有相同时间戳的事件(在 EventTime的情况下或在同一时刻到达)进行排序

可以使用PatternProcessFunction、也可以使用旧式API,例如PatternSelectFunction

1.PatternProcessFunction

PatternProcessFunction有一个processMatch为每个匹配事件序列调用的方法。

PatternStream<Event> patternStream = CEP.pattern(input, pattern, comparator);
    public static class MyPatternProcessFunction extends PatternProcessFunction<Tuple2<String, Integer>, String> {/**** @param pattern Map<String, List<IN>>其中键是模式序列中每个模式的名称,值是该模式的所有已接受事件的列表(IN是输入元素的类型)*/@Overridepublic void processMatch(Map<String, List<Tuple2<String, Integer>>> pattern, Context context, Collector<String> collector) throws Exception {StringBuilder builder = new StringBuilder();builder.append("找到匹配项: ");pattern.forEach((key, value) -> builder.append(key).append(" => ").append(value).append("; "));collector.collect(builder.toString());}}

使用:

DataStream<String> result = patternStream.process(new MyPatternProcessFunction());

2.TimedOutPartialMatchHandler

每当模式具有通过within关键字附加的窗口长度时,部分事件序列就有可能被丢弃,因为它们超出了窗口长度。要对超时的部分匹配采取行动,可以使用TimedOutPartialMatchHandler接口。

TimedOutPartialMatchHandler提供了额外的processTimedOutMatch方法,每次超时的部分匹配都会调用该方法。

    public static class MyPatternProcessFunction extends PatternProcessFunction<Tuple2<String, Integer>, String> implements TimedOutPartialMatchHandler<Tuple2<String, Integer>> {/**** @param pattern Map<String, List<IN>>其中键是模式序列中每个模式的名称,值是该模式的所有已接受事件的列表(IN是输入元素的类型)*/@Overridepublic void processMatch(Map<String, List<Tuple2<String, Integer>>> pattern, Context context, Collector<String> collector) throws Exception {StringBuilder builder = new StringBuilder();builder.append("找到匹配项: ");pattern.forEach((key, value) -> builder.append(key).append(" => ").append(value).append("; "));collector.collect(builder.toString());}@Overridepublic void processTimedOutMatch(Map<String, List<Tuple2<String, Integer>>> map, Context context) throws Exception {StringBuilder builder = new StringBuilder();builder.append("处理超时的部分模式: ");map.forEach((key, value) -> builder.append(key).append(" => ").append(value).append("; "));System.out.println(builder.toString());}}
  1. PatternSelectFunction
public static class MyPatternSelectFunction implements PatternSelectFunction<Tuple2<String, Integer>, String> {@Overridepublic String select(Map<String, List<Tuple2<String, Integer>>> pattern) throws Exception {StringBuilder builder = new StringBuilder();builder.append("找到匹配项: ");pattern.forEach((key, value) -> builder.append(key).append(" => ").append(value).append("; "));return builder.toString();}}

使用:

DataStream<String> result = patternStream.select(new MyPatternSelectFunction());

应用示例

模拟查找匹配5秒钟内连续登录失败在3次以上的用户

自定义消息事件

@Data
@AllArgsConstructor
@NoArgsConstructor
public class LoginEvent {/*** 用户id*/private Integer uid;/*** 是否登录成功*/private Boolean success;/*** 时间戳*/private Long timeStamp; 
}

自定义Pattern

 public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStream<LoginEvent> streamSource = env.fromElements(new LoginEvent(1, false, 1000L),new LoginEvent(2, true, 2000L),new LoginEvent(3, true, 3000L),new LoginEvent(1, false, 4000L),new LoginEvent(1, false, 5000L),new LoginEvent(4, false, 5000L)).assignTimestampsAndWatermarks(WatermarkStrategy.<LoginEvent>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<LoginEvent>() {@Overridepublic long extractTimestamp(LoginEvent loginEvent, long l) {return loginEvent.getTimeStamp();}})).keyBy(r -> r.getUid());Pattern<LoginEvent, LoginEvent> pattern = Pattern.<LoginEvent>begin("first").where(new SimpleCondition<LoginEvent>() {@Overridepublic boolean filter(LoginEvent loginEvent) throws Exception {return !loginEvent.getSuccess();}}).next("second").where(new SimpleCondition<LoginEvent>() {@Overridepublic boolean filter(LoginEvent loginEvent) throws Exception {return !loginEvent.getSuccess();}}).next("third").where(new SimpleCondition<LoginEvent>() {@Overridepublic boolean filter(LoginEvent loginEvent) throws Exception {return !loginEvent.getSuccess();}}).within(Time.seconds(5));PatternStream<LoginEvent> patternedStream = CEP.pattern(streamSource, pattern);patternedStream.select(new PatternSelectFunction<LoginEvent, String>() {@Overridepublic String select(Map<String, List<LoginEvent>> map) throws Exception {LoginEvent first = map.get("first").iterator().next();LoginEvent second = map.get("second").iterator().next();LoginEvent third = map.get("third").iterator().next();return String.format("uid:%d 连续3次登录失败,登录时间: first:%d, second:%d, third:%d", first.getUid(), first.getTimeStamp(), second.getTimeStamp(), third.getTimeStamp());}}).print();env.execute();}

测试

uid:1 5秒钟内连续3次登录失败,登录时间: first:1000, second:4000, third:5000

这篇关于Flink之复杂事件处理CEP的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/459773

相关文章

利用matlab bar函数绘制较为复杂的柱状图,并在图中进行适当标注

示例代码和结果如下:小疑问:如何自动选择合适的坐标位置对柱状图的数值大小进行标注?😂 clear; close all;x = 1:3;aa=[28.6321521955954 26.2453660695847 21.69102348512086.93747104431360 6.25442246899816 3.342835958564245.51365061796319 4.87

Flink任务重启策略

概述 Flink支持不同的重启策略,以在故障发生时控制作业如何重启集群在启动时会伴随一个默认的重启策略,在没有定义具体重启策略时会使用该默认策略。如果在工作提交时指定了一个重启策略,该策略会覆盖集群的默认策略默认的重启策略可以通过 Flink 的配置文件 flink-conf.yaml 指定。配置参数 restart-strategy 定义了哪个策略被使用。常用的重启策略: 固定间隔 (Fixe

最近心情有点复杂:论心态

一月一次的彷徨又占据了整个身心;彷徨源至不自信;而不自信则是感觉自己的价值没有很好的实现亦或者说是自己不认可自己的目前的生活和状态吧。 我始终相信一句话:任何人的生活形态完全是由自己决定的;外在的总归不能直达一个人的内心深处。所以少年 为了自己想要的生活 多坚持努力吧、不为别人只为自己心中的那一丝执着。 由此我看到了一个故事: 一个心情烦躁的人去拜访禅师。他问禅师:我这辈子就这么注定了吗?您

大数据之Flink(二)

4、部署模式 flink部署模式: 会话模式(Session Mode)单作业模式(Per-Job Mode)应用模式(Application Mode) 区别在于集群的生命周期以及资源的分配方式;以及应用的main方法到底在哪里执行。 4.1、会话模式 先启动一个集群,保持一个会话,在这个会话中通过客户端提交作业。集群启动时资源已经确定,所有提交的作业会晶振集群中的资源。适合规模小、执

复杂SQL集合(不断收集中)

1.一道SQL语句面试题,关于group by 表内容: 2005-05-09 胜 2005-05-09 胜 2005-05-09 负 2005-05-09 负 2005-05-10 胜 2005-05-10 负 2005-05-10 负 如果要生成下列结果, 该如何写sql语句?             胜 负 2005-05-09 2 2 2005-05-10 1 2 --------

Android触摸事件处理机制之requestDisallowInterceptTouchEvent

一、触摸事件传递的规则 当手指触摸到屏幕时,系统就会调用相应的View的onTouchEvent,并传入一系列的action。当有多个层级的View时,在父层级允许的情况下,这个action会一直向下传递直到遇到最深层的View。所以touch事件最先调用的是最底层View的onTouchEvent,如果View的onTouchEvent接收到某个touchaction并做了相应处理,最后有两种

是谁还不会flink的checkpoint呀~

1、State Vs Checkpoint State:状态,是Flink中某一个Operator在某一个时刻的状态,如maxBy/sum,注意State存的是历史数据/状态,存在内存中。 Checkpoint:快照点, 是Flink中所有有状态的Operator在某一个时刻的State快照信息/存档信息 一句话概括: Checkpoint就是State的快照 目的:假设作业停止了,下次启动的

java AWT事件处理

事件处理的过程中,主要涉及3类对象: Event Source(事件源):事件发生的声所,如按钮,窗口,菜单等组件。 Event(事件):事件封装了GUI组件上发生的特定事情(通常是一次用打操作)。 Event Listener(事件监听器):负责监听事件源所发生的事件,并对各种事件做出响应处理。 实现AWT事件处理机制的步骤如下: 1.实现事件监听器类,该监听器类是一个特殊的java类

Flink读取kafka数据并以parquet格式写入HDFS

《2021年最新版大数据面试题全面开启更新》 《2021年最新版大数据面试题全面开启更新》 大数据业务场景中,经常有一种场景:外部数据发送到kafka中,flink作为中间件消费kafka数据并进行业务处理;处理完成之后的数据可能还需要写入到数据库或者文件系统中,比如写入hdfs中; 目前基于spark进行计算比较主流,需要读取hdfs上的数据,可以通过读取parquet:spark.read

Apache-Flink深度解析-State

来源:https://dwz.cn/xrMCqbk5 Flink系列精华文章合集入门篇: Flink入门Flink DataSet&DataSteam APIFlink集群部署Flink重启策略Flink分布式缓存Flink重启策略Flink中的TimeFlink中的窗口Flink的时间戳和水印Flink广播变量Flink-Kafka-connetorFlink-Table&SQLFlink