# strategy_executor.py import pandas as pd import numpy as np import vectorbt as vbt from data_fetcher import get_processed_data def calculate_ratio_signals(ratio_series, window=20, num_std=2): """ 基于价格比率计算配对交易信号 修正信号逻辑:比率低时做多价差,比率高时做空价差 """ # 计算布林带 ratio_ma = ratio_series.rolling(window=window).mean() ratio_std = ratio_series.rolling(window=window).std() upper_band = ratio_ma + num_std * ratio_std lower_band = ratio_ma - num_std * ratio_std # 生成交易信号 signals = pd.Series(0, index=ratio_series.index, name='signal') current_signal = 0 for i in range(len(ratio_series)): if i < window: # 跳过布林带计算期 continue ratio_val = ratio_series.iloc[i] ma_val = ratio_ma.iloc[i] upper_val = upper_band.iloc[i] lower_val = lower_band.iloc[i] # 当前无仓位时的开仓条件 if current_signal == 0: # 比率突破下轨:SMIC相对低估,做多价差(买入SMIC,卖空HHIC) if ratio_val < lower_val: signals.iloc[i] = 1 # 做多价差 current_signal = 1 print(f"开仓做多价差: {ratio_series.index[i]} - 比率{ratio_val:.4f} < 下轨{lower_val:.4f}") # 比率突破上轨:SMIC相对高估,做空价差(卖空SMIC,买入HHIC) elif ratio_val > upper_val: signals.iloc[i] = -1 # 做空价差 current_signal = -1 print(f"开仓做空价差: {ratio_series.index[i]} - 比率{ratio_val:.4f} > 上轨{upper_val:.4f}") # 当前有仓位时的平仓条件 elif current_signal != 0: # 当比率回归均值时平仓 if (current_signal == 1 and ratio_val >= ma_val) or \ (current_signal == -1 and ratio_val <= ma_val): signals.iloc[i] = 0 current_signal = 0 print(f"平仓: {ratio_series.index[i]} - 比率{ratio_val:.4f} 回归均线{ma_val:.4f}") return signals, ratio_ma, upper_band, lower_band def generate_ratio_size(signals, price_ratio, position_size=0.5): """ 生成比率交易的size数据 修复数据类型问题 """ # 创建与price_ratio相同形状的size Series,初始为0 size_series = pd.Series(0.0, index=signals.index, name='size') # 改为float类型 current_position = 0 for i in range(len(signals)): if i < 20: # 跳过布林带计算期 continue signal = signals.iloc[i] if signal == 1 and current_position != 1: # 做多价差 -> 买入比率 size_series.iloc[i] = float(position_size) # 明确转换为float current_position = 1 elif signal == -1 and current_position != -1: # 做空价差 -> 卖空比率 size_series.iloc[i] = -float(position_size) # 明确转换为float current_position = -1 elif signal == 0 and current_position != 0: # 平仓 size_series.iloc[i] = 0.0 # 明确转换为float current_position = 0 return size_series def generate_stock_sizes(signals, close_smic, close_hhic, initial_cash=100000, position_ratio=0.5): """ 生成真实股票交易的size数据 添加详细的调试信息来跟踪信号执行 """ # 创建空的size Series smic_size = pd.Series(0.0, index=signals.index, name='SMIC') hhic_size = pd.Series(0.0, index=signals.index, name='HHIC') current_position = 0 # 0: 无仓位, 1: 做多价差, -1: 做空价差 smic_position = 0.0 # 当前中芯持仓数量 hhic_position = 0.0 # 当前华虹持仓数量 print(f"\n=== 交易执行调试信息 ===") print(f"信号总数: {len(signals)}") print(f"非零信号数量: {(signals != 0).sum()}") print(f"做多信号: {(signals == 1).sum()}, 做空信号: {(signals == -1).sum()}, 平仓信号: {(signals == 0).sum()}") for i in range(len(signals)): if i < 20: # 跳过布林带计算期 continue signal = signals.iloc[i] date = signals.index[i] smic_price = close_smic.iloc[i] hhic_price = close_hhic.iloc[i] # 只有信号变化时才执行交易 if signal != 0 or (signal == 0 and current_position != 0): print(f"\n日期: {date}, 信号: {signal}, 当前仓位: {current_position}") # 平仓条件:信号为0且当前有仓位 if signal == 0 and current_position != 0: print(f" 执行平仓: SMIC持仓{smic_position:.2f}, HHIC持仓{hhic_position:.2f}") # 平掉所有仓位 if smic_position != 0: smic_size.iloc[i] = -smic_position smic_position = 0.0 print(f" 平仓SMIC: {-smic_size.iloc[i]:.2f}股") if hhic_position != 0: hhic_size.iloc[i] = -hhic_position hhic_position = 0.0 print(f" 平仓HHIC: {-hhic_size.iloc[i]:.2f}股") current_position = 0 continue # 开仓条件:只有当前无仓位时才开新仓 if current_position == 0 and signal != 0: if signal == 1: # 做多价差:买入中芯,卖空华虹 # 计算每只股票的仓位价值(等市值对冲) position_value = initial_cash * position_ratio # 做多中芯国际 smic_shares = position_value / smic_price smic_size.iloc[i] = smic_shares smic_position = smic_shares # 做空华虹半导体 hhic_shares = -position_value / hhic_price hhic_size.iloc[i] = hhic_shares hhic_position = hhic_shares current_position = 1 print(f" 执行做多价差开仓:") print(f" 买入SMIC: {smic_shares:.2f}股 @{smic_price:.2f}") print(f" 卖空HHIC: {abs(hhic_shares):.2f}股 @{hhic_price:.2f}") elif signal == -1: # 做空价差:卖空中芯,买入华虹 # 计算每只股票的仓位价值(等市值对冲) position_value = initial_cash * position_ratio # 做空中芯国际 smic_shares = -position_value / smic_price smic_size.iloc[i] = smic_shares smic_position = smic_shares # 做多华虹半导体 hhic_shares = position_value / hhic_price hhic_size.iloc[i] = hhic_shares hhic_position = hhic_shares current_position = -1 print(f" 执行做空价差开仓:") print(f" 卖空SMIC: {abs(smic_shares):.2f}股 @{smic_price:.2f}") print(f" 买入HHIC: {hhic_shares:.2f}股 @{hhic_price:.2f}") # 最后检查是否有未平仓的仓位,如果有则在最后一天平仓 if current_position != 0: last_index = len(signals) - 1 last_date = signals.index[last_index] print(f"\n最终平仓: {last_date}") if smic_position != 0: smic_size.iloc[last_index] = -smic_position print(f" 平仓SMIC: {-smic_size.iloc[last_index]:.2f}股") if hhic_position != 0: hhic_size.iloc[last_index] = -hhic_position print(f" 平仓HHIC: {-hhic_size.iloc[last_index]:.2f}股") # 统计实际执行的交易 smic_trades = (smic_size != 0).sum() hhic_trades = (hhic_size != 0).sum() print(f"\n实际执行交易统计:") print(f" SMIC交易次数: {smic_trades}") print(f" HHIC交易次数: {hhic_trades}") # 创建size DataFrame size_df = pd.DataFrame({ 'SMIC': smic_size, 'HHIC': hhic_size }) return size_df def debug_signal_logic(strategy_data): """调试信号生成逻辑""" print("\n" + "="*60) print("=== 信号生成逻辑调试 ===") print("="*60) signals = strategy_data['signals'] price_ratio = strategy_data['price_ratio'] ratio_ma = strategy_data['ratio_ma'] upper_band = strategy_data['upper_band'] lower_band = strategy_data['lower_band'] # 检查关键日期的信号逻辑 key_dates = [ '2024-12-20', '2024-12-23', '2025-02-18', '2025-02-19', '2025-04-22', '2025-04-23', '2025-05-16', '2025-05-19' ] for date_str in key_dates: date = pd.Timestamp(date_str) if date in signals.index: signal = signals.loc[date] ratio_val = price_ratio.loc[date] ma_val = ratio_ma.loc[date] upper_val = upper_band.loc[date] lower_val = lower_band.loc[date] print(f"\n{date}:") print(f" 比率: {ratio_val:.4f}, 均线: {ma_val:.4f}") print(f" 上轨: {upper_val:.4f}, 下轨: {lower_val:.4f}") print(f" 信号: {signal} ({'平仓' if signal == 0 else '做多' if signal == 1 else '做空'})") # 检查信号逻辑 if signal == 1: print(f" 逻辑: 比率{ratio_val:.4f} < 下轨{lower_val:.4f} = {ratio_val < lower_val}") elif signal == -1: print(f" 逻辑: 比率{ratio_val:.4f} > 上轨{upper_val:.4f} = {ratio_val > upper_val}") elif signal == 0: print(f" 逻辑: 平仓信号") def debug_signal_consistency(strategy_data, stock_portfolio): """调试信号和交易的一致性""" print("\n" + "="*60) print("=== 信号与交易一致性调试 ===") print("="*60) signals = strategy_data['signals'] price_ratio = strategy_data['price_ratio'] ratio_ma = strategy_data['ratio_ma'] upper_band = strategy_data['upper_band'] lower_band = strategy_data['lower_band'] # 获取订单记录 orders_df = stock_portfolio.orders.records_readable print(f"\n信号数据:") print(f"信号时间范围: {signals.index[0]} 到 {signals.index[-1]}") print(f"信号总数: {len(signals)}") # 找出所有信号变化点 signal_changes = signals[signals != signals.shift(1)] print(f"\n信号变化点数量: {len(signal_changes)}") # 显示前10个信号变化点 print(f"\n前10个信号变化点:") for i, (date, signal) in enumerate(signal_changes.head(10).items()): ratio_val = price_ratio.loc[date] ma_val = ratio_ma.loc[date] upper_val = upper_band.loc[date] lower_val = lower_band.loc[date] signal_desc = "平仓" if signal == 0 else ("做多" if signal == 1 else "做空") print(f" {date}: {signal_desc} (比率: {ratio_val:.4f}, 均线: {ma_val:.4f})") # 检查订单时间与信号时间的对应关系 if 'Timestamp' in orders_df.columns: order_dates = pd.to_datetime(orders_df['Timestamp']).unique() print(f"\n订单执行日期: {len(order_dates)}个") print("订单日期:", sorted(order_dates)) # 检查哪些信号日期有对应的订单 signal_dates_with_orders = [] signal_dates_without_orders = [] for date in signal_changes.index: if date in order_dates: signal_dates_with_orders.append(date) else: signal_dates_without_orders.append(date) print(f"\n有对应订单的信号日期: {len(signal_dates_with_orders)}") print(f"无对应订单的信号日期: {len(signal_dates_without_orders)}") if signal_dates_without_orders: print("缺失订单的信号日期:", signal_dates_without_orders[:5]) def generate_strategy(): """生成配对交易策略""" # 获取数据 smic_data, hhic_data = get_processed_data() # ========== 创建价格比率作为独立资产 ========== print("\n=== 创建价格比率作为独立资产 ===") close_smic = smic_data['close'] close_hhic = hhic_data['close'] # 计算价格比率 - 作为独立的"股票" price_ratio = close_smic / close_hhic price_ratio.name = 'SMIC_HHIC_RATIO' print(f"价格比率数据形状: {price_ratio.shape}") print(f"价格比率统计:") print(f" 均值: {price_ratio.mean():.4f}") print(f" 标准差: {price_ratio.std():.4f}") print(f" 最小值: {price_ratio.min():.4f}") print(f" 最大值: {price_ratio.max():.4f}") # 设置交易参数 initial_cash = 100000 commission = 0.001 # 0.1% 交易佣金 position_size = 0.5 # 每次交易仓位比例 # 计算信号 signals, ratio_ma, upper_band, lower_band = calculate_ratio_signals( price_ratio, window=20, num_std=2 ) print(f"信号计算完成,有效信号数量: {(signals != 0).sum()}") # 生成比率交易的size数据(用于基于比率的回测) ratio_size = generate_ratio_size(signals, price_ratio, position_size) print(f"比率size数据形状: {ratio_size.shape}") print(f"比率非零交易数量: {(ratio_size != 0).sum()}") # 生成真实股票交易的size数据(用于真实股票回测) stock_sizes = generate_stock_sizes(signals, close_smic, close_hhic, initial_cash, position_size) print(f"股票size数据形状: {stock_sizes.shape}") print(f"中芯国际非零交易数量: {(stock_sizes['SMIC'] != 0).sum()}") print(f"华虹半导体非零交易数量: {(stock_sizes['HHIC'] != 0).sum()}") return { 'price_ratio': price_ratio, 'signals': signals, 'ratio_size': ratio_size, # 基于比率的size 'stock_sizes': stock_sizes, # 真实股票的size 'ratio_ma': ratio_ma, 'upper_band': upper_band, 'lower_band': lower_band, 'initial_cash': initial_cash, 'commission': commission, 'smic_data': smic_data, 'hhic_data': hhic_data, 'close_smic': close_smic, 'close_hhic': close_hhic } def create_portfolio(strategy_data): """创建基于价格比率的投资组合(用于分析)""" print("创建基于价格比率的投资组合...") price_ratio = strategy_data['price_ratio'] ratio_size = strategy_data['ratio_size'] initial_cash = strategy_data['initial_cash'] commission = strategy_data['commission'] try: # 将price_ratio转换为DataFrame(vectorbt需要) ratio_close = pd.DataFrame({'RATIO': price_ratio}) portfolio = vbt.Portfolio.from_orders( close=ratio_close, # 只传入比率数据 size=ratio_size, # 基于比率的交易信号 init_cash=initial_cash, fees=commission, freq='D' ) print("基于价格比率的投资组合创建成功!") return portfolio except Exception as e: print(f"创建投资组合时出错: {e}") import traceback traceback.print_exc() return None def create_real_stock_portfolio(strategy_data): """创建基于真实股票的投资组合(用于真实收益计算)""" print("创建基于真实股票的投资组合...") close_smic = strategy_data['close_smic'] close_hhic = strategy_data['close_hhic'] stock_sizes = strategy_data['stock_sizes'] initial_cash = strategy_data['initial_cash'] commission = strategy_data['commission'] try: # 创建包含两只股票收盘价的DataFrame close_df = pd.DataFrame({ 'SMIC': close_smic, 'HHIC': close_hhic }) # 创建投资组合 portfolio = vbt.Portfolio.from_orders( close=close_df, # 传入两只股票的收盘价 size=stock_sizes, # 传入两只股票的交易数量 init_cash=initial_cash, fees=commission, freq='D' ) print("基于真实股票的投资组合创建成功!") return portfolio except Exception as e: print(f"创建真实股票投资组合时出错: {e}") import traceback traceback.print_exc() return None def generate_stock_sizes(signals, close_smic, close_hhic, initial_cash=100000, position_ratio=0.5): """ 生成真实股票交易的size数据 返回两个DataFrame:smic_size和hhic_size """ # 创建空的size Series smic_size = pd.Series(0.0, index=signals.index, name='SMIC') hhic_size = pd.Series(0.0, index=signals.index, name='HHIC') current_position = 0 # 0: 无仓位, 1: 做多价差, -1: 做空价差 for i in range(len(signals)): if i < 20: # 跳过布林带计算期 continue signal = signals.iloc[i] smic_price = close_smic.iloc[i] hhic_price = close_hhic.iloc[i] if signal == 1 and current_position != 1: # 做多价差:买入中芯,卖空华虹 # 计算每只股票的仓位价值(等市值对冲) position_value = initial_cash * position_ratio # 做多中芯国际 smic_shares = position_value / smic_price smic_size.iloc[i] = smic_shares # 做空华虹半导体 hhic_shares = -position_value / hhic_price hhic_size.iloc[i] = hhic_shares current_position = 1 elif signal == -1 and current_position != -1: # 做空价差:卖空中芯,买入华虹 # 计算每只股票的仓位价值(等市值对冲) position_value = initial_cash * position_ratio # 做空中芯国际 smic_shares = -position_value / smic_price smic_size.iloc[i] = smic_shares # 做多华虹半导体 hhic_shares = position_value / hhic_price hhic_size.iloc[i] = hhic_shares current_position = -1 elif signal == 0 and current_position != 0: # 平仓 # 平掉所有仓位 if current_position == 1: # 平掉做多价差仓位 smic_size.iloc[i] = -smic_size.shift(1).iloc[i] if i > 0 else 0 hhic_size.iloc[i] = -hhic_size.shift(1).iloc[i] if i > 0 else 0 elif current_position == -1: # 平掉做空价差仓位 smic_size.iloc[i] = -smic_size.shift(1).iloc[i] if i > 0 else 0 hhic_size.iloc[i] = -hhic_size.shift(1).iloc[i] if i > 0 else 0 current_position = 0 # 创建size DataFrame size_df = pd.DataFrame({ 'SMIC': smic_size, 'HHIC': hhic_size }) return size_df if __name__ == "__main__": strategy_data = generate_strategy() # 测试基于比率的投资组合 ratio_portfolio = create_portfolio(strategy_data) # 测试基于真实股票的投资组合 stock_portfolio = create_real_stock_portfolio(strategy_data) print("策略生成完成!")