From 77fbebe23192886d3768673fcb0295b72483738f Mon Sep 17 00:00:00 2001 From: bingyi Date: Sat, 1 Nov 2025 22:49:53 +0800 Subject: [PATCH] =?UTF-8?q?=E6=95=B0=E6=8D=AE=E8=8E=B7=E5=8F=96=EF=BC=8C?= =?UTF-8?q?=E7=AD=96=E7=95=A5=E6=89=A7=E8=A1=8C=EF=BC=8C=E7=BB=93=E6=9E=9C?= =?UTF-8?q?=E6=9F=A5=E7=9C=8B=E6=8B=86=E5=88=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 3 +- gogogo/data_fetcher.py | 79 +++++++++++++ gogogo/result_visualizer.py | 225 ++++++++++++++++++++++++++++++++++++ gogogo/strategy_executor.py | 122 +++++++++++++++++++ 4 files changed, 428 insertions(+), 1 deletion(-) create mode 100644 gogogo/data_fetcher.py create mode 100644 gogogo/result_visualizer.py create mode 100644 gogogo/strategy_executor.py diff --git a/.gitignore b/.gitignore index 3699812..6094eef 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ -**/*.csv \ No newline at end of file +**/*.csv +__pycache__/ diff --git a/gogogo/data_fetcher.py b/gogogo/data_fetcher.py new file mode 100644 index 0000000..2e8419e --- /dev/null +++ b/gogogo/data_fetcher.py @@ -0,0 +1,79 @@ +# data_fetcher.py +import akshare as ak +import pandas as pd +import datetime + +def fetch_stock_data(): + """获取股票数据""" + print("正在获取股票数据...") + stock_00981 = ak.stock_hk_daily(symbol="00981") + stock_01347 = ak.stock_hk_daily(symbol="01347") + + print("中芯国际数据列名:", stock_00981.columns.tolist()) + print("华虹半导体数据列名:", stock_01347.columns.tolist()) + + return stock_00981, stock_01347 + +def preprocess_data(df, symbol): + """预处理股票数据""" + df = df.copy() + # 检查列名并重命名(如果需要) + if 'date' in df.columns: + df['date'] = pd.to_datetime(df['date']) + df.set_index('date', inplace=True) + elif '日期' in df.columns: + df['date'] = pd.to_datetime(df['日期']) + df.set_index('date', inplace=True) + # 重命名中文列为英文 + rename_dict = { + '开盘': 'open', + '最高': 'high', + '最低': 'low', + '收盘': 'close', + '成交量': 'volume' + } + df = df.rename(columns=rename_dict) + else: + # 如果已经有英文列名,直接使用 + df.index = pd.to_datetime(df.index) + + df = df.sort_index() + return df[['open', 'high', 'low', 'close', 'volume']] + +def get_processed_data(): + """获取并处理数据""" + # 获取原始数据 + stock_00981, stock_01347 = fetch_stock_data() + + # 预处理数据 + smic_data = preprocess_data(stock_00981, "00981") + hhic_data = preprocess_data(stock_01347, "01347") + + print(f"中芯国际原始数据时间范围: {smic_data.index.min()} 到 {smic_data.index.max()}") + print(f"华虹半导体原始数据时间范围: {hhic_data.index.min()} 到 {hhic_data.index.max()}") + + # 限制为最近一年数据 + end_date = smic_data.index.max() + start_date = end_date - pd.Timedelta(days=360) + + print(f"\n限制回测时间范围: {start_date} 到 {end_date}") + + smic_data = smic_data.loc[start_date:end_date] + hhic_data = hhic_data.loc[start_date:end_date] + + print(f"限制后中芯国际数据形状: {smic_data.shape}") + print(f"限制后华虹半导体数据形状: {hhic_data.shape}") + + # 对齐数据时间范围 + common_index = smic_data.index.intersection(hhic_data.index) + smic_data = smic_data.loc[common_index] + hhic_data = hhic_data.loc[common_index] + + print(f"\n对齐后数据时间范围: {smic_data.index.min()} 到 {smic_data.index.max()}") + print(f"对齐后数据点数: {len(smic_data)}") + + return smic_data, hhic_data + +if __name__ == "__main__": + smic_data, hhic_data = get_processed_data() + print("数据获取和预处理完成!") \ No newline at end of file diff --git a/gogogo/result_visualizer.py b/gogogo/result_visualizer.py new file mode 100644 index 0000000..32cb739 --- /dev/null +++ b/gogogo/result_visualizer.py @@ -0,0 +1,225 @@ +# result_visualizer.py +import vectorbt as vbt +import pandas as pd +import matplotlib.pyplot as plt +from strategy_executor import execute_strategy + +# 设置中文显示 +plt.rcParams['font.sans-serif'] = ['SimHei'] +plt.rcParams['axes.unicode_minus'] = False + +def create_portfolio(strategy_data): + """创建投资组合""" + print("创建基于价格比率的投资组合...") + + price_ratio = strategy_data['price_ratio'] + size = strategy_data['size'] + initial_cash = strategy_data['initial_cash'] + commission = strategy_data['commission'] + + try: + # 将price_ratio转换为DataFrame(vectorbt需要) + ratio_close = pd.DataFrame({'RATIO': price_ratio}) + + portfolio = vbt.Portfolio.from_orders( + close=ratio_close, # 只传入比率数据 + size=size, # 基于比率的交易信号 + init_cash=initial_cash, + fees=commission, + freq='D' + ) + + print("基于价格比率的投资组合创建成功!") + return portfolio + + except Exception as e: + print(f"创建投资组合时出错: {e}") + import traceback + traceback.print_exc() + return None + +def plot_results(strategy_data, portfolio): + """绘制结果图表""" + price_ratio = strategy_data['price_ratio'] + signals = strategy_data['signals'] + ratio_ma = strategy_data['ratio_ma'] + upper_band = strategy_data['upper_band'] + lower_band = strategy_data['lower_band'] + initial_cash = strategy_data['initial_cash'] + + # ========== 绘制基于比率的技术分析图表 ========== + print("\n绘制基于比率的技术分析图表...") + + # 创建详细的技术分析图表 + fig, axes = plt.subplots(4, 1, figsize=(15, 16)) + + # 1. 价格比率和布林带 + 交易信号 + axes[0].plot(price_ratio.index, price_ratio, label='价格比率(中芯/华虹)', linewidth=1.5, color='blue') + axes[0].plot(price_ratio.index, ratio_ma, label='移动平均', linewidth=1, alpha=0.7, color='orange') + axes[0].plot(price_ratio.index, upper_band, label='上轨', linewidth=1, alpha=0.7, linestyle='--', color='red') + axes[0].plot(price_ratio.index, lower_band, label='下轨', linewidth=1, alpha=0.7, linestyle='--', color='green') + axes[0].set_title('中芯国际-华虹半导体价格比率 (配对交易标的)') + axes[0].set_ylabel('价格比率') + axes[0].legend() + axes[0].grid(True, alpha=0.3) + + # 标记交易信号 + long_signals = signals[signals == 1] + short_signals = signals[signals == -1] + + if len(long_signals) > 0: + axes[0].scatter(long_signals.index, price_ratio[long_signals.index], + color='green', marker='^', s=80, label='买入比率(做多中芯/做空华虹)', zorder=5) + if len(short_signals) > 0: + axes[0].scatter(short_signals.index, price_ratio[short_signals.index], + color='red', marker='v', s=80, label='卖空比率(做空中芯/做多华虹)', zorder=5) + + # 2. 交易信号 + axes[1].plot(signals.index, signals, label='交易信号', linewidth=2, color='purple', drawstyle='steps-post') + axes[1].axhline(y=1, color='green', linestyle='--', alpha=0.5, label='买入信号') + axes[1].axhline(y=-1, color='red', linestyle='--', alpha=0.5, label='卖出信号') + axes[1].axhline(y=0, color='gray', linestyle='-', alpha=0.3) + axes[1].set_title('交易信号时序') + axes[1].set_ylabel('信号') + axes[1].set_ylim(-1.5, 1.5) + axes[1].legend() + axes[1].grid(True, alpha=0.3) + + # 3. 仓位变化 + positions = signals.replace({1: '做多', -1: '做空', 0: '空仓'}) + axes[2].plot(positions.index, positions, label='仓位状态', linewidth=2, color='darkorange', drawstyle='steps-post') + axes[2].set_title('仓位变化') + axes[2].set_ylabel('仓位') + axes[2].legend() + axes[2].grid(True, alpha=0.3) + + # 4. 组合净值 + if portfolio is not None: + portfolio_value = portfolio.value() + if len(portfolio_value) > 0: + axes[3].plot(portfolio_value.index, portfolio_value, label='组合净值', linewidth=2, color='darkblue') + # 标记初始资金线 + axes[3].axhline(y=initial_cash, color='red', linestyle='--', alpha=0.7, label=f'初始资金({initial_cash})') + axes[3].set_title('基于价格比率的配对交易组合净值') + axes[3].set_ylabel('组合价值') + axes[3].set_xlabel('日期') + axes[3].legend() + axes[3].grid(True, alpha=0.3) + + plt.tight_layout() + plt.show() + +def print_statistics(strategy_data, portfolio): + """打印统计结果""" + price_ratio = strategy_data['price_ratio'] + signals = strategy_data['signals'] + + print("\n=== 基于价格比率的配对交易策略表现 ===") + + if portfolio is not None: + # ========== 使用vectorbt进行专业分析 ========== + print("\n=== VectorBT 专业分析(基于价格比率) ===") + + try: + # 选择第一列(也是唯一的一列) + portfolio_single = portfolio['RATIO'] + + print(portfolio_single.stats()) + + # 绘制vectorbt图表 + fig = portfolio_single.plot(subplots=[ + 'orders', # 订单 + 'trade_pnl', # 交易盈亏 + 'cum_returns', # 累积收益 + 'drawdowns' # 回撤 + ]) + fig.update_layout( + title='基于价格比率的配对交易详细分析', + height=800 + ) + fig.show() + except Exception as e: + print(f"详细分析绘制失败: {e}") + + # ========== 打印详细统计 ========== + print("\n=== 详细统计 ===") + try: + # 使用单列统计 + stats = portfolio['RATIO'].stats() + + def safe_get_stat(stat_dict, key, default="N/A"): + value = stat_dict.get(key, default) + if hasattr(value, 'iloc'): + return value.iloc[0] if len(value) == 1 else value + return value + + print(f"开始日期: {safe_get_stat(stats, 'Start')}") + print(f"结束日期: {safe_get_stat(stats, 'End')}") + print(f"期间: {safe_get_stat(stats, 'Period')}") + print(f"总收益率: {safe_get_stat(stats, 'Total Return [%]', 'N/A')}%") + print(f"年化收益率: {safe_get_stat(stats, 'Annual Return [%]', 'N/A')}%") + print(f"年化波动率: {safe_get_stat(stats, 'Annual Volatility [%]', 'N/A')}%") + print(f"夏普比率: {safe_get_stat(stats, 'Sharpe Ratio', 'N/A')}") + print(f"最大回撤: {safe_get_stat(stats, 'Max Drawdown [%]', 'N/A')}%") + print(f"总交易次数: {safe_get_stat(stats, 'Total Trades', 'N/A')}") + print(f"胜率: {safe_get_stat(stats, 'Win Rate [%]', 'N/A')}%") + print(f"盈亏比: {safe_get_stat(stats, 'Profit Factor', 'N/A')}") + + except Exception as e: + print(f"获取详细统计时出错: {e}") + + # 分析每笔交易 + try: + trades_df = portfolio['RATIO'].trades.records_readable + if len(trades_df) > 0: + print(f"\n交易分析:") + print(f"总交易次数: {len(trades_df)}") + if 'Duration' in trades_df.columns: + print(f"平均持仓时间: {trades_df['Duration'].mean():.1f} 天") + if 'PnL' in trades_df.columns: + print(f"最大单笔盈利: {trades_df['PnL'].max():.2f}") + print(f"最大单笔亏损: {trades_df['PnL'].min():.2f}") + winning_trades = trades_df[trades_df['PnL'] > 0] + losing_trades = trades_df[trades_df['PnL'] < 0] + if len(winning_trades) > 0: + print(f"平均盈利: {winning_trades['PnL'].mean():.2f}") + if len(losing_trades) > 0: + print(f"平均亏损: {losing_trades['PnL'].mean():.2f}") + except Exception as e: + print(f"分析交易时出错: {e}") + + # ========== 比率数据统计摘要 ========== + print("\n=== 价格比率统计摘要 ===") + print(f"数据期间: {price_ratio.index.min()} 到 {price_ratio.index.max()}") + print(f"数据点数: {len(price_ratio)}") + print(f"比率均值: {price_ratio.mean():.4f}") + print(f"比率标准差: {price_ratio.std():.4f}") + print(f"比率变异系数: {price_ratio.std()/price_ratio.mean():.4f}") + + # 计算交易信号统计 + long_count = (signals == 1).sum() + short_count = (signals == -1).sum() + total_signals = long_count + short_count + print(f"\n交易信号统计:") + print(f"做多信号次数: {long_count}") + print(f"做空信号次数: {short_count}") + print(f"总信号次数: {total_signals}") + +def main(): + """主函数""" + # 执行策略 + strategy_data = execute_strategy() + + # 创建投资组合 + portfolio = create_portfolio(strategy_data) + + # 绘制结果 + plot_results(strategy_data, portfolio) + + # 打印统计 + print_statistics(strategy_data, portfolio) + + print("程序执行完成!") + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/gogogo/strategy_executor.py b/gogogo/strategy_executor.py new file mode 100644 index 0000000..3de9ccd --- /dev/null +++ b/gogogo/strategy_executor.py @@ -0,0 +1,122 @@ +# strategy_executor.py +import pandas as pd +import numpy as np +from data_fetcher import get_processed_data + +def calculate_ratio_signals(ratio_series, window=20, num_std=2): + """ + 基于价格比率计算配对交易信号 + """ + # 计算布林带 + ratio_ma = ratio_series.rolling(window=window).mean() + ratio_std = ratio_series.rolling(window=window).std() + + upper_band = ratio_ma + num_std * ratio_std + lower_band = ratio_ma - num_std * ratio_std + + # 生成交易信号 + # 1: 做多价差 (买中芯/卖华虹) -> 买入比率 + # -1: 做空价差 (卖中芯/买华虹) -> 卖空比率 + # 0: 平仓 + signals = pd.Series(0, index=ratio_series.index, name='signal') + + # 当比率突破下轨时做多价差 -> 买入比率 + long_condition = (ratio_series < lower_band) & (ratio_ma.notna()) + signals[long_condition] = 1 + + # 当比率突破上轨时做空价差 -> 卖空比率 + short_condition = (ratio_series > upper_band) & (ratio_ma.notna()) + signals[short_condition] = -1 + + # 当比率回归均值时平仓 + close_condition = (ratio_series.between(lower_band, upper_band)) & (signals.shift(1) != 0) + signals[close_condition] = 0 + + return signals, ratio_ma, upper_band, lower_band + +def generate_ratio_size(signals, price_ratio, position_size=0.5): + """ + 生成比率交易的size数据 + 返回一个与price_ratio相同形状的Series,包含交易数量 + """ + # 创建与price_ratio相同形状的size Series,初始为0 + size_series = pd.Series(0, index=signals.index, name='size') + current_position = 0 + + for i in range(len(signals)): + if i < 20: # 跳过布林带计算期 + continue + + signal = signals.iloc[i] + + if signal == 1 and current_position != 1: # 做多价差 -> 买入比率 + # 买入相当于做多中芯/做空华虹 + size_series.iloc[i] = position_size # 正数表示买入比率 + current_position = 1 + + elif signal == -1 and current_position != -1: # 做空价差 -> 卖空比率 + # 卖空相当于做空中芯/做多华虹 + size_series.iloc[i] = -position_size # 负数表示卖空比率 + current_position = -1 + + elif signal == 0 and current_position != 0: # 平仓 + size_series.iloc[i] = 0 # 平仓 + current_position = 0 + + return size_series + +def execute_strategy(): + """执行配对交易策略""" + # 获取数据 + smic_data, hhic_data = get_processed_data() + + # ========== 创建价格比率作为独立资产 ========== + print("\n=== 创建价格比率作为独立资产 ===") + close_smic = smic_data['close'] + close_hhic = hhic_data['close'] + + # 计算价格比率 - 作为独立的"股票" + price_ratio = close_smic / close_hhic + price_ratio.name = 'SMIC_HHIC_RATIO' + + print(f"价格比率数据形状: {price_ratio.shape}") + print(f"价格比率统计:") + print(f" 均值: {price_ratio.mean():.4f}") + print(f" 标准差: {price_ratio.std():.4f}") + print(f" 最小值: {price_ratio.min():.4f}") + print(f" 最大值: {price_ratio.max():.4f}") + + # 设置交易参数 + initial_cash = 100000 + commission = 0.001 # 0.1% 交易佣金 + position_size = 0.5 # 每次交易仓位比例 + + # 计算信号 + signals, ratio_ma, upper_band, lower_band = calculate_ratio_signals( + price_ratio, window=20, num_std=2 + ) + + print(f"信号计算完成,有效信号数量: {(signals != 0).sum()}") + + # 生成size数据 + size = generate_ratio_size(signals, price_ratio, position_size) + + print(f"size数据形状: {size.shape}") + print(f"非零交易数量: {(size != 0).sum()}") + + return { + 'price_ratio': price_ratio, + 'signals': signals, + 'size': size, + 'ratio_ma': ratio_ma, + 'upper_band': upper_band, + 'lower_band': lower_band, + 'initial_cash': initial_cash, + 'commission': commission, + 'smic_data': smic_data, + 'hhic_data': hhic_data + } + +if __name__ == "__main__": + strategy_data = execute_strategy() + print("策略执行完成!") \ No newline at end of file