Kettle-ActiveMQ Consumer插件开发笔记

2024-04-29 08:32

本文主要是介绍Kettle-ActiveMQ Consumer插件开发笔记,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

ActiveMQ Consumer插件开发笔记

源代码

https://github.com/tangwenixng/soyuan-activemq-plugin

概览

前提

该插件基于kettle 8.1.0.0-365 开发

如果是其他版本,不保证可用。(由于继承的BaseStreamingDialog等父类会随版本而变化)

本插件模仿官方Kafka插件源码编写:

https://github.com/pentaho/big-data-plugin/tree/master/kettle-plugins/kafka

暂不支持topic,需要的可自行修改源码(工程量应该不大)。

必备模板

首先必须创建的4个类:

  • ActiveMQConsumer extends BaseStreamStep implements StepInterface
  • ActiveMQConsumerData extends TransExecutorData implements StepDataInterface
  • ActiveMQConsumerDialog extends BaseStreamingDialog implements StepDialogInterface
  • ActiveMQConsumerMeta extends BaseStreamStepMeta implements StepMetaInterface

注意这4个类继承的父类比较特殊,不同于一般的步骤插件继承的是BaseStep***

然后创建多语言(资源)配置文件:结构如下图所示

接下来将分别说明刚刚列举的4个类。

ActiveMQConsumerMeta

ActiveMQConsumerMeta是非常重要的一个类。

  1. 可视化Dialog里看到的属性值(比如: Text框框)在点击了确认按钮时会保存到ActiveMQConsumerMeta中对应的成员变量的。当第一次打开步骤界面Dialog时(即open方法时-后面会讲到),也是从ActiveMQConsumerMeta中读取成员变量赋值到Text框框中。
  2. 当在Kettle编辑界面点击了保存Save按钮时,会将ActiveMQConsumerMeta中的属性通过getXML()方法写入到文件(ktr)中。当点击运行按钮时,kettle会调用loadXML()将ktr文件内容读取到ActiveMQConsumerMeta成员变量中。同理readRep和saveRep。

上面介绍了Meta类的主要工作,接着具体说明下代码中需要注意的点:

Step注解

@Step(id = "ActiveMQConsumer",name = "ActiveMQConsumer.TypeLongDesc",description = "ActiveMQConsumer.TypeTooltipDesc",image = "com/soyuan/steps/activemq/resources/activemq.svg",categoryDescription = "i18n:org.pentaho.di.trans.step:BaseStep.Category.Streaming",i18nPackageName = "com.soyuan.steps.activemq",documentationUrl = "ActiveMQConsumer.DocumentationURL",casesUrl = "ActiveMQConsumer.CasesURL",forumUrl = "ActiveMQConsumer.ForumURL"
)
@InjectionSupported(localizationPrefix = "ActiveMQConsumerMeta.Injection.")

@step注解是定义步骤的规范,kettle会自动扫描此注解,并将它注入到插件容器内。

  • id必须是全局唯一的
  • name: 也就是我们在可视化界面中看到的插件名字。后面跟的ActiveMQConsumer.TypeLongDesc指向的是配置文件properties中的属性
  • @InjectionSupported(localizationPrefix = "ActiveMQConsumerMeta.Injection.") 中的ActiveMQConsumerMeta.Injection.需要配合ActiveMQConsumerMeta中的成员变量来使用。比如:
/*** 连接地址*/
@Injection( name = "BROKER_URL" )
private String brokerUrl;

这里的BROKER_URL和刚刚的ActiveMQConsumerMeta.Injection.搭配起来就成了ActiveMQConsumer.Injection.BROKER_URL

这个属性也是在配置文件properties中配置的

构造方法

public ActiveMQConsumerMeta() {super();...setSpecificationMethod(ObjectLocationSpecificationMethod.FILENAME);
}
  • 注意指定setSpecificationMethod(ObjectLocationSpecificationMethod.FILENAME);这里设置的ObjectLocationSpecificationMethod.FILENAME值会在ActiveMQConsumerDialog.getData()用到

接口方法

@Override
public StepInterface getStep(StepMeta stepMeta, StepDataInterface stepDataInterface, int copyNr, TransMeta transMeta, Trans trans) {return new ActiveMQConsumer(stepMeta, stepDataInterface, copyNr, transMeta, trans);
}@Override
public StepDataInterface getStepData() {return new ActiveMQConsumerData();
}

这两个方法是接口必须实现的,按照模板来就行

成员变量

看代码注释

//固定用法,配合BaseMessages类从配置文件中读取配置
private static Class<?> PKG = ActiveMQConsumerMeta.class;/*** 以下静态变量用于定义xml中的标签tag*/
public static final String BROKER_URL = "brokerUrl";
public static final String QUEUE_NAME = "queue";public static final String TRANSFORMATION_PATH = "transformationPath";
public static final String BATCH_SIZE = "batchSize";
public static final String BATCH_DURATION = "batchDuration";public static final String OUTPUT_FIELD_TAG_NAME = "OutputField";
public static final String INPUT_NAME_ATTRIBUTE = "input";
public static final String TYPE_ATTRIBUTE = "type";public static final String ADVANCED_CONFIG = "advancedConfig" ;
private static final String CONFIG_OPTION = "option";
private static final String OPTION_PROPERTY = "property";
private static final String OPTION_VALUE = "value";/*** 连接地址*/
@Injection( name = "BROKER_URL" )
private String brokerUrl;/*** 队列名称*/
@Injection(name="QUEUE")
private String queue;/*** 注入的配置: 注意是transient* 在哪赋值的-Dialog中*/
@Injection(name = "NAMES", group = "CONFIGURATION_PROPERTIES")
protected transient List<String> injectedConfigNames;@Injection(name = "VALUES", group = "CONFIGURATION_PROPERTIES")
protected transient List<String> injectedConfigValues;private ActiveMQConsumerField msgIdField;
private ActiveMQConsumerField msgField;
private ActiveMQConsumerField timestampField;/*** 存放xml 中的advancedConfig option*/
private Map<String, String> config = new LinkedHashMap<>();

brokerUrl queue config msgIdField config 等变量是核心,它们流转于Dialog、ActiveMQConsumer(StepInterface)中。

injectedConfigNames、injectedConfigValues 是用于辅助生成config变量的(可以丢掉)

config变量对应的是Options Tab中的属性,是可变化的(可删除、增加)

msgField封装成ActiveMQConsumerField 枚举类,是便于可扩展以及可流转。(后面再详细叙说)

其他方法

@Override
public RowMeta getRowMeta(String origin, VariableSpace space) throws KettleStepException {RowMeta rowMeta = new RowMeta();putFieldOnRowMeta(getMsgIdField(), rowMeta, origin, space);putFieldOnRowMeta(getMsgField(), rowMeta, origin, space);putFieldOnRowMeta(getTimestampField(), rowMeta, origin, space);return rowMeta;
}private void putFieldOnRowMeta(ActiveMQConsumerField field, RowMetaInterface rowMeta,String origin, VariableSpace space) throws KettleStepException {if (field != null && !Utils.isEmpty(field.getOutputName())) {try {String value = space.environmentSubstitute(field.getOutputName());ValueMetaInterface v = ValueMetaFactory.createValueMeta(value,field.getOutputType().getValueMetaInterfaceType());//这里为什么要set步骤名称v.setOrigin(origin);rowMeta.addValueMeta(v);} catch (KettlePluginException e) {throw new KettleStepException(BaseMessages.getString(PKG,"ActiveMQConsumerInputMeta.UnableToCreateValueType",field), e);}}
}public List<ActiveMQConsumerField> getFieldDefinitions() {return Lists.newArrayList(getMsgIdField(), getMsgField(), getTimestampField());
}protected void setField(ActiveMQConsumerField field) {field.getInputName().setFieldOnMeta(this, field);
}
  • getRowMeta 是用于获取输出的字段的,即一行数据由哪几列组成。在步骤初始化(ActiveMQConsumer#init)的时候被调用。
  • putFieldOnRowMeta 组装一列数据(数据名称、类型)
  • getFieldDefinitions 获取输出字段列表(只是简单的将成员变量组成列表)
  • setField(ActiveMQConsumerField field) 这里比较绕–稍候描述

ActiveMQConsumerDialog

ActiveMQConsumerDialog 继承了 BaseStreamingDialogBaseStreamingDialog中实现了open 方法 ,所以不需要复写open方法,只需重写以下几个方法即可。

  • getDialogTitle()-设置标题
  • buildSetup(Composite wSetupComp) - 实现启动页面(必要的信息-服务器地址、队列名称)
  • getData()-重写此方法,将meta中的信息设置到启动页面的元素和父类的Text 或者 其他Tab也中(如果有的话)
  • createAdditionalTabs() 在此方法里创建额外的Tab
  • additionalOks(BaseStreamStepMeta meta):确认按钮,将Dialog中的数据保存至meta中。保存启动页、额外Tab页数据
  • getFieldNames() -如果创建了Field Tab,这里对应的是Output Name(第2列)
  • getFieldTypes() -如果创建了Field Tab,这里对应的是Type(第3列)

构造方法

public ActiveMQConsumerDialog(Shell parent, Object in, TransMeta tr, String sname) {super(parent, in, tr, sname);this.consumerMeta = (ActiveMQConsumerMeta) in;
}

需要注意的是 第二个参数是Object(实际是ActiveMQConsumerMeta对象)

getData()

@Override
protected void getData() {...switch ( specificationMethod ) {case FILENAME:wTransPath.setText(Const.NVL(meta.getFileName(), ""));break;case REPOSITORY_BY_NAME:String fullPath = Const.NVL(meta.getDirectoryPath(), "") + "/" + Const.NVL(meta.getTransName(), "");wTransPath.setText(fullPath);break;case REPOSITORY_BY_REFERENCE:referenceObjectId = meta.getTransObjectId();getByReferenceData(referenceObjectId);break;default:break;}...
}

这一段直接抄过来即可。

additionalOks()

将Dialog中的数据保存至meta中。保存启动页、额外Tab页数据

@Override
protected void additionalOks(BaseStreamStepMeta meta) {consumerMeta.setBrokerUrl(wBrokerUrl.getText());consumerMeta.setQueue(wQueue.getText());//将field值设置到meta中setFieldsFromTable();//将option中的值设置到meta中setOptionsFromTable();
}

注意一下setFieldsFromTable()方法=>保存field

/*** 将field值设置到meta中*/
private void setFieldsFromTable() {int itemCount = fieldsTable.getItemCount();for (int rowIndex = 0; rowIndex < itemCount; rowIndex++) {TableItem row = fieldsTable.getTable().getItem(rowIndex);String inputName = row.getText(1);String outputName = row.getText(2);String outputType = row.getText(3);final ActiveMQConsumerField.Name ref = ActiveMQConsumerField.Name.valueOf(inputName.toUpperCase());final ActiveMQConsumerField field = new ActiveMQConsumerField(ref, outputName,ActiveMQConsumerField.Type.valueOf(outputType));consumerMeta.setField(field);}
}

Field Table中每一行数据 实例化成 ActiveMQConsumerField对象,然后setmeta中。

consumerMeta.setField(field);最终会调用 类似 consumerMeta.setMsgField 等具体的set方法,可以仔细研究一下ActiveMQConsumerField

getFieldNames()

getFieldNames()和getFieldTypes() 从描述来看,其实是提取Field Tab中的值,但它们的实际作用是什么呢?

如上图所示,当点击了New(新建转换)并保存后,在新文件中的Get records from stream步骤中就会有Field Tab中的值了

ActiveMQConsumerData

ActiveMQConsumerData 继承自 TransExecutorData ,只有一个成员变量 RowMetaInterface outputRowMeta=>存储[行元数据]

ActiveMQConsumer

ActiveMQConsumer继承自BaseStreamStep,所以无需重写processRow(),只需重写init()方法即可。

@Override
public boolean init(StepMetaInterface stepMetaInterface, StepDataInterface stepDataInterface) {ActiveMQConsumerMeta meta = (ActiveMQConsumerMeta) stepMetaInterface;ActiveMQConsumerData data = (ActiveMQConsumerData) stepDataInterface;if (!super.init(meta,data)){logError(BaseMessages.getString(PKG, "ActiveMQConsumer.Error.InitFailed"));return false;}try {//创建[行元数据]-即:输出哪些字段data.outputRowMeta = meta.getRowMeta(getStepname(), this);} catch (KettleStepException e) {log.logError(e.getMessage(), e);}//创建activemq connectionfinal Connection connection;try {connection = ActiveMQFactory.getConn(meta.getActiveMQEntity());//subtransExecutor:子转换执行器window = new FixedTimeStreamWindow<>(subtransExecutor,data.outputRowMeta,getDuration(),getBatchSize());source = new ActiveMQStreamSource(connection, meta, data, this);} catch (JMSException e) {log.logError(e.getMessage(),e);return false;}return true;
}

以上是init方法的全部内容。我们来分段看。

try {//创建[行元数据]-即:输出哪些字段data.outputRowMeta = meta.getRowMeta(getStepname(), this);
} catch (KettleStepException e) {log.logError(e.getMessage(), e);
}

meta.getRowMeta(getStepname(), this);刚刚在ActiveMQConsumerMeta中已经介绍过了。主要是构建[行数据]-即列名称、类型。

connection = ActiveMQFactory.getConn(meta.getActiveMQEntity());从meta中获取服务器地址、队列名称等信息来获取连接。

//subtransExecutor:子转换执行器
window = new FixedTimeStreamWindow<>(subtransExecutor,data.outputRowMeta,getDuration(),getBatchSize());

固定这样写,将 data.outputRowMeta【行元数据】传给子窗口即可

source = new ActiveMQStreamSource(connection, meta, data, this);

source是父类BaseStreamStep的一个成员变量protected StreamSource<List<Object>> source ,所以我们的ActiveMQStreamSourceStreamSource<List<Object>>的实现类。

主要的职责是消费ActiveMQ的数据,然后传递给子窗口,怎么传递不需要关心。

我们现在看ActiveMQStreamSource代码。

ActiveMQStreamSource

在open()方法中有这样一段代码:

final List<ValueMetaInterface> valueMetas = consumerData.outputRowMeta.getValueMetaList();
positions = new HashMap<>(valueMetas.size());for (int i = 0; i < valueMetas.size(); i++) {for (ActiveMQConsumerField.Name name : ActiveMQConsumerField.Name.values()) {final ActiveMQConsumerField field = name.getFieldFromMeta(consumerMeta);String outputName = field.getOutputName();if (outputName != null && outputName.equals(valueMetas.get(i).getName())) {positions.putIfAbsent(name, i);}}
}

目的是找出某一列的位置。 假如:Message-1 MessageId-2

callable = new ActiveMQConsumerCallable(connection, super::close);
future = executorService.submit(callable);

具体的消费线程ActiveMQConsumerCallable

while (!closed.get()) {final TextMessage msg = (TextMessage) consumer.receive(1000L);if (msg != null) {List<List<Object>> rows = new ArrayList<>(1);final List<Object> row = processMessageAsRow(msg);rows.add(row);acceptRows(rows);session.commit();}
}

一直尝试拉取activemq的数据,如果有数据,调用processMessageAsRow(msg)处理数据,然后调用acceptRows(rows)传递给后续的步骤处理。

List<Object> processMessageAsRow(TextMessage msg) throws JMSException {Object[] rowData = RowDataUtil.allocateRowData(consumerData.outputRowMeta.size());if (positions.get(ActiveMQConsumerField.Name.MESSAGEID) != null) {rowData[positions.get(ActiveMQConsumerField.Name.MESSAGEID)] = msg.getJMSMessageID();}if (positions.get(ActiveMQConsumerField.Name.MESSAGE) != null) {rowData[positions.get(ActiveMQConsumerField.Name.MESSAGE)] = msg.getText();}if (positions.get(ActiveMQConsumerField.Name.TIMESTAMP) != null) {rowData[positions.get(ActiveMQConsumerField.Name.TIMESTAMP)] = msg.getJMSTimestamp();}return Arrays.asList(rowData);
}

processMessageAsRows其实就是将从active mq拿到的数据塞到对应的列(这也是为什么一开始要有positions = new HashMap<>(valueMetas.size())的原因)上去。

至此,ActiveMQ Consumer插件开发的主要步骤就介绍完毕了。

这篇关于Kettle-ActiveMQ Consumer插件开发笔记的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/945593

相关文章

基于Qt开发一个简单的OFD阅读器

《基于Qt开发一个简单的OFD阅读器》这篇文章主要为大家详细介绍了如何使用Qt框架开发一个功能强大且性能优异的OFD阅读器,文中的示例代码讲解详细,有需要的小伙伴可以参考一下... 目录摘要引言一、OFD文件格式解析二、文档结构解析三、页面渲染四、用户交互五、性能优化六、示例代码七、未来发展方向八、结论摘要

在 VSCode 中配置 C++ 开发环境的详细教程

《在VSCode中配置C++开发环境的详细教程》本文详细介绍了如何在VisualStudioCode(VSCode)中配置C++开发环境,包括安装必要的工具、配置编译器、设置调试环境等步骤,通... 目录如何在 VSCode 中配置 C++ 开发环境:详细教程1. 什么是 VSCode?2. 安装 VSCo

IDEA常用插件之代码扫描SonarLint详解

《IDEA常用插件之代码扫描SonarLint详解》SonarLint是一款用于代码扫描的插件,可以帮助查找隐藏的bug,下载并安装插件后,右键点击项目并选择“Analyze”、“Analyzewit... 目录SonajavascriptrLint 查找隐藏的bug下载安装插件扫描代码查看结果总结Sona

java如何调用kettle设置变量和参数

《java如何调用kettle设置变量和参数》文章简要介绍了如何在Java中调用Kettle,并重点讨论了变量和参数的区别,以及在Java代码中如何正确设置和使用这些变量,避免覆盖Kettle中已设置... 目录Java调用kettle设置变量和参数java代码中变量会覆盖kettle里面设置的变量总结ja

C#图表开发之Chart详解

《C#图表开发之Chart详解》C#中的Chart控件用于开发图表功能,具有Series和ChartArea两个重要属性,Series属性是SeriesCollection类型,包含多个Series对... 目录OverviChina编程ewSeries类总结OverviewC#中,开发图表功能的控件是Char

鸿蒙开发搭建flutter适配的开发环境

《鸿蒙开发搭建flutter适配的开发环境》文章详细介绍了在Windows系统上如何创建和运行鸿蒙Flutter项目,包括使用flutterdoctor检测环境、创建项目、编译HAP包以及在真机上运... 目录环境搭建创建运行项目打包项目总结环境搭建1.安装 DevEco Studio NEXT IDE

Python开发围棋游戏的实例代码(实现全部功能)

《Python开发围棋游戏的实例代码(实现全部功能)》围棋是一种古老而复杂的策略棋类游戏,起源于中国,已有超过2500年的历史,本文介绍了如何用Python开发一个简单的围棋游戏,实例代码涵盖了游戏的... 目录1. 围棋游戏概述1.1 游戏规则1.2 游戏设计思路2. 环境准备3. 创建棋盘3.1 棋盘类

这15个Vue指令,让你的项目开发爽到爆

1. V-Hotkey 仓库地址: github.com/Dafrok/v-ho… Demo: 戳这里 https://dafrok.github.io/v-hotkey 安装: npm install --save v-hotkey 这个指令可以给组件绑定一个或多个快捷键。你想要通过按下 Escape 键后隐藏某个组件,按住 Control 和回车键再显示它吗?小菜一碟: <template

Hadoop企业开发案例调优场景

需求 (1)需求:从1G数据中,统计每个单词出现次数。服务器3台,每台配置4G内存,4核CPU,4线程。 (2)需求分析: 1G / 128m = 8个MapTask;1个ReduceTask;1个mrAppMaster 平均每个节点运行10个 / 3台 ≈ 3个任务(4    3    3) HDFS参数调优 (1)修改:hadoop-env.sh export HDFS_NAMENOD

每天认识几个maven依赖(ActiveMQ+activemq-jaxb+activesoap+activespace+adarwin)

八、ActiveMQ 1、是什么? ActiveMQ 是一个开源的消息中间件(Message Broker),由 Apache 软件基金会开发和维护。它实现了 Java 消息服务(Java Message Service, JMS)规范,并支持多种消息传递协议,包括 AMQP、MQTT 和 OpenWire 等。 2、有什么用? 可靠性:ActiveMQ 提供了消息持久性和事务支持,确保消