From aea7e7ed625c12cf44bb568b06d3939b56a72480 Mon Sep 17 00:00:00 2001 From: bingyi Date: Sat, 1 Nov 2025 17:15:17 +0800 Subject: [PATCH] =?UTF-8?q?=E5=9F=BA=E4=BA=8E=E6=AF=94=E7=8E=87=E7=9A=84?= =?UTF-8?q?=E6=8A=80=E6=9C=AF=E5=88=86=E6=9E=90=E5=88=9D=E6=AD=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- OneCurvePairTrading.py | 397 ++++++++++++++++++++++++++++++++++++++ OneCurvePairTrading2.py | 409 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 806 insertions(+) create mode 100644 OneCurvePairTrading.py create mode 100644 OneCurvePairTrading2.py diff --git a/OneCurvePairTrading.py b/OneCurvePairTrading.py new file mode 100644 index 0000000..df9f128 --- /dev/null +++ b/OneCurvePairTrading.py @@ -0,0 +1,397 @@ +import vectorbt as vbt +import akshare as ak +import pandas as pd +import numpy as np +import matplotlib.pyplot as plt +import datetime +from numba import njit +from collections import namedtuple + +# 设置中文显示 +plt.rcParams['font.sans-serif'] = ['SimHei'] +plt.rcParams['axes.unicode_minus'] = False + +# 获取股票数据 +print("正在获取股票数据...") +stock_00981 = ak.stock_hk_daily(symbol="00981") +stock_01347 = ak.stock_hk_daily(symbol="01347") + +print("中芯国际数据列名:", stock_00981.columns.tolist()) +print("华虹半导体数据列名:", stock_01347.columns.tolist()) + +# 数据预处理 +def preprocess_data(df, symbol): + """预处理股票数据""" + df = df.copy() + # 检查列名并重命名(如果需要) + if 'date' in df.columns: + df['date'] = pd.to_datetime(df['date']) + df.set_index('date', inplace=True) + elif '日期' in df.columns: + df['date'] = pd.to_datetime(df['日期']) + df.set_index('date', inplace=True) + # 重命名中文列为英文 + rename_dict = { + '开盘': 'open', + '最高': 'high', + '最低': 'low', + '收盘': 'close', + '成交量': 'volume' + } + df = df.rename(columns=rename_dict) + else: + # 如果已经有英文列名,直接使用 + df.index = pd.to_datetime(df.index) + + df = df.sort_index() + return df[['open', 'high', 'low', 'close', 'volume']] + +# 预处理数据 +smic_data = preprocess_data(stock_00981, "00981") +hhic_data = preprocess_data(stock_01347, "01347") + +print(f"中芯国际原始数据时间范围: {smic_data.index.min()} 到 {smic_data.index.max()}") +print(f"华虹半导体原始数据时间范围: {hhic_data.index.min()} 到 {hhic_data.index.max()}") + +# 限制为最近半年数据 +end_date = smic_data.index.max() +start_date = end_date - pd.Timedelta(days=365) + +print(f"\n限制回测时间范围: {start_date} 到 {end_date}") + +smic_data = smic_data.loc[start_date:end_date] +hhic_data = hhic_data.loc[start_date:end_date] + +print(f"限制后中芯国际数据形状: {smic_data.shape}") +print(f"限制后华虹半导体数据形状: {hhic_data.shape}") + +# 对齐数据时间范围 +common_index = smic_data.index.intersection(hhic_data.index) +smic_data = smic_data.loc[common_index] +hhic_data = hhic_data.loc[common_index] + +print(f"\n对齐后数据时间范围: {smic_data.index.min()} 到 {smic_data.index.max()}") +print(f"对齐后数据点数: {len(smic_data)}") + +# ========== 创建价格比率作为独立资产 ========== +print("\n=== 创建价格比率作为独立资产 ===") +close_smic = smic_data['close'] +close_hhic = hhic_data['close'] + +# 计算价格比率 - 作为独立的"股票" +price_ratio = close_smic / close_hhic +price_ratio.name = 'SMIC_HHIC_RATIO' + +print(f"价格比率数据形状: {price_ratio.shape}") +print(f"价格比率统计:") +print(f" 均值: {price_ratio.mean():.4f}") +print(f" 标准差: {price_ratio.std():.4f}") +print(f" 最小值: {price_ratio.min():.4f}") +print(f" 最大值: {price_ratio.max():.4f}") + +# 设置交易参数 +initial_cash = 100000 +commission = 0.001 # 0.1% 交易佣金 +position_size = 0.5 # 每次交易仓位比例(现在只交易一个"资产") + +# ========== 基于比率数据计算信号 ========== +def calculate_ratio_signals(ratio_series, window=20, num_std=2): + """ + 基于价格比率计算配对交易信号 + """ + # 计算布林带 + ratio_ma = ratio_series.rolling(window=window).mean() + ratio_std = ratio_series.rolling(window=window).std() + + upper_band = ratio_ma + num_std * ratio_std + lower_band = ratio_ma - num_std * ratio_std + + # 生成交易信号 + # 1: 做多价差 (买中芯/卖华虹) -> 买入比率 + # -1: 做空价差 (卖中芯/买华虹) -> 卖空比率 + # 0: 平仓 + signals = pd.Series(0, index=ratio_series.index, name='signal') + + # 当比率突破下轨时做多价差 -> 买入比率 + long_condition = (ratio_series < lower_band) & (ratio_ma.notna()) + signals[long_condition] = 1 + + # 当比率突破上轨时做空价差 -> 卖空比率 + short_condition = (ratio_series > upper_band) & (ratio_ma.notna()) + signals[short_condition] = -1 + + # 当比率回归均值时平仓 + close_condition = (ratio_series.between(lower_band, upper_band)) & (signals.shift(1) != 0) + signals[close_condition] = 0 + + return signals, ratio_ma, upper_band, lower_band + +# 计算信号 +signals, ratio_ma, upper_band, lower_band = calculate_ratio_signals( + price_ratio, window=20, num_std=2 +) + +print(f"信号计算完成,有效信号数量: {(signals != 0).sum()}") + +# ========== 基于信号生成size数据(现在只针对比率) ========== +def generate_ratio_size(signals, position_size=0.5): + """ + 生成比率交易的size数据 + 返回一个与price_ratio相同形状的Series,包含交易数量 + """ + # 创建与price_ratio相同形状的size Series,初始为0 + size_series = pd.Series(0, index=signals.index, name='size') + current_position = 0 + + for i in range(len(signals)): + if i < 20: # 跳过布林带计算期 + continue + + date = signals.index[i] + signal = signals.iloc[i] + + if signal == 1 and current_position != 1: # 做多价差 -> 买入比率 + # 计算头寸规模(基于比率价值) + # 这里我们假设比率的"价格"就是其实际值 + ratio_value = price_ratio.iloc[i] + # 买入相当于做多中芯/做空华虹 + size_series.iloc[i] = position_size # 正数表示买入比率 + current_position = 1 + + elif signal == -1 and current_position != -1: # 做空价差 -> 卖空比率 + # 卖空相当于做空中芯/做多华虹 + size_series.iloc[i] = -position_size # 负数表示卖空比率 + current_position = -1 + + elif signal == 0 and current_position != 0: # 平仓 + size_series.iloc[i] = 0 # 平仓 + current_position = 0 + + return size_series + +# 生成size数据 +size = generate_ratio_size(signals, position_size) + +print(f"size数据形状: {size.shape}") +print(f"非零交易数量: {(size != 0).sum()}") + +# ========== 创建投资组合(只基于比率) ========== +print("创建基于价格比率的投资组合...") +try: + # 将price_ratio转换为DataFrame(vectorbt需要) + ratio_close = pd.DataFrame({'RATIO': price_ratio}) + + portfolio = vbt.Portfolio.from_orders( + close=ratio_close, # 只传入比率数据 + size=size, # 基于比率的交易信号 + init_cash=initial_cash, + fees=commission, + freq='D' + ) + + print("基于价格比率的投资组合创建成功!") + + # 计算配对交易统计 + print("\n=== 基于价格比率的配对交易策略表现 ===") + + # 获取组合总价值 + portfolio_value = portfolio.value() + + # ========== 绘制基于比率的技术分析图表 ========== + print("\n绘制基于比率的技术分析图表...") + + portfolio['RATIO'].plot().show() + + # 创建详细的技术分析图表 + fig, axes = plt.subplots(4, 1, figsize=(15, 16)) + + # 1. 价格比率和布林带 + 交易信号 + axes[0].plot(price_ratio.index, price_ratio, label='价格比率(中芯/华虹)', linewidth=1.5, color='blue') + axes[0].plot(price_ratio.index, ratio_ma, label='移动平均', linewidth=1, alpha=0.7, color='orange') + axes[0].plot(price_ratio.index, upper_band, label='上轨', linewidth=1, alpha=0.7, linestyle='--', color='red') + axes[0].plot(price_ratio.index, lower_band, label='下轨', linewidth=1, alpha=0.7, linestyle='--', color='green') + axes[0].set_title('中芯国际-华虹半导体价格比率 (配对交易标的)') + axes[0].set_ylabel('价格比率') + axes[0].legend() + axes[0].grid(True, alpha=0.3) + + # 标记交易信号 + long_signals = signals[signals == 1] + short_signals = signals[signals == -1] + + if len(long_signals) > 0: + axes[0].scatter(long_signals.index, price_ratio[long_signals.index], + color='green', marker='^', s=80, label='买入比率(做多中芯/做空华虹)', zorder=5) + if len(short_signals) > 0: + axes[0].scatter(short_signals.index, price_ratio[short_signals.index], + color='red', marker='v', s=80, label='卖空比率(做空中芯/做多华虹)', zorder=5) + + # 2. 交易信号 + axes[1].plot(signals.index, signals, label='交易信号', linewidth=2, color='purple', drawstyle='steps-post') + axes[1].axhline(y=1, color='green', linestyle='--', alpha=0.5, label='买入信号') + axes[1].axhline(y=-1, color='red', linestyle='--', alpha=0.5, label='卖出信号') + axes[1].axhline(y=0, color='gray', linestyle='-', alpha=0.3) + axes[1].set_title('交易信号时序') + axes[1].set_ylabel('信号') + axes[1].set_ylim(-1.5, 1.5) + axes[1].legend() + axes[1].grid(True, alpha=0.3) + + + # 4. 组合净值 + if len(portfolio_value) > 0: + axes[3].plot(portfolio_value.index, portfolio_value, label='组合净值', linewidth=2, color='darkblue') + # 标记初始资金线 + axes[3].axhline(y=initial_cash, color='red', linestyle='--', alpha=0.7, label=f'初始资金({initial_cash})') + axes[3].set_title('基于价格比率的配对交易组合净值') + axes[3].set_ylabel('组合价值') + axes[3].set_xlabel('日期') + axes[3].legend() + axes[3].grid(True, alpha=0.3) + + plt.tight_layout() + plt.show() + + # ========== 使用vectorbt进行专业分析 ========== + print("\n=== VectorBT 专业分析(基于价格比率) ===") + + # 1. 组合详细分析 + try: + fig = portfolio.plot(subplots=[ + 'orders', # 订单 + 'trade_pnl', # 交易盈亏 + 'cum_returns', # 累积收益 + 'drawdowns' # 回撤 + ]) + fig.update_layout( + title='基于价格比率的配对交易详细分析', + height=800 + ) + fig.show() + except Exception as e: + print(f"详细分析绘制失败: {e}") + + # 2. 组合价值变化 + try: + fig = portfolio_value.vbt.plot( + title='基于价格比率的配对交易组合价值' + ) + fig.update_layout( + xaxis_title='日期', + yaxis_title='组合价值' + ) + fig.show() + except Exception as e: + print(f"组合价值变化绘制失败: {e}") + + # 3. 累积收益 + try: + cumulative_returns = portfolio.cumulative_returns() + fig = cumulative_returns.vbt.plot( + title='基于价格比率的配对交易累积收益率' + ) + fig.update_layout( + xaxis_title='日期', + yaxis_title='累积收益' + ) + fig.show() + except Exception as e: + print(f"累积收益绘制失败: {e}") + + # 4. 回撤分析 + try: + drawdown = portfolio.drawdown() + fig = drawdown.vbt.plot( + title='基于价格比率的配对交易回撤分析' + ) + fig.update_layout( + xaxis_title='日期', + yaxis_title='回撤' + ) + fig.show() + except Exception as e: + print(f"回撤分析绘制失败: {e}") + + # 5. 交易分析 + try: + trades = portfolio.trades + if len(trades) > 0: + fig = trades.plot_pnl() + fig.update_layout(title='基于价格比率的交易盈亏分布') + fig.show() + + fig = trades.plot_duration() + fig.update_layout(title='基于价格比率的交易持续时间分布') + fig.show() + except Exception as e: + print(f"交易分析绘制失败: {e}") + + # ========== 打印详细统计 ========== + print("\n=== 详细统计 ===") + try: + stats = portfolio.stats() + + def safe_get_stat(stat_dict, key, default="N/A"): + value = stat_dict.get(key, default) + if hasattr(value, 'iloc'): + return value.iloc[0] if len(value) == 1 else value + return value + + print(f"开始日期: {safe_get_stat(stats, 'Start')}") + print(f"结束日期: {safe_get_stat(stats, 'End')}") + print(f"期间: {safe_get_stat(stats, 'Period')}") + print(f"总收益率: {safe_get_stat(stats, 'Total Return [%]', 'N/A')}%") + print(f"年化收益率: {safe_get_stat(stats, 'Annual Return [%]', 'N/A')}%") + print(f"年化波动率: {safe_get_stat(stats, 'Annual Volatility [%]', 'N/A')}%") + print(f"夏普比率: {safe_get_stat(stats, 'Sharpe Ratio', 'N/A')}") + print(f"最大回撤: {safe_get_stat(stats, 'Max Drawdown [%]', 'N/A')}%") + print(f"总交易次数: {safe_get_stat(stats, 'Total Trades', 'N/A')}") + print(f"胜率: {safe_get_stat(stats, 'Win Rate [%]', 'N/A')}%") + print(f"盈亏比: {safe_get_stat(stats, 'Profit Factor', 'N/A')}") + + except Exception as e: + print(f"获取详细统计时出错: {e}") + + # 分析每笔交易 + try: + trades_df = portfolio.trades.records_readable + if len(trades_df) > 0: + print(f"\n交易分析:") + print(f"总交易次数: {len(trades_df)}") + if 'Duration' in trades_df.columns: + print(f"平均持仓时间: {trades_df['Duration'].mean():.1f} 天") + if 'PnL' in trades_df.columns: + print(f"最大单笔盈利: {trades_df['PnL'].max():.2f}") + print(f"最大单笔亏损: {trades_df['PnL'].min():.2f}") + winning_trades = trades_df[trades_df['PnL'] > 0] + losing_trades = trades_df[trades_df['PnL'] < 0] + if len(winning_trades) > 0: + print(f"平均盈利: {winning_trades['PnL'].mean():.2f}") + if len(losing_trades) > 0: + print(f"平均亏损: {losing_trades['PnL'].mean():.2f}") + except Exception as e: + print(f"分析交易时出错: {e}") + + # ========== 比率数据统计摘要 ========== + print("\n=== 价格比率统计摘要 ===") + print(f"数据期间: {price_ratio.index.min()} 到 {price_ratio.index.max()}") + print(f"数据点数: {len(price_ratio)}") + print(f"比率均值: {price_ratio.mean():.4f}") + print(f"比率标准差: {price_ratio.std():.4f}") + print(f"比率变异系数: {price_ratio.std()/price_ratio.mean():.4f}") + + # 计算交易信号统计 + long_count = (signals == 1).sum() + short_count = (signals == -1).sum() + total_signals = long_count + short_count + print(f"\n交易信号统计:") + print(f"做多信号次数: {long_count}") + print(f"做空信号次数: {short_count}") + print(f"总信号次数: {total_signals}") + +except Exception as e: + print(f"创建投资组合时出错: {e}") + import traceback + traceback.print_exc() + +print("程序执行完成!") \ No newline at end of file diff --git a/OneCurvePairTrading2.py b/OneCurvePairTrading2.py new file mode 100644 index 0000000..5adca07 --- /dev/null +++ b/OneCurvePairTrading2.py @@ -0,0 +1,409 @@ +import vectorbt as vbt +import akshare as ak +import pandas as pd +import numpy as np +import matplotlib.pyplot as plt +import datetime +from numba import njit +from collections import namedtuple + +# 设置中文显示 +plt.rcParams['font.sans-serif'] = ['SimHei'] +plt.rcParams['axes.unicode_minus'] = False + +# 获取股票数据 +print("正在获取股票数据...") +stock_00981 = ak.stock_hk_daily(symbol="00981") +stock_01347 = ak.stock_hk_daily(symbol="01347") + +print("中芯国际数据列名:", stock_00981.columns.tolist()) +print("华虹半导体数据列名:", stock_01347.columns.tolist()) + +# 数据预处理 +def preprocess_data(df, symbol): + """预处理股票数据""" + df = df.copy() + # 检查列名并重命名(如果需要) + if 'date' in df.columns: + df['date'] = pd.to_datetime(df['date']) + df.set_index('date', inplace=True) + elif '日期' in df.columns: + df['date'] = pd.to_datetime(df['日期']) + df.set_index('date', inplace=True) + # 重命名中文列为英文 + rename_dict = { + '开盘': 'open', + '最高': 'high', + '最低': 'low', + '收盘': 'close', + '成交量': 'volume' + } + df = df.rename(columns=rename_dict) + else: + # 如果已经有英文列名,直接使用 + df.index = pd.to_datetime(df.index) + + df = df.sort_index() + return df[['open', 'high', 'low', 'close', 'volume']] + +# 预处理数据 +smic_data = preprocess_data(stock_00981, "00981") +hhic_data = preprocess_data(stock_01347, "01347") + +print(f"中芯国际原始数据时间范围: {smic_data.index.min()} 到 {smic_data.index.max()}") +print(f"华虹半导体原始数据时间范围: {hhic_data.index.min()} 到 {hhic_data.index.max()}") + +# 限制为最近半年数据 +end_date = smic_data.index.max() +start_date = end_date - pd.Timedelta(days=360) # 最近半年 + +print(f"\n限制回测时间范围: {start_date} 到 {end_date}") + +smic_data = smic_data.loc[start_date:end_date] +hhic_data = hhic_data.loc[start_date:end_date] + +print(f"限制后中芯国际数据形状: {smic_data.shape}") +print(f"限制后华虹半导体数据形状: {hhic_data.shape}") + +# 对齐数据时间范围 +common_index = smic_data.index.intersection(hhic_data.index) +smic_data = smic_data.loc[common_index] +hhic_data = hhic_data.loc[common_index] + +print(f"\n对齐后数据时间范围: {smic_data.index.min()} 到 {smic_data.index.max()}") +print(f"对齐后数据点数: {len(smic_data)}") + +# ========== 创建价格比率作为独立资产 ========== +print("\n=== 创建价格比率作为独立资产 ===") +close_smic = smic_data['close'] +close_hhic = hhic_data['close'] + +# 计算价格比率 - 作为独立的"股票" +price_ratio = close_smic / close_hhic +price_ratio.name = 'SMIC_HHIC_RATIO' + +print(f"价格比率数据形状: {price_ratio.shape}") +print(f"价格比率统计:") +print(f" 均值: {price_ratio.mean():.4f}") +print(f" 标准差: {price_ratio.std():.4f}") +print(f" 最小值: {price_ratio.min():.4f}") +print(f" 最大值: {price_ratio.max():.4f}") + +# 设置交易参数 +initial_cash = 100000 +commission = 0.001 # 0.1% 交易佣金 +position_size = 0.5 # 每次交易仓位比例(现在只交易一个"资产") + +# ========== 基于比率数据计算信号 ========== +def calculate_ratio_signals(ratio_series, window=20, num_std=2): + """ + 基于价格比率计算配对交易信号 + """ + # 计算布林带 + ratio_ma = ratio_series.rolling(window=window).mean() + ratio_std = ratio_series.rolling(window=window).std() + + upper_band = ratio_ma + num_std * ratio_std + lower_band = ratio_ma - num_std * ratio_std + + # 生成交易信号 + # 1: 做多价差 (买中芯/卖华虹) -> 买入比率 + # -1: 做空价差 (卖中芯/买华虹) -> 卖空比率 + # 0: 平仓 + signals = pd.Series(0, index=ratio_series.index, name='signal') + + # 当比率突破下轨时做多价差 -> 买入比率 + long_condition = (ratio_series < lower_band) & (ratio_ma.notna()) + signals[long_condition] = 1 + + # 当比率突破上轨时做空价差 -> 卖空比率 + short_condition = (ratio_series > upper_band) & (ratio_ma.notna()) + signals[short_condition] = -1 + + # 当比率回归均值时平仓 + close_condition = (ratio_series.between(lower_band, upper_band)) & (signals.shift(1) != 0) + signals[close_condition] = 0 + + return signals, ratio_ma, upper_band, lower_band + +# 计算信号 +signals, ratio_ma, upper_band, lower_band = calculate_ratio_signals( + price_ratio, window=20, num_std=2 +) + +print(f"信号计算完成,有效信号数量: {(signals != 0).sum()}") + +# ========== 基于信号生成size数据(现在只针对比率) ========== +def generate_ratio_size(signals, position_size=0.5): + """ + 生成比率交易的size数据 + 返回一个与price_ratio相同形状的Series,包含交易数量 + """ + # 创建与price_ratio相同形状的size Series,初始为0 + size_series = pd.Series(0, index=signals.index, name='size') + current_position = 0 + + for i in range(len(signals)): + if i < 20: # 跳过布林带计算期 + continue + + date = signals.index[i] + signal = signals.iloc[i] + + if signal == 1 and current_position != 1: # 做多价差 -> 买入比率 + # 计算头寸规模(基于比率价值) + # 这里我们假设比率的"价格"就是其实际值 + ratio_value = price_ratio.iloc[i] + # 买入相当于做多中芯/做空华虹 + size_series.iloc[i] = position_size # 正数表示买入比率 + current_position = 1 + + elif signal == -1 and current_position != -1: # 做空价差 -> 卖空比率 + # 卖空相当于做空中芯/做多华虹 + size_series.iloc[i] = -position_size # 负数表示卖空比率 + current_position = -1 + + elif signal == 0 and current_position != 0: # 平仓 + size_series.iloc[i] = 0 # 平仓 + current_position = 0 + + return size_series + +# 生成size数据 +size = generate_ratio_size(signals, position_size) + +print(f"size数据形状: {size.shape}") +print(f"非零交易数量: {(size != 0).sum()}") + +# ========== 创建投资组合(只基于比率) ========== +print("创建基于价格比率的投资组合...") +try: + # 将price_ratio转换为DataFrame(vectorbt需要) + ratio_close = pd.DataFrame({'RATIO': price_ratio}) + + portfolio = vbt.Portfolio.from_orders( + close=ratio_close, # 只传入比率数据 + size=size, # 基于比率的交易信号 + init_cash=initial_cash, + fees=commission, + freq='D' + ) + + print("基于价格比率的投资组合创建成功!") + + # 计算配对交易统计 + print("\n=== 基于价格比率的配对交易策略表现 ===") + + # 获取组合总价值 + portfolio_value = portfolio.value() + + + + # ========== 使用vectorbt进行专业分析 ========== + print("\n=== VectorBT 专业分析(基于价格比率) ===") + + # 修复:使用正确的列选择方法 + try: + # 选择第一列(也是唯一的一列) + portfolio_single = portfolio['RATIO'] + + fig = portfolio_single.plot(subplots=[ + 'orders', # 订单 + 'trade_pnl', # 交易盈亏 + 'cum_returns', # 累积收益 + 'drawdowns' # 回撤 + ]) + fig.update_layout( + title='基于价格比率的配对交易详细分析', + height=800 + ) + fig.show() + except Exception as e: + print(f"详细分析绘制失败: {e}") + + # 2. 组合价值变化 + try: + fig = portfolio_value.vbt.plot( + title='基于价格比率的配对交易组合价值' + ) + fig.update_layout( + xaxis_title='日期', + yaxis_title='组合价值' + ) + fig.show() + except Exception as e: + print(f"组合价值变化绘制失败: {e}") + + # 3. 累积收益 + try: + cumulative_returns = portfolio.cumulative_returns() + fig = cumulative_returns.vbt.plot( + title='基于价格比率的配对交易累积收益率' + ) + fig.update_layout( + xaxis_title='日期', + yaxis_title='累积收益' + ) + fig.show() + except Exception as e: + print(f"累积收益绘制失败: {e}") + + # 4. 回撤分析 + try: + drawdown = portfolio.drawdown() + fig = drawdown.vbt.plot( + title='基于价格比率的配对交易回撤分析' + ) + fig.update_layout( + xaxis_title='日期', + yaxis_title='回撤' + ) + fig.show() + except Exception as e: + print(f"回撤分析绘制失败: {e}") + + # 5. 交易分析 - 修复:使用正确的列选择 + try: + trades = portfolio['RATIO'].trades + if len(trades) > 0: + fig = trades.plot_pnl() + fig.update_layout(title='基于价格比率的交易盈亏分布') + fig.show() + + fig = trades.plot_duration() + fig.update_layout(title='基于价格比率的交易持续时间分布') + fig.show() + except Exception as e: + print(f"交易分析绘制失败: {e}") + + # 6. 订单流分析 - 修复:使用正确的列选择 + try: + fig = portfolio['RATIO'].orders.plot() + fig.update_layout(title='基于价格比率的订单流分析') + fig.show() + except Exception as e: + print(f"订单流分析绘制失败: {e}") + + # ========== 绘制基于比率的技术分析图表 ========== + print("\n绘制基于比率的技术分析图表...") + + # 创建详细的技术分析图表 + fig, axes = plt.subplots(4, 1, figsize=(15, 16)) + + # 1. 价格比率和布林带 + 交易信号 + axes[0].plot(price_ratio.index, price_ratio, label='价格比率(中芯/华虹)', linewidth=1.5, color='blue') + axes[0].plot(price_ratio.index, ratio_ma, label='移动平均', linewidth=1, alpha=0.7, color='orange') + axes[0].plot(price_ratio.index, upper_band, label='上轨', linewidth=1, alpha=0.7, linestyle='--', color='red') + axes[0].plot(price_ratio.index, lower_band, label='下轨', linewidth=1, alpha=0.7, linestyle='--', color='green') + axes[0].set_title('中芯国际-华虹半导体价格比率 (配对交易标的)') + axes[0].set_ylabel('价格比率') + axes[0].legend() + axes[0].grid(True, alpha=0.3) + + # 标记交易信号 + long_signals = signals[signals == 1] + short_signals = signals[signals == -1] + + if len(long_signals) > 0: + axes[0].scatter(long_signals.index, price_ratio[long_signals.index], + color='green', marker='^', s=80, label='买入比率(做多中芯/做空华虹)', zorder=5) + if len(short_signals) > 0: + axes[0].scatter(short_signals.index, price_ratio[short_signals.index], + color='red', marker='v', s=80, label='卖空比率(做空中芯/做多华虹)', zorder=5) + + # 2. 交易信号 + axes[1].plot(signals.index, signals, label='交易信号', linewidth=2, color='purple', drawstyle='steps-post') + axes[1].axhline(y=1, color='green', linestyle='--', alpha=0.5, label='买入信号') + axes[1].axhline(y=-1, color='red', linestyle='--', alpha=0.5, label='卖出信号') + axes[1].axhline(y=0, color='gray', linestyle='-', alpha=0.3) + axes[1].set_title('交易信号时序') + axes[1].set_ylabel('信号') + axes[1].set_ylim(-1.5, 1.5) + axes[1].legend() + axes[1].grid(True, alpha=0.3) + + + # 4. 组合净值 + if len(portfolio_value) > 0: + axes[3].plot(portfolio_value.index, portfolio_value, label='组合净值', linewidth=2, color='darkblue') + # 标记初始资金线 + axes[3].axhline(y=initial_cash, color='red', linestyle='--', alpha=0.7, label=f'初始资金({initial_cash})') + axes[3].set_title('基于价格比率的配对交易组合净值') + axes[3].set_ylabel('组合价值') + axes[3].set_xlabel('日期') + axes[3].legend() + axes[3].grid(True, alpha=0.3) + + plt.tight_layout() + plt.show() + + # ========== 打印详细统计 ========== + print("\n=== 详细统计 ===") + try: + # 使用单列统计 + stats = portfolio['RATIO'].stats() + + def safe_get_stat(stat_dict, key, default="N/A"): + value = stat_dict.get(key, default) + if hasattr(value, 'iloc'): + return value.iloc[0] if len(value) == 1 else value + return value + + print(f"开始日期: {safe_get_stat(stats, 'Start')}") + print(f"结束日期: {safe_get_stat(stats, 'End')}") + print(f"期间: {safe_get_stat(stats, 'Period')}") + print(f"总收益率: {safe_get_stat(stats, 'Total Return [%]', 'N/A')}%") + print(f"年化收益率: {safe_get_stat(stats, 'Annual Return [%]', 'N/A')}%") + print(f"年化波动率: {safe_get_stat(stats, 'Annual Volatility [%]', 'N/A')}%") + print(f"夏普比率: {safe_get_stat(stats, 'Sharpe Ratio', 'N/A')}") + print(f"最大回撤: {safe_get_stat(stats, 'Max Drawdown [%]', 'N/A')}%") + print(f"总交易次数: {safe_get_stat(stats, 'Total Trades', 'N/A')}") + print(f"胜率: {safe_get_stat(stats, 'Win Rate [%]', 'N/A')}%") + print(f"盈亏比: {safe_get_stat(stats, 'Profit Factor', 'N/A')}") + + except Exception as e: + print(f"获取详细统计时出错: {e}") + + # 分析每笔交易 + try: + trades_df = portfolio['RATIO'].trades.records_readable + if len(trades_df) > 0: + print(f"\n交易分析:") + print(f"总交易次数: {len(trades_df)}") + if 'Duration' in trades_df.columns: + print(f"平均持仓时间: {trades_df['Duration'].mean():.1f} 天") + if 'PnL' in trades_df.columns: + print(f"最大单笔盈利: {trades_df['PnL'].max():.2f}") + print(f"最大单笔亏损: {trades_df['PnL'].min():.2f}") + winning_trades = trades_df[trades_df['PnL'] > 0] + losing_trades = trades_df[trades_df['PnL'] < 0] + if len(winning_trades) > 0: + print(f"平均盈利: {winning_trades['PnL'].mean():.2f}") + if len(losing_trades) > 0: + print(f"平均亏损: {losing_trades['PnL'].mean():.2f}") + except Exception as e: + print(f"分析交易时出错: {e}") + + # ========== 比率数据统计摘要 ========== + print("\n=== 价格比率统计摘要 ===") + print(f"数据期间: {price_ratio.index.min()} 到 {price_ratio.index.max()}") + print(f"数据点数: {len(price_ratio)}") + print(f"比率均值: {price_ratio.mean():.4f}") + print(f"比率标准差: {price_ratio.std():.4f}") + print(f"比率变异系数: {price_ratio.std()/price_ratio.mean():.4f}") + + # 计算交易信号统计 + long_count = (signals == 1).sum() + short_count = (signals == -1).sum() + total_signals = long_count + short_count + print(f"\n交易信号统计:") + print(f"做多信号次数: {long_count}") + print(f"做空信号次数: {short_count}") + print(f"总信号次数: {total_signals}") + +except Exception as e: + print(f"创建投资组合时出错: {e}") + import traceback + traceback.print_exc() + +print("程序执行完成!") \ No newline at end of file