大数据项目之通话记录统计

2023-12-10 08:38

本文主要是介绍大数据项目之通话记录统计,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

架构图:
在这里插入图片描述

第一步:模拟生产数据

    public void produce() {try {// 读取通讯录数据List<Contact> contacts = in.read(Contact.class);while ( flg ) {// 从通讯录中随机查找2个电话号码(主叫,被叫)int call1Index = new Random().nextInt(contacts.size());int call2Index;while ( true ) {call2Index = new Random().nextInt(contacts.size());if ( call1Index != call2Index ) {break;}}Contact call1 = contacts.get(call1Index);Contact call2 = contacts.get(call2Index);// 生成随机的通话时间String startDate = "20180101000000";String endDate = "20190101000000";long startTime = DateUtil.parse(startDate, "yyyyMMddHHmmss").getTime();long endTime = DateUtil.parse(endDate, "yyyyMMddHHmmss").getTime();// 通话时间long calltime = startTime + (long)((endTime - startTime) * Math.random());// 通话时间字符串String callTimeString = DateUtil.format(new Date(calltime), "yyyyMMddHHmmss");// 生成随机的通话时长String duration = NumberUtil.format(new Random().nextInt(3000), 4);// 生成通话记录Calllog log = new Calllog(call1.getTel(), call2.getTel(), callTimeString, duration);System.out.println(log);// 将通话记录刷写到数据文件中out.write(log);}} catch ( Exception e ) {e.printStackTrace();}}

数据格式如下
在这里插入图片描述
第一列是主叫电话号码,第二列是被叫电话号码,第三列是通话开始时间,第四列是通话时长,单位秒。生成的数据放到一个文件中去。

第二步:flume收集日志并存放至kafka

启动flume来收集日志并发送到kafka(根据自己的安装目录自行修改)
flume-ng agent -c conf/ -n a1 -f conf/flume-2-kafka.conf

flume-2-kafka.conf文件内容如下

a1.sources = r1
a1.channels = c1
a1.sinks = k1a1.sources.r1.type = exec
a1.sources.r1.command = tail -F -c +0 /Users/liangjiepeng/Documents/tmpfile/bigdata/call.loga1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 800000a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = ct
a1.sinks.k1.kafka.bootstrap.servers = localhost:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

启动kafka(启动参数根据自己配置文件位置自行修改)
kafka-server-start /usr/local/etc/kafka/server.properties

第三步:导入数据到hbase中去

启动hbase
从kafka中导出数据到hbase中去
hbase的表结构如下
在这里插入图片描述

    public void consume() {try {// 创建配置对象Properties prop = new Properties();prop.load(Thread.currentThread().getContextClassLoader().getResourceAsStream("consumer.properties"));// 获取flume采集的数据KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(prop);// 关注主题consumer.subscribe(Arrays.asList(Names.TOPIC.getValue()));// Hbase数据访问对象HBaseDao dao = new HBaseDao();// 初始化dao.init();int i = 0;// 消费数据while ( true ) {ConsumerRecords<String, String> consumerRecords = consumer.poll(10);for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord.value());// 插入数据dao.insertData(consumerRecord.value());//Calllog log = new Calllog(consumerRecord.value());//dao.insertData(log);System.out.println(i++);}}} catch ( Exception e ) {e.printStackTrace();}}

为了更快地统计数据,创建了两个列族,分别代表call1是主叫还是被叫。上方produce方法中产生的数据都是放到caller族,而callee族的数据由hbase的协处理器根据放到caller族中的数据生成。

协处理器的postPut方法

public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException {// 获取表Table table = e.getEnvironment().getTable(TableName.valueOf(Names.TABLE.getValue()));// 主叫用户的rowkeyString rowkey = Bytes.toString(put.getRow());// 1_133_2019_144_1010_1String[] values = rowkey.split("_");CoprocessorDao dao = new CoprocessorDao();String call1 = values[1];String call2 = values[3];String calltime = values[2];String duration = values[4];String flg = values[5];if ( "1".equals(flg) ) {// 只有主叫用户保存后才需要触发被叫用户的保存String calleeRowkey = dao.getRegionNum(call2, calltime) + "_" + call2 + "_" + calltime + "_" + call1 + "_" + duration + "_0";// 保存数据Put calleePut = new Put(Bytes.toBytes(calleeRowkey));byte[] calleeFamily = Bytes.toBytes(Names.CF_CALLEE.getValue());calleePut.addColumn(calleeFamily, Bytes.toBytes("call1"), Bytes.toBytes(call2));calleePut.addColumn(calleeFamily, Bytes.toBytes("call2"), Bytes.toBytes(call1));calleePut.addColumn(calleeFamily, Bytes.toBytes("calltime"), Bytes.toBytes(calltime));calleePut.addColumn(calleeFamily, Bytes.toBytes("duration"), Bytes.toBytes(duration));calleePut.addColumn(calleeFamily, Bytes.toBytes("flg"), Bytes.toBytes("0"));table.put( calleePut );// 关闭表table.close();}}

使用协处理器可能会出现java.lang.OutOfMemoryError: Unable to create new native thread,这时需要增加系统进程可创建线程的最大数或者降低数据put到hbase中的速度。

第四步:分析统计并写入到mysql

目标:统计每个电话号码每天/每月/每年的通话次数和通话总时长,即mysql中一条记录要有电话号码、通话日期、通话次数、通话总时长,因为电话号码和通话日期有很多重复,把电话号码和通话日期作成外键关联到其它表中去。

格式变成如下
在这里插入图片描述
在这里插入图片描述
mapper如下
在这里插入图片描述
reducer
在这里插入图片描述
MySQLBeanOutputFormat的write方法
在这里插入图片描述
代码链接

这篇关于大数据项目之通话记录统计的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/476589

相关文章

MySQL 删除数据详解(最新整理)

《MySQL删除数据详解(最新整理)》:本文主要介绍MySQL删除数据的相关知识,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录一、前言二、mysql 中的三种删除方式1.DELETE语句✅ 基本语法: 示例:2.TRUNCATE语句✅ 基本语

深度解析Java项目中包和包之间的联系

《深度解析Java项目中包和包之间的联系》文章浏览阅读850次,点赞13次,收藏8次。本文详细介绍了Java分层架构中的几个关键包:DTO、Controller、Service和Mapper。_jav... 目录前言一、各大包1.DTO1.1、DTO的核心用途1.2. DTO与实体类(Entity)的区别1

MyBatisPlus如何优化千万级数据的CRUD

《MyBatisPlus如何优化千万级数据的CRUD》最近负责的一个项目,数据库表量级破千万,每次执行CRUD都像走钢丝,稍有不慎就引起数据库报警,本文就结合这个项目的实战经验,聊聊MyBatisPl... 目录背景一、MyBATis Plus 简介二、千万级数据的挑战三、优化 CRUD 的关键策略1. 查

python实现对数据公钥加密与私钥解密

《python实现对数据公钥加密与私钥解密》这篇文章主要为大家详细介绍了如何使用python实现对数据公钥加密与私钥解密,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录公钥私钥的生成使用公钥加密使用私钥解密公钥私钥的生成这一部分,使用python生成公钥与私钥,然后保存在两个文

mysql中的数据目录用法及说明

《mysql中的数据目录用法及说明》:本文主要介绍mysql中的数据目录用法及说明,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录1、背景2、版本3、数据目录4、总结1、背景安装mysql之后,在安装目录下会有一个data目录,我们创建的数据库、创建的表、插入的

如何在Spring Boot项目中集成MQTT协议

《如何在SpringBoot项目中集成MQTT协议》本文介绍在SpringBoot中集成MQTT的步骤,包括安装Broker、添加EclipsePaho依赖、配置连接参数、实现消息发布订阅、测试接口... 目录1. 准备工作2. 引入依赖3. 配置MQTT连接4. 创建MQTT配置类5. 实现消息发布与订阅

在Linux终端中统计非二进制文件行数的实现方法

《在Linux终端中统计非二进制文件行数的实现方法》在Linux系统中,有时需要统计非二进制文件(如CSV、TXT文件)的行数,而不希望手动打开文件进行查看,例如,在处理大型日志文件、数据文件时,了解... 目录在linux终端中统计非二进制文件的行数技术背景实现步骤1. 使用wc命令2. 使用grep命令

springboot项目打jar制作成镜像并指定配置文件位置方式

《springboot项目打jar制作成镜像并指定配置文件位置方式》:本文主要介绍springboot项目打jar制作成镜像并指定配置文件位置方式,具有很好的参考价值,希望对大家有所帮助,如有错误... 目录一、上传jar到服务器二、编写dockerfile三、新建对应配置文件所存放的数据卷目录四、将配置文

Navicat数据表的数据添加,删除及使用sql完成数据的添加过程

《Navicat数据表的数据添加,删除及使用sql完成数据的添加过程》:本文主要介绍Navicat数据表的数据添加,删除及使用sql完成数据的添加过程,具有很好的参考价值,希望对大家有所帮助,如有... 目录Navicat数据表数据添加,删除及使用sql完成数据添加选中操作的表则出现如下界面,查看左下角从左

SpringBoot中4种数据水平分片策略

《SpringBoot中4种数据水平分片策略》数据水平分片作为一种水平扩展策略,通过将数据分散到多个物理节点上,有效解决了存储容量和性能瓶颈问题,下面小编就来和大家分享4种数据分片策略吧... 目录一、前言二、哈希分片2.1 原理2.2 SpringBoot实现2.3 优缺点分析2.4 适用场景三、范围分片