《十堂课学习 Flink》第九章:Flink Stream 的实战案例一:CPU 平均使用率监控告警案例

本文主要是介绍《十堂课学习 Flink》第九章:Flink Stream 的实战案例一:CPU 平均使用率监控告警案例,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

9.1 本章概述

本章的所有需求、设计、开发仅是模拟真实业务场景,因为实际业务需求、现场环境更加复杂,并且考虑到本系列课程本身就偏向于基础内容,因此这里我们对自己假设的业务场景进行设计与开发,整个流程虽然简单,但涉及到的内容较多,通过这个案例可以初步了解整个flink流计算开发案例的基本过程。

9.2 需求描述

这个案例中,我们假设,需要对 多个机器集群 进行监控,即定期采样这 若干个集群 中的每台机器的CPU使用率,并将采样结果写入 kafka 。我们的任务是开发一个 flink 项目,监听 kafka 作为输入数据,并且满足 特定条件 时进行告警。

9.2.1 通讯流程

在这里插入图片描述
在这里插入图片描述

9.2.2 通讯协议 —— 采样数据

我们需要与负责采样的上游约定通讯协议,按照指定的 json 格式进行通讯。上游按照指定的格式将样本数据写入 kafka 后,flink 根据kafka消息进行聚合、触发检测、返回告警结果等。

{"taskId": "39xr4d2dnb9x72d","clusterId": "49xrt","itemId": "38fx2d","clusterSize": 4,"currentIndex": 0,"data": {"timestamp": 1715003462,"value": 0.43},"thresholdConfig": {"cpuUsageThresholdAverage": 0.93,"cpuUsageThresholdMax": 0.99}
}

其中,

  • taskId 是每个集群检测任务的唯一标识,同一个集群中的不同机器具有相同taskId,同一个集群中同一台机器在不同时间的采样结果对应的样本编号均不同。在排查问题时可以根据这个定位到哪条消息有问题等。
  • clusterId 是指集群编号,同一个集群中的不同机器具有相同的集群编号,集群编号不因为采样时间而改变。
  • itemId:集群中某机器的唯一标识;
  • clusterSize:集群大小,即集群中含有多少台机器。
  • currentIndex:当前消息在集群中的索引,从0开始。比如一个集群中共 4 台机器,索引分别为 0, 1, 2, 3。由于 kafka 集群、flink 集群等环境原因,接收到 kafka 的消息可能不会严格按照索引从小到大的顺序。
  • data:采样实体类,即某个时刻采样得到CPU使用率的值
    • timestamp: 采样时的时间戳(10位,精度为秒)
    • value:采样的值,即CPU使用率。
  • thresholdConfig:检查时的触发风险阈值,因为不同的业务场景下不同集群可能导致 CPU 使用率情况不同,所以应该对不同集群有相应的触发配置。
    • cpuUsageThresholdAverage:集群中平均CPU使用率阈值;
    • cpuUsageThresholdMax:集群中单个CPU使用率阈值。

9.2.3 通讯协议 —— 结果数据

经过检测以后,算法返回对集群的检查结果,检查规则我们在后面介绍。这里只定义大致的检查结果实体结构:

{"taskId": "xejcfl34w23mfs""clusterId": "49xrt","results": {"code": 0,"message": "success","data": {"riskType": 0,"timestamp": 1715003462,"riskItems": []}}
}

其中,

  • taskId:任务唯一编号,与接收到任务编号保持一致。
  • clusterId:集群编号。
  • results:结果实体类
    • code:结果通讯码,成功为 0 ,样本数据缺失为1,执行过程未知异常为 -1。
    • message:结果消息说明。
    • data:结果实体类
      • riskType:结果类型;
      • timestamp:检查数据的采样时间;
      • riskItems:有风险的机器编号链表,即输入数据中的 itemId

9.2.4 风险检查规则

对 CPU集群 进行检测,检测规则包括

    1. 不满足以下条件,无风险
    1. 集群中 CPU 采样数据是否存在缺失,比如 CPU 1 采集到了,CPU 2 采集不到
    1. 集群中 CPU 平均使用率是否超过阈值(阈值在接收数据中配置)
    1. 集群中单个CPU使用率是否超过阈值(阈值在接收数据中配置)

9.3 开发过程设计

首先这里引入相关依赖如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>cn.smileyan.demos</groupId><artifactId>flink-cpu-demo</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><scala.binary.version>2.12</scala.binary.version><lombok.version>1.18.30</lombok.version><flink.version>1.14.6</flink.version><slf4j.version>2.0.9</slf4j.version><logback.version>1.3.14</logback.version></properties><dependencies><!-- flink 相关 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>com.alibaba.fastjson2</groupId><artifactId>fastjson2</artifactId><version>2.0.45</version></dependency><!-- 编译工具 --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>${lombok.version}</version><scope>provided</scope></dependency><!-- log 相关 --><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>${slf4j.version}</version><scope>provided</scope></dependency><dependency><groupId>ch.qos.logback</groupId><artifactId>logback-core</artifactId><version>${logback.version}</version><scope>provided</scope></dependency><dependency><groupId>ch.qos.logback</groupId><artifactId>logback-classic</artifactId><version>${logback.version}</version><scope>provided</scope></dependency></dependencies><build><plugins><!-- Java Compiler --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.1</version><configuration><source>${maven.compiler.source}</source><target>${maven.compiler.target}</target></configuration></plugin><!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. --><!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.1.1</version><executions><!-- Run shade goal on package phase --><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><artifactSet><excludes><exclude>org.apache.flink:flink-shaded-force-shading</exclude><exclude>com.google.code.findbugs:jsr305</exclude><exclude>org.slf4j:*</exclude><exclude>org.apache.logging.log4j:*</exclude><exclude>ch.qos.logback:*</exclude></excludes></artifactSet><filters><filter><!-- Do not copy the signatures in the META-INF folder.Otherwise, this might cause SecurityExceptions when using the JAR. --><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude><exclude>logback.xml</exclude></excludes></filter></filters></configuration></execution></executions></plugin></plugins></build>
</project>

9.3.1 main 方法入口

这里是总体流程,我们大概可以分为3个过程:

  1. 初始化kafka连接(包括source与sink对象)。
  2. 编写收集kafka数据以及窗口化过程,前面需求中有提到我们检测的是集群中各机器的CPU使用率情况。因此可能接收同个集群中的多条消息,需要进行窗口化聚合,进而进行检测。
  3. 检测过程,根据实际数据表现对风险等级进行评分。
  4. 返回风险结果到 kafka 中。
package cn.smileyan.demos;import cn.smileyan.demos.core.CpuCheckMapFunction;
import cn.smileyan.demos.core.TaskProcessingFunction;
import cn.smileyan.demos.entity.TaskClusterData;
import cn.smileyan.demos.entity.TaskInput;
import cn.smileyan.demos.entity.TaskOutput;
import cn.smileyan.demos.core.CountAndTimeTrigger;
import cn.smileyan.demos.io.KafkaArgsBuilder;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.DynamicEventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.SessionWindowTimeGapExtractor;import java.util.Objects;/*** flink 任务入口* @author smileyan*/
public class CpuCheckJob {/*** 参数解释:*  -bs broker 地址 localhost:9092*  -kcg kafka consumer group*  -it kafka 输入数据 topic test-input-topic*  -ot kafka 输出数据 topic test-output-topic*  -ct 可选,是否自动创建 topic,使用方法 添加  -ct 即可,无需指定其值*  -pt topic 可选,分区数 1*  -rf topic 可选,副本数 1*  example:*  -bs localhost:9092 -it test-input-topic -ot test-output-topic -pt 1 -rf 1 -ct*/public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();final KafkaArgsBuilder kafkaArgsBuilder = new KafkaArgsBuilder(args);final KafkaSource<TaskInput> source = kafkaArgsBuilder.buildSource(TaskInput.class);final KafkaSink<TaskOutput> kafkaSink = kafkaArgsBuilder.buildSink(TaskOutput.class);final long gapSeconds = 10L;final DynamicEventTimeSessionWindows<TaskInput> dynamicWindow = DynamicEventTimeSessionWindows.withDynamicGap((SessionWindowTimeGapExtractor<TaskInput>) element -> gapSeconds * element.getClusterSize());final DataStreamSource<TaskInput> dataStreamSource = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");SingleOutputStreamOperator<TaskClusterData> mergedTaskData = dataStreamSource.filter(Objects::nonNull).keyBy(TaskInput::getTaskId).window(dynamicWindow).trigger(new CountAndTimeTrigger<>()).process(new TaskProcessingFunction()).name("taskProcessing");SingleOutputStreamOperator<TaskOutput> resultData = mergedTaskData.filter(Objects::nonNull).map(new CpuCheckMapFunction()).name("cpu usage check");resultData.sinkTo(kafkaSink);env.execute("Flink Kafka Example");}}

9.3.2 编写 kafka 序列化与反序列化类

package cn.smileyan.demos.io;import com.alibaba.fastjson2.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;import java.nio.charset.StandardCharsets;/*** 将字节码数据进行序列化,以及将实体类转换* @author smileyan* @param <O> 实体类*/
@Slf4j
public class CommonEntitySchema<O> implements DeserializationSchema<O>, SerializationSchema<O> {private final Class<O> clazz;public CommonEntitySchema(Class<O> clazz) {this.clazz = clazz;}@Overridepublic O deserialize(byte[] message) {try {String str = new String(message, StandardCharsets.UTF_8);log.info("kafka received message: {}", str);return JSON.parseObject(str, clazz);} catch (Exception e) {log.error(e.getMessage());}return null;}@Overridepublic boolean isEndOfStream(O nextElement) {return false;}@Overridepublic TypeInformation<O> getProducedType() {return TypeInformation.of(clazz);}@Overridepublic byte[] serialize(O element) {return JSON.toJSONBytes(element);}
}

9.3.3 编写 kafka 的 source 与 sink 构建器

package cn.smileyan.demos.io;import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;/**
* 通过参数构建通用的 kafka 通讯序列化与反序列化实体
* @author smileyan
*/
@Slf4j
public class KafkaArgsBuilder {/*** 构建参数*/private final MultipleParameterTool parameterTool;public KafkaArgsBuilder(String[] args) {parameterTool = MultipleParameterTool.fromArgs(args);}/*** 构建kafka sink* @param clazz 实体类class* @param <E> 实体类泛型* @return kafka sink 对象*/public <E> KafkaSink<E> buildSink(Class<E> clazz) {final String bs = parameterTool.getRequired(KafkaArgs.BOOTSTRAP_SERVER.key);final String ot = parameterTool.getRequired(KafkaArgs.OUTPUT_TOPIC.key);return KafkaSink.<E>builder().setBootstrapServers(bs).setRecordSerializer(KafkaRecordSerializationSchema.builder().setTopic(ot).setValueSerializationSchema(new CommonEntitySchema<>(clazz)).build()).setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE).build();}/*** 构建kafka source* @param clazz 实体类class* @param <E> 实体类泛型* @return kafka source 对象*/public <E> KafkaSource<E> buildSource(Class<E> clazz) throws ExecutionException, InterruptedException {final String kafkaConsumerGroup = parameterTool.getRequired(KafkaArgs.KAFKA_CONSUMER_GROUP.key);final String bootstrapServer = parameterTool.getRequired(KafkaArgs.BOOTSTRAP_SERVER.key);final String inputTopic = parameterTool.getRequired(KafkaArgs.INPUT_TOPIC.key);final boolean createTopic = parameterTool.has(KafkaArgs.CREATE_TOPIC.key);if (createTopic) {final int partition = parameterTool.getInt(KafkaArgs.CREATE_TOPIC_PARTITION.key, 1);final short replicationFactor = parameterTool.getShort(KafkaArgs.REPLICATION_FACTOR.key, (short) 1);createTopic(bootstrapServer, inputTopic, partition, replicationFactor);}return KafkaSource.<E>builder().setGroupId(kafkaConsumerGroup).setStartingOffsets(OffsetsInitializer.latest()).setBootstrapServers(bootstrapServer).setTopics(inputTopic).setValueOnlyDeserializer(new CommonEntitySchema<>(clazz)).build();}public enum KafkaArgs {/** kafka 服务地址*/BOOTSTRAP_SERVER("bs"),/** kafka 消费者组*/KAFKA_CONSUMER_GROUP("kcg"),/** kafka 输入主题*/INPUT_TOPIC("it"),/** kafka 输出主题*/OUTPUT_TOPIC("ot"),/** 是否自动创建主题*/CREATE_TOPIC("ct"),/** 分区数*/CREATE_TOPIC_PARTITION("pt"),/** 副本数*/REPLICATION_FACTOR("rf");private final String key;KafkaArgs(String key) {this.key = key;}}/*** 如果 TOPIC 不存在则创建该 TOPIC* @param bootstrapServer kafka broker 地址* @param topic 想要创建的 TOPIC* @param partitions 并行度* @param replicationFactor 副本数*/public static void createTopic(String bootstrapServer,String topic,int partitions,int replicationFactor) throws ExecutionException, InterruptedException {Properties adminProperties = new Properties();adminProperties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);try (AdminClient adminClient = AdminClient.create(adminProperties)) {if (!adminClient.listTopics().names().get().contains(topic)) {NewTopic newTopic = new NewTopic(topic, partitions, (short) replicationFactor);adminClient.createTopics(Collections.singletonList(newTopic)).all().get();log.info("created topic: {}", topic);}}}
}

9.3.4 自定义 window 触发器

这里我们不使用默认的触发器,而是自定义一个更加方便的触发器。当接收到相同 taskId 的数据时,我们需要确定,什么时候确定接收完成,并触发检测过程。

需要注意的地方包括:

  1. 消息接收完成就触发。比如一个集群中总共有4台机器,当接收到这个4台机器的样本数据时,就应该触发检测过程。
  2. 消息接收到一半突然终止。比如一个集群中总共有4台机器,接收到3台机器的采样数据以后,等了很久没有收到第四条消息。等待超时后触发检查。
package cn.smileyan.demos.core;import cn.smileyan.demos.entity.TaskInput;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;/*** 自定义 window 触发器* @author smileyan*/
@Slf4j
public class CountAndTimeTrigger<T extends TaskInput, W extends TimeWindow> extends Trigger<T, W> {/*** ReducingStateDescriptor 的 key 字段,上下文根据这个字段获取状态指*/private static final String COUNT_KEY = "count";/*** ReducingStateDescriptor 根据聚合过程更新 count 结果*/private final ReducingStateDescriptor<Long> stateDesc =new ReducingStateDescriptor<>(COUNT_KEY, new Sum(), LongSerializer.INSTANCE);@Overridepublic TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception {ctx.registerEventTimeTimer(window.getEnd());ctx.registerProcessingTimeTimer(window.getEnd());final int size = element.getClusterSize();final String id = element.getTaskId();ReducingState<Long> count = ctx.getPartitionedState(stateDesc);count.add(1L);log.info("[{}] window: ({}, {}) -> merged {}", ctx.getCurrentWatermark(), window.getStart(), window.getEnd(), count.get());if (count.get().intValue() == size) {log.info("[{} -> {}] merged successfully.", id, ctx.getCurrentWatermark());clear(window, ctx);ctx.getPartitionedState(stateDesc).clear();return TriggerResult.FIRE_AND_PURGE;} else if (count.get() > size) {log.warn("[{} -> {}] sent more than need {}", id, ctx.getCurrentWatermark(), size);return TriggerResult.PURGE;}return TriggerResult.CONTINUE;}@Overridepublic TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {if (time >= window.getEnd()) {log.debug("[ -> {}] onProcessingTime", ctx.getCurrentWatermark());return TriggerResult.FIRE_AND_PURGE;}return TriggerResult.CONTINUE;}@Overridepublic TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {if (time >= window.getEnd()) {log.debug("[ -> {}] onEventTime", ctx.getCurrentWatermark());return TriggerResult.FIRE_AND_PURGE;}return TriggerResult.CONTINUE;}@Overridepublic void clear(W window, TriggerContext ctx) throws Exception {log.debug("[ -> {}] cleaning window ({}, {})", ctx.getCurrentWatermark(), window.getStart(), window.getEnd());ctx.deleteEventTimeTimer(window.getEnd());ctx.deleteProcessingTimeTimer(window.getEnd());}@Overridepublic boolean canMerge() {return true;}@Overridepublic void onMerge(TimeWindow window, OnMergeContext ctx) throws Exception {log.debug("[ -> {}] onMerge ({}, {})", ctx.getCurrentWatermark(), window.getStart(), window.getEnd());ctx.mergePartitionedState(stateDesc);}private static class Sum implements ReduceFunction<Long> {private static final long serialVersionUID = 1L;@Overridepublic Long reduce(Long value1, Long value2) throws Exception {return value1 + value2;}}
}

此处有两个地方需要额外强调:

  1. 必须重写 canMerge 与 onMerge 方法。
  2. 必须使用ReducingStateDescriptor状态描述器而不是对象的属性。这里牵扯到 flink 的特性,比如4条消息每到来一次,对应的应该是再次创建 CountAndTimeTrigger 对象进行 trigger 检查,并根据检查结果决定是否触发风险。

9.3.5 将集群中每台机器的采样结果进行合并

package cn.smileyan.demos.core;import cn.smileyan.demos.entity.CpuDataItem;
import cn.smileyan.demos.entity.TaskClusterData;
import cn.smileyan.demos.entity.TaskInput;
import cn.smileyan.demos.entity.ThresholdConfig;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;/*** 合并任务数据为集群数据* @author smileyan*/
@Slf4j
public class TaskProcessingFunction extends ProcessWindowFunction<TaskInput, TaskClusterData, String, TimeWindow> {@Overridepublic void process(String key,ProcessWindowFunction<TaskInput, TaskClusterData, String, TimeWindow>.Context context,Iterable<TaskInput> elements,Collector<TaskClusterData> out) throws Exception {log.info("[{}] starting merge processing", key);final List<CpuDataItem> cpuDataItems = new LinkedList<>();Iterator<TaskInput> inputIterator = elements.iterator();TaskInput first = inputIterator.next();cpuDataItems.add(new CpuDataItem(first));String clusterId = first.getClusterId();String taskId = first.getTaskId();Integer clusterSize = first.getClusterSize();ThresholdConfig thresholdConfig = first.getThresholdConfig();while(inputIterator.hasNext()) {cpuDataItems.add(new CpuDataItem(inputIterator.next()));}log.info("[{}] finished merge processing", key);out.collect(new TaskClusterData(taskId, clusterId, clusterSize, thresholdConfig, cpuDataItems));}
}

9.3.6 最核心的风险检查过程

package cn.smileyan.demos.core;import cn.smileyan.demos.entity.CpuDataItem;
import cn.smileyan.demos.entity.TaskClusterData;
import cn.smileyan.demos.entity.TaskOutput;
import cn.smileyan.demos.entity.TaskResult;
import cn.smileyan.demos.entity.TaskResultData;
import org.apache.flink.api.common.functions.MapFunction;import java.util.LinkedList;
import java.util.List;
import java.util.OptionalDouble;
import java.util.stream.Collectors;/*** 对 CPU集群 进行检测,检测规则包括*    0. 不满足以下条件,无风险*    1. 集群中 CPU 采样数据是否存在缺失,比如 CPU 1 采集到了,CPU 2 采集不到*    2. 集群中 CPU 平均使用率是否超过阈值(阈值在接收数据中配置)*    3. 集群中单个CPU使用率是否超过阈值(阈值在接收数据中配置)* @author smileyan*/
public class CpuCheckMapFunction implements MapFunction<TaskClusterData, TaskOutput> {@Overridepublic TaskOutput map(TaskClusterData taskClusterData) {TaskOutput taskOutput = new TaskOutput();taskOutput.setTaskId(taskClusterData.getTaskId());taskOutput.setClusterId(taskClusterData.getClusterId());TaskResult taskResult = new TaskResult();taskOutput.setResults(taskResult);TaskResultData taskResultData = new TaskResultData();taskResultData.setTimestamp(taskClusterData.getCpuDataItems().get(0).getTimestamp());List<String> items = taskClusterData.getCpuDataItems().stream().map(CpuDataItem::getItemId).collect(Collectors.toList());/** 1. 集群中 CPU 采样数据是否存在缺失,比如 CPU 1 采集到了,CPU 2 采集不到*/if (taskClusterData.getClusterSize() != taskClusterData.getCpuDataItems().size()) {taskResult.setCode(ResultCodeEnum.MISSING.getCode());taskResult.setMessage(ResultCodeEnum.MISSING.getMessage());return taskOutput;}taskResult.setCode(ResultCodeEnum.SUCCESS.getCode());taskResult.setMessage(ResultCodeEnum.SUCCESS.getMessage());taskResultData.setRiskType(RiskTypeEnum.NONE.getValue());/** 2. 集群中 CPU 平均使用率是否超过阈值(阈值在接收数据中配置)*/OptionalDouble average = taskClusterData.getCpuDataItems().stream().mapToDouble(CpuDataItem::getValue).average();if (average.isPresent()) {if (average.getAsDouble() > taskClusterData.getThresholdConfig().getCpuUsageThresholdAverage()) {taskResultData.setRiskItems(items);taskResultData.setRiskType(RiskTypeEnum.CPU_USAGE_AVERAGE.getValue());return taskOutput;}} else {taskResult.setCode(ResultCodeEnum.UNKNOWN_ERROR.getCode());taskResult.setMessage(ResultCodeEnum.UNKNOWN_ERROR.getMessage());return taskOutput;}// 3. 集群中单个CPU使用率是否超过阈值(阈值在接收数据中配置)List<String> riskItems = new LinkedList<>();for (CpuDataItem cpuDataItem : taskClusterData.getCpuDataItems()) {if (cpuDataItem.getValue() > taskClusterData.getThresholdConfig().getCpuUsageThresholdMax()) {riskItems.add(cpuDataItem.getItemId());}}if (!riskItems.isEmpty()) {taskResultData.setRiskItems(riskItems);taskResultData.setRiskType(RiskTypeEnum.CPU_USAGE_MAX.getValue());return taskOutput;}return taskOutput;}
}

9.3.7 其他部分代码未展示

由于篇幅问题,这里省略了一部分不那么重要的代码,具体内容请参考 https://gitee.com/smile-yan/flink-cpu-demo

9.4 编写测试

9.4.1 测试数据准备

我们准备了一些测试数据,主要包括以下几种情况:

  1. 数据 size 不够聚合,比如期望达到 4 条消息时,才进行合并并检测;
  2. 数据 size 足够聚合,并且数据正常;
  3. 数据 size 足够聚合,但存在个别数据有风险;
  4. 数据 size 足够聚合,所有数据都存在风险。

具体内容请参考我的开源地址:https://gitee.com/smile-yan/flink-cpu-demo

9.4.2 测试脚本准备

我们需要将前面准备的 json 数据按照顺序写入 kafka ,以触发数据的聚合以及检测。

在这里插入图片描述

这里我们准备了一份python 脚本,将 json 文件写入 kafka 中。

import os
import sysfrom kafka import KafkaProducer
import jsonif __name__ == '__main__':kafka_broker, kafka_topic = sys.argv[1], sys.argv[2]files_dir = sys.argv[3]producer = KafkaProducer(bootstrap_servers=kafka_broker, value_serializer=lambda v: json.dumps(v).encode('utf-8'))files = os.listdir(files_dir)for file in files:if file.endswith(".json"):with open(f'{files_dir}/{file}', 'r') as f:data = json.load(f)producer.send(kafka_topic, data)producer.flush()producer.close()

注意,这里我们考虑到不同用户的存放数据的位置不同、绝对路径不同,将存放数据的路径、kafka 的相关参数以参数的形式存放在 main 方法的参数中,其中:

  • 参数 1:kafka 的服务地址,比如 localhost:9092
  • 参数 2:kafka 的通讯 topic,也就是 flink 任务监听的 topic。比如 input-test-data
  • 参数 3:json 文件所在文件夹,前面有提到四个场景中存放在四个文件夹中。比如 /Users/smileyan/me/flink-cpu-demo/scripts/normal

9.4.3 运行 flink 任务

运行时需要注意启动参数:

-bs
localhost:9092
-kcg
flink-consumer
-it
test-input-topic
-ot
test-output-topic
-ct

在这里插入图片描述
运行时还需要添加
在这里插入图片描述

在这里插入图片描述

9.5 运行效果展示

9.5.1 启动 kafka 过程

这个过程不再重复介绍,本地启动 kafka 两行命令即可。

$ bin/zookeeper-server-start.sh config/zookeeper.properties
$ bin/kafka-server-start.sh config/server.properties

9.5.2 运行 flink 与 kafka 过程录屏

flink-kafka检测CPU使用率案例

这里案例中,我们通过脚本将每个文件夹中的文件发送到 kafka 中,触发检测过程。

在这里插入图片描述

9.5.3 源码地址

https://gitee.com/smile-yan/flink-cpu-demo

除了 java 源码,还包括生成测试数据的脚本,以及发送数据的脚本,以及已经生成的数据文件夹。

9.5 总结

本章内容提供了一个非常简单的Flink Stream 计算案例,涉及内容包括:kafka 通讯,flink 的动态时间窗口,自定义窗口聚合条件以及 flink 的窗口聚合处理等。此外,本章通过录屏的方式验证整个项目运行正常。

希望作为 flink 的初学者能够提供一个简单案例。感谢各位小伙伴们的支持 ~ 共勉 ~

如果认为本章节写得还行,一定记得点击下方免费的赞 ~ 感谢 !
在这里插入图片描述

这篇关于《十堂课学习 Flink》第九章:Flink Stream 的实战案例一:CPU 平均使用率监控告警案例的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/982841

相关文章

Linux使用nload监控网络流量的方法

《Linux使用nload监控网络流量的方法》Linux中的nload命令是一个用于实时监控网络流量的工具,它提供了传入和传出流量的可视化表示,帮助用户一目了然地了解网络活动,本文给大家介绍了Linu... 目录简介安装示例用法基础用法指定网络接口限制显示特定流量类型指定刷新率设置流量速率的显示单位监控多个

使用 sql-research-assistant进行 SQL 数据库研究的实战指南(代码实现演示)

《使用sql-research-assistant进行SQL数据库研究的实战指南(代码实现演示)》本文介绍了sql-research-assistant工具,该工具基于LangChain框架,集... 目录技术背景介绍核心原理解析代码实现演示安装和配置项目集成LangSmith 配置(可选)启动服务应用场景

Java深度学习库DJL实现Python的NumPy方式

《Java深度学习库DJL实现Python的NumPy方式》本文介绍了DJL库的背景和基本功能,包括NDArray的创建、数学运算、数据获取和设置等,同时,还展示了如何使用NDArray进行数据预处理... 目录1 NDArray 的背景介绍1.1 架构2 JavaDJL使用2.1 安装DJL2.2 基本操

使用Navicat工具比对两个数据库所有表结构的差异案例详解

《使用Navicat工具比对两个数据库所有表结构的差异案例详解》:本文主要介绍如何使用Navicat工具对比两个数据库test_old和test_new,并生成相应的DDLSQL语句,以便将te... 目录概要案例一、如图两个数据库test_old和test_new进行比较:二、开始比较总结概要公司存在多

在Java中使用ModelMapper简化Shapefile属性转JavaBean实战过程

《在Java中使用ModelMapper简化Shapefile属性转JavaBean实战过程》本文介绍了在Java中使用ModelMapper库简化Shapefile属性转JavaBean的过程,对比... 目录前言一、原始的处理办法1、使用Set方法来转换2、使用构造方法转换二、基于ModelMapper

Java实战之自助进行多张图片合成拼接

《Java实战之自助进行多张图片合成拼接》在当今数字化时代,图像处理技术在各个领域都发挥着至关重要的作用,本文为大家详细介绍了如何使用Java实现多张图片合成拼接,需要的可以了解下... 目录前言一、图片合成需求描述二、图片合成设计与实现1、编程语言2、基础数据准备3、图片合成流程4、图片合成实现三、总结前

通过prometheus监控Tomcat运行状态的操作流程

《通过prometheus监控Tomcat运行状态的操作流程》文章介绍了如何安装和配置Tomcat,并使用Prometheus和TomcatExporter来监控Tomcat的运行状态,文章详细讲解了... 目录Tomcat安装配置以及prometheus监控Tomcat一. 安装并配置tomcat1、安装

MySQL的cpu使用率100%的问题排查流程

《MySQL的cpu使用率100%的问题排查流程》线上mysql服务器经常性出现cpu使用率100%的告警,因此本文整理一下排查该问题的常规流程,文中通过代码示例讲解的非常详细,对大家的学习或工作有一... 目录1. 确认CPU占用来源2. 实时分析mysql活动3. 分析慢查询与执行计划4. 检查索引与表

解读Redis秒杀优化方案(阻塞队列+基于Stream流的消息队列)

《解读Redis秒杀优化方案(阻塞队列+基于Stream流的消息队列)》该文章介绍了使用Redis的阻塞队列和Stream流的消息队列来优化秒杀系统的方案,通过将秒杀流程拆分为两条流水线,使用Redi... 目录Redis秒杀优化方案(阻塞队列+Stream流的消息队列)什么是消息队列?消费者组的工作方式每

nginx-rtmp-module构建流媒体直播服务器实战指南

《nginx-rtmp-module构建流媒体直播服务器实战指南》本文主要介绍了nginx-rtmp-module构建流媒体直播服务器实战指南,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有... 目录1. RTMP协议介绍与应用RTMP协议的原理RTMP协议的应用RTMP与现代流媒体技术的关系2