Files
quant/OneCurvePairTrading2.py
2025-11-01 19:44:01 +08:00

410 lines
15 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import vectorbt as vbt
import akshare as ak
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import datetime
from numba import njit
from collections import namedtuple
# Set up Chinese text display in matplotlib: use the SimHei font and turn off
# the Unicode minus sign on the axes.
plt.rcParams.update({
    'font.sans-serif': ['SimHei'],
    'axes.unicode_minus': False,
})

# Fetch the daily history of both Hong Kong tickers through akshare and show
# which columns came back for each.
print("正在获取股票数据...")
stock_00981, stock_01347 = (
    ak.stock_hk_daily(symbol=ticker) for ticker in ("00981", "01347")
)
print("中芯国际数据列名:", list(stock_00981.columns))
print("华虹半导体数据列名:", list(stock_01347.columns))
# Data preprocessing
def preprocess_data(df, symbol):
    """Normalize a raw daily-price table into a date-indexed OHLCV frame.

    Args:
        df: Raw price table. The dates may live in a ``date`` column, in a
            ``日期`` column (in which case the Chinese OHLCV headers are
            renamed to their English equivalents), or already in the index.
        symbol: Ticker code. It is only used to label the error raised when
            the frame does not contain the expected columns.

    Returns:
        A new DataFrame (the input is not modified) indexed by datetime in
        ascending order and restricted to the columns ``open``, ``high``,
        ``low``, ``close`` and ``volume``.

    Raises:
        KeyError: If any of the five OHLCV columns is missing after the
            column names have been normalized.
    """
    required_columns = ['open', 'high', 'low', 'close', 'volume']
    df = df.copy()

    if 'date' in df.columns:
        df['date'] = pd.to_datetime(df['date'])
        df.set_index('date', inplace=True)
    elif '日期' in df.columns:
        df['date'] = pd.to_datetime(df['日期'])
        df.set_index('date', inplace=True)
        # Map the Chinese column headers onto the English names used below.
        rename_dict = {
            '开盘': 'open',
            '最高': 'high',
            '最低': 'low',
            '收盘': 'close',
            '成交量': 'volume',
        }
        df = df.rename(columns=rename_dict)
    else:
        # No date column at all: treat the existing index as the dates.
        df.index = pd.to_datetime(df.index)

    # Fail with a message that names the ticker instead of letting the column
    # selection below raise a context-free KeyError.
    missing = [name for name in required_columns if name not in df.columns]
    if missing:
        raise KeyError(
            f"{symbol}: missing required column(s) {missing}; "
            f"available columns: {df.columns.tolist()}"
        )

    df = df.sort_index()
    return df[required_columns]
# Preprocess both tickers into date-indexed OHLCV frames.
smic_data = preprocess_data(stock_00981, "00981")
hhic_data = preprocess_data(stock_01347, "01347")
# The start and end of each range are separated explicitly so the two
# timestamps do not run together in the output.
print(f"中芯国际原始数据时间范围: {smic_data.index.min()} ~ {smic_data.index.max()}")
print(f"华虹半导体原始数据时间范围: {hhic_data.index.min()} ~ {hhic_data.index.max()}")

# Restrict the backtest to the 360 calendar days (roughly one year) that end
# on the last date available for 00981.
end_date = smic_data.index.max()
start_date = end_date - pd.Timedelta(days=360)
print(f"\n限制回测时间范围: {start_date} ~ {end_date}")
smic_data = smic_data.loc[start_date:end_date]
hhic_data = hhic_data.loc[start_date:end_date]
print(f"限制后中芯国际数据形状: {smic_data.shape}")
print(f"限制后华虹半导体数据形状: {hhic_data.shape}")

# Keep only the dates on which both tickers have a row.
common_index = smic_data.index.intersection(hhic_data.index)
smic_data = smic_data.loc[common_index]
hhic_data = hhic_data.loc[common_index]
print(f"\n对齐后数据时间范围: {smic_data.index.min()} ~ {smic_data.index.max()}")
print(f"对齐后数据点数: {len(smic_data)}")

# ========== Build the price ratio as a stand-alone asset ==========
print("\n=== 创建价格比率作为独立资产 ===")
close_smic = smic_data['close']
close_hhic = hhic_data['close']
# The ratio of the two closes is what gets traded, as if it were one stock.
price_ratio = close_smic / close_hhic
price_ratio.name = 'SMIC_HHIC_RATIO'
print(f"价格比率数据形状: {price_ratio.shape}")
print("价格比率统计:")
print(f" 均值: {price_ratio.mean():.4f}")
print(f" 标准差: {price_ratio.std():.4f}")
print(f" 最小值: {price_ratio.min():.4f}")
print(f" 最大值: {price_ratio.max():.4f}")

# Trading parameters
initial_cash = 100000
commission = 0.001  # 0.1% fee per trade
position_size = 0.5  # size used for each trade (only one "asset" is traded now)
# ========== Compute signals from the ratio series ==========
def calculate_ratio_signals(ratio_series, window=20, num_std=2):
    """Derive Bollinger-band pair-trading signals from a price-ratio series.

    The bands are the rolling mean of the ratio plus/minus ``num_std`` rolling
    standard deviations over ``window`` observations.

    Signal values:
        1   ratio is below the lower band (long the ratio: buy the numerator
            leg, sell the denominator leg)
        -1  ratio is above the upper band (short the ratio)
        0   ratio is inside the bands, or the bands are not yet defined

    The signal is evaluated independently on every bar: it is non-zero only
    while the ratio sits outside the bands and drops back to 0 as soon as the
    ratio is inside them again. It does not wait for the ratio to return to
    the moving average.

    Args:
        ratio_series: Price ratio indexed by date.
        window: Rolling window length for the mean and standard deviation.
        num_std: Band half-width expressed in rolling standard deviations.

    Returns:
        Tuple ``(signals, ratio_ma, upper_band, lower_band)`` of Series that
        share the index of ``ratio_series``; ``signals`` is named ``'signal'``.
    """
    rolling_ratio = ratio_series.rolling(window=window)
    ratio_ma = rolling_ratio.mean()
    ratio_std = rolling_ratio.std()
    upper_band = ratio_ma + num_std * ratio_std
    lower_band = ratio_ma - num_std * ratio_std

    signals = pd.Series(0, index=ratio_series.index, name='signal')
    # Comparisons against NaN bands are False, so warm-up bars stay at 0
    # without an explicit notna() mask.
    signals[ratio_series < lower_band] = 1
    signals[ratio_series > upper_band] = -1
    # Bars inside the bands were never set to +/-1, so no separate "close"
    # pass is needed: they already carry 0.
    return signals, ratio_ma, upper_band, lower_band
# Run the signal calculation on the price ratio and report how many bars
# carry a non-zero signal.
band_result = calculate_ratio_signals(price_ratio, window=20, num_std=2)
signals, ratio_ma, upper_band, lower_band = band_result
active_signal_count = signals.ne(0).sum()
print(f"信号计算完成,有效信号数量: {active_signal_count}")
# ========== Turn signals into order sizes (ratio only) ==========
def generate_ratio_size(signals, position_size=0.5, warmup=20):
    """Convert position signals into a per-bar series of order sizes.

    The held position follows the signal: 1 means long ``position_size``
    units of the ratio, -1 means short ``position_size`` units, 0 means flat.
    Each entry of the result is the order needed on that bar to move from the
    current position to the signalled one:

        flat  -> long   : +position_size
        flat  -> short  : -position_size
        long  -> flat   : -position_size
        short -> flat   : +position_size
        long  -> short  : -2 * position_size
        short -> long   : +2 * position_size

    Bars on which the position does not change hold 0.0.

    Args:
        signals: Series of 1 / -1 / 0 signals indexed by date. Any other
            value (including NaN) leaves the position unchanged.
        position_size: Number of ratio units held while long or short.
        warmup: Number of leading bars to skip entirely (the band warm-up
            period); the position is flat throughout.

    Returns:
        Float Series named ``'size'`` with the same index as ``signals``.
    """
    # Start from float zeros: the sizes written below are fractional, and
    # assigning them into an integer Series is an incompatible-dtype setitem.
    size_series = pd.Series(0.0, index=signals.index, name='size')
    current_position = 0

    for i, signal in enumerate(signals.to_numpy()):
        if i < warmup:
            continue
        if signal not in (-1, 0, 1) or signal == current_position:
            continue
        # The order is the difference between the target and the current
        # exposure, so exits and reversals produce non-zero entries that can
        # be told apart from "no action" bars.
        size_series.iloc[i] = (signal - current_position) * position_size
        current_position = int(signal)

    return size_series
# Build the order-size series from the signals and report its shape and the
# number of bars that carry a non-zero size.
size = generate_ratio_size(signals, position_size=position_size)
nonzero_size_count = size.ne(0).sum()
print(f"size数据形状: {size.shape}")
print(f"非零交易数量: {nonzero_size_count}")
# ========== Build the portfolio (price ratio only) ==========
print("创建基于价格比率的投资组合...")
try:
    # vectorbt receives the ratio as a one-column DataFrame named 'RATIO'.
    ratio_close = pd.DataFrame({'RATIO': price_ratio})
    # NOTE(review): no size_type is passed, so `size` is interpreted with
    # vectorbt's default (presumably an absolute number of units, not a
    # fraction of equity) — confirm this matches the intent of position_size.
    portfolio = vbt.Portfolio.from_orders(
        close=ratio_close,      # ratio series only
        size=size,              # order sizes derived from the ratio signals
        init_cash=initial_cash,
        fees=commission,
        freq='D'
    )
    print("基于价格比率的投资组合创建成功!")

    # Pair-trading performance
    print("\n=== 基于价格比率的配对交易策略表现 ===")
    # Total portfolio value over time
    portfolio_value = portfolio.value()

    # ========== vectorbt analysis ==========
    # Every chart below is wrapped in its own try/except so that one failing
    # plot does not prevent the remaining reports from being produced.
    print("\n=== VectorBT 专业分析(基于价格比率) ===")

    # 1. Stats table and combined overview for the single 'RATIO' column
    try:
        portfolio_single = portfolio['RATIO']
        print(portfolio_single.stats())
        fig = portfolio_single.plot(subplots=[
            'orders',       # orders
            'trade_pnl',    # per-trade PnL
            'cum_returns',  # cumulative returns
            'drawdowns'     # drawdowns
        ])
        fig.update_layout(
            title='基于价格比率的配对交易详细分析',
            height=800
        )
        fig.show()
    except Exception as e:
        print(f"详细分析绘制失败: {e}")

    # 2. Portfolio value over time
    try:
        fig = portfolio_value.vbt.plot(
            title='基于价格比率的配对交易组合价值'
        )
        fig.update_layout(
            xaxis_title='日期',
            yaxis_title='组合价值'
        )
        fig.show()
    except Exception as e:
        print(f"组合价值变化绘制失败: {e}")

    # 3. Cumulative returns
    try:
        cumulative_returns = portfolio.cumulative_returns()
        fig = cumulative_returns.vbt.plot(
            title='基于价格比率的配对交易累积收益率'
        )
        fig.update_layout(
            xaxis_title='日期',
            yaxis_title='累积收益'
        )
        fig.show()
    except Exception as e:
        print(f"累积收益绘制失败: {e}")

    # 4. Drawdown
    try:
        drawdown = portfolio.drawdown()
        fig = drawdown.vbt.plot(
            title='基于价格比率的配对交易回撤分析'
        )
        fig.update_layout(
            xaxis_title='日期',
            yaxis_title='回撤'
        )
        fig.show()
    except Exception as e:
        print(f"回撤分析绘制失败: {e}")

    # 5. Trade analysis (PnL and duration distributions)
    try:
        trades = portfolio['RATIO'].trades
        if len(trades) > 0:
            fig = trades.plot_pnl()
            fig.update_layout(title='基于价格比率的交易盈亏分布')
            fig.show()
            fig = trades.plot_duration()
            fig.update_layout(title='基于价格比率的交易持续时间分布')
            fig.show()
    except Exception as e:
        print(f"交易分析绘制失败: {e}")

    # 6. Order flow
    try:
        fig = portfolio['RATIO'].orders.plot()
        fig.update_layout(title='基于价格比率的订单流分析')
        fig.show()
    except Exception as e:
        print(f"订单流分析绘制失败: {e}")

    # ========== matplotlib technical-analysis figure ==========
    print("\n绘制基于比率的技术分析图表...")
    # Three stacked panels, one per chart drawn below, so that no panel is
    # left empty.
    fig, axes = plt.subplots(3, 1, figsize=(15, 12))
    ratio_ax, signal_ax, value_ax = axes

    # Panel 1: price ratio, Bollinger bands and signal markers
    ratio_ax.plot(price_ratio.index, price_ratio, label='价格比率(中芯/华虹)', linewidth=1.5, color='blue')
    ratio_ax.plot(price_ratio.index, ratio_ma, label='移动平均', linewidth=1, alpha=0.7, color='orange')
    ratio_ax.plot(price_ratio.index, upper_band, label='上轨', linewidth=1, alpha=0.7, linestyle='--', color='red')
    ratio_ax.plot(price_ratio.index, lower_band, label='下轨', linewidth=1, alpha=0.7, linestyle='--', color='green')
    ratio_ax.set_title('中芯国际-华虹半导体价格比率 (配对交易标的)')
    ratio_ax.set_ylabel('价格比率')
    ratio_ax.grid(True, alpha=0.3)
    # Mark the bars that carry a long / short signal
    long_signals = signals[signals == 1]
    short_signals = signals[signals == -1]
    if len(long_signals) > 0:
        ratio_ax.scatter(long_signals.index, price_ratio[long_signals.index],
                         color='green', marker='^', s=80, label='买入比率(做多中芯/做空华虹)', zorder=5)
    if len(short_signals) > 0:
        ratio_ax.scatter(short_signals.index, price_ratio[short_signals.index],
                         color='red', marker='v', s=80, label='卖空比率(做空中芯/做多华虹)', zorder=5)
    # Build the legend only after the markers exist so their labels are
    # included in it.
    ratio_ax.legend()

    # Panel 2: signal time series
    signal_ax.plot(signals.index, signals, label='交易信号', linewidth=2, color='purple', drawstyle='steps-post')
    signal_ax.axhline(y=1, color='green', linestyle='--', alpha=0.5, label='买入信号')
    signal_ax.axhline(y=-1, color='red', linestyle='--', alpha=0.5, label='卖出信号')
    signal_ax.axhline(y=0, color='gray', linestyle='-', alpha=0.3)
    signal_ax.set_title('交易信号时序')
    signal_ax.set_ylabel('信号')
    signal_ax.set_ylim(-1.5, 1.5)
    signal_ax.legend()
    signal_ax.grid(True, alpha=0.3)

    # Panel 3: portfolio value with the initial-cash reference line
    if len(portfolio_value) > 0:
        value_ax.plot(portfolio_value.index, portfolio_value, label='组合净值', linewidth=2, color='darkblue')
        value_ax.axhline(y=initial_cash, color='red', linestyle='--', alpha=0.7, label=f'初始资金({initial_cash})')
    value_ax.set_title('基于价格比率的配对交易组合净值')
    value_ax.set_ylabel('组合价值')
    value_ax.set_xlabel('日期')
    value_ax.legend()
    value_ax.grid(True, alpha=0.3)

    plt.tight_layout()
    plt.show()

    # ========== Detailed statistics ==========
    print("\n=== 详细统计 ===")
    try:
        # Stats for the single 'RATIO' column
        stats = portfolio['RATIO'].stats()

        def safe_get_stat(stat_dict, key, default="N/A"):
            """Look up `key`, unwrapping a length-1 pandas container to a scalar."""
            value = stat_dict.get(key, default)
            if hasattr(value, 'iloc'):
                return value.iloc[0] if len(value) == 1 else value
            return value

        print(f"开始日期: {safe_get_stat(stats, 'Start')}")
        print(f"结束日期: {safe_get_stat(stats, 'End')}")
        print(f"期间: {safe_get_stat(stats, 'Period')}")
        print(f"总收益率: {safe_get_stat(stats, 'Total Return [%]', 'N/A')}%")
        print(f"年化收益率: {safe_get_stat(stats, 'Annual Return [%]', 'N/A')}%")
        print(f"年化波动率: {safe_get_stat(stats, 'Annual Volatility [%]', 'N/A')}%")
        print(f"夏普比率: {safe_get_stat(stats, 'Sharpe Ratio', 'N/A')}")
        print(f"最大回撤: {safe_get_stat(stats, 'Max Drawdown [%]', 'N/A')}%")
        print(f"总交易次数: {safe_get_stat(stats, 'Total Trades', 'N/A')}")
        print(f"胜率: {safe_get_stat(stats, 'Win Rate [%]', 'N/A')}%")
        print(f"盈亏比: {safe_get_stat(stats, 'Profit Factor', 'N/A')}")
    except Exception as e:
        print(f"获取详细统计时出错: {e}")

    # Per-trade breakdown
    try:
        trades_df = portfolio['RATIO'].trades.records_readable
        if len(trades_df) > 0:
            print("\n交易分析:")
            print(f"总交易次数: {len(trades_df)}")
            # The columns are checked first because they are not guaranteed
            # to be present in the readable trade records.
            if 'Duration' in trades_df.columns:
                print(f"平均持仓时间: {trades_df['Duration'].mean():.1f}")
            if 'PnL' in trades_df.columns:
                print(f"最大单笔盈利: {trades_df['PnL'].max():.2f}")
                print(f"最大单笔亏损: {trades_df['PnL'].min():.2f}")
                winning_trades = trades_df[trades_df['PnL'] > 0]
                losing_trades = trades_df[trades_df['PnL'] < 0]
                if len(winning_trades) > 0:
                    print(f"平均盈利: {winning_trades['PnL'].mean():.2f}")
                if len(losing_trades) > 0:
                    print(f"平均亏损: {losing_trades['PnL'].mean():.2f}")
    except Exception as e:
        print(f"分析交易时出错: {e}")

    # ========== Ratio summary ==========
    print("\n=== 价格比率统计摘要 ===")
    # Start and end are separated explicitly so the two timestamps do not run
    # together in the output.
    print(f"数据期间: {price_ratio.index.min()} ~ {price_ratio.index.max()}")
    print(f"数据点数: {len(price_ratio)}")
    print(f"比率均值: {price_ratio.mean():.4f}")
    print(f"比率标准差: {price_ratio.std():.4f}")
    print(f"比率变异系数: {price_ratio.std()/price_ratio.mean():.4f}")

    # Signal counts
    long_count = (signals == 1).sum()
    short_count = (signals == -1).sum()
    total_signals = long_count + short_count
    print("\n交易信号统计:")
    print(f"做多信号次数: {long_count}")
    print(f"做空信号次数: {short_count}")
    print(f"总信号次数: {total_signals}")
except Exception as e:
    # Top-level boundary of the script: report the failure with a full
    # traceback instead of terminating with an unhandled exception.
    print(f"创建投资组合时出错: {e}")
    import traceback
    traceback.print_exc()
print("程序执行完成!")