大数据-113 Flink DataStreamAPI 程序输入源 自定义输入源 非并行源与并行源

2024-09-03 06:12

本文主要是介绍大数据-113 Flink DataStreamAPI 程序输入源 自定义输入源 非并行源与并行源,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(已更完)
  • Spark(已更完)
  • Flink(正在更新!)

章节内容

上节完成了如下的内容:

  • DataStreamAPI介绍
  • 基于文件、Socket、基于集合
  • 编写代码进行测试
  • Kafka连接器

在这里插入图片描述

非并行源

基本介绍

在 Apache Flink 中,非并行源(Non-Parallel Source)是一种特殊的源操作(Source Operator),它的最大并行度被限制为 1。这意味着,无论 Flink 集群中有多少个 Task Manager 和 Slot,该源操作都只能在一个并行实例中运行。这通常用于处理那些不适合并行化的任务或需要集中处理的工作。

主要特点

  • 单线程执行:非并行源只能在一个线程中执行,因此不会受益于并行化带来的性能提升。适合需要顺序处理或依赖全局状态的场景。
  • 全局状态管理:因为是单线程执行,非并行源可以方便地管理全局状态,而不需要像并行源那样处理多个并行实例间的状态同步问题。
  • 实现简单:对于某些简单的数据源,如单个文件读取器、时间戳生成器等,非并行源的实现相对简单,不需要处理复杂的并行和分片逻辑。

使用场景

  • 时间戳生成:当需要在流处理作业中引入事件时间(Event Time)时,可以使用一个非并行源来生成时间戳。
  • 控制输入:如从一个全局唯一的数据源(例如一个集中式消息队列)读取数据时,通常使用非并行源来确保顺序处理。
  • 测试与调试:在开发和调试阶段,非并行源可以用于生成简单的测试数据流。

示例代码

// 创建一个非并行的自定义源
public class MyNonParallelSource implements SourceFunction<String> {private volatile boolean isRunning = true;@Overridepublic void run(SourceContext<String> ctx) throws Exception {while (isRunning) {ctx.collect("Non-Parallel Source Data");Thread.sleep(1000); // 模拟数据产生的延迟}}@Overridepublic void cancel() {isRunning = false;}
}// 在作业中使用非并行源
DataStream<String> stream = env.addSource(new MyNonParallelSource()).setParallelism(1);

在上述示例中,MyNonParallelSource 是一个简单的自定义非并行源,每秒生成一条字符串数据,并且通过 setParallelism(1) 明确指定其并行度为 1。

注意事项

  • 性能限制:由于非并行源仅在单个线程中执行,如果数据量较大或需要高吞吐量,可能成为系统的瓶颈。
  • 容错与恢复:Flink 提供了检查点机制(Checkpointing)来保证故障恢复时的状态一致性。在使用非并行源时,确保源的状态可以在故障恢复时正确重放。

NoParallelSource

package icu.wzk;import org.apache.flink.streaming.api.functions.source.SourceFunction;public class NoParallelSource implements SourceFunction<String> {private Long count = 1L;private boolean running = true;@Overridepublic void run(SourceContext<String> ctx) throws Exception {while (running) {count ++;ctx.collect(String.valueOf(count));Thread.sleep(1000);}}@Overridepublic void cancel() {running = false;}
}

NoParallelSourceTest

package icu.wzk;import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment;public class NoParallelSourceTest {public static void main(String[] args) {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> data = env.getJavaEnv().addSource(new NoParallelSource());data.print();env.execute("NoParallelSourceTest");}}

运行结果

3> 2
4> 3
5> 4
6> 5
7> 6
8> 7
1> 8
2> 9
3> 10
4> 11
5> 12
6> 13
7> 14

运行过程的截图如下所示:
在这里插入图片描述

并行源

基本介绍

在 Apache Flink 中,并行源(Parallel Source)是一种可以在多个并行实例中运行的数据源操作。这种源操作允许通过分配多个任务槽(Task Slot)来并行地读取数据,从而提高数据处理的吞吐量和性能。与非并行源相比,并行源更适合处理大规模、可分割的数据源,如分布式文件系统、消息队列、数据库分片等。

主要特点

  • 多实例执行:并行源可以通过多个并行实例执行,每个实例处理源数据的一个分片。这种架构允许利用集群中的多个计算资源,从而大大提高数据处理能力。
  • 分片处理:并行源通常会将数据源分成多个分片(shard)或分区(partition),每个分片由不同的并行实例处理。这样可以将大量的数据分摊到多个并行实例上,实现更高的处理效率。
  • 状态管理:每个并行实例通常会管理自己的状态,而不是像非并行源那样管理全局状态。Flink 提供了状态后端和检查点机制,帮助管理和恢复并行源的状态。
  • 横向扩展:由于并行源可以在多个实例中运行,因此随着集群资源的增加(例如增加 Task Manager 和 Slot 的数量),并行源的处理能力也会随之增加。

使用场景

  • 分布式文件系统读取:从 HDFS、S3 等分布式文件系统中读取数据时,通常使用并行源将文件分块并分配给不同的并行实例处理。
  • 消息队列消费:从 Kafka、RabbitMQ 等消息队列中消费消息时,通常使用并行源来同时处理多个分区的数据。
  • 数据库读取:当从分片数据库(例如 MySQL 分片、Cassandra 等)读取数据时,使用并行源可以让多个实例并行读取不同分片的数据。

示例代码

Flink 提供了一些内置的并行源,例如 KafkaSource、Flink’s FileSource 等,这里以 KafkaSource 为例:

// 使用 Flink 内置的 Kafka Source
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "flink-group");FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<>("topic-name",new SimpleStringSchema(),properties
);// 设置 Kafka Source 的并行度
DataStream<String> stream = env.addSource(kafkaSource).setParallelism(4);

注意事项

  • 数据分区一致性:在使用并行源时,需要确保数据源可以合理分区,并且每个并行实例只处理其分配的分区数据,避免数据重复处理或遗漏。
  • 状态恢复:当并行源需要保存状态时,确保状态的正确管理,以便在故障恢复时可以正确地恢复各个并行实例的状态。
  • 负载均衡:确保各个并行实例间的负载均衡,避免某些实例过载,而其他实例闲置。

ParallelSource

package icu.wzk;import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;public class ParallelSource implements ParallelSourceFunction<String> {private long count = 1L;private boolean running = true;@Overridepublic void run(SourceContext<String> ctx) throws Exception {while (running) {count ++;ctx.collect(String.valueOf(count));Thread.sleep(1000);}}@Overridepublic void cancel() {running = false;}
}

ParallesSourceTest

package icu.wzk;import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment;public class ParallelSourceTest {public static void main(String[] args) {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> data = env.getJavaEnv().addSource(new ParallelSource());data.print();env.execute("ParallelSourceTest");}}

运行结果

可以看到运行的速度是非常快的

4> 2
5> 2
1> 2
2> 2
8> 2
3> 2
6> 2
7> 2
6> 3
5> 3
8> 3
7> 3
4> 3
3> 3
2> 3
1> 3
6> 4

运行的对应的截图如下所示:
在这里插入图片描述

这篇关于大数据-113 Flink DataStreamAPI 程序输入源 自定义输入源 非并行源与并行源的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1132253

相关文章

大模型研发全揭秘:客服工单数据标注的完整攻略

在人工智能(AI)领域,数据标注是模型训练过程中至关重要的一步。无论你是新手还是有经验的从业者,掌握数据标注的技术细节和常见问题的解决方案都能为你的AI项目增添不少价值。在电信运营商的客服系统中,工单数据是客户问题和解决方案的重要记录。通过对这些工单数据进行有效标注,不仅能够帮助提升客服自动化系统的智能化水平,还能优化客户服务流程,提高客户满意度。本文将详细介绍如何在电信运营商客服工单的背景下进行

基于MySQL Binlog的Elasticsearch数据同步实践

一、为什么要做 随着马蜂窝的逐渐发展,我们的业务数据越来越多,单纯使用 MySQL 已经不能满足我们的数据查询需求,例如对于商品、订单等数据的多维度检索。 使用 Elasticsearch 存储业务数据可以很好的解决我们业务中的搜索需求。而数据进行异构存储后,随之而来的就是数据同步的问题。 二、现有方法及问题 对于数据同步,我们目前的解决方案是建立数据中间表。把需要检索的业务数据,统一放到一张M

关于数据埋点,你需要了解这些基本知识

产品汪每天都在和数据打交道,你知道数据来自哪里吗? 移动app端内的用户行为数据大多来自埋点,了解一些埋点知识,能和数据分析师、技术侃大山,参与到前期的数据采集,更重要是让最终的埋点数据能为我所用,否则可怜巴巴等上几个月是常有的事。   埋点类型 根据埋点方式,可以区分为: 手动埋点半自动埋点全自动埋点 秉承“任何事物都有两面性”的道理:自动程度高的,能解决通用统计,便于统一化管理,但个性化定

使用SecondaryNameNode恢复NameNode的数据

1)需求: NameNode进程挂了并且存储的数据也丢失了,如何恢复NameNode 此种方式恢复的数据可能存在小部分数据的丢失。 2)故障模拟 (1)kill -9 NameNode进程 [lytfly@hadoop102 current]$ kill -9 19886 (2)删除NameNode存储的数据(/opt/module/hadoop-3.1.4/data/tmp/dfs/na

异构存储(冷热数据分离)

异构存储主要解决不同的数据,存储在不同类型的硬盘中,达到最佳性能的问题。 异构存储Shell操作 (1)查看当前有哪些存储策略可以用 [lytfly@hadoop102 hadoop-3.1.4]$ hdfs storagepolicies -listPolicies (2)为指定路径(数据存储目录)设置指定的存储策略 hdfs storagepolicies -setStoragePo

Hadoop集群数据均衡之磁盘间数据均衡

生产环境,由于硬盘空间不足,往往需要增加一块硬盘。刚加载的硬盘没有数据时,可以执行磁盘数据均衡命令。(Hadoop3.x新特性) plan后面带的节点的名字必须是已经存在的,并且是需要均衡的节点。 如果节点不存在,会报如下错误: 如果节点只有一个硬盘的话,不会创建均衡计划: (1)生成均衡计划 hdfs diskbalancer -plan hadoop102 (2)执行均衡计划 hd

【前端学习】AntV G6-08 深入图形与图形分组、自定义节点、节点动画(下)

【课程链接】 AntV G6:深入图形与图形分组、自定义节点、节点动画(下)_哔哩哔哩_bilibili 本章十吾老师讲解了一个复杂的自定义节点中,应该怎样去计算和绘制图形,如何给一个图形制作不间断的动画,以及在鼠标事件之后产生动画。(有点难,需要好好理解) <!DOCTYPE html><html><head><meta charset="UTF-8"><title>06

JAVA智听未来一站式有声阅读平台听书系统小程序源码

智听未来,一站式有声阅读平台听书系统 🌟&nbsp;开篇:遇见未来,从“智听”开始 在这个快节奏的时代,你是否渴望在忙碌的间隙,找到一片属于自己的宁静角落?是否梦想着能随时随地,沉浸在知识的海洋,或是故事的奇幻世界里?今天,就让我带你一起探索“智听未来”——这一站式有声阅读平台听书系统,它正悄悄改变着我们的阅读方式,让未来触手可及! 📚&nbsp;第一站:海量资源,应有尽有 走进“智听

【Prometheus】PromQL向量匹配实现不同标签的向量数据进行运算

✨✨ 欢迎大家来到景天科技苑✨✨ 🎈🎈 养成好习惯,先赞后看哦~🎈🎈 🏆 作者简介:景天科技苑 🏆《头衔》:大厂架构师,华为云开发者社区专家博主,阿里云开发者社区专家博主,CSDN全栈领域优质创作者,掘金优秀博主,51CTO博客专家等。 🏆《博客》:Python全栈,前后端开发,小程序开发,人工智能,js逆向,App逆向,网络系统安全,数据分析,Django,fastapi

【测试】输入正确用户名和密码,点击登录没有响应的可能性原因

目录 一、前端问题 1. 界面交互问题 2. 输入数据校验问题 二、网络问题 1. 网络连接中断 2. 代理设置问题 三、后端问题 1. 服务器故障 2. 数据库问题 3. 权限问题: 四、其他问题 1. 缓存问题 2. 第三方服务问题 3. 配置问题 一、前端问题 1. 界面交互问题 登录按钮的点击事件未正确绑定,导致点击后无法触发登录操作。 页面可能存在