flink-cep实践

2023-11-24 07:01
文章标签 实践 flink cep

本文主要是介绍flink-cep实践,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

package com.techwolf.hubble;import com.alibaba.fastjson.JSONObject;
import com.techwolf.hubble.constant.Config;
import com.techwolf.hubble.model.TestEvent;
import org.apache.flink.api.common.eventtime.TimestampAssigner;
import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternFlatSelectFunction;
import org.apache.flink.cep.PatternFlatTimeoutFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;import java.util.List;
import java.util.Map;/*** Hello world!**/
public class App {public static void main(String[] args) throws Exception{//初始化环境StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);//定义时间戳提取器作为输入流分配时间戳和水位线WatermarkStrategy<TestEvent> watermarkStrategy=WatermarkStrategy.<TestEvent>forMonotonousTimestamps().withTimestampAssigner(new EventTimeAssignerSupplier());DataStream<TestEvent> inputDataSteam=env.fromElements(new TestEvent("1","A",System.currentTimeMillis()-100*1000,"1"),new TestEvent("1","A",System.currentTimeMillis()-85*1000,"2"),new TestEvent("1","A",System.currentTimeMillis()-80*1000,"3"),new TestEvent("1","A",System.currentTimeMillis()-75*1000,"4"),new TestEvent("1","A",System.currentTimeMillis()-60*1000,"5"),new TestEvent("1","A",System.currentTimeMillis()-55*1000,"6"),new TestEvent("1","A",System.currentTimeMillis()-40*1000,"7"),new TestEvent("1","A",System.currentTimeMillis()-35*1000,"8"),new TestEvent("1","A",System.currentTimeMillis()-20*1000,"9"),new TestEvent("1","A",System.currentTimeMillis()-10*1000,"10"),new TestEvent("1","B",System.currentTimeMillis()-5*1000,"11")).assignTimestampsAndWatermarks(watermarkStrategy);Pattern<TestEvent,TestEvent> pattern=Pattern.<TestEvent>begin("begin").where(new SimpleCondition<TestEvent>() {@Overridepublic boolean filter(TestEvent testEvent) throws Exception {return testEvent.getAction().equals("A");}}).followedBy("end").where(new SimpleCondition<TestEvent>() {@Overridepublic boolean filter(TestEvent testEvent) throws Exception {return testEvent.getAction().equals("B");}}).within(Time.seconds(10));PatternStream<TestEvent> patternStream=CEP.pattern(inputDataSteam.keyBy(TestEvent::getId),pattern);OutputTag<TestEvent> timeOutTag=new OutputTag<TestEvent>("timeOutTag"){};//处理匹配结果SingleOutputStreamOperator<TestEvent> twentySingleOutputStream=patternStream.flatSelect(timeOutTag,new EventTimeOut(),new FlatSelect()).uid("match_twenty_minutes_pattern");DataStream<String> result=twentySingleOutputStream.getSideOutput(timeOutTag).map(new MapFunction<TestEvent, String>() {@Overridepublic String map(TestEvent testEvent) throws Exception {return JSONObject.toJSONString(testEvent);}});result.print();env.execute(Config.JOB_NAME);}public static class EventTimeOut implements PatternFlatTimeoutFunction<TestEvent,TestEvent> {private static final long serialVersionUID = -2471077777598713906L;@Overridepublic void timeout(Map<String, List<TestEvent>> map, long l, Collector<TestEvent> collector) throws Exception {if (null != map.get("begin")) {for (TestEvent event : map.get("begin")) {collector.collect(event);}}}}public static class FlatSelect implements PatternFlatSelectFunction<TestEvent,TestEvent> {private static final long serialVersionUID = 1753544074226581611L;@Overridepublic void flatSelect(Map<String, List<TestEvent>> map, Collector<TestEvent> collector) throws Exception {if (null != map.get("begin")) {for (TestEvent event : map.get("begin")) {collector.collect(event);}}}}public static class EventTimeAssignerSupplier implements TimestampAssignerSupplier<TestEvent> {private static final long serialVersionUID = -9040340771307752904L;@Overridepublic TimestampAssigner<TestEvent> createTimestampAssigner(Context context) {return new EventTimeAssigner();}}public static class EventTimeAssigner implements TimestampAssigner<TestEvent> {@Overridepublic long extractTimestamp(TestEvent event, long l) {return event.getEventTime();}}
}

这篇关于flink-cep实践的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/421456

相关文章

Java调用DeepSeek API的最佳实践及详细代码示例

《Java调用DeepSeekAPI的最佳实践及详细代码示例》:本文主要介绍如何使用Java调用DeepSeekAPI,包括获取API密钥、添加HTTP客户端依赖、创建HTTP请求、处理响应、... 目录1. 获取API密钥2. 添加HTTP客户端依赖3. 创建HTTP请求4. 处理响应5. 错误处理6.

golang内存对齐的项目实践

《golang内存对齐的项目实践》本文主要介绍了golang内存对齐的项目实践,内存对齐不仅有助于提高内存访问效率,还确保了与硬件接口的兼容性,是Go语言编程中不可忽视的重要优化手段,下面就来介绍一下... 目录一、结构体中的字段顺序与内存对齐二、内存对齐的原理与规则三、调整结构体字段顺序优化内存对齐四、内

C++实现封装的顺序表的操作与实践

《C++实现封装的顺序表的操作与实践》在程序设计中,顺序表是一种常见的线性数据结构,通常用于存储具有固定顺序的元素,与链表不同,顺序表中的元素是连续存储的,因此访问速度较快,但插入和删除操作的效率可能... 目录一、顺序表的基本概念二、顺序表类的设计1. 顺序表类的成员变量2. 构造函数和析构函数三、顺序表

python实现简易SSL的项目实践

《python实现简易SSL的项目实践》本文主要介绍了python实现简易SSL的项目实践,包括CA.py、server.py和client.py三个模块,文中通过示例代码介绍的非常详细,对大家的学习... 目录运行环境运行前准备程序实现与流程说明运行截图代码CA.pyclient.pyserver.py参

使用C++实现单链表的操作与实践

《使用C++实现单链表的操作与实践》在程序设计中,链表是一种常见的数据结构,特别是在动态数据管理、频繁插入和删除元素的场景中,链表相比于数组,具有更高的灵活性和高效性,尤其是在需要频繁修改数据结构的应... 目录一、单链表的基本概念二、单链表类的设计1. 节点的定义2. 链表的类定义三、单链表的操作实现四、

Spring Boot统一异常拦截实践指南(最新推荐)

《SpringBoot统一异常拦截实践指南(最新推荐)》本文介绍了SpringBoot中统一异常处理的重要性及实现方案,包括使用`@ControllerAdvice`和`@ExceptionHand... 目录Spring Boot统一异常拦截实践指南一、为什么需要统一异常处理二、核心实现方案1. 基础组件

SpringBoot项目中Maven剔除无用Jar引用的最佳实践

《SpringBoot项目中Maven剔除无用Jar引用的最佳实践》在SpringBoot项目开发中,Maven是最常用的构建工具之一,通过Maven,我们可以轻松地管理项目所需的依赖,而,... 目录1、引言2、Maven 依赖管理的基础概念2.1 什么是 Maven 依赖2.2 Maven 的依赖传递机

Oracle查询优化之高效实现仅查询前10条记录的方法与实践

《Oracle查询优化之高效实现仅查询前10条记录的方法与实践》:本文主要介绍Oracle查询优化之高效实现仅查询前10条记录的相关资料,包括使用ROWNUM、ROW_NUMBER()函数、FET... 目录1. 使用 ROWNUM 查询2. 使用 ROW_NUMBER() 函数3. 使用 FETCH FI

在C#中获取端口号与系统信息的高效实践

《在C#中获取端口号与系统信息的高效实践》在现代软件开发中,尤其是系统管理、运维、监控和性能优化等场景中,了解计算机硬件和网络的状态至关重要,C#作为一种广泛应用的编程语言,提供了丰富的API来帮助开... 目录引言1. 获取端口号信息1.1 获取活动的 TCP 和 UDP 连接说明:应用场景:2. 获取硬

Java内存泄漏问题的排查、优化与最佳实践

《Java内存泄漏问题的排查、优化与最佳实践》在Java开发中,内存泄漏是一个常见且令人头疼的问题,内存泄漏指的是程序在运行过程中,已经不再使用的对象没有被及时释放,从而导致内存占用不断增加,最终... 目录引言1. 什么是内存泄漏?常见的内存泄漏情况2. 如何排查 Java 中的内存泄漏?2.1 使用 J