【Flink】状态编程: 订单超时告警

2024-08-29 10:32

本文主要是介绍【Flink】状态编程: 订单超时告警,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

  • 一、基础概念
  • 二、Flink状态编程
    • 1、支持的状态类型
    • 2、Managed Keyed State
      • 六种类型
      • 基本API
      • 状态的生命周期
    • 3、Managed Operator State
  • 三、案例:订单延迟告警统计
    • 1、需求描述
    • 2、需求分析
    • 3、数据与模型
    • 4、详细实现

Flink状态编程学习小结,附订单超时告警实战案例。

更多内容详见:https://github.com/pierre94/flink-notes

一、基础概念

在Flink架构体系中,有状态计算可以说是Flink非常重要的特性之一。

image.png

有状态计算是指:

在程序计算过程中,在Flink程序内部存储计算产生的中间结果,并提供给后续Function或算子计算结果使用。(如下图所示)

image.png

无状态计算实现的复杂度相对较低,实现起来较容易,但是无法完成提到的比较复杂的业务场景:

  • CEP(复杂事件处理):获取符合某一特定事件规则的事件,状态计算就可以将接入的事件进行存储,然后等待符合规则的事件触发
  • 最大值、均值等聚合指标(如pv,uv): 需要利用状态来维护当前计算过程中产生的结果,例如事件的总数、总和以及最大,最小值等
  • 机器学习场景,维护当前版本模型使用的参数
  • 其他需要使用历史数据的计算

二、Flink状态编程

1、支持的状态类型

Flink根据数据集是否根据Key进行分区,将状态分为Keyed State和Operator State(Non-keyed State)两种类型。

其中Keyed State是Operator State的特例,可以通过Key Groups进行管理,主要用于当算子并行度发生变化时,自动重新分布Keyed Sate数据

同时在Flink中Keyed State和Operator State均具有两种形式:

  • 一种为托管状态(ManagedState)形式,由Flink Runtime中控制和管理状态数据,并将状态数据转换成为内存Hashtables或RocksDB的对象存储,然后将这些状态数据通过内部的接口持久化到Checkpoints中,任务异常时可以通过这些状态数据恢复任务。
  • 另外一种是原生状态(Raw State)形式,由算子自己管理数据结构,当触发Checkpoint过程中,Flink并不知道状态数据内部的数据结构,只是将数据转换成bytes数据存储在Checkpoints中,当从Checkpoints恢复任务时,算子自己再反序列化出状态的数据结构。

在Flink中推荐用户使用Managed State管理状态数据,主要原因是Managed State能够更好地支持状态数据的重平衡以及更加完善的内存管理。

2、Managed Keyed State

六种类型

Managed Keyed State 又分为如下六种类型:

image.png

FoldingState已经被标注为deprecated

基本API

在Flink中需要通过创建StateDescriptor来获取相应State的操作类。如下方代码,构建一个ValueState:

lazy val isPayedState: ValueState[Boolean] = getRuntimeContext.getState(new ValueStateDescriptor[Boolean]("is-payed-state", classOf[Boolean]))

其中对ValueState可以增删改查:

# 获取状态值
val isPayed = isPayedState.value()# 更新状态值
isPayedState.update(true)# 释放状态值
isPayedState.clear()

状态的生命周期

对于任何类型Keyed State都可以设定状态的生命周期(TTL),以确保能够在规定时间内及时地清理状态数据。

实现方法:

1、生成StateTtlConfig配置

2、将StateTtlConfig配置传入StateDescriptor中的enableTimeToLive方法中即可

import org.apache.flink.api.common.state.StateTtlConfig
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.api.common.time.Timeval ttlConfig = StateTtlConfig.newBuilder(Time.seconds(1)).setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite).setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired).buildval stateDescriptor = new ValueStateDescriptor[String]("text state", classOf[String])
stateDescriptor.enableTimeToLive(ttlConfig)

StateTtlConfig的详细配置见: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/state.html#state-time-to-live-ttl

3、Managed Operator State

Operator State是一种non-keyed state,与并行的操作算子实例相关联,例如在KafkaConnector中,每个Kafka消费端算子实例都对应到Kafka的一个分区中,维护Topic分区和Offsets偏移量作为算子的Operator State。在Flink中可以实现Checkpointed-Function或者ListCheckpointed<T extends Serializable>两个接口来定义操作Managed Operator State的函数。

(待补充……)

三、案例:订单延迟告警统计

1、需求描述

需求与数据来自《大数据技术之电商用户行为分析》

在电商平台中,最终创造收入和利润的是用户下单购买的环节;更具体一点,是用户真正完成支付动作的时候。用户下单的行为可以表明用户对商品的需求,但在现实中,并不是每次下单都会被用户立刻支付。当拖延一段时间后,用户支付的意愿会降低。

所以为了让用户更有紧迫感从而提高支付转化率,同时也为了防范订单支付环节的安全风险,电商网站往往会对订单状态进行监控,设置一个失效时间(比如 15 分钟),如果下单后一段时间仍未支付,订单就会被取消。

此时需要给用户发送一个信息提醒用户,提高支付转换率!

2、需求分析

本需求可以使用CEP来实现,但这里推荐使用process function原生的状态编程。

问题可以简化成: 在pay事件超时未发生的情况下,输出超时报警信息。

一个简单的思路是:

  1. 在订单的 create 事件到来后注册定时器,15分钟后触发;
  2. 用一个布尔类型的 Value 状态来作为标识位,表明 pay 事件是否发生过。
  3. 如果 pay 事件已经发生,状态被置为true,那么就不再需要做什么操作;
  4. 而如果 pay 事件一直没来,状态一直为false,到定时器触发时,就应该输出超时报警信息。

3、数据与模型

示例数据:

34729,create,,1558430842
34730,create,,1558430843
34729,pay,sd76f87d6,1558430844
34730,modify,3hu3k2432,1558430845
34731,create,,1558430846
34731,pay,35jue34we,1558430849
34732,create,,1558430852
34733,create,,1558430855
34734,create,,1558430859
34734,create,,1558431000
34733,pay,,1558431000             
34732,pay,,1558449999   

我们可以得到Flink的输入与输出类

// 定义输

这篇关于【Flink】状态编程: 订单超时告警的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1117606

相关文章

hdu1043(八数码问题,广搜 + hash(实现状态压缩) )

利用康拓展开将一个排列映射成一个自然数,然后就变成了普通的广搜题。 #include<iostream>#include<algorithm>#include<string>#include<stack>#include<queue>#include<map>#include<stdio.h>#include<stdlib.h>#include<ctype.h>#inclu

hdu1565(状态压缩)

本人第一道ac的状态压缩dp,这题的数据非常水,很容易过 题意:在n*n的矩阵中选数字使得不存在任意两个数字相邻,求最大值 解题思路: 一、因为在1<<20中有很多状态是无效的,所以第一步是选择有效状态,存到cnt[]数组中 二、dp[i][j]表示到第i行的状态cnt[j]所能得到的最大值,状态转移方程dp[i][j] = max(dp[i][j],dp[i-1][k]) ,其中k满足c

Linux 网络编程 --- 应用层

一、自定义协议和序列化反序列化 代码: 序列化反序列化实现网络版本计算器 二、HTTP协议 1、谈两个简单的预备知识 https://www.baidu.com/ --- 域名 --- 域名解析 --- IP地址 http的端口号为80端口,https的端口号为443 url为统一资源定位符。CSDNhttps://mp.csdn.net/mp_blog/creation/editor

【Python编程】Linux创建虚拟环境并配置与notebook相连接

1.创建 使用 venv 创建虚拟环境。例如,在当前目录下创建一个名为 myenv 的虚拟环境: python3 -m venv myenv 2.激活 激活虚拟环境使其成为当前终端会话的活动环境。运行: source myenv/bin/activate 3.与notebook连接 在虚拟环境中,使用 pip 安装 Jupyter 和 ipykernel: pip instal

状态dp总结

zoj 3631  N 个数中选若干数和(只能选一次)<=M 的最大值 const int Max_N = 38 ;int a[1<<16] , b[1<<16] , x[Max_N] , e[Max_N] ;void GetNum(int g[] , int n , int s[] , int &m){ int i , j , t ;m = 0 ;for(i = 0 ;

【编程底层思考】垃圾收集机制,GC算法,垃圾收集器类型概述

Java的垃圾收集(Garbage Collection,GC)机制是Java语言的一大特色,它负责自动管理内存的回收,释放不再使用的对象所占用的内存。以下是对Java垃圾收集机制的详细介绍: 一、垃圾收集机制概述: 对象存活判断:垃圾收集器定期检查堆内存中的对象,判断哪些对象是“垃圾”,即不再被任何引用链直接或间接引用的对象。内存回收:将判断为垃圾的对象占用的内存进行回收,以便重新使用。

hdu3006状态dp

给你n个集合。集合中均为数字且数字的范围在[1,m]内。m<=14。现在问用这些集合能组成多少个集合自己本身也算。 import java.io.BufferedInputStream;import java.io.BufferedReader;import java.io.IOException;import java.io.InputStream;import java.io.Inp

Go Playground 在线编程环境

For all examples in this and the next chapter, we will use Go Playground. Go Playground represents a web service that can run programs written in Go. It can be opened in a web browser using the follow

深入理解RxJava:响应式编程的现代方式

在当今的软件开发世界中,异步编程和事件驱动的架构变得越来越重要。RxJava,作为响应式编程(Reactive Programming)的一个流行库,为Java和Android开发者提供了一种强大的方式来处理异步任务和事件流。本文将深入探讨RxJava的核心概念、优势以及如何在实际项目中应用它。 文章目录 💯 什么是RxJava?💯 响应式编程的优势💯 RxJava的核心概念

从状态管理到性能优化:全面解析 Android Compose

文章目录 引言一、Android Compose基本概念1.1 什么是Android Compose?1.2 Compose的优势1.3 如何在项目中使用Compose 二、Compose中的状态管理2.1 状态管理的重要性2.2 Compose中的状态和数据流2.3 使用State和MutableState处理状态2.4 通过ViewModel进行状态管理 三、Compose中的列表和滚动