本文主要是介绍vnpy源码学习记录(3) ----------CTP网关,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
CTP网关
- CTP网关
- 1. 实例化CTP网关
- 1.2 连接CTP
- 连接交易服务器
- 连接行情服务器
- 轮询
- 对数据的获取及处理
介绍CTP网关之前先看看网关类的抽象类:BaseGateway
BaseGateway用于创建与不同交易系统的网关连接的抽象网关类。该类必须是线程安全的(所有方法必须线程安全,对象之间没有可变的共享属性);所有方法都需要是非阻塞的;每个方法和回调方法都需满足在docstring中描述的需求;如果连接断开会自动重新连接。
所有的方法必须要实现所有的@abstractmethod
回调函数必须手动响应
on_tick
on_trade
on_order
on_position
on_account
on_contract
所有传给回调函数的XxxData类必须是常量,也就是说传给on_xxx函数的对象就不该被修改了。如果使用缓存来保存数据的引用,在传递数据到on_xx方法前使用copy.copy来创建一个新对象。
default_setting和exchanges属性保存连接网关的信息和网关支持的交易所
__init__()
中初始化事件引擎和网关名
self.event_engine = event_engine
self.gateway_name = gateway_name
vnpy覆盖了国内外所有交易品种的交易接口(vnpy.gateway目录下)。这里暂时只介绍CTP和RPC两种网关
CTP网关
CtpGateway vnpy/gateway/ctp/ctp_gateway.py
1. 实例化CTP网关
main_engine.add_gateway(CtpGateway) # 将ctp添加到主引擎中
def add_gateway(self, gateway_class: BaseGateway):gateway = gateway_class(self.event_engine) # 实例化 self.gateways[gateway.gateway_name] = gateway 添加进网关字典#将网关支持的交易所添加到引擎 for exchange in gateway.exchanges:if exchange not in self.exchanges:self.exchanges.append(exchange)return gateway
CtpGateway继承BaseGateway,super().__init__(event_engine, "CTP")
,设置网关名为”CTP”。首先加载default_setting和exchanges信息,default_setting初始化CTP连接信息,CTP接口支持"CFFEX" “SHFE” “CZCE” “DCE” "INE"这5种交易所,加载到exchanges属性。类里面初始化了两个对象,分别对交易api和行情api:
self.td_api = CtpTdApi(self)
self.md_api = CtpMdApi(self)
CtpTdApi和CtpMdApi分别继承MdApi和TdApi。这两个类里包含了vnpy封装的ctp接口。
main_engine.connect(setting, "CTP")
连接CTP setting属性为连接的信息 格式如下:
1.2 连接CTP
在主引擎中调用connect方法,先找出当前的网关接口,然后通过网关接口调用网关内的connect方法:
def connect(self, setting: dict, gateway_name: str): gateway = self.get_gateway(gateway_name)if gateway:gateway.connect(setting, self)
此处在CtpGateway中调用connect()方法,先解析出setting属性,即CTP连接信息,然后处理连接地址,再分别连接交易服务器和行情服务器,并启动轮询:
def connect(self, setting: dict):user_id = setting["用户名"]password = setting["密码"]broker_id = setting["经纪商代码"]td_address = setting["交易服务器"]md_address = setting["行情服务器"]app_id = setting["产品名称"]auth_code = setting["授权编码"]product_info = setting["产品信息"]if not td_address.startswith("tcp://"):td_address = "tcp://" + td_addressif not md_address.startswith("tcp://"):md_address = "tcp://" + md_addressself.td_api.connect(td_address, user_id, password, broker_id, auth_code, app_id)self.md_api.connect(md_address, user_id, password, broker_id)self.init_query()
连接交易服务器
首先将参数设置为实例对象属性,然后对connect_status(服务器连接状态,默认为False)进行判断
if not self.connect_status:
path = get_folder_path(self.gateway_name.lower())
# 创建TdApi 存贮订阅信息文件的目录,默认为当前目录.返回创建出的用户Apiself.createFtdcTraderApi(str(path) + "\\Td") self.subscribePrivateTopic(0) # 订阅私有流self.subscribePublicTopic(0) # 订阅公有流self.registerFront(address) # 注册前置机self.init() # 初始化交易服务器self.connect_status = True
else:self.authenticate()
方法self.init()
初始化服务器,在CTP后台调用processTask方法,初始化时方法执行的操作有:交易服务器连接、交易服务器授权验证、交易服务器登录、结算信息、合约信息查询
然后将self.connect_status设置为True
上面初始化时执行的操作在vnpy中都有回调函数,分别为:onFrontConnected
、onRspAuthenticate
、onRspUserLogin
、onRspSettlementInfoConfirm
、onRspQryInstrument
如果self.connect_status判断为True(服务器中途断开后进行重新连接时会出现该情况),则手动执行操作。先进行交易服务器授权验证self.authenticate()
:
self.reqAuthenticate(req, self.reqid)
然后验证方法回调之后进行登录操作。再一步步往下执行。
如中途服务器断开回调函数onFrontDisconnected将被调用,在方法里将login_status设为False
CTP中所有的请求都是按照reqXxx的格式,所有的回调都是按照onRspXxx的格式
连接行情服务器
行情服务器和交易服务器类似,不同的是交易服务器只需要连接和登录
轮询
轮询方法init_query是用于定时查询账户信息和持仓信息的。
def init_query(self):self.count = 0# 将查询账户信息和持仓信息的方法放入字典中,然后注册计时器,每秒钟调用process_timer_event方法self.query_functions = [self.query_account, self.query_position]self.event_engine.register(EVENT_TIMER, self.process_timer_event)
process_timer_event方法中,对count的操作表示每两秒执行一次process_timer_event方法,然后弹出query_functions的首项并执行,再将该方法放入query_functions的尾部:
def process_timer_event(self, event):self.count += 1if self.count < 2:returnself.count = 0func = self.query_functions.pop(0)func()self.query_functions.append(func)
CTP中所有的交易服务器的函数可以在vnpy/api/ctp/vnctp/vnctptd/vnctptd.cpp中找到
行情服务器的函数可以在vnpy/api/ctp/vnctp/vnctpmd/vnctpmd.cpp中找到
对数据的获取及处理
通过CTP接口查询到信息以后,可以对数据进行自定义处理。下面以回报的合约信息为例。
onRspQryInstrument
回调函数返回了通过CTP接口查询到的合约数据,对数据做处理之后执行self.gateway.on_contract(contract)
语句,调用了CtpGateway的父类BaseGateway中的on_contract()
方法。
方法中对合约事件进行推送:
self.on_event(EVENT_CONTRACT, contract)
约信息就通过on_event在事件引擎中被推送(放入队列),前面在事件引擎中介绍过的_run()方法中,只要引擎启动了,引擎就会一直从队列中取出事件,并处理它。
这篇关于vnpy源码学习记录(3) ----------CTP网关的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!