Flink 的键控状态和操作符状态

2023-12-02 20:59
文章标签 状态 flink 操作符 键控

本文主要是介绍Flink 的键控状态和操作符状态,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一、键控状态(Keyed State)介绍

键控状态是 Apache Flink 中一种重要的状态管理方式,它允许用户在流处理应用中存储和访问与特定键相关的状态。在流处理应用中,键控状态可用于存储和更新与特定键相关的信息,比如某个键的累加计数或最近的事件记录等。通过键控状态,Flink 可以保持对流中每个键的状态进行跟踪和维护,从而使得应用可以根据键的属性进行灵活的处理和计算。

1.键控状态的类型
Flink 提供了不同类型的键控状态,包括:

  • ValueState:用于存储单个值的状态。
  • ListState:用于存储列表的状态。
  • MapState:用于存储键值对的状态。
  • ReducingState:用于存储一个可变的聚合结果的状态。
  • AggregatingState:用于存储一个可变的聚合结果的状态,但是可以通过提供一个 AggregateFunction 对状态进行累积和合并。

2.键控状态的生命周期
键控状态的生命周期与键相关联,当一个键第一次出现时,Flink 会自动为该键创建对应的状态。键控状态会在应用程序的整个生命周期中保持存在,并在每个事件到达时进行更新。当一个键不再出现时,Flink 会将与之相关的状态释放,以便释放内存资源。

3.键控状态的访问
Flink 提供了对键控状态的访问方式。在 ProcessFunctionCoProcessFunction KeyedProcessFunction 等函数中,可以通过 getRuntimeContext().getState() 方法来获取对键控状态的引用。然后,可以使用状态引用的方法进行状态的读取和更新操作。

二、操作符状态(Operator State)介绍

操作符状态是 Flink 中另一种重要的状态管理方式,它允许用户在流处理应用中存储和访问与操作符相关的状态。与键控状态不同,操作符状态与键无关,它可以被应用程序中的所有键和操作符共享。操作符状态通常用于存储一些全局的信息或累加计数等与键无关的状态。

1.操作符状态的类型
Flink 提供了不同类型的操作符状态,包括:

  • ValueState:用于存储单个值的状态。
  • ListState:用于存储列表的状态。
  • MapState:用于存储键值对的状态。
  • ReducingState:用于存储一个可变的聚合结果的状态。
  • AggregatingState:用于存储一个可变的聚合结果的状态,但是可以通过提供一个 AggregateFunction 对状态进行累积和合并。

2.操作符状态的生命周期
操作符状态的生命周期与应用程序的生命周期相关联。当应用程序启动时,Flink 会为每个操作符创建对应的状态。操作符状态会在整个应用程序执行过程中保持存在,并在每个事件到达时进行更新。当应用程序停止时,Flink 会将操作符状态释放,以便释放内存资源。

3.操作符状态的访问
Flink 提供了对操作符状态的访问方式。在 ProcessFunctionCoProcessFunction KeyedProcessFunction 等函数中,可以通过 getRuntimeContext().getState() 方法来获取对操作符状态的引用。然后,可以使用状态引用的方法进行状态的读取和更新操作。

三、参数介绍和完整代码案例

1.键控状态的参数介绍和代码案例
Flink 中,使用键控状态需要以下参数:

  • KeyedStream:表示输入流的键控流。
  • KeySelector:用于从输入流中选择键的函数。
  • ValueStateDescriptor:用于描述键控状态的类型和名称。

以下是一个使用键控状态的示例代码:

// 导入必要的包

import org.apache.flink.streaming.api.datastream.DataStream;

import org.apache.flink.streaming.api.datastream.KeyedStream;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.api.common.functions.MapFunction;

import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.flink.api.java.functions.KeySelector;

import org.apache.flink.api.common.state.ValueState;

import org.apache.flink.api.common.state.ValueStateDescriptor;

public class KeyedStateExample {

    public static void main(String[] args) throws Exception {

        // 创建执行环境

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 创建输入流

        DataStream<String> input = env.fromElements("apple", "orange", "banana", "apple", "banana");

        // 将输入流转换为键控流

        KeyedStream<Tuple2<String, Integer>, String> keyedStream = input.map(new MapFunction<String, Tuple2<String, Integer>>() {

            @Override

            public Tuple2<String, Integer> map(String value) throws Exception {

                return new Tuple2<>(value, 1);

            }

        }).keyBy(new KeySelector<Tuple2<String, Integer>, String>() {

            @Override

            public String getKey(Tuple2<String, Integer> value) throws Exception {

                return value.f0;

            }

        });

        // 定义键控状态描述符

        ValueStateDescriptor<Integer> stateDescriptor = new ValueStateDescriptor<>("count", Integer.class);

        // 根据键控流更新状态

        keyedStream.map(new MapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {

            private static final long serialVersionUID = 1L;

            @Override

            public Tuple2<String, Integer> map(Tuple2<String, Integer> value) throws Exception {

                // 获取键控状态

                ValueState<Integer> state = getRuntimeContext().getState(stateDescriptor);

                // 获取当前键的计数

                Integer count = state.value();

                // 更新计数

                count = count != null ? count + 1 : 1;

                state.update(count);

                // 返回键和计数

                return new Tuple2<>(value.f0, count);

            }

        }).print();

        // 执行任务

        env.execute("Keyed State Example");

    }

}

2.操作符状态的参数介绍和代码案例
Flink 中,使用操作符状态需要以下参数:

  • DataStream:表示输入流。
  • OperatorState:用于描述操作符状态的类型和名称。

以下是一个使用操作符状态的示例代码:

// 导入必要的包

import org.apache.flink.streaming.api.datastream.DataStream;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.api.common.functions.MapFunction;

import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.flink.api.common.state.OperatorState;

import org.apache.flink.api.common.state.OperatorStateDescriptor;

public class OperatorStateExample {

    public static void main(String[] args) throws Exception {

        // 创建执行环境

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 创建输入流

        DataStream<String> input = env.fromElements("apple", "orange", "banana", "apple", "banana");

        // 定义操作符状态描述符

        OperatorStateDescriptor<Integer> stateDescriptor = new OperatorStateDescriptor<>("count", Integer.class, 0);

        // 根据输入流更新状态

        input.map(new MapFunction<String, Tuple2<String, Integer>>() {

            private static final long serialVersionUID = 1L;

            private OperatorState<Integer> state;

            @Override

            public void open(Configuration parameters) throws Exception {

                // 获取操作符状态

                state = getRuntimeContext().getOperatorState(stateDescriptor);

            }

            @Override

            public Tuple2<String, Integer> map(String value) throws Exception {

                // 获取当前状态

                Integer count = state.value();

                // 更新状态

                count = count + 1;

                state.update(count);

                // 返回键和计数

                return new Tuple2<>(value, count);

            }

        }).print();

        // 执行任务

        env.execute("Operator State Example");

    }

}

以上代码示例中,分别展示了如何使用键控状态和操作符状态。键控状态可以在键控流中对每个键的状态进行跟踪和更新,而操作符状态可以在整个应用程序中共享和更新。通过对状态的读取和更新操作,可以实现更复杂的流处理应用。代码中的注释提供了详细的解释和说明,有助于理解和执行生成的代码。

这篇关于Flink 的键控状态和操作符状态的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/446799

相关文章

hdu1043(八数码问题,广搜 + hash(实现状态压缩) )

利用康拓展开将一个排列映射成一个自然数,然后就变成了普通的广搜题。 #include<iostream>#include<algorithm>#include<string>#include<stack>#include<queue>#include<map>#include<stdio.h>#include<stdlib.h>#include<ctype.h>#inclu

hdu1565(状态压缩)

本人第一道ac的状态压缩dp,这题的数据非常水,很容易过 题意:在n*n的矩阵中选数字使得不存在任意两个数字相邻,求最大值 解题思路: 一、因为在1<<20中有很多状态是无效的,所以第一步是选择有效状态,存到cnt[]数组中 二、dp[i][j]表示到第i行的状态cnt[j]所能得到的最大值,状态转移方程dp[i][j] = max(dp[i][j],dp[i-1][k]) ,其中k满足c

状态dp总结

zoj 3631  N 个数中选若干数和(只能选一次)<=M 的最大值 const int Max_N = 38 ;int a[1<<16] , b[1<<16] , x[Max_N] , e[Max_N] ;void GetNum(int g[] , int n , int s[] , int &m){ int i , j , t ;m = 0 ;for(i = 0 ;

hdu3006状态dp

给你n个集合。集合中均为数字且数字的范围在[1,m]内。m<=14。现在问用这些集合能组成多少个集合自己本身也算。 import java.io.BufferedInputStream;import java.io.BufferedReader;import java.io.IOException;import java.io.InputStream;import java.io.Inp

C++操作符重载实例(独立函数)

C++操作符重载实例,我们把坐标值CVector的加法进行重载,计算c3=c1+c2时,也就是计算x3=x1+x2,y3=y1+y2,今天我们以独立函数的方式重载操作符+(加号),以下是C++代码: c1802.cpp源代码: D:\YcjWork\CppTour>vim c1802.cpp #include <iostream>using namespace std;/*** 以独立函数

从状态管理到性能优化:全面解析 Android Compose

文章目录 引言一、Android Compose基本概念1.1 什么是Android Compose?1.2 Compose的优势1.3 如何在项目中使用Compose 二、Compose中的状态管理2.1 状态管理的重要性2.2 Compose中的状态和数据流2.3 使用State和MutableState处理状态2.4 通过ViewModel进行状态管理 三、Compose中的列表和滚动

实例:如何统计当前主机的连接状态和连接数

统计当前主机的连接状态和连接数 在 Linux 中,可使用 ss 命令来查看主机的网络连接状态。以下是统计当前主机连接状态和连接主机数量的具体操作。 1. 统计当前主机的连接状态 使用 ss 命令结合 grep、cut、sort 和 uniq 命令来统计当前主机的 TCP 连接状态。 ss -nta | grep -v '^State' | cut -d " " -f 1 | sort |

状态模式state

学习笔记,原文链接 https://refactoringguru.cn/design-patterns/state 在一个对象的内部状态变化时改变其行为, 使其看上去就像改变了自身所属的类一样。 在状态模式中,player.getState()获取的是player的当前状态,通常是一个实现了状态接口的对象。 onPlay()是状态模式中定义的一个方法,不同状态下(例如“正在播放”、“暂停

qml states 状态

states 状态 在QML中,states用于定义对象在不同状态下的属性变化。每个状态可以包含一组属性设置,当状态改变时,这些属性设置会被应用到对象上。 import QtQuick 2.15import QtQuick.Controls 2.15// 定义应用程序的主窗口ApplicationWindow {visible: true // 使窗口可见width: 640 /

Flink任务重启策略

概述 Flink支持不同的重启策略,以在故障发生时控制作业如何重启集群在启动时会伴随一个默认的重启策略,在没有定义具体重启策略时会使用该默认策略。如果在工作提交时指定了一个重启策略,该策略会覆盖集群的默认策略默认的重启策略可以通过 Flink 的配置文件 flink-conf.yaml 指定。配置参数 restart-strategy 定义了哪个策略被使用。常用的重启策略: 固定间隔 (Fixe