是谁还不会flink的checkpoint呀~

2024-09-07 10:04
文章标签 flink 不会 checkpoint

本文主要是介绍是谁还不会flink的checkpoint呀~,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1、State Vs Checkpoint

State:状态,是Flink中某一个Operator在某一个时刻的状态,如maxBy/sum,注意State存的是历史数据/状态,存在内存中。

Checkpoint:快照点, 是Flink中所有有状态的Operator在某一个时刻的State快照信息/存档信息

一句话概括: Checkpoint就是State的快照

目的:假设作业停止了,下次启动的时候可以加载快照中的状态数据,以此到达恢复数据的目的。

State backend(状态后端):状态后端用于控制状态保存的位置的。

2、设置Checkpoint 

 

package com.demo.day4;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.util.HashMap;
import java.util.Map;public class CheckpointDemo03 {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//设置checkpointenv.enableCheckpointing(1000);// 默认ck是关闭,这行代码开启checkpoint,并且控制每隔多久进行一次ckenv.setStateBackend(new FsStateBackend("hdfs://hdfs-cluster/flink-checkpoint"));CheckpointConfig checkpointConfig = env.getCheckpointConfig();checkpointConfig.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers("hadoop11:9092,hadoop12:9092,hadoop13:9092").setTopics("topic1").setValueOnlyDeserializer(new SimpleStringSchema()).setGroupId("g1").setStartingOffsets(OffsetsInitializer.latest()).build();DataStream<String> ds = env.fromSource(source, WatermarkStrategy.noWatermarks(),"source");DataStream<Tuple2<String, Long>> tupleDS = ds.map(new MapFunction<String, Tuple2<String, Long>>() {@Overridepublic Tuple2<String, Long> map(String value) throws Exception {String[] arr = value.split(",");return Tuple2.of(arr[0],Long.parseLong(arr[1]));}});DataStream<Tuple3<String, Long, Long>> result2 = tupleDS.keyBy(t -> t.f0).map(new RichMapFunction<Tuple2<String, Long>, Tuple3<String, Long, Long>>() {private ValueState<Long> maxValueState = null;//-2.初始化状态(一次)@Overridepublic void open(Configuration parameters) throws Exception {//创建状态描述器ValueStateDescriptor<Long> stateDescriptor = new ValueStateDescriptor<>("maxValueState", Long.class);//根据状态描述器获取状态maxValueState = getRuntimeContext().getState(stateDescriptor);}//-3.使用状态@Overridepublic Tuple3<String, Long, Long> map(Tuple2<String, Long> value) throws Exception {Long currentValue = value.f1;//当前值Long maxValue = maxValueState.value();//历史最大值if (maxValue == null || currentValue > maxValue) {maxValue = currentValue;//当前值作为最大值//-4.更新状态maxValueState.update(maxValue);}return Tuple3.of(value.f0, currentValue, maxValue);}});result2.print();env.execute();}
}

上传jar包到hdfs:

 运行:

 tijiao.sh:

#! /bin/bashflink run-application \
-t yarn-application   \
-Dyarn.provided.lib.dirs="hdfs://hdfs-cluster/flink-dist"     \
-c com.demo.day4.CheckpointDemo03  \
hdfs://hdfs-cluster/my-flinkjars/flink-test-1.0-SNAPSHOT.jar

运行结果:

测试:

 

停止job:

再输入测试数据:

 再次运行(加入检查点/flink-checkpoint/4435e2599d6269a0b7095979a959d97d/chk-416):

 

 tijiao.sh:

#! /bin/bashflink run-application \
-t yarn-application   \
-Dyarn.provided.lib.dirs="hdfs://hdfs-cluster/flink-dist"     \
-s hdfs://hdfs-cluster/flink-checkpoint/4435e2599d6269a0b7095979a959d97d/chk-416  \
-c com.demo.day4.CheckpointDemo03  \
hdfs://hdfs-cluster/my-flinkjars/flink-test-1.0-SNAPSHOT.jar

运行成功:

 

 查看(关闭任务后的数据可以成功显示):

 

再次测试:

 

 

3、设置savepoint 

 checkpoint自动完成state快照、savePoint是手动的完成快照。

 与checkpoint类似,将jar包上传到hdfs

执行job:

#! /bin/bashflink run-application \
-t yarn-application   \
-Dyarn.provided.lib.dirs="hdfs://hdfs-cluster/flink-dist"     \
-c com.demo.day4.CheckpointDemo03  \
hdfs://hdfs-cluster/my-flinkjars/flink-test-1.0-SNAPSHOT.jar

查看此刻的yarnid(application_1725272814044_0002)以及jobid(f28c53d5ccf838c8d5e7ca7b8c45e5b0) 

 

测试:

 

 执行savepoint操作(2种方法)

停止flink job,并且触发savepoint操作
flink stop --savepointPath  hdfs://hdfs-cluster/flink-savepoint  f28c53d5ccf838c8d5e7ca7b8c45e5b0 -yid application_1725272814044_0002不会停止flink的job,只是完成savepoint操作
语法:flink savepoint jobId  savePointpath -yid yarn applicationId flink savepoint f28c53d5ccf838c8d5e7ca7b8c45e5b0 hdfs://hdfs-cluster/flink-savepoint -yid application_1725272814044_0002

运行第一种:

flink stop --savepointPath  hdfs://hdfs-cluster/flink-savepoint  f28c53d5ccf838c8d5e7ca7b8c45e5b0 -yid application_1725272814044_0002

 关闭job,再次测试:

 查看最近完成的flink job对应的savepoint,进行快照恢复:

 根据之前的savepoint路径,重新启动flink job

#! /bin/bashflink run-application \
-t yarn-application   \
-Dyarn.provided.lib.dirs="hdfs://hdfs-cluster/flink-dist"     \
-c com.demo.day4.CheckpointDemo03  \
-s hdfs://hdfs-cluster/flink-savepoint/savepoint-f28c53-23e87194a6b2 \
hdfs://hdfs-cluster/my-flinkjars/flink-test-1.0-SNAPSHOT.jar

再次查看(中断时输入的数据): 

再次输入数据: 

 

 4、Dinkey--checkpoint 

set 'execution.checkpointing.interval'='2sec';
set 'state.checkpoints.dir'='hdfs://hdfs-cluster/flink-checkpoint';
set 'execution.checkpointing.externalized-checkpoint-retention'='RETAIN_ON_CANCELLATION';CREATE TABLE table1 (actionTime BIGINT,monitorId string,cameraId  string,car     string,speed    double,roadId string,areaId string,event_time  as proctime()     -- 计算列(时间类型为timestamp)
) WITH ('connector' = 'kafka','topic' = 'topic1','properties.bootstrap.servers' = 'hadoop11:9092,hadoop12:9092,hadoop13:9092','properties.group.id' = 'g1','scan.startup.mode' = 'latest-offset','format' = 'json'
);create table table2(id     int,start_time  TIMESTAMP,end_time    TIMESTAMP,monitor_id  string,avg_speed   double,car_count   BIGINT
)WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://hadoop11:3306/cars','table-name' = 't_average_speed','username' = 'root','password' = '123456');insert into table2
SELECT CAST(null as int) as id,window_start as start_time,window_end as end_time,monitorId as monitor_id,AVG(speed) as avg_speed,COUNT(DISTINCT car) as car_count
from table(HOP(table table1,DESCRIPTOR(event_time),INTERVAL '30' SECONDS,INTERVAL '60' SECONDS))
group by monitorId,window_start,window_end;

 

这篇关于是谁还不会flink的checkpoint呀~的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1144752

相关文章

Flink任务重启策略

概述 Flink支持不同的重启策略,以在故障发生时控制作业如何重启集群在启动时会伴随一个默认的重启策略,在没有定义具体重启策略时会使用该默认策略。如果在工作提交时指定了一个重启策略,该策略会覆盖集群的默认策略默认的重启策略可以通过 Flink 的配置文件 flink-conf.yaml 指定。配置参数 restart-strategy 定义了哪个策略被使用。常用的重启策略: 固定间隔 (Fixe

如何保证android程序进程不到万不得已的情况下,不会被结束

最近,做一个调用系统自带相机的那么一个功能,遇到的坑,在此记录一下。 设备:红米note4 问题起因 因为自定义的相机,很难满足客户的所有需要,比如:自拍杆的支持,优化方面等等。这些方面自定义的相机都不比系统自带的好,因为有些系统都是商家定制的,难免会出现一个奇葩的问题。比如:你在这款手机上运行,无任何问题,然而你换一款手机后,问题就出现了。 比如:小米的红米系列,你启用系统自带拍照功能后

看完这个不会配置 logback ,请你吃瓜!

之前在 日志?聊一聊slf4j吧 这篇文章中聊了下slf4j。本文也从实际的例子出发,针对logback的日志配置进行学习。 logack 简介 logback 官网:https://logback.qos.ch/ 目前还没有看过日志类框架的源码,仅限于如何使用。所以就不说那些“空话”了。最直观的认知是: logback和log4j是一个人写的springboot默认使用的日志框架是

在项目开发中,jsp页面不会少了,如何公用页面(添加页面和修改页面)和公用样式代码(css,js)?

在项目开发中,如何公用添加页面和修改页面? <%@ page language="java" import="java.util.*" pageEncoding="utf-8"%><html><head><title>岗位设置</title><%@ include file="/WEB-INF/jsp/public/common.jspf"%></head><body> <!-- 标

大数据之Flink(二)

4、部署模式 flink部署模式: 会话模式(Session Mode)单作业模式(Per-Job Mode)应用模式(Application Mode) 区别在于集群的生命周期以及资源的分配方式;以及应用的main方法到底在哪里执行。 4.1、会话模式 先启动一个集群,保持一个会话,在这个会话中通过客户端提交作业。集群启动时资源已经确定,所有提交的作业会晶振集群中的资源。适合规模小、执

涉密电脑插U盘会不会被发现?如何禁止涉密电脑插U盘?30秒读懂!

在涉密电脑插U盘的那一瞬间,你是否也好奇会不会被发现?涉密电脑的安全监控可是滴水不漏的!想知道如何彻底禁止涉密电脑插U盘?简单几招搞定,轻松锁死外部设备,信息安全无懈可击! 涉密电脑插U盘会不会被发现? 涉密电脑是否会在插入U盘时被发现,需要根据具体情况来判断。在一些情况下,涉密电脑可能没有安装任何监控软件或安全工具,插入U盘可能不会立即触发警告。然而,随着信息安全管理的不断升级,越来越多

Flink读取kafka数据并以parquet格式写入HDFS

《2021年最新版大数据面试题全面开启更新》 《2021年最新版大数据面试题全面开启更新》 大数据业务场景中,经常有一种场景:外部数据发送到kafka中,flink作为中间件消费kafka数据并进行业务处理;处理完成之后的数据可能还需要写入到数据库或者文件系统中,比如写入hdfs中; 目前基于spark进行计算比较主流,需要读取hdfs上的数据,可以通过读取parquet:spark.read

Apache-Flink深度解析-State

来源:https://dwz.cn/xrMCqbk5 Flink系列精华文章合集入门篇: Flink入门Flink DataSet&DataSteam APIFlink集群部署Flink重启策略Flink分布式缓存Flink重启策略Flink中的TimeFlink中的窗口Flink的时间戳和水印Flink广播变量Flink-Kafka-connetorFlink-Table&SQLFlink

Apache-Flink深度解析-Temporal-Table-JOIN

在《JOIN LATERAL》中提到了Temporal Table JOIN,本篇就向大家详细介绍什么是Temporal Table JOIN。在ANSI-SQL 2011 中提出了Temporal 的概念,Oracle,SQLServer,DB2等大的数据库厂商也先后实现了这个标准。Temporal Table记录了历史上任何时间点所有的数据改动,Temporal Table的工作流程如下:

Flink 原理与实现:Operator Chain原理

硬刚大数据系列文章链接: 2021年从零到大数据专家的学习指南(全面升级版) 2021年从零到大数据专家面试篇之Hadoop/HDFS/Yarn篇 2021年从零到大数据专家面试篇之SparkSQL篇 2021年从零到大数据专家面试篇之消息队列篇 2021年从零到大数据专家面试篇之Spark篇 2021年从零到大数据专家面试篇之Hbase篇