336 lines
12 KiB
Python
336 lines
12 KiB
Python
# strategy_executor.py
|
||
import pandas as pd
|
||
import numpy as np
|
||
import vectorbt as vbt
|
||
from data_fetcher import get_processed_data
|
||
|
||
def calculate_ratio_signals(ratio_series, window=20, num_std=2):
|
||
"""
|
||
基于价格比率计算配对交易信号
|
||
"""
|
||
# 计算布林带
|
||
ratio_ma = ratio_series.rolling(window=window).mean()
|
||
ratio_std = ratio_series.rolling(window=window).std()
|
||
|
||
upper_band = ratio_ma + num_std * ratio_std
|
||
lower_band = ratio_ma - num_std * ratio_std
|
||
|
||
# 生成交易信号
|
||
# 1: 做多价差 (买中芯/卖华虹) -> 买入比率
|
||
# -1: 做空价差 (卖中芯/买华虹) -> 卖空比率
|
||
# 0: 平仓
|
||
signals = pd.Series(0, index=ratio_series.index, name='signal')
|
||
|
||
# 当比率突破下轨时做多价差 -> 买入比率
|
||
long_condition = (ratio_series < lower_band) & (ratio_ma.notna())
|
||
signals[long_condition] = 1
|
||
|
||
# 当比率突破上轨时做空价差 -> 卖空比率
|
||
short_condition = (ratio_series > upper_band) & (ratio_ma.notna())
|
||
signals[short_condition] = -1
|
||
|
||
# 当比率回归均值时平仓
|
||
close_condition = (ratio_series.between(lower_band, upper_band)) & (signals.shift(1) != 0)
|
||
signals[close_condition] = 0
|
||
|
||
return signals, ratio_ma, upper_band, lower_band
|
||
|
||
def generate_ratio_size(signals, price_ratio, position_size=0.5):
|
||
"""
|
||
生成比率交易的size数据
|
||
返回一个与price_ratio相同形状的Series,包含交易数量
|
||
"""
|
||
# 创建与price_ratio相同形状的size Series,初始为0
|
||
size_series = pd.Series(0, index=signals.index, name='size')
|
||
current_position = 0
|
||
|
||
for i in range(len(signals)):
|
||
if i < 20: # 跳过布林带计算期
|
||
continue
|
||
|
||
signal = signals.iloc[i]
|
||
|
||
if signal == 1 and current_position != 1: # 做多价差 -> 买入比率
|
||
# 买入相当于做多中芯/做空华虹
|
||
size_series.iloc[i] = position_size # 正数表示买入比率
|
||
current_position = 1
|
||
|
||
elif signal == -1 and current_position != -1: # 做空价差 -> 卖空比率
|
||
# 卖空相当于做空中芯/做多华虹
|
||
size_series.iloc[i] = -position_size # 负数表示卖空比率
|
||
current_position = -1
|
||
|
||
elif signal == 0 and current_position != 0: # 平仓
|
||
size_series.iloc[i] = 0 # 平仓
|
||
current_position = 0
|
||
|
||
return size_series
|
||
|
||
def generate_stock_sizes(signals, close_smic, close_hhic, initial_cash=100000, position_ratio=0.5):
|
||
"""
|
||
生成真实股票交易的size数据
|
||
返回两个DataFrame:smic_size和hhic_size
|
||
"""
|
||
# 创建空的size Series
|
||
smic_size = pd.Series(0.0, index=signals.index, name='SMIC')
|
||
hhic_size = pd.Series(0.0, index=signals.index, name='HHIC')
|
||
|
||
current_position = 0 # 0: 无仓位, 1: 做多价差, -1: 做空价差
|
||
|
||
for i in range(len(signals)):
|
||
if i < 20: # 跳过布林带计算期
|
||
continue
|
||
|
||
signal = signals.iloc[i]
|
||
smic_price = close_smic.iloc[i]
|
||
hhic_price = close_hhic.iloc[i]
|
||
|
||
if signal == 1 and current_position != 1: # 做多价差:买入中芯,卖空华虹
|
||
# 计算每只股票的仓位价值
|
||
position_value = initial_cash * position_ratio
|
||
|
||
# 做多中芯国际
|
||
smic_shares = position_value / smic_price
|
||
smic_size.iloc[i] = smic_shares
|
||
|
||
# 做空华虹半导体
|
||
hhic_shares = -position_value / hhic_price
|
||
hhic_size.iloc[i] = hhic_shares
|
||
|
||
current_position = 1
|
||
|
||
elif signal == -1 and current_position != -1: # 做空价差:卖空中芯,买入华虹
|
||
# 计算每只股票的仓位价值
|
||
position_value = initial_cash * position_ratio
|
||
|
||
# 做空中芯国际
|
||
smic_shares = -position_value / smic_price
|
||
smic_size.iloc[i] = smic_shares
|
||
|
||
# 做多华虹半导体
|
||
hhic_shares = position_value / hhic_price
|
||
hhic_size.iloc[i] = hhic_shares
|
||
|
||
current_position = -1
|
||
|
||
elif signal == 0 and current_position != 0: # 平仓
|
||
# 平掉所有仓位
|
||
if current_position == 1: # 平掉做多价差仓位
|
||
smic_size.iloc[i] = -smic_size.shift(1).iloc[i] if i > 0 else 0
|
||
hhic_size.iloc[i] = -hhic_size.shift(1).iloc[i] if i > 0 else 0
|
||
elif current_position == -1: # 平掉做空价差仓位
|
||
smic_size.iloc[i] = -smic_size.shift(1).iloc[i] if i > 0 else 0
|
||
hhic_size.iloc[i] = -hhic_size.shift(1).iloc[i] if i > 0 else 0
|
||
|
||
current_position = 0
|
||
|
||
# 创建size DataFrame
|
||
size_df = pd.DataFrame({
|
||
'SMIC': smic_size,
|
||
'HHIC': hhic_size
|
||
})
|
||
|
||
return size_df
|
||
|
||
def generate_strategy():
|
||
"""生成配对交易策略"""
|
||
# 获取数据
|
||
smic_data, hhic_data = get_processed_data()
|
||
|
||
# ========== 创建价格比率作为独立资产 ==========
|
||
print("\n=== 创建价格比率作为独立资产 ===")
|
||
close_smic = smic_data['close']
|
||
close_hhic = hhic_data['close']
|
||
|
||
# 计算价格比率 - 作为独立的"股票"
|
||
price_ratio = close_smic / close_hhic
|
||
price_ratio.name = 'SMIC_HHIC_RATIO'
|
||
|
||
print(f"价格比率数据形状: {price_ratio.shape}")
|
||
print(f"价格比率统计:")
|
||
print(f" 均值: {price_ratio.mean():.4f}")
|
||
print(f" 标准差: {price_ratio.std():.4f}")
|
||
print(f" 最小值: {price_ratio.min():.4f}")
|
||
print(f" 最大值: {price_ratio.max():.4f}")
|
||
|
||
# 设置交易参数
|
||
initial_cash = 100000
|
||
commission = 0.001 # 0.1% 交易佣金
|
||
position_size = 0.5 # 每次交易仓位比例
|
||
|
||
# 计算信号
|
||
signals, ratio_ma, upper_band, lower_band = calculate_ratio_signals(
|
||
price_ratio, window=20, num_std=2
|
||
)
|
||
|
||
print(f"信号计算完成,有效信号数量: {(signals != 0).sum()}")
|
||
|
||
# 生成比率交易的size数据(用于基于比率的回测)
|
||
ratio_size = generate_ratio_size(signals, price_ratio, position_size)
|
||
print(f"比率size数据形状: {ratio_size.shape}")
|
||
print(f"比率非零交易数量: {(ratio_size != 0).sum()}")
|
||
|
||
# 生成真实股票交易的size数据(用于真实股票回测)
|
||
stock_sizes = generate_stock_sizes(signals, close_smic, close_hhic, initial_cash, position_size)
|
||
print(f"股票size数据形状: {stock_sizes.shape}")
|
||
print(f"中芯国际非零交易数量: {(stock_sizes['SMIC'] != 0).sum()}")
|
||
print(f"华虹半导体非零交易数量: {(stock_sizes['HHIC'] != 0).sum()}")
|
||
|
||
return {
|
||
'price_ratio': price_ratio,
|
||
'signals': signals,
|
||
'ratio_size': ratio_size, # 基于比率的size
|
||
'stock_sizes': stock_sizes, # 真实股票的size
|
||
'ratio_ma': ratio_ma,
|
||
'upper_band': upper_band,
|
||
'lower_band': lower_band,
|
||
'initial_cash': initial_cash,
|
||
'commission': commission,
|
||
'smic_data': smic_data,
|
||
'hhic_data': hhic_data,
|
||
'close_smic': close_smic,
|
||
'close_hhic': close_hhic
|
||
}
|
||
|
||
def create_portfolio(strategy_data):
|
||
"""创建基于价格比率的投资组合(用于分析)"""
|
||
print("创建基于价格比率的投资组合...")
|
||
|
||
price_ratio = strategy_data['price_ratio']
|
||
ratio_size = strategy_data['ratio_size']
|
||
initial_cash = strategy_data['initial_cash']
|
||
commission = strategy_data['commission']
|
||
|
||
try:
|
||
# 将price_ratio转换为DataFrame(vectorbt需要)
|
||
ratio_close = pd.DataFrame({'RATIO': price_ratio})
|
||
|
||
portfolio = vbt.Portfolio.from_orders(
|
||
close=ratio_close, # 只传入比率数据
|
||
size=ratio_size, # 基于比率的交易信号
|
||
init_cash=initial_cash,
|
||
fees=commission,
|
||
freq='D'
|
||
)
|
||
|
||
print("基于价格比率的投资组合创建成功!")
|
||
return portfolio
|
||
|
||
except Exception as e:
|
||
print(f"创建投资组合时出错: {e}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
return None
|
||
|
||
def create_real_stock_portfolio(strategy_data):
|
||
"""创建基于真实股票的投资组合(用于真实收益计算)"""
|
||
print("创建基于真实股票的投资组合...")
|
||
|
||
close_smic = strategy_data['close_smic']
|
||
close_hhic = strategy_data['close_hhic']
|
||
stock_sizes = strategy_data['stock_sizes']
|
||
initial_cash = strategy_data['initial_cash']
|
||
commission = strategy_data['commission']
|
||
|
||
try:
|
||
# 创建包含两只股票收盘价的DataFrame
|
||
close_df = pd.DataFrame({
|
||
'SMIC': close_smic,
|
||
'HHIC': close_hhic
|
||
})
|
||
|
||
# 创建投资组合
|
||
portfolio = vbt.Portfolio.from_orders(
|
||
close=close_df, # 传入两只股票的收盘价
|
||
size=stock_sizes, # 传入两只股票的交易数量
|
||
init_cash=initial_cash,
|
||
fees=commission,
|
||
freq='D'
|
||
)
|
||
|
||
print("基于真实股票的投资组合创建成功!")
|
||
return portfolio
|
||
|
||
except Exception as e:
|
||
print(f"创建真实股票投资组合时出错: {e}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
return None
|
||
|
||
def generate_stock_sizes(signals, close_smic, close_hhic, initial_cash=100000, position_ratio=0.5):
|
||
"""
|
||
生成真实股票交易的size数据
|
||
返回两个DataFrame:smic_size和hhic_size
|
||
"""
|
||
# 创建空的size Series
|
||
smic_size = pd.Series(0.0, index=signals.index, name='SMIC')
|
||
hhic_size = pd.Series(0.0, index=signals.index, name='HHIC')
|
||
|
||
current_position = 0 # 0: 无仓位, 1: 做多价差, -1: 做空价差
|
||
|
||
for i in range(len(signals)):
|
||
if i < 20: # 跳过布林带计算期
|
||
continue
|
||
|
||
signal = signals.iloc[i]
|
||
smic_price = close_smic.iloc[i]
|
||
hhic_price = close_hhic.iloc[i]
|
||
|
||
if signal == 1 and current_position != 1: # 做多价差:买入中芯,卖空华虹
|
||
# 计算每只股票的仓位价值(等市值对冲)
|
||
position_value = initial_cash * position_ratio
|
||
|
||
# 做多中芯国际
|
||
smic_shares = position_value / smic_price
|
||
smic_size.iloc[i] = smic_shares
|
||
|
||
# 做空华虹半导体
|
||
hhic_shares = -position_value / hhic_price
|
||
hhic_size.iloc[i] = hhic_shares
|
||
|
||
current_position = 1
|
||
|
||
elif signal == -1 and current_position != -1: # 做空价差:卖空中芯,买入华虹
|
||
# 计算每只股票的仓位价值(等市值对冲)
|
||
position_value = initial_cash * position_ratio
|
||
|
||
# 做空中芯国际
|
||
smic_shares = -position_value / smic_price
|
||
smic_size.iloc[i] = smic_shares
|
||
|
||
# 做多华虹半导体
|
||
hhic_shares = position_value / hhic_price
|
||
hhic_size.iloc[i] = hhic_shares
|
||
|
||
current_position = -1
|
||
|
||
elif signal == 0 and current_position != 0: # 平仓
|
||
# 平掉所有仓位
|
||
if current_position == 1: # 平掉做多价差仓位
|
||
smic_size.iloc[i] = -smic_size.shift(1).iloc[i] if i > 0 else 0
|
||
hhic_size.iloc[i] = -hhic_size.shift(1).iloc[i] if i > 0 else 0
|
||
elif current_position == -1: # 平掉做空价差仓位
|
||
smic_size.iloc[i] = -smic_size.shift(1).iloc[i] if i > 0 else 0
|
||
hhic_size.iloc[i] = -hhic_size.shift(1).iloc[i] if i > 0 else 0
|
||
|
||
current_position = 0
|
||
|
||
# 创建size DataFrame
|
||
size_df = pd.DataFrame({
|
||
'SMIC': smic_size,
|
||
'HHIC': hhic_size
|
||
})
|
||
|
||
return size_df
|
||
|
||
|
||
if __name__ == "__main__":
|
||
strategy_data = generate_strategy()
|
||
|
||
# 测试基于比率的投资组合
|
||
ratio_portfolio = create_portfolio(strategy_data)
|
||
|
||
# 测试基于真实股票的投资组合
|
||
stock_portfolio = create_real_stock_portfolio(strategy_data)
|
||
|
||
print("策略生成完成!") |