Files
quant/gogogo/strategy_executor.py
2025-11-15 17:25:43 +08:00

501 lines
19 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# strategy_executor.py
import pandas as pd
import numpy as np
import vectorbt as vbt
from data_fetcher import get_processed_data
def calculate_ratio_signals(ratio_series, window=20, num_std=2):
"""
基于价格比率计算配对交易信号
修正信号逻辑:比率低时做多价差,比率高时做空价差
"""
# 计算布林带
ratio_ma = ratio_series.rolling(window=window).mean()
ratio_std = ratio_series.rolling(window=window).std()
upper_band = ratio_ma + num_std * ratio_std
lower_band = ratio_ma - num_std * ratio_std
# 生成交易信号
signals = pd.Series(0, index=ratio_series.index, name='signal')
current_signal = 0
for i in range(len(ratio_series)):
if i < window: # 跳过布林带计算期
continue
ratio_val = ratio_series.iloc[i]
ma_val = ratio_ma.iloc[i]
upper_val = upper_band.iloc[i]
lower_val = lower_band.iloc[i]
# 当前无仓位时的开仓条件
if current_signal == 0:
# 比率突破下轨SMIC相对低估做多价差买入SMIC卖空HHIC
if ratio_val < lower_val:
signals.iloc[i] = 1 # 做多价差
current_signal = 1
print(f"开仓做多价差: {ratio_series.index[i]} - 比率{ratio_val:.4f} < 下轨{lower_val:.4f}")
# 比率突破上轨SMIC相对高估做空价差卖空SMIC买入HHIC
elif ratio_val > upper_val:
signals.iloc[i] = -1 # 做空价差
current_signal = -1
print(f"开仓做空价差: {ratio_series.index[i]} - 比率{ratio_val:.4f} > 上轨{upper_val:.4f}")
# 当前有仓位时的平仓条件
elif current_signal != 0:
# 当比率回归均值时平仓
if (current_signal == 1 and ratio_val >= ma_val) or \
(current_signal == -1 and ratio_val <= ma_val):
signals.iloc[i] = 0
current_signal = 0
print(f"平仓: {ratio_series.index[i]} - 比率{ratio_val:.4f} 回归均线{ma_val:.4f}")
return signals, ratio_ma, upper_band, lower_band
def generate_ratio_size(signals, price_ratio, position_size=0.5):
"""
生成比率交易的size数据
修复数据类型问题
"""
# 创建与price_ratio相同形状的size Series初始为0
size_series = pd.Series(0.0, index=signals.index, name='size') # 改为float类型
current_position = 0
for i in range(len(signals)):
if i < 20: # 跳过布林带计算期
continue
signal = signals.iloc[i]
if signal == 1 and current_position != 1: # 做多价差 -> 买入比率
size_series.iloc[i] = float(position_size) # 明确转换为float
current_position = 1
elif signal == -1 and current_position != -1: # 做空价差 -> 卖空比率
size_series.iloc[i] = -float(position_size) # 明确转换为float
current_position = -1
elif signal == 0 and current_position != 0: # 平仓
size_series.iloc[i] = 0.0 # 明确转换为float
current_position = 0
return size_series
def generate_stock_sizes(signals, close_smic, close_hhic, initial_cash=100000, position_ratio=0.5):
"""
生成真实股票交易的size数据
添加详细的调试信息来跟踪信号执行
"""
# 创建空的size Series
smic_size = pd.Series(0.0, index=signals.index, name='SMIC')
hhic_size = pd.Series(0.0, index=signals.index, name='HHIC')
current_position = 0 # 0: 无仓位, 1: 做多价差, -1: 做空价差
smic_position = 0.0 # 当前中芯持仓数量
hhic_position = 0.0 # 当前华虹持仓数量
print(f"\n=== 交易执行调试信息 ===")
print(f"信号总数: {len(signals)}")
print(f"非零信号数量: {(signals != 0).sum()}")
print(f"做多信号: {(signals == 1).sum()}, 做空信号: {(signals == -1).sum()}, 平仓信号: {(signals == 0).sum()}")
for i in range(len(signals)):
if i < 20: # 跳过布林带计算期
continue
signal = signals.iloc[i]
date = signals.index[i]
smic_price = close_smic.iloc[i]
hhic_price = close_hhic.iloc[i]
# 只有信号变化时才执行交易
if signal != 0 or (signal == 0 and current_position != 0):
print(f"\n日期: {date}, 信号: {signal}, 当前仓位: {current_position}")
# 平仓条件信号为0且当前有仓位
if signal == 0 and current_position != 0:
print(f" 执行平仓: SMIC持仓{smic_position:.2f}, HHIC持仓{hhic_position:.2f}")
# 平掉所有仓位
if smic_position != 0:
smic_size.iloc[i] = -smic_position
smic_position = 0.0
print(f" 平仓SMIC: {-smic_size.iloc[i]:.2f}")
if hhic_position != 0:
hhic_size.iloc[i] = -hhic_position
hhic_position = 0.0
print(f" 平仓HHIC: {-hhic_size.iloc[i]:.2f}")
current_position = 0
continue
# 开仓条件:只有当前无仓位时才开新仓
if current_position == 0 and signal != 0:
if signal == 1: # 做多价差:买入中芯,卖空华虹
# 计算每只股票的仓位价值(等市值对冲)
position_value = initial_cash * position_ratio
# 做多中芯国际
smic_shares = position_value / smic_price
smic_size.iloc[i] = smic_shares
smic_position = smic_shares
# 做空华虹半导体
hhic_shares = -position_value / hhic_price
hhic_size.iloc[i] = hhic_shares
hhic_position = hhic_shares
current_position = 1
print(f" 执行做多价差开仓:")
print(f" 买入SMIC: {smic_shares:.2f}股 @{smic_price:.2f}")
print(f" 卖空HHIC: {abs(hhic_shares):.2f}股 @{hhic_price:.2f}")
elif signal == -1: # 做空价差:卖空中芯,买入华虹
# 计算每只股票的仓位价值(等市值对冲)
position_value = initial_cash * position_ratio
# 做空中芯国际
smic_shares = -position_value / smic_price
smic_size.iloc[i] = smic_shares
smic_position = smic_shares
# 做多华虹半导体
hhic_shares = position_value / hhic_price
hhic_size.iloc[i] = hhic_shares
hhic_position = hhic_shares
current_position = -1
print(f" 执行做空价差开仓:")
print(f" 卖空SMIC: {abs(smic_shares):.2f}股 @{smic_price:.2f}")
print(f" 买入HHIC: {hhic_shares:.2f}股 @{hhic_price:.2f}")
# 最后检查是否有未平仓的仓位,如果有则在最后一天平仓
if current_position != 0:
last_index = len(signals) - 1
last_date = signals.index[last_index]
print(f"\n最终平仓: {last_date}")
if smic_position != 0:
smic_size.iloc[last_index] = -smic_position
print(f" 平仓SMIC: {-smic_size.iloc[last_index]:.2f}")
if hhic_position != 0:
hhic_size.iloc[last_index] = -hhic_position
print(f" 平仓HHIC: {-hhic_size.iloc[last_index]:.2f}")
# 统计实际执行的交易
smic_trades = (smic_size != 0).sum()
hhic_trades = (hhic_size != 0).sum()
print(f"\n实际执行交易统计:")
print(f" SMIC交易次数: {smic_trades}")
print(f" HHIC交易次数: {hhic_trades}")
# 创建size DataFrame
size_df = pd.DataFrame({
'SMIC': smic_size,
'HHIC': hhic_size
})
return size_df
def debug_signal_logic(strategy_data):
"""调试信号生成逻辑"""
print("\n" + "="*60)
print("=== 信号生成逻辑调试 ===")
print("="*60)
signals = strategy_data['signals']
price_ratio = strategy_data['price_ratio']
ratio_ma = strategy_data['ratio_ma']
upper_band = strategy_data['upper_band']
lower_band = strategy_data['lower_band']
# 检查关键日期的信号逻辑
key_dates = [
'2024-12-20', '2024-12-23', '2025-02-18', '2025-02-19',
'2025-04-22', '2025-04-23', '2025-05-16', '2025-05-19'
]
for date_str in key_dates:
date = pd.Timestamp(date_str)
if date in signals.index:
signal = signals.loc[date]
ratio_val = price_ratio.loc[date]
ma_val = ratio_ma.loc[date]
upper_val = upper_band.loc[date]
lower_val = lower_band.loc[date]
print(f"\n{date}:")
print(f" 比率: {ratio_val:.4f}, 均线: {ma_val:.4f}")
print(f" 上轨: {upper_val:.4f}, 下轨: {lower_val:.4f}")
print(f" 信号: {signal} ({'平仓' if signal == 0 else '做多' if signal == 1 else '做空'})")
# 检查信号逻辑
if signal == 1:
print(f" 逻辑: 比率{ratio_val:.4f} < 下轨{lower_val:.4f} = {ratio_val < lower_val}")
elif signal == -1:
print(f" 逻辑: 比率{ratio_val:.4f} > 上轨{upper_val:.4f} = {ratio_val > upper_val}")
elif signal == 0:
print(f" 逻辑: 平仓信号")
def debug_signal_consistency(strategy_data, stock_portfolio):
"""调试信号和交易的一致性"""
print("\n" + "="*60)
print("=== 信号与交易一致性调试 ===")
print("="*60)
signals = strategy_data['signals']
price_ratio = strategy_data['price_ratio']
ratio_ma = strategy_data['ratio_ma']
upper_band = strategy_data['upper_band']
lower_band = strategy_data['lower_band']
# 获取订单记录
orders_df = stock_portfolio.orders.records_readable
print(f"\n信号数据:")
print(f"信号时间范围: {signals.index[0]}{signals.index[-1]}")
print(f"信号总数: {len(signals)}")
# 找出所有信号变化点
signal_changes = signals[signals != signals.shift(1)]
print(f"\n信号变化点数量: {len(signal_changes)}")
# 显示前10个信号变化点
print(f"\n前10个信号变化点:")
for i, (date, signal) in enumerate(signal_changes.head(10).items()):
ratio_val = price_ratio.loc[date]
ma_val = ratio_ma.loc[date]
upper_val = upper_band.loc[date]
lower_val = lower_band.loc[date]
signal_desc = "平仓" if signal == 0 else ("做多" if signal == 1 else "做空")
print(f" {date}: {signal_desc} (比率: {ratio_val:.4f}, 均线: {ma_val:.4f})")
# 检查订单时间与信号时间的对应关系
if 'Timestamp' in orders_df.columns:
order_dates = pd.to_datetime(orders_df['Timestamp']).unique()
print(f"\n订单执行日期: {len(order_dates)}")
print("订单日期:", sorted(order_dates))
# 检查哪些信号日期有对应的订单
signal_dates_with_orders = []
signal_dates_without_orders = []
for date in signal_changes.index:
if date in order_dates:
signal_dates_with_orders.append(date)
else:
signal_dates_without_orders.append(date)
print(f"\n有对应订单的信号日期: {len(signal_dates_with_orders)}")
print(f"无对应订单的信号日期: {len(signal_dates_without_orders)}")
if signal_dates_without_orders:
print("缺失订单的信号日期:", signal_dates_without_orders[:5])
def generate_strategy():
"""生成配对交易策略"""
# 获取数据
smic_data, hhic_data = get_processed_data()
# ========== 创建价格比率作为独立资产 ==========
print("\n=== 创建价格比率作为独立资产 ===")
close_smic = smic_data['close']
close_hhic = hhic_data['close']
# 计算价格比率 - 作为独立的"股票"
price_ratio = close_smic / close_hhic
price_ratio.name = 'SMIC_HHIC_RATIO'
print(f"价格比率数据形状: {price_ratio.shape}")
print(f"价格比率统计:")
print(f" 均值: {price_ratio.mean():.4f}")
print(f" 标准差: {price_ratio.std():.4f}")
print(f" 最小值: {price_ratio.min():.4f}")
print(f" 最大值: {price_ratio.max():.4f}")
# 设置交易参数
initial_cash = 100000
commission = 0.001 # 0.1% 交易佣金
position_size = 0.5 # 每次交易仓位比例
# 计算信号
signals, ratio_ma, upper_band, lower_band = calculate_ratio_signals(
price_ratio, window=20, num_std=2
)
print(f"信号计算完成,有效信号数量: {(signals != 0).sum()}")
# 生成比率交易的size数据用于基于比率的回测
ratio_size = generate_ratio_size(signals, price_ratio, position_size)
print(f"比率size数据形状: {ratio_size.shape}")
print(f"比率非零交易数量: {(ratio_size != 0).sum()}")
# 生成真实股票交易的size数据用于真实股票回测
stock_sizes = generate_stock_sizes(signals, close_smic, close_hhic, initial_cash, position_size)
print(f"股票size数据形状: {stock_sizes.shape}")
print(f"中芯国际非零交易数量: {(stock_sizes['SMIC'] != 0).sum()}")
print(f"华虹半导体非零交易数量: {(stock_sizes['HHIC'] != 0).sum()}")
return {
'price_ratio': price_ratio,
'signals': signals,
'ratio_size': ratio_size, # 基于比率的size
'stock_sizes': stock_sizes, # 真实股票的size
'ratio_ma': ratio_ma,
'upper_band': upper_band,
'lower_band': lower_band,
'initial_cash': initial_cash,
'commission': commission,
'smic_data': smic_data,
'hhic_data': hhic_data,
'close_smic': close_smic,
'close_hhic': close_hhic
}
def create_portfolio(strategy_data):
"""创建基于价格比率的投资组合(用于分析)"""
print("创建基于价格比率的投资组合...")
price_ratio = strategy_data['price_ratio']
ratio_size = strategy_data['ratio_size']
initial_cash = strategy_data['initial_cash']
commission = strategy_data['commission']
try:
# 将price_ratio转换为DataFramevectorbt需要
ratio_close = pd.DataFrame({'RATIO': price_ratio})
portfolio = vbt.Portfolio.from_orders(
close=ratio_close, # 只传入比率数据
size=ratio_size, # 基于比率的交易信号
init_cash=initial_cash,
fees=commission,
freq='D'
)
print("基于价格比率的投资组合创建成功!")
return portfolio
except Exception as e:
print(f"创建投资组合时出错: {e}")
import traceback
traceback.print_exc()
return None
def create_real_stock_portfolio(strategy_data):
"""创建基于真实股票的投资组合(用于真实收益计算)"""
print("创建基于真实股票的投资组合...")
close_smic = strategy_data['close_smic']
close_hhic = strategy_data['close_hhic']
stock_sizes = strategy_data['stock_sizes']
initial_cash = strategy_data['initial_cash']
commission = strategy_data['commission']
try:
# 创建包含两只股票收盘价的DataFrame
close_df = pd.DataFrame({
'SMIC': close_smic,
'HHIC': close_hhic
})
# 创建投资组合
portfolio = vbt.Portfolio.from_orders(
close=close_df, # 传入两只股票的收盘价
size=stock_sizes, # 传入两只股票的交易数量
init_cash=initial_cash,
fees=commission,
freq='D'
)
print("基于真实股票的投资组合创建成功!")
return portfolio
except Exception as e:
print(f"创建真实股票投资组合时出错: {e}")
import traceback
traceback.print_exc()
return None
def generate_stock_sizes(signals, close_smic, close_hhic, initial_cash=100000, position_ratio=0.5):
"""
生成真实股票交易的size数据
返回两个DataFramesmic_size和hhic_size
"""
# 创建空的size Series
smic_size = pd.Series(0.0, index=signals.index, name='SMIC')
hhic_size = pd.Series(0.0, index=signals.index, name='HHIC')
current_position = 0 # 0: 无仓位, 1: 做多价差, -1: 做空价差
for i in range(len(signals)):
if i < 20: # 跳过布林带计算期
continue
signal = signals.iloc[i]
smic_price = close_smic.iloc[i]
hhic_price = close_hhic.iloc[i]
if signal == 1 and current_position != 1: # 做多价差:买入中芯,卖空华虹
# 计算每只股票的仓位价值(等市值对冲)
position_value = initial_cash * position_ratio
# 做多中芯国际
smic_shares = position_value / smic_price
smic_size.iloc[i] = smic_shares
# 做空华虹半导体
hhic_shares = -position_value / hhic_price
hhic_size.iloc[i] = hhic_shares
current_position = 1
elif signal == -1 and current_position != -1: # 做空价差:卖空中芯,买入华虹
# 计算每只股票的仓位价值(等市值对冲)
position_value = initial_cash * position_ratio
# 做空中芯国际
smic_shares = -position_value / smic_price
smic_size.iloc[i] = smic_shares
# 做多华虹半导体
hhic_shares = position_value / hhic_price
hhic_size.iloc[i] = hhic_shares
current_position = -1
elif signal == 0 and current_position != 0: # 平仓
# 平掉所有仓位
if current_position == 1: # 平掉做多价差仓位
smic_size.iloc[i] = -smic_size.shift(1).iloc[i] if i > 0 else 0
hhic_size.iloc[i] = -hhic_size.shift(1).iloc[i] if i > 0 else 0
elif current_position == -1: # 平掉做空价差仓位
smic_size.iloc[i] = -smic_size.shift(1).iloc[i] if i > 0 else 0
hhic_size.iloc[i] = -hhic_size.shift(1).iloc[i] if i > 0 else 0
current_position = 0
# 创建size DataFrame
size_df = pd.DataFrame({
'SMIC': smic_size,
'HHIC': hhic_size
})
return size_df
if __name__ == "__main__":
strategy_data = generate_strategy()
# 测试基于比率的投资组合
ratio_portfolio = create_portfolio(strategy_data)
# 测试基于真实股票的投资组合
stock_portfolio = create_real_stock_portfolio(strategy_data)
print("策略生成完成!")