hive-udf-kafka批量数据导入kafka

2024-03-04 20:38
文章标签 数据 导入 批量 hive kafka udf

本文主要是介绍hive-udf-kafka批量数据导入kafka,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

背景:数据存在hive中,现在需要将数据导入kafka中,为了减少中间环节,使用自定义UDF将hive数据导入到kafka中

问题:UDF时对一行的处理,批量导入就会涉及多行的问题,怎么将多行数据放到一个udf中?
解决思路:用collect_list函数将多行转成集合,在udf中循环遍历,发送到kafka

  1. package cn.kobold;
  2.  
  3. import org.apache.hadoop.hive.ql.exec.Description;
  4. import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
  5. import org.apache.hadoop.hive.ql.metadata.HiveException;
  6. import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
  7. import org.apache.hadoop.hive.serde2.objectinspector.*;
  8. import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
  9. import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
  10. import org.apache.hadoop.io.IntWritable;
  11. import org.apache.kafka.clients.producer.KafkaProducer;
  12. import org.apache.kafka.clients.producer.Producer;
  13. import org.apache.kafka.clients.producer.ProducerRecord;
  14. import org.json.JSONObject;
  15.  
  16. import java.util.HashMap;
  17. import java.util.Map;
  18. import java.util.Properties;
  19.  
  20. @Description(name = "hive2kafka", value = "_FUNC_(brokerhost_and_port,topic, array<map<string,string>>) - Return ret ")
  21. public class Hive2KakfaUDF extends GenericUDF {
  22.  
  23.     private String hostAndPort;
  24.     private String topics;
  25.     private StandardListObjectInspector paramsListInspector;
  26.     private StandardMapObjectInspector paramsElementInspector;
  27.  
  28.     public ObjectInspector initialize(ObjectInspector[] arg0) throws UDFArgumentException {
  29.         if (arg0.length != 3) {
  30.             throw new UDFArgumentException(" Expecting   two  arguments:<brokerhost:port> <topic>  array<map<string,string>> ");
  31.         }
  32.         // 第一个参数验证
  33.         if (arg0[0].getCategory() == Category.PRIMITIVE
  34.                 && ((PrimitiveObjectInspector) arg0[0]).getPrimitiveCategory() == PrimitiveObjectInspector.PrimitiveCategory.STRING) {
  35.             if (!(arg0[0] instanceof ConstantObjectInspector)) {
  36.                 throw new UDFArgumentException("broker host:port  must be constant");
  37.             }
  38.             ConstantObjectInspector brokerhost_and_port = (ConstantObjectInspector) arg0[0];
  39.  
  40.             hostAndPort = brokerhost_and_port.getWritableConstantValue().toString();
  41.         }
  42.  
  43.         // 第二个参数验证
  44.         if (arg0[1].getCategory() == Category.PRIMITIVE
  45.                 && ((PrimitiveObjectInspector) arg0[1]).getPrimitiveCategory() == PrimitiveObjectInspector.PrimitiveCategory.STRING) {
  46.             if (!(arg0[1] instanceof ConstantObjectInspector)) {
  47.                 throw new UDFArgumentException("kafka topic must be constant");
  48.             }
  49.             ConstantObjectInspector topicCOI = (ConstantObjectInspector) arg0[1];
  50.  
  51.             topics = topicCOI.getWritableConstantValue().toString();
  52.         }
  53.  
  54.  
  55.         // 第三个参数验证
  56.         if (arg0[2].getCategory() != Category.LIST) {
  57.             throw new UDFArgumentException(" Expecting an array<map<string,string>> field as third argument ");
  58.         }
  59.         ListObjectInspector third = (ListObjectInspector) arg0[2];
  60.         if (third.getListElementObjectInspector().getCategory() != Category.MAP) {
  61.             throw new UDFArgumentException(" Expecting an array<map<string,string>> field as third argument ");
  62.         }
  63.         paramsListInspector = ObjectInspectorFactory.getStandardListObjectInspector(third.getListElementObjectInspector());
  64.         paramsElementInspector = (StandardMapObjectInspector) third.getListElementObjectInspector();
  65.         System.out.println(paramsElementInspector.getMapKeyObjectInspector().getCategory());
  66.         System.out.println(paramsElementInspector.getMapValueObjectInspector().getCategory());
  67.  
  68.         return PrimitiveObjectInspectorFactory.writableIntObjectInspector;
  69.  
  70.     }
  71.  
  72.     public Object evaluate(DeferredObject[] arg0) throws HiveException {
  73.         Properties props = new Properties();
  74.         props.put("bootstrap.servers", hostAndPort);
  75.         props.put("acks", "all");
  76.         props.put("retries", 0);
  77.         props.put("batch.size", 16384);
  78.         props.put("linger.ms", 1);
  79.         props.put("buffer.memory", 33554432);
  80.         props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  81.         props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  82.  
  83.         // 创建kafka生产者
  84.         Producer<String, String> producer = new KafkaProducer<String, String>(props);
  85.  
  86.         for (int i = 0; i < paramsListInspector.getListLength(arg0[2].get()); i++) {
  87.             Object row = paramsListInspector.getListElement(arg0[2].get(), i);
  88.             Map<?, ?> map = paramsElementInspector.getMap(row);
  89.             // Object obj = ObjectInspectorUtils.copyToStandardJavaObject(row,paramsElementInspector);
  90.             // 转成标准的java map,否则里面的key value字段为hadoop writable对象
  91.             Map<String, String> data = new HashMap<String,String>();
  92.             for (Map.Entry<?, ?> entry : map.entrySet()) {
  93.                 if (entry.getValue() != null && !"".equals(entry.getValue().toString())) {
  94.                     data.put(entry.getKey().toString(), entry.getValue().toString());
  95.                 }
  96.             }
  97.             JSONObject jsonObject = new JSONObject(data);
  98.  
  99.             //指定数据均匀写入3个分区中
  100.             int part = i % 2;
  101.             producer.send(new ProducerRecord<String, String>(topics, part,Integer.toString(i), jsonObject.toString()));
  102.  
  103.         }
  104.  
  105.         producer.close();
  106.  
  107.         return new IntWritable(1);
  108.     }
  109.  
  110.     public String getDisplayString(String[] strings) {
  111.         return "hive2kafka(brokerhost_and_port,topic, array<map<string,string>>)";
  112.     }
  113. }

测试SQL
第一个参数:broker所在位置
第二个参数:topic
第三个参数:将多行转成集合,每一行转成一个map

  1. SELECT g,hive2kafka('bd01:9092','bobizli_test',collect_list(map('full_name',full_name,'simple_name',simple_name))) AS result
  2. FROM
  3. (
  4. SELECT r1,pmod(ABS(hash(r1)),1000) AS g,full_name,simple_name
  5. FROM(
  6. SELECT row_number() over(PARTITION BY 1) AS r1,full_name,simple_name
  7. FROM dws_bo_final_spider_contact
  8. LIMIT 10000) tmp
  9. ) tmp2
  10. GROUP BY g;

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

这篇关于hive-udf-kafka批量数据导入kafka的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/774375

相关文章

【服务器运维】MySQL数据存储至数据盘

查看磁盘及分区 [root@MySQL tmp]# fdisk -lDisk /dev/sda: 21.5 GB, 21474836480 bytes255 heads, 63 sectors/track, 2610 cylindersUnits = cylinders of 16065 * 512 = 8225280 bytesSector size (logical/physical)

SQL Server中,查询数据库中有多少个表,以及数据库其余类型数据统计查询

sqlserver查询数据库中有多少个表 sql server 数表:select count(1) from sysobjects where xtype='U'数视图:select count(1) from sysobjects where xtype='V'数存储过程select count(1) from sysobjects where xtype='P' SE

python 在pycharm下能导入外面的模块,到terminal下就不能导入

项目结构如下,在ic2ctw.py 中导入util,在pycharm下不报错,但是到terminal下运行报错  File "deal_data/ic2ctw.py", line 3, in <module>     import util 解决方案: 暂时方案:在终端下:export PYTHONPATH=/Users/fujingling/PycharmProjects/PSENe

数据时代的数字企业

1.写在前面 讨论数据治理在数字企业中的影响和必要性,并介绍数据治理的核心内容和实践方法。作者强调了数据质量、数据安全、数据隐私和数据合规等方面是数据治理的核心内容,并介绍了具体的实践措施和案例分析。企业需要重视这些方面以实现数字化转型和业务增长。 数字化转型行业小伙伴可以加入我的星球,初衷成为各位数字化转型参考库,星球内容每周更新 个人工作经验资料全部放在这里,包含数据治理、数据要

如何在Java中处理JSON数据?

如何在Java中处理JSON数据? 大家好,我是免费搭建查券返利机器人省钱赚佣金就用微赚淘客系统3.0的小编,也是冬天不穿秋裤,天冷也要风度的程序猿!今天我们将探讨在Java中如何处理JSON数据。JSON(JavaScript Object Notation)作为一种轻量级的数据交换格式,在现代应用程序中被广泛使用。Java通过多种库和API提供了处理JSON的能力,我们将深入了解其用法和最佳

两个基因相关性CPTAC蛋白组数据

目录 蛋白数据下载 ①蛋白数据下载 1,TCGA-选择泛癌数据  2,TCGA-TCPA 3,CPTAC(非TCGA) ②蛋白相关性分析 1,数据整理 2,蛋白相关性分析 PCAS在线分析 蛋白数据下载 CPTAC蛋白组学数据库介绍及数据下载分析 – 王进的个人网站 (jingege.wang) ①蛋白数据下载 可以下载泛癌蛋白数据:UCSC Xena (xena

常用MQ消息中间件Kafka、ZeroMQ和RabbitMQ对比及RabbitMQ详解

1、概述   在现代的分布式系统和实时数据处理领域,消息中间件扮演着关键的角色,用于解决应用程序之间的通信和数据传递的挑战。在众多的消息中间件解决方案中,Kafka、ZeroMQ和RabbitMQ 是备受关注和广泛应用的代表性系统。它们各自具有独特的特点和优势,适用于不同的应用场景和需求。   Kafka 是一个高性能、可扩展的分布式消息队列系统,被设计用于处理大规模的数据流和实时数据传输。它

BD错误集锦9——查询hive表格时出错:Wrong FS: hdfs://s233/user/../warehouse expected: hdfs://mycluster

集群环境描述:HDFS集群处于HA模式下,同时启动了YARN\JN\KAFKA\ZK。 现象: FAILED: SemanticException Unable to determine if hdfs://s233/user/hive/warehouse/mydb.db/ext_calllogs_in_hbase is encrypted: java.lang.IllegalArgument

BD错误集锦1——[Hive]ERROR StatusLogger No log4j2 configuration file found. Using default configuration:

错误描述:在使用IDEA进行jdbc方式连接到hive数据仓库时,出现以下错误:                ERROR StatusLogger No log4j2 configuration file found. 问题原因:缺少log4j2.xml文件   <?xml version="1.0" encoding="UTF-8"?><Configuration><Appender

中国341城市生态系统服务价值数据集(2000-2020年)

生态系统服务反映了人类直接或者间接从自然生态系统中获得的各种惠益,对支撑和维持人类生存和福祉起着重要基础作用。目前针对全国城市尺度的生态系统服务价值的长期评估还相对较少。我们在Xie等(2017)的静态生态系统服务当量因子表基础上,选取净初级生产力,降水量,生物迁移阻力,土壤侵蚀度和道路密度五个变量,对生态系统供给服务、调节服务、支持服务和文化服务共4大类和11小类的当量因子进行了时空调整,计算了