vnpy源码学习记录(3) ----------CTP网关

2023-11-01 11:20

本文主要是介绍vnpy源码学习记录(3) ----------CTP网关,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

CTP网关

  • CTP网关
    • 1. 实例化CTP网关
    • 1.2 连接CTP
      • 连接交易服务器
      • 连接行情服务器
      • 轮询
      • 对数据的获取及处理

介绍CTP网关之前先看看网关类的抽象类:BaseGateway
BaseGateway用于创建与不同交易系统的网关连接的抽象网关类。该类必须是线程安全的(所有方法必须线程安全,对象之间没有可变的共享属性);所有方法都需要是非阻塞的;每个方法和回调方法都需满足在docstring中描述的需求;如果连接断开会自动重新连接。
所有的方法必须要实现所有的@abstractmethod
回调函数必须手动响应
on_tick on_trade on_order on_position on_account on_contract
所有传给回调函数的XxxData类必须是常量,也就是说传给on_xxx函数的对象就不该被修改了。如果使用缓存来保存数据的引用,在传递数据到on_xx方法前使用copy.copy来创建一个新对象。
default_setting和exchanges属性保存连接网关的信息和网关支持的交易所
__init__()中初始化事件引擎和网关名

self.event_engine = event_engine
self.gateway_name = gateway_name

vnpy覆盖了国内外所有交易品种的交易接口(vnpy.gateway目录下)。这里暂时只介绍CTP和RPC两种网关

CTP网关

CtpGateway vnpy/gateway/ctp/ctp_gateway.py

1. 实例化CTP网关

main_engine.add_gateway(CtpGateway)  # 将ctp添加到主引擎中
   def add_gateway(self, gateway_class: BaseGateway):gateway = gateway_class(self.event_engine)  # 实例化  self.gateways[gateway.gateway_name] = gateway 添加进网关字典#将网关支持的交易所添加到引擎          for exchange in gateway.exchanges:if exchange not in self.exchanges:self.exchanges.append(exchange)return gateway

CtpGateway继承BaseGateway,super().__init__(event_engine, "CTP") ,设置网关名为”CTP”。首先加载default_setting和exchanges信息,default_setting初始化CTP连接信息,CTP接口支持"CFFEX" “SHFE” “CZCE” “DCE” "INE"这5种交易所,加载到exchanges属性。类里面初始化了两个对象,分别对交易api和行情api:

self.td_api = CtpTdApi(self)
self.md_api = CtpMdApi(self)

CtpTdApi和CtpMdApi分别继承MdApi和TdApi。这两个类里包含了vnpy封装的ctp接口。
main_engine.connect(setting, "CTP") 连接CTP setting属性为连接的信息 格式如下:
在这里插入图片描述

1.2 连接CTP

在主引擎中调用connect方法,先找出当前的网关接口,然后通过网关接口调用网关内的connect方法:

def connect(self, setting: dict, gateway_name: str): gateway = self.get_gateway(gateway_name)if gateway:gateway.connect(setting, self)

此处在CtpGateway中调用connect()方法,先解析出setting属性,即CTP连接信息,然后处理连接地址,再分别连接交易服务器和行情服务器,并启动轮询:

def connect(self, setting: dict):user_id = setting["用户名"]password = setting["密码"]broker_id = setting["经纪商代码"]td_address = setting["交易服务器"]md_address = setting["行情服务器"]app_id = setting["产品名称"]auth_code = setting["授权编码"]product_info = setting["产品信息"]if not td_address.startswith("tcp://"):td_address = "tcp://" + td_addressif not md_address.startswith("tcp://"):md_address = "tcp://" + md_addressself.td_api.connect(td_address, user_id, password, broker_id, auth_code, app_id)self.md_api.connect(md_address, user_id, password, broker_id)self.init_query()

连接交易服务器

首先将参数设置为实例对象属性,然后对connect_status(服务器连接状态,默认为False)进行判断

if not self.connect_status:
path = get_folder_path(self.gateway_name.lower())
# 创建TdApi  存贮订阅信息文件的目录,默认为当前目录.返回创建出的用户Apiself.createFtdcTraderApi(str(path) + "\\Td")  self.subscribePrivateTopic(0)  # 订阅私有流self.subscribePublicTopic(0)  # 订阅公有流self.registerFront(address)  # 注册前置机self.init()  # 初始化交易服务器self.connect_status = True
else:self.authenticate()

方法self.init()初始化服务器,在CTP后台调用processTask方法,初始化时方法执行的操作有:交易服务器连接、交易服务器授权验证、交易服务器登录、结算信息、合约信息查询
然后将self.connect_status设置为True
上面初始化时执行的操作在vnpy中都有回调函数,分别为:onFrontConnectedonRspAuthenticateonRspUserLoginonRspSettlementInfoConfirmonRspQryInstrument
如果self.connect_status判断为True(服务器中途断开后进行重新连接时会出现该情况),则手动执行操作。先进行交易服务器授权验证self.authenticate()

self.reqAuthenticate(req, self.reqid)

然后验证方法回调之后进行登录操作。再一步步往下执行。
如中途服务器断开回调函数onFrontDisconnected将被调用,在方法里将login_status设为False
CTP中所有的请求都是按照reqXxx的格式,所有的回调都是按照onRspXxx的格式

连接行情服务器

行情服务器和交易服务器类似,不同的是交易服务器只需要连接和登录

轮询

轮询方法init_query是用于定时查询账户信息和持仓信息的。

def init_query(self):self.count = 0# 将查询账户信息和持仓信息的方法放入字典中,然后注册计时器,每秒钟调用process_timer_event方法self.query_functions = [self.query_account, self.query_position]self.event_engine.register(EVENT_TIMER, self.process_timer_event)

process_timer_event方法中,对count的操作表示每两秒执行一次process_timer_event方法,然后弹出query_functions的首项并执行,再将该方法放入query_functions的尾部:

def process_timer_event(self, event):self.count += 1if self.count < 2:returnself.count = 0func = self.query_functions.pop(0)func()self.query_functions.append(func)

CTP中所有的交易服务器的函数可以在vnpy/api/ctp/vnctp/vnctptd/vnctptd.cpp中找到
行情服务器的函数可以在vnpy/api/ctp/vnctp/vnctpmd/vnctpmd.cpp中找到

对数据的获取及处理

通过CTP接口查询到信息以后,可以对数据进行自定义处理。下面以回报的合约信息为例。
onRspQryInstrument回调函数返回了通过CTP接口查询到的合约数据,对数据做处理之后执行self.gateway.on_contract(contract)语句,调用了CtpGateway的父类BaseGateway中的on_contract()方法。
方法中对合约事件进行推送:

self.on_event(EVENT_CONTRACT, contract)

约信息就通过on_event在事件引擎中被推送(放入队列),前面在事件引擎中介绍过的_run()方法中,只要引擎启动了,引擎就会一直从队列中取出事件,并处理它。

这篇关于vnpy源码学习记录(3) ----------CTP网关的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/322605

相关文章

Spring Boot 配置文件之类型、加载顺序与最佳实践记录

《SpringBoot配置文件之类型、加载顺序与最佳实践记录》SpringBoot的配置文件是灵活且强大的工具,通过合理的配置管理,可以让应用开发和部署更加高效,无论是简单的属性配置,还是复杂... 目录Spring Boot 配置文件详解一、Spring Boot 配置文件类型1.1 applicatio

Python实现无痛修改第三方库源码的方法详解

《Python实现无痛修改第三方库源码的方法详解》很多时候,我们下载的第三方库是不会有需求不满足的情况,但也有极少的情况,第三方库没有兼顾到需求,本文将介绍几个修改源码的操作,大家可以根据需求进行选择... 目录需求不符合模拟示例 1. 修改源文件2. 继承修改3. 猴子补丁4. 追踪局部变量需求不符合很

MySQL INSERT语句实现当记录不存在时插入的几种方法

《MySQLINSERT语句实现当记录不存在时插入的几种方法》MySQL的INSERT语句是用于向数据库表中插入新记录的关键命令,下面:本文主要介绍MySQLINSERT语句实现当记录不存在时... 目录使用 INSERT IGNORE使用 ON DUPLICATE KEY UPDATE使用 REPLACE

Python 中的异步与同步深度解析(实践记录)

《Python中的异步与同步深度解析(实践记录)》在Python编程世界里,异步和同步的概念是理解程序执行流程和性能优化的关键,这篇文章将带你深入了解它们的差异,以及阻塞和非阻塞的特性,同时通过实际... 目录python中的异步与同步:深度解析与实践异步与同步的定义异步同步阻塞与非阻塞的概念阻塞非阻塞同步

Python Dash框架在数据可视化仪表板中的应用与实践记录

《PythonDash框架在数据可视化仪表板中的应用与实践记录》Python的PlotlyDash库提供了一种简便且强大的方式来构建和展示互动式数据仪表板,本篇文章将深入探讨如何使用Dash设计一... 目录python Dash框架在数据可视化仪表板中的应用与实践1. 什么是Plotly Dash?1.1

Spring Boot中定时任务Cron表达式的终极指南最佳实践记录

《SpringBoot中定时任务Cron表达式的终极指南最佳实践记录》本文详细介绍了SpringBoot中定时任务的实现方法,特别是Cron表达式的使用技巧和高级用法,从基础语法到复杂场景,从快速启... 目录一、Cron表达式基础1.1 Cron表达式结构1.2 核心语法规则二、Spring Boot中定

Spring 中 BeanFactoryPostProcessor 的作用和示例源码分析

《Spring中BeanFactoryPostProcessor的作用和示例源码分析》Spring的BeanFactoryPostProcessor是容器初始化的扩展接口,允许在Bean实例化前... 目录一、概览1. 核心定位2. 核心功能详解3. 关键特性二、Spring 内置的 BeanFactory

Java进阶学习之如何开启远程调式

《Java进阶学习之如何开启远程调式》Java开发中的远程调试是一项至关重要的技能,特别是在处理生产环境的问题或者协作开发时,:本文主要介绍Java进阶学习之如何开启远程调式的相关资料,需要的朋友... 目录概述Java远程调试的开启与底层原理开启Java远程调试底层原理JVM参数总结&nbsMbKKXJx

国内环境搭建私有知识问答库踩坑记录(ollama+deepseek+ragflow)

《国内环境搭建私有知识问答库踩坑记录(ollama+deepseek+ragflow)》本文给大家利用deepseek模型搭建私有知识问答库的详细步骤和遇到的问题及解决办法,感兴趣的朋友一起看看吧... 目录1. 第1步大家在安装完ollama后,需要到系统环境变量中添加两个变量2. 第3步 “在cmd中

Spring Retry 实现乐观锁重试实践记录

《SpringRetry实现乐观锁重试实践记录》本文介绍了在秒杀商品SKU表中使用乐观锁和MybatisPlus配置乐观锁的方法,并分析了测试环境和生产环境的隔离级别对乐观锁的影响,通过简单验证,... 目录一、场景分析 二、简单验证 2.1、可重复读 2.2、读已提交 三、最佳实践 3.1、配置重试模板