未命名
PTD
阅读:11
2025-09-09 16:45:04
评论:0
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
from matplotlib.figure import Figure
import tkinter as tk
from tkinter import ttk, messagebox
import matplotlib.dates as mdates
from datetime import datetime, timedelta
import warnings
warnings.filterwarnings('ignore')
class TickBacktester:
    """Long-only backtester that replays price ticks one row at a time.

    On a ``'buy'`` signal the engine spends as much cash as possible on whole
    shares; on a ``'sell'`` signal it liquidates the entire position.
    Commission is charged proportionally on both sides of a trade.

    After :meth:`run_backtest`, ``equity_curve`` and ``trades`` are DataFrames
    indexed by timestamp; right after :meth:`reset` they are empty lists.
    """

    def __init__(self, initial_capital=100000, commission=0.001):
        """Store the starting cash and the proportional commission rate."""
        self.initial_capital = initial_capital
        self.commission = commission
        # Tick data is attached later through load_data().
        self.data = None
        self.reset()

    def reset(self):
        """Restore cash, position and result containers to their initial state."""
        self.capital = self.initial_capital
        self.position = 0
        self.trades = []
        self.equity_curve = []
        self.current_time = None

    def load_data(self, data):
        """Copy ``data`` and index it by its parsed ``'timestamp'`` column.

        The input frame is not modified. It must contain a ``'timestamp'``
        column; :meth:`run_backtest` additionally reads ``'price'``.
        """
        self.data = data.copy()
        self.data['timestamp'] = pd.to_datetime(self.data['timestamp'])
        self.data = self.data.set_index('timestamp')

    def run_backtest(self, strategy):
        """Replay the loaded ticks and execute the strategy's signals.

        ``strategy`` must expose ``calculate_signals(data)`` returning a frame
        with a ``'signal'`` column indexed by a subset of the data timestamps.

        Raises:
            RuntimeError: if no data has been loaded yet.
        """
        if getattr(self, 'data', None) is None:
            raise RuntimeError("load_data() must be called before run_backtest()")

        self.reset()
        signals = strategy.calculate_signals(self.data)
        # One dict lookup per tick instead of a label-based .loc call. If a
        # timestamp occurs more than once the last signal wins, which avoids
        # the ambiguous-truth-value error .loc would cause on duplicates.
        signal_by_time = signals['signal'].to_dict() if len(signals) else {}

        for timestamp, price in self.data['price'].items():
            self.current_time = timestamp
            # Equity is marked before any trade on this tick is executed.
            current_equity = self.capital + self.position * price
            self.equity_curve.append({
                'timestamp': timestamp,
                'equity': current_equity,
                'price': price,
            })

            action = signal_by_time.get(timestamp)
            if action == 'buy' and self.capital > 0:
                # Largest whole number of shares affordable including commission.
                max_shares = self.capital // (price * (1 + self.commission))
                if max_shares > 0:
                    cost = max_shares * price * (1 + self.commission)
                    self.capital -= cost
                    self.position += max_shares
                    self.trades.append({
                        'timestamp': timestamp,
                        'type': 'buy',
                        'price': price,
                        'shares': max_shares,
                        'value': cost,
                        'equity': current_equity,
                    })
            elif action == 'sell' and self.position > 0:
                proceeds = self.position * price * (1 - self.commission)
                self.capital += proceeds
                self.trades.append({
                    'timestamp': timestamp,
                    'type': 'sell',
                    'price': price,
                    'shares': self.position,
                    'value': proceeds,
                    'equity': current_equity,
                })
                self.position = 0

        # Explicit columns keep set_index() working when there were no ticks.
        self.equity_curve = pd.DataFrame(
            self.equity_curve, columns=['timestamp', 'equity', 'price']
        ).set_index('timestamp')
        self.trades = pd.DataFrame(self.trades)
        if not self.trades.empty:
            self.trades = self.trades.set_index('timestamp')

    def get_performance(self):
        """Summarise the last run.

        Returns:
            dict: total return, Sharpe ratio, maximum drawdown, win rate and
            trade count under the keys used by the GUI, or ``{}`` when no
            equity curve is available (nothing has been run yet).
        """
        # len() works both for the post-reset list and the post-run DataFrame.
        if len(self.equity_curve) == 0:
            return {}

        equity = self.equity_curve['equity']
        returns = equity.pct_change().dropna()
        total_return = (equity.iloc[-1] - self.initial_capital) / self.initial_capital

        # NOTE(review): the scaling factor is the number of minutes in 252
        # 24-hour days although returns are per tick - confirm intended scaling.
        volatility = returns.std() if len(returns) > 1 else 0.0
        if np.isfinite(volatility) and volatility > 0:
            sharpe_ratio = np.sqrt(252 * 24 * 60) * returns.mean() / volatility
        else:
            # Flat or single-point curves have no defined Sharpe ratio.
            sharpe_ratio = 0.0

        # Drawdown is measured against the running peak at each tick, not
        # against the single highest equity of the whole run.
        running_peak = equity.cummax()
        worst = ((running_peak - equity) / running_peak).max()
        max_drawdown = 0.0 if pd.isna(worst) else worst

        win_rate = 0
        if len(self.trades) > 0:
            kinds = self.trades['type']
            buy_values = self.trades.loc[kinds == 'buy', 'value'].to_numpy()
            sell_values = self.trades.loc[kinds == 'sell', 'value'].to_numpy()
            if len(sell_values) > 0:
                # The k-th sell closes the k-th buy; a round trip wins when
                # the sale proceeds exceed the purchase cost.
                paired = min(len(buy_values), len(sell_values))
                wins = int((sell_values[:paired] > buy_values[:paired]).sum())
                win_rate = wins / len(sell_values)

        return {
            '总收益率': total_return,
            '夏普比率': sharpe_ratio,
            '最大回撤': max_drawdown,
            '胜率': win_rate,
            '交易次数': len(self.trades),
        }
# 策略基类
class BaseStrategy:
    """Common parent of all strategies: stores a display name.

    Subclasses override :meth:`calculate_signals`.
    """

    def __init__(self, name):
        """Remember the strategy's display name."""
        self.name = name

    def calculate_signals(self, data):
        """Placeholder that always raises ``NotImplementedError``."""
        message = "子类必须实现calculate_signals方法"
        raise NotImplementedError(message)
# 1. 移动平均线交叉策略
class MovingAverageCrossoverStrategy(BaseStrategy):
    """Trade the crossover of a short and a long simple moving average.

    A row is labelled ``'buy'`` while the short average is strictly above the
    long one and ``'sell'`` otherwise (this includes warm-up rows where an
    average is still NaN). Only rows whose label differs from the previous
    row are reported.
    """

    def __init__(self, short_window=5, long_window=15):
        super().__init__("移动平均线交叉策略")
        self.short_window, self.long_window = short_window, long_window

    def calculate_signals(self, data):
        """Return a one-column ``'signal'`` frame with every label flip.

        ``data`` must provide a ``'price'`` column; the result is indexed by
        the matching subset of ``data.index``.
        """
        price = data['price']
        fast_ma = price.rolling(window=self.short_window).mean()
        slow_ma = price.rolling(window=self.long_window).mean()

        labels = pd.Series(np.where(fast_ma > slow_ma, 'buy', 'sell'), index=data.index)
        flipped = labels != labels.shift(1)
        return labels[flipped].to_frame('signal')
# 2. RSI策略
class RSIStrategy(BaseStrategy):
    """Relative Strength Index rule using simple rolling averages.

    ``'buy'`` when RSI is below ``oversold``, ``'sell'`` when it is above
    ``overbought`` (sell wins if both hold), ``'hold'`` otherwise.
    """

    def __init__(self, window=14, oversold=30, overbought=70):
        super().__init__("RSI策略")
        self.window = window
        self.oversold, self.overbought = oversold, overbought

    def calculate_signals(self, data):
        """Return the non-hold rows at which the RSI label changes."""
        change = data['price'].diff()
        # Moves in the other direction (and the leading NaN) count as zero.
        avg_gain = change.where(change > 0, 0).rolling(window=self.window).mean()
        avg_loss = (-change.where(change < 0, 0)).rolling(window=self.window).mean()
        relative_strength = avg_gain / avg_loss
        rsi = 100 - (100 / (1 + relative_strength))

        labels = pd.Series(
            np.select([rsi > self.overbought, rsi < self.oversold], ['sell', 'buy'], default='hold'),
            index=data.index,
        )
        fresh = labels != labels.shift(1)
        return labels[fresh & (labels != 'hold')].to_frame('signal')
# 3. 布林带策略
class BollingerBandsStrategy(BaseStrategy):
    """Band-reversion rule on a rolling mean plus/minus ``num_std`` deviations.

    ``'buy'`` below the lower band, ``'sell'`` above the upper band.
    """

    def __init__(self, window=20, num_std=2):
        super().__init__("布林带策略")
        self.window, self.num_std = window, num_std

    def calculate_signals(self, data):
        """Return the non-hold rows at which the band label changes."""
        price = data['price']
        rolling = price.rolling(window=self.window)
        centre = rolling.mean()
        half_width = rolling.std() * self.num_std
        above_upper = price > centre + half_width
        below_lower = price < centre - half_width

        labels = pd.Series(
            np.where(above_upper, 'sell', np.where(below_lower, 'buy', 'hold')),
            index=data.index,
        )
        fresh = labels != labels.shift(1)
        return labels[fresh & (labels != 'hold')].to_frame('signal')
# 4. MACD策略
class MACDStrategy(BaseStrategy):
    """MACD line versus its signal line.

    ``'buy'`` while MACD is above the signal line, ``'sell'`` while below,
    ``'hold'`` when they are equal or undefined.
    """

    def __init__(self, fast=12, slow=26, signal=9):
        super().__init__("MACD策略")
        self.fast, self.slow, self.signal = fast, slow, signal

    def calculate_signals(self, data):
        """Return the non-hold rows at which the MACD label changes."""
        price = data['price']
        fast_ema = price.ewm(span=self.fast).mean()
        slow_ema = price.ewm(span=self.slow).mean()
        macd = fast_ema - slow_ema
        trigger = macd.ewm(span=self.signal).mean()

        labels = pd.Series(
            np.select([macd < trigger, macd > trigger], ['sell', 'buy'], default='hold'),
            index=data.index,
        )
        fresh = labels != labels.shift(1)
        return labels[fresh & (labels != 'hold')].to_frame('signal')
# 5. 随机震荡指标策略
class StochasticOscillatorStrategy(BaseStrategy):
    """Stochastic %K rule computed from the price column alone.

    ``'buy'`` when %K is below ``oversold``, ``'sell'`` when above
    ``overbought`` (sell wins if both hold).
    """

    def __init__(self, k_window=14, d_window=3, oversold=20, overbought=80):
        super().__init__("随机震荡指标策略")
        self.k_window, self.d_window = k_window, d_window
        self.oversold, self.overbought = oversold, overbought

    def calculate_signals(self, data):
        """Return the non-hold rows at which the %K label changes."""
        price = data['price']
        lookback = price.rolling(window=self.k_window)
        floor, ceiling = lookback.min(), lookback.max()
        percent_k = 100 * (price - floor) / (ceiling - floor)
        # %D is still evaluated but does not take part in the rules below.
        percent_d = percent_k.rolling(window=self.d_window).mean()

        labels = pd.Series(
            np.where(
                percent_k > self.overbought,
                'sell',
                np.where(percent_k < self.oversold, 'buy', 'hold'),
            ),
            index=data.index,
        )
        fresh = labels != labels.shift(1)
        return labels[fresh & (labels != 'hold')].to_frame('signal')
# 6. 乖离率策略
class BIASStrategy(BaseStrategy):
    """Percentage deviation of price from its rolling mean (BIAS).

    ``'buy'`` when BIAS is below ``buy_threshold``, ``'sell'`` when above
    ``sell_threshold`` (sell wins if both hold).
    """

    def __init__(self, window=20, buy_threshold=-2, sell_threshold=2):
        super().__init__("乖离率策略")
        self.window = window
        self.buy_threshold, self.sell_threshold = buy_threshold, sell_threshold

    def calculate_signals(self, data):
        """Return the non-hold rows at which the BIAS label changes."""
        price = data['price']
        average = price.rolling(window=self.window).mean()
        bias = (price - average) / average * 100

        labels = pd.Series(
            np.select(
                [bias > self.sell_threshold, bias < self.buy_threshold],
                ['sell', 'buy'],
                default='hold',
            ),
            index=data.index,
        )
        fresh = labels != labels.shift(1)
        return labels[fresh & (labels != 'hold')].to_frame('signal')
# 7. 动量策略
class MomentumStrategy(BaseStrategy):
    """Price change over ``window`` rows compared with two thresholds.

    ``'buy'`` when momentum exceeds ``buy_threshold``, ``'sell'`` when it is
    below ``sell_threshold`` (sell wins if both hold).
    """

    def __init__(self, window=10, buy_threshold=0, sell_threshold=0):
        super().__init__("动量策略")
        self.window = window
        self.buy_threshold, self.sell_threshold = buy_threshold, sell_threshold

    def calculate_signals(self, data):
        """Return the non-hold rows at which the momentum label changes."""
        price = data['price']
        momentum = price - price.shift(self.window)
        falling = momentum < self.sell_threshold
        rising = momentum > self.buy_threshold

        labels = pd.Series(
            np.where(falling, 'sell', np.where(rising, 'buy', 'hold')),
            index=data.index,
        )
        fresh = labels != labels.shift(1)
        return labels[fresh & (labels != 'hold')].to_frame('signal')
# 8. 平均真实范围策略
class ATRStrategy(BaseStrategy):
    """Channel rule whose width is a multiple of the average true range.

    The channel is centred on the rolling mean of price over ``window`` rows
    and extends ``multiplier`` average true ranges on either side. A price
    above the upper band is labelled ``'sell'`` and a price below the lower
    band ``'buy'``.

    The bands used to be centred on the current price itself, so the price
    could never leave its own channel and no signal was ever produced.
    """

    def __init__(self, window=14, multiplier=2):
        super().__init__("ATR策略")
        self.window = window
        self.multiplier = multiplier

    def calculate_signals(self, data):
        """Return the non-hold rows at which the channel label changes.

        ``data`` must provide a ``'price'`` column; the result is a
        one-column ``'signal'`` frame indexed by a subset of ``data.index``.
        """
        price = data['price']
        # With a single price per tick the true range reduces to the absolute
        # tick-to-tick change.
        true_range = price.diff().abs()
        atr = true_range.rolling(window=self.window).mean()

        midline = price.rolling(window=self.window).mean()
        reach = self.multiplier * atr
        upper_band = midline + reach
        lower_band = midline - reach

        # 'buy' is listed first so it keeps priority if both conditions hold,
        # matching the assignment order of the original rule.
        labels = pd.Series(
            np.select(
                [price < lower_band, price > upper_band],
                ['buy', 'sell'],
                default='hold',
            ),
            index=data.index,
        )
        fresh = labels != labels.shift(1)
        return labels[fresh & (labels != 'hold')].to_frame('signal')
# 9. 威廉指标策略
class WilliamsRStrategy(BaseStrategy):
    """Williams %R rule computed from the price column alone.

    ``'buy'`` when %R is below ``oversold``, ``'sell'`` when above
    ``overbought`` (sell wins if both hold).
    """

    def __init__(self, window=14, oversold=-80, overbought=-20):
        super().__init__("威廉指标策略")
        self.window = window
        self.oversold, self.overbought = oversold, overbought

    def calculate_signals(self, data):
        """Return the non-hold rows at which the %R label changes."""
        price = data['price']
        lookback = price.rolling(window=self.window)
        peak, trough = lookback.max(), lookback.min()
        williams_r = -100 * (peak - price) / (peak - trough)

        labels = pd.Series(
            np.select(
                [williams_r > self.overbought, williams_r < self.oversold],
                ['sell', 'buy'],
                default='hold',
            ),
            index=data.index,
        )
        fresh = labels != labels.shift(1)
        return labels[fresh & (labels != 'hold')].to_frame('signal')
# 10. 顺势指标策略
class CCIStrategy(BaseStrategy):
    """Commodity Channel Index rule using price as the typical price.

    ``'buy'`` when CCI is below ``oversold``, ``'sell'`` when above
    ``overbought`` (sell wins if both hold).
    """

    def __init__(self, window=20, oversold=-100, overbought=100):
        super().__init__("CCI策略")
        self.window = window
        self.oversold, self.overbought = oversold, overbought

    def calculate_signals(self, data):
        """Return the non-hold rows at which the CCI label changes."""

        def mean_abs_deviation(chunk):
            # Average distance of the window's values from the window mean.
            return np.mean(np.abs(chunk - np.mean(chunk)))

        price = data['price']
        lookback = price.rolling(window=self.window)
        average = lookback.mean()
        deviation = lookback.apply(mean_abs_deviation)
        cci = (price - average) / (0.015 * deviation)

        labels = pd.Series(
            np.where(
                cci > self.overbought,
                'sell',
                np.where(cci < self.oversold, 'buy', 'hold'),
            ),
            index=data.index,
        )
        fresh = labels != labels.shift(1)
        return labels[fresh & (labels != 'hold')].to_frame('signal')
# 11. 价格通道策略
class PriceChannelStrategy(BaseStrategy):
    """Price-channel breakout rule.

    The channel is the highest and lowest price of the ``window`` rows that
    precede the current one. ``'buy'`` when price rises above that upper
    bound, ``'sell'`` when it falls below the lower bound.

    The channel used to include the current row, so the price could never
    exceed its own rolling extreme and no signal was ever produced.
    """

    def __init__(self, window=20):
        super().__init__("价格通道策略")
        self.window = window

    def calculate_signals(self, data):
        """Return the non-hold rows at which the channel label changes.

        ``data`` must provide a ``'price'`` column; the result is a
        one-column ``'signal'`` frame indexed by a subset of ``data.index``.
        """
        price = data['price']
        lookback = price.rolling(window=self.window)
        # shift(1) excludes the current tick from the channel it is tested
        # against; rows without a full preceding window stay 'hold'.
        upper_channel = lookback.max().shift(1)
        lower_channel = lookback.min().shift(1)

        labels = pd.Series(
            np.select(
                [price < lower_channel, price > upper_channel],
                ['sell', 'buy'],
                default='hold',
            ),
            index=data.index,
        )
        fresh = labels != labels.shift(1)
        return labels[fresh & (labels != 'hold')].to_frame('signal')
# 12. 抛物线转向指标策略
class ParabolicSARStrategy(BaseStrategy):
    """Simplified parabolic stop-and-reverse rule on a single price series.

    The price column serves as both the high and the low. ``'buy'`` while
    price is above the SAR value, ``'sell'`` while below.
    """

    def __init__(self, af=0.02, max_af=0.2):
        super().__init__("抛物线转向指标策略")
        self.af, self.max_af = af, max_af

    def calculate_signals(self, data):
        """Return the non-hold rows at which the SAR label changes.

        Raises ``IndexError`` when ``data`` has no rows, because the first
        price seeds the SAR path.
        """
        price = data['price']
        ticks = price.to_numpy()

        sar_path = [ticks[0]]
        rising = True            # the path starts in an up-trend
        extreme = ticks[0]       # most favourable price of the current trend
        accel = self.af
        for tick in ticks[1:]:
            previous = sar_path[-1]
            candidate = previous + accel * (extreme - previous)
            if rising:
                if tick > extreme:
                    extreme = tick
                    accel = min(accel + self.af, self.max_af)
                if tick < candidate:
                    # Reversal: the SAR jumps to the trend's extreme and a
                    # down-trend starts from the current price.
                    rising = False
                    candidate, extreme = extreme, tick
                    accel = self.af
            else:
                if tick < extreme:
                    extreme = tick
                    accel = min(accel + self.af, self.max_af)
                if tick > candidate:
                    rising = True
                    candidate, extreme = extreme, tick
                    accel = self.af
            sar_path.append(candidate)

        sar = pd.Series(sar_path, index=data.index)
        labels = pd.Series(
            np.select([price < sar, price > sar], ['sell', 'buy'], default='hold'),
            index=data.index,
        )
        fresh = labels != labels.shift(1)
        return labels[fresh & (labels != 'hold')].to_frame('signal')
# 13. 成交量加权平均价格策略
class VWAPStrategy(BaseStrategy):
    """Rolling volume-weighted average price rule.

    ``'buy'`` while price is above the rolling VWAP, ``'sell'`` while below.
    When ``data`` has no ``'volume'`` column every tick is weighted with 1.
    """

    def __init__(self, window=20):
        super().__init__("VWAP策略")
        self.window = window

    def calculate_signals(self, data):
        """Return the non-hold rows at which the VWAP label changes."""
        frame = pd.DataFrame(index=data.index)
        frame['price'] = data['price']
        frame['volume'] = data.get('volume', 1)  # constant weight when volume is absent

        price, volume = frame['price'], frame['volume']
        turnover = (price * volume).rolling(window=self.window).sum()
        traded = volume.rolling(window=self.window).sum()
        vwap = turnover / traded

        labels = pd.Series(
            np.where(price < vwap, 'sell', np.where(price > vwap, 'buy', 'hold')),
            index=data.index,
        )
        fresh = labels != labels.shift(1)
        return labels[fresh & (labels != 'hold')].to_frame('signal')
# 14. Directional Movement Index (DMI) strategy - simplified +DI / -DI comparison
class DMIStrategy(BaseStrategy):
    """Simplified directional-movement rule on a single price series.

    ``'buy'`` while +DI is above -DI, ``'sell'`` while below, ``'hold'`` when
    they are equal or undefined.
    """

    def __init__(self, window=14):
        super().__init__("DMI策略")
        self.window = window

    def calculate_signals(self, data):
        """Return the non-hold rows at which the DI label changes."""
        step = data['price'].diff()
        rise, fall = step, -step
        # Each tick contributes to one direction only; the other side gets 0.
        plus_dm = rise.where(rise > fall, 0)
        minus_dm = fall.where(fall > rise, 0)
        atr = step.abs().rolling(window=self.window).mean()

        plus_di = 100 * plus_dm.rolling(window=self.window).mean() / atr
        minus_di = 100 * minus_dm.rolling(window=self.window).mean() / atr

        labels = pd.Series(
            np.select([plus_di < minus_di, plus_di > minus_di], ['sell', 'buy'], default='hold'),
            index=data.index,
        )
        fresh = labels != labels.shift(1)
        return labels[fresh & (labels != 'hold')].to_frame('signal')
# 15. 价格波动率策略
class VolatilityStrategy(BaseStrategy):
    """Rolling standard deviation of tick returns compared with a threshold.

    ``'sell'`` when volatility exceeds ``threshold``, ``'buy'`` when it is
    below half of it (buy wins if both hold), ``'hold'`` in between.
    """

    def __init__(self, window=20, threshold=0.01):
        super().__init__("价格波动率策略")
        self.window, self.threshold = window, threshold

    def calculate_signals(self, data):
        """Return the non-hold rows at which the volatility label changes."""
        tick_returns = data['price'].pct_change()
        volatility = tick_returns.rolling(window=self.window).std()
        calm = volatility < self.threshold / 2
        stormy = volatility > self.threshold

        labels = pd.Series(
            np.where(calm, 'buy', np.where(stormy, 'sell', 'hold')),
            index=data.index,
        )
        fresh = labels != labels.shift(1)
        return labels[fresh & (labels != 'hold')].to_frame('signal')
# 16. 价格突破策略
class BreakoutStrategy(BaseStrategy):
    """Breakout of the previous row's rolling high/low.

    ``'buy'`` when price exceeds the rolling maximum as of the previous row,
    ``'sell'`` when it drops below the rolling minimum as of the previous row.
    """

    def __init__(self, window=20):
        super().__init__("价格突破策略")
        self.window = window

    def calculate_signals(self, data):
        """Return the non-hold rows at which the breakout label changes."""
        price = data['price']
        lookback = price.rolling(window=self.window)
        prior_high = lookback.max().shift(1)
        prior_low = lookback.min().shift(1)

        labels = pd.Series(
            np.select([price < prior_low, price > prior_high], ['sell', 'buy'], default='hold'),
            index=data.index,
        )
        fresh = labels != labels.shift(1)
        return labels[fresh & (labels != 'hold')].to_frame('signal')
# 17. 价格回归策略
class MeanReversionStrategy(BaseStrategy):
    """Mean-reversion rule on a rolling mean plus/minus ``num_std`` deviations.

    ``'buy'`` when price is below the lower bound, ``'sell'`` when above the
    upper bound (sell wins if both hold).
    """

    def __init__(self, window=20, num_std=2):
        super().__init__("价格回归策略")
        self.window, self.num_std = window, num_std

    def calculate_signals(self, data):
        """Return the non-hold rows at which the reversion label changes."""
        price = data['price']
        lookback = price.rolling(window=self.window)
        average = lookback.mean()
        reach = self.num_std * lookback.std()
        stretched_up = price > average + reach
        stretched_down = price < average - reach

        labels = pd.Series(
            np.where(stretched_up, 'sell', np.where(stretched_down, 'buy', 'hold')),
            index=data.index,
        )
        fresh = labels != labels.shift(1)
        return labels[fresh & (labels != 'hold')].to_frame('signal')
# 18. 三重指数移动平均策略
class TRIXStrategy(BaseStrategy):
    """TRIX (percentage change of a triple-smoothed EMA) versus its signal line.

    ``'buy'`` while TRIX is above the signal line, ``'sell'`` while below.
    """

    def __init__(self, window=15, signal_line=9):
        super().__init__("TRIX策略")
        self.window, self.signal_line = window, signal_line

    def calculate_signals(self, data):
        """Return the non-hold rows at which the TRIX label changes."""
        smoothed = data['price']
        # Apply the same exponential smoothing three times in a row.
        for _ in range(3):
            smoothed = smoothed.ewm(span=self.window).mean()
        trix = smoothed.pct_change() * 100
        trigger = trix.ewm(span=self.signal_line).mean()

        labels = pd.Series(
            np.select([trix < trigger, trix > trigger], ['sell', 'buy'], default='hold'),
            index=data.index,
        )
        fresh = labels != labels.shift(1)
        return labels[fresh & (labels != 'hold')].to_frame('signal')
# 19. 资金流量指标策略
class MFIStrategy(BaseStrategy):
    """Money Flow Index rule using price as the typical price.

    ``'buy'`` when MFI is below ``oversold``, ``'sell'`` when above
    ``overbought`` (sell wins if both hold). When ``data`` has no
    ``'volume'`` column every tick is weighted with 1.
    """

    def __init__(self, window=14, oversold=20, overbought=80):
        super().__init__("资金流量指标策略")
        self.window = window
        self.oversold, self.overbought = oversold, overbought

    def calculate_signals(self, data):
        """Return the non-hold rows at which the MFI label changes."""
        frame = pd.DataFrame(index=data.index)
        frame['price'] = data['price']
        frame['volume'] = data.get('volume', 1)

        price = frame['price']
        previous = price.shift(1)
        money_flow = price * frame['volume']
        # Flow counts as positive on an up-tick and negative on a down-tick.
        inflow = money_flow.where(price > previous, 0).rolling(window=self.window).sum()
        outflow = money_flow.where(price < previous, 0).rolling(window=self.window).sum()
        mfi = 100 * inflow / (inflow + outflow)

        labels = pd.Series(
            np.select([mfi > self.overbought, mfi < self.oversold], ['sell', 'buy'], default='hold'),
            index=data.index,
        )
        fresh = labels != labels.shift(1)
        return labels[fresh & (labels != 'hold')].to_frame('signal')
# 20. 相对强弱指数策略(改进版)
class AdvancedRSIStrategy(BaseStrategy):
    """RSI rule filtered by a moving average of price.

    ``'buy'`` needs RSI below ``oversold`` and price above the moving
    average; ``'sell'`` needs RSI above ``overbought`` and price below it
    (sell wins if both hold).
    """

    def __init__(self, window=14, oversold=30, overbought=70, ma_window=10):
        super().__init__("改进RSI策略")
        self.window, self.ma_window = window, ma_window
        self.oversold, self.overbought = oversold, overbought

    def calculate_signals(self, data):
        """Return the non-hold rows at which the filtered RSI label changes."""
        price = data['price']
        change = price.diff()
        avg_gain = change.where(change > 0, 0).rolling(window=self.window).mean()
        avg_loss = (-change.where(change < 0, 0)).rolling(window=self.window).mean()
        rsi = 100 - (100 / (1 + avg_gain / avg_loss))

        # Moving-average filter applied on top of the RSI thresholds.
        trend = price.rolling(window=self.ma_window).mean()
        wants_buy = (rsi < self.oversold) & (price > trend)
        wants_sell = (rsi > self.overbought) & (price < trend)

        labels = pd.Series(
            np.where(wants_sell, 'sell', np.where(wants_buy, 'buy', 'hold')),
            index=data.index,
        )
        fresh = labels != labels.shift(1)
        return labels[fresh & (labels != 'hold')].to_frame('signal')
# 生成示例tick数据
def generate_sample_data(num_ticks=1000, start_price=100, volatility=0.001, seed=None):
    """Generate synthetic one-second tick data as a random walk.

    Args:
        num_ticks: number of rows to generate; zero or less yields an empty
            frame with the same columns.
        start_price: price of the first tick.
        volatility: standard deviation of each tick's relative price change.
        seed: optional seed for a private random generator so the prices and
            volumes are reproducible; ``None`` draws from NumPy's global
            random state.

    Returns:
        pandas.DataFrame with ``'timestamp'``, ``'price'`` and ``'volume'``
        columns. Timestamps start one hour before the call and advance by one
        second per row; volumes are integers in ``[100, 1000)``.
    """
    if num_ticks <= 0:
        # Building mismatched columns would raise, so return an empty frame.
        return pd.DataFrame({
            'timestamp': pd.to_datetime([]),
            'price': np.array([], dtype=float),
            'volume': np.array([], dtype=int),
        })

    rng = np.random if seed is None else np.random.RandomState(seed)

    start_time = datetime.now() - timedelta(hours=1)
    timestamps = [start_time + timedelta(seconds=offset) for offset in range(num_ticks)]

    # Each price is the previous one scaled by (1 + change); the cumulative
    # product applies those factors in order, starting from start_price.
    changes = rng.normal(0, volatility, num_ticks - 1)
    prices = np.cumprod(np.concatenate(([float(start_price)], 1.0 + changes)))
    volumes = rng.randint(100, 1000, num_ticks)

    return pd.DataFrame({
        'timestamp': timestamps,
        'price': prices,
        'volume': volumes,
    })
# Tkinter GUI应用
class BacktestApp:
    """Tkinter front-end for running one strategy against sample tick data.

    The window offers a strategy selector, capital and commission inputs, a
    price/equity chart, a row of performance metrics and a trade log.
    """

    def __init__(self, root):
        """Create the engine, sample data and strategy catalogue, then the UI."""
        self.root = root
        self.root.title("Tick级别回测系统 - 20种策略")
        self.root.geometry("1400x900")
        self.backtester = TickBacktester()
        self.data = generate_sample_data()
        # The 20 selectable strategies, keyed by the label shown in the combobox.
        self.strategies = {
            "移动平均线交叉": MovingAverageCrossoverStrategy(5, 15),
            "RSI策略": RSIStrategy(14, 30, 70),
            "布林带策略": BollingerBandsStrategy(20, 2),
            "MACD策略": MACDStrategy(12, 26, 9),
            "随机震荡指标": StochasticOscillatorStrategy(14, 3, 20, 80),
            "乖离率策略": BIASStrategy(20, -2, 2),
            "动量策略": MomentumStrategy(10, 0, 0),
            "ATR策略": ATRStrategy(14, 2),
            "威廉指标": WilliamsRStrategy(14, -80, -20),
            "CCI策略": CCIStrategy(20, -100, 100),
            "价格通道": PriceChannelStrategy(20),
            "抛物线转向": ParabolicSARStrategy(0.02, 0.2),
            "VWAP策略": VWAPStrategy(20),
            "DMI策略": DMIStrategy(14),
            "价格波动率": VolatilityStrategy(20, 0.01),
            "价格突破": BreakoutStrategy(20),
            "价格回归": MeanReversionStrategy(20, 2),
            "TRIX策略": TRIXStrategy(15, 9),
            "资金流量指标": MFIStrategy(14, 20, 80),
            "改进RSI策略": AdvancedRSIStrategy(14, 30, 70, 10),
        }
        self.setup_ui()

    def setup_ui(self):
        """Build the window layout: controls, chart, metrics and trade log."""
        main_frame = ttk.Frame(self.root, padding="10")
        main_frame.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S))
        self.root.columnconfigure(0, weight=1)
        self.root.rowconfigure(0, weight=1)
        main_frame.columnconfigure(1, weight=1)
        main_frame.rowconfigure(1, weight=1)

        self._build_control_panel(main_frame)
        self._build_chart_area(main_frame)
        self._build_metrics_panel(main_frame)
        self._build_trades_table(main_frame)
        # The trade log shares spare vertical space with the chart row.
        main_frame.rowconfigure(3, weight=1)

    def _build_control_panel(self, main_frame):
        """Create the strategy selector, parameter entries and run button."""
        control_frame = ttk.LabelFrame(main_frame, text="回测控制", padding="5")
        control_frame.grid(row=0, column=0, columnspan=2, sticky=(tk.W, tk.E), pady=(0, 10))

        # Strategy selector; read-only so only catalogue entries can be chosen.
        ttk.Label(control_frame, text="选择策略:").grid(row=0, column=0, sticky=tk.W, padx=(0, 5))
        self.strategy_var = tk.StringVar()
        strategy_combo = ttk.Combobox(control_frame, textvariable=self.strategy_var,
                                      values=list(self.strategies.keys()), width=20,
                                      state="readonly")
        strategy_combo.grid(row=0, column=1, sticky=(tk.W, tk.E), padx=(0, 10))
        strategy_combo.current(0)

        # Initial capital.
        ttk.Label(control_frame, text="初始资金:").grid(row=0, column=2, sticky=tk.W, padx=(0, 5))
        self.capital_var = tk.StringVar(value="100000")
        capital_entry = ttk.Entry(control_frame, textvariable=self.capital_var, width=10)
        capital_entry.grid(row=0, column=3, sticky=(tk.W, tk.E), padx=(0, 10))

        # Commission rate.
        ttk.Label(control_frame, text="佣金比例:").grid(row=0, column=4, sticky=tk.W, padx=(0, 5))
        self.commission_var = tk.StringVar(value="0.001")
        commission_entry = ttk.Entry(control_frame, textvariable=self.commission_var, width=10)
        commission_entry.grid(row=0, column=5, sticky=(tk.W, tk.E), padx=(0, 10))

        # Run button.
        run_btn = ttk.Button(control_frame, text="运行回测", command=self.run_backtest)
        run_btn.grid(row=0, column=6, padx=(10, 0))

    def _build_chart_area(self, main_frame):
        """Create the matplotlib figure and embed its canvas."""
        chart_frame = ttk.Frame(main_frame)
        chart_frame.grid(row=1, column=0, columnspan=2, sticky=(tk.W, tk.E, tk.N, tk.S))
        self.fig = Figure(figsize=(12, 8), dpi=100)
        self.canvas = FigureCanvasTkAgg(self.fig, master=chart_frame)
        self.canvas.get_tk_widget().pack(fill=tk.BOTH, expand=True)

    def _build_metrics_panel(self, main_frame):
        """Create one caption/value label pair per performance metric."""
        metrics_frame = ttk.LabelFrame(main_frame, text="性能指标", padding="5")
        metrics_frame.grid(row=2, column=0, columnspan=2, sticky=(tk.W, tk.E), pady=(10, 0))

        self.metrics_vars = {}
        metrics = ['总收益率', '夏普比率', '最大回撤', '胜率', '交易次数']
        for i, metric in enumerate(metrics):
            ttk.Label(metrics_frame, text=f"{metric}:").grid(row=0, column=i*2, sticky=tk.W, padx=(10, 5))
            self.metrics_vars[metric] = tk.StringVar(value="N/A")
            ttk.Label(metrics_frame, textvariable=self.metrics_vars[metric]).grid(row=0, column=i*2+1, sticky=tk.W, padx=(0, 20))

    def _build_trades_table(self, main_frame):
        """Create the scrollable trade-log table."""
        trades_frame = ttk.LabelFrame(main_frame, text="交易记录", padding="5")
        trades_frame.grid(row=3, column=0, columnspan=2, sticky=(tk.W, tk.E, tk.N, tk.S), pady=(10, 0))

        columns = ('时间', '类型', '价格', '数量', '金额', '权益')
        self.trades_tree = ttk.Treeview(trades_frame, columns=columns, show='headings', height=6)
        for col in columns:
            self.trades_tree.heading(col, text=col)
            self.trades_tree.column(col, width=100)

        # Vertical scrollbar linked to the table.
        scrollbar = ttk.Scrollbar(trades_frame, orient=tk.VERTICAL, command=self.trades_tree.yview)
        self.trades_tree.configure(yscrollcommand=scrollbar.set)
        self.trades_tree.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S))
        scrollbar.grid(row=0, column=1, sticky=(tk.N, tk.S))

        # Let the table absorb any extra space inside its frame.
        trades_frame.columnconfigure(0, weight=1)
        trades_frame.rowconfigure(0, weight=1)

    def run_backtest(self):
        """Read the inputs, run the selected strategy and refresh the window.

        Any failure, including invalid input, is reported in an error dialog
        instead of propagating into the Tk event loop.
        """
        try:
            # Read and validate the user inputs.
            initial_capital = float(self.capital_var.get())
            commission = float(self.commission_var.get())
            strategy_name = self.strategy_var.get()
            if not np.isfinite(initial_capital) or initial_capital <= 0:
                raise ValueError("初始资金必须为正数")
            if not 0 <= commission < 1:
                raise ValueError("佣金比例必须在 [0, 1) 区间内")
            strategy = self.strategies.get(strategy_name)
            if strategy is None:
                raise ValueError(f"未知策略: {strategy_name}")

            # Configure the engine.
            self.backtester.initial_capital = initial_capital
            self.backtester.commission = commission
            self.backtester.reset()

            # Load the data and run the strategy.
            self.backtester.load_data(self.data)
            self.backtester.run_backtest(strategy)

            # Clear the previous run's numbers before showing the new ones.
            for variable in self.metrics_vars.values():
                variable.set("N/A")
            performance = self.backtester.get_performance()
            for metric, value in performance.items():
                if metric in ['总收益率', '最大回撤', '胜率']:
                    self.metrics_vars[metric].set(f"{value:.2%}")
                elif metric == '夏普比率':
                    self.metrics_vars[metric].set(f"{value:.2f}")
                else:
                    self.metrics_vars[metric].set(f"{value}")

            self.update_trades_table()
            self.update_charts()
        except Exception as e:
            # Top-level GUI boundary: surface the problem to the user.
            messagebox.showerror("错误", f"回测过程中发生错误: {str(e)}")

    def update_trades_table(self):
        """Replace the trade-log rows with the trades of the latest run."""
        # Remove the rows of the previous run.
        for item in self.trades_tree.get_children():
            self.trades_tree.delete(item)

        if not self.backtester.trades.empty:
            for timestamp, trade in self.backtester.trades.iterrows():
                self.trades_tree.insert('', 'end', values=(
                    timestamp.strftime('%H:%M:%S'),
                    trade['type'],
                    f"{trade['price']:.2f}",
                    f"{trade['shares']:.0f}",
                    f"{trade['value']:.0f}",
                    f"{trade['equity']:.0f}"
                ))

    def update_charts(self):
        """Redraw the price/trade chart and the equity curve."""
        self.fig.clear()
        if self.backtester.equity_curve.empty:
            # Still repaint so the cleared figure replaces the old chart.
            self.canvas.draw()
            return

        # Two stacked panels: price with trades on top, equity below.
        ax1 = self.fig.add_subplot(211)
        ax2 = self.fig.add_subplot(212)

        # Price line and buy/sell markers.
        ax1.plot(self.backtester.equity_curve.index, self.backtester.equity_curve['price'],
                 label='价格', linewidth=1, alpha=0.7)
        if not self.backtester.trades.empty:
            buy_trades = self.backtester.trades[self.backtester.trades['type'] == 'buy']
            sell_trades = self.backtester.trades[self.backtester.trades['type'] == 'sell']
            ax1.scatter(buy_trades.index, buy_trades['price'],
                        color='green', marker='^', s=100, label='买入', alpha=0.7)
            ax1.scatter(sell_trades.index, sell_trades['price'],
                        color='red', marker='v', s=100, label='卖出', alpha=0.7)
            # Annotate every marker with the traded amount.
            for i, trade in buy_trades.iterrows():
                ax1.annotate(f"{trade['value']:.0f}",
                             (i, trade['price']),
                             xytext=(0, 10), textcoords='offset points',
                             fontsize=8, color='green', ha='center')
            for i, trade in sell_trades.iterrows():
                ax1.annotate(f"{trade['value']:.0f}",
                             (i, trade['price']),
                             xytext=(0, -15), textcoords='offset points',
                             fontsize=8, color='red', ha='center')
        ax1.set_title('价格走势与交易信号')
        ax1.set_ylabel('价格')
        ax1.legend()
        ax1.grid(True, alpha=0.3)

        # Equity curve with the initial capital as a reference line.
        ax2.plot(self.backtester.equity_curve.index, self.backtester.equity_curve['equity'],
                 label='权益曲线', linewidth=2, color='blue')
        ax2.axhline(y=self.backtester.initial_capital, color='gray', linestyle='--', alpha=0.7, label='初始资金')
        ax2.set_title('权益曲线')
        ax2.set_ylabel('权益')
        ax2.set_xlabel('时间')
        ax2.legend()
        ax2.grid(True, alpha=0.3)

        # Show clock times on both x axes.
        for ax in [ax1, ax2]:
            ax.xaxis.set_major_formatter(mdates.DateFormatter('%H:%M:%S'))
            ax.xaxis.set_major_locator(mdates.AutoDateLocator())
        self.fig.tight_layout()
        self.canvas.draw()
# 主程序
def main():
    """Create the Tk root window, attach the backtest GUI and run the event loop."""
    root = tk.Tk()
    app = BacktestApp(root)  # keep a reference alive while the loop runs
    root.mainloop()


if __name__ == "__main__":
    main()
本文由 海星量化研究所 作者提供,转载请保留链接和署名!网址:http://qmt.hxquant.com/?id=49
声明
1.本站原创文章,转载需注明文章作者来源。 2.如果文章内容涉及版权问题,请联系我们删除,向本站投稿文章可能会经我们编辑修改。 3.本站对信息准确性或完整性不作保证,亦不对因使用该等信息而引发或可能引发的损失承担任何责任。