在进行行情 tick 数据存储时,哪种数据结构查找起来更快?

2023-10-09 18:40

本文主要是介绍在进行行情 tick 数据存储时,哪种数据结构查找起来更快?,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

小 T 导读:如果我们要做行情 tick 数据的存储,怎样的数据结构查找起来才会比较快?在加入 TDengine 之前,本文作者丁博在弘源泰平量化投资做量化工程师,曾经遇到过这一类存储行情 tick 数据的问题,本文会就此问题进行详细的技术解读。

本文将以标准 CTP 行情接口(http://www.sfit.com.cn/5_1_DocumentDown.htm)为例,假设行情结构为 CThostFtdcDepthMarketDataField(https://mckelv.in/python-ctp-deps/struct_c_thost_ftdc_depth_market_data_field.html),展开说明。

内存存储方案

如果你的需求仅仅是盘中实时分析,且监控的 Instrument(CTP 接口对现货、期货、期权等合约的统称, 以下简称【合约】) 总数不多,则可以直接使用内存存储。通常只有超高频交易系统才必须这么做。内存存储也有很多可选方案,其中有两大方案较为通用。

两级 map 方案

第一级 map 的类型为 std::unordered_map,键为 InstrumentID, 值为第二级 map 的指针。第二级 map 的类型为 std::map,键为行情时间戳,值为行情结构体。(注:行情时间戳需要根据 UpdateTime 和 UpdateMillisec 两个字段构造一个类型为 long 的毫秒值)。 std::unordered_map 底层依赖的数据结构是哈希表,按 key 索引速度是最快的。std::map 底层的数据结构是二叉树搜索树,可以严格按照 key 的大小顺序迭代全部或某一段数据。 总体而言这个数据结构的优势是: 快速查找某个合约某个时间点或某个时间段返回的行情。这是后续做交易信号计算的基础。

#include "ThostFtdcUserApiStruct.h"
#include "ThostFtdcUserApiDataType.h"
#include <map>
#include <unordered_map>
using namespace std;
int main()
{unordered_map<TThostFtdcInstrumentIDType, map<long, CThostFtdcDepthMarketDataField>*> tickData;
}

map + array

由于每种合约每天的标准行情 tick 总数都是固定的(个别交易所除外),因此我们可以提前初始化好一个数组来存行情。按每秒 2 个 tick 算(500 毫秒一个点),标准行情的长度可能是 28800。当收到行情通知时,行情时间距离哪个标准 tick 点最近就归为哪个 tick。比如行情时间是 9 点 50 分 20 秒 133 毫秒,那么可以当作 9 点 50 分 20 秒 0 毫秒的行情。如果出现前后两个 tick 时间大于 500 毫秒的情况,那就还需要补全中间空缺的行情,相当于边收行情边做标准化操作。这样做的优势是:

  1. 交易策略通常会依赖标准化的行情计算交易信号,收行情和标准化并作一步会更节省时间。
  2. 可以直接用数组下标索引对应时间的行情,查找的时间复杂度为 O(1)。
#include "ThostFtdcUserApiStruct.h"
#include "ThostFtdcUserApiDataType.h"
#include <unordered_map>
#include <array>
using namespace std;
int main()
{unordered_map<TThostFtdcInstrumentIDType, array<CThostFtdcDepthMarketDataField, 28800>> tickData;
}

持久化存储方案

无论是否做超高频交易,持久化存储行情都是有必要的。通常持久化存储为的是进行盘后复盘分析, 因为在大数据量下,传统的存储方案(MongoDB、MySQL、直接存文件等等)很快就会遇到性能瓶颈(无论是读还是写),不适合做盘中的计算。近年来,时序数据库(Time-Series Database)异军突起,使得盘中盘后使用一种存储方案成为可能。特别是像 TDengine 这样带有缓存功能、消息队列功能和集群功能的时序数据库,用来存行情是非常合适。下面我将以 TDengine Database 为例为大家介绍持久化存储方案。

下载 TDengine Database Server

在下载阶段,不同的系统使用的安装包也有所不同,Ubuntu 系统用 deb 包, CentOS 系统用 RPM 包。下载地址为: All Downloads – TDengine。

安装并启动

Ubuntu

sudo dpkg -i TDengine-server-2.4.0.7-Linux-x64.deb

CentOS

sudo rpm -ivh TDengine-server-2.4.0.7-Linux-x64.rpm

安装成功后,如何启动 TDengine Database 的提示信息就会自动弹出,照着操作就可以。

建行情表

由于所有行情的结构都是一样的,因此只需要一张超级表进行行情建表即可,其中每个合约对应一张子表,InstrumentID 作为子表名,交易所代码作为一个行情标签。为了方便演示,下面的示例只包含了 4 个行情字段:

  • 进入 taos 命令行
bo@RDBB:~$ taos
Welcome to the TDengine shell from Linux, Client Version:2.4.0.12
Copyright (c) 2020 by TAOS Data, Inc. All rights reserved.
  • 执行下面的语句
create database marketdata;
use marketdata;
create stable tick(ts timestamp,updatetime binary(9),updatemillisec int,askprice1 double,bidprice1 double,askvolume1 int,bidvolume1 int
) tags (exchangeid binary(9));
  • 查看表结构
taos> desc tick;Field              |         Type         |   Length    |   Note   |
=================================================================================ts                             | TIMESTAMP            |           8 |          |updatetime                     | BINARY               |           9 |          |updatemillisec                 | INT                  |           4 |          |askprice1                      | DOUBLE               |           8 |          |bidprice1                      | DOUBLE               |           8 |          |askvolume1                     | INT                  |           4 |          |bidvolume1                     | INT                  |           4 |          |exchangeid                     | BINARY               |           9 | TAG      |
Query OK, 8 row(s) in set (0.000378s)

写入行情

#include "ThostFtdcUserApiStruct.h"
#include "ThostFtdcUserApiDataType.h"
#include "taos.h"
#include "taoserror.h"
#include <iostream>
#include <sstream>
using namespace std;
void insertTickData(TAOS* taos, CThostFtdcDepthMarketDataField &tick) {stringstream sql;// 会自动创建子表tick.InstrumentIDsql << "insert into " << tick.InstrumentID << " using tick tags("<< tick.ExchangeID << ") values(now, '" << tick.UpdateTime << "', "<< tick.UpdateMillisec << "," << tick.AskPrice1 << "," << tick.BidPrice1<< "," << tick.AskVolume1 << "," << tick.BidVolume1 << ")";TAOS_RES *res = taos_query(taos, sql.str().c_str());if (res == nullptr || taos_errno(res) != 0) {cerr << "insertTitckData failed," << taos_errno(res) << ", " << taos_errstr(res) << endl;}
}
int main()
{TAOS *taos = taos_connect("localhost", "root", "taosdata", "marketdata", 6030);// 构造测试数据CThostFtdcDepthMarketDataField tick;strcpy_s(tick.InstrumentID, "IH2209");strcpy_s(tick.UpdateTime, "14:10:32");strcpy_s(tick.ExchangeID, "DEC");tick.UpdateMillisec = 500;tick.AskPrice1 = 123.8;tick.BidPrice1 = 123.4;tick.AskVolume1 = 10;tick.BidVolume1 = 9;// 写入测试数据insertTickData(taos, tick);taos_close(taos);
}

查询最新的行情

TDengine 对每个表的最新数据都有缓存功能,无需再读磁盘,使用 last 函数就能快速获取。

#include "ThostFtdcUserApiStruct.h"
#include "ThostFtdcUserApiDataType.h"
#include "taos.h"
#include "taoserror.h"
#include <string>
#include <iostream>
using namespace std;
CThostFtdcDepthMarketDataField* getLastTick(TAOS* taos, const char* instrumentID) {string sql("select last(*) from ");sql += instrumentID;TAOS_RES* res = taos_query(taos, sql.c_str());if (res == nullptr || taos_errno(res) != 0) {cerr << "getLastTick failed," << taos_errno(res) << ", " << taos_errstr(res) << endl;return nullptr;} TAOS_ROW row = taos_fetch_row(res);        if (row == nullptr) {return nullptr;}CThostFtdcDepthMarketDataField* tick = new CThostFtdcDepthMarketDataField();//int64_t ts = *((int64_t*)row[0]);memcpy(tick->UpdateTime, row[1], 9);tick->UpdateMillisec = *(int*)row[2];tick->AskPrice1 = *((double *)row[3]);tick->BidPrice1 = *((double*)row[4]);taos_free_result(res);return tick;
}
int main() {TAOS* taos = taos_connect("localhost", "root", "taosdata", "marketdata", 6030);CThostFtdcDepthMarketDataField* tick = getLastTick(taos, "IH2209");cout << "askPrice1=" << tick->AskPrice1 << " bidPrice1=" << tick->BidPrice1 << endl;delete tick;taos_close(taos);
}

 

以上两个示例程序,展示了写入和查询的方法。结合 TDengine 内置的查询函数和按窗口聚合功能,可实现更多功能,比如:

  1. 使用 MAX、 FIRST、 MIN、 LAST 四个 SQL 函数计算 K 线上高、开、低、收四个价位。
  2. 使用 INTERVAL 和 SLIDING 查询子句和 AVG 函数计算移动均价。此处不再给出具体示例,可参考官方文档。 

从实际业务出发的实践经验分享

除了上述内容外,TDengine Database 还有非常丰富的分析函数,如果你感兴趣的话建议参考官方文档。此外,在 TDengine 的实际应用中,也有很多客户的实践是关于量化投资场景中的数据处理。

以同花顺为例,其每天都需要接收海量交易所行情数据,以确保行情数据的数据准确,但由于该部分数据过于庞大,而且使用场景颇多,因此每天会产生很多的加工数据,在组合管理(PMS)上还会使用到历史行情数据。之前他们采用的是 Postgres+LevelDB 作为数据的存储方案,但仍旧痛点频发,随后通过对数据流、行情获取模块的分析,发现目前主要存在以下两个亟需解决的问题:

  • 依赖多,稳定性较差:PMS作为多品种的投后分析服务, 需要使用到各种日线数据、当天实时行情数据、当天分钟数据等,在数据获取方面需要依赖Http以及Postgres、LevelDB等数据库。过于多的数据获取链路会导致平台可靠性降低,同时依赖于其他各个服务,导致查询问题过于复杂。
  • 性能不能满足需求: PMS作为多品种投后分析,在算法分析层面需要大量的行情获取,而且对行情获取的性能也有较大的要求,当前所有行情会占据大量分析的性能。

从业务发展的角度来讲,存储方案的改造迫在眉睫,之后同花顺开始对 ClickHouse、InfluxDB、TDengine 等数据存储方案进行调研。由于行情数据是绑定时间戳的形式,所以显然时序数据库更适用于这个业务场景,在 InfluxDB 和 TDengine 之间,由于 TDengine 的写入速度远高于 InfluxDB,且集群版开源,同时还支持包含 C/C++、Java、Python、Go 和 RESTful 在内的多种数据接口,因此成为同花顺的最终选用方案。

改造之后的性能效果提升还是非常明显的,下图是同花顺做的一张改造前后性能对比图,可以更为直观地感受到效果提升:

同时改造后,稳定性也显著增强,改造前调用数据情况共 40W 次,共出现 0.01% 的异常,改造后出现异常降低至 0.001%。

在 TDengine Database 官网的 Case 合集中,还有弘源泰平量化、同心源基金等几篇聚焦投资量化场景下数据处理难题的客户案例,由于篇幅所限,便不在此一一列举了,有需要的朋友可以去官网查找文章进行参考。如果还有投资量化场景下其他的数据处理难题,也欢迎在文章下方进行留言,我们后续可以加微信进行详细讨论和沟通。


想了解更多 TDengine Database的具体细节,欢迎大家在GitHub上查看相关源代码。

 

 

这篇关于在进行行情 tick 数据存储时,哪种数据结构查找起来更快?的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/174793

相关文章

大模型研发全揭秘:客服工单数据标注的完整攻略

在人工智能(AI)领域,数据标注是模型训练过程中至关重要的一步。无论你是新手还是有经验的从业者,掌握数据标注的技术细节和常见问题的解决方案都能为你的AI项目增添不少价值。在电信运营商的客服系统中,工单数据是客户问题和解决方案的重要记录。通过对这些工单数据进行有效标注,不仅能够帮助提升客服自动化系统的智能化水平,还能优化客户服务流程,提高客户满意度。本文将详细介绍如何在电信运营商客服工单的背景下进行

基于MySQL Binlog的Elasticsearch数据同步实践

一、为什么要做 随着马蜂窝的逐渐发展,我们的业务数据越来越多,单纯使用 MySQL 已经不能满足我们的数据查询需求,例如对于商品、订单等数据的多维度检索。 使用 Elasticsearch 存储业务数据可以很好的解决我们业务中的搜索需求。而数据进行异构存储后,随之而来的就是数据同步的问题。 二、现有方法及问题 对于数据同步,我们目前的解决方案是建立数据中间表。把需要检索的业务数据,统一放到一张M

关于数据埋点,你需要了解这些基本知识

产品汪每天都在和数据打交道,你知道数据来自哪里吗? 移动app端内的用户行为数据大多来自埋点,了解一些埋点知识,能和数据分析师、技术侃大山,参与到前期的数据采集,更重要是让最终的埋点数据能为我所用,否则可怜巴巴等上几个月是常有的事。   埋点类型 根据埋点方式,可以区分为: 手动埋点半自动埋点全自动埋点 秉承“任何事物都有两面性”的道理:自动程度高的,能解决通用统计,便于统一化管理,但个性化定

使用SecondaryNameNode恢复NameNode的数据

1)需求: NameNode进程挂了并且存储的数据也丢失了,如何恢复NameNode 此种方式恢复的数据可能存在小部分数据的丢失。 2)故障模拟 (1)kill -9 NameNode进程 [lytfly@hadoop102 current]$ kill -9 19886 (2)删除NameNode存储的数据(/opt/module/hadoop-3.1.4/data/tmp/dfs/na

异构存储(冷热数据分离)

异构存储主要解决不同的数据,存储在不同类型的硬盘中,达到最佳性能的问题。 异构存储Shell操作 (1)查看当前有哪些存储策略可以用 [lytfly@hadoop102 hadoop-3.1.4]$ hdfs storagepolicies -listPolicies (2)为指定路径(数据存储目录)设置指定的存储策略 hdfs storagepolicies -setStoragePo

HDFS—存储优化(纠删码)

纠删码原理 HDFS 默认情况下,一个文件有3个副本,这样提高了数据的可靠性,但也带来了2倍的冗余开销。 Hadoop3.x 引入了纠删码,采用计算的方式,可以节省约50%左右的存储空间。 此种方式节约了空间,但是会增加 cpu 的计算。 纠删码策略是给具体一个路径设置。所有往此路径下存储的文件,都会执行此策略。 默认只开启对 RS-6-3-1024k

Hadoop集群数据均衡之磁盘间数据均衡

生产环境,由于硬盘空间不足,往往需要增加一块硬盘。刚加载的硬盘没有数据时,可以执行磁盘数据均衡命令。(Hadoop3.x新特性) plan后面带的节点的名字必须是已经存在的,并且是需要均衡的节点。 如果节点不存在,会报如下错误: 如果节点只有一个硬盘的话,不会创建均衡计划: (1)生成均衡计划 hdfs diskbalancer -plan hadoop102 (2)执行均衡计划 hd

MySQL数据库宕机,启动不起来,教你一招搞定!

作者介绍:老苏,10余年DBA工作运维经验,擅长Oracle、MySQL、PG、Mongodb数据库运维(如安装迁移,性能优化、故障应急处理等)公众号:老苏畅谈运维欢迎关注本人公众号,更多精彩与您分享。 MySQL数据库宕机,数据页损坏问题,启动不起来,该如何排查和解决,本文将为你说明具体的排查过程。 查看MySQL error日志 查看 MySQL error日志,排查哪个表(表空间

【数据结构】——原来排序算法搞懂这些就行,轻松拿捏

前言:快速排序的实现最重要的是找基准值,下面让我们来了解如何实现找基准值 基准值的注释:在快排的过程中,每一次我们要取一个元素作为枢纽值,以这个数字来将序列划分为两部分。 在此我们采用三数取中法,也就是取左端、中间、右端三个数,然后进行排序,将中间数作为枢纽值。 快速排序实现主框架: //快速排序 void QuickSort(int* arr, int left, int rig

【Prometheus】PromQL向量匹配实现不同标签的向量数据进行运算

✨✨ 欢迎大家来到景天科技苑✨✨ 🎈🎈 养成好习惯,先赞后看哦~🎈🎈 🏆 作者简介:景天科技苑 🏆《头衔》:大厂架构师,华为云开发者社区专家博主,阿里云开发者社区专家博主,CSDN全栈领域优质创作者,掘金优秀博主,51CTO博客专家等。 🏆《博客》:Python全栈,前后端开发,小程序开发,人工智能,js逆向,App逆向,网络系统安全,数据分析,Django,fastapi