hive-udf-kafka批量数据导入kafka

2024-03-04 20:38
文章标签 数据 导入 批量 hive kafka udf

本文主要是介绍hive-udf-kafka批量数据导入kafka,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

背景:数据存在hive中,现在需要将数据导入kafka中,为了减少中间环节,使用自定义UDF将hive数据导入到kafka中

问题:UDF时对一行的处理,批量导入就会涉及多行的问题,怎么将多行数据放到一个udf中?
解决思路:用collect_list函数将多行转成集合,在udf中循环遍历,发送到kafka

  1. package cn.kobold;
  2.  
  3. import org.apache.hadoop.hive.ql.exec.Description;
  4. import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
  5. import org.apache.hadoop.hive.ql.metadata.HiveException;
  6. import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
  7. import org.apache.hadoop.hive.serde2.objectinspector.*;
  8. import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
  9. import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
  10. import org.apache.hadoop.io.IntWritable;
  11. import org.apache.kafka.clients.producer.KafkaProducer;
  12. import org.apache.kafka.clients.producer.Producer;
  13. import org.apache.kafka.clients.producer.ProducerRecord;
  14. import org.json.JSONObject;
  15.  
  16. import java.util.HashMap;
  17. import java.util.Map;
  18. import java.util.Properties;
  19.  
  20. @Description(name = "hive2kafka", value = "_FUNC_(brokerhost_and_port,topic, array<map<string,string>>) - Return ret ")
  21. public class Hive2KakfaUDF extends GenericUDF {
  22.  
  23.     private String hostAndPort;
  24.     private String topics;
  25.     private StandardListObjectInspector paramsListInspector;
  26.     private StandardMapObjectInspector paramsElementInspector;
  27.  
  28.     public ObjectInspector initialize(ObjectInspector[] arg0) throws UDFArgumentException {
  29.         if (arg0.length != 3) {
  30.             throw new UDFArgumentException(" Expecting   two  arguments:<brokerhost:port> <topic>  array<map<string,string>> ");
  31.         }
  32.         // 第一个参数验证
  33.         if (arg0[0].getCategory() == Category.PRIMITIVE
  34.                 && ((PrimitiveObjectInspector) arg0[0]).getPrimitiveCategory() == PrimitiveObjectInspector.PrimitiveCategory.STRING) {
  35.             if (!(arg0[0] instanceof ConstantObjectInspector)) {
  36.                 throw new UDFArgumentException("broker host:port  must be constant");
  37.             }
  38.             ConstantObjectInspector brokerhost_and_port = (ConstantObjectInspector) arg0[0];
  39.  
  40.             hostAndPort = brokerhost_and_port.getWritableConstantValue().toString();
  41.         }
  42.  
  43.         // 第二个参数验证
  44.         if (arg0[1].getCategory() == Category.PRIMITIVE
  45.                 && ((PrimitiveObjectInspector) arg0[1]).getPrimitiveCategory() == PrimitiveObjectInspector.PrimitiveCategory.STRING) {
  46.             if (!(arg0[1] instanceof ConstantObjectInspector)) {
  47.                 throw new UDFArgumentException("kafka topic must be constant");
  48.             }
  49.             ConstantObjectInspector topicCOI = (ConstantObjectInspector) arg0[1];
  50.  
  51.             topics = topicCOI.getWritableConstantValue().toString();
  52.         }
  53.  
  54.  
  55.         // 第三个参数验证
  56.         if (arg0[2].getCategory() != Category.LIST) {
  57.             throw new UDFArgumentException(" Expecting an array<map<string,string>> field as third argument ");
  58.         }
  59.         ListObjectInspector third = (ListObjectInspector) arg0[2];
  60.         if (third.getListElementObjectInspector().getCategory() != Category.MAP) {
  61.             throw new UDFArgumentException(" Expecting an array<map<string,string>> field as third argument ");
  62.         }
  63.         paramsListInspector = ObjectInspectorFactory.getStandardListObjectInspector(third.getListElementObjectInspector());
  64.         paramsElementInspector = (StandardMapObjectInspector) third.getListElementObjectInspector();
  65.         System.out.println(paramsElementInspector.getMapKeyObjectInspector().getCategory());
  66.         System.out.println(paramsElementInspector.getMapValueObjectInspector().getCategory());
  67.  
  68.         return PrimitiveObjectInspectorFactory.writableIntObjectInspector;
  69.  
  70.     }
  71.  
  72.     public Object evaluate(DeferredObject[] arg0) throws HiveException {
  73.         Properties props = new Properties();
  74.         props.put("bootstrap.servers", hostAndPort);
  75.         props.put("acks", "all");
  76.         props.put("retries", 0);
  77.         props.put("batch.size", 16384);
  78.         props.put("linger.ms", 1);
  79.         props.put("buffer.memory", 33554432);
  80.         props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  81.         props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  82.  
  83.         // 创建kafka生产者
  84.         Producer<String, String> producer = new KafkaProducer<String, String>(props);
  85.  
  86.         for (int i = 0; i < paramsListInspector.getListLength(arg0[2].get()); i++) {
  87.             Object row = paramsListInspector.getListElement(arg0[2].get(), i);
  88.             Map<?, ?> map = paramsElementInspector.getMap(row);
  89.             // Object obj = ObjectInspectorUtils.copyToStandardJavaObject(row,paramsElementInspector);
  90.             // 转成标准的java map,否则里面的key value字段为hadoop writable对象
  91.             Map<String, String> data = new HashMap<String,String>();
  92.             for (Map.Entry<?, ?> entry : map.entrySet()) {
  93.                 if (entry.getValue() != null && !"".equals(entry.getValue().toString())) {
  94.                     data.put(entry.getKey().toString(), entry.getValue().toString());
  95.                 }
  96.             }
  97.             JSONObject jsonObject = new JSONObject(data);
  98.  
  99.             //指定数据均匀写入3个分区中
  100.             int part = i % 2;
  101.             producer.send(new ProducerRecord<String, String>(topics, part,Integer.toString(i), jsonObject.toString()));
  102.  
  103.         }
  104.  
  105.         producer.close();
  106.  
  107.         return new IntWritable(1);
  108.     }
  109.  
  110.     public String getDisplayString(String[] strings) {
  111.         return "hive2kafka(brokerhost_and_port,topic, array<map<string,string>>)";
  112.     }
  113. }

测试SQL
第一个参数:broker所在位置
第二个参数:topic
第三个参数:将多行转成集合,每一行转成一个map

  1. SELECT g,hive2kafka('bd01:9092','bobizli_test',collect_list(map('full_name',full_name,'simple_name',simple_name))) AS result
  2. FROM
  3. (
  4. SELECT r1,pmod(ABS(hash(r1)),1000) AS g,full_name,simple_name
  5. FROM(
  6. SELECT row_number() over(PARTITION BY 1) AS r1,full_name,simple_name
  7. FROM dws_bo_final_spider_contact
  8. LIMIT 10000) tmp
  9. ) tmp2
  10. GROUP BY g;

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

这篇关于hive-udf-kafka批量数据导入kafka的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/774375

相关文章

SpringBoot分段处理List集合多线程批量插入数据方式

《SpringBoot分段处理List集合多线程批量插入数据方式》文章介绍如何处理大数据量List批量插入数据库的优化方案:通过拆分List并分配独立线程处理,结合Spring线程池与异步方法提升效率... 目录项目场景解决方案1.实体类2.Mapper3.spring容器注入线程池bejsan对象4.创建

PHP轻松处理千万行数据的方法详解

《PHP轻松处理千万行数据的方法详解》说到处理大数据集,PHP通常不是第一个想到的语言,但如果你曾经需要处理数百万行数据而不让服务器崩溃或内存耗尽,你就会知道PHP用对了工具有多强大,下面小编就... 目录问题的本质php 中的数据流处理:为什么必不可少生成器:内存高效的迭代方式流量控制:避免系统过载一次性

C#实现千万数据秒级导入的代码

《C#实现千万数据秒级导入的代码》在实际开发中excel导入很常见,现代社会中很容易遇到大数据处理业务,所以本文我就给大家分享一下千万数据秒级导入怎么实现,文中有详细的代码示例供大家参考,需要的朋友可... 目录前言一、数据存储二、处理逻辑优化前代码处理逻辑优化后的代码总结前言在实际开发中excel导入很

Python实现Excel批量样式修改器(附完整代码)

《Python实现Excel批量样式修改器(附完整代码)》这篇文章主要为大家详细介绍了如何使用Python实现一个Excel批量样式修改器,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一... 目录前言功能特性核心功能界面特性系统要求安装说明使用指南基本操作流程高级功能技术实现核心技术栈关键函

oracle 11g导入\导出(expdp impdp)之导入过程

《oracle11g导入导出(expdpimpdp)之导入过程》导出需使用SEC.DMP格式,无分号;建立expdir目录(E:/exp)并确保存在;导入在cmd下执行,需sys用户权限;若需修... 目录准备文件导入(impdp)1、建立directory2、导入语句 3、更改密码总结上一个环节,我们讲了

使用Python批量将.ncm格式的音频文件转换为.mp3格式的实战详解

《使用Python批量将.ncm格式的音频文件转换为.mp3格式的实战详解》本文详细介绍了如何使用Python通过ncmdump工具批量将.ncm音频转换为.mp3的步骤,包括安装、配置ffmpeg环... 目录1. 前言2. 安装 ncmdump3. 实现 .ncm 转 .mp34. 执行过程5. 执行结

Python实现批量CSV转Excel的高性能处理方案

《Python实现批量CSV转Excel的高性能处理方案》在日常办公中,我们经常需要将CSV格式的数据转换为Excel文件,本文将介绍一个基于Python的高性能解决方案,感兴趣的小伙伴可以跟随小编一... 目录一、场景需求二、技术方案三、核心代码四、批量处理方案五、性能优化六、使用示例完整代码七、小结一、

C#实现一键批量合并PDF文档

《C#实现一键批量合并PDF文档》这篇文章主要为大家详细介绍了如何使用C#实现一键批量合并PDF文档功能,文中的示例代码简洁易懂,感兴趣的小伙伴可以跟随小编一起学习一下... 目录前言效果展示功能实现1、添加文件2、文件分组(书签)3、定义页码范围4、自定义显示5、定义页面尺寸6、PDF批量合并7、其他方法

MyBatis-plus处理存储json数据过程

《MyBatis-plus处理存储json数据过程》文章介绍MyBatis-Plus3.4.21处理对象与集合的差异:对象可用内置Handler配合autoResultMap,集合需自定义处理器继承F... 目录1、如果是对象2、如果需要转换的是List集合总结对象和集合分两种情况处理,目前我用的MP的版本

GSON框架下将百度天气JSON数据转JavaBean

《GSON框架下将百度天气JSON数据转JavaBean》这篇文章主要为大家详细介绍了如何在GSON框架下实现将百度天气JSON数据转JavaBean,文中的示例代码讲解详细,感兴趣的小伙伴可以了解下... 目录前言一、百度天气jsON1、请求参数2、返回参数3、属性映射二、GSON属性映射实战1、类对象映