记录学习使用AlgoPlus接收期货实时行情,原文参考https://zhuanlan.zhihu.com/p/86082225
使用AlgoPlus接收期货实时行情
关于CTP
CTP是Comprehensive Transaction Platform的简称。CTP有MdApi和TraderApi两个独立的开放接口。
MdApi负责行情相关操作(订阅、接收)。
TraderApi负责交易相关的操作(买、卖、撤、查)。
MdApi与TraderApi方法的执行过程都是异步的,每一个请求都对应一个或多个负责接收执行结果的回调函数。例如,通过ReqOrderInsert方法向交易所发出买开仓指令,对应的回调方法OnRtnOrder可以实时接收交易所服务器发回来的执行通知。
AlgoPlus创建行情接口
MdApi是行情接口,使用时只需要传递账户参数创建一个实例就可以了。示例:
from AlgoPlus.CTP.MdApi import MdApi
class TickEngine(MdApi):
# 深度行情通知
def OnRtnDepthMarketData(self, pDepthMarketData):
print(pDepthMarketData)
# print(f"{pDepthMarketData.InstrumentID}当前最新价:{pDepthMarketData.LastPrice}")
if __name__ == '__main__':
from account_info import my_future_account_info_dict
future_account = my_future_account_info_dict['SimNow']
tick_engine = TickEngine(future_account.server_dict['MDServer']
, future_account.broker_id
, future_account.investor_id
, future_account.password
, future_account.app_id
, future_account.auth_code
, future_account.instrument_id_list
, None
, future_account.md_page_dir)
tick_engine.Join()
1、从AlgoPlus.CTP.MdApi文件中导入MdApi类。MdApi已对工作流程的前六步进行了封装。
2、TickEngine是MdApi的子类。TickEngine类主要实现收到行情的数据处理算法,示例只将收到的行情打印出来。
3、创建行情接口实例前,需要导入账户信息。示例的账户信息存放在同一个目录下的account_info.py文件中。
4、交易时间运行以上代码就可以将接收到的实时期货行情打印出来。
5、回调函数OnRtnDepthMarketData接收到的pDepthMarketData行情是DepthMarketDataField结构体的实例,在AlgoPlus.CTP.ApiStruct中被定义。以调用属性的方式可以获取行情任意字段的数值,例如pDepthMarketData.LastPrice表示最新价。DepthMarketDataField包括以下字段:
class DepthMarketDataField(BaseField):
"""深度行情"""
_fields_ = [
('TradingDay', c_char * 9) # ///交易日
, ('InstrumentID', c_char * 31) # 合约代码
, ('ExchangeID', c_char * 9) # 交易所代码
, ('ExchangeInstID', c_char * 31) # 合约在交易所的代码
, ('LastPrice', c_double) # 最新价
, ('PreSettlementPrice', c_double) # 上次结算价
, ('PreClosePrice', c_double) # 昨收盘
, ('PreOpenInterest', c_double) # 昨持仓量
, ('OpenPrice', c_double) # 今开盘
, ('HighestPrice', c_double) # 最高价
, ('LowestPrice', c_double) # 最低价
, ('Volume', c_int) # 数量
, ('Turnover', c_double) # 成交金额
, ('OpenInterest', c_double) # 持仓量
, ('ClosePrice', c_double) # 今收盘
, ('SettlementPrice', c_double) # 本次结算价
, ('UpperLimitPrice', c_double) # 涨停板价
, ('LowerLimitPrice', c_double) # 跌停板价
, ('PreDelta', c_double) # 昨虚实度
, ('CurrDelta', c_double) # 今虚实度
, ('UpdateTime', c_char * 9) # 最后修改时间
, ('UpdateMillisec', c_int) # 最后修改毫秒
, ('BidPrice1', c_double) # 申买价一
, ('BidVolume1', c_int) # 申买量一
, ('AskPrice1', c_double) # 申卖价一
, ('AskVolume1', c_int) # 申卖量一
, ('BidPrice2', c_double) # 申买价二
, ('BidVolume2', c_int) # 申买量二
, ('AskPrice2', c_double) # 申卖价二
, ('AskVolume2', c_int) # 申卖量二
, ('BidPrice3', c_double) # 申买价三
, ('BidVolume3', c_int) # 申买量三
, ('AskPrice3', c_double) # 申卖价三
, ('AskVolume3', c_int) # 申卖量三
, ('BidPrice4', c_double) # 申买价四
, ('BidVolume4', c_int) # 申买量四
, ('AskPrice4', c_double) # 申卖价四
, ('AskVolume4', c_int) # 申卖量四
, ('BidPrice5', c_double) # 申买价五
, ('BidVolume5', c_int) # 申买量五
, ('AskPrice5', c_double) # 申卖价五
, ('AskVolume5', c_int) # 申卖量五
, ('AveragePrice', c_double) # 当日均价
, ('ActionDay', c_char * 9) # 业务日期
]
说明:
1、队列是实现行情进程与策略进程之间共享数据的最简单有效的方案,也是AlgoPlus默认使用的方案。
2、每个策略对应一个队列,将这些队列的列表赋值给参数md_queue_list。
3、在OnRtnDepthMarketData中,将收到的行情放入所有队列。
策略接收行情
import time
from datetime import datetime, timedelta
from multiprocessing import Process, Queue
from AlgoPlus.CTP.TraderApi import TraderApi
from AlgoPlus.CTP.ApiStruct import *
from tick_engine import TickEngine
class TraderEngine(TraderApi):
def __init__(self, td_server, broker_id, investor_id, password, app_id, auth_code, md_queue=None
, page_dir='', private_resume_type=2, public_resume_type=2):
self.order_ref = 0 # 报单引用
self.order_time = None # 报单时间
self.order_status = b"" # 订单状态
self.Join()
# 撤单
def req_order_action(self, exchange_id, instrument_id, order_ref, order_sysid=''):
input_order_action_field = InputOrderActionField(
BrokerID=self.broker_id,
InvestorID=self.investor_id,
UserID=self.investor_id,
ExchangeID=exchange_id,
ActionFlag="0",
InstrumentID=instrument_id,
FrontID=self.front_id,
SessionID=self.session_id,
OrderSysID=order_sysid,
OrderRef=str(order_ref),
)
l_retVal = self.ReqOrderAction(input_order_action_field)
# 报单
def req_order_insert(self, exchange_id, instrument_id, order_price, order_vol, order_ref, direction, offset_flag):
input_order_field = InputOrderField(
BrokerID=self.broker_id,
InvestorID=self.investor_id,
ExchangeID=exchange_id,
InstrumentID=instrument_id,
UserID=self.investor_id,
OrderPriceType="2",
Direction=direction,
CombOffsetFlag=offset_flag,
CombHedgeFlag="1",
LimitPrice=order_price,
VolumeTotalOriginal=order_vol,
TimeCondition="3",
VolumeCondition="1",
MinVolume=1,
ContingentCondition="1",
StopPrice=0,
ForceCloseReason="0",
IsAutoSuspend=0,
OrderRef=str(order_ref),
)
l_retVal = self.ReqOrderInsert(input_order_field)
# 买开仓
def buy_open(self, exchange_ID, instrument_id, order_price, order_vol, order_ref):
self.req_order_insert(exchange_ID, instrument_id, order_price, order_vol, order_ref, '0', '0')
# 卖开仓
def sell_open(self, exchange_ID, instrument_id, order_price, order_vol, order_ref):
self.req_order_insert(exchange_ID, instrument_id, order_price, order_vol, order_ref, '1', '0')
# 买平仓
def buy_close(self, exchange_ID, instrument_id, order_price, order_vol, order_ref):
if exchange_ID == "SHFE" or exchange_ID == "INE":
self.req_order_insert(exchange_ID, instrument_id, order_price, order_vol, order_ref, '0', '3')
else:
self.req_order_insert(exchange_ID, instrument_id, order_price, order_vol, order_ref, '0', '1')
# 卖平仓
def sell_close(self, exchange_ID, instrument_id, order_price, order_vol, order_ref):
if exchange_ID == "SHFE" or exchange_ID == "INE":
self.req_order_insert(exchange_ID, instrument_id, order_price, order_vol, order_ref, '1', '3')
else:
self.req_order_insert(exchange_ID, instrument_id, order_price, order_vol, order_ref, '1', '1')
# 报单通知
def OnRtnOrder(self, pOrder):
self.order_status = pOrder.OrderStatus
if pOrder.OrderStatus == b"a":
status_msg = "未知状态!"
elif pOrder.OrderStatus == b"0":
if pOrder.Direction == b"0":
if pOrder.CombOffsetFlag == b"0":
status_msg = "买开仓已全部成交!"
else:
status_msg = "买平仓已全部成交!"
else:
if pOrder.CombOffsetFlag == b"0":
status_msg = "卖开仓已全部成交!"
else:
status_msg = "卖平仓已全部成交!"
elif pOrder.OrderStatus == b"1":
status_msg = "部分成交!"
elif pOrder.OrderStatus == b"3":
status_msg = "未成交!"
elif pOrder.OrderStatus == b"5":
status_msg = "已撤!"
else:
status_msg = "其他!"
self._write_log(f"{status_msg}=>{pOrder}")
def Join(self):
while True:
if self.status == 0:
last_md = None
# 如果队列非空,从队列中取数据
while not self.md_queue.empty():
last_md = self.md_queue.get(block=False)
if last_md:
# ############################################################################# #
if self.order_ref == 0:
# 涨停买开仓
self.order_ref += 1
self.buy_open(test_exchange_id, test_instrument_id, last_md.BidPrice1, test_vol, self.order_ref)
self.order_time = datetime.now()
self._write_log(f"=>买开仓请求!")
if self.order_ref == 1 and self.order_status == b"3" and datetime.now() - self.order_time > timedelta(seconds=3):
self.order_status = b""
self.req_order_action(test_exchange_id, test_instrument_id, self.order_ref)
self._write_log(f"=>发出撤单请求!")
if self.order_ref == 1 and self.order_status == b"0":
self.order_ref += 1
self.order_status = b""
self.sell_close(test_exchange_id, test_instrument_id, last_md.BidPrice1, test_vol, self.order_ref)
self.order_time = datetime.now()
self._write_log(f"=>买开仓已全部成交,发出卖平仓请求!")
# ############################################################################# #
if self.order_ref == 1:
if self.order_status == b"5":
print("老爷,买开仓单超过3秒未成交,已撤销,这里的测试工作已经按照您的吩咐全部完成!")
break
elif datetime.now() - self.order_time > timedelta(seconds=3):
print("买开仓执行等待中!")
elif self.order_ref == 2:
if self.order_status == b"0":
print("老爷,卖平仓单已成交,这里的测试工作已经按照您的吩咐全部完成!")
break
elif datetime.now() - self.order_time > timedelta(seconds=3):
print("卖平仓执行等待中!")
else:
time.sleep(1)
说明:
1、直接在TraderApi的子类中编写策略是最简单的方案。但是,该方案不适合单账户策略比较多的情况,因为CTP支持同时在线的终端个数有限。如果策略比较多,则创建有限个TraderApi,在独立的Strategy类与MdApi和TraderApi之间实现共享数据。
2、在Join方法中实现了策略逻辑:登录成功之后,先以排队价发开仓委托,如果挂单超过3秒未成交,则撤单并退出策略。如果开仓全部成交,则以对手价发平仓委托,等待全部成交后退出策略。
3、Join方法中的策略每次执行时,从队列中取出所有数据,以最后一笔行情的盘口价格作为委托价。
4、OnRtnOrder收到订单状态通知时更新本地订单状态、持仓手数,在策略中根据状态的变化进行后续操作。
## 多进程
# 请在这里填写需要测试的合约数据
# 警告:该例子只支持上期所品种平今仓测试
test_exchange_id = 'SHFE' # 交易所
test_instrument_id = 'ag1912' # 合约代码
test_vol = 1 # 报单手数
share_queue = Queue(maxsize=100) # 队列
if __name__ == "__main__":
import sys
sys.path.append("..")
from account_info import my_future_account_info_dict
future_account = my_future_account_info_dict['SimNow']
# 行情进程
md_process = Process(target=TickEngine, args=(future_account.server_dict['MDServer']
, future_account.broker_id
, future_account.investor_id
, future_account.password
, future_account.app_id
, future_account.auth_code
, future_account.instrument_id_list
, [share_queue]
, future_account.md_page_dir)
)
# 交易进程
trader_process = Process(target=TraderEngine, args=(future_account.server_dict['TDServer']
, future_account.broker_id
, future_account.investor_id
, future_account.password
, future_account.app_id
, future_account.auth_code
, share_queue
, future_account.td_page_dir)
)
md_process.start()
trader_process.start()
md_process.join()
trader_process.join()
说明:
1、前几节的例子中需要手动设置涨跌停价作为报单价,这里我们以实时行情的盘口价作为报单价,所以不再需要设置涨跌停价参数。
2、share_queue是一个队列,在多进程中,队列数据可以实现共享。
3、md_process和trader_process分别是行情进程和交易进程。这两个进程通过队列share_queue共享数据。
4、所有进程的join方法须在start方法之后最后调用。
5、这段代码放置在策略代码最后,执行即可看到执行结果。
6、参考这个例子可以很方便的扩展一对多、多对多的进程间数据共享模式。
AlgoPlus的设计在登录时通过查询获取初始持仓、可用资金,成交发生时自动增减持仓数量,当平仓报单时自动增加冻结数量。当报撤单、出入金时自动增减可用资金。
## 智能交易指令
买卖智能开平指令
只关注买卖,不关注平仓还是开仓,优先平仓,无持仓的情况下再开仓。
除了buyOpen、sellClose、sellOpen、buyClose、closeLong、closeShort这些指定了开平方向的指令,其他都是智能开平指令。