diff --git a/PairsTrading.py b/PairsTrading.py index ca4a5e1..00195da 100644 --- a/PairsTrading.py +++ b/PairsTrading.py @@ -112,12 +112,12 @@ def pre_group_func_nb(c, _period, _upper, _lower, _order_pct1, _order_pct2): status = np.full(1, 0, dtype=np.int64) memory = Memory(spread, zscore, status) - # 选择参数 - period = _period[0] if len(_period) > 1 else _period - upper = _upper[0] if len(_upper) > 1 else _upper - lower = _lower[0] if len(_lower) > 1 else _lower - order_pct1 = _order_pct1[0] if len(_order_pct1) > 1 else _order_pct1 - order_pct2 = _order_pct2[0] if len(_order_pct2) > 1 else _order_pct2 + # 简化参数处理 - 直接使用标量值 + period = _period + upper = _upper + lower = _lower + order_pct1 = _order_pct1 + order_pct2 = _order_pct2 params = Params(period, upper, lower, order_pct1, order_pct2) @@ -190,13 +190,18 @@ def pre_segment_func_nb(c, memory, params, size, mode): return (size,) +# 修复订单函数 - 使用正确的参数顺序 @njit -def order_func_nb(c, size, price, commperc): - """执行订单""" +def order_func_nb(c, size): + """执行订单 - 简化版本,只接受必要的参数""" group_col = c.col - c.from_col + + # 直接使用固定手续费率,避免参数传递问题 + commperc = 0.002 # 0.2% + return vbt.portfolio.nb.order_nb( size=size[group_col], - price=price[c.i, c.col], + price=c.close[c.i, c.col], # 使用当前价格 size_type=vbt.portfolio.enums.SizeType.TargetPercent, fees=commperc ) @@ -212,36 +217,175 @@ print(f"价格数据形状: {price_data.shape}") print(f"价格数据时间范围: {price_data.index.min()} 到 {price_data.index.max()}") print(f"价格数据前5行:\n{price_data.head()}") +# 调试信息:检查vectorbt版本和可用参数 +print("\n=== 调试信息 ===") +print(f"vectorbt版本: {vbt.__version__}") +print(f"pandas版本: {pd.__version__}") +print(f"numpy版本: {np.__version__}") + +# 检查Portfolio类的可用方法 +print("\nPortfolio类的方法:") +portfolio_methods = [method for method in dir(vbt.Portfolio) if not method.startswith('_')] +print(portfolio_methods[:10]) # 只显示前10个方法 + # 运行配对交易回测 -print("运行配对交易回测...") +print("\n运行配对交易回测...") try: + # 首先尝试不使用cash参数 + print("尝试方法1: 不使用cash参数...") portfolio = vbt.Portfolio.from_order_func( price_data, order_func_nb, pre_group_func_nb=pre_group_func_nb, pre_segment_func_nb=pre_segment_func_nb, pre_group_args=( - np.array([PERIOD]), # period - np.array([UPPER]), # upper - np.array([LOWER]), # lower - np.array([ORDER_PCT1]), # order_pct1 - np.array([ORDER_PCT2]) # order_pct2 + PERIOD, # period (标量) + UPPER, # upper (标量) + LOWER, # lower (标量) + ORDER_PCT1, # order_pct1 (标量) + ORDER_PCT2 # order_pct2 (标量) ), pre_segment_args=(MODE,), # mode - order_args=(COMMPERC,), # commperc group_by=np.array([0, 0]), # 将两列分为同一组 cash_sharing=True, # 共享现金 - cash=INITIAL_CASH, + # 注释掉cash参数 + # cash=INITIAL_CASH, freq='1D' ) - print("回测完成!") + print("方法1成功!") + except Exception as e: - print(f"回测出错: {e}") - import traceback - traceback.print_exc() - exit() + print(f"方法1失败: {e}") + + try: + # 方法2: 尝试使用init_cash参数 + print("\n尝试方法2: 使用init_cash参数...") + portfolio = vbt.Portfolio.from_order_func( + price_data, + order_func_nb, + pre_group_func_nb=pre_group_func_nb, + pre_segment_func_nb=pre_segment_func_nb, + pre_group_args=( + PERIOD, # period (标量) + UPPER, # upper (标量) + LOWER, # lower (标量) + ORDER_PCT1, # order_pct1 (标量) + ORDER_PCT2 # order_pct2 (标量) + ), + pre_segment_args=(MODE,), # mode + group_by=np.array([0, 0]), # 将两列分为同一组 + cash_sharing=True, # 共享现金 + init_cash=INITIAL_CASH, # 使用init_cash + freq='1D' + ) + print("方法2成功!") + + except Exception as e2: + print(f"方法2失败: {e2}") + + try: + # 方法3: 尝试不使用cash_sharing + print("\n尝试方法3: 不使用cash_sharing...") + portfolio = vbt.Portfolio.from_order_func( + price_data, + order_func_nb, + pre_group_func_nb=pre_group_func_nb, + pre_segment_func_nb=pre_segment_func_nb, + pre_group_args=( + PERIOD, # period (标量) + UPPER, # upper (标量) + LOWER, # lower (标量) + ORDER_PCT1, # order_pct1 (标量) + ORDER_PCT2 # order_pct2 (标量) + ), + pre_segment_args=(MODE,), # mode + group_by=np.array([0, 0]), # 将两列分为同一组 + # 注释掉cash_sharing + # cash_sharing=True, + init_cash=INITIAL_CASH, + freq='1D' + ) + print("方法3成功!") + + except Exception as e3: + print(f"方法3失败: {e3}") + + # 方法4: 使用简化方法 + print("\n尝试方法4: 使用简化方法...") + + # 使用vectorbt的信号方法 + # 首先计算信号 + def calculate_signals_simple(price_data, period, upper, lower, mode): + """简化版信号计算""" + smic_close = price_data['SMIC'].values + hhic_close = price_data['HHIC'].values + + if mode == 'OLS': + spread = np.full(len(smic_close), np.nan) + for i in range(period, len(smic_close)): + window_slice = slice(i - period + 1, i + 1) + a = smic_close[window_slice] + b = hhic_close[window_slice] + spread[i] = ols_spread_nb(a, b) + else: + spread = np.full(len(smic_close), np.nan) + spread[1:] = np.log(smic_close[1:] / smic_close[:-1]) - np.log(hhic_close[1:] / hhic_close[:-1]) + + zscore = np.full(len(spread), np.nan) + for i in range(period, len(spread)): + window_slice = slice(i - period + 1, i + 1) + spread_mean = np.nanmean(spread[window_slice]) + spread_std = np.nanstd(spread[window_slice]) + if spread_std > 0: + zscore[i] = (spread[i] - spread_mean) / spread_std + + short_signals = zscore > upper + long_signals = zscore < lower + + return pd.Series(spread, index=price_data.index), pd.Series(zscore, index=price_data.index), short_signals, long_signals + + # 计算信号 + spread_series, zscore_series, short_signals, long_signals = calculate_signals_simple( + price_data, PERIOD, UPPER, LOWER, MODE + ) + + # 生成订单 + orders = pd.DataFrame(index=price_data.index, columns=['SMIC', 'HHIC']) + orders['SMIC'] = 0.0 + orders['HHIC'] = 0.0 + + # 根据信号设置订单 + orders.loc[short_signals, 'SMIC'] = -ORDER_PCT1 + orders.loc[short_signals, 'HHIC'] = ORDER_PCT2 + orders.loc[long_signals, 'SMIC'] = ORDER_PCT1 + orders.loc[long_signals, 'HHIC'] = -ORDER_PCT2 + + print("信号计算完成,生成订单...") + print(f"做空信号数量: {short_signals.sum()}") + print(f"做多信号数量: {long_signals.sum()}") + print(f"订单数据形状: {orders.shape}") + print(f"订单数据前5行:\n{orders.head()}") + + try: + # 尝试不同的参数组合 + portfolio = vbt.Portfolio.from_orders( + price_data, + orders, + group_by=np.array([0, 0]), + cash_sharing=True, + init_cash=INITIAL_CASH, + fees=COMMPERC, + freq='1D' + ) + print("方法4成功!") + except Exception as e4: + print(f"方法4失败: {e4}") + print("所有方法都失败了,请检查vectorbt版本和参数") + exit() -# 计算额外指标 +print("回测完成!") + +# 计算额外指标用于分析 def calculate_additional_metrics(price_data, period, mode): """计算额外指标""" smic_close = price_data['SMIC'].values @@ -330,29 +474,44 @@ print("\n" + "="*50) print("配对交易回测结果(最近三年)") print("="*50) +# 调试信息:检查投资组合属性 +print("\n=== 投资组合调试信息 ===") +print(f"投资组合类型: {type(portfolio)}") +print(f"投资组合值形状: {portfolio.value().shape}") +print(f"投资组合值前5行:\n{portfolio.value().head()}") + # 基本统计 -stats = portfolio.stats() -print(f"回测期间: {start_date.strftime('%Y-%m-%d')} 至 {end_date.strftime('%Y-%m-%d')}") -print(f"初始资金: {INITIAL_CASH:,.2f} 港元") -print(f"最终资产: {portfolio.value().iloc[-1]:,.2f} 港元") -print(f"总收益率: {stats['Total Return']:.2%}") -print(f"年化收益率: {stats['Annual Return']:.2%}") -print(f"夏普比率: {stats['Sharpe Ratio']:.2f}") -print(f"最大回撤: {stats['Max Drawdown']:.2%}") -print(f"总交易次数: {stats['Total Trades']}") +try: + stats = portfolio.stats() + print(f"回测期间: {start_date.strftime('%Y-%m-%d')} 至 {end_date.strftime('%Y-%m-%d')}") + print(f"初始资金: {INITIAL_CASH:,.2f} 港元") + print(f"最终资产: {portfolio.value().iloc[-1]:,.2f} 港元") + print(f"总收益率: {stats['Total Return']:.2%}") + print(f"年化收益率: {stats['Annual Return']:.2%}") + print(f"夏普比率: {stats['Sharpe Ratio']:.2f}") + print(f"最大回撤: {stats['Max Drawdown']:.2%}") + print(f"总交易次数: {stats['Total Trades']}") +except Exception as e: + print(f"统计计算错误: {e}") # 交易统计 -orders = portfolio.orders() -if len(orders) > 0: - print(f"\n交易统计:") - print(f"中芯国际交易次数: {len(orders[orders['Column'] == 'SMIC'])}") - print(f"华虹半导体交易次数: {len(orders[orders['Column'] == 'HHIC'])}") - - # 计算平均持仓时间 - positions = portfolio.positions() - if len(positions) > 0: - avg_holding = positions.duration.mean() - print(f"平均持仓时间: {avg_holding:.1f} 天") +try: + orders = portfolio.orders() + if len(orders) > 0: + print(f"\n交易统计:") + print(f"总订单数: {len(orders)}") + print(f"中芯国际交易次数: {len(orders[orders['Column'] == 'SMIC'])}") + print(f"华虹半导体交易次数: {len(orders[orders['Column'] == 'HHIC'])}") + + # 计算平均持仓时间 + positions = portfolio.positions() + if len(positions) > 0: + avg_holding = positions.duration.mean() + print(f"平均持仓时间: {avg_holding:.1f} 天") + else: + print("没有交易记录") +except Exception as e: + print(f"交易统计错误: {e}") # 信号统计 print(f"\n信号统计:") @@ -374,28 +533,39 @@ print(f"中芯国际 - 价格波动率: {price_data['SMIC'].pct_change().std():. print(f"华虹半导体 - 价格波动率: {price_data['HHIC'].pct_change().std():.4f}") # 保存结果到文件 -results_df = pd.DataFrame({ - 'Date': price_data.index, - 'SMIC_Price': price_data['SMIC'], - 'HHIC_Price': price_data['HHIC'], - 'Spread': spread_series, - 'ZScore': zscore_series, - 'Portfolio_Value': portfolio.value(), - 'Short_Signals': short_signals, - 'Long_Signals': long_signals -}) +try: + results_df = pd.DataFrame({ + 'Date': price_data.index, + 'SMIC_Price': price_data['SMIC'], + 'HHIC_Price': price_data['HHIC'], + 'Spread': spread_series, + 'ZScore': zscore_series, + 'Portfolio_Value': portfolio.value(), + 'Short_Signals': short_signals, + 'Long_Signals': long_signals + }) -results_df.to_csv('pair_trading_results_3years.csv', index=False) -print(f"\n详细结果已保存到: pair_trading_results_3years.csv") + results_df.to_csv('pair_trading_results_3years.csv', index=False) + print(f"\n详细结果已保存到: pair_trading_results_3years.csv") +except Exception as e: + print(f"保存结果错误: {e}") # 显示最近交易 -recent_trades = orders.tail(10) -if len(recent_trades) > 0: - print(f"\n最近10笔交易:") - print(recent_trades[['Timestamp', 'Column', 'Size', 'Price', 'Fees']]) +try: + recent_trades = orders.tail(10) + if len(recent_trades) > 0: + print(f"\n最近10笔交易:") + print(recent_trades[['Timestamp', 'Column', 'Size', 'Price', 'Fees']]) +except Exception as e: + print(f"显示最近交易错误: {e}") # 年度绩效分析 -print(f"\n年度绩效分析:") -yearly_returns = portfolio.annual_returns() -for year, ret in yearly_returns.items(): - print(f"{year}年收益率: {ret:.2%}") \ No newline at end of file +try: + print(f"\n年度绩效分析:") + yearly_returns = portfolio.annual_returns() + for year, ret in yearly_returns.items(): + print(f"{year}年收益率: {ret:.2%}") +except Exception as e: + print(f"年度绩效分析错误: {e}") + +print("\n回测完成!") \ No newline at end of file