大数据项目之通话记录统计

2023-12-10 08:38

本文主要是介绍大数据项目之通话记录统计,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

架构图:
在这里插入图片描述

第一步:模拟生产数据

    public void produce() {try {// 读取通讯录数据List<Contact> contacts = in.read(Contact.class);while ( flg ) {// 从通讯录中随机查找2个电话号码(主叫,被叫)int call1Index = new Random().nextInt(contacts.size());int call2Index;while ( true ) {call2Index = new Random().nextInt(contacts.size());if ( call1Index != call2Index ) {break;}}Contact call1 = contacts.get(call1Index);Contact call2 = contacts.get(call2Index);// 生成随机的通话时间String startDate = "20180101000000";String endDate = "20190101000000";long startTime = DateUtil.parse(startDate, "yyyyMMddHHmmss").getTime();long endTime = DateUtil.parse(endDate, "yyyyMMddHHmmss").getTime();// 通话时间long calltime = startTime + (long)((endTime - startTime) * Math.random());// 通话时间字符串String callTimeString = DateUtil.format(new Date(calltime), "yyyyMMddHHmmss");// 生成随机的通话时长String duration = NumberUtil.format(new Random().nextInt(3000), 4);// 生成通话记录Calllog log = new Calllog(call1.getTel(), call2.getTel(), callTimeString, duration);System.out.println(log);// 将通话记录刷写到数据文件中out.write(log);}} catch ( Exception e ) {e.printStackTrace();}}

数据格式如下
在这里插入图片描述
第一列是主叫电话号码,第二列是被叫电话号码,第三列是通话开始时间,第四列是通话时长,单位秒。生成的数据放到一个文件中去。

第二步:flume收集日志并存放至kafka

启动flume来收集日志并发送到kafka(根据自己的安装目录自行修改)
flume-ng agent -c conf/ -n a1 -f conf/flume-2-kafka.conf

flume-2-kafka.conf文件内容如下

a1.sources = r1
a1.channels = c1
a1.sinks = k1a1.sources.r1.type = exec
a1.sources.r1.command = tail -F -c +0 /Users/liangjiepeng/Documents/tmpfile/bigdata/call.loga1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 800000a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = ct
a1.sinks.k1.kafka.bootstrap.servers = localhost:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

启动kafka(启动参数根据自己配置文件位置自行修改)
kafka-server-start /usr/local/etc/kafka/server.properties

第三步:导入数据到hbase中去

启动hbase
从kafka中导出数据到hbase中去
hbase的表结构如下
在这里插入图片描述

    public void consume() {try {// 创建配置对象Properties prop = new Properties();prop.load(Thread.currentThread().getContextClassLoader().getResourceAsStream("consumer.properties"));// 获取flume采集的数据KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(prop);// 关注主题consumer.subscribe(Arrays.asList(Names.TOPIC.getValue()));// Hbase数据访问对象HBaseDao dao = new HBaseDao();// 初始化dao.init();int i = 0;// 消费数据while ( true ) {ConsumerRecords<String, String> consumerRecords = consumer.poll(10);for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord.value());// 插入数据dao.insertData(consumerRecord.value());//Calllog log = new Calllog(consumerRecord.value());//dao.insertData(log);System.out.println(i++);}}} catch ( Exception e ) {e.printStackTrace();}}

为了更快地统计数据,创建了两个列族,分别代表call1是主叫还是被叫。上方produce方法中产生的数据都是放到caller族,而callee族的数据由hbase的协处理器根据放到caller族中的数据生成。

协处理器的postPut方法

public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException {// 获取表Table table = e.getEnvironment().getTable(TableName.valueOf(Names.TABLE.getValue()));// 主叫用户的rowkeyString rowkey = Bytes.toString(put.getRow());// 1_133_2019_144_1010_1String[] values = rowkey.split("_");CoprocessorDao dao = new CoprocessorDao();String call1 = values[1];String call2 = values[3];String calltime = values[2];String duration = values[4];String flg = values[5];if ( "1".equals(flg) ) {// 只有主叫用户保存后才需要触发被叫用户的保存String calleeRowkey = dao.getRegionNum(call2, calltime) + "_" + call2 + "_" + calltime + "_" + call1 + "_" + duration + "_0";// 保存数据Put calleePut = new Put(Bytes.toBytes(calleeRowkey));byte[] calleeFamily = Bytes.toBytes(Names.CF_CALLEE.getValue());calleePut.addColumn(calleeFamily, Bytes.toBytes("call1"), Bytes.toBytes(call2));calleePut.addColumn(calleeFamily, Bytes.toBytes("call2"), Bytes.toBytes(call1));calleePut.addColumn(calleeFamily, Bytes.toBytes("calltime"), Bytes.toBytes(calltime));calleePut.addColumn(calleeFamily, Bytes.toBytes("duration"), Bytes.toBytes(duration));calleePut.addColumn(calleeFamily, Bytes.toBytes("flg"), Bytes.toBytes("0"));table.put( calleePut );// 关闭表table.close();}}

使用协处理器可能会出现java.lang.OutOfMemoryError: Unable to create new native thread,这时需要增加系统进程可创建线程的最大数或者降低数据put到hbase中的速度。

第四步:分析统计并写入到mysql

目标:统计每个电话号码每天/每月/每年的通话次数和通话总时长,即mysql中一条记录要有电话号码、通话日期、通话次数、通话总时长,因为电话号码和通话日期有很多重复,把电话号码和通话日期作成外键关联到其它表中去。

格式变成如下
在这里插入图片描述
在这里插入图片描述
mapper如下
在这里插入图片描述
reducer
在这里插入图片描述
MySQLBeanOutputFormat的write方法
在这里插入图片描述
代码链接

这篇关于大数据项目之通话记录统计的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/476589

相关文章

Python获取中国节假日数据记录入JSON文件

《Python获取中国节假日数据记录入JSON文件》项目系统内置的日历应用为了提升用户体验,特别设置了在调休日期显示“休”的UI图标功能,那么问题是这些调休数据从哪里来呢?我尝试一种更为智能的方法:P... 目录节假日数据获取存入jsON文件节假日数据读取封装完整代码项目系统内置的日历应用为了提升用户体验,

Java利用JSONPath操作JSON数据的技术指南

《Java利用JSONPath操作JSON数据的技术指南》JSONPath是一种强大的工具,用于查询和操作JSON数据,类似于SQL的语法,它为处理复杂的JSON数据结构提供了简单且高效... 目录1、简述2、什么是 jsONPath?3、Java 示例3.1 基本查询3.2 过滤查询3.3 递归搜索3.4

一文教你如何将maven项目转成web项目

《一文教你如何将maven项目转成web项目》在软件开发过程中,有时我们需要将一个普通的Maven项目转换为Web项目,以便能够部署到Web容器中运行,本文将详细介绍如何通过简单的步骤完成这一转换过程... 目录准备工作步骤一:修改​​pom.XML​​1.1 添加​​packaging​​标签1.2 添加

MySQL大表数据的分区与分库分表的实现

《MySQL大表数据的分区与分库分表的实现》数据库的分区和分库分表是两种常用的技术方案,本文主要介绍了MySQL大表数据的分区与分库分表的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有... 目录1. mysql大表数据的分区1.1 什么是分区?1.2 分区的类型1.3 分区的优点1.4 分

Mysql删除几亿条数据表中的部分数据的方法实现

《Mysql删除几亿条数据表中的部分数据的方法实现》在MySQL中删除一个大表中的数据时,需要特别注意操作的性能和对系统的影响,本文主要介绍了Mysql删除几亿条数据表中的部分数据的方法实现,具有一定... 目录1、需求2、方案1. 使用 DELETE 语句分批删除2. 使用 INPLACE ALTER T

tomcat多实例部署的项目实践

《tomcat多实例部署的项目实践》Tomcat多实例是指在一台设备上运行多个Tomcat服务,这些Tomcat相互独立,本文主要介绍了tomcat多实例部署的项目实践,具有一定的参考价值,感兴趣的可... 目录1.创建项目目录,测试文China编程件2js.创建实例的安装目录3.准备实例的配置文件4.编辑实例的

Python Dash框架在数据可视化仪表板中的应用与实践记录

《PythonDash框架在数据可视化仪表板中的应用与实践记录》Python的PlotlyDash库提供了一种简便且强大的方式来构建和展示互动式数据仪表板,本篇文章将深入探讨如何使用Dash设计一... 目录python Dash框架在数据可视化仪表板中的应用与实践1. 什么是Plotly Dash?1.1

Redis 中的热点键和数据倾斜示例详解

《Redis中的热点键和数据倾斜示例详解》热点键是指在Redis中被频繁访问的特定键,这些键由于其高访问频率,可能导致Redis服务器的性能问题,尤其是在高并发场景下,本文给大家介绍Redis中的热... 目录Redis 中的热点键和数据倾斜热点键(Hot Key)定义特点应对策略示例数据倾斜(Data S

springboot集成Deepseek4j的项目实践

《springboot集成Deepseek4j的项目实践》本文主要介绍了springboot集成Deepseek4j的项目实践,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价... 目录Deepseek4j快速开始Maven 依js赖基础配置基础使用示例1. 流式返回示例2. 进阶

Python实现将MySQL中所有表的数据都导出为CSV文件并压缩

《Python实现将MySQL中所有表的数据都导出为CSV文件并压缩》这篇文章主要为大家详细介绍了如何使用Python将MySQL数据库中所有表的数据都导出为CSV文件到一个目录,并压缩为zip文件到... python将mysql数据库中所有表的数据都导出为CSV文件到一个目录,并压缩为zip文件到另一个