数据获取,策略执行,结果查看拆分

This commit is contained in:
2025-11-01 22:49:53 +08:00
parent 6d8974b828
commit 77fbebe231
4 changed files with 428 additions and 1 deletions

3
.gitignore vendored
View File

@ -1 +1,2 @@
**/*.csv **/*.csv
__pycache__/

79
gogogo/data_fetcher.py Normal file
View File

@ -0,0 +1,79 @@
# data_fetcher.py
import akshare as ak
import pandas as pd
import datetime
def fetch_stock_data():
"""获取股票数据"""
print("正在获取股票数据...")
stock_00981 = ak.stock_hk_daily(symbol="00981")
stock_01347 = ak.stock_hk_daily(symbol="01347")
print("中芯国际数据列名:", stock_00981.columns.tolist())
print("华虹半导体数据列名:", stock_01347.columns.tolist())
return stock_00981, stock_01347
def preprocess_data(df, symbol):
"""预处理股票数据"""
df = df.copy()
# 检查列名并重命名(如果需要)
if 'date' in df.columns:
df['date'] = pd.to_datetime(df['date'])
df.set_index('date', inplace=True)
elif '日期' in df.columns:
df['date'] = pd.to_datetime(df['日期'])
df.set_index('date', inplace=True)
# 重命名中文列为英文
rename_dict = {
'开盘': 'open',
'最高': 'high',
'最低': 'low',
'收盘': 'close',
'成交量': 'volume'
}
df = df.rename(columns=rename_dict)
else:
# 如果已经有英文列名,直接使用
df.index = pd.to_datetime(df.index)
df = df.sort_index()
return df[['open', 'high', 'low', 'close', 'volume']]
def get_processed_data():
"""获取并处理数据"""
# 获取原始数据
stock_00981, stock_01347 = fetch_stock_data()
# 预处理数据
smic_data = preprocess_data(stock_00981, "00981")
hhic_data = preprocess_data(stock_01347, "01347")
print(f"中芯国际原始数据时间范围: {smic_data.index.min()}{smic_data.index.max()}")
print(f"华虹半导体原始数据时间范围: {hhic_data.index.min()}{hhic_data.index.max()}")
# 限制为最近一年数据
end_date = smic_data.index.max()
start_date = end_date - pd.Timedelta(days=360)
print(f"\n限制回测时间范围: {start_date}{end_date}")
smic_data = smic_data.loc[start_date:end_date]
hhic_data = hhic_data.loc[start_date:end_date]
print(f"限制后中芯国际数据形状: {smic_data.shape}")
print(f"限制后华虹半导体数据形状: {hhic_data.shape}")
# 对齐数据时间范围
common_index = smic_data.index.intersection(hhic_data.index)
smic_data = smic_data.loc[common_index]
hhic_data = hhic_data.loc[common_index]
print(f"\n对齐后数据时间范围: {smic_data.index.min()}{smic_data.index.max()}")
print(f"对齐后数据点数: {len(smic_data)}")
return smic_data, hhic_data
if __name__ == "__main__":
smic_data, hhic_data = get_processed_data()
print("数据获取和预处理完成!")

225
gogogo/result_visualizer.py Normal file
View File

@ -0,0 +1,225 @@
# result_visualizer.py
import vectorbt as vbt
import pandas as pd
import matplotlib.pyplot as plt
from strategy_executor import execute_strategy
# 设置中文显示
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False
def create_portfolio(strategy_data):
"""创建投资组合"""
print("创建基于价格比率的投资组合...")
price_ratio = strategy_data['price_ratio']
size = strategy_data['size']
initial_cash = strategy_data['initial_cash']
commission = strategy_data['commission']
try:
# 将price_ratio转换为DataFramevectorbt需要
ratio_close = pd.DataFrame({'RATIO': price_ratio})
portfolio = vbt.Portfolio.from_orders(
close=ratio_close, # 只传入比率数据
size=size, # 基于比率的交易信号
init_cash=initial_cash,
fees=commission,
freq='D'
)
print("基于价格比率的投资组合创建成功!")
return portfolio
except Exception as e:
print(f"创建投资组合时出错: {e}")
import traceback
traceback.print_exc()
return None
def plot_results(strategy_data, portfolio):
"""绘制结果图表"""
price_ratio = strategy_data['price_ratio']
signals = strategy_data['signals']
ratio_ma = strategy_data['ratio_ma']
upper_band = strategy_data['upper_band']
lower_band = strategy_data['lower_band']
initial_cash = strategy_data['initial_cash']
# ========== 绘制基于比率的技术分析图表 ==========
print("\n绘制基于比率的技术分析图表...")
# 创建详细的技术分析图表
fig, axes = plt.subplots(4, 1, figsize=(15, 16))
# 1. 价格比率和布林带 + 交易信号
axes[0].plot(price_ratio.index, price_ratio, label='价格比率(中芯/华虹)', linewidth=1.5, color='blue')
axes[0].plot(price_ratio.index, ratio_ma, label='移动平均', linewidth=1, alpha=0.7, color='orange')
axes[0].plot(price_ratio.index, upper_band, label='上轨', linewidth=1, alpha=0.7, linestyle='--', color='red')
axes[0].plot(price_ratio.index, lower_band, label='下轨', linewidth=1, alpha=0.7, linestyle='--', color='green')
axes[0].set_title('中芯国际-华虹半导体价格比率 (配对交易标的)')
axes[0].set_ylabel('价格比率')
axes[0].legend()
axes[0].grid(True, alpha=0.3)
# 标记交易信号
long_signals = signals[signals == 1]
short_signals = signals[signals == -1]
if len(long_signals) > 0:
axes[0].scatter(long_signals.index, price_ratio[long_signals.index],
color='green', marker='^', s=80, label='买入比率(做多中芯/做空华虹)', zorder=5)
if len(short_signals) > 0:
axes[0].scatter(short_signals.index, price_ratio[short_signals.index],
color='red', marker='v', s=80, label='卖空比率(做空中芯/做多华虹)', zorder=5)
# 2. 交易信号
axes[1].plot(signals.index, signals, label='交易信号', linewidth=2, color='purple', drawstyle='steps-post')
axes[1].axhline(y=1, color='green', linestyle='--', alpha=0.5, label='买入信号')
axes[1].axhline(y=-1, color='red', linestyle='--', alpha=0.5, label='卖出信号')
axes[1].axhline(y=0, color='gray', linestyle='-', alpha=0.3)
axes[1].set_title('交易信号时序')
axes[1].set_ylabel('信号')
axes[1].set_ylim(-1.5, 1.5)
axes[1].legend()
axes[1].grid(True, alpha=0.3)
# 3. 仓位变化
positions = signals.replace({1: '做多', -1: '做空', 0: '空仓'})
axes[2].plot(positions.index, positions, label='仓位状态', linewidth=2, color='darkorange', drawstyle='steps-post')
axes[2].set_title('仓位变化')
axes[2].set_ylabel('仓位')
axes[2].legend()
axes[2].grid(True, alpha=0.3)
# 4. 组合净值
if portfolio is not None:
portfolio_value = portfolio.value()
if len(portfolio_value) > 0:
axes[3].plot(portfolio_value.index, portfolio_value, label='组合净值', linewidth=2, color='darkblue')
# 标记初始资金线
axes[3].axhline(y=initial_cash, color='red', linestyle='--', alpha=0.7, label=f'初始资金({initial_cash})')
axes[3].set_title('基于价格比率的配对交易组合净值')
axes[3].set_ylabel('组合价值')
axes[3].set_xlabel('日期')
axes[3].legend()
axes[3].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
def print_statistics(strategy_data, portfolio):
"""打印统计结果"""
price_ratio = strategy_data['price_ratio']
signals = strategy_data['signals']
print("\n=== 基于价格比率的配对交易策略表现 ===")
if portfolio is not None:
# ========== 使用vectorbt进行专业分析 ==========
print("\n=== VectorBT 专业分析(基于价格比率) ===")
try:
# 选择第一列(也是唯一的一列)
portfolio_single = portfolio['RATIO']
print(portfolio_single.stats())
# 绘制vectorbt图表
fig = portfolio_single.plot(subplots=[
'orders', # 订单
'trade_pnl', # 交易盈亏
'cum_returns', # 累积收益
'drawdowns' # 回撤
])
fig.update_layout(
title='基于价格比率的配对交易详细分析',
height=800
)
fig.show()
except Exception as e:
print(f"详细分析绘制失败: {e}")
# ========== 打印详细统计 ==========
print("\n=== 详细统计 ===")
try:
# 使用单列统计
stats = portfolio['RATIO'].stats()
def safe_get_stat(stat_dict, key, default="N/A"):
value = stat_dict.get(key, default)
if hasattr(value, 'iloc'):
return value.iloc[0] if len(value) == 1 else value
return value
print(f"开始日期: {safe_get_stat(stats, 'Start')}")
print(f"结束日期: {safe_get_stat(stats, 'End')}")
print(f"期间: {safe_get_stat(stats, 'Period')}")
print(f"总收益率: {safe_get_stat(stats, 'Total Return [%]', 'N/A')}%")
print(f"年化收益率: {safe_get_stat(stats, 'Annual Return [%]', 'N/A')}%")
print(f"年化波动率: {safe_get_stat(stats, 'Annual Volatility [%]', 'N/A')}%")
print(f"夏普比率: {safe_get_stat(stats, 'Sharpe Ratio', 'N/A')}")
print(f"最大回撤: {safe_get_stat(stats, 'Max Drawdown [%]', 'N/A')}%")
print(f"总交易次数: {safe_get_stat(stats, 'Total Trades', 'N/A')}")
print(f"胜率: {safe_get_stat(stats, 'Win Rate [%]', 'N/A')}%")
print(f"盈亏比: {safe_get_stat(stats, 'Profit Factor', 'N/A')}")
except Exception as e:
print(f"获取详细统计时出错: {e}")
# 分析每笔交易
try:
trades_df = portfolio['RATIO'].trades.records_readable
if len(trades_df) > 0:
print(f"\n交易分析:")
print(f"总交易次数: {len(trades_df)}")
if 'Duration' in trades_df.columns:
print(f"平均持仓时间: {trades_df['Duration'].mean():.1f}")
if 'PnL' in trades_df.columns:
print(f"最大单笔盈利: {trades_df['PnL'].max():.2f}")
print(f"最大单笔亏损: {trades_df['PnL'].min():.2f}")
winning_trades = trades_df[trades_df['PnL'] > 0]
losing_trades = trades_df[trades_df['PnL'] < 0]
if len(winning_trades) > 0:
print(f"平均盈利: {winning_trades['PnL'].mean():.2f}")
if len(losing_trades) > 0:
print(f"平均亏损: {losing_trades['PnL'].mean():.2f}")
except Exception as e:
print(f"分析交易时出错: {e}")
# ========== 比率数据统计摘要 ==========
print("\n=== 价格比率统计摘要 ===")
print(f"数据期间: {price_ratio.index.min()}{price_ratio.index.max()}")
print(f"数据点数: {len(price_ratio)}")
print(f"比率均值: {price_ratio.mean():.4f}")
print(f"比率标准差: {price_ratio.std():.4f}")
print(f"比率变异系数: {price_ratio.std()/price_ratio.mean():.4f}")
# 计算交易信号统计
long_count = (signals == 1).sum()
short_count = (signals == -1).sum()
total_signals = long_count + short_count
print(f"\n交易信号统计:")
print(f"做多信号次数: {long_count}")
print(f"做空信号次数: {short_count}")
print(f"总信号次数: {total_signals}")
def main():
"""主函数"""
# 执行策略
strategy_data = execute_strategy()
# 创建投资组合
portfolio = create_portfolio(strategy_data)
# 绘制结果
plot_results(strategy_data, portfolio)
# 打印统计
print_statistics(strategy_data, portfolio)
print("程序执行完成!")
if __name__ == "__main__":
main()

122
gogogo/strategy_executor.py Normal file
View File

@ -0,0 +1,122 @@
# strategy_executor.py
import pandas as pd
import numpy as np
from data_fetcher import get_processed_data
def calculate_ratio_signals(ratio_series, window=20, num_std=2):
"""
基于价格比率计算配对交易信号
"""
# 计算布林带
ratio_ma = ratio_series.rolling(window=window).mean()
ratio_std = ratio_series.rolling(window=window).std()
upper_band = ratio_ma + num_std * ratio_std
lower_band = ratio_ma - num_std * ratio_std
# 生成交易信号
# 1: 做多价差 (买中芯/卖华虹) -> 买入比率
# -1: 做空价差 (卖中芯/买华虹) -> 卖空比率
# 0: 平仓
signals = pd.Series(0, index=ratio_series.index, name='signal')
# 当比率突破下轨时做多价差 -> 买入比率
long_condition = (ratio_series < lower_band) & (ratio_ma.notna())
signals[long_condition] = 1
# 当比率突破上轨时做空价差 -> 卖空比率
short_condition = (ratio_series > upper_band) & (ratio_ma.notna())
signals[short_condition] = -1
# 当比率回归均值时平仓
close_condition = (ratio_series.between(lower_band, upper_band)) & (signals.shift(1) != 0)
signals[close_condition] = 0
return signals, ratio_ma, upper_band, lower_band
def generate_ratio_size(signals, price_ratio, position_size=0.5):
"""
生成比率交易的size数据
返回一个与price_ratio相同形状的Series包含交易数量
"""
# 创建与price_ratio相同形状的size Series初始为0
size_series = pd.Series(0, index=signals.index, name='size')
current_position = 0
for i in range(len(signals)):
if i < 20: # 跳过布林带计算期
continue
signal = signals.iloc[i]
if signal == 1 and current_position != 1: # 做多价差 -> 买入比率
# 买入相当于做多中芯/做空华虹
size_series.iloc[i] = position_size # 正数表示买入比率
current_position = 1
elif signal == -1 and current_position != -1: # 做空价差 -> 卖空比率
# 卖空相当于做空中芯/做多华虹
size_series.iloc[i] = -position_size # 负数表示卖空比率
current_position = -1
elif signal == 0 and current_position != 0: # 平仓
size_series.iloc[i] = 0 # 平仓
current_position = 0
return size_series
def execute_strategy():
"""执行配对交易策略"""
# 获取数据
smic_data, hhic_data = get_processed_data()
# ========== 创建价格比率作为独立资产 ==========
print("\n=== 创建价格比率作为独立资产 ===")
close_smic = smic_data['close']
close_hhic = hhic_data['close']
# 计算价格比率 - 作为独立的"股票"
price_ratio = close_smic / close_hhic
price_ratio.name = 'SMIC_HHIC_RATIO'
print(f"价格比率数据形状: {price_ratio.shape}")
print(f"价格比率统计:")
print(f" 均值: {price_ratio.mean():.4f}")
print(f" 标准差: {price_ratio.std():.4f}")
print(f" 最小值: {price_ratio.min():.4f}")
print(f" 最大值: {price_ratio.max():.4f}")
# 设置交易参数
initial_cash = 100000
commission = 0.001 # 0.1% 交易佣金
position_size = 0.5 # 每次交易仓位比例
# 计算信号
signals, ratio_ma, upper_band, lower_band = calculate_ratio_signals(
price_ratio, window=20, num_std=2
)
print(f"信号计算完成,有效信号数量: {(signals != 0).sum()}")
# 生成size数据
size = generate_ratio_size(signals, price_ratio, position_size)
print(f"size数据形状: {size.shape}")
print(f"非零交易数量: {(size != 0).sum()}")
return {
'price_ratio': price_ratio,
'signals': signals,
'size': size,
'ratio_ma': ratio_ma,
'upper_band': upper_band,
'lower_band': lower_band,
'initial_cash': initial_cash,
'commission': commission,
'smic_data': smic_data,
'hhic_data': hhic_data
}
if __name__ == "__main__":
strategy_data = execute_strategy()
print("策略执行完成!")