二百五十九、Java——采集Kafka数据,解析成一条条数据,写入另一Kafka中(一般JSON)

本文主要是介绍二百五十九、Java——采集Kafka数据,解析成一条条数据,写入另一Kafka中(一般JSON),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一、目的

由于部分数据类型频率为1s,从而数据规模特别大,因此完整的JSON放在Hive中解析起来,尤其是在单机环境下,效率特别慢,无法满足业务需求。

而Flume的拦截器并不能很好的转换数据,因为只能采用Java方式,从Kafka的主题A中采集数据,并解析字段,然后写入到放在Kafka主题B中

二 、原始数据格式

JSON格式比较正常,对象中包含数组

{
    "deviceNo": "39",
    "sourceDeviceType": null,
    "sn": null,
    "model": null,
    "createTime": "2024-09-03 14:10:00",
    "data": {
        "cycle": 300,
        "evaluationList": [{
            "laneNo": 1,
            "laneType": null,
            "volume": 3,
            "queueLenMax": 11.43,
            "sampleNum": 0,
            "stopAvg": 0.54,
            "delayAvg": 0.0,
            "passRate": 0.0,
            "travelDist": 140.0,
            "travelTimeAvg": 0.0
        },
        {
            "laneNo": 2,
            "laneType": null,
            "volume": 7,
            "queueLenMax": 23.18,
            "sampleNum": 0,
            "stopAvg": 0.47,
            "delayAvg": 10.57,
            "passRate": 0.0,
            "travelDist": 140.0,
            "travelTimeAvg": 0.0
        },
        {
            "laneNo": 3,
            "laneType": null,
            "volume": 9,
            "queueLenMax": 11.54,
            "sampleNum": 0,
            "stopAvg": 0.18,
            "delayAvg": 9.67,
            "passRate": 0.0,
            "travelDist": 140.0,
            "travelTimeAvg": 0.0
        },
        {
            "laneNo": 4,
            "laneType": null,
            "volume": 6,
            "queueLenMax": 11.36,
            "sampleNum": 0,
            "stopAvg": 0.27,
            "delayAvg": 6.83,
            "passRate": 0.0,
            "travelDist": 140.0,
            "travelTimeAvg": 0.0
        }]
    }
}

三、Java代码

package com.kgc;import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;public class KafkaKafkaEvaluation {// 添加 Kafka Producer 配置private static Properties producerProps() {Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.70:9092");props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.ACKS_CONFIG, "-1");props.put(ProducerConfig.RETRIES_CONFIG, "3");props.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");props.put(ProducerConfig.LINGER_MS_CONFIG, "1");props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432");return props;}public static void main(String[] args) {Properties prop = new Properties();prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.70:9092");prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");// 每一个消费,都要定义不同的Group_IDprop.put(ConsumerConfig.GROUP_ID_CONFIG, "evaluation_group");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);consumer.subscribe(Collections.singleton("topic_internal_data_evaluation"));ObjectMapper mapper = new ObjectMapper();// 初始化 Kafka ProducerKafkaProducer<String, String> producer = new KafkaProducer<>(producerProps());while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : records) {try {JsonNode rootNode = mapper.readTree(record.value());System.out.println("原始数据"+rootNode);String device_no = rootNode.get("deviceNo").asText();String source_device_type = rootNode.get("sourceDeviceType").asText();String sn = rootNode.get("sn").asText();String model = rootNode.get("model").asText();String create_time = rootNode.get("createTime").asText();String cycle = rootNode.get("data").get("cycle").asText();JsonNode evaluationList = rootNode.get("data").get("evaluationList");for (JsonNode evaluationItem : evaluationList) {String lane_no = evaluationItem.get("laneNo").asText();String lane_type = evaluationItem.get("laneType").asText();String volume = evaluationItem.get("volume").asText();String queue_len_max = evaluationItem.get("queueLenMax").asText();String sample_num = evaluationItem.get("sampleNum").asText();String stop_avg = evaluationItem.get("stopAvg").asText();String delay_avg = evaluationItem.get("delayAvg").asText();String pass_rate = evaluationItem.get("passRate").asText();String travel_dist = evaluationItem.get("travelDist").asText();String travel_time_avg = evaluationItem.get("travelTimeAvg").asText();String outputLine = String.format("%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s",device_no, source_device_type, sn, model, create_time, cycle,lane_no, lane_type,volume,queue_len_max,sample_num,stop_avg,delay_avg,pass_rate,travel_dist,travel_time_avg);// 发送数据到 KafkaProducerRecord<String, String> producerRecord = new ProducerRecord<>("topic_db_data_evaluation", record.key(), outputLine);producer.send(producerRecord, (RecordMetadata metadata, Exception e) -> {if (e != null) {e.printStackTrace();} else {System.out.println("The offset of the record we just sent is: " + metadata.offset());}});}} catch (Exception e) {e.printStackTrace();}}consumer.commitAsync();}}}

1、服务器IP都是   192.168.0.70

2、消费Kafka主题(数据源):topic_internal_data_evaluation

3、生产Kafka主题(目标源):topic_db_data_evaluation

4、注意:字段顺序与ODS层表结构字段顺序一致!!!

四、开启Kafka主题topic_db_data_evaluation消费者

[root@localhost bin]# ./kafka-console-consumer.sh --bootstrap-server 192.168.0.70:9092  --topic topic_db_data_evaluation  --from-beginning

五、运行测试

1、启动项目

2、消费者输出数据

然后再用Flume采集写入HDFS就行了,不过ODS层表结构需要转变

六、ODS层新表结构

create external table  if not exists  hurys_dc_ods.ods_evaluation(device_no           string        COMMENT '设备编号',source_device_type  string        COMMENT '设备类型',sn                  string        COMMENT '设备序列号 ',model               string        COMMENT '设备型号',create_time         timestamp     COMMENT '创建时间',cycle               int           COMMENT '评价数据周期',lane_no             int           COMMENT '车道编号',lane_type           int           COMMENT '车道类型 0:渠化1:来向2:出口3:去向4:左弯待转区5:直行待行区6:右转专用道99:未定义车道',volume              int           COMMENT '车道内过停止线流量(辆)',queue_len_max       float         COMMENT '车道内最大排队长度(m)',sample_num          int           COMMENT '评价数据计算样本量',stop_avg            float         COMMENT '车道内平均停车次数(次)',delay_avg           float         COMMENT '车道内平均延误时间(s)',pass_rate           float         COMMENT '车道内一次通过率',travel_dist         float         COMMENT '车道内检测行程距离(m)',travel_time_avg     float         COMMENT '车道内平均行程时间'
)
comment '评价数据外部表——静态分区'
partitioned by (day string)
row format delimited fields terminated by ','
stored as SequenceFile
;

七、Flume采集配置文件

八、运行Flume任务,检查HDFS文件、以及ODS表数据

--刷新表分区
msck repair table ods_evaluation;
--查看表分区
show partitions hurys_dc_ods.ods_evaluation;
--查看表数据
select * from hurys_dc_ods.ods_evaluation
where day='2024-09-03';

搞定,这样就不需要在Hive中解析JSON数据了!!!

这篇关于二百五十九、Java——采集Kafka数据,解析成一条条数据,写入另一Kafka中(一般JSON)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1135327

相关文章

详解Java如何向http/https接口发出请求

《详解Java如何向http/https接口发出请求》这篇文章主要为大家详细介绍了Java如何实现向http/https接口发出请求,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 用Java发送web请求所用到的包都在java.net下,在具体使用时可以用如下代码,你可以把它封装成一

使用MongoDB进行数据存储的操作流程

《使用MongoDB进行数据存储的操作流程》在现代应用开发中,数据存储是一个至关重要的部分,随着数据量的增大和复杂性的增加,传统的关系型数据库有时难以应对高并发和大数据量的处理需求,MongoDB作为... 目录什么是MongoDB?MongoDB的优势使用MongoDB进行数据存储1. 安装MongoDB

SpringBoot使用Apache Tika检测敏感信息

《SpringBoot使用ApacheTika检测敏感信息》ApacheTika是一个功能强大的内容分析工具,它能够从多种文件格式中提取文本、元数据以及其他结构化信息,下面我们来看看如何使用Ap... 目录Tika 主要特性1. 多格式支持2. 自动文件类型检测3. 文本和元数据提取4. 支持 OCR(光学

Java内存泄漏问题的排查、优化与最佳实践

《Java内存泄漏问题的排查、优化与最佳实践》在Java开发中,内存泄漏是一个常见且令人头疼的问题,内存泄漏指的是程序在运行过程中,已经不再使用的对象没有被及时释放,从而导致内存占用不断增加,最终... 目录引言1. 什么是内存泄漏?常见的内存泄漏情况2. 如何排查 Java 中的内存泄漏?2.1 使用 J

JAVA系统中Spring Boot应用程序的配置文件application.yml使用详解

《JAVA系统中SpringBoot应用程序的配置文件application.yml使用详解》:本文主要介绍JAVA系统中SpringBoot应用程序的配置文件application.yml的... 目录文件路径文件内容解释1. Server 配置2. Spring 配置3. Logging 配置4. Ma

Kafka拦截器的神奇操作方法

《Kafka拦截器的神奇操作方法》Kafka拦截器是一种强大的机制,用于在消息发送和接收过程中插入自定义逻辑,它们可以用于消息定制、日志记录、监控、业务逻辑集成、性能统计和异常处理等,本文介绍Kafk... 目录前言拦截器的基本概念Kafka 拦截器的定义和基本原理:拦截器是 Kafka 消息传递的不可或缺

Python MySQL如何通过Binlog获取变更记录恢复数据

《PythonMySQL如何通过Binlog获取变更记录恢复数据》本文介绍了如何使用Python和pymysqlreplication库通过MySQL的二进制日志(Binlog)获取数据库的变更记录... 目录python mysql通过Binlog获取变更记录恢复数据1.安装pymysqlreplicat

Linux使用dd命令来复制和转换数据的操作方法

《Linux使用dd命令来复制和转换数据的操作方法》Linux中的dd命令是一个功能强大的数据复制和转换实用程序,它以较低级别运行,通常用于创建可启动的USB驱动器、克隆磁盘和生成随机数据等任务,本文... 目录简介功能和能力语法常用选项示例用法基础用法创建可启动www.chinasem.cn的 USB 驱动

Java 字符数组转字符串的常用方法

《Java字符数组转字符串的常用方法》文章总结了在Java中将字符数组转换为字符串的几种常用方法,包括使用String构造函数、String.valueOf()方法、StringBuilder以及A... 目录1. 使用String构造函数1.1 基本转换方法1.2 注意事项2. 使用String.valu

java脚本使用不同版本jdk的说明介绍

《java脚本使用不同版本jdk的说明介绍》本文介绍了在Java中执行JavaScript脚本的几种方式,包括使用ScriptEngine、Nashorn和GraalVM,ScriptEngine适用... 目录Java脚本使用不同版本jdk的说明1.使用ScriptEngine执行javascript2.