从零开始搭建链上dex自动化价差套利程序(11)

2023-12-09 19:45

本文主要是介绍从零开始搭建链上dex自动化价差套利程序(11),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

风险控制

需要将仓位杠杆控制到3倍以内,由于dydx与apex没有获取仓位杠杆的接口,但是每次发送交易的数额可以决定,故而可以设置每次发送总仓位1.5倍杠杆的数额,然后设置一个变量保证每个方向上的交易不超过2次,即可保证总仓位始终小于3倍杠杆

细节:

send_order_apex(client_apex, symbol=“BTC-USDC”, side=“BUY”,type=“MARKET”,size=“0.004”, expirationEpochSeconds=currentTime+100,price=’58888’, limitFeeRate=limitFeeRate)

在apex市价交易参数里,price代表可接受的价格,故而当卖出时,此price要尽可能的调低,否则会失败,同理买进时要尽可能的高。

同时将价差计算修改为:

 # 计算价差
    spread1 = ((float(b_first_price_apex) -float(s_first_price_dydx))/float(b_first_price_apex))*100
    spread2 = ((float(b_first_price_dydx) - float(s_first_price_apex))/float(b_first_price_dydx))*100 

因为如果在apex卖,dydx买的话,apex的卖价应该大于dydx的买价,apex的卖价由apex买一价决定,dydx买价由dydx卖一价决定。反之同理。

代码修改如下:

get_depth_data_btc.py

"""
这是一个用来计算 APEX 和 dydx 之间的 BTCUSDC 价差的模块。
可以调用 calculate_spread 函数来返回两个交易所的卖一价、买一价和价差。
"""import asyncio
from apexpro.http_public import HttpPublic
from dydx3 import Client
from dydx3.constants import MARKET_BTC_USD# 定义交易对列表
symbol = 'BTCUSDC'
market = MARKET_BTC_USD# 定义异步函数来获取 APEX 的价格
async def get_apex_price():# 初始化API客户端apexclient = HttpPublic("https://pro.apex.exchange")# 获取深度数据trades_data = apexclient.depth(symbol=symbol)['data']# 返回卖一价和买一价return trades_data['a'][0][0], trades_data['b'][0][0], trades_data['a'][0][1], trades_data['b'][0][1]# 定义异步函数来获取 dydx 的价格
async def get_dydx_price():# 初始化API客户端dydxclient = Client(host='https://api.dydx.exchange')# 获取深度数据orderbook_response = dydxclient.public.get_orderbook(market=market)orderbook_data = orderbook_response.data# 返回卖一价和买一价return orderbook_data['asks'][0]['price'], orderbook_data['bids'][0]['price'], orderbook_data['asks'][0]['size'], orderbook_data['bids'][0]['size']# 定义异步函数来计算价差
async def calculate_spread():# 创建两个任务,分别获取 APEX 和 dydx 的价格task1 = asyncio.create_task(get_apex_price())task2 = asyncio.create_task(get_dydx_price())# 等待两个任务完成,并获取结果s_first_price_apex, b_first_price_apex,s_first_size_apex,b_first_size_apex = await task1s_first_price_dydx, b_first_price_dydx,s_first_size_dydx,b_first_size_dydx   = await task2# 计算价差spread1 = ((float(b_first_price_apex) - float(s_first_price_dydx))/float(b_first_price_apex))*100spread2 = ((float(b_first_price_dydx) - float(s_first_price_apex))/float(b_first_price_dydx))*100return s_first_price_apex,b_first_price_apex,s_first_price_dydx,b_first_price_dydx,s_first_size_apex,b_first_size_apex,s_first_size_dydx,b_first_size_dydx,spread1,spread2if __name__ == '__main__':# 创建事件循环loop = asyncio.get_event_loop()# 运行异步函数loop.run_until_complete(calculate_spread())# 关闭事件循环loop.close()

place_order_btc.py

from init_apex_client import init_client
import asyncio
from send_order_apex import send_order_apex
from init_dydx_client import init_dydx_client
from send_order_dydx import send_order_dydx
from dydx3.constants import MARKET_BTC_USD
from dydx3.constants import ORDER_SIDE_BUY,ORDER_SIDE_SELL
from dydx3.constants import ORDER_TYPE_MARKET,ORDER_TYPE_LIMIT
from get_depth_data_btc import calculate_spread
import time#价格设置需要更精确,不然发不出去!# 初始化apex客户端
client_apex = init_client()
configs = client_apex.configs()
# 获取apex用户和账户信息
client_apex.get_user()
client_apex.get_account()# 初始化dydx客户端
client_dydx = init_dydx_client()
# 获取我们的dydx仓位 ID
account_response = client_dydx.private.get_account()
position_id = account_response.data['account']['positionId']async def arbitrage():arbitrage_count = 0while True:# 计算价差s_first_price_apex,b_first_price_apex,s_first_price_dydx,b_first_price_dydx,s_first_size_apex,b_first_size_apex,s_first_size_dydx,b_first_size_dydx,spread1,spread2 = await calculate_spread()# 根据价差判断是否发送交易if spread1 > 0.7:if arbitrage_count <2:currentTime = time.time()limitFeeRate = client_apex.account['takerFeeRate']task_apex_sell = asyncio.create_task(send_order_apex(client_apex, symbol="BTC-USDC", side="SELL",type="MARKET", size="0.004", expirationEpochSeconds=currentTime+100,price='18888', limitFeeRate=limitFeeRate))task_dydx_buy = asyncio.create_task(send_order_dydx(client_dydx, position_id, MARKET_BTC_USD, ORDER_SIDE_BUY, ORDER_TYPE_LIMIT,True, '0.004', b_first_price_dydx, '0.0015', currentTime+100))orderResult1 = await task_apex_sellorderResult2 = await task_dydx_buyarbitrage_count += 1if arbitrage_count >=2:print('above leverage ,stop')print(orderResult1,orderResult2)if spread2 > 0.7: if arbitrage_count >-2:currentTime = time.time()# 异步地发送一个apex市价买单和一个dydx市价卖单limitFeeRate = client_apex.account['takerFeeRate']task_apex_buy = asyncio.create_task(send_order_apex(client_apex, symbol="BTC-USDC", side="BUY",type="MARKET", size="0.004", expirationEpochSeconds=currentTime+100,price='58888', limitFeeRate=limitFeeRate))task_dydx_sell = asyncio.create_task(send_order_dydx(client_dydx, position_id, MARKET_BTC_USD, ORDER_SIDE_SELL, ORDER_TYPE_LIMIT,True, '0.004', s_first_price_dydx, '0.0015', currentTime+100))orderResult1 = await task_apex_buyorderResult2 = await task_dydx_sellarbitrage_count -= 1if arbitrage_count <=-2:print('above leverage ,stop')print(orderResult1,orderResult2)# 延时一秒,避免过于频繁await asyncio.sleep(1)# 运行异步函数
asyncio.run(arbitrage())

持续运行:

写一个脚本确保因为各种异常程序退出后能够重启:

import subprocess
import timedef run_program():# 这里替换为你需要执行的程序命令process = subprocess.Popen(["python", "place_order_btc.py"])  # 例如:python your_program.pyreturn processif __name__ == "__main__":while True:program = run_program()while program.poll() is None:# 程序正在运行time.sleep(5)  # 每5秒检查一次程序状态# 程序已终止,等待一段时间后重启print("程序已终止,重新启动中...")time.sleep(3)  # 等待3秒

这篇关于从零开始搭建链上dex自动化价差套利程序(11)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/474860

相关文章

在不同系统间迁移Python程序的方法与教程

《在不同系统间迁移Python程序的方法与教程》本文介绍了几种将Windows上编写的Python程序迁移到Linux服务器上的方法,包括使用虚拟环境和依赖冻结、容器化技术(如Docker)、使用An... 目录使用虚拟环境和依赖冻结1. 创建虚拟环境2. 冻结依赖使用容器化技术(如 docker)1. 创

本地搭建DeepSeek-R1、WebUI的完整过程及访问

《本地搭建DeepSeek-R1、WebUI的完整过程及访问》:本文主要介绍本地搭建DeepSeek-R1、WebUI的完整过程及访问的相关资料,DeepSeek-R1是一个开源的人工智能平台,主... 目录背景       搭建准备基础概念搭建过程访问对话测试总结背景       最近几年,人工智能技术

10个Python自动化办公的脚本分享

《10个Python自动化办公的脚本分享》在日常办公中,我们常常会被繁琐、重复的任务占据大量时间,本文为大家分享了10个实用的Python自动化办公案例及源码,希望对大家有所帮助... 目录1. 批量处理 Excel 文件2. 自动发送邮件3. 批量重命名文件4. 数据清洗5. 生成 PPT6. 自动化测试

10个Python Excel自动化脚本分享

《10个PythonExcel自动化脚本分享》在数据处理和分析的过程中,Excel文件是我们日常工作中常见的格式,本文将分享10个实用的Excel自动化脚本,希望可以帮助大家更轻松地掌握这些技能... 目录1. Excel单元格批量填充2. 设置行高与列宽3. 根据条件删除行4. 创建新的Excel工作表5

5分钟获取deepseek api并搭建简易问答应用

《5分钟获取deepseekapi并搭建简易问答应用》本文主要介绍了5分钟获取deepseekapi并搭建简易问答应用,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需... 目录1、获取api2、获取base_url和chat_model3、配置模型参数方法一:终端中临时将加

MySQL分表自动化创建的实现方案

《MySQL分表自动化创建的实现方案》在数据库应用场景中,随着数据量的不断增长,单表存储数据可能会面临性能瓶颈,例如查询、插入、更新等操作的效率会逐渐降低,分表是一种有效的优化策略,它将数据分散存储在... 目录一、项目目的二、实现过程(一)mysql 事件调度器结合存储过程方式1. 开启事件调度器2. 创

Python Invoke自动化任务库的使用

《PythonInvoke自动化任务库的使用》Invoke是一个强大的Python库,用于编写自动化脚本,本文就来介绍一下PythonInvoke自动化任务库的使用,具有一定的参考价值,感兴趣的可以... 目录什么是 Invoke?如何安装 Invoke?Invoke 基础1. 运行测试2. 构建文档3.

Windows自动化Python pyautogui RPA操作实现

《Windows自动化PythonpyautoguiRPA操作实现》本文详细介绍了使用Python的pyautogui库进行Windows自动化操作的实现方法,文中通过示例代码介绍的非常详细,对大... 目录依赖包睡眠:鼠标事件:杀死进程:获取所有窗口的名称:显示窗口:根据图片找元素:输入文字:打开应用:依

Mycat搭建分库分表方式

《Mycat搭建分库分表方式》文章介绍了如何使用分库分表架构来解决单表数据量过大带来的性能和存储容量限制的问题,通过在一对主从复制节点上配置数据源,并使用分片算法将数据分配到不同的数据库表中,可以有效... 目录分库分表解决的问题分库分表架构添加数据验证结果 总结分库分表解决的问题单表数据量过大带来的性能

Java汇编源码如何查看环境搭建

《Java汇编源码如何查看环境搭建》:本文主要介绍如何在IntelliJIDEA开发环境中搭建字节码和汇编环境,以便更好地进行代码调优和JVM学习,首先,介绍了如何配置IntelliJIDEA以方... 目录一、简介二、在IDEA开发环境中搭建汇编环境2.1 在IDEA中搭建字节码查看环境2.1.1 搭建步