"""Historical backtester for the temporal arbitrage strategy. Supports two modes: 1. Synthetic: Random walk price simulation for quick parameter testing. 2. Historical: Real Binance kline data for realistic backtesting. Usage: python backtest.py --mode synthetic --windows 2000 python backtest.py --mode historical --asset BTC --days 7 """ from __future__ import annotations import argparse import asyncio import json import math import time from dataclasses import dataclass, field from datetime import datetime, timezone, timedelta from pathlib import Path from typing import Optional from src.config import load_config from src.data.models import Asset, Direction, Signal, Timeframe from src.risk.fee_calculator import FeeCalculator from src.strategy.temporal_arb import TemporalArbStrategy import structlog log = structlog.get_logger() @dataclass class BacktestTrade: """Single backtest trade result.""" window_idx: int asset: str timeframe: str direction: str entry_price: float size: int edge: float estimated_prob: float won: bool pnl: float fee: float timestamp: float = 0.0 @dataclass class BacktestResult: """Aggregated backtest results.""" mode: str = "synthetic" asset: str = "" timeframe: str = "" total_windows: int = 0 total_trades: int = 0 wins: int = 0 losses: int = 0 total_pnl: float = 0.0 total_fees: float = 0.0 total_volume: float = 0.0 max_drawdown: float = 0.0 best_trade: float = 0.0 worst_trade: float = 0.0 peak_balance: float = 0.0 trades: list[BacktestTrade] = field(default_factory=list) @property def win_rate(self) -> float: return self.wins / self.total_trades * 100 if self.total_trades > 0 else 0 @property def avg_pnl(self) -> float: return self.total_pnl / self.total_trades if self.total_trades > 0 else 0 @property def profit_factor(self) -> float: gross_wins = sum(t.pnl for t in self.trades if t.pnl > 0) gross_losses = abs(sum(t.pnl for t in self.trades if t.pnl < 0)) return gross_wins / gross_losses if gross_losses > 0 else float("inf") @property def sharpe_ratio(self) -> float: """Approximate Sharpe ratio from trade PnLs.""" if len(self.trades) < 2: return 0.0 pnls = [t.pnl for t in self.trades] avg = sum(pnls) / len(pnls) variance = sum((p - avg) ** 2 for p in pnls) / len(pnls) std = math.sqrt(variance) if variance > 0 else 1e-9 return avg / std * math.sqrt(len(pnls)) # Annualized approximation @property def max_consecutive_losses(self) -> int: max_streak = 0 current = 0 for t in self.trades: if not t.won: current += 1 max_streak = max(max_streak, current) else: current = 0 return max_streak def to_dict(self) -> dict: return { "mode": self.mode, "asset": self.asset, "timeframe": self.timeframe, "total_windows": self.total_windows, "total_trades": self.total_trades, "wins": self.wins, "losses": self.losses, "win_rate": round(self.win_rate, 2), "total_pnl": round(self.total_pnl, 2), "avg_pnl": round(self.avg_pnl, 2), "total_fees": round(self.total_fees, 2), "total_volume": round(self.total_volume, 2), "profit_factor": round(self.profit_factor, 2), "sharpe_ratio": round(self.sharpe_ratio, 2), "max_drawdown": round(self.max_drawdown, 2), "best_trade": round(self.best_trade, 2), "worst_trade": round(self.worst_trade, 2), "max_consecutive_losses": self.max_consecutive_losses, } class Backtester: """Replay historical or synthetic data through the temporal arb strategy.""" def __init__(self, config_path: str = "config.toml", balance: float = 10000.0) -> None: self.config = load_config(config_path) self.initial_balance = balance self.fee_calc = FeeCalculator(self.config.fees) def _make_strategy(self, balance: float) -> TemporalArbStrategy: return TemporalArbStrategy( arb_config=self.config.temporal_arb, risk_config=self.config.risk, fees_config=self.config.fees, balance=balance, ) # ------------------------------------------------------------------ # Synthetic backtest # ------------------------------------------------------------------ async def run_synthetic( self, asset: str = "BTC", timeframe: str = "15M", num_windows: int = 1000, avg_volatility_pct: float = 0.3, ) -> BacktestResult: """Run a synthetic backtest using simulated price movements.""" import numpy as np strategy = self._make_strategy(self.initial_balance) result = BacktestResult( mode="synthetic", asset=asset, timeframe=timeframe, total_windows=num_windows, ) balance = self.initial_balance peak_balance = balance window_sec = 300 if timeframe == "5M" else 900 rng = np.random.default_rng(42) base_prices = {"BTC": 84000, "ETH": 2300, "SOL": 135} base_price = base_prices.get(asset, 50000) for i in range(num_windows): start_price = base_price * (1 + rng.normal(0, 0.02)) num_ticks = 100 returns = rng.normal(0, avg_volatility_pct / 100 / math.sqrt(num_ticks), num_ticks) prices = [start_price] for r in returns: prices.append(prices[-1] * (1 + r)) end_price = prices[-1] actual_direction = "UP" if end_price > start_price else "DOWN" # Evaluate at multiple points in the window for eval_frac in [0.3, 0.5, 0.7]: eval_idx = int(num_ticks * eval_frac) eval_price = prices[eval_idx] time_remaining = window_sec * (1 - eval_frac) change_pct = (eval_price - start_price) / start_price * 100 # Simulate Polymarket price (lagging behind reality) if abs(change_pct) > 0.05: lag_factor = 0.3 # Polymarket adjusts at 30% of actual if change_pct > 0: sim_poly_up = 0.50 + abs(change_pct) * lag_factor * 10 sim_poly_up = min(sim_poly_up, 0.75) sim_poly_down = max(0.25, 1.0 - sim_poly_up - rng.uniform(0, 0.04)) else: sim_poly_down = 0.50 + abs(change_pct) * lag_factor * 10 sim_poly_down = min(sim_poly_down, 0.75) sim_poly_up = max(0.25, 1.0 - sim_poly_down - rng.uniform(0, 0.04)) else: sim_poly_up = 0.50 + rng.uniform(-0.02, 0.02) sim_poly_down = 0.50 + rng.uniform(-0.02, 0.02) strategy.update_balance(balance) signal = await strategy.evaluate( symbol=asset, cex_price=eval_price, window_start_price=start_price, window_end_time=time.time() + time_remaining, poly_up_ask=sim_poly_up, poly_down_ask=sim_poly_down, up_token_id=f"up_{i}", down_token_id=f"down_{i}", timeframe=timeframe, ) if signal is None: continue # Simulate outcome won = (signal.direction == Direction.UP and actual_direction == "UP") or \ (signal.direction == Direction.DOWN and actual_direction == "DOWN") pnl = self.fee_calc.net_payout( timeframe=timeframe, entry_price=signal.price, size=signal.size, won=won, ) fee = self.fee_calc.taker_fee(timeframe, signal.price, signal.size) if won else 0 balance += pnl peak_balance = max(peak_balance, balance) drawdown = peak_balance - balance trade = BacktestTrade( window_idx=i, asset=asset, timeframe=timeframe, direction=signal.direction.value, entry_price=signal.price, size=signal.size, edge=signal.edge, estimated_prob=signal.estimated_prob, won=won, pnl=pnl, fee=fee, ) result.trades.append(trade) result.total_trades += 1 result.total_pnl += pnl result.total_fees += fee result.total_volume += signal.price * signal.size if won: result.wins += 1 else: result.losses += 1 result.best_trade = max(result.best_trade, pnl) result.worst_trade = min(result.worst_trade, pnl) result.max_drawdown = max(result.max_drawdown, drawdown) result.peak_balance = peak_balance break # Only take one trade per window return result # ------------------------------------------------------------------ # Historical backtest (Binance klines) # ------------------------------------------------------------------ async def fetch_binance_klines( self, symbol: str, interval: str = "1m", days: int = 7, ) -> list[dict]: """Fetch historical kline data from Binance REST API.""" import aiohttp pair = f"{symbol}USDT" url = "https://api.binance.com/api/v3/klines" end_time = int(time.time() * 1000) start_time = end_time - (days * 24 * 60 * 60 * 1000) all_klines = [] current_start = start_time async with aiohttp.ClientSession() as session: while current_start < end_time: params = { "symbol": pair, "interval": interval, "startTime": current_start, "endTime": end_time, "limit": 1000, } async with session.get(url, params=params) as resp: if resp.status != 200: log.error("binance_klines_error", status=resp.status) break data = await resp.json() if not data: break for k in data: all_klines.append({ "open_time": k[0], "open": float(k[1]), "high": float(k[2]), "low": float(k[3]), "close": float(k[4]), "volume": float(k[5]), "close_time": k[6], }) current_start = data[-1][6] + 1 # Next ms after last close await asyncio.sleep(0.1) # Rate limiting log.info("klines_fetched", symbol=symbol, count=len(all_klines), days=days) return all_klines async def run_historical( self, asset: str = "BTC", timeframe: str = "15M", days: int = 7, ) -> BacktestResult: """Run backtest using real Binance historical data.""" strategy = self._make_strategy(self.initial_balance) result = BacktestResult( mode="historical", asset=asset, timeframe=timeframe, ) balance = self.initial_balance peak_balance = balance # Fetch 1-minute klines klines = await self.fetch_binance_klines(asset, interval="1m", days=days) if not klines: log.error("no_klines_data", asset=asset) return result window_minutes = 5 if timeframe == "5M" else 15 window_sec = window_minutes * 60 # Group klines into windows window_idx = 0 i = 0 while i + window_minutes <= len(klines): window_klines = klines[i:i + window_minutes] start_price = window_klines[0]["open"] end_price = window_klines[-1]["close"] actual_direction = "UP" if end_price > start_price else "DOWN" result.total_windows += 1 # Simulate evaluation at mid-point mid_idx = window_minutes // 2 mid_price = window_klines[mid_idx]["close"] time_remaining = window_sec * 0.5 change_pct = (mid_price - start_price) / start_price * 100 # Simulate Polymarket prices based on actual market behavior # More conservative lag simulation for historical if abs(change_pct) > 0.05: lag = 0.25 # Market adjusts at ~25% speed if change_pct > 0: poly_up = 0.50 + abs(change_pct) * lag * 8 poly_up = min(poly_up, 0.72) poly_down = max(0.28, 1.0 - poly_up - 0.02) else: poly_down = 0.50 + abs(change_pct) * lag * 8 poly_down = min(poly_down, 0.72) poly_up = max(0.28, 1.0 - poly_down - 0.02) else: poly_up = 0.50 poly_down = 0.50 strategy.update_balance(balance) signal = await strategy.evaluate( symbol=asset, cex_price=mid_price, window_start_price=start_price, window_end_time=time.time() + time_remaining, poly_up_ask=poly_up, poly_down_ask=poly_down, up_token_id=f"hist_up_{window_idx}", down_token_id=f"hist_down_{window_idx}", timeframe=timeframe, ) if signal: won = (signal.direction == Direction.UP and actual_direction == "UP") or \ (signal.direction == Direction.DOWN and actual_direction == "DOWN") pnl = self.fee_calc.net_payout( timeframe=timeframe, entry_price=signal.price, size=signal.size, won=won, ) fee = self.fee_calc.taker_fee(timeframe, signal.price, signal.size) if won else 0 balance += pnl peak_balance = max(peak_balance, balance) drawdown = peak_balance - balance trade = BacktestTrade( window_idx=window_idx, asset=asset, timeframe=timeframe, direction=signal.direction.value, entry_price=signal.price, size=signal.size, edge=signal.edge, estimated_prob=signal.estimated_prob, won=won, pnl=pnl, fee=fee, timestamp=window_klines[0]["open_time"] / 1000, ) result.trades.append(trade) result.total_trades += 1 result.total_pnl += pnl result.total_fees += fee result.total_volume += signal.price * signal.size if won: result.wins += 1 else: result.losses += 1 result.best_trade = max(result.best_trade, pnl) result.worst_trade = min(result.worst_trade, pnl) result.max_drawdown = max(result.max_drawdown, drawdown) result.peak_balance = peak_balance i += window_minutes window_idx += 1 return result # --------------------------------------------------------------------------- # Output # --------------------------------------------------------------------------- def print_results(result: BacktestResult) -> None: """Pretty-print backtest results.""" print("\n" + "=" * 65) print(f" BACKTEST RESULTS — {result.asset} {result.timeframe} ({result.mode})") print("=" * 65) print(f" Windows Tested: {result.total_windows}") print(f" Total Trades: {result.total_trades}") print(f" Wins / Losses: {result.wins} / {result.losses}") print(f" Win Rate: {result.win_rate:.1f}%") print(f" Total PnL: ${result.total_pnl:+,.2f}") print(f" Average PnL: ${result.avg_pnl:+,.2f}") print(f" Total Fees: ${result.total_fees:,.2f}") print(f" Total Volume: ${result.total_volume:,.0f}") print(f" Profit Factor: {result.profit_factor:.2f}") print(f" Sharpe Ratio: {result.sharpe_ratio:.2f}") print(f" Max Drawdown: ${result.max_drawdown:,.2f}") print(f" Best Trade: ${result.best_trade:+,.2f}") print(f" Worst Trade: ${result.worst_trade:+,.2f}") print(f" Max Consec. Losses: {result.max_consecutive_losses}") print("=" * 65) def save_results(results: list[BacktestResult], output_path: str = "backtest_results.json") -> None: """Save all backtest results to JSON.""" data = [r.to_dict() for r in results] Path(output_path).write_text(json.dumps(data, indent=2)) print(f"\nResults saved to {output_path}") # --------------------------------------------------------------------------- # Entry point # --------------------------------------------------------------------------- async def main() -> None: parser = argparse.ArgumentParser(description="Polymarket Arb Bot Backtester") parser.add_argument("--mode", choices=["synthetic", "historical", "both"], default="synthetic", help="Backtest mode") parser.add_argument("--asset", nargs="+", default=["BTC", "ETH", "SOL"], help="Assets to backtest") parser.add_argument("--timeframe", nargs="+", default=["5M", "15M"], help="Timeframes") parser.add_argument("--windows", type=int, default=1000, help="Number of windows for synthetic mode") parser.add_argument("--days", type=int, default=7, help="Days of history for historical mode") parser.add_argument("--balance", type=float, default=10000.0, help="Starting balance") parser.add_argument("--output", default="backtest_results.json", help="Output JSON file") args = parser.parse_args() bt = Backtester(balance=args.balance) all_results = [] if args.mode in ("synthetic", "both"): print("\n>>> SYNTHETIC BACKTEST <<<\n") vol_map = {"BTC": 0.3, "ETH": 0.4, "SOL": 0.6} for asset in args.asset: for tf in args.timeframe: result = await bt.run_synthetic( asset=asset, timeframe=tf, num_windows=args.windows, avg_volatility_pct=vol_map.get(asset, 0.3), ) print_results(result) all_results.append(result) if args.mode in ("historical", "both"): print("\n>>> HISTORICAL BACKTEST <<<\n") for asset in args.asset: for tf in args.timeframe: print(f"Fetching {args.days} days of {asset} data...") result = await bt.run_historical( asset=asset, timeframe=tf, days=args.days, ) print_results(result) all_results.append(result) if all_results: save_results(all_results, args.output) # Summary print("\n" + "=" * 65) print(" COMBINED SUMMARY") print("=" * 65) total_pnl = sum(r.total_pnl for r in all_results) total_trades = sum(r.total_trades for r in all_results) total_wins = sum(r.wins for r in all_results) print(f" Total Trades: {total_trades}") print(f" Overall PnL: ${total_pnl:+,.2f}") print(f" Overall WR: {total_wins / total_trades * 100:.1f}%" if total_trades > 0 else " N/A") print("=" * 65) if __name__ == "__main__": asyncio.run(main())