This commit is contained in:
2025-10-31 09:32:53 +08:00
commit 33c243a22a
15 changed files with 5800 additions and 0 deletions

File diff suppressed because one or more lines are too long

1369
1-quicstart.ipynb Normal file

File diff suppressed because one or more lines are too long

294
bt.py Normal file
View File

@ -0,0 +1,294 @@
import vectorbt as vbt
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
# 设置中文字体和显示格式
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False
pd.set_option('display.float_format', '{:.4f}'.format)
stock_00981 = ak.stock_hk_daily(symbol="00981")
stock_01347 = ak.stock_hk_daily(symbol="01347")
class PairsTradingStrategy:
def __init__(self, price_a, price_b, lookback_window=60, k=1.5):
"""
初始化配对交易策略
"""
self.price_a = price_a
self.price_b = price_b
self.lookback_window = lookback_window
self.k = k
def calculate_ratio(self):
"""计算价格比率 RS = 华虹/中芯"""
return self.price_b / self.price_a
def generate_signals(self):
"""生成交易信号"""
ratio = self.calculate_ratio()
# 计算滚动均值和标准差
ratio_mean = ratio.rolling(window=self.lookback_window).mean()
ratio_std = ratio.rolling(window=self.lookback_window).std()
# 计算上下轨道
upper_band = ratio_mean + self.k * ratio_std
lower_band = ratio_mean - self.k * ratio_std
# 生成信号
short_b_long_a = ratio > upper_band # 做空华虹,做多中芯
long_b_short_a = ratio < lower_band # 做多华虹,做空中芯
exit_signal = (ratio <= ratio_mean) & (ratio >= ratio_mean) # 回归均值平仓
return {
'ratio': ratio,
'ratio_mean': ratio_mean,
'upper_band': upper_band,
'lower_band': lower_band,
'short_b_long_a': short_b_long_a,
'long_b_short_a': long_b_short_a,
'exit_signal': exit_signal
}
def backtest(self, initial_cash=100000, transaction_cost=0.001):
"""执行回测 - 修复后的版本"""
signals = self.generate_signals()
# 创建价格DataFrame
prices = pd.DataFrame({
'SMIC': self.price_a, # 中芯国际
'HuaHong': self.price_b # 华虹半导体
})
# 生成仓位信号
positions = self._generate_positions(signals, prices)
# 使用vectorbt进行回测
portfolio = vbt.Portfolio.from_holdings(
prices,
size=positions, # 仓位大小
init_cash=initial_cash,
fees=transaction_cost,
freq='1D'
)
return portfolio, signals
def _generate_positions(self, signals, prices):
"""生成仓位序列"""
positions = pd.DataFrame(0, index=prices.index, columns=prices.columns)
cash_per_trade = 0.2 # 每次交易使用20%资金
current_position = 0 # 0: 无仓位, 1: 做空华虹做多中芯, -1: 做多华虹做空中芯
for i in range(len(prices)):
if i < self.lookback_window:
continue
current_date = prices.index[i]
# 退出信号
if current_position != 0 and signals['exit_signal'].iloc[i]:
positions.loc[current_date, :] = 0
current_position = 0
# 开仓信号 - 只在没有持仓时开仓
elif current_position == 0:
if signals['short_b_long_a'].iloc[i]:
# 做空华虹,做多中芯
positions.loc[current_date, 'HuaHong'] = -cash_per_trade # 做空
positions.loc[current_date, 'SMIC'] = cash_per_trade # 做多
current_position = 1
elif signals['long_b_short_a'].iloc[i]:
# 做多华虹,做空中芯
positions.loc[current_date, 'HuaHong'] = cash_per_trade # 做多
positions.loc[current_date, 'SMIC'] = -cash_per_trade # 做空
current_position = -1
# 前向填充仓位,直到下一个信号
positions = positions.replace(0, np.nan).ffill().fillna(0)
return positions
# 数据预处理函数
def prepare_data(stock_00981, stock_01347):
"""准备回测数据"""
# 复制数据避免修改原数据
smic_data = stock_00981.copy()
huahong_data = stock_01347.copy()
# 设置日期索引
smic_data['date'] = pd.to_datetime(smic_data['date'])
huahong_data['date'] = pd.to_datetime(huahong_data['date'])
smic_data = smic_data.set_index('date')
huahong_data = huahong_data.set_index('date')
# 对齐数据 - 只保留两个股票都有的交易日
common_dates = smic_data.index.intersection(huahong_data.index)
smic_aligned = smic_data.loc[common_dates]
huahong_aligned = huahong_data.loc[common_dates]
return smic_aligned['close'], huahong_aligned['close']
# 执行回测
print("准备数据...")
smic_close, huahong_close = prepare_data(stock_00981, stock_01347)
print(f"数据时间范围: {smic_close.index.min()}{smic_close.index.max()}")
print(f"总交易日数: {len(smic_close)}")
print(f"中芯国际价格范围: {smic_close.min():.2f} - {smic_close.max():.2f}")
print(f"华虹半导体价格范围: {huahong_close.min():.2f} - {huahong_close.max():.2f}")
# 创建策略实例
strategy = PairsTradingStrategy(
price_a=smic_close, # 中芯国际
price_b=huahong_close, # 华虹半导体
lookback_window=60, # 60日滚动窗口
k=1.5 # 1.5倍标准差
)
print("执行回测...")
portfolio, signals = strategy.backtest(
initial_cash=100000, # 初始资金10万
transaction_cost=0.001 # 交易成本0.1%
)
# 分析结果
print("\n" + "="*50)
print("回测结果分析")
print("="*50)
# 基本统计
try:
print(f"总收益率: {portfolio.total_return():.2%}")
print(f"年化收益率: {portfolio.annualized_return():.2%}")
print(f"最大回撤: {portfolio.max_drawdown():.2%}")
print(f"夏普比率: {portfolio.sharpe_ratio():.2f}")
except:
print("部分指标计算失败,继续显示其他结果...")
# 交易统计
try:
stats = portfolio.stats()
print(f"总交易次数: {stats['Total Trades']}")
print(f"胜率: {stats.get('Win Rate [%]', 'N/A')}%")
print(f"盈亏比: {stats.get('Profit Factor', 'N/A')}")
except:
print("交易统计获取失败")
# 可视化结果
fig = plt.figure(figsize=(15, 12))
# 1. 价格比率和交易信号
ax1 = plt.subplot(3, 1, 1)
plt.plot(signals['ratio'].index, signals['ratio'].values, label='价格比率(华虹/中芯)', linewidth=1)
plt.plot(signals['ratio_mean'].index, signals['ratio_mean'].values, label='滚动均值', linestyle='--', alpha=0.7)
plt.plot(signals['upper_band'].index, signals['upper_band'].values, label=f'上轨(μ+{strategy.k}σ)', linestyle='--', color='red', alpha=0.7)
plt.plot(signals['lower_band'].index, signals['lower_band'].values, label=f'下轨(μ-{strategy.k}σ)', linestyle='--', color='green', alpha=0.7)
# 标记交易信号
short_signals = signals['ratio'][signals['short_b_long_a'] & (signals['ratio'].notna())]
long_signals = signals['ratio'][signals['long_b_short_a'] & (signals['ratio'].notna())]
if len(short_signals) > 0:
plt.scatter(short_signals.index, short_signals.values, color='red', marker='v', s=50, label='做空华虹/做多中芯')
if len(long_signals) > 0:
plt.scatter(long_signals.index, long_signals.values, color='green', marker='^', s=50, label='做多华虹/做空中芯')
plt.title('价格比率与交易信号')
plt.legend()
plt.grid(True, alpha=0.3)
# 2. 累积收益
ax2 = plt.subplot(3, 1, 2)
try:
portfolio.value.vbt.plot(ax=ax2, label='策略价值')
(portfolio.init_cash * (1 + portfolio.returns).cumprod()).vbt.plot(ax=ax2, label='买入持有')
plt.title('策略价值 vs 买入持有')
plt.legend()
plt.grid(True, alpha=0.3)
except:
plt.text(0.5, 0.5, '收益数据无法显示', ha='center', va='center', transform=ax2.transAxes)
# 3. 仓位变化
ax3 = plt.subplot(3, 1, 3)
try:
portfolio.positions.records_readable['Size'].groupby(portfolio.positions.records_readable['Timestamp']).sum().vbt.plot(ax=ax3)
plt.title('仓位变化')
plt.grid(True, alpha=0.3)
except:
# 如果上面的方法失败,使用备选方法
try:
portfolio.holdings.vbt.plot(ax=ax3)
plt.title('持仓价值')
plt.grid(True, alpha=0.3)
except:
plt.text(0.5, 0.5, '仓位数据无法显示', ha='center', va='center', transform=ax3.transAxes)
plt.tight_layout()
plt.show()
# 显示详细的交易记录
print("\n交易记录详情:")
try:
trades = portfolio.trades.records_readable
if len(trades) > 0:
print(trades[['Entry Index', 'Column', 'Size', 'Entry Price', 'Exit Price', 'PnL']].tail(10))
else:
print("没有交易记录")
except:
print("无法获取交易记录")
# 参数优化分析
print("\n" + "="*50)
print("参数优化分析")
print("="*50)
# 测试不同的k值
k_values = [1.0, 1.3, 1.5, 1.7, 2.0]
results = []
for k in k_values:
try:
test_strategy = PairsTradingStrategy(
price_a=smic_close,
price_b=huahong_close,
lookback_window=60,
k=k
)
test_portfolio, _ = test_strategy.backtest(initial_cash=100000)
results.append({
'k': k,
'总收益率': test_portfolio.total_return(),
'年化收益率': test_portfolio.annualized_return(),
'最大回撤': test_portfolio.max_drawdown(),
'夏普比率': test_portfolio.sharpe_ratio(),
'总交易次数': getattr(test_portfolio.stats(), 'get', lambda x: 'N/A')('Total Trades')
})
except Exception as e:
print(f"参数k={k}测试失败: {e}")
continue
if results:
results_df = pd.DataFrame(results)
print(results_df.round(4))
else:
print("所有参数测试都失败了")
# 显示价格比率的基本统计信息
print("\n价格比率统计信息:")
ratio = signals['ratio'].dropna()
print(f"均值: {ratio.mean():.4f}")
print(f"标准差: {ratio.std():.4f}")
print(f"最小值: {ratio.min():.4f}")
print(f"最大值: {ratio.max():.4f}")
print(f"当前值: {ratio.iloc[-1]:.4f}")
# 显示策略信号统计
print(f"\n策略信号统计:")
print(f"做空华虹/做多中芯信号次数: {signals['short_b_long_a'].sum()}")
print(f"做多华虹/做空中芯信号次数: {signals['long_b_short_a'].sum()}")

127
t.py Normal file
View File

@ -0,0 +1,127 @@
import vectorbt as vbt
import akshare as ak
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import datetime
stock_00981 = ak.stock_hk_daily(symbol="00981")
stock_01347 = ak.stock_hk_daily(symbol="01347")
# 数据预处理 - 设置日期索引
def prepare_data(df):
"""准备数据,设置日期索引"""
df = df.copy()
df['date'] = pd.to_datetime(df['date'])
df = df.set_index('date')
return df
stock_00981 = prepare_data(stock_00981)
stock_01347 = prepare_data(stock_01347)
stock_00981_close = stock_00981['close']
print("中芯国际最近5个交易日收盘价:")
print(stock_00981_close.tail(5))
def custom_indicator(close, rsi_window=14, ma_window=50):
# 计算日线RSI
rsi = vbt.RSI.run(close, window=rsi_window).rsi
# 计算日线移动平均
ma = vbt.MA.run(close, window=ma_window).ma
# 确保所有数组都是numpy数组避免pandas索引问题
close_np = close.to_numpy() if hasattr(close, 'to_numpy') else close
rsi_np = rsi.to_numpy() if hasattr(rsi, 'to_numpy') else rsi
ma_np = ma.to_numpy() if hasattr(ma, 'to_numpy') else ma
# 使用numpy数组进行计算
trend = np.where(rsi_np > 70, -1, 0)
trend = np.where((rsi_np < 30) & (close_np < ma_np), 1, trend)
return trend
# 创建自定义指标 - 移除 keep_pd=True
ind = vbt.IndicatorFactory(
class_name="Combination",
short_name="comb",
input_names=["close"],
param_names=["rsi_window", "ma_window"],
output_names=["value"]
).from_apply_func(
custom_indicator,
rsi_window=14,
ma_window=50
# 移除 keep_pd=True
)
# 运行指标
res = ind.run(
stock_00981_close,
rsi_window=21,
ma_window=50
)
print("指标结果:")
print(res.value)
# 生成交易信号
entries = res.value == 1.0
exits = res.value == -1.0
print(f"买入信号数量: {entries.sum().sum()}")
print(f"卖出信号数量: {exits.sum().sum()}")
# 如果结果有多列,选择第一列
if entries.ndim > 1 and entries.shape[1] > 1:
entries = entries.iloc[:, 0]
exits = exits.iloc[:, 0]
print("使用第一组参数")
# 创建投资组合
pf = vbt.Portfolio.from_signals(stock_00981_close, entries, exits, freq='D')
# 绘图
try:
pf.plot().show()
except Exception as e:
print(f"绘图错误: {e}")
# 简化绘图
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 8))
# 价格和信号
ax1.plot(stock_00981_close.index, stock_00981_close.values, label='Close Price', color='blue')
# 买入信号
buy_signals = entries.fillna(False)
if buy_signals.any():
buy_dates = stock_00981_close.index[buy_signals]
buy_prices = stock_00981_close[buy_signals]
ax1.scatter(buy_dates, buy_prices, color='green', marker='^', s=100, label='Buy')
# 卖出信号
sell_signals = exits.fillna(False)
if sell_signals.any():
sell_dates = stock_00981_close.index[sell_signals]
sell_prices = stock_00981_close[sell_signals]
ax1.scatter(sell_dates, sell_prices, color='red', marker='v', s=100, label='Sell')
ax1.set_title('Price and Trading Signals')
ax1.legend()
ax1.grid(True)
# 投资组合价值
portfolio_value = pf.value()
ax2.plot(portfolio_value.index, portfolio_value.values, label='Portfolio Value', color='orange')
ax2.set_title('Portfolio Value')
ax2.legend()
ax2.grid(True)
plt.tight_layout()
plt.show()
print("-------------------")
print("投资组合统计:")
stats = pf.stats()
print(stats)
print(f"总收益率: {pf.total_return():.2%}")
print("-------------------")

Binary file not shown.

View File

@ -0,0 +1,26 @@
import vectorbt as vbt
import datetime
end_date = datetime.datetime.now()
start_date = end_date - datetime.timedelta(days = 3)
btc_price = vbt.YFData.download(
["BTC-USD","ETH-USD","XMR-USD","ADA-USD"],
interval="1m",
start = start_date,
end = end_date,
missing_index='drop').get("Close")
print(btc_price)
rsi = vbt.RSI.run(btc_price, window =[14,21])
entries = rsi.rsi_crossed_below(30)
exits = rsi.rsi_crossed_above(70)
pf = vbt.Portfolio.from_signals(btc_price, entries, exits)
#pf.plot().show()
print(pf.total_return())

View File

@ -0,0 +1,61 @@
import vectorbt as vbt
import pandas as pd
import numpy as np
import datetime
end_time = datetime.datetime.now()
start_time = end_time - datetime.timedelta(days=2)
btc_price = vbt.YFData.download(
["BTC-USD","ETH-USD"],
missing_index='drop',
start=start_time,
end=end_time,
interval="1m").get("Close")
def custom_indicator(close, rsi_window = 14, ma_window = 50):
close_5m = close.resample("5T").last()
rsi = vbt.RSI.run(close_5m, window = rsi_window).rsi
rsi, _ = rsi.align(close,
broadcast_axis=0,
method='ffill',
join='right')
close = close.to_numpy()
rsi = rsi.to_numpy()
ma = vbt.MA.run(close, ma_window).ma.to_numpy()
trend = np.where( rsi > 70, -1, 0)
trend = np.where( (rsi < 30) & (close < ma), 1, trend)
return trend
ind = vbt.IndicatorFactory(
class_name = "Combination",
short_name = "comb",
input_names = ["close"],
param_names = ["rsi_window", "ma_window"],
output_names = ["value"]
).from_apply_func(
custom_indicator,
rsi_window = 14,
ma_window = 50,
keep_pd=True
)
res = ind.run(
btc_price,
rsi_window = 21,
ma_window = 50
)
print(res.value.to_string())
entries = res.value == 1.0
exits = res.value == -1.0
pf = vbt.Portfolio.from_signals(btc_price, entries, exits)
print(pf.total_return())

View File

@ -0,0 +1,84 @@
import vectorbt as vbt
import pandas as pd
import numpy as np
import datetime
end_time = datetime.datetime.now()
start_time = end_time - datetime.timedelta(days=2)
btc_price = vbt.YFData.download(
["BTC-USD","ETH-USD"],
missing_index='drop',
start=start_time,
end=end_time,
interval="1m").get("Close")
def custom_indicator(close, rsi_window = 14, ma_window = 50, entry = 30, exit = 70):
close_5m = close.resample("5T").last()
rsi = vbt.RSI.run(close_5m, window = rsi_window).rsi
rsi, _ = rsi.align(close,
broadcast_axis=0,
method='ffill',
join='right')
close = close.to_numpy()
rsi = rsi.to_numpy()
ma = vbt.MA.run(close, ma_window).ma.to_numpy()
trend = np.where( rsi > exit, -1, 0)
trend = np.where( (rsi < entry) & (close < ma), 1, trend)
return trend
ind = vbt.IndicatorFactory(
class_name = "Combination",
short_name = "comb",
input_names = ["close"],
param_names = ["rsi_window", "ma_window","entry","exit"],
output_names = ["value"]
).from_apply_func(
custom_indicator,
rsi_window = 14,
ma_window = 50,
entry = 30,
exit = 70,
keep_pd=True
)
res = ind.run(
btc_price,
rsi_window = np.arange(10,40,step=3,dtype=int),
#ma_window = np.arange(20,200,step=20,dtype=int),
entry = np.arange(10,40,step=4,dtype=int),
exit = np.arange(60,85,step=4,dtype=int),
param_product = True
)
#print(res.value.to_string())
entries = res.value == 1.0
exits = res.value == -1.0
pf = vbt.Portfolio.from_signals(btc_price, entries, exits)
returns = pf.total_return()
#returns = returns[ returns.index.isin(["BTC-USD"], level="symbol")]
#returns = returns.groupby(level=["comb_exit","comb_entry","symbol"]).mean()
print(returns.to_string())
print(returns.max())
print(returns.idxmax())
"""comb_rsi_window comb_ma_window"""
fig = returns.vbt.volume(
x_level = "comb_rsi_window",
y_level = "comb_entry",
z_level = "comb_exit",
slider_level = "symbol",
)
fig.show()

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,74 @@
import vectorbt as vbt
import pandas as pd
import numpy as np
import datetime
import talib
from numba import njit
end_time = datetime.datetime.now()
start_time = end_time - datetime.timedelta(days=2)
btc_price = pd.read_csv("data.csv")
btc_price["Datetime"] = pd.to_datetime(btc_price["Datetime"])
btc_price.set_index("Datetime", inplace=True)
print(btc_price)
RSI = vbt.IndicatorFactory.from_talib('RSI')
@njit
def produce_signal(rsi, entry, exit):
trend = np.where( rsi > exit, -1, 0)
trend = np.where( (rsi < entry) , 1, trend)
return trend
def custom_indicator(close, rsi_window = 14, entry = 30, exit = 70):
rsi = RSI.run(close, rsi_window).real.to_numpy()
return produce_signal(rsi, entry, exit)
ind = vbt.IndicatorFactory(
class_name = "Combination",
short_name = "comb",
input_names = ["close"],
param_names = ["rsi_window","entry","exit"],
output_names = ["value"]
).from_apply_func(
custom_indicator,
rsi_window = 14,
entry = 30,
exit = 70,
)
rsi_windows = np.arange(10,40,step=1,dtype=int)
entries = np.arange(10,40,step=1,dtype=int)
master_returns = []
for window in rsi_windows:
res = ind.run(
btc_price,
rsi_window = window,
entry = entry,
exit = np.arange(60,85,step=1,dtype=int),
param_product = True
)
entries = res.value == 1.0
exits = res.value == -1.0
pf = vbt.Portfolio.from_signals(btc_price, entries, exits)
master_returns.append(pf.total_return())
print(master_returns)
returns = pd.concat(master_returns)
#returns = returns[ returns.index.isin(["BTC-USD"], level="symbol")]
#returns = returns.groupby(level=["comb_exit","comb_entry","symbol"]).mean()
print(returns.to_string())
print(returns.max())
print(returns.idxmax())

View File

@ -0,0 +1,92 @@
import vectorbt as vbt
import datetime
end_date = datetime.datetime.now()
start_date = end_date - datetime.timedelta(days = 3)
vbt.settings.set_theme("seaborn")
vbt.settings['plotting']['layout']['width'] = 1200
vbt.settings['plotting']['layout']['height'] = 600
btc_price = vbt.YFData.download(
["BTC-USD"],
interval="1m",
start = start_date,
end = end_date,
missing_index='drop').get("Close")
print(btc_price)
fast_ma = vbt.MA.run(btc_price, window =50)
slow_ma = vbt.MA.run(btc_price, window =200)
entries = fast_ma.ma_crossed_above(slow_ma)
exits = fast_ma.ma_crossed_below(slow_ma)
pf = vbt.Portfolio.from_signals(btc_price, entries, exits)
fig = pf.plot(subplots = [
('price', dict(
title='Price',
yaxis_kwargs=dict(title='Price')
)),
('price', dict(
title='Price',
yaxis_kwargs=dict(title='Price')
)),
'orders',
'trade_pnl',
'cum_returns',
'drawdowns'],
make_subplots_kwargs=dict(rows=10, cols=2))
scatter = vbt.plotting.Scatter(
data = btc_price,
x_labels = btc_price.index,
trace_names = ["Price"],
trace_kwargs=dict(line=dict(color='red')),
add_trace_kwargs=dict(row=1,col=1),
fig=fig)
fast_ma_scatter = vbt.plotting.Scatter(
data = fast_ma.ma,
x_labels = fast_ma.ma.index,
trace_names = ["Fast_ma"],
trace_kwargs=dict(line=dict(color='green')),
add_trace_kwargs=dict(row=1,col=2),
fig=fig)
slow_ma_scatter = vbt.plotting.Scatter(
data = slow_ma.ma,
x_labels = slow_ma.ma.index,
trace_names = ["slow_ma"],
trace_kwargs=dict(line=dict(color='blue')),
add_trace_kwargs=dict(row=1,col=2),
fig=fig)
entries_plot = entries.vbt.signals.plot_as_entry_markers(
slow_ma.ma,
add_trace_kwargs=dict(row=1,col=2),
fig=fig)
exits_plot = exits.vbt.signals.plot_as_exit_markers(
slow_ma.ma,
add_trace_kwargs=dict(row=1,col=2),
fig=fig)
fig.add_hline( y= 38000,
line_color="#FFFFFF",
row=1,
col = 2,
line_width=20)
"""
fig = btc_price.vbt.plot(trace_kwargs=dict(name='Price',line=dict(color='red')))
fig = fast_ma.ma.vbt.plot(trace_kwargs=dict(name='Fast_ma',line=dict(color='blue')), fig=fig)
fig = slow_ma.ma.vbt.plot(trace_kwargs=dict(name='Slow_ma',line=dict(color='green')), fig=fig)
fig = entries.vbt.signals.plot_as_entry_markers(btc_price, fig=fig)
fig = exits.vbt.signals.plot_as_exit_markers(btc_price, fig=fig)
"""
fig.show()

View File

@ -0,0 +1,33 @@
import vectorbt as vbt
import datetime
end_date = datetime.datetime.now()
start_date = end_date - datetime.timedelta(days = 1)
btc_price = vbt.YFData.download(
"BTC-USD",
interval="1m",
start = start_date,
end = end_date,
missing_index='drop').get("Close")
print(btc_price)
rsi = vbt.RSI.run(btc_price, window =21)
entries = rsi.rsi_crossed_below(30)
exits = rsi.rsi_crossed_above(70)
pf = vbt.Portfolio.from_signals(
btc_price,
entries=entries2,
exits=exit2,
short_exits=entries,
short_entries=exits,
upon_dir_conflict=vbt.portfolio.enums.DirectionConflictMode.Short,
)
pf.plot().show()
#print(pf.total_return())

View File

@ -0,0 +1,97 @@
import pandas as pd
import vectorbt as vbt
import numpy as np
import datetime
now = datetime.datetime.now()
before = now - datetime.timedelta(days=3)
btc_price = vbt.YFData.download(
"BTC-USD",
missing_index="drop",
interval="1m",
start=before.timestamp(),
end=now.timestamp()).get("Close")
btc_price, range_indexes = btc_price.vbt.range_split(n=100, range_len=1440)
# RSI SECTION
def optimize_rsi(close, window, entry, exit):
rsi = vbt.IndicatorFactory.from_talib("RSI").run(close, timeperiod=window).real
return rsi < entry, rsi > exit
rsi_ind = vbt.IndicatorFactory(
class_name="optimizeRsi",
short_name="rsi",
input_names=["close"],
param_names=["window","entry","exit"],
output_names=["entries","exits"]
).from_apply_func(
optimize_rsi,
window=14,
entry=30,
exit=70)
step_size =10
entries = np.arange(10,45, step=step_size, dtype =int)
exits = np.arange(55,95, step=step_size, dtype =int)
windows = np.arange(10,45, step=step_size, dtype =int)
rsi_res = rsi_ind.run(
btc_price,
window=14,
entry=20,
exit=80,
)
rsi_entries = rsi_res.entries
rsi_exits = rsi_res.exits
rsi_exits.iloc[-1, :] = True
rsi_pf = vbt.Portfolio.from_signals(btc_price, rsi_entries, rsi_exits, freq="1T")
rsi_tot_returns = rsi_pf.total_return()
def random_signal(close):
return np.random.randint(0,2, size=close.shape)
rand_ind = vbt.IndicatorFactory(
class_name="Random",
short_name="rand",
input_names=["close"],
output_names=["signal"]
).from_apply_func(
random_signal)
rand_res = rand_ind.run(
btc_price
)
rand_entries = rand_res.signal == 1
rand_exits = rand_res.signal == 0
rand_exits.iloc[-1, :] = True
rand_pf = vbt.Portfolio.from_signals(btc_price, rand_entries, rand_exits, freq="1T")
rand_tot_returns = rand_pf.total_return()
df = pd.DataFrame({
"rsi":list(rsi_tot_returns),
"rand":list(rand_tot_returns)
})
print(df.median())
box = vbt.plotting.Box(
data = df,
trace_names=["rsi","rand"])
box.fig.show()

View File

@ -0,0 +1,21 @@
MIT License
Copyright (c) 2024 Chad Thackray
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

View File

@ -0,0 +1,2 @@
# vectorbt-for-beginners-2022
All of the code from my video "Vectorbt for beginners - Full Python Course"