This article is a walkthrough of Flink's checkpoint mechanism ("who still doesn't know Flink checkpoints?~"). I hope it offers some reference value to developers; follow along and learn!
1、State vs Checkpoint
State: the state of a single Flink Operator at a given moment, e.g. the running result of maxBy/sum. Note that State holds historical data/state and lives in memory.
Checkpoint: a snapshot point, i.e. a snapshot/archive of the State of all stateful Operators in Flink at a given moment.
In one sentence:
A Checkpoint is a snapshot of State.
Purpose: if the job stops, the next start can load the state data from the snapshot and thereby recover the data.
State backend: the state backend controls where state is saved.
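Before the full example, here is a minimal sketch of wiring these pieces together, assuming Flink 1.13+ where the old FsStateBackend is split into a HashMapStateBackend (working state) plus a checkpoint storage location (snapshots); the HDFS path is a placeholder:
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StateBackendSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(1000);                  // take a State snapshot every 1000 ms
        env.setStateBackend(new HashMapStateBackend()); // keep working state on the JVM heap
        env.getCheckpointConfig()
           .setCheckpointStorage("hdfs://hdfs-cluster/flink-checkpoint"); // where snapshots are persisted
    }
}
The example below uses the older FsStateBackend API instead, which combines both roles in one class.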
2、Setting up Checkpoint
package com.demo.day4;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointDemo03 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Configure checkpointing: it is off by default; this enables it
        // and controls how often a checkpoint is taken (in ms)
        env.enableCheckpointing(1000);
        env.setStateBackend(new FsStateBackend("hdfs://hdfs-cluster/flink-checkpoint"));
        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        // Keep the externalized checkpoint on HDFS when the job is cancelled
        checkpointConfig.setExternalizedCheckpointCleanup(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("hadoop11:9092,hadoop12:9092,hadoop13:9092")
                .setTopics("topic1")
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .setGroupId("g1")
                .setStartingOffsets(OffsetsInitializer.latest())
                .build();
        DataStream<String> ds = env.fromSource(source, WatermarkStrategy.noWatermarks(), "source");

        // Parse "key,value" lines into (key, value) tuples
        DataStream<Tuple2<String, Long>> tupleDS = ds.map(new MapFunction<String, Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> map(String value) throws Exception {
                String[] arr = value.split(",");
                return Tuple2.of(arr[0], Long.parseLong(arr[1]));
            }
        });

        DataStream<Tuple3<String, Long, Long>> result2 = tupleDS
                .keyBy(t -> t.f0)
                .map(new RichMapFunction<Tuple2<String, Long>, Tuple3<String, Long, Long>>() {
                    // 1. Declare a state handle for the historical max (per key)
                    private ValueState<Long> maxValueState = null;

                    // 2. Initialize the state (once)
                    @Override
                    public void open(Configuration parameters) throws Exception {
                        // Create the state descriptor
                        ValueStateDescriptor<Long> stateDescriptor =
                                new ValueStateDescriptor<>("maxValueState", Long.class);
                        // Obtain the state via the descriptor
                        maxValueState = getRuntimeContext().getState(stateDescriptor);
                    }

                    // 3. Use the state
                    @Override
                    public Tuple3<String, Long, Long> map(Tuple2<String, Long> value) throws Exception {
                        Long currentValue = value.f1;          // current value
                        Long maxValue = maxValueState.value(); // historical max
                        if (maxValue == null || currentValue > maxValue) {
                            maxValue = currentValue;           // current value becomes the max
                            // 4. Update the state
                            maxValueState.update(maxValue);
                        }
                        return Tuple3.of(value.f0, currentValue, maxValue);
                    }
                });

        result2.print();
        env.execute();
    }
}
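CheckpointConfig exposes a few more commonly tuned settings beyond the interval and retention policy. A hedged sketch follows; the class name and values are illustrative, not recommendations:
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;

public class CheckpointTuningSketch {
    // Could be applied to the checkpointConfig obtained in CheckpointDemo03 above
    static void tune(CheckpointConfig checkpointConfig) {
        checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // the default; AT_LEAST_ONCE trades accuracy for latency
        checkpointConfig.setCheckpointTimeout(60000);            // abort a checkpoint that runs longer than 60 s
        checkpointConfig.setMinPauseBetweenCheckpoints(500);     // leave at least 500 ms between checkpoints
        checkpointConfig.setMaxConcurrentCheckpoints(1);         // at most one checkpoint in flight
    }
}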
Upload the jar to HDFS:
Run:
tijiao.sh:
#! /bin/bash
flink run-application \
-t yarn-application \
-Dyarn.provided.lib.dirs="hdfs://hdfs-cluster/flink-dist" \
-c com.demo.day4.CheckpointDemo03 \
hdfs://hdfs-cluster/my-flinkjars/flink-test-1.0-SNAPSHOT.jar
Run result:
Test:
Stop the job:
Enter more test data:
Run again (restoring from checkpoint /flink-checkpoint/4435e2599d6269a0b7095979a959d97d/chk-416, passed to flink run-application via the -s flag):
tijiao.sh:
#! /bin/bash
flink run-application \
-t yarn-application \
-Dyarn.provided.lib.dirs="hdfs://hdfs-cluster/flink-dist" \
-s hdfs://hdfs-cluster/flink-checkpoint/4435e2599d6269a0b7095979a959d97d/chk-416 \
-c com.demo.day4.CheckpointDemo03 \
hdfs://hdfs-cluster/my-flinkjars/flink-test-1.0-SNAPSHOT.jar
Run succeeded:
Check (the data entered while the job was stopped is displayed successfully):
Test again:
3、Setting up Savepoint
A checkpoint takes State snapshots automatically; a savepoint is a manually triggered snapshot.
As with the checkpoint example, upload the jar to HDFS.
Run the job:
#! /bin/bash
flink run-application \
-t yarn-application \
-Dyarn.provided.lib.dirs="hdfs://hdfs-cluster/flink-dist" \
-c com.demo.day4.CheckpointDemo03 \
hdfs://hdfs-cluster/my-flinkjars/flink-test-1.0-SNAPSHOT.jar
Note the YARN application id (application_1725272814044_0002) and the Flink job id (f28c53d5ccf838c8d5e7ca7b8c45e5b0) at this point:
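If you have not noted them down, a quick sketch of how to look them up on the same cluster (the ids below are the ones from this run):
# List running YARN applications to find the application id
yarn application -list
# List the Flink jobs inside that YARN application to find the job id
flink list -yid application_1725272814044_0002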
Test:
Trigger the savepoint (two methods):
Method 1: stop the Flink job and trigger a savepoint in one step:
flink stop --savepointPath hdfs://hdfs-cluster/flink-savepoint f28c53d5ccf838c8d5e7ca7b8c45e5b0 -yid application_1725272814044_0002
Method 2: does not stop the Flink job, only performs the savepoint.
Syntax: flink savepoint jobId savepointPath -yid yarnApplicationId
flink savepoint f28c53d5ccf838c8d5e7ca7b8c45e5b0 hdfs://hdfs-cluster/flink-savepoint -yid application_1725272814044_0002
Run the first method:
flink stop --savepointPath hdfs://hdfs-cluster/flink-savepoint f28c53d5ccf838c8d5e7ca7b8c45e5b0 -yid application_1725272814044_0002
With the job stopped, enter test data again:
Find the savepoint for the most recently completed Flink job and restore from that snapshot:
Restart the Flink job from the earlier savepoint path:
#! /bin/bash
flink run-application \
-t yarn-application \
-Dyarn.provided.lib.dirs="hdfs://hdfs-cluster/flink-dist" \
-c com.demo.day4.CheckpointDemo03 \
-s hdfs://hdfs-cluster/flink-savepoint/savepoint-f28c53-23e87194a6b2 \
hdfs://hdfs-cluster/my-flinkjars/flink-test-1.0-SNAPSHOT.jar
Check again (the data entered during the interruption):
Enter more data:
4、Dinky--Checkpoint
set 'execution.checkpointing.interval'='2sec';
set 'state.checkpoints.dir'='hdfs://hdfs-cluster/flink-checkpoint';
set 'execution.checkpointing.externalized-checkpoint-retention'='RETAIN_ON_CANCELLATION';

CREATE TABLE table1 (
    actionTime BIGINT,
    monitorId STRING,
    cameraId STRING,
    car STRING,
    speed DOUBLE,
    roadId STRING,
    areaId STRING,
    event_time AS PROCTIME()  -- computed column (type TIMESTAMP)
) WITH (
    'connector' = 'kafka',
    'topic' = 'topic1',
    'properties.bootstrap.servers' = 'hadoop11:9092,hadoop12:9092,hadoop13:9092',
    'properties.group.id' = 'g1',
    'scan.startup.mode' = 'latest-offset',
    'format' = 'json'
);

CREATE TABLE table2 (
    id INT,
    start_time TIMESTAMP,
    end_time TIMESTAMP,
    monitor_id STRING,
    avg_speed DOUBLE,
    car_count BIGINT
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://hadoop11:3306/cars',
    'table-name' = 't_average_speed',
    'username' = 'root',
    'password' = '123456'
);

INSERT INTO table2
SELECT
    CAST(NULL AS INT) AS id,
    window_start AS start_time,
    window_end AS end_time,
    monitorId AS monitor_id,
    AVG(speed) AS avg_speed,
    COUNT(DISTINCT car) AS car_count
FROM TABLE(
    HOP(TABLE table1, DESCRIPTOR(event_time), INTERVAL '30' SECONDS, INTERVAL '60' SECONDS)
)
GROUP BY monitorId, window_start, window_end;
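To restore a Flink SQL job in Dinky from a retained checkpoint or savepoint, the restore path can also be supplied as a configuration option before submitting. A minimal sketch; the path below is a placeholder to be replaced with a real snapshot directory (Dinky additionally offers savepoint settings in its job configuration UI):
-- Resume the next submission from an existing snapshot (placeholder path)
set 'execution.savepoint.path'='hdfs://hdfs-cluster/flink-checkpoint/<job-id>/chk-<n>';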
That wraps up "who still doesn't know Flink checkpoints?~". I hope the article is helpful to fellow programmers!