【FlinkX】两个issue分析:reader和writer的通道数不一致+获取JobId

2024-08-29 10:32

本文主要是介绍【FlinkX】两个issue分析:reader和writer的通道数不一致+获取JobId,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

  • issue详情
    • reader和writer的通道数不一致
    • 获取JobId
  • 代码分析
    • #issue145
      • 配置说明
      • 源码分析:
    • #issue148

最近准备再花点时间优化一下之前的FlinkX版本,特地去看了一下项目的issues区域,发现两个自己比较关注的issue。

issue详情

reader和writer的通道数不一致

  • 异构数据源reader和writer设置不同的Parallelism数#145

这个issue是我之前提的。当时我在测试拉取mysql数据写入我司一个自研MQ时发现,当channel过小时写MQ会比较慢(写入过程是同步的),当channel跳大后没有提速,读取成为瓶颈,甚至比单channel更慢。

详见我更早之前的一个issue#143。

后来虽然通过其他方法将写入能力提升勉强达到可用状态,但一直想框架本身能支持设置不同的Parallelism数。

获取JobId

  • flink on yarn获取jobid #148

这个就不用说了,现网程序大规模上线后肯定需要能获取获取job id做更精细的告警。

代码分析

#issue145

配置说明

目前版本已经支持,配置demo:

"speed": {"bytes": 1048576,"channel": 2,"rebalance": false,"readerChannel": 1,"writerChannel": 1
}
  • channel:任务并发数
  • readerChannel:reader的并发数,配置此参数时会覆盖channel配置的并发数,不配置或配置为-1时将使用channel配置的并发数作为reader的并发数。
  • writerChannel:writer的并发数,配置此参数时会覆盖channel配置的并发数,不配置或配置为-1时将使用channel配置的并发数作为writer的并发数。
  • rebalance:此参数配置为true时将强制对reader的数据做Rebalance,不配置此参数或者配置为false时,程序会根据reader和writer的通道数选择是否Rebalance,reader和writer的通道数一致时不使用Reblance,通道数不一致时使用Reblance。

源码分析:

// com.dtstack.flinkx.Main 
StreamExecutionEnvironment env = ……
……
// 设置全局并发
env.setParallelism(speedConfig.getChannel());
……
// 设置读并发
dataStream = ((DataStreamSource<Row>) dataStream).setParallelism(speedConfig.getReaderChannel());
// 强制Rebalance有助于数据均匀
if (speedConfig.isRebalance()) {dataStream = dataStream.rebalance();}
……
// 设置写并发
dataWriter.writeData(dataStream).setParallelism(speedConfig.getWriterChannel());

读写默认并发时是 -1。在flink中setParallelism(-1) 时就说使用系统当前的默认的并发

// com.dtstack.flinkx.config.SpeedConfig
public static final int DEFAULT_NUM_READER_WRITER_CHANNEL = -1;
	public static final int PARALLELISM_DEFAULT = -1;/*** The flag value indicating an unknown or unset parallelism. This value is* not a valid parallelism and indicates that the parallelism should remain* unchanged.*/public static final int PARALLELISM_UNKNOWN = -2;

#issue148

官方回答:RichInputFormat.initJobInfo()里面可以拿到
实际上RichOutputForma中也可以获得,这里我写一个demo

// com.dtstack.flinkx.stream.writer.StreamOutputFormat
public class StreamOutputFormat extends BaseRichOutputFormat {……@Overrideprotected void initJobInfo() {Map<String, String> vars = context.getMetricGroup().getAllVariables();System.out.println("Metrics.JOB_ID:" + vars.get(Metrics.JOB_ID));
//        super.initJobInfo();}
}

与RichFunction类似,Rich类可以拿到运行时的上下文,包括Job ID,Metric等

这篇关于【FlinkX】两个issue分析:reader和writer的通道数不一致+获取JobId的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1117609

相关文章

C#实现系统信息监控与获取功能

《C#实现系统信息监控与获取功能》在C#开发的众多应用场景中,获取系统信息以及监控用户操作有着广泛的用途,比如在系统性能优化工具中,需要实时读取CPU、GPU资源信息,本文将详细介绍如何使用C#来实现... 目录前言一、C# 监控键盘1. 原理与实现思路2. 代码实现二、读取 CPU、GPU 资源信息1.

在C#中获取端口号与系统信息的高效实践

《在C#中获取端口号与系统信息的高效实践》在现代软件开发中,尤其是系统管理、运维、监控和性能优化等场景中,了解计算机硬件和网络的状态至关重要,C#作为一种广泛应用的编程语言,提供了丰富的API来帮助开... 目录引言1. 获取端口号信息1.1 获取活动的 TCP 和 UDP 连接说明:应用场景:2. 获取硬

Python MySQL如何通过Binlog获取变更记录恢复数据

《PythonMySQL如何通过Binlog获取变更记录恢复数据》本文介绍了如何使用Python和pymysqlreplication库通过MySQL的二进制日志(Binlog)获取数据库的变更记录... 目录python mysql通过Binlog获取变更记录恢复数据1.安装pymysqlreplicat

C#实现获取电脑中的端口号和硬件信息

《C#实现获取电脑中的端口号和硬件信息》这篇文章主要为大家详细介绍了C#实现获取电脑中的端口号和硬件信息的相关方法,文中的示例代码讲解详细,有需要的小伙伴可以参考一下... 我们经常在使用一个串口软件的时候,发现软件中的端口号并不是普通的COM1,而是带有硬件信息的。那么如果我们使用C#编写软件时候,如

Redis主从/哨兵机制原理分析

《Redis主从/哨兵机制原理分析》本文介绍了Redis的主从复制和哨兵机制,主从复制实现了数据的热备份和负载均衡,而哨兵机制可以监控Redis集群,实现自动故障转移,哨兵机制通过监控、下线、选举和故... 目录一、主从复制1.1 什么是主从复制1.2 主从复制的作用1.3 主从复制原理1.3.1 全量复制

C#实现WinForm控件焦点的获取与失去

《C#实现WinForm控件焦点的获取与失去》在一个数据输入表单中,当用户从一个文本框切换到另一个文本框时,需要准确地判断焦点的转移,以便进行数据验证、提示信息显示等操作,本文将探讨Winform控件... 目录前言获取焦点改变TabIndex属性值调用Focus方法失去焦点总结最后前言在一个数据输入表单

通过C#获取PDF中指定文本或所有文本的字体信息

《通过C#获取PDF中指定文本或所有文本的字体信息》在设计和出版行业中,字体的选择和使用对最终作品的质量有着重要影响,然而,有时我们可能会遇到包含未知字体的PDF文件,这使得我们无法准确地复制或修改文... 目录引言C# 获取PDF中指定文本的字体信息C# 获取PDF文档中用到的所有字体信息引言在设计和出

python中os.stat().st_size、os.path.getsize()获取文件大小

《python中os.stat().st_size、os.path.getsize()获取文件大小》本文介绍了使用os.stat()和os.path.getsize()函数获取文件大小,文中通过示例代... 目录一、os.stat().st_size二、os.path.getsize()三、函数封装一、os

Redis主从复制的原理分析

《Redis主从复制的原理分析》Redis主从复制通过将数据镜像到多个从节点,实现高可用性和扩展性,主从复制包括初次全量同步和增量同步两个阶段,为优化复制性能,可以采用AOF持久化、调整复制超时时间、... 目录Redis主从复制的原理主从复制概述配置主从复制数据同步过程复制一致性与延迟故障转移机制监控与维

Redis连接失败:客户端IP不在白名单中的问题分析与解决方案

《Redis连接失败:客户端IP不在白名单中的问题分析与解决方案》在现代分布式系统中,Redis作为一种高性能的内存数据库,被广泛应用于缓存、消息队列、会话存储等场景,然而,在实际使用过程中,我们可能... 目录一、问题背景二、错误分析1. 错误信息解读2. 根本原因三、解决方案1. 将客户端IP添加到Re