Spark+Hbase 亿级流量分析实战(小巧高性能的ETL)

2024-01-10 10:48

本文主要是介绍Spark+Hbase 亿级流量分析实战(小巧高性能的ETL),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

在上一篇文章 大猪 已经介绍了日志存储设计方案 ,我们数据已经落地到数据中心上了,那接下来如何ETL呢?毕竟可是生产环境级别的,可不能乱来。其实只要解决几个问题即可,不必要引入很大级别的组件来做,当然了各有各的千秋,本文主要从 易懂小巧简洁高性能 这三个方面去设计出发点,顺便还实现了一个精巧的 Filebeat。

9028759-f64f78b25e312817.png

要实现的功能就是扫描每天的增量日志并写入Hbase中

9028759-1e383558758292e8.png

需要搞定下面几个不务正业的小老弟

9028759-c294a9d51452c16b.png
  1. 需要把文件中的每一行数据都取出来
  2. 能处理超过10G以上的大日志文件,并且只能占用机器一定的内存,越小越好
  3. 从上图可以看到标黄的是已经写入Hbase的数据,不能重复读取
  4. 非活跃文件不能扫,因为文件过多会影响整体读取IO性能
  5. 读取中的过程要保证增量数据不能录入,因为要保证offset的时候写入mysql稳定不跳跃
9028759-4085e8474dcefb43.png

大猪 根据线上的生产环境一一把上面的功能重新分析给实现一下。

从第一点看还是比较简单的嘛?但是我们要结合上面的 5 个问题来看才行。

总结一句话就是:要实现一个高性能而且能随时重启继续工作的 loghub ETL 程序

实际也必需这样做,因为生产环境容不得马虎,不然就等着被BOSS

9028759-a51c023de6fb202d.png
9028759-bfc932f12f86e08f.png

需要有一个读取所有日志文件方法

9028759-1ea5a6cf0b8a638f.png

还要实现一个保存并读取文件进度的方法

9028759-1853910d9211c234.png

由于不能把一个日志文件全部读入内存进行处理
所以还需要一个能根据索引一行一行接着读取数据的方法

9028759-d689a95bd39e85e4.png

最后剩下一个Hbase的连接池小工具

9028759-682336126ccefe66.png

几个核心方法已经写完了,接着是我们的主程序

def run(logPath: File, defaultOffsetDay: String): Unit = {val sdfstr = Source.fromFile(seekDayFile).getLines().mkStringval offsetDay = Option(if (sdfstr == "") null else sdfstr)//读取设置读取日期的倒数一天之后的日期文件夹val noneOffsetFold = logPath.listFiles().filter(_.getName >= LocalDate.parse(offsetDay.getOrElse(defaultOffsetDay)).minusDays(1).toString).sortBy(f => LocalDate.parse(f.getName).toEpochDay)//读取文件夹中的所有日志文件,并取出索引进行匹配val filesPar = noneOffsetFold.flatMap(files(_, file => file.getName.endsWith(".log"))).map(file => (file, seeks().getOrDefault(MD5Hash.getMD5AsHex(file.getAbsolutePath.getBytes()), 0), file.length())).filter(tp2 => {//过滤出新文件,与有增量的日志文件val fileMd5 = MD5Hash.getMD5AsHex(tp2._1.getAbsolutePath.getBytes())val result = offsets.asScala.filter(m => fileMd5.equals(m._1))result.isEmpty || tp2._3 > result.head._2}).parfilesPar.tasksupport = poolval willUpdateOffset = new util.HashMap[String, Long]()val formatter = DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSS")var logTime:String = nullfilesPar.foreach(tp3 => {val hbaseClient = HbasePool.getTable//因为不能全量读取数据,所有只能一条一条读取,批量提出交给HbaseClient的客户端的mutate方式优雅处理//foreach 里面的部分就是我们的业务处理部分lines(tp3._1, tp3._2, tp3._3, () => {willUpdateOffset.put(tp3._1.getAbsolutePath, tp3._3)offsets.put(MD5Hash.getMD5AsHex(tp3._1.getAbsolutePath.getBytes), tp3._3)}).foreach(line => {val jsonObject = parse(line)val time = (jsonObject \ "time").extract[Long]val data = jsonObject \ "data"val dataMap = data.values.asInstanceOf[Map[String, Any]].filter(_._2 != null).map(x => x._1 -> x._2.toString)val uid = dataMap("uid")logTime = time.getLocalDateTime.toStringval rowkey = uid.take(2) + "|" + time.getLocalDateTime.format(formatter) + "|" + uid.substring(2, 8)val row = new Put(Bytes.toBytes(rowkey))dataMap.foreach(tp2 => row.addColumn(Bytes.toBytes("info"), Bytes.toBytes(tp2._1), Bytes.toBytes(tp2._2)))hbaseClient.mutate(row)})hbaseClient.flush()})//更新索引到文件上writeSeek(willUpdateOffset)//更新索引日期到文件上writeSeekDay(noneOffsetFold.last.getName)//把 logTime offset 写到mysql中,方便Spark+Hbase程序读取并计算}

程序很精简,没有任何没用的功能在里面,线上的生产环境就应该是这子的了。
大家还可以根据需求加入程序退出发邮件通知功能之类的。
真正去算了一下也就100行功能代码,而且占用极小的内存,都不到100M,很精很精。

9028759-31d65a54c8cfda02.png

传送门 完整ETL程序源码

心明眼亮的你、从此刻开始。

9028759-7586fee586f2075b.png
9028759-4c3677dea1cf2734.png

这篇关于Spark+Hbase 亿级流量分析实战(小巧高性能的ETL)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/590528

相关文章

Springboot中分析SQL性能的两种方式详解

《Springboot中分析SQL性能的两种方式详解》文章介绍了SQL性能分析的两种方式:MyBatis-Plus性能分析插件和p6spy框架,MyBatis-Plus插件配置简单,适用于开发和测试环... 目录SQL性能分析的两种方式:功能介绍实现方式:实现步骤:SQL性能分析的两种方式:功能介绍记录

使用 sql-research-assistant进行 SQL 数据库研究的实战指南(代码实现演示)

《使用sql-research-assistant进行SQL数据库研究的实战指南(代码实现演示)》本文介绍了sql-research-assistant工具,该工具基于LangChain框架,集... 目录技术背景介绍核心原理解析代码实现演示安装和配置项目集成LangSmith 配置(可选)启动服务应用场景

最长公共子序列问题的深度分析与Java实现方式

《最长公共子序列问题的深度分析与Java实现方式》本文详细介绍了最长公共子序列(LCS)问题,包括其概念、暴力解法、动态规划解法,并提供了Java代码实现,暴力解法虽然简单,但在大数据处理中效率较低,... 目录最长公共子序列问题概述问题理解与示例分析暴力解法思路与示例代码动态规划解法DP 表的构建与意义动

在Java中使用ModelMapper简化Shapefile属性转JavaBean实战过程

《在Java中使用ModelMapper简化Shapefile属性转JavaBean实战过程》本文介绍了在Java中使用ModelMapper库简化Shapefile属性转JavaBean的过程,对比... 目录前言一、原始的处理办法1、使用Set方法来转换2、使用构造方法转换二、基于ModelMapper

Java实战之自助进行多张图片合成拼接

《Java实战之自助进行多张图片合成拼接》在当今数字化时代,图像处理技术在各个领域都发挥着至关重要的作用,本文为大家详细介绍了如何使用Java实现多张图片合成拼接,需要的可以了解下... 目录前言一、图片合成需求描述二、图片合成设计与实现1、编程语言2、基础数据准备3、图片合成流程4、图片合成实现三、总结前

C#使用DeepSeek API实现自然语言处理,文本分类和情感分析

《C#使用DeepSeekAPI实现自然语言处理,文本分类和情感分析》在C#中使用DeepSeekAPI可以实现多种功能,例如自然语言处理、文本分类、情感分析等,本文主要为大家介绍了具体实现步骤,... 目录准备工作文本生成文本分类问答系统代码生成翻译功能文本摘要文本校对图像描述生成总结在C#中使用Deep

nginx-rtmp-module构建流媒体直播服务器实战指南

《nginx-rtmp-module构建流媒体直播服务器实战指南》本文主要介绍了nginx-rtmp-module构建流媒体直播服务器实战指南,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有... 目录1. RTMP协议介绍与应用RTMP协议的原理RTMP协议的应用RTMP与现代流媒体技术的关系2

C语言小项目实战之通讯录功能

《C语言小项目实战之通讯录功能》:本文主要介绍如何设计和实现一个简单的通讯录管理系统,包括联系人信息的存储、增加、删除、查找、修改和排序等功能,文中通过代码介绍的非常详细,需要的朋友可以参考下... 目录功能介绍:添加联系人模块显示联系人模块删除联系人模块查找联系人模块修改联系人模块排序联系人模块源代码如下

Golang操作DuckDB实战案例分享

《Golang操作DuckDB实战案例分享》DuckDB是一个嵌入式SQL数据库引擎,它与众所周知的SQLite非常相似,但它是为olap风格的工作负载设计的,DuckDB支持各种数据类型和SQL特性... 目录DuckDB的主要优点环境准备初始化表和数据查询单行或多行错误处理和事务完整代码最后总结Duck

Go语言使用Buffer实现高性能处理字节和字符

《Go语言使用Buffer实现高性能处理字节和字符》在Go中,bytes.Buffer是一个非常高效的类型,用于处理字节数据的读写操作,本文将详细介绍一下如何使用Buffer实现高性能处理字节和... 目录1. bytes.Buffer 的基本用法1.1. 创建和初始化 Buffer1.2. 使用 Writ