flink 加载外部cep规则

2023-10-19 14:32
文章标签 加载 规则 flink 外部 cep

本文主要是介绍flink 加载外部cep规则,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

主程序代码:

package cepengine.App;import cepengine.domain.Event;
import cepengine.domain.InputEventSchema;
import cepengine.domain.OutputEventSchema;
import groovy.lang.GroovyClassLoader;
import groovy.lang.GroovyObject;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;import java.io.File;
import java.util.List;
import java.util.Map;
import java.util.Properties;public class FlinkKafkaSimpleSchema {public static void main(String[] args) {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();/*** 设置检查点*/env.enableCheckpointing(5000);ObjectMapper mapper = new ObjectMapper();try {/*** 加载外部规则*/GroovyClassLoader loader = new GroovyClassLoader();File file = new File("./src/main/java/cepengine/scripts/sRule.groovy");Class aClass = loader.parseClass(file);GroovyObject groovyObject = (GroovyObject) aClass.newInstance();Pattern<Event, Event> pattern = (Pattern<Event, Event>) groovyObject.invokeMethod("run", null);/** 初始化 Consumer 配置 */Properties consumerConfig = new Properties();consumerConfig.setProperty("bootstrap.servers", "localhost:9092");consumerConfig.setProperty("group.id", "risk_control");/** 初始化 Kafka Consumer */FlinkKafkaConsumer<Event> flinkKafkaConsumer =new FlinkKafkaConsumer<Event>("flink_kafka_poc_input",new InputEventSchema(),consumerConfig);/** 配置offset */flinkKafkaConsumer.setStartFromEarliest();/** 将 Kafka Consumer 加入到流处理 */DataStream<Event> stream = env.addSource(flinkKafkaConsumer);/*** 匹配规则*/PatternStream<Event> patternStream = CEP.pattern(stream, pattern);DataStream<Event> outstream = patternStream.select(new PatternSelectFunction<Event, Event>() {@Overridepublic Event select(Map<String, List<Event>> map) throws Exception {List<Event> next = map.get("next");return new Event(next.get(0).getKey(), next.get(0).getValue(), next.get(0).getTopic(),next.get(0).getPartition(),next.get(0).getOffset());}});outstream.print("next");/** 初始化 Producer 配置 */Properties producerConfig = new Properties();producerConfig.setProperty("bootstrap.servers", "localhost:9092");producerConfig.setProperty("max.request.size", "102428800");/** 初始化 Kafka Producer */FlinkKafkaProducer<Event> myProducer = new FlinkKafkaProducer<Event>("flink_kafka_poc_output",new OutputEventSchema(),producerConfig);/** 将 Kafka Producer 加入到流处理 */outstream.addSink(myProducer);/** 执行 */env.execute();} catch (Exception e) {}}}

 

Groovy脚本:

package cepengine.scriptsimport cepengine.domain.Event
import org.apache.flink.cep.pattern.Pattern
import org.apache.flink.cep.pattern.conditions.SimpleCondition
import org.apache.flink.streaming.api.windowing.time.Timeclass sRule implements Serializable{def run() {Pattern<Event, ?> pattern =Pattern.<String>begin("begin").where(new SimpleCondition<Event>() {@Overrideboolean filter(Event event) throws Exception {return  event.getValue().contains("失败")}}).next("next").where(new SimpleCondition<Event>() {@Overrideboolean filter(Event event) throws Exception {return event.getValue().contains("失败")}})//
//                .next("next2")
//                .where(new SimpleCondition<LoginEvent>() {
//            @Override
//            boolean filter(LoginEvent loginEvent) throws Exception {
//                return loginEvent.getType().equals("success")
//            }
//        }).within(Time.seconds(5))return pattern}
}

kafka consumer schema:

package cepengine.domain;import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;import java.io.IOException;
import java.nio.charset.StandardCharsets;import static org.apache.flink.api.java.typeutils.TypeExtractor.getForClass;public class InputEventSchema implements KeyedDeserializationSchema<Event> {@Overridepublic Event deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException {String msg = new String(message, StandardCharsets.UTF_8);String key = null;if (messageKey != null) {key = new String(messageKey, StandardCharsets.UTF_8);}return new Event( key, msg,topic, partition, offset);}@Overridepublic boolean isEndOfStream(Event nextElement) {return false;}@Overridepublic TypeInformation<Event> getProducedType() {return getForClass(Event.class);}}

kafka producer schema:

package cepengine.domain;import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;public class OutputEventSchema implements KeyedSerializationSchema<Event> {@Overridepublic byte[] serializeKey(Event event) {return event.getKey().getBytes();}@Overridepublic byte[] serializeValue(Event event) {return event.getValue().getBytes();}@Overridepublic String getTargetTopic(Event event) {return null;}}

Event类:

package cepengine.domain;public class Event {private String topic;private int partition;private long offset;private String value;private  String key;@Overridepublic String toString() {return "Event{" +"topic='" + topic + '\'' +", partition=" + partition +", offset=" + offset +", value='" + value + '\'' +", key='" + key + '\'' +'}';}public Event() {}public Event(String key, String value, String topic, int partition, long offset) {this.key = key;this.value = value;this.topic = topic;this.partition = partition;this.offset = offset;}public String getKey() {return key;}public void setKey(String key) {this.key = key;}public String getTopic() {return topic;}public void setTopic(String topic) {this.topic = topic;}public int getPartition() {return partition;}public void setPartition(int partition) {this.partition = partition;}public long getOffset() {return offset;}public void setOffset(long offset) {this.offset = offset;}public String getValue() {return value;}public void setValue(String Value) {this.value = value;}}

目前只是基于文件加载规则,后续改造为基于数据库加载规则。

这篇关于flink 加载外部cep规则的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/240429

相关文章

Spring如何使用注解@DependsOn控制Bean加载顺序

《Spring如何使用注解@DependsOn控制Bean加载顺序》:本文主要介绍Spring如何使用注解@DependsOn控制Bean加载顺序,具有很好的参考价值,希望对大家有所帮助,如有错误... 目录1.javascript 前言2. 代码实现总结1. 前言默认情况下,Spring加载Bean的顺

Redis分片集群、数据读写规则问题小结

《Redis分片集群、数据读写规则问题小结》本文介绍了Redis分片集群的原理,通过数据分片和哈希槽机制解决单机内存限制与写瓶颈问题,实现分布式存储和高并发处理,但存在通信开销大、维护复杂及对事务支持... 目录一、分片集群解android决的问题二、分片集群图解 分片集群特征如何解决的上述问题?(与哨兵模

C++作用域和标识符查找规则详解

《C++作用域和标识符查找规则详解》在C++中,作用域(Scope)和标识符查找(IdentifierLookup)是理解代码行为的重要概念,本文将详细介绍这些规则,并通过实例来说明它们的工作原理,需... 目录作用域标识符查找规则1. 普通查找(Ordinary Lookup)2. 限定查找(Qualif

SpringBoot整合Apache Flink的详细指南

《SpringBoot整合ApacheFlink的详细指南》这篇文章主要为大家详细介绍了SpringBoot整合ApacheFlink的详细过程,涵盖环境准备,依赖配置,代码实现及运行步骤,感兴趣的... 目录1. 背景与目标2. 环境准备2.1 开发工具2.2 技术版本3. 创建 Spring Boot

springboot加载不到nacos配置中心的配置问题处理

《springboot加载不到nacos配置中心的配置问题处理》:本文主要介绍springboot加载不到nacos配置中心的配置问题处理,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑... 目录springboot加载不到nacos配置中心的配置两种可能Spring Boot 版本Nacos

Spring Boot 整合 Apache Flink 的详细过程

《SpringBoot整合ApacheFlink的详细过程》ApacheFlink是一个高性能的分布式流处理框架,而SpringBoot提供了快速构建企业级应用的能力,下面给大家介绍Spri... 目录Spring Boot 整合 Apache Flink 教程一、背景与目标二、环境准备三、创建项目 & 添

Nginx Location映射规则总结归纳与最佳实践

《NginxLocation映射规则总结归纳与最佳实践》Nginx的location指令是配置请求路由的核心机制,其匹配规则直接影响请求的处理流程,下面给大家介绍NginxLocation映射规则... 目录一、Location匹配规则与优先级1. 匹配模式2. 优先级顺序3. 匹配示例二、Proxy_pa

Java -jar命令如何运行外部依赖JAR包

《Java-jar命令如何运行外部依赖JAR包》在Java应用部署中,java-jar命令是启动可执行JAR包的标准方式,但当应用需要依赖外部JAR文件时,直接使用java-jar会面临类加载困... 目录引言:外部依赖JAR的必要性一、问题本质:类加载机制的限制1. Java -jar的默认行为2. 类加

java -jar命令运行 jar包时运行外部依赖jar包的场景分析

《java-jar命令运行jar包时运行外部依赖jar包的场景分析》:本文主要介绍java-jar命令运行jar包时运行外部依赖jar包的场景分析,本文给大家介绍的非常详细,对大家的学习或工作... 目录Java -jar命令运行 jar包时如何运行外部依赖jar包场景:解决:方法一、启动参数添加: -Xb

使用Python获取JS加载的数据的多种实现方法

《使用Python获取JS加载的数据的多种实现方法》在当今的互联网时代,网页数据的动态加载已经成为一种常见的技术手段,许多现代网站通过JavaScript(JS)动态加载内容,这使得传统的静态网页爬取... 目录引言一、动态 网页与js加载数据的原理二、python爬取JS加载数据的方法(一)分析网络请求1