DolphinDB 高效清洗数据实例

2024-03-21 19:12

本文主要是介绍DolphinDB 高效清洗数据实例,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

如何利用 DolphinDB 高效清洗数据
ETL (Extraction-Transformation-Loading) 是商业智能、数据仓库以及数据挖掘的核心。随着数据量越来越庞大,不可避免会产生残缺、错误或者重复的数据,数据清洗的目的就是找出并消除这些不符合规范的数据。ETL 通过将源数据抽取(Extract)到临时中间层后进行清洗和转换(Transform),然后加载(Load)到数据仓库中,实现数据质量的提升。例如高频交易场景下,以深交所为例,单日的股票逐笔交易数据量在6000万行、8GB左右。获取源数据后,往往需要对海量高频数据按照一定的处理规则进行加工和转换,输出一份高质量、具备一致性并满足业务需求的数据,才能为后续的因子挖掘、策略研究奠定坚实的基础。在数据仓库项目中,几行代码高效实现数据分析的前提,往往是需要几十行、几百行的预处理代码进行数据清洗,因此 ETL 的效率至关重要。传统的数据清洗工具(Python, mysql, Java 等技术栈),性能会随着数据量增大而下降,并且无法处理 TB 级别以上的数据。但通过 DolphinDB 进行数据清洗,可以显著提高性能。本文将通过一个 SQL 优化案例,介绍如何基于 DolphinDB 分布式计算能力,将 ETL 过程的性能提高70余倍。本文将通过以下步骤来介绍优化过程:从4.5小时到3.5分钟,如何利用 DolphinDB 高效清洗数据
1 场景简述与数据准备
2 常规思路
2.1 优化前的代码
2.2 性能瓶颈分析
3 如何优化 - 3.1 降低问题空间复杂度 - 3.2 提高处理速度
4 优化的实现
4.1 优化后的代码
4.2 优势分析
5 总结
6 附录
1 场景简述与数据准备
试想我们从第三方获取了如下的原始交易数据表(trade),每天包含约3000只股票、6000万行的逐笔成交记录。在量化分析场景下,这样的源数据通常在格式、内容上都与具体的使用需求有一定距离。因此为了方便后续阶段的投研应用,我们需要对这份原始数据进行清洗、转化。这一工作通常由团队内的 ETL 工程师来完成。字段名称	数据类型	注释
securityID	STRING	股票编号
tradingdate	DATE	交易日
tradingtime	TIMESTAMP	交易时间
tradetype	SYMBOL	成交类型
recid	INT	消息记录号
tradeprice	DOUBLE	成交价格
tradevolume	INT	成交数量
buyorderid	INT	买单ID
sellorderid	INT	卖单ID
unix	TIMESTAMP	unix时间戳
img根据业务团队需求,我们要基于这份逐笔成交数据,转化字段类型,增加股票后缀、交易金额、交易类型,过滤交易失败记录等。这一过程涉及大量的历史数据处理,处理的数据集规模十分庞大(以深交所逐笔成交数据为例,压缩前一年的存储大小为1.7 T),这就要求 ETL 处理程序具备高性能。部分处理需求如下:逐笔交易数据的 tradingdate 字段从 DATE 类型改为 INT 类型
时间戳 tradingtime 改为 LONG 类型
增加 BSFlag 交易类型字段
增加交易金额
……
清洗后的数据写入目标表。源表和目标表均使用 OLAP 引擎,采用日期值分区 + 股票哈希分区(20个哈希分区)的组合分区方式。具体建库建表及模拟数据脚本参考附录数据准备脚本。开发环境配置如下服务器环境
CPU 类型:Intel(R) Xeon(R) Silver 4216 CPU @ 2.10GHz
逻辑 CPU 总数:16
内存:256 GB
OS:64 位 CentOS Linux 7 (Core)
DolphinDB server 部署
服务器版本:2.00.6
服务器部署模式:高可用集群(数据节点 * 3,控制节点 * 32 常规思路
2.1 优化前的代码
常规情况下会怎么做呢?通常我们会依次遍历数据集,对数据集的每个分片处理,然后汇总输出。具体来说,分为如下几个步骤:(1) 按交易日、股票 ID 将原始数据进行分片;data = [cut1, cut2, ... , cutN](2) 依次遍历每个分片 cutK,使用 step1(代码第8行)至 stepM(代码第19行),共 M 个步骤来完成每个分片的清洗转化,并存储至内存对象 tradingdf;(3) 将转化完成之后的内存对象 tradingdf 存储至 DFS 表。代码如下:def genDataV1(date1, dateN){tradeSrc = loadTable("dfs://originData", "trade")tradeTgt = loadTable("dfs://formatData", "trade")for (aDate in date1..dateN){tradeSecurityID = (exec distinct(securityID) from tradeSrc where tradingdate = aDate).shuffle()for (m in tradeSecurityID){		tradingdf = select  * from tradeSrc where securityID = m and tradingdate = aDate    tradingdf["symbol"] = m + "SZ"        //print("stock " + m + ",date is " + aDate + ",tradingdf size " + tradingdf.size())  tradingdf["buysellflag"] =iif(tradingdf["sellorderid"] > tradingdf["buyorderid"],"S", "B")tradingdf["tradeamount"] = tradingdf["tradevolume"] * tradingdf["tradeprice"]tradingdf = tradingdf[(tradingdf["tradetype"] == "0") || (tradingdf["tradetype"] == "F")]tradingdf = select symbol,tradingdate, tradingtime, recid, tradeprice, tradevolume, tradeamount, buyorderid, sellorderid, buysellflag, unix from tradingdftradingdf = select * from tradingdf order by symbol, tradingtime, recid     tradingdf.replaceColumn!("tradingdate", toIntDate(::date(tradingdf.tradingDate)))            tradingtime = string(exec tradingtime from tradingdf)tradingdf.replaceColumn!(`tradingtime, tradingtime)unix = long(exec unix from tradingdf)tradingdf.replaceColumn!(`unix, unix)                                             tradeTgt.append!(tradingdf)	      		}}
}
通常我们使用 Python、MySQL、Java、中间件(如 Kettle)作为清洗数据的 ETL 工具时,受限于它们的单点性能。按上述代码执行20个交易日的数据需要4.5小时,处理速度十分缓慢。2.2 性能瓶颈分析
回顾上述常规处理方式可以看到,优化前的代码效率较低,主要有以下几个原因:(1) 双重 for 循环常规的代码按股票、日期执行了双重循环,时间复杂度为:t = O(N) * O(M) * t0 = O(MN) * t0N: 交易日
M: 股票数量
t0: 最内层处理逻辑的耗时
指定日期和股票,测试最内层 for 循环执行一次的耗时为400 ms 左右。整体代码执行预估耗时为:t ~= 20 * 0.4 * 3000 = 6.7 小时(2) 数据访问次数太多由上述代码可以看出,从 step1 (代码第8)到 stepM(代码第19行)对同一个数据集操作了 M 次,这就导致代码执行起来速度较慢。而有一些操作可以同时处理,例如过滤数据与排序可以在一次操作中完成。(3) 单点计算上例中从 tradingdf 赋值语句开始:tradingdf=select * from loadTable("dfs://test", 'szl2_stock_trade_daily') where symbol = m and tradingDate = date之后的代码仅在单个节点进行计算,没有发挥出 DolphinDB 分布式、高并发计算的优势,从而当数据量不断增长,性能也就有所下降。3 如何优化
优化过程可以借鉴一个简单而强大的公式:完成一个计算问题的时间 t,满足:t = S / VS:问题的空间复杂度:一次任务的数据量。V:数据处理速度:每秒处理多少条记录。所以缩短 t 的两个核心思路就是:(1) 降低问题空间复杂度,(2) 提高数据处理速度。3.1 降低问题空间复杂度
在 DolphinDB 中,可以通过分区剪枝、列存、索引等技术手段来降低问题空间复杂度。分区剪枝
将时序数据进行分区(一般选择时间字段),当使用分区字段作为 where 条件查询数据时,可以根据分区设定,只读取对应分区文件,达到分区剪枝的效果。列存
如 snapshot 快照数据表,包括上百个字段,但一次聚合查询往往只需要其中几个字段。可以利用 DolphinDB OLAP 引擎的列存技术,在查询时只读取需要的列文件,将极大地减少磁盘 IO 。索引
当分布式表使用 TSDB 引擎时,且查询语句命中 Sort Key 时,可以通过扫描稀疏索引文件,来查询对应的数据块ID。进而只读取对应数据块,从而避免全表扫描。3.2 提高处理速度
在 DolphinDB 中,可以通过提高批处理数据量的大小、利用多线程并发、使用分布式处理等方式,来提高数据处理速度。合理的批数据量
DolphinDB 以分区为单位管理海量数据,通常数据处理以一个分区为单位,每个分区的大小为100MB-500MB(压缩前)左右比较合适。从 Python 和 MySQL 等技术栈迁移至 DolphinDB 时,需要充分理解分区在 DolphinDB 中的重要性,以便充分发挥 DolphinDB 的性能。推荐阅读分区数据库设计和操作。多线程
现代操作系统及软件使用多线程技术来充分发挥多核服务器的硬件优势。DolphinDB 大量应用了多线程技术,就分布式 SQL 而言,会使用多个线程来并发处理分区数据。分布式
DolphinDB 可以使用多个服务器构建一个水平扩展的分布式集群,并且支持分布式事务。当执行一个分布式 SQL 时,通过 map-merge-reduce 的模型来执行分布式计算。其中 map 阶段会自动调度到集群各个节点,充分利用集群的硬件资源。4 优化的实现
基于前文的分析,我们从减少问题空间复杂度、提高处理速度的角度出发,综合考虑后给出以下几个优化方法,提高 ETL 过程效率:提高并行度
减少数据访问的次数
使用向量化处理
4.1 优化后的代码
按天批量处理所有股票数据,DolphinDB 会将这一天20个分区内的3000个股票的数据,生成20个子任务并行处理,自动调度至集群中的各个节点,进行分布式计算。img优化后代码如下:def transformData(tradeDate){tradeSrc = loadTable("dfs://originData", "trade")tradeTgt = loadTable("dfs://formatData", "trade")data = select securityID + "SZ" as securityID,toIntDate(tradingdate) as  tradingdate,tradingtime$STRING as tradingtime,recid as recid ,tradeprice,tradevolume,tradevolume * tradeprice as tradeamount     	,buyorderid as buyrecid,sellorderid as sellrecid,iif(sellorderid>  buyorderid,"S", "B") as buysellflag    	,unix$LONG as unixfrom tradeSrcwhere tradingdate = tradeDate and tradetype in ["0", "F"]tradeTgt.append!(data)pnodeRun(flushOLAPCache)
}allDays = 2022.05.01..2022.05.20
for(aDate in allDays){jobId = "transform_"+ strReplace(aDate$STRING, ".", "")	jobDesc = "transform data"submitJob(jobId, jobDesc, transformData, aDate)
}
以上代码处理一天内3000只股票的数据,大约需要40秒。通过 submitJob 的方式同时处理20个交易日的数据,在 maxBatchJobWorker=16(一般设置为 CPU 核数) 的设置下,总共耗时210秒,性能提升74倍。4.2 优势分析
相对于原始代码,我们提供的优化代码有以下几点进步:分布式,并行度高
以上 select 部分的代码是分布式、并行执行的;进一步地,可以通过 submitJob 的方式并行执行多个任务。数据访问次数少
所有的处理逻辑,包括过滤数据、类型转换、增加派生字段,通过一次 read 即可完成,无需反复读取数据和对数据进行转换。img向量化
OLAP 引擎采用列式存储,一列数据读取到内存中以 vector 形式存储,因此,SQL 运算采用向量化方式,十分高效。

参考:https://gitee.com/dolphindb/Tutorials_CN/blob/master/data_ETL.md

这篇关于DolphinDB 高效清洗数据实例的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/833641

相关文章

前端原生js实现拖拽排课效果实例

《前端原生js实现拖拽排课效果实例》:本文主要介绍如何实现一个简单的课程表拖拽功能,通过HTML、CSS和JavaScript的配合,我们实现了课程项的拖拽、放置和显示功能,文中通过实例代码介绍的... 目录1. 效果展示2. 效果分析2.1 关键点2.2 实现方法3. 代码实现3.1 html部分3.2

Redis的数据过期策略和数据淘汰策略

《Redis的数据过期策略和数据淘汰策略》本文主要介绍了Redis的数据过期策略和数据淘汰策略,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一... 目录一、数据过期策略1、惰性删除2、定期删除二、数据淘汰策略1、数据淘汰策略概念2、8种数据淘汰策略

轻松上手MYSQL之JSON函数实现高效数据查询与操作

《轻松上手MYSQL之JSON函数实现高效数据查询与操作》:本文主要介绍轻松上手MYSQL之JSON函数实现高效数据查询与操作的相关资料,MySQL提供了多个JSON函数,用于处理和查询JSON数... 目录一、jsON_EXTRACT 提取指定数据二、JSON_UNQUOTE 取消双引号三、JSON_KE

Python给Excel写入数据的四种方法小结

《Python给Excel写入数据的四种方法小结》本文主要介绍了Python给Excel写入数据的四种方法小结,包含openpyxl库、xlsxwriter库、pandas库和win32com库,具有... 目录1. 使用 openpyxl 库2. 使用 xlsxwriter 库3. 使用 pandas 库

Tomcat高效部署与性能优化方式

《Tomcat高效部署与性能优化方式》本文介绍了如何高效部署Tomcat并进行性能优化,以确保Web应用的稳定运行和高效响应,高效部署包括环境准备、安装Tomcat、配置Tomcat、部署应用和启动T... 目录Tomcat高效部署与性能优化一、引言二、Tomcat高效部署三、Tomcat性能优化总结Tom

SpringBoot定制JSON响应数据的实现

《SpringBoot定制JSON响应数据的实现》本文主要介绍了SpringBoot定制JSON响应数据的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们... 目录前言一、如何使用@jsonView这个注解?二、应用场景三、实战案例注解方式编程方式总结 前言

使用Python在Excel中创建和取消数据分组

《使用Python在Excel中创建和取消数据分组》Excel中的分组是一种通过添加层级结构将相邻行或列组织在一起的功能,当分组完成后,用户可以通过折叠或展开数据组来简化数据视图,这篇博客将介绍如何使... 目录引言使用工具python在Excel中创建行和列分组Python在Excel中创建嵌套分组Pyt

Python利用自带模块实现屏幕像素高效操作

《Python利用自带模块实现屏幕像素高效操作》这篇文章主要为大家详细介绍了Python如何利用自带模块实现屏幕像素高效操作,文中的示例代码讲解详,感兴趣的小伙伴可以跟随小编一起学习一下... 目录1、获取屏幕放缩比例2、获取屏幕指定坐标处像素颜色3、一个简单的使用案例4、总结1、获取屏幕放缩比例from

在Rust中要用Struct和Enum组织数据的原因解析

《在Rust中要用Struct和Enum组织数据的原因解析》在Rust中,Struct和Enum是组织数据的核心工具,Struct用于将相关字段封装为单一实体,便于管理和扩展,Enum用于明确定义所有... 目录为什么在Rust中要用Struct和Enum组织数据?一、使用struct组织数据:将相关字段绑

在Mysql环境下对数据进行增删改查的操作方法

《在Mysql环境下对数据进行增删改查的操作方法》本文介绍了在MySQL环境下对数据进行增删改查的基本操作,包括插入数据、修改数据、删除数据、数据查询(基本查询、连接查询、聚合函数查询、子查询)等,并... 目录一、插入数据:二、修改数据:三、删除数据:1、delete from 表名;2、truncate