flink 加载外部cep规则

2023-10-19 14:32
文章标签 加载 规则 flink 外部 cep

本文主要是介绍flink 加载外部cep规则,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

主程序代码:

package cepengine.App;import cepengine.domain.Event;
import cepengine.domain.InputEventSchema;
import cepengine.domain.OutputEventSchema;
import groovy.lang.GroovyClassLoader;
import groovy.lang.GroovyObject;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;import java.io.File;
import java.util.List;
import java.util.Map;
import java.util.Properties;public class FlinkKafkaSimpleSchema {public static void main(String[] args) {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();/*** 设置检查点*/env.enableCheckpointing(5000);ObjectMapper mapper = new ObjectMapper();try {/*** 加载外部规则*/GroovyClassLoader loader = new GroovyClassLoader();File file = new File("./src/main/java/cepengine/scripts/sRule.groovy");Class aClass = loader.parseClass(file);GroovyObject groovyObject = (GroovyObject) aClass.newInstance();Pattern<Event, Event> pattern = (Pattern<Event, Event>) groovyObject.invokeMethod("run", null);/** 初始化 Consumer 配置 */Properties consumerConfig = new Properties();consumerConfig.setProperty("bootstrap.servers", "localhost:9092");consumerConfig.setProperty("group.id", "risk_control");/** 初始化 Kafka Consumer */FlinkKafkaConsumer<Event> flinkKafkaConsumer =new FlinkKafkaConsumer<Event>("flink_kafka_poc_input",new InputEventSchema(),consumerConfig);/** 配置offset */flinkKafkaConsumer.setStartFromEarliest();/** 将 Kafka Consumer 加入到流处理 */DataStream<Event> stream = env.addSource(flinkKafkaConsumer);/*** 匹配规则*/PatternStream<Event> patternStream = CEP.pattern(stream, pattern);DataStream<Event> outstream = patternStream.select(new PatternSelectFunction<Event, Event>() {@Overridepublic Event select(Map<String, List<Event>> map) throws Exception {List<Event> next = map.get("next");return new Event(next.get(0).getKey(), next.get(0).getValue(), next.get(0).getTopic(),next.get(0).getPartition(),next.get(0).getOffset());}});outstream.print("next");/** 初始化 Producer 配置 */Properties producerConfig = new Properties();producerConfig.setProperty("bootstrap.servers", "localhost:9092");producerConfig.setProperty("max.request.size", "102428800");/** 初始化 Kafka Producer */FlinkKafkaProducer<Event> myProducer = new FlinkKafkaProducer<Event>("flink_kafka_poc_output",new OutputEventSchema(),producerConfig);/** 将 Kafka Producer 加入到流处理 */outstream.addSink(myProducer);/** 执行 */env.execute();} catch (Exception e) {}}}

 

Groovy脚本:

package cepengine.scriptsimport cepengine.domain.Event
import org.apache.flink.cep.pattern.Pattern
import org.apache.flink.cep.pattern.conditions.SimpleCondition
import org.apache.flink.streaming.api.windowing.time.Timeclass sRule implements Serializable{def run() {Pattern<Event, ?> pattern =Pattern.<String>begin("begin").where(new SimpleCondition<Event>() {@Overrideboolean filter(Event event) throws Exception {return  event.getValue().contains("失败")}}).next("next").where(new SimpleCondition<Event>() {@Overrideboolean filter(Event event) throws Exception {return event.getValue().contains("失败")}})//
//                .next("next2")
//                .where(new SimpleCondition<LoginEvent>() {
//            @Override
//            boolean filter(LoginEvent loginEvent) throws Exception {
//                return loginEvent.getType().equals("success")
//            }
//        }).within(Time.seconds(5))return pattern}
}

kafka consumer schema:

package cepengine.domain;import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;import java.io.IOException;
import java.nio.charset.StandardCharsets;import static org.apache.flink.api.java.typeutils.TypeExtractor.getForClass;public class InputEventSchema implements KeyedDeserializationSchema<Event> {@Overridepublic Event deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException {String msg = new String(message, StandardCharsets.UTF_8);String key = null;if (messageKey != null) {key = new String(messageKey, StandardCharsets.UTF_8);}return new Event( key, msg,topic, partition, offset);}@Overridepublic boolean isEndOfStream(Event nextElement) {return false;}@Overridepublic TypeInformation<Event> getProducedType() {return getForClass(Event.class);}}

kafka producer schema:

package cepengine.domain;import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;public class OutputEventSchema implements KeyedSerializationSchema<Event> {@Overridepublic byte[] serializeKey(Event event) {return event.getKey().getBytes();}@Overridepublic byte[] serializeValue(Event event) {return event.getValue().getBytes();}@Overridepublic String getTargetTopic(Event event) {return null;}}

Event类:

package cepengine.domain;public class Event {private String topic;private int partition;private long offset;private String value;private  String key;@Overridepublic String toString() {return "Event{" +"topic='" + topic + '\'' +", partition=" + partition +", offset=" + offset +", value='" + value + '\'' +", key='" + key + '\'' +'}';}public Event() {}public Event(String key, String value, String topic, int partition, long offset) {this.key = key;this.value = value;this.topic = topic;this.partition = partition;this.offset = offset;}public String getKey() {return key;}public void setKey(String key) {this.key = key;}public String getTopic() {return topic;}public void setTopic(String topic) {this.topic = topic;}public int getPartition() {return partition;}public void setPartition(int partition) {this.partition = partition;}public long getOffset() {return offset;}public void setOffset(long offset) {this.offset = offset;}public String getValue() {return value;}public void setValue(String Value) {this.value = value;}}

目前只是基于文件加载规则,后续改造为基于数据库加载规则。

这篇关于flink 加载外部cep规则的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/240429

相关文章

关于Spring @Bean 相同加载顺序不同结果不同的问题记录

《关于Spring@Bean相同加载顺序不同结果不同的问题记录》本文主要探讨了在Spring5.1.3.RELEASE版本下,当有两个全注解类定义相同类型的Bean时,由于加载顺序不同,最终生成的... 目录问题说明测试输出1测试输出2@Bean注解的BeanDefiChina编程nition加入时机总结问题说明

关于Gateway路由匹配规则解读

《关于Gateway路由匹配规则解读》本文详细介绍了SpringCloudGateway的路由匹配规则,包括基本概念、常用属性、实际应用以及注意事项,路由匹配规则决定了请求如何被转发到目标服务,是Ga... 目录Gateway路由匹配规则一、基本概念二、常用属性三、实际应用四、注意事项总结Gateway路由

Redis 多规则限流和防重复提交方案实现小结

《Redis多规则限流和防重复提交方案实现小结》本文主要介绍了Redis多规则限流和防重复提交方案实现小结,包括使用String结构和Zset结构来记录用户IP的访问次数,具有一定的参考价值,感兴趣... 目录一:使用 String 结构记录固定时间段内某用户 IP 访问某接口的次数二:使用 Zset 进行

SpringBoot项目启动后自动加载系统配置的多种实现方式

《SpringBoot项目启动后自动加载系统配置的多种实现方式》:本文主要介绍SpringBoot项目启动后自动加载系统配置的多种实现方式,并通过代码示例讲解的非常详细,对大家的学习或工作有一定的... 目录1. 使用 CommandLineRunner实现方式:2. 使用 ApplicationRunne

SpringBoot项目删除Bean或者不加载Bean的问题解决

《SpringBoot项目删除Bean或者不加载Bean的问题解决》文章介绍了在SpringBoot项目中如何使用@ComponentScan注解和自定义过滤器实现不加载某些Bean的方法,本文通过实... 使用@ComponentScan注解中的@ComponentScan.Filter标记不加载。@C

springboot 加载本地jar到maven的实现方法

《springboot加载本地jar到maven的实现方法》如何在SpringBoot项目中加载本地jar到Maven本地仓库,使用Maven的install-file目标来实现,本文结合实例代码给... 在Spring Boothttp://www.chinasem.cn项目中,如果你想要加载一个本地的ja

最好用的WPF加载动画功能

《最好用的WPF加载动画功能》当开发应用程序时,提供良好的用户体验(UX)是至关重要的,加载动画作为一种有效的沟通工具,它不仅能告知用户系统正在工作,还能够通过视觉上的吸引力来增强整体用户体验,本文给... 目录前言需求分析高级用法综合案例总结最后前言当开发应用程序时,提供良好的用户体验(UX)是至关重要

MyBatis延迟加载的处理方案

《MyBatis延迟加载的处理方案》MyBatis支持延迟加载(LazyLoading),允许在需要数据时才从数据库加载,而不是在查询结果第一次返回时就立即加载所有数据,延迟加载的核心思想是,将关联对... 目录MyBATis如何处理延迟加载?延迟加载的原理1. 开启延迟加载2. 延迟加载的配置2.1 使用

Android WebView的加载超时处理方案

《AndroidWebView的加载超时处理方案》在Android开发中,WebView是一个常用的组件,用于在应用中嵌入网页,然而,当网络状况不佳或页面加载过慢时,用户可能会遇到加载超时的问题,本... 目录引言一、WebView加载超时的原因二、加载超时处理方案1. 使用Handler和Timer进行超

Flutter 进阶:绘制加载动画

绘制加载动画:由小圆组成的大圆 1. 定义 LoadingScreen 类2. 实现 _LoadingScreenState 类3. 定义 LoadingPainter 类4. 总结 实现加载动画 我们需要定义两个类:LoadingScreen 和 LoadingPainter。LoadingScreen 负责控制动画的状态,而 LoadingPainter 则负责绘制动画。