Flink CEP实现10秒内连续登录失败用户分析

2024-01-31 08:12

本文主要是介绍Flink CEP实现10秒内连续登录失败用户分析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1、什么是CEP?

Flink CEP即 Flink Complex Event Processing,是基于DataStream流式数据提供的一套复杂事件处理编程模型。你可以把他理解为基于无界流的一套正则匹配模型,即对于无界流中的各种数据(称为事件),提供一种组合匹配的功能。

在这里插入图片描述
上图中,以不同形状代表一个DataStream中不同属性的事件。以一个圆圈和一个三角组成一个Pattern后,就可以快速过滤出原来的DataStream中符合规律的数据。举个例子,比如很多网站需要对恶意登录的用户进行屏蔽,如果用户连续三次输入错误的密码,那就要锁定当前用户。在这个场景下,所有用户的登录行为就构成了一个无界的数据流DataStream。而连续三次登录失败就是一个匹配模型Pattern。CEP编程模型的功能就是从用户登录行为这个无界数据流DataStream中,找出符合这个匹配模Pattern的所有数据。这种场景下,使用我们前面介绍的各种DataStream API其实也是可以实现的,不过相对就麻烦很多。而CEP编程模型则提供了非常简单灵活的功能实现方式。

2、代码实现

2.1 引入maven依赖:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.roy</groupId><artifactId>FlinkDemo</artifactId><version>1.0</version><properties><flink.version>1.12.5</flink.version><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target><log4j.version>2.12.1</log4j.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>${flink.version}</version></dependency><!-- CEP主要是下面这个依赖 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-cep_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-statebackend-rocksdb_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-shaded-hadoop-2-uber</artifactId><version>2.8.3-10.0</version></dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.14</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><version>3.1.0</version><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins></build></project>

2.2 基本流程

//1、获取原始事件流
DataStream<Event> input = ......; 
//2、定义匹配器
Pattern<Event,?> pattern = .......; 
//3、获取匹配流
PatternStream<Event> patternStream = CEP.pattern(input, pattern);
//4、将匹配流中的数据处理形成结果数据流
DataStream<Result> resultStream = patternStream.process(new PatternProcessFunction<Event, Result>() {@Overridepublic void processMatch(Map<String, List<Event>> pattern,Context ctx,Collector<Result> out) throws Exception {}
});

2.3 完整代码

注意:代码运行前,先启动2.4 nlk socket服务

package com.roy.flink.project.userlogin;import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;import java.time.Duration;
import java.util.List;
import java.util.Map;/*** @desc 十秒内连续登录失败的用户分析。使用Flink CEP进行快速模式匹配*/
public class MyUserLoginAna {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();// //BoundedOutOfOrdernessWatermarks定时提交Watermark的间隔env.getConfig().setAutoWatermarkInterval(1000L);// 使用Socket测试env.setParallelism(1);// 1、获取原始事件流(10.86.97.206改为实际地址)final DataStreamSource<String> dataStreamSource = env.socketTextStream("10.86.97.206",7777);final SingleOutputStreamOperator<UserLoginRecord> userLoginRecordStream = dataStreamSource.map(new MapFunction<String, UserLoginRecord>() {@Overridepublic UserLoginRecord map(String s) throws Exception {final String[] splitVal = s.split(",");return new UserLoginRecord(splitVal[0], Integer.parseInt(splitVal[1]), Long.parseLong(splitVal[2]));}}).assignTimestampsAndWatermarks(WatermarkStrategy.<UserLoginRecord>forBoundedOutOfOrderness(Duration.ofSeconds(1))// 主要针对乱序流,由于乱序流中需要等待迟到数据到齐,所以必须设置一个固定量的延迟时间.withTimestampAssigner((SerializableTimestampAssigner<UserLoginRecord>) (element, recordTimestamp) -> element.getLoginTime()));// 2、定义匹配器// 2.1:10秒内出现3次登录失败的记录(不一定连续)// Flink CEP定义消息匹配器。
//        final Pattern<UserLoginRecord, UserLoginRecord> pattern = Pattern.<UserLoginRecord>begin("start").where(new SimpleCondition<UserLoginRecord>() {
//            @Override
//            public boolean filter(UserLoginRecord userLoginRecord) throws Exception {
//                return 1 == userLoginRecord.getLoginRes();
//            }
//        }).times(3).within(Time.seconds(10));// 2.2:连续三次登录失败。next表示连续匹配。 不连续匹配使用followByfinal Pattern<UserLoginRecord, UserLoginRecord> pattern = Pattern.<UserLoginRecord>begin("one").where(new SimpleCondition<UserLoginRecord>() {@Overridepublic boolean filter(UserLoginRecord value) throws Exception {return 1 == value.getLoginRes();}}).next("two").where(new SimpleCondition<UserLoginRecord>() {@Overridepublic boolean filter(UserLoginRecord value) throws Exception {return 1 == value.getLoginRes();}}).next("three").where(new SimpleCondition<UserLoginRecord>() {@Overridepublic boolean filter(UserLoginRecord value) throws Exception {return 1 == value.getLoginRes();}}).within(Time.seconds(10));// 3、获取匹配流final PatternStream<UserLoginRecord> badUser = CEP.pattern(userLoginRecordStream, pattern);final MyProcessFunction myProcessFunction = new MyProcessFunction();// 4、将匹配流中的数据处理成结果数据流final SingleOutputStreamOperator<UserLoginRecord> badUserStream = badUser.process(myProcessFunction);badUserStream.print("badUser");env.execute("UserLoginAna");}// mainpublic static class MyProcessFunction extends PatternProcessFunction<UserLoginRecord,UserLoginRecord>{@Overridepublic void processMatch(Map<String, List<UserLoginRecord>> match, Context ctx, Collector<UserLoginRecord> out) throws Exception {// 针对2.1 连续3次登录失败
//            final List<UserLoginRecord> records = match.get("start");
//            for(UserLoginRecord record : records){
//                out.collect(record);
//            }// 针对2.2 非连续3次登录失败final List<UserLoginRecord> records = match.get("three");for(UserLoginRecord record : records){out.collect(record);}}// processMarch}// MyProcessFunction
}

UserLoginRecord对象,如下:


public class UserLoginRecord {private String userId;private int loginRes; // 0-成功, 1-失败private long loginTime;public UserLoginRecord() {}public UserLoginRecord(String userId, int loginRes, long loginTime) {this.userId = userId;this.loginRes = loginRes;this.loginTime = loginTime;}@Overridepublic String toString() {return "UserLoginRecord{" +"userId='" + userId + '\'' +", loginRes=" + loginRes +", loginTime=" + loginTime +'}';}public String getUserId() {return userId;}public void setUserId(String userId) {this.userId = userId;}public int getLoginRes() {return loginRes;}public void setLoginRes(int loginRes) {this.loginRes = loginRes;}public long getLoginTime() {return loginTime;}public void setLoginTime(long loginTime) {this.loginTime = loginTime;}
}

2.4 nlk模拟socket服务端

在这里插入图片描述

2.5 IDEA控制台打印

在这里插入图片描述

这篇关于Flink CEP实现10秒内连续登录失败用户分析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/663111

相关文章

Security OAuth2 单点登录流程

单点登录(英语:Single sign-on,缩写为 SSO),又译为单一签入,一种对于许多相互关连,但是又是各自独立的软件系统,提供访问控制的属性。当拥有这项属性时,当用户登录时,就可以获取所有系统的访问权限,不用对每个单一系统都逐一登录。这项功能通常是以轻型目录访问协议(LDAP)来实现,在服务器上会将用户信息存储到LDAP数据库中。相同的,单一注销(single sign-off)就是指

hdu1043(八数码问题,广搜 + hash(实现状态压缩) )

利用康拓展开将一个排列映射成一个自然数,然后就变成了普通的广搜题。 #include<iostream>#include<algorithm>#include<string>#include<stack>#include<queue>#include<map>#include<stdio.h>#include<stdlib.h>#include<ctype.h>#inclu

性能分析之MySQL索引实战案例

文章目录 一、前言二、准备三、MySQL索引优化四、MySQL 索引知识回顾五、总结 一、前言 在上一讲性能工具之 JProfiler 简单登录案例分析实战中已经发现SQL没有建立索引问题,本文将一起从代码层去分析为什么没有建立索引? 开源ERP项目地址:https://gitee.com/jishenghua/JSH_ERP 二、准备 打开IDEA找到登录请求资源路径位置

【C++】_list常用方法解析及模拟实现

相信自己的力量,只要对自己始终保持信心,尽自己最大努力去完成任何事,就算事情最终结果是失败了,努力了也不留遗憾。💓💓💓 目录   ✨说在前面 🍋知识点一:什么是list? •🌰1.list的定义 •🌰2.list的基本特性 •🌰3.常用接口介绍 🍋知识点二:list常用接口 •🌰1.默认成员函数 🔥构造函数(⭐) 🔥析构函数 •🌰2.list对象

【Prometheus】PromQL向量匹配实现不同标签的向量数据进行运算

✨✨ 欢迎大家来到景天科技苑✨✨ 🎈🎈 养成好习惯,先赞后看哦~🎈🎈 🏆 作者简介:景天科技苑 🏆《头衔》:大厂架构师,华为云开发者社区专家博主,阿里云开发者社区专家博主,CSDN全栈领域优质创作者,掘金优秀博主,51CTO博客专家等。 🏆《博客》:Python全栈,前后端开发,小程序开发,人工智能,js逆向,App逆向,网络系统安全,数据分析,Django,fastapi

poj2406(连续重复子串)

题意:判断串s是不是str^n,求str的最大长度。 解题思路:kmp可解,后缀数组的倍增算法超时。next[i]表示在第i位匹配失败后,自动跳转到next[i],所以1到next[n]这个串 等于 n-next[n]+1到n这个串。 代码如下; #include<iostream>#include<algorithm>#include<stdio.h>#include<math.

让树莓派智能语音助手实现定时提醒功能

最初的时候是想直接在rasa 的chatbot上实现,因为rasa本身是带有remindschedule模块的。不过经过一番折腾后,忽然发现,chatbot上实现的定时,语音助手不一定会有响应。因为,我目前语音助手的代码设置了长时间无应答会结束对话,这样一来,chatbot定时提醒的触发就不会被语音助手获悉。那怎么让语音助手也具有定时提醒功能呢? 我最后选择的方法是用threading.Time

Android实现任意版本设置默认的锁屏壁纸和桌面壁纸(两张壁纸可不一致)

客户有些需求需要设置默认壁纸和锁屏壁纸  在默认情况下 这两个壁纸是相同的  如果需要默认的锁屏壁纸和桌面壁纸不一样 需要额外修改 Android13实现 替换默认桌面壁纸: 将图片文件替换frameworks/base/core/res/res/drawable-nodpi/default_wallpaper.*  (注意不能是bmp格式) 替换默认锁屏壁纸: 将图片资源放入vendo

C#实战|大乐透选号器[6]:实现实时显示已选择的红蓝球数量

哈喽,你好啊,我是雷工。 关于大乐透选号器在前面已经记录了5篇笔记,这是第6篇; 接下来实现实时显示当前选中红球数量,蓝球数量; 以下为练习笔记。 01 效果演示 当选择和取消选择红球或蓝球时,在对应的位置显示实时已选择的红球、蓝球的数量; 02 标签名称 分别设置Label标签名称为:lblRedCount、lblBlueCount

【测试】输入正确用户名和密码,点击登录没有响应的可能性原因

目录 一、前端问题 1. 界面交互问题 2. 输入数据校验问题 二、网络问题 1. 网络连接中断 2. 代理设置问题 三、后端问题 1. 服务器故障 2. 数据库问题 3. 权限问题: 四、其他问题 1. 缓存问题 2. 第三方服务问题 3. 配置问题 一、前端问题 1. 界面交互问题 登录按钮的点击事件未正确绑定,导致点击后无法触发登录操作。 页面可能存在