hive-udf-kafka批量数据导入kafka

2024-03-04 20:38
文章标签 数据 导入 批量 hive kafka udf

本文主要是介绍hive-udf-kafka批量数据导入kafka,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

背景:数据存在hive中,现在需要将数据导入kafka中,为了减少中间环节,使用自定义UDF将hive数据导入到kafka中

问题:UDF时对一行的处理,批量导入就会涉及多行的问题,怎么将多行数据放到一个udf中?
解决思路:用collect_list函数将多行转成集合,在udf中循环遍历,发送到kafka

  1. package cn.kobold;
  2.  
  3. import org.apache.hadoop.hive.ql.exec.Description;
  4. import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
  5. import org.apache.hadoop.hive.ql.metadata.HiveException;
  6. import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
  7. import org.apache.hadoop.hive.serde2.objectinspector.*;
  8. import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
  9. import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
  10. import org.apache.hadoop.io.IntWritable;
  11. import org.apache.kafka.clients.producer.KafkaProducer;
  12. import org.apache.kafka.clients.producer.Producer;
  13. import org.apache.kafka.clients.producer.ProducerRecord;
  14. import org.json.JSONObject;
  15.  
  16. import java.util.HashMap;
  17. import java.util.Map;
  18. import java.util.Properties;
  19.  
  20. @Description(name = "hive2kafka", value = "_FUNC_(brokerhost_and_port,topic, array<map<string,string>>) - Return ret ")
  21. public class Hive2KakfaUDF extends GenericUDF {
  22.  
  23.     private String hostAndPort;
  24.     private String topics;
  25.     private StandardListObjectInspector paramsListInspector;
  26.     private StandardMapObjectInspector paramsElementInspector;
  27.  
  28.     public ObjectInspector initialize(ObjectInspector[] arg0) throws UDFArgumentException {
  29.         if (arg0.length != 3) {
  30.             throw new UDFArgumentException(" Expecting   two  arguments:<brokerhost:port> <topic>  array<map<string,string>> ");
  31.         }
  32.         // 第一个参数验证
  33.         if (arg0[0].getCategory() == Category.PRIMITIVE
  34.                 && ((PrimitiveObjectInspector) arg0[0]).getPrimitiveCategory() == PrimitiveObjectInspector.PrimitiveCategory.STRING) {
  35.             if (!(arg0[0] instanceof ConstantObjectInspector)) {
  36.                 throw new UDFArgumentException("broker host:port  must be constant");
  37.             }
  38.             ConstantObjectInspector brokerhost_and_port = (ConstantObjectInspector) arg0[0];
  39.  
  40.             hostAndPort = brokerhost_and_port.getWritableConstantValue().toString();
  41.         }
  42.  
  43.         // 第二个参数验证
  44.         if (arg0[1].getCategory() == Category.PRIMITIVE
  45.                 && ((PrimitiveObjectInspector) arg0[1]).getPrimitiveCategory() == PrimitiveObjectInspector.PrimitiveCategory.STRING) {
  46.             if (!(arg0[1] instanceof ConstantObjectInspector)) {
  47.                 throw new UDFArgumentException("kafka topic must be constant");
  48.             }
  49.             ConstantObjectInspector topicCOI = (ConstantObjectInspector) arg0[1];
  50.  
  51.             topics = topicCOI.getWritableConstantValue().toString();
  52.         }
  53.  
  54.  
  55.         // 第三个参数验证
  56.         if (arg0[2].getCategory() != Category.LIST) {
  57.             throw new UDFArgumentException(" Expecting an array<map<string,string>> field as third argument ");
  58.         }
  59.         ListObjectInspector third = (ListObjectInspector) arg0[2];
  60.         if (third.getListElementObjectInspector().getCategory() != Category.MAP) {
  61.             throw new UDFArgumentException(" Expecting an array<map<string,string>> field as third argument ");
  62.         }
  63.         paramsListInspector = ObjectInspectorFactory.getStandardListObjectInspector(third.getListElementObjectInspector());
  64.         paramsElementInspector = (StandardMapObjectInspector) third.getListElementObjectInspector();
  65.         System.out.println(paramsElementInspector.getMapKeyObjectInspector().getCategory());
  66.         System.out.println(paramsElementInspector.getMapValueObjectInspector().getCategory());
  67.  
  68.         return PrimitiveObjectInspectorFactory.writableIntObjectInspector;
  69.  
  70.     }
  71.  
  72.     public Object evaluate(DeferredObject[] arg0) throws HiveException {
  73.         Properties props = new Properties();
  74.         props.put("bootstrap.servers", hostAndPort);
  75.         props.put("acks", "all");
  76.         props.put("retries", 0);
  77.         props.put("batch.size", 16384);
  78.         props.put("linger.ms", 1);
  79.         props.put("buffer.memory", 33554432);
  80.         props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  81.         props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  82.  
  83.         // 创建kafka生产者
  84.         Producer<String, String> producer = new KafkaProducer<String, String>(props);
  85.  
  86.         for (int i = 0; i < paramsListInspector.getListLength(arg0[2].get()); i++) {
  87.             Object row = paramsListInspector.getListElement(arg0[2].get(), i);
  88.             Map<?, ?> map = paramsElementInspector.getMap(row);
  89.             // Object obj = ObjectInspectorUtils.copyToStandardJavaObject(row,paramsElementInspector);
  90.             // 转成标准的java map,否则里面的key value字段为hadoop writable对象
  91.             Map<String, String> data = new HashMap<String,String>();
  92.             for (Map.Entry<?, ?> entry : map.entrySet()) {
  93.                 if (entry.getValue() != null && !"".equals(entry.getValue().toString())) {
  94.                     data.put(entry.getKey().toString(), entry.getValue().toString());
  95.                 }
  96.             }
  97.             JSONObject jsonObject = new JSONObject(data);
  98.  
  99.             //指定数据均匀写入3个分区中
  100.             int part = i % 2;
  101.             producer.send(new ProducerRecord<String, String>(topics, part,Integer.toString(i), jsonObject.toString()));
  102.  
  103.         }
  104.  
  105.         producer.close();
  106.  
  107.         return new IntWritable(1);
  108.     }
  109.  
  110.     public String getDisplayString(String[] strings) {
  111.         return "hive2kafka(brokerhost_and_port,topic, array<map<string,string>>)";
  112.     }
  113. }

测试SQL
第一个参数:broker所在位置
第二个参数:topic
第三个参数:将多行转成集合,每一行转成一个map

  1. SELECT g,hive2kafka('bd01:9092','bobizli_test',collect_list(map('full_name',full_name,'simple_name',simple_name))) AS result
  2. FROM
  3. (
  4. SELECT r1,pmod(ABS(hash(r1)),1000) AS g,full_name,simple_name
  5. FROM(
  6. SELECT row_number() over(PARTITION BY 1) AS r1,full_name,simple_name
  7. FROM dws_bo_final_spider_contact
  8. LIMIT 10000) tmp
  9. ) tmp2
  10. GROUP BY g;

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

这篇关于hive-udf-kafka批量数据导入kafka的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/774375

相关文章

Java easyExcel实现导入多sheet的Excel

《JavaeasyExcel实现导入多sheet的Excel》这篇文章主要为大家详细介绍了如何使用JavaeasyExcel实现导入多sheet的Excel,文中的示例代码讲解详细,感兴趣的小伙伴可... 目录1.官网2.Excel样式3.代码1.官网easyExcel官网2.Excel样式3.代码

MyBatisPlus如何优化千万级数据的CRUD

《MyBatisPlus如何优化千万级数据的CRUD》最近负责的一个项目,数据库表量级破千万,每次执行CRUD都像走钢丝,稍有不慎就引起数据库报警,本文就结合这个项目的实战经验,聊聊MyBatisPl... 目录背景一、MyBATis Plus 简介二、千万级数据的挑战三、优化 CRUD 的关键策略1. 查

python实现对数据公钥加密与私钥解密

《python实现对数据公钥加密与私钥解密》这篇文章主要为大家详细介绍了如何使用python实现对数据公钥加密与私钥解密,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录公钥私钥的生成使用公钥加密使用私钥解密公钥私钥的生成这一部分,使用python生成公钥与私钥,然后保存在两个文

mysql中的数据目录用法及说明

《mysql中的数据目录用法及说明》:本文主要介绍mysql中的数据目录用法及说明,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录1、背景2、版本3、数据目录4、总结1、背景安装mysql之后,在安装目录下会有一个data目录,我们创建的数据库、创建的表、插入的

Navicat数据表的数据添加,删除及使用sql完成数据的添加过程

《Navicat数据表的数据添加,删除及使用sql完成数据的添加过程》:本文主要介绍Navicat数据表的数据添加,删除及使用sql完成数据的添加过程,具有很好的参考价值,希望对大家有所帮助,如有... 目录Navicat数据表数据添加,删除及使用sql完成数据添加选中操作的表则出现如下界面,查看左下角从左

SpringBoot中4种数据水平分片策略

《SpringBoot中4种数据水平分片策略》数据水平分片作为一种水平扩展策略,通过将数据分散到多个物理节点上,有效解决了存储容量和性能瓶颈问题,下面小编就来和大家分享4种数据分片策略吧... 目录一、前言二、哈希分片2.1 原理2.2 SpringBoot实现2.3 优缺点分析2.4 适用场景三、范围分片

利用Python脚本实现批量将图片转换为WebP格式

《利用Python脚本实现批量将图片转换为WebP格式》Python语言的简洁语法和库支持使其成为图像处理的理想选择,本文将介绍如何利用Python实现批量将图片转换为WebP格式的脚本,WebP作为... 目录简介1. python在图像处理中的应用2. WebP格式的原理和优势2.1 WebP格式与传统

Redis分片集群、数据读写规则问题小结

《Redis分片集群、数据读写规则问题小结》本文介绍了Redis分片集群的原理,通过数据分片和哈希槽机制解决单机内存限制与写瓶颈问题,实现分布式存储和高并发处理,但存在通信开销大、维护复杂及对事务支持... 目录一、分片集群解android决的问题二、分片集群图解 分片集群特征如何解决的上述问题?(与哨兵模

浅析如何保证MySQL与Redis数据一致性

《浅析如何保证MySQL与Redis数据一致性》在互联网应用中,MySQL作为持久化存储引擎,Redis作为高性能缓存层,两者的组合能有效提升系统性能,下面我们来看看如何保证两者的数据一致性吧... 目录一、数据不一致性的根源1.1 典型不一致场景1.2 关键矛盾点二、一致性保障策略2.1 基础策略:更新数

Oracle 数据库数据操作如何精通 INSERT, UPDATE, DELETE

《Oracle数据库数据操作如何精通INSERT,UPDATE,DELETE》在Oracle数据库中,对表内数据进行增加、修改和删除操作是通过数据操作语言来完成的,下面给大家介绍Oracle数... 目录思维导图一、插入数据 (INSERT)1.1 插入单行数据,指定所有列的值语法:1.2 插入单行数据,指