本文主要是介绍【FlinkX】两个issue分析:reader和writer的通道数不一致+获取JobId,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
文章目录
- issue详情
- reader和writer的通道数不一致
- 获取JobId
- 代码分析
- #issue145
- 配置说明
- 源码分析:
- #issue148
最近准备再花点时间优化一下之前的FlinkX版本,特地去看了一下项目的issues区域,发现两个自己比较关注的issue。
issue详情
reader和writer的通道数不一致
- 异构数据源reader和writer设置不同的Parallelism数#145
这个issue是我之前提的。当时我在测试拉取mysql数据写入我司一个自研MQ时发现,当channel过小时写MQ会比较慢(写入过程是同步的),当channel跳大后没有提速,读取成为瓶颈,甚至比单channel更慢。
详见我更早之前的一个issue#143。
后来虽然通过其他方法将写入能力提升勉强达到可用状态,但一直想框架本身能支持设置不同的Parallelism数。
获取JobId
- flink on yarn获取jobid #148
这个就不用说了,现网程序大规模上线后肯定需要能获取获取job id做更精细的告警。
代码分析
#issue145
配置说明
目前版本已经支持,配置demo:
"speed": {"bytes": 1048576,"channel": 2,"rebalance": false,"readerChannel": 1,"writerChannel": 1
}
- channel:任务并发数
- readerChannel:reader的并发数,配置此参数时会覆盖channel配置的并发数,不配置或配置为-1时将使用channel配置的并发数作为reader的并发数。
- writerChannel:writer的并发数,配置此参数时会覆盖channel配置的并发数,不配置或配置为-1时将使用channel配置的并发数作为writer的并发数。
- rebalance:此参数配置为true时将强制对reader的数据做Rebalance,不配置此参数或者配置为false时,程序会根据reader和writer的通道数选择是否Rebalance,reader和writer的通道数一致时不使用Reblance,通道数不一致时使用Reblance。
源码分析:
// com.dtstack.flinkx.Main
StreamExecutionEnvironment env = ……
……
// 设置全局并发
env.setParallelism(speedConfig.getChannel());
……
// 设置读并发
dataStream = ((DataStreamSource<Row>) dataStream).setParallelism(speedConfig.getReaderChannel());
// 强制Rebalance有助于数据均匀
if (speedConfig.isRebalance()) {dataStream = dataStream.rebalance();}
……
// 设置写并发
dataWriter.writeData(dataStream).setParallelism(speedConfig.getWriterChannel());
读写默认并发时是 -1。在flink中setParallelism(-1) 时就说使用系统当前的默认的并发
// com.dtstack.flinkx.config.SpeedConfig
public static final int DEFAULT_NUM_READER_WRITER_CHANNEL = -1;
public static final int PARALLELISM_DEFAULT = -1;/*** The flag value indicating an unknown or unset parallelism. This value is* not a valid parallelism and indicates that the parallelism should remain* unchanged.*/public static final int PARALLELISM_UNKNOWN = -2;
#issue148
官方回答:RichInputFormat.initJobInfo()里面可以拿到
实际上RichOutputForma中也可以获得,这里我写一个demo
// com.dtstack.flinkx.stream.writer.StreamOutputFormat
public class StreamOutputFormat extends BaseRichOutputFormat {……@Overrideprotected void initJobInfo() {Map<String, String> vars = context.getMetricGroup().getAllVariables();System.out.println("Metrics.JOB_ID:" + vars.get(Metrics.JOB_ID));
// super.initJobInfo();}
}
与RichFunction类似,Rich类可以拿到运行时的上下文,包括Job ID,Metric等
这篇关于【FlinkX】两个issue分析:reader和writer的通道数不一致+获取JobId的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!