Flink 计数器Accumulator

2024-06-18 21:28
文章标签 flink 计数器 accumulator

本文主要是介绍Flink 计数器Accumulator,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

简述

在 Apache Flink 中,Accumulator 是一个用于收集作业执行期间聚合信息的工具。它允许在用户定义的函数(如 MapFunction, FlatMapFunction, ProcessFunction 等)中累积值,并在作业完成后检索这些值。这对于跟踪诸如事件数量、处理延迟等统计信息非常有用。

要使用 Accumulator,需要首先定义一个 Accumulator 接口的实现,然后在用户定义函数中注册和使用它。

1. 定义 Accumulator:

通常,不需要直接定义 Accumulator 接口的实现,因为 Flink 已经为提供了一些内置的 Accumulator 类型,如 IntCounter, LongCounter, DoubleCounter 等。但如果需要自定义的聚合逻辑,可以实现 Accumulator 接口。

2. 在函数中使用 Accumulator:

在用户定义函数中,可以通过 getRuntimeContext().getAccumulator(“name”) 来获取或注册一个 Accumulator。然后,可以在逻辑中更新它的值。

3. 检索 Accumulator 的值:

在作业执行完成后,可以通过 JobExecutionResult 的 getAccumulatorResult() 方法来检索 Accumulator 的值。

但请注意,由于 Accumulator 已经被 Metric 系统所取代,以下是一个使用 Metric 的示例,它提供了类似的功能:

import org.apache.flink.api.common.functions.RuntimeContext;  
import org.apache.flink.metrics.Counter;  
import org.apache.flink.streaming.api.functions.source.SourceFunction;  public class MySourceFunction implements SourceFunction<String> {  private transient Counter counter;  @Override  public void open(Configuration parameters) throws Exception {  super.open(parameters);  // 获取或注册一个 Counter  this.counter = getRuntimeContext().getMetricGroup().counter("my-counter");  }  @Override  public void run(SourceContext<String> ctx) throws Exception {  // ... 数据源逻辑 ...  // 更新 Counter 的值  counter.inc();  // 发送数据到下游  ctx.collect("some data");  }  // ... 其他必要的方法 ...  
}

在这个示例中,我们使用了 Flink 的 Metric 系统来创建一个计数器,并在数据源函数中更新它的值。这样,就可以在作业执行期间跟踪和检索这个计数器的值了。

代码

package com.wfg.flink.connector.mongodb;import cn.hutool.core.date.DateUtil;
import com.alibaba.fastjson2.JSONObject;
import com.wfg.flink.connector.dto.KafkaPvDto;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.mongodb.source.MongoSource;
import org.apache.flink.connector.mongodb.source.enumerator.splitter.PartitionStrategy;
import org.apache.flink.connector.mongodb.source.reader.deserializer.MongoDeserializationSchema;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.bson.BsonDocument;import static com.wfg.flink.connector.constants.Constants.MONGO_TEST_PV_COLLECTION;/*** @author wfg* 根据名字统计访问次数*/
public class MongoAccumulatorCounts {public static void main(String[] args) throws Exception {String startTime = DateUtil.format(DateUtil.date(), "yyyy-MM-dd HH:mm:ss");System.out.println("StartTime:" + startTime);final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 开启Checkpointing,设置Checkpoint间隔env.enableCheckpointing(30000);// 设置Checkpoint模式env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);// 设置最小Checkpoint间隔env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);// 设置最大并发Checkpoint数目env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);// 使用RocksDB作为状态后端env.setStateBackend(new HashMapStateBackend());env.setParallelism(10);// 配置MongoDB源MongoSource<String> mongoSource = MongoSource.<String>builder().setUri("mongodb://root:123456@127.0.0.1:27017,127.0.0.1:27018,127.0.0.1:27019/admin?replicaSet=rs0&authSource=admin").setDatabase("sjzz").setCollection(MONGO_TEST_PV_COLLECTION).setFetchSize(2048)
//                .setLimit(10000).setNoCursorTimeout(true).setPartitionStrategy(PartitionStrategy.SINGLE).setPartitionSize(MemorySize.ofMebiBytes(64)).setDeserializationSchema(new MongoDeserializationSchema<>() {@Overridepublic String deserialize(BsonDocument document) {return document.toJson();}@Overridepublic TypeInformation<String> getProducedType() {return BasicTypeInfo.STRING_TYPE_INFO;}}).build();// 创建MongoDB数据流DataStream<String> sourceStream = env.fromSource(mongoSource, WatermarkStrategy.noWatermarks(), "kafka Mongo Source");
// 转换数据,提取人名作为KeyDataStream<Tuple2<String, Integer>> nameCountStream = sourceStream.map(new RichMapFunction<String, Tuple2<String, Integer>>() {private LongCounter elementCounter = new LongCounter();Long count = 0L;@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);//-2注册累加器getRuntimeContext().addAccumulator("elementCounter", elementCounter);}@Overridepublic Tuple2<String, Integer> map(String value) {KafkaPvDto data = JSONObject.parseObject(value, KafkaPvDto.class);//-3.使用累加器this.elementCounter.add(1);count += 1;System.out.println("不使用累加器统计的结果:" + count);return Tuple2.of(data.getUserName(), 1);}}).setParallelism(10);
//                .keyBy(value->value.f0)
//                 .sum("f1");sourceStream.writeAsText("data/output/test", FileSystem.WriteMode.OVERWRITE);
//-4.获取加强结果JobExecutionResult jobResult = env.execute();long nums = jobResult.getAccumulatorResult("elementCounter");System.out.println("使用累加器统计的结果:" + nums);System.out.println("-----------------------------------");System.out.println("startTime: " + startTime);System.out.println("EndTime: " + DateUtil.format(DateUtil.date(), "yyyy-MM-dd HH:mm:ss"));}
}

这篇关于Flink 计数器Accumulator的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1073101

相关文章

Flink任务重启策略

概述 Flink支持不同的重启策略,以在故障发生时控制作业如何重启集群在启动时会伴随一个默认的重启策略,在没有定义具体重启策略时会使用该默认策略。如果在工作提交时指定了一个重启策略,该策略会覆盖集群的默认策略默认的重启策略可以通过 Flink 的配置文件 flink-conf.yaml 指定。配置参数 restart-strategy 定义了哪个策略被使用。常用的重启策略: 固定间隔 (Fixe

SQL2005 性能监视器计数器错误解决方法

【系统环境】 windows 2003 +sql2005 【问题状况】 用户在不正当删除SQL2005后会造成SQL2005 性能监视器计数器错误,如下图 【解决办法】 1、在 “开始” --> “运行”中输入 regedit,开启注册表编辑器,定位到 [HKEY_LOCAL_MACHINE\SOFTWARE\Microsoft\Windows NT\CurrentVer

大数据之Flink(二)

4、部署模式 flink部署模式: 会话模式(Session Mode)单作业模式(Per-Job Mode)应用模式(Application Mode) 区别在于集群的生命周期以及资源的分配方式;以及应用的main方法到底在哪里执行。 4.1、会话模式 先启动一个集群,保持一个会话,在这个会话中通过客户端提交作业。集群启动时资源已经确定,所有提交的作业会晶振集群中的资源。适合规模小、执

是谁还不会flink的checkpoint呀~

1、State Vs Checkpoint State:状态,是Flink中某一个Operator在某一个时刻的状态,如maxBy/sum,注意State存的是历史数据/状态,存在内存中。 Checkpoint:快照点, 是Flink中所有有状态的Operator在某一个时刻的State快照信息/存档信息 一句话概括: Checkpoint就是State的快照 目的:假设作业停止了,下次启动的

Flink读取kafka数据并以parquet格式写入HDFS

《2021年最新版大数据面试题全面开启更新》 《2021年最新版大数据面试题全面开启更新》 大数据业务场景中,经常有一种场景:外部数据发送到kafka中,flink作为中间件消费kafka数据并进行业务处理;处理完成之后的数据可能还需要写入到数据库或者文件系统中,比如写入hdfs中; 目前基于spark进行计算比较主流,需要读取hdfs上的数据,可以通过读取parquet:spark.read

Apache-Flink深度解析-State

来源:https://dwz.cn/xrMCqbk5 Flink系列精华文章合集入门篇: Flink入门Flink DataSet&DataSteam APIFlink集群部署Flink重启策略Flink分布式缓存Flink重启策略Flink中的TimeFlink中的窗口Flink的时间戳和水印Flink广播变量Flink-Kafka-connetorFlink-Table&SQLFlink

Apache-Flink深度解析-Temporal-Table-JOIN

在《JOIN LATERAL》中提到了Temporal Table JOIN,本篇就向大家详细介绍什么是Temporal Table JOIN。在ANSI-SQL 2011 中提出了Temporal 的概念,Oracle,SQLServer,DB2等大的数据库厂商也先后实现了这个标准。Temporal Table记录了历史上任何时间点所有的数据改动,Temporal Table的工作流程如下:

Flink 原理与实现:Operator Chain原理

硬刚大数据系列文章链接: 2021年从零到大数据专家的学习指南(全面升级版) 2021年从零到大数据专家面试篇之Hadoop/HDFS/Yarn篇 2021年从零到大数据专家面试篇之SparkSQL篇 2021年从零到大数据专家面试篇之消息队列篇 2021年从零到大数据专家面试篇之Spark篇 2021年从零到大数据专家面试篇之Hbase篇

Flink原理与实现:如何生成ExecutionGraph及物理执行图

硬刚大数据系列文章链接: 2021年从零到大数据专家的学习指南(全面升级版) 2021年从零到大数据专家面试篇之Hadoop/HDFS/Yarn篇 2021年从零到大数据专家面试篇之SparkSQL篇 2021年从零到大数据专家面试篇之消息队列篇 2021年从零到大数据专家面试篇之Spark篇 2021年从零到大数据专家面试篇之Hbase篇

Flink原理与实现:Window的实现原理

硬刚大数据系列文章链接: 2021年从零到大数据专家的学习指南(全面升级版) 2021年从零到大数据专家面试篇之Hadoop/HDFS/Yarn篇 2021年从零到大数据专家面试篇之SparkSQL篇 2021年从零到大数据专家面试篇之消息队列篇 2021年从零到大数据专家面试篇之Spark篇 2021年从零到大数据专家面试篇之Hbase篇