501 lines
19 KiB
Python
501 lines
19 KiB
Python
# strategy_executor.py
|
||
import pandas as pd
|
||
import numpy as np
|
||
import vectorbt as vbt
|
||
from data_fetcher import get_processed_data
|
||
|
||
def calculate_ratio_signals(ratio_series, window=20, num_std=2):
|
||
"""
|
||
基于价格比率计算配对交易信号
|
||
修正信号逻辑:比率低时做多价差,比率高时做空价差
|
||
"""
|
||
# 计算布林带
|
||
ratio_ma = ratio_series.rolling(window=window).mean()
|
||
ratio_std = ratio_series.rolling(window=window).std()
|
||
|
||
upper_band = ratio_ma + num_std * ratio_std
|
||
lower_band = ratio_ma - num_std * ratio_std
|
||
|
||
# 生成交易信号
|
||
signals = pd.Series(0, index=ratio_series.index, name='signal')
|
||
current_signal = 0
|
||
|
||
for i in range(len(ratio_series)):
|
||
if i < window: # 跳过布林带计算期
|
||
continue
|
||
|
||
ratio_val = ratio_series.iloc[i]
|
||
ma_val = ratio_ma.iloc[i]
|
||
upper_val = upper_band.iloc[i]
|
||
lower_val = lower_band.iloc[i]
|
||
|
||
# 当前无仓位时的开仓条件
|
||
if current_signal == 0:
|
||
# 比率突破下轨:SMIC相对低估,做多价差(买入SMIC,卖空HHIC)
|
||
if ratio_val < lower_val:
|
||
signals.iloc[i] = 1 # 做多价差
|
||
current_signal = 1
|
||
print(f"开仓做多价差: {ratio_series.index[i]} - 比率{ratio_val:.4f} < 下轨{lower_val:.4f}")
|
||
|
||
# 比率突破上轨:SMIC相对高估,做空价差(卖空SMIC,买入HHIC)
|
||
elif ratio_val > upper_val:
|
||
signals.iloc[i] = -1 # 做空价差
|
||
current_signal = -1
|
||
print(f"开仓做空价差: {ratio_series.index[i]} - 比率{ratio_val:.4f} > 上轨{upper_val:.4f}")
|
||
|
||
# 当前有仓位时的平仓条件
|
||
elif current_signal != 0:
|
||
# 当比率回归均值时平仓
|
||
if (current_signal == 1 and ratio_val >= ma_val) or \
|
||
(current_signal == -1 and ratio_val <= ma_val):
|
||
signals.iloc[i] = 0
|
||
current_signal = 0
|
||
print(f"平仓: {ratio_series.index[i]} - 比率{ratio_val:.4f} 回归均线{ma_val:.4f}")
|
||
|
||
return signals, ratio_ma, upper_band, lower_band
|
||
|
||
|
||
def generate_ratio_size(signals, price_ratio, position_size=0.5):
|
||
"""
|
||
生成比率交易的size数据
|
||
修复数据类型问题
|
||
"""
|
||
# 创建与price_ratio相同形状的size Series,初始为0
|
||
size_series = pd.Series(0.0, index=signals.index, name='size') # 改为float类型
|
||
current_position = 0
|
||
|
||
for i in range(len(signals)):
|
||
if i < 20: # 跳过布林带计算期
|
||
continue
|
||
|
||
signal = signals.iloc[i]
|
||
|
||
if signal == 1 and current_position != 1: # 做多价差 -> 买入比率
|
||
size_series.iloc[i] = float(position_size) # 明确转换为float
|
||
current_position = 1
|
||
|
||
elif signal == -1 and current_position != -1: # 做空价差 -> 卖空比率
|
||
size_series.iloc[i] = -float(position_size) # 明确转换为float
|
||
current_position = -1
|
||
|
||
elif signal == 0 and current_position != 0: # 平仓
|
||
size_series.iloc[i] = 0.0 # 明确转换为float
|
||
current_position = 0
|
||
|
||
return size_series
|
||
|
||
|
||
def generate_stock_sizes(signals, close_smic, close_hhic, initial_cash=100000, position_ratio=0.5):
|
||
"""
|
||
生成真实股票交易的size数据
|
||
添加详细的调试信息来跟踪信号执行
|
||
"""
|
||
# 创建空的size Series
|
||
smic_size = pd.Series(0.0, index=signals.index, name='SMIC')
|
||
hhic_size = pd.Series(0.0, index=signals.index, name='HHIC')
|
||
|
||
current_position = 0 # 0: 无仓位, 1: 做多价差, -1: 做空价差
|
||
smic_position = 0.0 # 当前中芯持仓数量
|
||
hhic_position = 0.0 # 当前华虹持仓数量
|
||
|
||
print(f"\n=== 交易执行调试信息 ===")
|
||
print(f"信号总数: {len(signals)}")
|
||
print(f"非零信号数量: {(signals != 0).sum()}")
|
||
print(f"做多信号: {(signals == 1).sum()}, 做空信号: {(signals == -1).sum()}, 平仓信号: {(signals == 0).sum()}")
|
||
|
||
for i in range(len(signals)):
|
||
if i < 20: # 跳过布林带计算期
|
||
continue
|
||
|
||
signal = signals.iloc[i]
|
||
date = signals.index[i]
|
||
smic_price = close_smic.iloc[i]
|
||
hhic_price = close_hhic.iloc[i]
|
||
|
||
# 只有信号变化时才执行交易
|
||
if signal != 0 or (signal == 0 and current_position != 0):
|
||
print(f"\n日期: {date}, 信号: {signal}, 当前仓位: {current_position}")
|
||
|
||
# 平仓条件:信号为0且当前有仓位
|
||
if signal == 0 and current_position != 0:
|
||
print(f" 执行平仓: SMIC持仓{smic_position:.2f}, HHIC持仓{hhic_position:.2f}")
|
||
|
||
# 平掉所有仓位
|
||
if smic_position != 0:
|
||
smic_size.iloc[i] = -smic_position
|
||
smic_position = 0.0
|
||
print(f" 平仓SMIC: {-smic_size.iloc[i]:.2f}股")
|
||
if hhic_position != 0:
|
||
hhic_size.iloc[i] = -hhic_position
|
||
hhic_position = 0.0
|
||
print(f" 平仓HHIC: {-hhic_size.iloc[i]:.2f}股")
|
||
|
||
current_position = 0
|
||
continue
|
||
|
||
# 开仓条件:只有当前无仓位时才开新仓
|
||
if current_position == 0 and signal != 0:
|
||
if signal == 1: # 做多价差:买入中芯,卖空华虹
|
||
# 计算每只股票的仓位价值(等市值对冲)
|
||
position_value = initial_cash * position_ratio
|
||
|
||
# 做多中芯国际
|
||
smic_shares = position_value / smic_price
|
||
smic_size.iloc[i] = smic_shares
|
||
smic_position = smic_shares
|
||
|
||
# 做空华虹半导体
|
||
hhic_shares = -position_value / hhic_price
|
||
hhic_size.iloc[i] = hhic_shares
|
||
hhic_position = hhic_shares
|
||
|
||
current_position = 1
|
||
print(f" 执行做多价差开仓:")
|
||
print(f" 买入SMIC: {smic_shares:.2f}股 @{smic_price:.2f}")
|
||
print(f" 卖空HHIC: {abs(hhic_shares):.2f}股 @{hhic_price:.2f}")
|
||
|
||
elif signal == -1: # 做空价差:卖空中芯,买入华虹
|
||
# 计算每只股票的仓位价值(等市值对冲)
|
||
position_value = initial_cash * position_ratio
|
||
|
||
# 做空中芯国际
|
||
smic_shares = -position_value / smic_price
|
||
smic_size.iloc[i] = smic_shares
|
||
smic_position = smic_shares
|
||
|
||
# 做多华虹半导体
|
||
hhic_shares = position_value / hhic_price
|
||
hhic_size.iloc[i] = hhic_shares
|
||
hhic_position = hhic_shares
|
||
|
||
current_position = -1
|
||
print(f" 执行做空价差开仓:")
|
||
print(f" 卖空SMIC: {abs(smic_shares):.2f}股 @{smic_price:.2f}")
|
||
print(f" 买入HHIC: {hhic_shares:.2f}股 @{hhic_price:.2f}")
|
||
|
||
# 最后检查是否有未平仓的仓位,如果有则在最后一天平仓
|
||
if current_position != 0:
|
||
last_index = len(signals) - 1
|
||
last_date = signals.index[last_index]
|
||
print(f"\n最终平仓: {last_date}")
|
||
if smic_position != 0:
|
||
smic_size.iloc[last_index] = -smic_position
|
||
print(f" 平仓SMIC: {-smic_size.iloc[last_index]:.2f}股")
|
||
if hhic_position != 0:
|
||
hhic_size.iloc[last_index] = -hhic_position
|
||
print(f" 平仓HHIC: {-hhic_size.iloc[last_index]:.2f}股")
|
||
|
||
# 统计实际执行的交易
|
||
smic_trades = (smic_size != 0).sum()
|
||
hhic_trades = (hhic_size != 0).sum()
|
||
print(f"\n实际执行交易统计:")
|
||
print(f" SMIC交易次数: {smic_trades}")
|
||
print(f" HHIC交易次数: {hhic_trades}")
|
||
|
||
# 创建size DataFrame
|
||
size_df = pd.DataFrame({
|
||
'SMIC': smic_size,
|
||
'HHIC': hhic_size
|
||
})
|
||
|
||
return size_df
|
||
|
||
def debug_signal_logic(strategy_data):
|
||
"""调试信号生成逻辑"""
|
||
print("\n" + "="*60)
|
||
print("=== 信号生成逻辑调试 ===")
|
||
print("="*60)
|
||
|
||
signals = strategy_data['signals']
|
||
price_ratio = strategy_data['price_ratio']
|
||
ratio_ma = strategy_data['ratio_ma']
|
||
upper_band = strategy_data['upper_band']
|
||
lower_band = strategy_data['lower_band']
|
||
|
||
# 检查关键日期的信号逻辑
|
||
key_dates = [
|
||
'2024-12-20', '2024-12-23', '2025-02-18', '2025-02-19',
|
||
'2025-04-22', '2025-04-23', '2025-05-16', '2025-05-19'
|
||
]
|
||
|
||
for date_str in key_dates:
|
||
date = pd.Timestamp(date_str)
|
||
if date in signals.index:
|
||
signal = signals.loc[date]
|
||
ratio_val = price_ratio.loc[date]
|
||
ma_val = ratio_ma.loc[date]
|
||
upper_val = upper_band.loc[date]
|
||
lower_val = lower_band.loc[date]
|
||
|
||
print(f"\n{date}:")
|
||
print(f" 比率: {ratio_val:.4f}, 均线: {ma_val:.4f}")
|
||
print(f" 上轨: {upper_val:.4f}, 下轨: {lower_val:.4f}")
|
||
print(f" 信号: {signal} ({'平仓' if signal == 0 else '做多' if signal == 1 else '做空'})")
|
||
|
||
# 检查信号逻辑
|
||
if signal == 1:
|
||
print(f" 逻辑: 比率{ratio_val:.4f} < 下轨{lower_val:.4f} = {ratio_val < lower_val}")
|
||
elif signal == -1:
|
||
print(f" 逻辑: 比率{ratio_val:.4f} > 上轨{upper_val:.4f} = {ratio_val > upper_val}")
|
||
elif signal == 0:
|
||
print(f" 逻辑: 平仓信号")
|
||
|
||
def debug_signal_consistency(strategy_data, stock_portfolio):
|
||
"""调试信号和交易的一致性"""
|
||
print("\n" + "="*60)
|
||
print("=== 信号与交易一致性调试 ===")
|
||
print("="*60)
|
||
|
||
signals = strategy_data['signals']
|
||
price_ratio = strategy_data['price_ratio']
|
||
ratio_ma = strategy_data['ratio_ma']
|
||
upper_band = strategy_data['upper_band']
|
||
lower_band = strategy_data['lower_band']
|
||
|
||
# 获取订单记录
|
||
orders_df = stock_portfolio.orders.records_readable
|
||
|
||
print(f"\n信号数据:")
|
||
print(f"信号时间范围: {signals.index[0]} 到 {signals.index[-1]}")
|
||
print(f"信号总数: {len(signals)}")
|
||
|
||
# 找出所有信号变化点
|
||
signal_changes = signals[signals != signals.shift(1)]
|
||
print(f"\n信号变化点数量: {len(signal_changes)}")
|
||
|
||
# 显示前10个信号变化点
|
||
print(f"\n前10个信号变化点:")
|
||
for i, (date, signal) in enumerate(signal_changes.head(10).items()):
|
||
ratio_val = price_ratio.loc[date]
|
||
ma_val = ratio_ma.loc[date]
|
||
upper_val = upper_band.loc[date]
|
||
lower_val = lower_band.loc[date]
|
||
|
||
signal_desc = "平仓" if signal == 0 else ("做多" if signal == 1 else "做空")
|
||
print(f" {date}: {signal_desc} (比率: {ratio_val:.4f}, 均线: {ma_val:.4f})")
|
||
|
||
# 检查订单时间与信号时间的对应关系
|
||
if 'Timestamp' in orders_df.columns:
|
||
order_dates = pd.to_datetime(orders_df['Timestamp']).unique()
|
||
print(f"\n订单执行日期: {len(order_dates)}个")
|
||
print("订单日期:", sorted(order_dates))
|
||
|
||
# 检查哪些信号日期有对应的订单
|
||
signal_dates_with_orders = []
|
||
signal_dates_without_orders = []
|
||
|
||
for date in signal_changes.index:
|
||
if date in order_dates:
|
||
signal_dates_with_orders.append(date)
|
||
else:
|
||
signal_dates_without_orders.append(date)
|
||
|
||
print(f"\n有对应订单的信号日期: {len(signal_dates_with_orders)}")
|
||
print(f"无对应订单的信号日期: {len(signal_dates_without_orders)}")
|
||
|
||
if signal_dates_without_orders:
|
||
print("缺失订单的信号日期:", signal_dates_without_orders[:5])
|
||
|
||
|
||
def generate_strategy():
|
||
"""生成配对交易策略"""
|
||
# 获取数据
|
||
smic_data, hhic_data = get_processed_data()
|
||
|
||
# ========== 创建价格比率作为独立资产 ==========
|
||
print("\n=== 创建价格比率作为独立资产 ===")
|
||
close_smic = smic_data['close']
|
||
close_hhic = hhic_data['close']
|
||
|
||
# 计算价格比率 - 作为独立的"股票"
|
||
price_ratio = close_smic / close_hhic
|
||
price_ratio.name = 'SMIC_HHIC_RATIO'
|
||
|
||
print(f"价格比率数据形状: {price_ratio.shape}")
|
||
print(f"价格比率统计:")
|
||
print(f" 均值: {price_ratio.mean():.4f}")
|
||
print(f" 标准差: {price_ratio.std():.4f}")
|
||
print(f" 最小值: {price_ratio.min():.4f}")
|
||
print(f" 最大值: {price_ratio.max():.4f}")
|
||
|
||
# 设置交易参数
|
||
initial_cash = 100000
|
||
commission = 0.001 # 0.1% 交易佣金
|
||
position_size = 0.5 # 每次交易仓位比例
|
||
|
||
# 计算信号
|
||
signals, ratio_ma, upper_band, lower_band = calculate_ratio_signals(
|
||
price_ratio, window=20, num_std=2
|
||
)
|
||
|
||
print(f"信号计算完成,有效信号数量: {(signals != 0).sum()}")
|
||
|
||
# 生成比率交易的size数据(用于基于比率的回测)
|
||
ratio_size = generate_ratio_size(signals, price_ratio, position_size)
|
||
print(f"比率size数据形状: {ratio_size.shape}")
|
||
print(f"比率非零交易数量: {(ratio_size != 0).sum()}")
|
||
|
||
# 生成真实股票交易的size数据(用于真实股票回测)
|
||
stock_sizes = generate_stock_sizes(signals, close_smic, close_hhic, initial_cash, position_size)
|
||
print(f"股票size数据形状: {stock_sizes.shape}")
|
||
print(f"中芯国际非零交易数量: {(stock_sizes['SMIC'] != 0).sum()}")
|
||
print(f"华虹半导体非零交易数量: {(stock_sizes['HHIC'] != 0).sum()}")
|
||
|
||
return {
|
||
'price_ratio': price_ratio,
|
||
'signals': signals,
|
||
'ratio_size': ratio_size, # 基于比率的size
|
||
'stock_sizes': stock_sizes, # 真实股票的size
|
||
'ratio_ma': ratio_ma,
|
||
'upper_band': upper_band,
|
||
'lower_band': lower_band,
|
||
'initial_cash': initial_cash,
|
||
'commission': commission,
|
||
'smic_data': smic_data,
|
||
'hhic_data': hhic_data,
|
||
'close_smic': close_smic,
|
||
'close_hhic': close_hhic
|
||
}
|
||
|
||
def create_portfolio(strategy_data):
|
||
"""创建基于价格比率的投资组合(用于分析)"""
|
||
print("创建基于价格比率的投资组合...")
|
||
|
||
price_ratio = strategy_data['price_ratio']
|
||
ratio_size = strategy_data['ratio_size']
|
||
initial_cash = strategy_data['initial_cash']
|
||
commission = strategy_data['commission']
|
||
|
||
try:
|
||
# 将price_ratio转换为DataFrame(vectorbt需要)
|
||
ratio_close = pd.DataFrame({'RATIO': price_ratio})
|
||
|
||
portfolio = vbt.Portfolio.from_orders(
|
||
close=ratio_close, # 只传入比率数据
|
||
size=ratio_size, # 基于比率的交易信号
|
||
init_cash=initial_cash,
|
||
fees=commission,
|
||
freq='D'
|
||
)
|
||
|
||
print("基于价格比率的投资组合创建成功!")
|
||
return portfolio
|
||
|
||
except Exception as e:
|
||
print(f"创建投资组合时出错: {e}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
return None
|
||
|
||
def create_real_stock_portfolio(strategy_data):
|
||
"""创建基于真实股票的投资组合(用于真实收益计算)"""
|
||
print("创建基于真实股票的投资组合...")
|
||
|
||
close_smic = strategy_data['close_smic']
|
||
close_hhic = strategy_data['close_hhic']
|
||
stock_sizes = strategy_data['stock_sizes']
|
||
initial_cash = strategy_data['initial_cash']
|
||
commission = strategy_data['commission']
|
||
|
||
try:
|
||
# 创建包含两只股票收盘价的DataFrame
|
||
close_df = pd.DataFrame({
|
||
'SMIC': close_smic,
|
||
'HHIC': close_hhic
|
||
})
|
||
|
||
# 创建投资组合
|
||
portfolio = vbt.Portfolio.from_orders(
|
||
close=close_df, # 传入两只股票的收盘价
|
||
size=stock_sizes, # 传入两只股票的交易数量
|
||
init_cash=initial_cash,
|
||
fees=commission,
|
||
freq='D'
|
||
)
|
||
|
||
print("基于真实股票的投资组合创建成功!")
|
||
return portfolio
|
||
|
||
except Exception as e:
|
||
print(f"创建真实股票投资组合时出错: {e}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
return None
|
||
|
||
def generate_stock_sizes(signals, close_smic, close_hhic, initial_cash=100000, position_ratio=0.5):
|
||
"""
|
||
生成真实股票交易的size数据
|
||
返回两个DataFrame:smic_size和hhic_size
|
||
"""
|
||
# 创建空的size Series
|
||
smic_size = pd.Series(0.0, index=signals.index, name='SMIC')
|
||
hhic_size = pd.Series(0.0, index=signals.index, name='HHIC')
|
||
|
||
current_position = 0 # 0: 无仓位, 1: 做多价差, -1: 做空价差
|
||
|
||
for i in range(len(signals)):
|
||
if i < 20: # 跳过布林带计算期
|
||
continue
|
||
|
||
signal = signals.iloc[i]
|
||
smic_price = close_smic.iloc[i]
|
||
hhic_price = close_hhic.iloc[i]
|
||
|
||
if signal == 1 and current_position != 1: # 做多价差:买入中芯,卖空华虹
|
||
# 计算每只股票的仓位价值(等市值对冲)
|
||
position_value = initial_cash * position_ratio
|
||
|
||
# 做多中芯国际
|
||
smic_shares = position_value / smic_price
|
||
smic_size.iloc[i] = smic_shares
|
||
|
||
# 做空华虹半导体
|
||
hhic_shares = -position_value / hhic_price
|
||
hhic_size.iloc[i] = hhic_shares
|
||
|
||
current_position = 1
|
||
|
||
elif signal == -1 and current_position != -1: # 做空价差:卖空中芯,买入华虹
|
||
# 计算每只股票的仓位价值(等市值对冲)
|
||
position_value = initial_cash * position_ratio
|
||
|
||
# 做空中芯国际
|
||
smic_shares = -position_value / smic_price
|
||
smic_size.iloc[i] = smic_shares
|
||
|
||
# 做多华虹半导体
|
||
hhic_shares = position_value / hhic_price
|
||
hhic_size.iloc[i] = hhic_shares
|
||
|
||
current_position = -1
|
||
|
||
elif signal == 0 and current_position != 0: # 平仓
|
||
# 平掉所有仓位
|
||
if current_position == 1: # 平掉做多价差仓位
|
||
smic_size.iloc[i] = -smic_size.shift(1).iloc[i] if i > 0 else 0
|
||
hhic_size.iloc[i] = -hhic_size.shift(1).iloc[i] if i > 0 else 0
|
||
elif current_position == -1: # 平掉做空价差仓位
|
||
smic_size.iloc[i] = -smic_size.shift(1).iloc[i] if i > 0 else 0
|
||
hhic_size.iloc[i] = -hhic_size.shift(1).iloc[i] if i > 0 else 0
|
||
|
||
current_position = 0
|
||
|
||
# 创建size DataFrame
|
||
size_df = pd.DataFrame({
|
||
'SMIC': smic_size,
|
||
'HHIC': hhic_size
|
||
})
|
||
|
||
return size_df
|
||
|
||
|
||
if __name__ == "__main__":
|
||
strategy_data = generate_strategy()
|
||
|
||
# 测试基于比率的投资组合
|
||
ratio_portfolio = create_portfolio(strategy_data)
|
||
|
||
# 测试基于真实股票的投资组合
|
||
stock_portfolio = create_real_stock_portfolio(strategy_data)
|
||
|
||
print("策略生成完成!") |