Kettle-ActiveMQ Product插件开发笔记

2024-04-29 08:32

本文主要是介绍Kettle-ActiveMQ Product插件开发笔记,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

ActiveMQ Product插件开发笔记

概览

前提

该插件基于kettle 8.1.0.0-365 开发

如果是其他版本,不保证可用。(由于继承的BaseStreamingDialog等父类会随版本而变化)

本插件模仿官方Kafka插件源码编写:

https://github.com/pentaho/big-data-plugin/tree/master/kettle-plugins/kafka

暂不支持topic,需要的可自行修改源码(工程量应该不大)。

必备模板

相对ActiveMQ Consumer插件,Product插件相对来说简单多了。

由于Product不需要阻塞,所以就当作普通插件来开发。集成官方推荐的父类即可。

  • ActiveMQProduct extends BaseStep implements StepInterface
  • ActiveMQProductData extends BaseStepData implements StepDataInterface
  • ActiveMQProductDialog extends BaseStepDialog implements StepDialogInterface
  • ActiveMQProductMeta extends BaseStepMeta implements StepMetaInterface

ActiveMQProductMeta

关键属性的话就下面4个,是获取ActiveMQ 连接和消费数据必备的属性

/*** 连接地址*/
@Injection(name = "BROKER_URL")
private String brokerUrl;
/*** 队列名称*/
@Injection(name = "QUEUE")
private String queue;
/*** 发送的字段*/
@Injection(name = "MSG")
private String msgField;
/*** 存放xml 中的advancedConfig option*/
private Map<String, String> config = new LinkedHashMap<>();
  • msgField: 这个字段是从前一个步骤获取的。这个字段对应的值就是我们发送到AMQ的值,所以很重要。

然后就是模板方法,也是必备的:

  • getXML()
  • loadXML()
  • saveRep()
  • readRep()

ActiveMQProductDialog

继承自普通的BaseStepDialog

注意构造方法:将Object强转成BaseStepMeta 和 ActiveMQProductMeta

public ActiveMQProductDialog(Shell parent, Object in, TransMeta transMeta, String stepname) {super(parent, (BaseStepMeta) in, transMeta, stepname);this.meta = (ActiveMQProductMeta) in;
}

唯一需要实现的是open()方法。open()很多代码可以直接copy过来。

需要自己实现SetupOptions标签

其中Setup中的Message是需要从前一个步骤获取的,代码如下:

wMsgField = new ComboVar(transMeta, wSetupComp, SWT.SINGLE | SWT.LEFT | SWT.BORDER);
props.setLook(wMsgField);
wMsgField.addModifyListener(lsMod);
FormData fdMsgField = new FormData();
fdMsgField.left = new FormAttachment(0, 0);
fdMsgField.top = new FormAttachment(wlMsgField, 5);
fdMsgField.right = new FormAttachment(0, INPUT_WIDTH);
wMsgField.setLayoutData(fdMsgField);
Listener lsMsgFocus = event -> {String current = wMsgField.getText();wMsgField.getCComboWidget().removeAll();wMsgField.setText(current);//重要的地方:从前个步骤获取字段try {RowMetaInterface rmi = transMeta.getPrevStepFields(stepname);//上一步骤的所有列-添加到下拉框中final List<ValueMetaInterface> ls = rmi.getValueMetaList();for (int i = 0; i < ls.size(); i++) {final ValueMetaBase vmb = (ValueMetaBase) ls.get(i);wMsgField.add(vmb.getName());}} catch (KettleStepException e) {e.printStackTrace();}
};
wMsgField.getCComboWidget().addListener(SWT.FocusIn, lsMsgFocus);

剩下的就没什么好说的了。

ActiveMQProduct

继承自 BaseStep,所以需要实现 init()processRow()

init

固定的格式啊

@Override
public boolean init(StepMetaInterface smi, StepDataInterface sdi) {super.init(smi, sdi);meta = (ActiveMQProductMeta) smi;data = (ActiveMQProductData) sdi;return true;
}

processRow

if (first) {//找出我们选择的Message列在上一步骤中排第几列,存储到ActiveMQProductData.msgFieldIndexdata.msgFieldIndex = getInputRowMeta().indexOfValue(environmentSubstitute(meta.getMsgField()));try {//还要创建AMQ连接,因为连接只需在刚开始时创建就行了,不要重复创建data.conn = ActiveMQFactory.getConn(meta.getActiveMQEntity());} catch (JMSException e) {//如果创建失败,就直接退出log.logError(e.getMessage(), e);setOutputDone();return false;}first = false;
}
//r表示上一步骤传递过来的数据,在初始化的时候我们已经知道要去哪一列拿目标数据了
//所以这里的content就是我们要发送的数据
String content = (String) r[data.msgFieldIndex];
TextMessage msg = session.createTextMessage(content);
producer.send(msg);
//记得提交给AMQ
session.commit();
//提交后记录+1
incrementLinesOutput();
//表示在此步骤后还可以接上另一个步骤(原封不动地把上一步骤的数据转发到下一步骤)
putRow(getInputRowMeta(), r);

这篇关于Kettle-ActiveMQ Product插件开发笔记的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/945594

相关文章

这15个Vue指令,让你的项目开发爽到爆

1. V-Hotkey 仓库地址: github.com/Dafrok/v-ho… Demo: 戳这里 https://dafrok.github.io/v-hotkey 安装: npm install --save v-hotkey 这个指令可以给组件绑定一个或多个快捷键。你想要通过按下 Escape 键后隐藏某个组件,按住 Control 和回车键再显示它吗?小菜一碟: <template

Hadoop企业开发案例调优场景

需求 (1)需求:从1G数据中,统计每个单词出现次数。服务器3台,每台配置4G内存,4核CPU,4线程。 (2)需求分析: 1G / 128m = 8个MapTask;1个ReduceTask;1个mrAppMaster 平均每个节点运行10个 / 3台 ≈ 3个任务(4    3    3) HDFS参数调优 (1)修改:hadoop-env.sh export HDFS_NAMENOD

每天认识几个maven依赖(ActiveMQ+activemq-jaxb+activesoap+activespace+adarwin)

八、ActiveMQ 1、是什么? ActiveMQ 是一个开源的消息中间件(Message Broker),由 Apache 软件基金会开发和维护。它实现了 Java 消息服务(Java Message Service, JMS)规范,并支持多种消息传递协议,包括 AMQP、MQTT 和 OpenWire 等。 2、有什么用? 可靠性:ActiveMQ 提供了消息持久性和事务支持,确保消

嵌入式QT开发:构建高效智能的嵌入式系统

摘要: 本文深入探讨了嵌入式 QT 相关的各个方面。从 QT 框架的基础架构和核心概念出发,详细阐述了其在嵌入式环境中的优势与特点。文中分析了嵌入式 QT 的开发环境搭建过程,包括交叉编译工具链的配置等关键步骤。进一步探讨了嵌入式 QT 的界面设计与开发,涵盖了从基本控件的使用到复杂界面布局的构建。同时也深入研究了信号与槽机制在嵌入式系统中的应用,以及嵌入式 QT 与硬件设备的交互,包括输入输出设

OpenHarmony鸿蒙开发( Beta5.0)无感配网详解

1、简介 无感配网是指在设备联网过程中无需输入热点相关账号信息,即可快速实现设备配网,是一种兼顾高效性、可靠性和安全性的配网方式。 2、配网原理 2.1 通信原理 手机和智能设备之间的信息传递,利用特有的NAN协议实现。利用手机和智能设备之间的WiFi 感知订阅、发布能力,实现了数字管家应用和设备之间的发现。在完成设备间的认证和响应后,即可发送相关配网数据。同时还支持与常规Sof

活用c4d官方开发文档查询代码

当你问AI助手比如豆包,如何用python禁止掉xpresso标签时候,它会提示到 这时候要用到两个东西。https://developers.maxon.net/论坛搜索和开发文档 比如这里我就在官方找到正确的id描述 然后我就把参数标签换过来

【学习笔记】 陈强-机器学习-Python-Ch15 人工神经网络(1)sklearn

系列文章目录 监督学习:参数方法 【学习笔记】 陈强-机器学习-Python-Ch4 线性回归 【学习笔记】 陈强-机器学习-Python-Ch5 逻辑回归 【课后题练习】 陈强-机器学习-Python-Ch5 逻辑回归(SAheart.csv) 【学习笔记】 陈强-机器学习-Python-Ch6 多项逻辑回归 【学习笔记 及 课后题练习】 陈强-机器学习-Python-Ch7 判别分析 【学

系统架构师考试学习笔记第三篇——架构设计高级知识(20)通信系统架构设计理论与实践

本章知识考点:         第20课时主要学习通信系统架构设计的理论和工作中的实践。根据新版考试大纲,本课时知识点会涉及案例分析题(25分),而在历年考试中,案例题对该部分内容的考查并不多,虽在综合知识选择题目中经常考查,但分值也不高。本课时内容侧重于对知识点的记忆和理解,按照以往的出题规律,通信系统架构设计基础知识点多来源于教材内的基础网络设备、网络架构和教材外最新时事热点技术。本课时知识

Linux_kernel驱动开发11

一、改回nfs方式挂载根文件系统         在产品将要上线之前,需要制作不同类型格式的根文件系统         在产品研发阶段,我们还是需要使用nfs的方式挂载根文件系统         优点:可以直接在上位机中修改文件系统内容,延长EMMC的寿命         【1】重启上位机nfs服务         sudo service nfs-kernel-server resta

【区块链 + 人才服务】区块链集成开发平台 | FISCO BCOS应用案例

随着区块链技术的快速发展,越来越多的企业开始将其应用于实际业务中。然而,区块链技术的专业性使得其集成开发成为一项挑战。针对此,广东中创智慧科技有限公司基于国产开源联盟链 FISCO BCOS 推出了区块链集成开发平台。该平台基于区块链技术,提供一套全面的区块链开发工具和开发环境,支持开发者快速开发和部署区块链应用。此外,该平台还可以提供一套全面的区块链开发教程和文档,帮助开发者快速上手区块链开发。