flink-cep实践

2023-11-24 07:01
文章标签 实践 flink cep

本文主要是介绍flink-cep实践,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

package com.techwolf.hubble;import com.alibaba.fastjson.JSONObject;
import com.techwolf.hubble.constant.Config;
import com.techwolf.hubble.model.TestEvent;
import org.apache.flink.api.common.eventtime.TimestampAssigner;
import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternFlatSelectFunction;
import org.apache.flink.cep.PatternFlatTimeoutFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;import java.util.List;
import java.util.Map;/*** Hello world!**/
public class App {public static void main(String[] args) throws Exception{//初始化环境StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);//定义时间戳提取器作为输入流分配时间戳和水位线WatermarkStrategy<TestEvent> watermarkStrategy=WatermarkStrategy.<TestEvent>forMonotonousTimestamps().withTimestampAssigner(new EventTimeAssignerSupplier());DataStream<TestEvent> inputDataSteam=env.fromElements(new TestEvent("1","A",System.currentTimeMillis()-100*1000,"1"),new TestEvent("1","A",System.currentTimeMillis()-85*1000,"2"),new TestEvent("1","A",System.currentTimeMillis()-80*1000,"3"),new TestEvent("1","A",System.currentTimeMillis()-75*1000,"4"),new TestEvent("1","A",System.currentTimeMillis()-60*1000,"5"),new TestEvent("1","A",System.currentTimeMillis()-55*1000,"6"),new TestEvent("1","A",System.currentTimeMillis()-40*1000,"7"),new TestEvent("1","A",System.currentTimeMillis()-35*1000,"8"),new TestEvent("1","A",System.currentTimeMillis()-20*1000,"9"),new TestEvent("1","A",System.currentTimeMillis()-10*1000,"10"),new TestEvent("1","B",System.currentTimeMillis()-5*1000,"11")).assignTimestampsAndWatermarks(watermarkStrategy);Pattern<TestEvent,TestEvent> pattern=Pattern.<TestEvent>begin("begin").where(new SimpleCondition<TestEvent>() {@Overridepublic boolean filter(TestEvent testEvent) throws Exception {return testEvent.getAction().equals("A");}}).followedBy("end").where(new SimpleCondition<TestEvent>() {@Overridepublic boolean filter(TestEvent testEvent) throws Exception {return testEvent.getAction().equals("B");}}).within(Time.seconds(10));PatternStream<TestEvent> patternStream=CEP.pattern(inputDataSteam.keyBy(TestEvent::getId),pattern);OutputTag<TestEvent> timeOutTag=new OutputTag<TestEvent>("timeOutTag"){};//处理匹配结果SingleOutputStreamOperator<TestEvent> twentySingleOutputStream=patternStream.flatSelect(timeOutTag,new EventTimeOut(),new FlatSelect()).uid("match_twenty_minutes_pattern");DataStream<String> result=twentySingleOutputStream.getSideOutput(timeOutTag).map(new MapFunction<TestEvent, String>() {@Overridepublic String map(TestEvent testEvent) throws Exception {return JSONObject.toJSONString(testEvent);}});result.print();env.execute(Config.JOB_NAME);}public static class EventTimeOut implements PatternFlatTimeoutFunction<TestEvent,TestEvent> {private static final long serialVersionUID = -2471077777598713906L;@Overridepublic void timeout(Map<String, List<TestEvent>> map, long l, Collector<TestEvent> collector) throws Exception {if (null != map.get("begin")) {for (TestEvent event : map.get("begin")) {collector.collect(event);}}}}public static class FlatSelect implements PatternFlatSelectFunction<TestEvent,TestEvent> {private static final long serialVersionUID = 1753544074226581611L;@Overridepublic void flatSelect(Map<String, List<TestEvent>> map, Collector<TestEvent> collector) throws Exception {if (null != map.get("begin")) {for (TestEvent event : map.get("begin")) {collector.collect(event);}}}}public static class EventTimeAssignerSupplier implements TimestampAssignerSupplier<TestEvent> {private static final long serialVersionUID = -9040340771307752904L;@Overridepublic TimestampAssigner<TestEvent> createTimestampAssigner(Context context) {return new EventTimeAssigner();}}public static class EventTimeAssigner implements TimestampAssigner<TestEvent> {@Overridepublic long extractTimestamp(TestEvent event, long l) {return event.getEventTime();}}
}

这篇关于flink-cep实践的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/421456

相关文章

SpringBoot简单整合ElasticSearch实践

《SpringBoot简单整合ElasticSearch实践》Elasticsearch支持结构化和非结构化数据检索,通过索引创建和倒排索引文档,提高搜索效率,它基于Lucene封装,分为索引库、类型... 目录一:ElasticSearch支持对结构化和非结构化的数据进行检索二:ES的核心概念Index:

Python数据验证神器Pydantic库的使用和实践中的避坑指南

《Python数据验证神器Pydantic库的使用和实践中的避坑指南》Pydantic是一个用于数据验证和设置的库,可以显著简化API接口开发,文章通过一个实际案例,展示了Pydantic如何在生产环... 目录1️⃣ 崩溃时刻:当你的API接口又双叒崩了!2️⃣ 神兵天降:3行代码解决验证难题3️⃣ 深度

C++ move 的作用详解及陷阱最佳实践

《C++move的作用详解及陷阱最佳实践》文章详细介绍了C++中的`std::move`函数的作用,包括为什么需要它、它的本质、典型使用场景、以及一些常见陷阱和最佳实践,感兴趣的朋友跟随小编一起看... 目录C++ move 的作用详解一、一句话总结二、为什么需要 move?C++98/03 的痛点⚡C++

MySQL存储过程实践(in、out、inout)

《MySQL存储过程实践(in、out、inout)》文章介绍了数据库中的存储过程,包括其定义、优缺点、性能调校与撰写,以及创建和调用方法,还详细说明了存储过程的参数类型,包括IN、OUT和INOUT... 目录简述存储过程存储过程的优缺点优点缺点存储过程的创建和调用mysql 存储过程中的关键语法案例存储

Java 的ArrayList集合底层实现与最佳实践

《Java的ArrayList集合底层实现与最佳实践》本文主要介绍了Java的ArrayList集合类的核心概念、底层实现、关键成员变量、初始化机制、容量演变、扩容机制、性能分析、核心方法源码解析、... 目录1. 核心概念与底层实现1.1 ArrayList 的本质1.1.1 底层数据结构JDK 1.7

JDK21对虚拟线程的几种用法实践指南

《JDK21对虚拟线程的几种用法实践指南》虚拟线程是Java中的一种轻量级线程,由JVM管理,特别适合于I/O密集型任务,:本文主要介绍JDK21对虚拟线程的几种用法,文中通过代码介绍的非常详细,... 目录一、参考官方文档二、什么是虚拟线程三、几种用法1、Thread.ofVirtual().start(

从基础到高级详解Go语言中错误处理的实践指南

《从基础到高级详解Go语言中错误处理的实践指南》Go语言采用了一种独特而明确的错误处理哲学,与其他主流编程语言形成鲜明对比,本文将为大家详细介绍Go语言中错误处理详细方法,希望对大家有所帮助... 目录1 Go 错误处理哲学与核心机制1.1 错误接口设计1.2 错误与异常的区别2 错误创建与检查2.1 基础

springboot依靠security实现digest认证的实践

《springboot依靠security实现digest认证的实践》HTTP摘要认证通过加密参数(如nonce、response)验证身份,避免明文传输,但存在密码存储风险,相比基本认证更安全,却因... 目录概述参数Demopom.XML依赖Digest1Application.JavaMyPasswo

分析 Java Stream 的 peek使用实践与副作用处理方案

《分析JavaStream的peek使用实践与副作用处理方案》StreamAPI的peek操作是中间操作,用于观察元素但不终止流,其副作用风险包括线程安全、顺序混乱及性能问题,合理使用场景有限... 目录一、peek 操作的本质:有状态的中间操作二、副作用的定义与风险场景1. 并行流下的线程安全问题2. 顺

Java 结构化并发Structured Concurrency实践举例

《Java结构化并发StructuredConcurrency实践举例》Java21结构化并发通过作用域和任务句柄统一管理并发生命周期,解决线程泄漏与任务追踪问题,提升代码安全性和可观测性,其核心... 目录一、结构化并发的核心概念与设计目标二、结构化并发的核心组件(一)作用域(Scopes)(二)任务句柄