// FutureDataReceive.cpp : 定义控制台应用程序的入口点。 // #include "stdafx.h" #include "ThostFtdcMdSpiImpl.hpp" #include "ThostFtdcTraderSpiImpl.hpp" #include "future_helper.h" #include "TinyConfig.h" #include "FileMap.h" #include <boost/lockfree/queue.hpp> #include <iostream> #include <conio.h> CFileMap g_fileMap; TinyConfig g_config; CThostFtdcTraderSpiImpl g_trade; vector<shared_ptr<CThostFtdcMdSpiImpl>> g_vecQuote; int g_nPreTradingDay; int g_nTradingDay; bool g_bThread = true ; boost::lockfree::queue<CThostFtdcDepthMarketDataField> g_buffer(1000); shared_ptr< thread > g_thdDealData = nullptr ; vector<CThostFtdcDepthMarketDataField> g_vecBuffer; void OnLog( const char * p) { printf (p); ofstream ofs_log( "./log.data.receive.txt" , ios_base::app); ofs_log << future_helper::get_local_datetime().c_str() << " " << p; ofs_log.close(); } #pragma region trade module ///trade call back bool g_bTradeLoginIn = false ; void OnTradeComplete() { int nTradingDay = atoi (g_trade.GetTradingDay()); if (g_nTradingDay != 0 && g_nTradingDay != nTradingDay) g_nPreTradingDay = g_nTradingDay; g_nTradingDay = nTradingDay; g_bTradeLoginIn = true ; printf ( "交易日:%d-%d\n" , g_nPreTradingDay, g_nTradingDay); } void OnTradeRspMessage( const char * from, const char * msg, future_msg_type msg_type) { OnLog(future_helper::format_string( "%s%s:%s\n" , msg_type == future_msg_type::CTP_SUCCESS ? "" : "*" , from, msg).c_str()); } void TradeLogin() { int retry_time = 3; while (1) { g_bTradeLoginIn = false ; g_trade.SetCallBackFunc(OnTradeComplete, nullptr , OnTradeRspMessage); g_trade.SetAuthenticate(g_config.GetValue( "Trade" , "product" ).c_str(), g_config.GetValue( "Trade" , "id" ).c_str(), g_config.GetValue( "Trade" , "code" ).c_str()); g_trade.InitParams(g_config.GetValue( "Trade" , "ip" ).c_str(), g_config.GetValue( "Trade" , "broker" ).c_str(), g_config.GetValue( "Trade" , "user" ).c_str(), g_config.GetValue( "Trade" , "password" ).c_str()); int tick = GetTickCount(); while (1) { if (g_bTradeLoginIn) break ; this_thread::sleep_for(chrono::milliseconds(100)); if (GetTickCount() - tick > 60 * 1000) break ; } g_trade.Release(); g_fileMap.SetTradingDay(g_nPreTradingDay, g_nTradingDay); if (g_bTradeLoginIn) break ; retry_time--; OnLog( "交易 登录超时[1分钟],重新登录\n" ); if (retry_time == 0) { OnLog( "交易 登录次数达到3次,不再尝试重新登录!\n" ); break ; } } } // #pragma endregion trade module ///market call back int g_nMarketLoginIn = 0; void OnMarketComplete() { g_nMarketLoginIn++; } void OnMarketRspMessage( const char * from, const char * msg, future_msg_type msg_type) { OnLog(future_helper::format_string( "%s%s:%s\n" , msg_type == future_msg_type::CTP_SUCCESS ? "" : "*" , from, msg).c_str()); } void OnDepthMarketData(CThostFtdcDepthMarketDataField* p) { if (!p) return ; g_buffer.push(*p); } void MarketLogin() { int retry_time = 3; std::vector<CThostFtdcInstrumentField> vecTradingCode; g_trade.GetTradingCode(vecTradingCode, THOST_FTDC_PC_Futures); g_trade.GetTradingCode(vecTradingCode, THOST_FTDC_PC_Options); printf ( "订阅合约:%d\n" , vecTradingCode.size()); g_fileMap.SetTradingCode(vecTradingCode); while (1) { g_nMarketLoginIn = 0; for ( int i = 1; i <= 3; i++) { string this_ip = g_config.GetValue( "Market" , future_helper::format_string( "ip%d" , i).c_str()); if (this_ip == "" ) break ; auto pMdSpi = make_shared<CThostFtdcMdSpiImpl>(); g_vecQuote.push_back(pMdSpi); pMdSpi->SetTradingCode(vecTradingCode); pMdSpi->SetCallBackFunc(OnDepthMarketData, OnMarketRspMessage, nullptr , OnMarketComplete); pMdSpi->InitParams(this_ip.c_str(), "8888" , "88888888" , "88888888" ); } int tick = GetTickCount(); while (1) { if (g_nMarketLoginIn == g_vecQuote.size()) break ; this_thread::sleep_for(chrono::milliseconds(100)); if (GetTickCount() - tick > 60 * 1000) break ; } if (g_nMarketLoginIn > 0 && g_nMarketLoginIn != g_vecQuote.size()) { int nUnconnectIndex = 1; OnLog( "***********************\n" ); OnLog( "*有未能正常登录的行情账号:\n" ); for ( auto iter = g_vecQuote.begin(); iter != g_vecQuote.end(); iter++) { if (!(*iter)->IsConnected()) { OnLog(future_helper::format_string( "%d、%s\n" , nUnconnectIndex, (*iter)->GetFrontIP()).c_str()); nUnconnectIndex++; } } OnLog( "***********************\n" ); OnLog( "已有行情连接成功,未连接的不尝试重连\n" ); } if (g_nMarketLoginIn > 0) break ; OnLog( "行情 登录超时[1分钟],重新登录\n" ); retry_time--; if (retry_time == 0) { OnLog( "行情 登录次数达到3次,不再尝试重新登录!\n" ); break ; } } } void ThreadConsumeTick() { OnLog( "处理线程已启动\n" ); map<string, CThostFtdcDepthMarketDataField> m_mapShot; while (g_bThread) { if (g_buffer.empty()) { this_thread::sleep_for(chrono::milliseconds(5)); continue ; } CThostFtdcDepthMarketDataField data; g_buffer.pop(data); if (data.UpdateTime[2] == ':' ) { data.UpdateTime[2] = data.UpdateTime[3]; data.UpdateTime[3] = data.UpdateTime[4]; data.UpdateTime[4] = data.UpdateTime[6]; data.UpdateTime[5] = data.UpdateTime[7]; data.UpdateTime[6] = 0; } if ( atoi (data.UpdateTime) > 180000) { _snprintf_s(data.TradingDay, 9, "%d" , g_nPreTradingDay); } else { _snprintf_s(data.TradingDay, 9, "%d" , g_nTradingDay); } bool bNewTick = false ; auto find_shot = m_mapShot.find(data.InstrumentID); if (find_shot == m_mapShot.end()) { bNewTick = true ; m_mapShot[data.InstrumentID] = data; } else { long long llThis = future_helper::to_longlong( atoi (data.TradingDay), atoi (data.UpdateTime)) * 1000 + data.UpdateMillisec; long long llLast = future_helper::to_longlong( atoi (find_shot->second.TradingDay), atoi (find_shot->second.UpdateTime)) * 1000 + find_shot->second.UpdateMillisec; if (llThis > llLast || find_shot->second.Volume < data.Volume) { //郑商所没有毫秒 bNewTick = true ; find_shot->second = data; } } if (bNewTick && g_fileMap.IsOK()) { g_fileMap.AddDepthData(&data); } else if (bNewTick) { g_vecBuffer.push_back(data); } } OnLog( "处理线程已退出\n" ); } //bReset == true:清空共享内存中的tick数据 void OutPutTickData( bool bReset) { unsigned int * exist_size; CThostFtdcDepthMarketDataField* p = g_fileMap.GetDepthData(&exist_size); if (!p || *exist_size <= 0) { OnLog( "共享内存中无任何tick数据\n从内存中导出\n" ); if (g_vecBuffer.size() == 0) { OnLog( "内存中无任何tick数据\n" ); return ; } future_helper::safe_create_floder((g_config.GetValue( "Path" , "tick_normal" ) + "\\" + future_helper::to_string(g_nTradingDay / 10000)).c_str()); ofstream ofs_tick(g_config.GetValue( "Path" , "tick_normal" ) + "\\" + future_helper::to_string(g_nTradingDay / 10000) + "\\data_" + future_helper::to_string(g_nTradingDay) + ".txt" , ios_base::out); for ( auto & field : g_vecBuffer) { char szBuf[1024]; _snprintf_s(szBuf, 1024, //交易日,最后修改时间,最后修改毫秒,合约代码, //最新价,上次结算价,昨收盘,昨持仓量, //今开盘,最高价,最低价,数量,成交金额,持仓量, //涨停板价,跌停板价, //申买价一,申买量一,申卖价一,申卖量一, //当日均价 "%s,%s,%d,%s,%.4f,%.4f,%.4f,%.4f,%.4f,%.4f,%.4f,%d,%.4f,%.4f,%.4f,%.4f,%.4f,%d,%.4f,%d,%.4f\n" , field.TradingDay, field.UpdateTime, field.UpdateMillisec, field.InstrumentID, field.LastPrice, field.PreSettlementPrice, field.PreClosePrice, field.PreOpenInterest, field.OpenPrice, field.HighestPrice, field.LowestPrice, field.Volume, field.Turnover, field.OpenInterest, field.UpperLimitPrice, field.LowerLimitPrice, field.BidPrice1, field.BidVolume1, field.AskPrice1, field.AskVolume1, field.AveragePrice); ofs_tick << szBuf; } return ; } OnLog(future_helper::format_string( "输出共享内存中的tick数据 %d 个\n" , *exist_size).c_str()); future_helper::safe_create_floder((g_config.GetValue( "Path" , "tick_normal" ) + "\\" + future_helper::to_string(g_nTradingDay / 10000)).c_str()); ofstream ofs_tick(g_config.GetValue( "Path" , "tick_normal" ) + "\\" + future_helper::to_string(g_nTradingDay / 10000) + "\\data_" + future_helper::to_string(g_nTradingDay) + ".txt" , ios_base::out); for ( int i = 0; i < *exist_size; i++) { auto & field = *(CThostFtdcDepthMarketDataField*)(p + i); char szBuf[1024]; _snprintf_s(szBuf, 1024, //交易日,最后修改时间,最后修改毫秒,合约代码, //最新价,上次结算价,昨收盘,昨持仓量, //今开盘,最高价,最低价,数量,成交金额,持仓量, //涨停板价,跌停板价, //申买价一,申买量一,申卖价一,申卖量一, //当日均价 "%s,%s,%d,%s,%.3f,%.4f,%.4f,%.4f,%.4f,%.4f,%.4f,%d,%.4f,%.4f,%.4f,%.4f,%.4f,%d,%.4f,%d,%.4f" , field.TradingDay, field.UpdateTime, field.UpdateMillisec, field.InstrumentID, future_helper::check_double(field.LastPrice), future_helper::check_double(field.PreSettlementPrice), future_helper::check_double(field.PreClosePrice), future_helper::check_double(field.PreOpenInterest), future_helper::check_double(field.OpenPrice), future_helper::check_double(field.HighestPrice), future_helper::check_double(field.LowestPrice), field.Volume, future_helper::check_double(field.Turnover), future_helper::check_double(field.OpenInterest), future_helper::check_double(field.UpperLimitPrice), future_helper::check_double(field.LowerLimitPrice), future_helper::check_double(field.BidPrice1), field.BidVolume1, future_helper::check_double(field.AskPrice1), field.AskVolume1, future_helper::check_double(field.AveragePrice)); ofs_tick << szBuf << endl; } ofs_tick.flush(); ofs_tick.close(); if (bReset) *exist_size = 0; } void Open() { printf ( "%s\n" , future_helper::get_local_datetime().c_str()); g_nMarketLoginIn = 0; OnLog( "区间交易开始!\n" ); if (!g_thdDealData) { g_bThread = true ; g_thdDealData = make_shared< thread >(ThreadConsumeTick); } if (!g_fileMap.CreateFileMap(FILE_MAP_KEY)) { OnLog( "共享内存创建失败!直接存文件...\n" ); } else { g_fileMap.InitDefaultRange(); } TradeLogin(); unsigned int * exist_count; auto pTick = g_fileMap.GetDepthData(&exist_count); int local_time = atoi (future_helper::get_local_time( false ).c_str()); if ((!pTick || *exist_count == 0) && (local_time < 151500 || local_time > 180000)) { ifstream ifs_tick(g_config.GetValue( "Path" , "tick_normal" ) + "\\" + future_helper::to_string(g_nTradingDay / 10000) + "\\data_" + future_helper::to_string(g_nTradingDay) + ".txt" , ios_base::in); if (ifs_tick.is_open()) { OnLog( "本地加载tick到共享内存:" ); int tick_count = 0; CThostFtdcDepthMarketDataField data; char szLine[1024]; while (ifs_tick.getline(szLine, 1024)) { future_helper::LineToStruct(szLine, data); g_fileMap.AddDepthData(&data); tick_count++; } OnLog(future_helper::format_string( "%d条\n" , tick_count).c_str()); } } MarketLogin(); } void Close() { printf ( "%s\n" , future_helper::get_local_datetime().c_str()); int close_count = 0; for ( auto & md : g_vecQuote) { if (md->IsConnected()) { close_count++; md->ReleaseAPI(); } } g_vecQuote.clear(); if (close_count > 0) { OnLog(future_helper::format_string( "区间交易结束!关闭行情接收:%d\n" , close_count).c_str()); if (g_thdDealData && g_thdDealData->joinable()) { g_bThread = false ; g_thdDealData->join(); g_thdDealData = nullptr ; } } g_nMarketLoginIn = 0; OnLog(future_helper::format_string( "共享内存使用%d/%d(mb)=%.2f%%%%\n" , g_fileMap.GetTotalUsedSpace() / 1024 / 1024, MAX_PAGE_SIZE / 1024 / 1024, ( double )g_fileMap.GetTotalUsedSpace() * 100 / MAX_PAGE_SIZE).c_str()); } int main() { if (!g_config.Open((future_helper::GetWorkingDir() + "\\system.ini" ).c_str())) { OnLog( "system.ini打开失败\n" ); system ( "pause" ); } OnLog( "执行开启共享内存测试..." ); if (!g_fileMap.CreateFileMap(FILE_MAP_KEY)) { OnLog( "失败!" ); system ( "pause" ); } else { OnLog( "成功!\n" ); g_fileMap.Release(); } OnLog( "==========start==========\n" ); printf ( "初次启动需要输入前交易日,用来更新夜盘的日期..." ); scanf_s( "%d" , &g_nPreTradingDay); int night_tick_count = -1; int last_local_time = atoi (future_helper::get_local_time( false ).c_str()); while (1) { if (_kbhit() != 0) { printf ( "**********不要长时间阻塞此处,这样将导致软件无法正常工作!***********\n" ); printf ( "**********记得输入完之后按回车键哦! ***********\n" ); string str; std::getline(std::cin, str); if (str == "quit" ) { Close(); break ; } else if (str == "help" ) { printf ( "quit:退出\n" ); printf ( "help:帮助\n" ); printf ( "open:手动开启接收\n" ); printf ( "close:手动关闭接收\n" ); printf ( "tradingday:交易日\n" ); printf ( "tradingcode:可交易合约\n" ); printf ( "tickdata:输出接收到的tick数据\n" ); printf ( "sharememory:共享内存使用率\n" ); } else if (str == "open" ) { Open(); } else if (str == "close" ) { Close(); } else if (str == "tradingday" ) { printf ( "交易日:%d-%d\n" , g_nPreTradingDay, g_nTradingDay); } else if (str == "tradingcode" ) { printf ( "期货可交易合约:\n" ); map<string, vector<CThostFtdcInstrumentField>> mapClasses; vector<CThostFtdcInstrumentField> vec; g_trade.GetTradingCode(vec, THOST_FTDC_PC_Futures); for ( auto & item : vec) { mapClasses[item.ProductID].push_back(item); } for ( auto iter = mapClasses.begin(); iter != mapClasses.end(); iter++) { printf ( "%s:" , iter->first.c_str()); for ( auto & item : iter->second) { printf ( "%s " , item.InstrumentID); } printf ( "\n" ); } } else if (str == "tickdata" ) { OutPutTickData( false ); } else if (str == "sharememory" ) { unsigned int * exist_size; CThostFtdcDepthMarketDataField* p = g_fileMap.GetDepthData(&exist_size); OnLog(future_helper::format_string( "tick数据个数:%d\n共享内存使用%d/%d(mb)=%.2f%%%%\n" , p ? *exist_size : 0, g_fileMap.GetTotalUsedSpace() / 1024 / 1024, MAX_PAGE_SIZE / 1024 / 1024, ( double )g_fileMap.GetTotalUsedSpace() * 100 / MAX_PAGE_SIZE).c_str()); } } int local_time = atoi (future_helper::get_local_time( false ).c_str()); if ((local_time >= 23500 && last_local_time < 23500) || (local_time >= 155000 && last_local_time < 155000)) { unsigned int * exist_count; auto pTick = g_fileMap.GetDepthData(&exist_count); Close(); if ((local_time >= 155000 && last_local_time < 155000)){ OnLog( "交易日结束了?..." ); if (night_tick_count == -1 || *exist_count > night_tick_count) { OnLog( "是的!\n" ); OutPutTickData( true ); if (night_tick_count != -1) { OnLog( "打开TickToKline.exe\n" ); ::ShellExecute(NULL, "open" , (future_helper::GetWorkingDir() + "\\TickToKline.exe" ).c_str(), "1" , NULL, SW_SHOW); } night_tick_count = -1; } else { OnLog(future_helper::format_string( "没有!(共享内存的数据没有比夜盘多%d-%d)!\n" , night_tick_count, *exist_count).c_str()); } } else { OnLog( "夜盘结束了?..." ); if (night_tick_count == -1 && *exist_count > 0) { OnLog( "是的!输出一次夜盘数据\n" ); night_tick_count = *exist_count; OutPutTickData( false ); } else { OnLog( "现在不是夜盘结束时间!(没有夜盘数据 或者 已经输出过一次夜盘数据了)\n" ); } } } if ((local_time >= 82000 && last_local_time < 82000) || (local_time >= 202000 && last_local_time < 202000)) { Close(); Open(); } last_local_time = local_time; this_thread::sleep_for(chrono::seconds(1)); } return 0; } |