347 lines
13 KiB
Python
347 lines
13 KiB
Python
import vectorbt as vbt
|
||
import akshare as ak
|
||
import pandas as pd
|
||
import numpy as np
|
||
import matplotlib.pyplot as plt
|
||
import datetime
|
||
from numba import njit
|
||
from collections import namedtuple
|
||
|
||
# 设置中文显示
|
||
plt.rcParams['font.sans-serif'] = ['SimHei']
|
||
plt.rcParams['axes.unicode_minus'] = False
|
||
|
||
# 获取股票数据
|
||
print("正在获取股票数据...")
|
||
stock_00981 = ak.stock_hk_daily(symbol="00981")
|
||
stock_01347 = ak.stock_hk_daily(symbol="01347")
|
||
|
||
print("中芯国际数据列名:", stock_00981.columns.tolist())
|
||
print("华虹半导体数据列名:", stock_01347.columns.tolist())
|
||
|
||
# 数据预处理
|
||
def preprocess_data(df, symbol):
|
||
"""预处理股票数据"""
|
||
df = df.copy()
|
||
# 检查列名并重命名(如果需要)
|
||
if 'date' in df.columns:
|
||
df['date'] = pd.to_datetime(df['date'])
|
||
df.set_index('date', inplace=True)
|
||
elif '日期' in df.columns:
|
||
df['date'] = pd.to_datetime(df['日期'])
|
||
df.set_index('date', inplace=True)
|
||
# 重命名中文列为英文
|
||
rename_dict = {
|
||
'开盘': 'open',
|
||
'最高': 'high',
|
||
'最低': 'low',
|
||
'收盘': 'close',
|
||
'成交量': 'volume'
|
||
}
|
||
df = df.rename(columns=rename_dict)
|
||
else:
|
||
# 如果已经有英文列名,直接使用
|
||
df.index = pd.to_datetime(df.index)
|
||
|
||
df = df.sort_index()
|
||
return df[['open', 'high', 'low', 'close', 'volume']]
|
||
|
||
# 预处理数据
|
||
smic_data = preprocess_data(stock_00981, "00981")
|
||
hhic_data = preprocess_data(stock_01347, "01347")
|
||
|
||
print(f"中芯国际原始数据时间范围: {smic_data.index.min()} 到 {smic_data.index.max()}")
|
||
print(f"华虹半导体原始数据时间范围: {hhic_data.index.min()} 到 {hhic_data.index.max()}")
|
||
|
||
# 限制为最近半年数据
|
||
end_date = smic_data.index.max()
|
||
start_date = end_date - pd.Timedelta(days=360) # 最近半年
|
||
|
||
print(f"\n限制回测时间范围: {start_date} 到 {end_date}")
|
||
|
||
smic_data = smic_data.loc[start_date:end_date]
|
||
hhic_data = hhic_data.loc[start_date:end_date]
|
||
|
||
print(f"限制后中芯国际数据形状: {smic_data.shape}")
|
||
print(f"限制后华虹半导体数据形状: {hhic_data.shape}")
|
||
|
||
# 对齐数据时间范围
|
||
common_index = smic_data.index.intersection(hhic_data.index)
|
||
smic_data = smic_data.loc[common_index]
|
||
hhic_data = hhic_data.loc[common_index]
|
||
|
||
print(f"\n对齐后数据时间范围: {smic_data.index.min()} 到 {smic_data.index.max()}")
|
||
print(f"对齐后数据点数: {len(smic_data)}")
|
||
|
||
# ========== 创建价格比率作为独立资产 ==========
|
||
print("\n=== 创建价格比率作为独立资产 ===")
|
||
close_smic = smic_data['close']
|
||
close_hhic = hhic_data['close']
|
||
|
||
# 计算价格比率 - 作为独立的"股票"
|
||
price_ratio = close_smic / close_hhic
|
||
price_ratio.name = 'SMIC_HHIC_RATIO'
|
||
|
||
print(f"价格比率数据形状: {price_ratio.shape}")
|
||
print(f"价格比率统计:")
|
||
print(f" 均值: {price_ratio.mean():.4f}")
|
||
print(f" 标准差: {price_ratio.std():.4f}")
|
||
print(f" 最小值: {price_ratio.min():.4f}")
|
||
print(f" 最大值: {price_ratio.max():.4f}")
|
||
|
||
# 设置交易参数
|
||
initial_cash = 100000
|
||
commission = 0.001 # 0.1% 交易佣金
|
||
position_size = 0.5 # 每次交易仓位比例(现在只交易一个"资产")
|
||
|
||
# ========== 基于比率数据计算信号 ==========
|
||
def calculate_ratio_signals(ratio_series, window=20, num_std=2):
|
||
"""
|
||
基于价格比率计算配对交易信号
|
||
"""
|
||
# 计算布林带
|
||
ratio_ma = ratio_series.rolling(window=window).mean()
|
||
ratio_std = ratio_series.rolling(window=window).std()
|
||
|
||
upper_band = ratio_ma + num_std * ratio_std
|
||
lower_band = ratio_ma - num_std * ratio_std
|
||
|
||
# 生成交易信号
|
||
# 1: 做多价差 (买中芯/卖华虹) -> 买入比率
|
||
# -1: 做空价差 (卖中芯/买华虹) -> 卖空比率
|
||
# 0: 平仓
|
||
signals = pd.Series(0, index=ratio_series.index, name='signal')
|
||
|
||
# 当比率突破下轨时做多价差 -> 买入比率
|
||
long_condition = (ratio_series < lower_band) & (ratio_ma.notna())
|
||
signals[long_condition] = 1
|
||
|
||
# 当比率突破上轨时做空价差 -> 卖空比率
|
||
short_condition = (ratio_series > upper_band) & (ratio_ma.notna())
|
||
signals[short_condition] = -1
|
||
|
||
# 当比率回归均值时平仓
|
||
close_condition = (ratio_series.between(lower_band, upper_band)) & (signals.shift(1) != 0)
|
||
signals[close_condition] = 0
|
||
|
||
return signals, ratio_ma, upper_band, lower_band
|
||
|
||
# 计算信号
|
||
signals, ratio_ma, upper_band, lower_band = calculate_ratio_signals(
|
||
price_ratio, window=20, num_std=2
|
||
)
|
||
|
||
print(f"信号计算完成,有效信号数量: {(signals != 0).sum()}")
|
||
|
||
# ========== 基于信号生成size数据(现在只针对比率) ==========
|
||
def generate_ratio_size(signals, position_size=0.5):
|
||
"""
|
||
生成比率交易的size数据
|
||
返回一个与price_ratio相同形状的Series,包含交易数量
|
||
"""
|
||
# 创建与price_ratio相同形状的size Series,初始为0
|
||
size_series = pd.Series(0, index=signals.index, name='size')
|
||
current_position = 0
|
||
|
||
for i in range(len(signals)):
|
||
if i < 20: # 跳过布林带计算期
|
||
continue
|
||
|
||
date = signals.index[i]
|
||
signal = signals.iloc[i]
|
||
|
||
if signal == 1 and current_position != 1: # 做多价差 -> 买入比率
|
||
# 计算头寸规模(基于比率价值)
|
||
# 这里我们假设比率的"价格"就是其实际值
|
||
ratio_value = price_ratio.iloc[i]
|
||
# 买入相当于做多中芯/做空华虹
|
||
size_series.iloc[i] = position_size # 正数表示买入比率
|
||
current_position = 1
|
||
|
||
elif signal == -1 and current_position != -1: # 做空价差 -> 卖空比率
|
||
# 卖空相当于做空中芯/做多华虹
|
||
size_series.iloc[i] = -position_size # 负数表示卖空比率
|
||
current_position = -1
|
||
|
||
elif signal == 0 and current_position != 0: # 平仓
|
||
size_series.iloc[i] = 0 # 平仓
|
||
current_position = 0
|
||
|
||
return size_series
|
||
|
||
# 生成size数据
|
||
size = generate_ratio_size(signals, position_size)
|
||
|
||
print(f"size数据形状: {size.shape}")
|
||
print(f"非零交易数量: {(size != 0).sum()}")
|
||
|
||
# ========== 创建投资组合(只基于比率) ==========
|
||
print("创建基于价格比率的投资组合...")
|
||
try:
|
||
# 将price_ratio转换为DataFrame(vectorbt需要)
|
||
ratio_close = pd.DataFrame({'RATIO': price_ratio})
|
||
|
||
portfolio = vbt.Portfolio.from_orders(
|
||
close=ratio_close, # 只传入比率数据
|
||
size=size, # 基于比率的交易信号
|
||
init_cash=initial_cash,
|
||
fees=commission,
|
||
freq='D'
|
||
)
|
||
|
||
print("基于价格比率的投资组合创建成功!")
|
||
|
||
# 计算配对交易统计
|
||
print("\n=== 基于价格比率的配对交易策略表现 ===")
|
||
|
||
# 获取组合总价值
|
||
portfolio_value = portfolio.value()
|
||
|
||
|
||
# ========== 使用vectorbt进行专业分析 ==========
|
||
print("\n=== VectorBT 专业分析(基于价格比率) ===")
|
||
|
||
# 修复:使用正确的列选择方法
|
||
try:
|
||
# 选择第一列(也是唯一的一列)
|
||
portfolio_single = portfolio['RATIO']
|
||
|
||
print(portfolio_single.stats())
|
||
|
||
fig = portfolio_single.plot(subplots=[
|
||
'orders', # 订单
|
||
'trade_pnl', # 交易盈亏
|
||
'cum_returns', # 累积收益
|
||
'drawdowns' # 回撤
|
||
])
|
||
fig.update_layout(
|
||
title='基于价格比率的配对交易详细分析',
|
||
height=800
|
||
)
|
||
fig.show()
|
||
except Exception as e:
|
||
print(f"详细分析绘制失败: {e}")
|
||
|
||
# ========== 绘制基于比率的技术分析图表 ==========
|
||
print("\n绘制基于比率的技术分析图表...")
|
||
|
||
# 创建详细的技术分析图表
|
||
fig, axes = plt.subplots(4, 1, figsize=(15, 16))
|
||
|
||
# 1. 价格比率和布林带 + 交易信号
|
||
axes[0].plot(price_ratio.index, price_ratio, label='价格比率(中芯/华虹)', linewidth=1.5, color='blue')
|
||
axes[0].plot(price_ratio.index, ratio_ma, label='移动平均', linewidth=1, alpha=0.7, color='orange')
|
||
axes[0].plot(price_ratio.index, upper_band, label='上轨', linewidth=1, alpha=0.7, linestyle='--', color='red')
|
||
axes[0].plot(price_ratio.index, lower_band, label='下轨', linewidth=1, alpha=0.7, linestyle='--', color='green')
|
||
axes[0].set_title('中芯国际-华虹半导体价格比率 (配对交易标的)')
|
||
axes[0].set_ylabel('价格比率')
|
||
axes[0].legend()
|
||
axes[0].grid(True, alpha=0.3)
|
||
|
||
# 标记交易信号
|
||
long_signals = signals[signals == 1]
|
||
short_signals = signals[signals == -1]
|
||
|
||
if len(long_signals) > 0:
|
||
axes[0].scatter(long_signals.index, price_ratio[long_signals.index],
|
||
color='green', marker='^', s=80, label='买入比率(做多中芯/做空华虹)', zorder=5)
|
||
if len(short_signals) > 0:
|
||
axes[0].scatter(short_signals.index, price_ratio[short_signals.index],
|
||
color='red', marker='v', s=80, label='卖空比率(做空中芯/做多华虹)', zorder=5)
|
||
|
||
# 2. 交易信号
|
||
axes[1].plot(signals.index, signals, label='交易信号', linewidth=2, color='purple', drawstyle='steps-post')
|
||
axes[1].axhline(y=1, color='green', linestyle='--', alpha=0.5, label='买入信号')
|
||
axes[1].axhline(y=-1, color='red', linestyle='--', alpha=0.5, label='卖出信号')
|
||
axes[1].axhline(y=0, color='gray', linestyle='-', alpha=0.3)
|
||
axes[1].set_title('交易信号时序')
|
||
axes[1].set_ylabel('信号')
|
||
axes[1].set_ylim(-1.5, 1.5)
|
||
axes[1].legend()
|
||
axes[1].grid(True, alpha=0.3)
|
||
|
||
|
||
# 4. 组合净值
|
||
if len(portfolio_value) > 0:
|
||
axes[3].plot(portfolio_value.index, portfolio_value, label='组合净值', linewidth=2, color='darkblue')
|
||
# 标记初始资金线
|
||
axes[3].axhline(y=initial_cash, color='red', linestyle='--', alpha=0.7, label=f'初始资金({initial_cash})')
|
||
axes[3].set_title('基于价格比率的配对交易组合净值')
|
||
axes[3].set_ylabel('组合价值')
|
||
axes[3].set_xlabel('日期')
|
||
axes[3].legend()
|
||
axes[3].grid(True, alpha=0.3)
|
||
|
||
plt.tight_layout()
|
||
plt.show()
|
||
|
||
# ========== 打印详细统计 ==========
|
||
print("\n=== 详细统计 ===")
|
||
try:
|
||
# 使用单列统计
|
||
stats = portfolio['RATIO'].stats()
|
||
|
||
def safe_get_stat(stat_dict, key, default="N/A"):
|
||
value = stat_dict.get(key, default)
|
||
if hasattr(value, 'iloc'):
|
||
return value.iloc[0] if len(value) == 1 else value
|
||
return value
|
||
|
||
print(f"开始日期: {safe_get_stat(stats, 'Start')}")
|
||
print(f"结束日期: {safe_get_stat(stats, 'End')}")
|
||
print(f"期间: {safe_get_stat(stats, 'Period')}")
|
||
print(f"总收益率: {safe_get_stat(stats, 'Total Return [%]', 'N/A')}%")
|
||
print(f"年化收益率: {safe_get_stat(stats, 'Annual Return [%]', 'N/A')}%")
|
||
print(f"年化波动率: {safe_get_stat(stats, 'Annual Volatility [%]', 'N/A')}%")
|
||
print(f"夏普比率: {safe_get_stat(stats, 'Sharpe Ratio', 'N/A')}")
|
||
print(f"最大回撤: {safe_get_stat(stats, 'Max Drawdown [%]', 'N/A')}%")
|
||
print(f"总交易次数: {safe_get_stat(stats, 'Total Trades', 'N/A')}")
|
||
print(f"胜率: {safe_get_stat(stats, 'Win Rate [%]', 'N/A')}%")
|
||
print(f"盈亏比: {safe_get_stat(stats, 'Profit Factor', 'N/A')}")
|
||
|
||
except Exception as e:
|
||
print(f"获取详细统计时出错: {e}")
|
||
|
||
# 分析每笔交易
|
||
try:
|
||
trades_df = portfolio['RATIO'].trades.records_readable
|
||
if len(trades_df) > 0:
|
||
print(f"\n交易分析:")
|
||
print(f"总交易次数: {len(trades_df)}")
|
||
if 'Duration' in trades_df.columns:
|
||
print(f"平均持仓时间: {trades_df['Duration'].mean():.1f} 天")
|
||
if 'PnL' in trades_df.columns:
|
||
print(f"最大单笔盈利: {trades_df['PnL'].max():.2f}")
|
||
print(f"最大单笔亏损: {trades_df['PnL'].min():.2f}")
|
||
winning_trades = trades_df[trades_df['PnL'] > 0]
|
||
losing_trades = trades_df[trades_df['PnL'] < 0]
|
||
if len(winning_trades) > 0:
|
||
print(f"平均盈利: {winning_trades['PnL'].mean():.2f}")
|
||
if len(losing_trades) > 0:
|
||
print(f"平均亏损: {losing_trades['PnL'].mean():.2f}")
|
||
except Exception as e:
|
||
print(f"分析交易时出错: {e}")
|
||
|
||
# ========== 比率数据统计摘要 ==========
|
||
print("\n=== 价格比率统计摘要 ===")
|
||
print(f"数据期间: {price_ratio.index.min()} 到 {price_ratio.index.max()}")
|
||
print(f"数据点数: {len(price_ratio)}")
|
||
print(f"比率均值: {price_ratio.mean():.4f}")
|
||
print(f"比率标准差: {price_ratio.std():.4f}")
|
||
print(f"比率变异系数: {price_ratio.std()/price_ratio.mean():.4f}")
|
||
|
||
# 计算交易信号统计
|
||
long_count = (signals == 1).sum()
|
||
short_count = (signals == -1).sum()
|
||
total_signals = long_count + short_count
|
||
print(f"\n交易信号统计:")
|
||
print(f"做多信号次数: {long_count}")
|
||
print(f"做空信号次数: {short_count}")
|
||
print(f"总信号次数: {total_signals}")
|
||
|
||
except Exception as e:
|
||
print(f"创建投资组合时出错: {e}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
|
||
print("程序执行完成!") |