flink-cep实践

2023-11-24 07:01
文章标签 实践 flink cep

本文主要是介绍flink-cep实践,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

package com.techwolf.hubble;import com.alibaba.fastjson.JSONObject;
import com.techwolf.hubble.constant.Config;
import com.techwolf.hubble.model.TestEvent;
import org.apache.flink.api.common.eventtime.TimestampAssigner;
import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternFlatSelectFunction;
import org.apache.flink.cep.PatternFlatTimeoutFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;import java.util.List;
import java.util.Map;/*** Hello world!**/
public class App {public static void main(String[] args) throws Exception{//初始化环境StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);//定义时间戳提取器作为输入流分配时间戳和水位线WatermarkStrategy<TestEvent> watermarkStrategy=WatermarkStrategy.<TestEvent>forMonotonousTimestamps().withTimestampAssigner(new EventTimeAssignerSupplier());DataStream<TestEvent> inputDataSteam=env.fromElements(new TestEvent("1","A",System.currentTimeMillis()-100*1000,"1"),new TestEvent("1","A",System.currentTimeMillis()-85*1000,"2"),new TestEvent("1","A",System.currentTimeMillis()-80*1000,"3"),new TestEvent("1","A",System.currentTimeMillis()-75*1000,"4"),new TestEvent("1","A",System.currentTimeMillis()-60*1000,"5"),new TestEvent("1","A",System.currentTimeMillis()-55*1000,"6"),new TestEvent("1","A",System.currentTimeMillis()-40*1000,"7"),new TestEvent("1","A",System.currentTimeMillis()-35*1000,"8"),new TestEvent("1","A",System.currentTimeMillis()-20*1000,"9"),new TestEvent("1","A",System.currentTimeMillis()-10*1000,"10"),new TestEvent("1","B",System.currentTimeMillis()-5*1000,"11")).assignTimestampsAndWatermarks(watermarkStrategy);Pattern<TestEvent,TestEvent> pattern=Pattern.<TestEvent>begin("begin").where(new SimpleCondition<TestEvent>() {@Overridepublic boolean filter(TestEvent testEvent) throws Exception {return testEvent.getAction().equals("A");}}).followedBy("end").where(new SimpleCondition<TestEvent>() {@Overridepublic boolean filter(TestEvent testEvent) throws Exception {return testEvent.getAction().equals("B");}}).within(Time.seconds(10));PatternStream<TestEvent> patternStream=CEP.pattern(inputDataSteam.keyBy(TestEvent::getId),pattern);OutputTag<TestEvent> timeOutTag=new OutputTag<TestEvent>("timeOutTag"){};//处理匹配结果SingleOutputStreamOperator<TestEvent> twentySingleOutputStream=patternStream.flatSelect(timeOutTag,new EventTimeOut(),new FlatSelect()).uid("match_twenty_minutes_pattern");DataStream<String> result=twentySingleOutputStream.getSideOutput(timeOutTag).map(new MapFunction<TestEvent, String>() {@Overridepublic String map(TestEvent testEvent) throws Exception {return JSONObject.toJSONString(testEvent);}});result.print();env.execute(Config.JOB_NAME);}public static class EventTimeOut implements PatternFlatTimeoutFunction<TestEvent,TestEvent> {private static final long serialVersionUID = -2471077777598713906L;@Overridepublic void timeout(Map<String, List<TestEvent>> map, long l, Collector<TestEvent> collector) throws Exception {if (null != map.get("begin")) {for (TestEvent event : map.get("begin")) {collector.collect(event);}}}}public static class FlatSelect implements PatternFlatSelectFunction<TestEvent,TestEvent> {private static final long serialVersionUID = 1753544074226581611L;@Overridepublic void flatSelect(Map<String, List<TestEvent>> map, Collector<TestEvent> collector) throws Exception {if (null != map.get("begin")) {for (TestEvent event : map.get("begin")) {collector.collect(event);}}}}public static class EventTimeAssignerSupplier implements TimestampAssignerSupplier<TestEvent> {private static final long serialVersionUID = -9040340771307752904L;@Overridepublic TimestampAssigner<TestEvent> createTimestampAssigner(Context context) {return new EventTimeAssigner();}}public static class EventTimeAssigner implements TimestampAssigner<TestEvent> {@Overridepublic long extractTimestamp(TestEvent event, long l) {return event.getEventTime();}}
}

这篇关于flink-cep实践的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/421456

相关文章

Spring WebFlux 与 WebClient 使用指南及最佳实践

《SpringWebFlux与WebClient使用指南及最佳实践》WebClient是SpringWebFlux模块提供的非阻塞、响应式HTTP客户端,基于ProjectReactor实现,... 目录Spring WebFlux 与 WebClient 使用指南1. WebClient 概述2. 核心依

MyBatis-Plus 中 nested() 与 and() 方法详解(最佳实践场景)

《MyBatis-Plus中nested()与and()方法详解(最佳实践场景)》在MyBatis-Plus的条件构造器中,nested()和and()都是用于构建复杂查询条件的关键方法,但... 目录MyBATis-Plus 中nested()与and()方法详解一、核心区别对比二、方法详解1.and()

Spring Boot @RestControllerAdvice全局异常处理最佳实践

《SpringBoot@RestControllerAdvice全局异常处理最佳实践》本文详解SpringBoot中通过@RestControllerAdvice实现全局异常处理,强调代码复用、统... 目录前言一、为什么要使用全局异常处理?二、核心注解解析1. @RestControllerAdvice2

Spring事务传播机制最佳实践

《Spring事务传播机制最佳实践》Spring的事务传播机制为我们提供了优雅的解决方案,本文将带您深入理解这一机制,掌握不同场景下的最佳实践,感兴趣的朋友一起看看吧... 目录1. 什么是事务传播行为2. Spring支持的七种事务传播行为2.1 REQUIRED(默认)2.2 SUPPORTS2

Java中的雪花算法Snowflake解析与实践技巧

《Java中的雪花算法Snowflake解析与实践技巧》本文解析了雪花算法的原理、Java实现及生产实践,涵盖ID结构、位运算技巧、时钟回拨处理、WorkerId分配等关键点,并探讨了百度UidGen... 目录一、雪花算法核心原理1.1 算法起源1.2 ID结构详解1.3 核心特性二、Java实现解析2.

MySQL 中 ROW_NUMBER() 函数最佳实践

《MySQL中ROW_NUMBER()函数最佳实践》MySQL中ROW_NUMBER()函数,作为窗口函数为每行分配唯一连续序号,区别于RANK()和DENSE_RANK(),特别适合分页、去重... 目录mysql 中 ROW_NUMBER() 函数详解一、基础语法二、核心特点三、典型应用场景1. 数据分

深度解析Spring AOP @Aspect 原理、实战与最佳实践教程

《深度解析SpringAOP@Aspect原理、实战与最佳实践教程》文章系统讲解了SpringAOP核心概念、实现方式及原理,涵盖横切关注点分离、代理机制(JDK/CGLIB)、切入点类型、性能... 目录1. @ASPect 核心概念1.1 AOP 编程范式1.2 @Aspect 关键特性2. 完整代码实

MySQL 用户创建与授权最佳实践

《MySQL用户创建与授权最佳实践》在MySQL中,用户管理和权限控制是数据库安全的重要组成部分,下面详细介绍如何在MySQL中创建用户并授予适当的权限,感兴趣的朋友跟随小编一起看看吧... 目录mysql 用户创建与授权详解一、MySQL用户管理基础1. 用户账户组成2. 查看现有用户二、创建用户1. 基

Spring Boot 实现 IP 限流的原理、实践与利弊解析

《SpringBoot实现IP限流的原理、实践与利弊解析》在SpringBoot中实现IP限流是一种简单而有效的方式来保障系统的稳定性和可用性,本文给大家介绍SpringBoot实现IP限... 目录一、引言二、IP 限流原理2.1 令牌桶算法2.2 漏桶算法三、使用场景3.1 防止恶意攻击3.2 控制资源

springboot项目中整合高德地图的实践

《springboot项目中整合高德地图的实践》:本文主要介绍springboot项目中整合高德地图的实践,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一:高德开放平台的使用二:创建数据库(我是用的是mysql)三:Springboot所需的依赖(根据你的需求再