flink-cep实践

2023-11-24 07:01
文章标签 实践 flink cep

本文主要是介绍flink-cep实践,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

package com.techwolf.hubble;import com.alibaba.fastjson.JSONObject;
import com.techwolf.hubble.constant.Config;
import com.techwolf.hubble.model.TestEvent;
import org.apache.flink.api.common.eventtime.TimestampAssigner;
import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternFlatSelectFunction;
import org.apache.flink.cep.PatternFlatTimeoutFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;import java.util.List;
import java.util.Map;/*** Hello world!**/
public class App {public static void main(String[] args) throws Exception{//初始化环境StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);//定义时间戳提取器作为输入流分配时间戳和水位线WatermarkStrategy<TestEvent> watermarkStrategy=WatermarkStrategy.<TestEvent>forMonotonousTimestamps().withTimestampAssigner(new EventTimeAssignerSupplier());DataStream<TestEvent> inputDataSteam=env.fromElements(new TestEvent("1","A",System.currentTimeMillis()-100*1000,"1"),new TestEvent("1","A",System.currentTimeMillis()-85*1000,"2"),new TestEvent("1","A",System.currentTimeMillis()-80*1000,"3"),new TestEvent("1","A",System.currentTimeMillis()-75*1000,"4"),new TestEvent("1","A",System.currentTimeMillis()-60*1000,"5"),new TestEvent("1","A",System.currentTimeMillis()-55*1000,"6"),new TestEvent("1","A",System.currentTimeMillis()-40*1000,"7"),new TestEvent("1","A",System.currentTimeMillis()-35*1000,"8"),new TestEvent("1","A",System.currentTimeMillis()-20*1000,"9"),new TestEvent("1","A",System.currentTimeMillis()-10*1000,"10"),new TestEvent("1","B",System.currentTimeMillis()-5*1000,"11")).assignTimestampsAndWatermarks(watermarkStrategy);Pattern<TestEvent,TestEvent> pattern=Pattern.<TestEvent>begin("begin").where(new SimpleCondition<TestEvent>() {@Overridepublic boolean filter(TestEvent testEvent) throws Exception {return testEvent.getAction().equals("A");}}).followedBy("end").where(new SimpleCondition<TestEvent>() {@Overridepublic boolean filter(TestEvent testEvent) throws Exception {return testEvent.getAction().equals("B");}}).within(Time.seconds(10));PatternStream<TestEvent> patternStream=CEP.pattern(inputDataSteam.keyBy(TestEvent::getId),pattern);OutputTag<TestEvent> timeOutTag=new OutputTag<TestEvent>("timeOutTag"){};//处理匹配结果SingleOutputStreamOperator<TestEvent> twentySingleOutputStream=patternStream.flatSelect(timeOutTag,new EventTimeOut(),new FlatSelect()).uid("match_twenty_minutes_pattern");DataStream<String> result=twentySingleOutputStream.getSideOutput(timeOutTag).map(new MapFunction<TestEvent, String>() {@Overridepublic String map(TestEvent testEvent) throws Exception {return JSONObject.toJSONString(testEvent);}});result.print();env.execute(Config.JOB_NAME);}public static class EventTimeOut implements PatternFlatTimeoutFunction<TestEvent,TestEvent> {private static final long serialVersionUID = -2471077777598713906L;@Overridepublic void timeout(Map<String, List<TestEvent>> map, long l, Collector<TestEvent> collector) throws Exception {if (null != map.get("begin")) {for (TestEvent event : map.get("begin")) {collector.collect(event);}}}}public static class FlatSelect implements PatternFlatSelectFunction<TestEvent,TestEvent> {private static final long serialVersionUID = 1753544074226581611L;@Overridepublic void flatSelect(Map<String, List<TestEvent>> map, Collector<TestEvent> collector) throws Exception {if (null != map.get("begin")) {for (TestEvent event : map.get("begin")) {collector.collect(event);}}}}public static class EventTimeAssignerSupplier implements TimestampAssignerSupplier<TestEvent> {private static final long serialVersionUID = -9040340771307752904L;@Overridepublic TimestampAssigner<TestEvent> createTimestampAssigner(Context context) {return new EventTimeAssigner();}}public static class EventTimeAssigner implements TimestampAssigner<TestEvent> {@Overridepublic long extractTimestamp(TestEvent event, long l) {return event.getEventTime();}}
}

这篇关于flink-cep实践的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/421456

相关文章

基于MySQL Binlog的Elasticsearch数据同步实践

一、为什么要做 随着马蜂窝的逐渐发展,我们的业务数据越来越多,单纯使用 MySQL 已经不能满足我们的数据查询需求,例如对于商品、订单等数据的多维度检索。 使用 Elasticsearch 存储业务数据可以很好的解决我们业务中的搜索需求。而数据进行异构存储后,随之而来的就是数据同步的问题。 二、现有方法及问题 对于数据同步,我们目前的解决方案是建立数据中间表。把需要检索的业务数据,统一放到一张M

系统架构师考试学习笔记第三篇——架构设计高级知识(20)通信系统架构设计理论与实践

本章知识考点:         第20课时主要学习通信系统架构设计的理论和工作中的实践。根据新版考试大纲,本课时知识点会涉及案例分析题(25分),而在历年考试中,案例题对该部分内容的考查并不多,虽在综合知识选择题目中经常考查,但分值也不高。本课时内容侧重于对知识点的记忆和理解,按照以往的出题规律,通信系统架构设计基础知识点多来源于教材内的基础网络设备、网络架构和教材外最新时事热点技术。本课时知识

Prometheus与Grafana在DevOps中的应用与最佳实践

Prometheus 与 Grafana 在 DevOps 中的应用与最佳实践 随着 DevOps 文化和实践的普及,监控和可视化工具已成为 DevOps 工具链中不可或缺的部分。Prometheus 和 Grafana 是其中最受欢迎的开源监控解决方案之一,它们的结合能够为系统和应用程序提供全面的监控、告警和可视化展示。本篇文章将详细探讨 Prometheus 和 Grafana 在 DevO

springboot整合swagger2之最佳实践

来源:https://blog.lqdev.cn/2018/07/21/springboot/chapter-ten/ Swagger是一款RESTful接口的文档在线自动生成、功能测试功能框架。 一个规范和完整的框架,用于生成、描述、调用和可视化RESTful风格的Web服务,加上swagger-ui,可以有很好的呈现。 SpringBoot集成 pom <!--swagge

Flink任务重启策略

概述 Flink支持不同的重启策略,以在故障发生时控制作业如何重启集群在启动时会伴随一个默认的重启策略,在没有定义具体重启策略时会使用该默认策略。如果在工作提交时指定了一个重启策略,该策略会覆盖集群的默认策略默认的重启策略可以通过 Flink 的配置文件 flink-conf.yaml 指定。配置参数 restart-strategy 定义了哪个策略被使用。常用的重启策略: 固定间隔 (Fixe

vue2实践:el-table实现由用户自己控制行数的动态表格

需求 项目中需要提供一个动态表单,如图: 当我点击添加时,便添加一行;点击右边的删除时,便删除这一行。 至少要有一行数据,但是没有上限。 思路 这种每一行的数据固定,但是不定行数的,很容易想到使用el-table来实现,它可以循环读取:data所绑定的数组,来生成行数据,不同的是: 1、table里面的每一个cell,需要放置一个input来支持用户编辑。 2、最后一列放置两个b

【HarmonyOS】-TaskPool和Worker的对比实践

ArkTS提供了TaskPool与Worker两种多线程并发方案,下面我们将从其工作原理、使用效果对比两种方案的差异,进而选择适用于ArkTS图片编辑场景的并发方案。 TaskPool与Worker工作原理 TaskPool与Worker两种多线程并发能力均是基于 Actor并发模型实现的。Worker主、子线程通过收发消息进行通信;TaskPool基于Worker做了更多场景化的功能封装,例

vue2实践:第一个非正规的自定义组件-动态表单对话框

前言 vue一个很重要的概念就是组件,作为一个没有经历过前几代前端开发的我来说,不太能理解它所带来的“进步”,但是,将它与后端c++、java类比,我感觉,组件就像是这些语言中的类和对象的概念,通过封装好的组件(类),可以通过挂载的方式,非常方便的调用其提供的功能,而不必重新写一遍实现逻辑。 我们常用的element UI就是由饿了么所提供的组件库,但是在项目开发中,我们可能还需要额外地定义一

《C++中的移动构造函数与移动赋值运算符:解锁高效编程的最佳实践》

在 C++的编程世界中,移动构造函数和移动赋值运算符是提升程序性能和效率的重要工具。理解并正确运用它们,可以让我们的代码更加高效、简洁和优雅。 一、引言 随着现代软件系统的日益复杂和对性能要求的不断提高,C++程序员需要不断探索新的技术和方法来优化代码。移动构造函数和移动赋值运算符的出现,为解决资源管理和性能优化问题提供了有力的手段。它们允许我们在不进行不必要的复制操作的情况下,高效地转移资源

Vue3+elementplus实现图片上传下载(最强实践)

图片上传子组件: 实现照片的上传,预览,以及转成以逗号隔开的图片地址,即时监听,并发送消息到父组件。 <!-- ImageUploader.vue --> <template><div><el-upload class="avatar-uploader" :http-request="customUpload" :before-upload="beforeUpload":show-fil