ES实现百亿级数据实时分析实战案例

2024-09-06 20:08

本文主要是介绍ES实现百亿级数据实时分析实战案例,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

点击上方蓝色字体,选择“设为星标

回复”资源“获取更多资源

背景

我们小组前段时间接到一个需求,希望能够按照小时为单位,看到每个实验中各种特征(单个或组合)的覆盖率、正样本占比、负样本占比。我简单解释一下这三种指标的定义:

  • 覆盖率:所有样本中出现某一特征的样本的比例

  • 正样本占比:所有出现该特征的样本中,正样本的比例

  • 负样本占比:所有出现该特征的样本中,负样本的比例

光看这三个指标,大家可能会觉得这个需求很简单,无非就是一个简单的筛选、聚合而已。

如果真的这么简单,我也没必要写这篇文章单独记录了。问题的关键就在于,每小时有将近1亿的数据量,而我们需要保存7天的数据,数据总量预计超过了100亿

技术方案

在了解清楚需求后,我们小组马上对技术方案展开讨论,讨论过程中出现了3种方案:

  • 第一种:用Spark流式计算,计算每一种可能单个或组合特征的相关指标

  • 第二种:收到客户端请求后,遍历HDFS中相关数据,进行离线计算

  • 第三种:将数据按照实验+小时分索引存入ES,收到客户端请求后,实时计算返回

首先,第一种方案直接被diss,原因是一个实验一般会出现几百、上千个特征,而这些特征的组合何止几亿种,全部计算的话,可行性暂且不论,光是对资源的消耗就无法承受。

第二种方案,虽然技术上是可行的,但离线计算所需时间较长,对用户来说,体验并不理想。并且,为了计算目标1%的数据而要遍历所有数据,对资源也存在很大浪费。

第三种方案,将数据按照实验+小时分索引后,可以将每个索引包含的数据量降到1000万以下,再借助ES在查询、聚合方面高效的能力,应该可以实现秒级响应,并且用户体验也会非常好。

技术方案由此确定。

技术架构

1.用Spark从Kafka中接入原始数据,之后对数据进行解析,转换成我们的目标格式

2.将数据按照实验+小时分索引存入ES中

3.接受到用户请求后,将请求按照实验+特征+小时组合,创建多个异步任务,由这些异步任务并行从ES中过滤并聚合相关数据,得到结果

4.将异步任务的结果进行合并,返回给前端进行展示

代码实现

异步任务

// 启动并行任务final Map<String,List<Future<GetCoverageTask.Result>>> futures = Maps.newHashMap();for(String metric : metrics) { // 遍历要计算的指标final SampleRatio sampleRatio = getSampleRatio(metric);for (String exptId : expts) { // 遍历目标实验列表for (String id : features) { // 遍历要分析的特征final String name = getMetricsName(exptId, sampleRatio, id);final List<Future<GetCoverageTask.Result>> resultList = Lists.newArrayList();for (Date hour : coveredHours) { // 将时间按照小时进行拆分final String fieldName = getFieldName(isFect ? Constants.FACET_COLLECT : Constants.FEATURE_COLLECT, id);final GetCoverageTask task = new GetCoverageTask(exptId, fieldName, sampleRatio, hour);// 启动并行任务final Future<GetCoverageTask.Result> future = TaskExecutor.submit(task);resultList.add(future);}futures.put(name, resultList);}}}final QueryRes queryRes = new QueryRes();final Iterator<Map.Entry<String, List<Future<GetCoverageTask.Result>>>> it = futures.entrySet().iterator();while (it.hasNext()){// 省略结果处理流程}

指标计算

// 1\. 对文档进行聚合运行,分别得到基础文档的数量,以及目标文档数量final AggregationBuilder[] agg = getAggregationBuilder(sampleRatio, fieldName);final SearchSourceBuilder searchBuilder = new SearchSourceBuilder();searchBuilder.aggregation(agg[0]).aggregation(agg[1]).size(0);// 2\. 得到覆盖率final String indexName = getIndexName(exptId, hour);final Search search = new Search.Builder(searchBuilder.toString()).addIndex(indexName).addType(getType()).build();final SearchResult result = jestClient.execute(search);if(result.getResponseCode() != HttpUtils.STATUS_CODE_200){// 请求出错log.warn(result.getErrorMessage());return 0f;}final MetricAggregation aggregations = result.getAggregations();// 3\. 解析结果final long dividend ;if(SampleRatio.ALL == sampleRatio){dividend = aggregations.getValueCountAggregation(Constants.DIVIDEND).getValueCount();}else {dividend = aggregations.getFilterAggregation(Constants.DIVIDEND).getCount();}// 防止出现被除数为0时程序异常if(dividend <= 0){return 0f;}long divisor = aggregations.getFilterAggregation(Constants.DIVISOR).getCount();return divisor / (float)dividend;

聚合

int label = 0;final ExistsQueryBuilder existsQuery = QueryBuilders.existsQuery(fieldName);// 包含指定特征的正样本数量final BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();final List<QueryBuilder> must = boolQuery.must();// 计算样本数量TermQueryBuilder labelQuery = null;if(SampleRatio.POSITIVE == sampleRatio) {// 计算正样本数量label = 1;labelQuery = QueryBuilders.termQuery(Constants.LABEL, label);must.add(labelQuery);}else if(SampleRatio.NEGATIVE == sampleRatio) {// 计算负样本数量labelQuery = QueryBuilders.termQuery(Constants.LABEL, label);must.add(labelQuery);}must.add(existsQuery);final ValueCountAggregationBuilder existsCountAgg = AggregationBuilders.count(sampleRatio.getField());existsCountAgg.field(fieldName);final FilterAggregationBuilder filterAgg = AggregationBuilders.filter(aggName, boolQuery);filterAgg.subAggregation(existsCountAgg);return filterAgg;
上线效果

上线后表现完全满足预期,平均请求耗时在3秒左右,用户体验良好。感谢各位小伙伴的辛苦付出~~

下图是ES中部分索引的信息:

突破性能瓶颈!ElasticSearch百亿级数据检索优化案例

ElasticSearch读写底层原理及性能调优

一文俯瞰Elasticsearch核心原理

文章不错?点个【在看】吧! ????

这篇关于ES实现百亿级数据实时分析实战案例的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1142996

相关文章

网页解析 lxml 库--实战

lxml库使用流程 lxml 是 Python 的第三方解析库,完全使用 Python 语言编写,它对 XPath表达式提供了良好的支 持,因此能够了高效地解析 HTML/XML 文档。本节讲解如何通过 lxml 库解析 HTML 文档。 pip install lxml lxm| 库提供了一个 etree 模块,该模块专门用来解析 HTML/XML 文档,下面来介绍一下 lxml 库

大模型研发全揭秘:客服工单数据标注的完整攻略

在人工智能(AI)领域,数据标注是模型训练过程中至关重要的一步。无论你是新手还是有经验的从业者,掌握数据标注的技术细节和常见问题的解决方案都能为你的AI项目增添不少价值。在电信运营商的客服系统中,工单数据是客户问题和解决方案的重要记录。通过对这些工单数据进行有效标注,不仅能够帮助提升客服自动化系统的智能化水平,还能优化客户服务流程,提高客户满意度。本文将详细介绍如何在电信运营商客服工单的背景下进行

基于MySQL Binlog的Elasticsearch数据同步实践

一、为什么要做 随着马蜂窝的逐渐发展,我们的业务数据越来越多,单纯使用 MySQL 已经不能满足我们的数据查询需求,例如对于商品、订单等数据的多维度检索。 使用 Elasticsearch 存储业务数据可以很好的解决我们业务中的搜索需求。而数据进行异构存储后,随之而来的就是数据同步的问题。 二、现有方法及问题 对于数据同步,我们目前的解决方案是建立数据中间表。把需要检索的业务数据,统一放到一张M

关于数据埋点,你需要了解这些基本知识

产品汪每天都在和数据打交道,你知道数据来自哪里吗? 移动app端内的用户行为数据大多来自埋点,了解一些埋点知识,能和数据分析师、技术侃大山,参与到前期的数据采集,更重要是让最终的埋点数据能为我所用,否则可怜巴巴等上几个月是常有的事。   埋点类型 根据埋点方式,可以区分为: 手动埋点半自动埋点全自动埋点 秉承“任何事物都有两面性”的道理:自动程度高的,能解决通用统计,便于统一化管理,但个性化定

Hadoop企业开发案例调优场景

需求 (1)需求:从1G数据中,统计每个单词出现次数。服务器3台,每台配置4G内存,4核CPU,4线程。 (2)需求分析: 1G / 128m = 8个MapTask;1个ReduceTask;1个mrAppMaster 平均每个节点运行10个 / 3台 ≈ 3个任务(4    3    3) HDFS参数调优 (1)修改:hadoop-env.sh export HDFS_NAMENOD

使用SecondaryNameNode恢复NameNode的数据

1)需求: NameNode进程挂了并且存储的数据也丢失了,如何恢复NameNode 此种方式恢复的数据可能存在小部分数据的丢失。 2)故障模拟 (1)kill -9 NameNode进程 [lytfly@hadoop102 current]$ kill -9 19886 (2)删除NameNode存储的数据(/opt/module/hadoop-3.1.4/data/tmp/dfs/na

异构存储(冷热数据分离)

异构存储主要解决不同的数据,存储在不同类型的硬盘中,达到最佳性能的问题。 异构存储Shell操作 (1)查看当前有哪些存储策略可以用 [lytfly@hadoop102 hadoop-3.1.4]$ hdfs storagepolicies -listPolicies (2)为指定路径(数据存储目录)设置指定的存储策略 hdfs storagepolicies -setStoragePo

Hadoop集群数据均衡之磁盘间数据均衡

生产环境,由于硬盘空间不足,往往需要增加一块硬盘。刚加载的硬盘没有数据时,可以执行磁盘数据均衡命令。(Hadoop3.x新特性) plan后面带的节点的名字必须是已经存在的,并且是需要均衡的节点。 如果节点不存在,会报如下错误: 如果节点只有一个硬盘的话,不会创建均衡计划: (1)生成均衡计划 hdfs diskbalancer -plan hadoop102 (2)执行均衡计划 hd

hdu1043(八数码问题,广搜 + hash(实现状态压缩) )

利用康拓展开将一个排列映射成一个自然数,然后就变成了普通的广搜题。 #include<iostream>#include<algorithm>#include<string>#include<stack>#include<queue>#include<map>#include<stdio.h>#include<stdlib.h>#include<ctype.h>#inclu

性能分析之MySQL索引实战案例

文章目录 一、前言二、准备三、MySQL索引优化四、MySQL 索引知识回顾五、总结 一、前言 在上一讲性能工具之 JProfiler 简单登录案例分析实战中已经发现SQL没有建立索引问题,本文将一起从代码层去分析为什么没有建立索引? 开源ERP项目地址:https://gitee.com/jishenghua/JSH_ERP 二、准备 打开IDEA找到登录请求资源路径位置