import vectorbt as vbt import akshare as ak import pandas as pd import numpy as np import matplotlib.pyplot as plt import datetime from numba import njit from collections import namedtuple # 设置中文显示 plt.rcParams['font.sans-serif'] = ['SimHei'] plt.rcParams['axes.unicode_minus'] = False # 获取股票数据 print("正在获取股票数据...") stock_00981 = ak.stock_hk_daily(symbol="00981") stock_01347 = ak.stock_hk_daily(symbol="01347") print("中芯国际数据列名:", stock_00981.columns.tolist()) print("华虹半导体数据列名:", stock_01347.columns.tolist()) # 数据预处理 def preprocess_data(df, symbol): """预处理股票数据""" df = df.copy() # 检查列名并重命名(如果需要) if 'date' in df.columns: df['date'] = pd.to_datetime(df['date']) df.set_index('date', inplace=True) elif '日期' in df.columns: df['date'] = pd.to_datetime(df['日期']) df.set_index('date', inplace=True) # 重命名中文列为英文 rename_dict = { '开盘': 'open', '最高': 'high', '最低': 'low', '收盘': 'close', '成交量': 'volume' } df = df.rename(columns=rename_dict) else: # 如果已经有英文列名,直接使用 df.index = pd.to_datetime(df.index) df = df.sort_index() return df[['open', 'high', 'low', 'close', 'volume']] # 预处理数据 smic_data = preprocess_data(stock_00981, "00981") hhic_data = preprocess_data(stock_01347, "01347") print(f"中芯国际原始数据时间范围: {smic_data.index.min()} 到 {smic_data.index.max()}") print(f"华虹半导体原始数据时间范围: {hhic_data.index.min()} 到 {hhic_data.index.max()}") # 限制为最近半年数据 end_date = smic_data.index.max() start_date = end_date - pd.Timedelta(days=360) # 最近半年 print(f"\n限制回测时间范围: {start_date} 到 {end_date}") smic_data = smic_data.loc[start_date:end_date] hhic_data = hhic_data.loc[start_date:end_date] print(f"限制后中芯国际数据形状: {smic_data.shape}") print(f"限制后华虹半导体数据形状: {hhic_data.shape}") # 对齐数据时间范围 common_index = smic_data.index.intersection(hhic_data.index) smic_data = smic_data.loc[common_index] hhic_data = hhic_data.loc[common_index] print(f"\n对齐后数据时间范围: {smic_data.index.min()} 到 {smic_data.index.max()}") print(f"对齐后数据点数: {len(smic_data)}") # ========== 创建价格比率作为独立资产 ========== print("\n=== 创建价格比率作为独立资产 ===") close_smic = smic_data['close'] close_hhic = hhic_data['close'] # 计算价格比率 - 作为独立的"股票" price_ratio = close_smic / close_hhic price_ratio.name = 'SMIC_HHIC_RATIO' print(f"价格比率数据形状: {price_ratio.shape}") print(f"价格比率统计:") print(f" 均值: {price_ratio.mean():.4f}") print(f" 标准差: {price_ratio.std():.4f}") print(f" 最小值: {price_ratio.min():.4f}") print(f" 最大值: {price_ratio.max():.4f}") # 设置交易参数 initial_cash = 100000 commission = 0.001 # 0.1% 交易佣金 position_size = 0.5 # 每次交易仓位比例(现在只交易一个"资产") # ========== 基于比率数据计算信号 ========== def calculate_ratio_signals(ratio_series, window=20, num_std=2): """ 基于价格比率计算配对交易信号 """ # 计算布林带 ratio_ma = ratio_series.rolling(window=window).mean() ratio_std = ratio_series.rolling(window=window).std() upper_band = ratio_ma + num_std * ratio_std lower_band = ratio_ma - num_std * ratio_std # 生成交易信号 # 1: 做多价差 (买中芯/卖华虹) -> 买入比率 # -1: 做空价差 (卖中芯/买华虹) -> 卖空比率 # 0: 平仓 signals = pd.Series(0, index=ratio_series.index, name='signal') # 当比率突破下轨时做多价差 -> 买入比率 long_condition = (ratio_series < lower_band) & (ratio_ma.notna()) signals[long_condition] = 1 # 当比率突破上轨时做空价差 -> 卖空比率 short_condition = (ratio_series > upper_band) & (ratio_ma.notna()) signals[short_condition] = -1 # 当比率回归均值时平仓 close_condition = (ratio_series.between(lower_band, upper_band)) & (signals.shift(1) != 0) signals[close_condition] = 0 return signals, ratio_ma, upper_band, lower_band # 计算信号 signals, ratio_ma, upper_band, lower_band = calculate_ratio_signals( price_ratio, window=20, num_std=2 ) print(f"信号计算完成,有效信号数量: {(signals != 0).sum()}") # ========== 基于信号生成size数据(现在只针对比率) ========== def generate_ratio_size(signals, position_size=0.5): """ 生成比率交易的size数据 返回一个与price_ratio相同形状的Series,包含交易数量 """ # 创建与price_ratio相同形状的size Series,初始为0 size_series = pd.Series(0, index=signals.index, name='size') current_position = 0 for i in range(len(signals)): if i < 20: # 跳过布林带计算期 continue date = signals.index[i] signal = signals.iloc[i] if signal == 1 and current_position != 1: # 做多价差 -> 买入比率 # 计算头寸规模(基于比率价值) # 这里我们假设比率的"价格"就是其实际值 ratio_value = price_ratio.iloc[i] # 买入相当于做多中芯/做空华虹 size_series.iloc[i] = position_size # 正数表示买入比率 current_position = 1 elif signal == -1 and current_position != -1: # 做空价差 -> 卖空比率 # 卖空相当于做空中芯/做多华虹 size_series.iloc[i] = -position_size # 负数表示卖空比率 current_position = -1 elif signal == 0 and current_position != 0: # 平仓 size_series.iloc[i] = 0 # 平仓 current_position = 0 return size_series # 生成size数据 size = generate_ratio_size(signals, position_size) print(f"size数据形状: {size.shape}") print(f"非零交易数量: {(size != 0).sum()}") # ========== 创建投资组合(只基于比率) ========== print("创建基于价格比率的投资组合...") try: # 将price_ratio转换为DataFrame(vectorbt需要) ratio_close = pd.DataFrame({'RATIO': price_ratio}) portfolio = vbt.Portfolio.from_orders( close=ratio_close, # 只传入比率数据 size=size, # 基于比率的交易信号 init_cash=initial_cash, fees=commission, freq='D' ) print("基于价格比率的投资组合创建成功!") # 计算配对交易统计 print("\n=== 基于价格比率的配对交易策略表现 ===") # 获取组合总价值 portfolio_value = portfolio.value() # ========== 使用vectorbt进行专业分析 ========== print("\n=== VectorBT 专业分析(基于价格比率) ===") # 修复:使用正确的列选择方法 try: # 选择第一列(也是唯一的一列) portfolio_single = portfolio['RATIO'] print(portfolio_single.stats()) fig = portfolio_single.plot(subplots=[ 'orders', # 订单 'trade_pnl', # 交易盈亏 'cum_returns', # 累积收益 'drawdowns' # 回撤 ]) fig.update_layout( title='基于价格比率的配对交易详细分析', height=800 ) fig.show() except Exception as e: print(f"详细分析绘制失败: {e}") # ========== 绘制基于比率的技术分析图表 ========== print("\n绘制基于比率的技术分析图表...") # 创建详细的技术分析图表 fig, axes = plt.subplots(4, 1, figsize=(15, 16)) # 1. 价格比率和布林带 + 交易信号 axes[0].plot(price_ratio.index, price_ratio, label='价格比率(中芯/华虹)', linewidth=1.5, color='blue') axes[0].plot(price_ratio.index, ratio_ma, label='移动平均', linewidth=1, alpha=0.7, color='orange') axes[0].plot(price_ratio.index, upper_band, label='上轨', linewidth=1, alpha=0.7, linestyle='--', color='red') axes[0].plot(price_ratio.index, lower_band, label='下轨', linewidth=1, alpha=0.7, linestyle='--', color='green') axes[0].set_title('中芯国际-华虹半导体价格比率 (配对交易标的)') axes[0].set_ylabel('价格比率') axes[0].legend() axes[0].grid(True, alpha=0.3) # 标记交易信号 long_signals = signals[signals == 1] short_signals = signals[signals == -1] if len(long_signals) > 0: axes[0].scatter(long_signals.index, price_ratio[long_signals.index], color='green', marker='^', s=80, label='买入比率(做多中芯/做空华虹)', zorder=5) if len(short_signals) > 0: axes[0].scatter(short_signals.index, price_ratio[short_signals.index], color='red', marker='v', s=80, label='卖空比率(做空中芯/做多华虹)', zorder=5) # 2. 交易信号 axes[1].plot(signals.index, signals, label='交易信号', linewidth=2, color='purple', drawstyle='steps-post') axes[1].axhline(y=1, color='green', linestyle='--', alpha=0.5, label='买入信号') axes[1].axhline(y=-1, color='red', linestyle='--', alpha=0.5, label='卖出信号') axes[1].axhline(y=0, color='gray', linestyle='-', alpha=0.3) axes[1].set_title('交易信号时序') axes[1].set_ylabel('信号') axes[1].set_ylim(-1.5, 1.5) axes[1].legend() axes[1].grid(True, alpha=0.3) # 4. 组合净值 if len(portfolio_value) > 0: axes[3].plot(portfolio_value.index, portfolio_value, label='组合净值', linewidth=2, color='darkblue') # 标记初始资金线 axes[3].axhline(y=initial_cash, color='red', linestyle='--', alpha=0.7, label=f'初始资金({initial_cash})') axes[3].set_title('基于价格比率的配对交易组合净值') axes[3].set_ylabel('组合价值') axes[3].set_xlabel('日期') axes[3].legend() axes[3].grid(True, alpha=0.3) plt.tight_layout() plt.show() # ========== 打印详细统计 ========== print("\n=== 详细统计 ===") try: # 使用单列统计 stats = portfolio['RATIO'].stats() def safe_get_stat(stat_dict, key, default="N/A"): value = stat_dict.get(key, default) if hasattr(value, 'iloc'): return value.iloc[0] if len(value) == 1 else value return value print(f"开始日期: {safe_get_stat(stats, 'Start')}") print(f"结束日期: {safe_get_stat(stats, 'End')}") print(f"期间: {safe_get_stat(stats, 'Period')}") print(f"总收益率: {safe_get_stat(stats, 'Total Return [%]', 'N/A')}%") print(f"年化收益率: {safe_get_stat(stats, 'Annual Return [%]', 'N/A')}%") print(f"年化波动率: {safe_get_stat(stats, 'Annual Volatility [%]', 'N/A')}%") print(f"夏普比率: {safe_get_stat(stats, 'Sharpe Ratio', 'N/A')}") print(f"最大回撤: {safe_get_stat(stats, 'Max Drawdown [%]', 'N/A')}%") print(f"总交易次数: {safe_get_stat(stats, 'Total Trades', 'N/A')}") print(f"胜率: {safe_get_stat(stats, 'Win Rate [%]', 'N/A')}%") print(f"盈亏比: {safe_get_stat(stats, 'Profit Factor', 'N/A')}") except Exception as e: print(f"获取详细统计时出错: {e}") # 分析每笔交易 try: trades_df = portfolio['RATIO'].trades.records_readable if len(trades_df) > 0: print(f"\n交易分析:") print(f"总交易次数: {len(trades_df)}") if 'Duration' in trades_df.columns: print(f"平均持仓时间: {trades_df['Duration'].mean():.1f} 天") if 'PnL' in trades_df.columns: print(f"最大单笔盈利: {trades_df['PnL'].max():.2f}") print(f"最大单笔亏损: {trades_df['PnL'].min():.2f}") winning_trades = trades_df[trades_df['PnL'] > 0] losing_trades = trades_df[trades_df['PnL'] < 0] if len(winning_trades) > 0: print(f"平均盈利: {winning_trades['PnL'].mean():.2f}") if len(losing_trades) > 0: print(f"平均亏损: {losing_trades['PnL'].mean():.2f}") except Exception as e: print(f"分析交易时出错: {e}") # ========== 比率数据统计摘要 ========== print("\n=== 价格比率统计摘要 ===") print(f"数据期间: {price_ratio.index.min()} 到 {price_ratio.index.max()}") print(f"数据点数: {len(price_ratio)}") print(f"比率均值: {price_ratio.mean():.4f}") print(f"比率标准差: {price_ratio.std():.4f}") print(f"比率变异系数: {price_ratio.std()/price_ratio.mean():.4f}") # 计算交易信号统计 long_count = (signals == 1).sum() short_count = (signals == -1).sum() total_signals = long_count + short_count print(f"\n交易信号统计:") print(f"做多信号次数: {long_count}") print(f"做空信号次数: {short_count}") print(f"总信号次数: {total_signals}") except Exception as e: print(f"创建投资组合时出错: {e}") import traceback traceback.print_exc() print("程序执行完成!")