未命名

PTD 阅读:11 2025-09-09 16:45:04 评论:0
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
from matplotlib.figure import Figure
import tkinter as tk
from tkinter import ttk, messagebox
import matplotlib.dates as mdates
from datetime import datetime, timedelta
import warnings
warnings.filterwarnings('ignore')

class TickBacktester:
    def __init__(self, initial_capital=100000, commission=0.001):
        self.initial_capital = initial_capital
        self.commission = commission
        self.reset()
        
    def reset(self):
        self.capital = self.initial_capital
        self.position = 0
        self.trades = []
        self.equity_curve = []
        self.current_time = None
        
    def load_data(self, data):
        self.data = data.copy()
        self.data['timestamp'] = pd.to_datetime(self.data['timestamp'])
        self.data = self.data.set_index('timestamp')
        
    def run_backtest(self, strategy):
        self.reset()
        signals = strategy.calculate_signals(self.data)
        
        for i, (timestamp, row) in enumerate(self.data.iterrows()):
            self.current_time = timestamp
            price = row['price']
            
            current_equity = self.capital + self.position * price
            self.equity_curve.append({
                'timestamp': timestamp,
                'equity': current_equity,
                'price': price
            })
            
            if timestamp in signals.index:
                signal = signals.loc[timestamp]
                
                if signal['signal'] == 'buy' and self.capital > 0:
                    max_shares = self.capital // (price * (1 + self.commission))
                    if max_shares > 0:
                        cost = max_shares * price * (1 + self.commission)
                        self.capital -= cost
                        self.position += max_shares
                        
                        self.trades.append({
                            'timestamp': timestamp,
                            'type': 'buy',
                            'price': price,
                            'shares': max_shares,
                            'value': cost,
                            'equity': current_equity
                        })
                
                elif signal['signal'] == 'sell' and self.position > 0:
                    proceeds = self.position * price * (1 - self.commission)
                    self.capital += proceeds
                    
                    self.trades.append({
                        'timestamp': timestamp,
                        'type': 'sell',
                        'price': price,
                        'shares': self.position,
                        'value': proceeds,
                        'equity': current_equity
                    })
                    self.position = 0
        
        self.equity_curve = pd.DataFrame(self.equity_curve)
        self.equity_curve = self.equity_curve.set_index('timestamp')
        self.trades = pd.DataFrame(self.trades)
        if not self.trades.empty:
            self.trades = self.trades.set_index('timestamp')
        
    def get_performance(self):
        if self.equity_curve.empty:
            return {}
            
        equity = self.equity_curve['equity']
        returns = equity.pct_change().dropna()
        
        total_return = (equity.iloc[-1] - self.initial_capital) / self.initial_capital
        sharpe_ratio = np.sqrt(252 * 24 * 60) * returns.mean() / returns.std() if len(returns) > 1 else 0
        max_drawdown = (equity.cummax() - equity).max() / equity.cummax().max()
        
        if not self.trades.empty:
            buy_trades = self.trades[self.trades['type'] == 'buy']
            sell_trades = self.trades[self.trades['type'] == 'sell']
            win_rate = len(sell_trades[sell_trades['value'] > buy_trades['value'].values[:len(sell_trades)]]) / len(sell_trades) if len(sell_trades) > 0 else 0
        else:
            win_rate = 0
            
        return {
            '总收益率': total_return,
            '夏普比率': sharpe_ratio,
            '最大回撤': max_drawdown,
            '胜率': win_rate,
            '交易次数': len(self.trades) if not self.trades.empty else 0
        }


# 策略基类
class BaseStrategy:
    def __init__(self, name):
        self.name = name
        
    def calculate_signals(self, data):
        raise NotImplementedError("子类必须实现calculate_signals方法")


# 1. 移动平均线交叉策略
class MovingAverageCrossoverStrategy(BaseStrategy):
    def __init__(self, short_window=5, long_window=15):
        super().__init__("移动平均线交叉策略")
        self.short_window = short_window
        self.long_window = long_window
        
    def calculate_signals(self, data):
        signals = pd.DataFrame(index=data.index)
        signals['price'] = data['price']
        
        signals['short_ma'] = signals['price'].rolling(window=self.short_window).mean()
        signals['long_ma'] = signals['price'].rolling(window=self.long_window).mean()
        
        signals['signal'] = np.where(
            signals['short_ma'] > signals['long_ma'], 'buy', 'sell'
        )
        
        signals['signal_change'] = signals['signal'] != signals['signal'].shift(1)
        signals = signals[signals['signal_change']]
        
        return signals[['signal']]


# 2. RSI策略
class RSIStrategy(BaseStrategy):
    def __init__(self, window=14, oversold=30, overbought=70):
        super().__init__("RSI策略")
        self.window = window
        self.oversold = oversold
        self.overbought = overbought
        
    def calculate_signals(self, data):
        signals = pd.DataFrame(index=data.index)
        signals['price'] = data['price']
        
        delta = signals['price'].diff()
        gain = (delta.where(delta > 0, 0)).rolling(window=self.window).mean()
        loss = (-delta.where(delta < 0, 0)).rolling(window=self.window).mean()
        rs = gain / loss
        signals['rsi'] = 100 - (100 / (1 + rs))
        
        signals['signal'] = 'hold'
        signals.loc[signals['rsi'] < self.oversold, 'signal'] = 'buy'
        signals.loc[signals['rsi'] > self.overbought, 'signal'] = 'sell'
        
        signals['signal_change'] = signals['signal'] != signals['signal'].shift(1)
        signals = signals[signals['signal_change'] & (signals['signal'] != 'hold')]
        
        return signals[['signal']]


# 3. 布林带策略
class BollingerBandsStrategy(BaseStrategy):
    def __init__(self, window=20, num_std=2):
        super().__init__("布林带策略")
        self.window = window
        self.num_std = num_std
        
    def calculate_signals(self, data):
        signals = pd.DataFrame(index=data.index)
        signals['price'] = data['price']
        
        signals['ma'] = signals['price'].rolling(window=self.window).mean()
        signals['std'] = signals['price'].rolling(window=self.window).std()
        signals['upper_band'] = signals['ma'] + (signals['std'] * self.num_std)
        signals['lower_band'] = signals['ma'] - (signals['std'] * self.num_std)
        
        signals['signal'] = 'hold'
        signals.loc[signals['price'] < signals['lower_band'], 'signal'] = 'buy'
        signals.loc[signals['price'] > signals['upper_band'], 'signal'] = 'sell'
        
        signals['signal_change'] = signals['signal'] != signals['signal'].shift(1)
        signals = signals[signals['signal_change'] & (signals['signal'] != 'hold')]
        
        return signals[['signal']]


# 4. MACD策略
class MACDStrategy(BaseStrategy):
    def __init__(self, fast=12, slow=26, signal=9):
        super().__init__("MACD策略")
        self.fast = fast
        self.slow = slow
        self.signal = signal
        
    def calculate_signals(self, data):
        signals = pd.DataFrame(index=data.index)
        signals['price'] = data['price']
        
        exp1 = signals['price'].ewm(span=self.fast).mean()
        exp2 = signals['price'].ewm(span=self.slow).mean()
        signals['macd'] = exp1 - exp2
        signals['signal_line'] = signals['macd'].ewm(span=self.signal).mean()
        
        signals['signal'] = 'hold'
        signals.loc[signals['macd'] > signals['signal_line'], 'signal'] = 'buy'
        signals.loc[signals['macd'] < signals['signal_line'], 'signal'] = 'sell'
        
        signals['signal_change'] = signals['signal'] != signals['signal'].shift(1)
        signals = signals[signals['signal_change'] & (signals['signal'] != 'hold')]
        
        return signals[['signal']]


# 5. 随机震荡指标策略
class StochasticOscillatorStrategy(BaseStrategy):
    def __init__(self, k_window=14, d_window=3, oversold=20, overbought=80):
        super().__init__("随机震荡指标策略")
        self.k_window = k_window
        self.d_window = d_window
        self.oversold = oversold
        self.overbought = overbought
        
    def calculate_signals(self, data):
        signals = pd.DataFrame(index=data.index)
        signals['price'] = data['price']
        
        low_min = signals['price'].rolling(window=self.k_window).min()
        high_max = signals['price'].rolling(window=self.k_window).max()
        signals['%K'] = 100 * (signals['price'] - low_min) / (high_max - low_min)
        signals['%D'] = signals['%K'].rolling(window=self.d_window).mean()
        
        signals['signal'] = 'hold'
        signals.loc[signals['%K'] < self.oversold, 'signal'] = 'buy'
        signals.loc[signals['%K'] > self.overbought, 'signal'] = 'sell'
        
        signals['signal_change'] = signals['signal'] != signals['signal'].shift(1)
        signals = signals[signals['signal_change'] & (signals['signal'] != 'hold')]
        
        return signals[['signal']]


# 6. 乖离率策略
class BIASStrategy(BaseStrategy):
    def __init__(self, window=20, buy_threshold=-2, sell_threshold=2):
        super().__init__("乖离率策略")
        self.window = window
        self.buy_threshold = buy_threshold
        self.sell_threshold = sell_threshold
        
    def calculate_signals(self, data):
        signals = pd.DataFrame(index=data.index)
        signals['price'] = data['price']
        
        ma = signals['price'].rolling(window=self.window).mean()
        signals['bias'] = (signals['price'] - ma) / ma * 100
        
        signals['signal'] = 'hold'
        signals.loc[signals['bias'] < self.buy_threshold, 'signal'] = 'buy'
        signals.loc[signals['bias'] > self.sell_threshold, 'signal'] = 'sell'
        
        signals['signal_change'] = signals['signal'] != signals['signal'].shift(1)
        signals = signals[signals['signal_change'] & (signals['signal'] != 'hold')]
        
        return signals[['signal']]


# 7. 动量策略
class MomentumStrategy(BaseStrategy):
    def __init__(self, window=10, buy_threshold=0, sell_threshold=0):
        super().__init__("动量策略")
        self.window = window
        self.buy_threshold = buy_threshold
        self.sell_threshold = sell_threshold
        
    def calculate_signals(self, data):
        signals = pd.DataFrame(index=data.index)
        signals['price'] = data['price']
        
        signals['momentum'] = signals['price'] - signals['price'].shift(self.window)
        
        signals['signal'] = 'hold'
        signals.loc[signals['momentum'] > self.buy_threshold, 'signal'] = 'buy'
        signals.loc[signals['momentum'] < self.sell_threshold, 'signal'] = 'sell'
        
        signals['signal_change'] = signals['signal'] != signals['signal'].shift(1)
        signals = signals[signals['signal_change'] & (signals['signal'] != 'hold')]
        
        return signals[['signal']]


# 8. 平均真实范围策略
class ATRStrategy(BaseStrategy):
    def __init__(self, window=14, multiplier=2):
        super().__init__("ATR策略")
        self.window = window
        self.multiplier = multiplier
        
    def calculate_signals(self, data):
        signals = pd.DataFrame(index=data.index)
        signals['price'] = data['price']
        
        high_low = signals['price'] - signals['price'].shift(1)
        high_close = (signals['price'] - signals['price'].shift(1)).abs()
        low_close = (signals['price'].shift(1) - signals['price']).abs()
        tr = pd.concat([high_low, high_close, low_close], axis=1).max(axis=1)
        signals['atr'] = tr.rolling(window=self.window).mean()
        
        signals['upper_band'] = signals['price'] + self.multiplier * signals['atr']
        signals['lower_band'] = signals['price'] - self.multiplier * signals['atr']
        
        signals['signal'] = 'hold'
        signals.loc[signals['price'] > signals['upper_band'], 'signal'] = 'sell'
        signals.loc[signals['price'] < signals['lower_band'], 'signal'] = 'buy'
        
        signals['signal_change'] = signals['signal'] != signals['signal'].shift(1)
        signals = signals[signals['signal_change'] & (signals['signal'] != 'hold')]
        
        return signals[['signal']]


# 9. 威廉指标策略
class WilliamsRStrategy(BaseStrategy):
    def __init__(self, window=14, oversold=-80, overbought=-20):
        super().__init__("威廉指标策略")
        self.window = window
        self.oversold = oversold
        self.overbought = overbought
        
    def calculate_signals(self, data):
        signals = pd.DataFrame(index=data.index)
        signals['price'] = data['price']
        
        highest_high = signals['price'].rolling(window=self.window).max()
        lowest_low = signals['price'].rolling(window=self.window).min()
        signals['williams_r'] = -100 * (highest_high - signals['price']) / (highest_high - lowest_low)
        
        signals['signal'] = 'hold'
        signals.loc[signals['williams_r'] < self.oversold, 'signal'] = 'buy'
        signals.loc[signals['williams_r'] > self.overbought, 'signal'] = 'sell'
        
        signals['signal_change'] = signals['signal'] != signals['signal'].shift(1)
        signals = signals[signals['signal_change'] & (signals['signal'] != 'hold')]
        
        return signals[['signal']]


# 10. 顺势指标策略
class CCIStrategy(BaseStrategy):
    def __init__(self, window=20, oversold=-100, overbought=100):
        super().__init__("CCI策略")
        self.window = window
        self.oversold = oversold
        self.overbought = overbought
        
    def calculate_signals(self, data):
        signals = pd.DataFrame(index=data.index)
        signals['price'] = data['price']
        
        typical_price = signals['price']
        sma = typical_price.rolling(window=self.window).mean()
        mad = typical_price.rolling(window=self.window).apply(lambda x: np.mean(np.abs(x - np.mean(x))))
        signals['cci'] = (typical_price - sma) / (0.015 * mad)
        
        signals['signal'] = 'hold'
        signals.loc[signals['cci'] < self.oversold, 'signal'] = 'buy'
        signals.loc[signals['cci'] > self.overbought, 'signal'] = 'sell'
        
        signals['signal_change'] = signals['signal'] != signals['signal'].shift(1)
        signals = signals[signals['signal_change'] & (signals['signal'] != 'hold')]
        
        return signals[['signal']]


# 11. 价格通道策略
class PriceChannelStrategy(BaseStrategy):
    def __init__(self, window=20):
        super().__init__("价格通道策略")
        self.window = window
        
    def calculate_signals(self, data):
        signals = pd.DataFrame(index=data.index)
        signals['price'] = data['price']
        
        signals['upper_channel'] = signals['price'].rolling(window=self.window).max()
        signals['lower_channel'] = signals['price'].rolling(window=self.window).min()
        
        signals['signal'] = 'hold'
        signals.loc[signals['price'] > signals['upper_channel'], 'signal'] = 'buy'
        signals.loc[signals['price'] < signals['lower_channel'], 'signal'] = 'sell'
        
        signals['signal_change'] = signals['signal'] != signals['signal'].shift(1)
        signals = signals[signals['signal_change'] & (signals['signal'] != 'hold')]
        
        return signals[['signal']]


# 12. 抛物线转向指标策略
class ParabolicSARStrategy(BaseStrategy):
    def __init__(self, af=0.02, max_af=0.2):
        super().__init__("抛物线转向指标策略")
        self.af = af
        self.max_af = max_af
        
    def calculate_signals(self, data):
        signals = pd.DataFrame(index=data.index)
        signals['price'] = data['price']
        
        # 简化版的SAR计算
        high = signals['price']
        low = signals['price']
        
        sar = [low.iloc[0]]
        trend = 1
        ep = high.iloc[0]
        af = self.af
        
        for i in range(1, len(signals)):
            if trend == 1:
                sar.append(sar[-1] + af * (ep - sar[-1]))
                if high.iloc[i] > ep:
                    ep = high.iloc[i]
                    af = min(af + self.af, self.max_af)
                if low.iloc[i] < sar[-1]:
                    trend = -1
                    sar[-1] = ep
                    ep = low.iloc[i]
                    af = self.af
            else:
                sar.append(sar[-1] + af * (ep - sar[-1]))
                if low.iloc[i] < ep:
                    ep = low.iloc[i]
                    af = min(af + self.af, self.max_af)
                if high.iloc[i] > sar[-1]:
                    trend = 1
                    sar[-1] = ep
                    ep = high.iloc[i]
                    af = self.af
        
        signals['sar'] = sar
        
        signals['signal'] = 'hold'
        signals.loc[signals['price'] > signals['sar'], 'signal'] = 'buy'
        signals.loc[signals['price'] < signals['sar'], 'signal'] = 'sell'
        
        signals['signal_change'] = signals['signal'] != signals['signal'].shift(1)
        signals = signals[signals['signal_change'] & (signals['signal'] != 'hold')]
        
        return signals[['signal']]


# 13. 成交量加权平均价格策略
class VWAPStrategy(BaseStrategy):
    def __init__(self, window=20):
        super().__init__("VWAP策略")
        self.window = window
        
    def calculate_signals(self, data):
        signals = pd.DataFrame(index=data.index)
        signals['price'] = data['price']
        signals['volume'] = data.get('volume', 1)  # 如果没有成交量数据,默认设为1
        
        typical_price = signals['price']
        signals['vwap'] = (typical_price * signals['volume']).rolling(window=self.window).sum() / signals['volume'].rolling(window=self.window).sum()
        
        signals['signal'] = 'hold'
        signals.loc[signals['price'] > signals['vwap'], 'signal'] = 'buy'
        signals.loc[signals['price'] < signals['vwap'], 'signal'] = 'sell'
        
        signals['signal_change'] = signals['signal'] != signals['signal'].shift(1)
        signals = signals[signals['signal_change'] & (signals['signal'] != 'hold')]
        
        return signals[['signal']]


# 14. 动态动量指数策略
class DMIStrategy(BaseStrategy):
    def __init__(self, window=14):
        super().__init__("DMI策略")
        self.window = window
        
    def calculate_signals(self, data):
        signals = pd.DataFrame(index=data.index)
        signals['price'] = data['price']
        
        # 简化版的DMI计算
        up_move = signals['price'].diff()
        down_move = -signals['price'].diff()
        
        plus_dm = up_move.where(up_move > down_move, 0)
        minus_dm = down_move.where(down_move > up_move, 0)
        
        atr = (signals['price'] - signals['price'].shift(1)).abs().rolling(window=self.window).mean()
        
        signals['plus_di'] = 100 * plus_dm.rolling(window=self.window).mean() / atr
        signals['minus_di'] = 100 * minus_dm.rolling(window=self.window).mean() / atr
        
        signals['signal'] = 'hold'
        signals.loc[signals['plus_di'] > signals['minus_di'], 'signal'] = 'buy'
        signals.loc[signals['plus_di'] < signals['minus_di'], 'signal'] = 'sell'
        
        signals['signal_change'] = signals['signal'] != signals['signal'].shift(1)
        signals = signals[signals['signal_change'] & (signals['signal'] != 'hold')]
        
        return signals[['signal']]


# 15. 价格波动率策略
class VolatilityStrategy(BaseStrategy):
    def __init__(self, window=20, threshold=0.01):
        super().__init__("价格波动率策略")
        self.window = window
        self.threshold = threshold
        
    def calculate_signals(self, data):
        signals = pd.DataFrame(index=data.index)
        signals['price'] = data['price']
        
        returns = signals['price'].pct_change()
        signals['volatility'] = returns.rolling(window=self.window).std()
        
        signals['signal'] = 'hold'
        signals.loc[signals['volatility'] > self.threshold, 'signal'] = 'sell'
        signals.loc[signals['volatility'] < self.threshold/2, 'signal'] = 'buy'
        
        signals['signal_change'] = signals['signal'] != signals['signal'].shift(1)
        signals = signals[signals['signal_change'] & (signals['signal'] != 'hold')]
        
        return signals[['signal']]


# 16. 价格突破策略
class BreakoutStrategy(BaseStrategy):
    def __init__(self, window=20):
        super().__init__("价格突破策略")
        self.window = window
        
    def calculate_signals(self, data):
        signals = pd.DataFrame(index=data.index)
        signals['price'] = data['price']
        
        signals['high'] = signals['price'].rolling(window=self.window).max()
        signals['low'] = signals['price'].rolling(window=self.window).min()
        
        signals['signal'] = 'hold'
        signals.loc[signals['price'] > signals['high'].shift(1), 'signal'] = 'buy'
        signals.loc[signals['price'] < signals['low'].shift(1), 'signal'] = 'sell'
        
        signals['signal_change'] = signals['signal'] != signals['signal'].shift(1)
        signals = signals[signals['signal_change'] & (signals['signal'] != 'hold')]
        
        return signals[['signal']]


# 17. 价格回归策略
class MeanReversionStrategy(BaseStrategy):
    def __init__(self, window=20, num_std=2):
        super().__init__("价格回归策略")
        self.window = window
        self.num_std = num_std
        
    def calculate_signals(self, data):
        signals = pd.DataFrame(index=data.index)
        signals['price'] = data['price']
        
        signals['ma'] = signals['price'].rolling(window=self.window).mean()
        signals['std'] = signals['price'].rolling(window=self.window).std()
        
        signals['signal'] = 'hold'
        signals.loc[signals['price'] < signals['ma'] - self.num_std * signals['std'], 'signal'] = 'buy'
        signals.loc[signals['price'] > signals['ma'] + self.num_std * signals['std'], 'signal'] = 'sell'
        
        signals['signal_change'] = signals['signal'] != signals['signal'].shift(1)
        signals = signals[signals['signal_change'] & (signals['signal'] != 'hold')]
        
        return signals[['signal']]


# 18. 三重指数移动平均策略
class TRIXStrategy(BaseStrategy):
    def __init__(self, window=15, signal_line=9):
        super().__init__("TRIX策略")
        self.window = window
        self.signal_line = signal_line
        
    def calculate_signals(self, data):
        signals = pd.DataFrame(index=data.index)
        signals['price'] = data['price']
        
        ema1 = signals['price'].ewm(span=self.window).mean()
        ema2 = ema1.ewm(span=self.window).mean()
        ema3 = ema2.ewm(span=self.window).mean()
        signals['trix'] = ema3.pct_change() * 100
        signals['signal_line'] = signals['trix'].ewm(span=self.signal_line).mean()
        
        signals['signal'] = 'hold'
        signals.loc[signals['trix'] > signals['signal_line'], 'signal'] = 'buy'
        signals.loc[signals['trix'] < signals['signal_line'], 'signal'] = 'sell'
        
        signals['signal_change'] = signals['signal'] != signals['signal'].shift(1)
        signals = signals[signals['signal_change'] & (signals['signal'] != 'hold')]
        
        return signals[['signal']]


# 19. 资金流量指标策略
class MFIStrategy(BaseStrategy):
    def __init__(self, window=14, oversold=20, overbought=80):
        super().__init__("资金流量指标策略")
        self.window = window
        self.oversold = oversold
        self.overbought = overbought
        
    def calculate_signals(self, data):
        signals = pd.DataFrame(index=data.index)
        signals['price'] = data['price']
        signals['volume'] = data.get('volume', 1)
        
        typical_price = signals['price']
        raw_money_flow = typical_price * signals['volume']
        
        positive_flow = raw_money_flow.where(typical_price > typical_price.shift(1), 0)
        negative_flow = raw_money_flow.where(typical_price < typical_price.shift(1), 0)
        
        positive_mf = positive_flow.rolling(window=self.window).sum()
        negative_mf = negative_flow.rolling(window=self.window).sum()
        
        signals['mfi'] = 100 * positive_mf / (positive_mf + negative_mf)
        
        signals['signal'] = 'hold'
        signals.loc[signals['mfi'] < self.oversold, 'signal'] = 'buy'
        signals.loc[signals['mfi'] > self.overbought, 'signal'] = 'sell'
        
        signals['signal_change'] = signals['signal'] != signals['signal'].shift(1)
        signals = signals[signals['signal_change'] & (signals['signal'] != 'hold')]
        
        return signals[['signal']]


# 20. 相对强弱指数策略(改进版)
class AdvancedRSIStrategy(BaseStrategy):
    def __init__(self, window=14, oversold=30, overbought=70, ma_window=10):
        super().__init__("改进RSI策略")
        self.window = window
        self.oversold = oversold
        self.overbought = overbought
        self.ma_window = ma_window
        
    def calculate_signals(self, data):
        signals = pd.DataFrame(index=data.index)
        signals['price'] = data['price']
        
        delta = signals['price'].diff()
        gain = (delta.where(delta > 0, 0)).rolling(window=self.window).mean()
        loss = (-delta.where(delta < 0, 0)).rolling(window=self.window).mean()
        rs = gain / loss
        signals['rsi'] = 100 - (100 / (1 + rs))
        
        # 添加移动平均线过滤
        signals['ma'] = signals['price'].rolling(window=self.ma_window).mean()
        
        signals['signal'] = 'hold'
        signals.loc[(signals['rsi'] < self.oversold) & (signals['price'] > signals['ma']), 'signal'] = 'buy'
        signals.loc[(signals['rsi'] > self.overbought) & (signals['price'] < signals['ma']), 'signal'] = 'sell'
        
        signals['signal_change'] = signals['signal'] != signals['signal'].shift(1)
        signals = signals[signals['signal_change'] & (signals['signal'] != 'hold')]
        
        return signals[['signal']]


# 生成示例tick数据
def generate_sample_data(num_ticks=1000, start_price=100, volatility=0.001):
    start_time = datetime.now() - timedelta(hours=1)
    timestamps = [start_time + timedelta(seconds=i) for i in range(num_ticks)]
    
    prices = [start_price]
    for i in range(1, num_ticks):
        change = np.random.normal(0, volatility)
        prices.append(prices[-1] * (1 + change))
    
    volumes = np.random.randint(100, 1000, num_ticks)
    
    data = pd.DataFrame({
        'timestamp': timestamps,
        'price': prices,
        'volume': volumes
    })
    
    return data


# Tkinter GUI应用
class BacktestApp:
    def __init__(self, root):
        self.root = root
        self.root.title("Tick级别回测系统 - 20种策略")
        self.root.geometry("1400x900")
        
        self.backtester = TickBacktester()
        self.data = generate_sample_data()
        
        # 20种策略
        self.strategies = {
            "移动平均线交叉": MovingAverageCrossoverStrategy(5, 15),
            "RSI策略": RSIStrategy(14, 30, 70),
            "布林带策略": BollingerBandsStrategy(20, 2),
            "MACD策略": MACDStrategy(12, 26, 9),
            "随机震荡指标": StochasticOscillatorStrategy(14, 3, 20, 80),
            "乖离率策略": BIASStrategy(20, -2, 2),
            "动量策略": MomentumStrategy(10, 0, 0),
            "ATR策略": ATRStrategy(14, 2),
            "威廉指标": WilliamsRStrategy(14, -80, -20),
            "CCI策略": CCIStrategy(20, -100, 100),
            "价格通道": PriceChannelStrategy(20),
            "抛物线转向": ParabolicSARStrategy(0.02, 0.2),
            "VWAP策略": VWAPStrategy(20),
            "DMI策略": DMIStrategy(14),
            "价格波动率": VolatilityStrategy(20, 0.01),
            "价格突破": BreakoutStrategy(20),
            "价格回归": MeanReversionStrategy(20, 2),
            "TRIX策略": TRIXStrategy(15, 9),
            "资金流量指标": MFIStrategy(14, 20, 80),
            "改进RSI策略": AdvancedRSIStrategy(14, 30, 70, 10)
        }
        
        self.setup_ui()
        
    def setup_ui(self):
        main_frame = ttk.Frame(self.root, padding="10")
        main_frame.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S))
        
        self.root.columnconfigure(0, weight=1)
        self.root.rowconfigure(0, weight=1)
        main_frame.columnconfigure(1, weight=1)
        main_frame.rowconfigure(1, weight=1)
        
        # 控制面板
        control_frame = ttk.LabelFrame(main_frame, text="回测控制", padding="5")
        control_frame.grid(row=0, column=0, columnspan=2, sticky=(tk.W, tk.E), pady=(0, 10))
        
        # 策略选择
        ttk.Label(control_frame, text="选择策略:").grid(row=0, column=0, sticky=tk.W, padx=(0, 5))
        self.strategy_var = tk.StringVar()
        strategy_combo = ttk.Combobox(control_frame, textvariable=self.strategy_var, 
                                     values=list(self.strategies.keys()), width=20)
        strategy_combo.grid(row=0, column=1, sticky=(tk.W, tk.E), padx=(0, 10))
        strategy_combo.current(0)
        
        # 初始资金
        ttk.Label(control_frame, text="初始资金:").grid(row=0, column=2, sticky=tk.W, padx=(0, 5))
        self.capital_var = tk.StringVar(value="100000")
        capital_entry = ttk.Entry(control_frame, textvariable=self.capital_var, width=10)
        capital_entry.grid(row=0, column=3, sticky=(tk.W, tk.E), padx=(0, 10))
        
        # 佣金比例
        ttk.Label(control_frame, text="佣金比例:").grid(row=0, column=4, sticky=tk.W, padx=(0, 5))
        self.commission_var = tk.StringVar(value="0.001")
        commission_entry = ttk.Entry(control_frame, textvariable=self.commission_var, width=10)
        commission_entry.grid(row=0, column=5, sticky=(tk.W, tk.E), padx=(0, 10))
        
        # 运行回测按钮
        run_btn = ttk.Button(control_frame, text="运行回测", command=self.run_backtest)
        run_btn.grid(row=0, column=6, padx=(10, 0))
        
        # 图表框架
        chart_frame = ttk.Frame(main_frame)
        chart_frame.grid(row=1, column=0, columnspan=2, sticky=(tk.W, tk.E, tk.N, tk.S))
        
        self.fig = Figure(figsize=(12, 8), dpi=100)
        self.canvas = FigureCanvasTkAgg(self.fig, master=chart_frame)
        self.canvas.get_tk_widget().pack(fill=tk.BOTH, expand=True)
        
        # 性能指标框架
        metrics_frame = ttk.LabelFrame(main_frame, text="性能指标", padding="5")
        metrics_frame.grid(row=2, column=0, columnspan=2, sticky=(tk.W, tk.E), pady=(10, 0))
        
        # 性能指标标签
        self.metrics_vars = {}
        metrics = ['总收益率', '夏普比率', '最大回撤', '胜率', '交易次数']
        for i, metric in enumerate(metrics):
            ttk.Label(metrics_frame, text=f"{metric}:").grid(row=0, column=i*2, sticky=tk.W, padx=(10, 5))
            self.metrics_vars[metric] = tk.StringVar(value="N/A")
            ttk.Label(metrics_frame, textvariable=self.metrics_vars[metric]).grid(row=0, column=i*2+1, sticky=tk.W, padx=(0, 20))
        
        # 交易记录框架
        trades_frame = ttk.LabelFrame(main_frame, text="交易记录", padding="5")
        trades_frame.grid(row=3, column=0, columnspan=2, sticky=(tk.W, tk.E, tk.N, tk.S), pady=(10, 0))
        
        # 交易记录表格
        columns = ('时间', '类型', '价格', '数量', '金额', '权益')
        self.trades_tree = ttk.Treeview(trades_frame, columns=columns, show='headings', height=6)
        
        for col in columns:
            self.trades_tree.heading(col, text=col)
            self.trades_tree.column(col, width=100)
        
        # 添加滚动条
        scrollbar = ttk.Scrollbar(trades_frame, orient=tk.VERTICAL, command=self.trades_tree.yview)
        self.trades_tree.configure(yscrollcommand=scrollbar.set)
        
        self.trades_tree.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S))
        scrollbar.grid(row=0, column=1, sticky=(tk.N, tk.S))
        
        # 配置权重
        trades_frame.columnconfigure(0, weight=1)
        trades_frame.rowconfigure(0, weight=1)
        main_frame.rowconfigure(3, weight=1)
        
    def run_backtest(self):
        """运行回测并更新界面"""
        try:
            # 获取参数
            initial_capital = float(self.capital_var.get())
            commission = float(self.commission_var.get())
            strategy_name = self.strategy_var.get()
            
            # 更新回测引擎参数
            self.backtester.initial_capital = initial_capital
            self.backtester.commission = commission
            self.backtester.reset()
            
            # 加载数据
            self.backtester.load_data(self.data)
            
            # 运行策略
            strategy = self.strategies[strategy_name]
            self.backtester.run_backtest(strategy)
            
            # 更新性能指标
            performance = self.backtester.get_performance()
            for metric, value in performance.items():
                if metric in ['总收益率', '最大回撤', '胜率']:
                    self.metrics_vars[metric].set(f"{value:.2%}")
                elif metric == '夏普比率':
                    self.metrics_vars[metric].set(f"{value:.2f}")
                else:
                    self.metrics_vars[metric].set(f"{value}")
            
            # 更新交易记录
            self.update_trades_table()
            
            # 更新图表
            self.update_charts()
            
        except Exception as e:
            messagebox.showerror("错误", f"回测过程中发生错误: {str(e)}")
    
    def update_trades_table(self):
        """更新交易记录表格"""
        # 清空现有数据
        for item in self.trades_tree.get_children():
            self.trades_tree.delete(item)
        
        # 添加新数据
        if not self.backtester.trades.empty:
            for timestamp, trade in self.backtester.trades.iterrows():
                self.trades_tree.insert('', 'end', values=(
                    timestamp.strftime('%H:%M:%S'),
                    trade['type'],
                    f"{trade['price']:.2f}",
                    f"{trade['shares']:.0f}",
                    f"{trade['value']:.0f}",
                    f"{trade['equity']:.0f}"
                ))
    
    def update_charts(self):
        """更新图表"""
        self.fig.clear()
        
        if self.backtester.equity_curve.empty:
            return
        
        # 创建两个子图
        ax1 = self.fig.add_subplot(211)
        ax2 = self.fig.add_subplot(212)
        
        # 绘制价格和买卖点
        ax1.plot(self.backtester.equity_curve.index, self.backtester.equity_curve['price'], 
                label='价格', linewidth=1, alpha=0.7)
        
        if not self.backtester.trades.empty:
            buy_trades = self.backtester.trades[self.backtester.trades['type'] == 'buy']
            sell_trades = self.backtester.trades[self.backtester.trades['type'] == 'sell']
            
            ax1.scatter(buy_trades.index, buy_trades['price'], 
                       color='green', marker='^', s=100, label='买入', alpha=0.7)
            ax1.scatter(sell_trades.index, sell_trades['price'], 
                       color='red', marker='v', s=100, label='卖出', alpha=0.7)
            
            # 添加交易金额标注
            for i, trade in buy_trades.iterrows():
                ax1.annotate(f"{trade['value']:.0f}", 
                            (i, trade['price']),
                            xytext=(0, 10), textcoords='offset points',
                            fontsize=8, color='green', ha='center')
            
            for i, trade in sell_trades.iterrows():
                ax1.annotate(f"{trade['value']:.0f}", 
                            (i, trade['price']),
                            xytext=(0, -15), textcoords='offset points',
                            fontsize=8, color='red', ha='center')
        
        ax1.set_title('价格走势与交易信号')
        ax1.set_ylabel('价格')
        ax1.legend()
        ax1.grid(True, alpha=0.3)
        
        # 绘制权益曲线
        ax2.plot(self.backtester.equity_curve.index, self.backtester.equity_curve['equity'], 
                label='权益曲线', linewidth=2, color='blue')
        ax2.axhline(y=self.backtester.initial_capital, color='gray', linestyle='--', alpha=0.7, label='初始资金')
        ax2.set_title('权益曲线')
        ax2.set_ylabel('权益')
        ax2.set_xlabel('时间')
        ax2.legend()
        ax2.grid(True, alpha=0.3)
        
        # 格式化x轴日期显示
        for ax in [ax1, ax2]:
            ax.xaxis.set_major_formatter(mdates.DateFormatter('%H:%M:%S'))
            ax.xaxis.set_major_locator(mdates.AutoDateLocator())
        
        self.fig.tight_layout()
        self.canvas.draw()


# 主程序
if __name__ == "__main__":
    root = tk.Tk()
    app = BacktestApp(root)
    root.mainloop()

本文由 海星量化研究所 作者提供,转载请保留链接和署名!网址:http://qmt.hxquant.com/?id=49

声明

1.本站原创文章,转载需注明文章作者来源。 2.如果文章内容涉及版权问题,请联系我们删除,向本站投稿文章可能会经我们编辑修改。 3.本站对信息准确性或完整性不作保证,亦不对因使用该等信息而引发或可能引发的损失承担任何责任。

搜索
排行榜
关注我们

扫码开通QMT/ptrade