Files
polymarket-arb-bot/backtest.py

555 lines
20 KiB
Python
Raw Normal View History

2026-03-22 09:28:14 +09:00
"""Historical backtester for the temporal arbitrage strategy.

Supports two modes:
    1. Synthetic: random-walk price simulation for quick parameter testing.
    2. Historical: real Binance kline data for realistic backtesting.

Usage:
    python backtest.py --mode synthetic --windows 2000
    python backtest.py --mode historical --asset BTC --days 7
"""
from __future__ import annotations
import argparse
import asyncio
import json
import math
import time
from dataclasses import dataclass, field
from datetime import datetime, timezone, timedelta
from pathlib import Path
from typing import Optional
from src.config import load_config
from src.data.models import Asset, Direction, Signal, Timeframe
from src.risk.fee_calculator import FeeCalculator
from src.strategy.temporal_arb import TemporalArbStrategy
import structlog
log = structlog.get_logger()
@dataclass
class BacktestTrade:
    """Single backtest trade result.

    One instance is appended to ``BacktestResult.trades`` for every signal
    the strategy emits during a run.
    """

    window_idx: int  # index of the simulated window the trade was taken in
    asset: str  # asset symbol the run was started with (e.g. "BTC")
    timeframe: str  # window-length label the run was started with (e.g. "5M", "15M")
    direction: str  # ``.value`` of the signal's Direction enum member
    entry_price: float  # price carried by the strategy signal
    size: int  # size carried by the strategy signal
    edge: float  # edge reported by the strategy signal
    estimated_prob: float  # probability estimate reported by the strategy signal
    won: bool  # True when the signal direction matched how the window closed
    pnl: float  # net payout returned by the fee calculator for this trade
    fee: float  # taker fee booked for the trade; 0 for losing trades
    timestamp: float = 0.0  # kline open_time / 1000 in historical runs; left at 0.0 otherwise
@dataclass
class BacktestResult:
    """Aggregated backtest results for one (mode, asset, timeframe) run.

    The counters and totals are filled in by the backtester while it
    replays windows; the properties derive summary statistics from those
    totals and from the per-trade records in ``trades``.
    """

    mode: str = "synthetic"
    asset: str = ""
    timeframe: str = ""
    total_windows: int = 0
    total_trades: int = 0
    wins: int = 0
    losses: int = 0
    total_pnl: float = 0.0
    total_fees: float = 0.0
    total_volume: float = 0.0
    max_drawdown: float = 0.0
    best_trade: float = 0.0
    worst_trade: float = 0.0
    peak_balance: float = 0.0
    trades: list[BacktestTrade] = field(default_factory=list)

    @property
    def win_rate(self) -> float:
        """Winning trades as a percentage (0-100); 0.0 when nothing traded."""
        if self.total_trades <= 0:
            return 0.0
        return self.wins / self.total_trades * 100

    @property
    def avg_pnl(self) -> float:
        """Mean PnL per trade; 0.0 when nothing traded."""
        if self.total_trades <= 0:
            return 0.0
        return self.total_pnl / self.total_trades

    @property
    def profit_factor(self) -> float:
        """Gross winnings divided by gross losses.

        Returns ``inf`` when there are winnings but no losing trade, and
        0.0 when there is neither (for example a run with no trades), so an
        empty run is not reported as infinitely profitable.
        """
        gross_wins = sum(t.pnl for t in self.trades if t.pnl > 0)
        gross_losses = abs(sum(t.pnl for t in self.trades if t.pnl < 0))
        if gross_losses > 0:
            return gross_wins / gross_losses
        return float("inf") if gross_wins > 0 else 0.0

    @property
    def sharpe_ratio(self) -> float:
        """Approximate Sharpe-style ratio from per-trade PnLs.

        Computed as ``mean / population_std * sqrt(n_trades)``. The scaling
        uses the number of trades, not a calendar period, so the figure is
        NOT annualized. Returns 0.0 with fewer than two trades or when every
        trade has the same PnL (zero dispersion), where the ratio is
        undefined.
        """
        if len(self.trades) < 2:
            return 0.0
        pnls = [t.pnl for t in self.trades]
        avg = sum(pnls) / len(pnls)
        variance = sum((p - avg) ** 2 for p in pnls) / len(pnls)
        if variance <= 0:
            # Dividing by a tiny stand-in std would report a meaningless
            # value in the billions; treat the undefined case as 0.0.
            return 0.0
        return avg / math.sqrt(variance) * math.sqrt(len(pnls))

    @property
    def max_consecutive_losses(self) -> int:
        """Length of the longest run of consecutive losing trades."""
        max_streak = 0
        current = 0
        for t in self.trades:
            if t.won:
                current = 0
            else:
                current += 1
                max_streak = max(max_streak, current)
        return max_streak

    def to_dict(self) -> dict:
        """Return the summary as a JSON-serialisable dict.

        Monetary and ratio values are rounded to two decimals. The
        per-trade list is not included. A non-finite profit factor (no
        losing trades) is emitted as ``None`` so the output stays valid
        JSON instead of containing the non-standard ``Infinity`` token.
        """
        profit_factor = self.profit_factor
        return {
            "mode": self.mode,
            "asset": self.asset,
            "timeframe": self.timeframe,
            "total_windows": self.total_windows,
            "total_trades": self.total_trades,
            "wins": self.wins,
            "losses": self.losses,
            "win_rate": round(self.win_rate, 2),
            "total_pnl": round(self.total_pnl, 2),
            "avg_pnl": round(self.avg_pnl, 2),
            "total_fees": round(self.total_fees, 2),
            "total_volume": round(self.total_volume, 2),
            "profit_factor": (
                round(profit_factor, 2) if math.isfinite(profit_factor) else None
            ),
            "sharpe_ratio": round(self.sharpe_ratio, 2),
            "max_drawdown": round(self.max_drawdown, 2),
            "best_trade": round(self.best_trade, 2),
            "worst_trade": round(self.worst_trade, 2),
            "max_consecutive_losses": self.max_consecutive_losses,
        }
class Backtester:
    """Replay historical or synthetic data through the temporal arb strategy.

    Each run builds a fresh strategy, evaluates it at simulated decision
    points inside fixed-length windows, and settles every emitted signal
    against the direction the window actually closed in.
    """

    def __init__(self, config_path: str = "config.toml", balance: float = 10000.0) -> None:
        """Load configuration and set up the fee model.

        Args:
            config_path: Path handed to ``load_config``.
            balance: Starting balance used by every backtest run.
        """
        self.config = load_config(config_path)
        self.initial_balance = balance
        self.fee_calc = FeeCalculator(self.config.fees)

    def _make_strategy(self, balance: float) -> TemporalArbStrategy:
        """Build a strategy instance from the loaded config sections."""
        return TemporalArbStrategy(
            arb_config=self.config.temporal_arb,
            risk_config=self.config.risk,
            fees_config=self.config.fees,
            balance=balance,
        )

    @staticmethod
    def _seconds_left_after_candle(window_minutes: int, candle_idx: int) -> float:
        """Return the seconds left in a window once a 1-minute candle has closed.

        ``candle_idx`` is zero-based, so the close of candle ``candle_idx``
        happens ``candle_idx + 1`` minutes into the window.
        """
        return float((window_minutes - (candle_idx + 1)) * 60)

    def _settle_trade(
        self,
        result: BacktestResult,
        signal: Signal,
        *,
        window_idx: int,
        actual_direction: str,
        balance: float,
        timestamp: float = 0.0,
    ) -> float:
        """Settle one signal, record it on ``result`` and return the new balance.

        ``result.peak_balance`` is used as the running balance peak, so it
        must hold the starting balance before the first call.
        """
        won = (
            (signal.direction == Direction.UP and actual_direction == "UP")
            or (signal.direction == Direction.DOWN and actual_direction == "DOWN")
        )
        pnl = self.fee_calc.net_payout(
            timeframe=result.timeframe,
            entry_price=signal.price,
            size=signal.size,
            won=won,
        )
        # Kept as-is: a fee is only booked for winning trades.
        # NOTE(review): confirm this matches how net_payout accounts for fees.
        fee = self.fee_calc.taker_fee(result.timeframe, signal.price, signal.size) if won else 0

        balance += pnl
        result.peak_balance = max(result.peak_balance, balance)
        drawdown = result.peak_balance - balance

        result.trades.append(
            BacktestTrade(
                window_idx=window_idx,
                asset=result.asset,
                timeframe=result.timeframe,
                direction=signal.direction.value,
                entry_price=signal.price,
                size=signal.size,
                edge=signal.edge,
                estimated_prob=signal.estimated_prob,
                won=won,
                pnl=pnl,
                fee=fee,
                timestamp=timestamp,
            )
        )
        result.total_trades += 1
        result.total_pnl += pnl
        result.total_fees += fee
        result.total_volume += signal.price * signal.size
        if won:
            result.wins += 1
        else:
            result.losses += 1

        if result.total_trades == 1:
            # Seed from the first real trade; comparing against the 0.0
            # default would report a best trade of 0.00 for an all-loss run
            # (and a worst trade of 0.00 for an all-win run).
            result.best_trade = pnl
            result.worst_trade = pnl
        else:
            result.best_trade = max(result.best_trade, pnl)
            result.worst_trade = min(result.worst_trade, pnl)
        result.max_drawdown = max(result.max_drawdown, drawdown)
        return balance

    # ------------------------------------------------------------------
    # Synthetic backtest
    # ------------------------------------------------------------------
    async def run_synthetic(
        self,
        asset: str = "BTC",
        timeframe: str = "15M",
        num_windows: int = 1000,
        avg_volatility_pct: float = 0.3,
    ) -> BacktestResult:
        """Run a synthetic backtest using simulated price movements.

        Every window is a 100-tick random walk from a fixed-seed generator,
        so repeated runs with the same arguments see the same price paths.
        The strategy is evaluated at 30%, 50% and 70% of the window and at
        most one trade is taken per window.

        Args:
            asset: Asset symbol; selects the base price (50000 if unknown).
            timeframe: "5M" for 300-second windows, anything else 900 seconds.
            num_windows: Number of windows to simulate.
            avg_volatility_pct: Per-window volatility in percent, spread
                evenly over the ticks.

        Returns:
            The populated BacktestResult. ``peak_balance`` starts at the
            initial balance, so it is meaningful even when no trade fires.
        """
        import numpy as np

        strategy = self._make_strategy(self.initial_balance)
        result = BacktestResult(
            mode="synthetic",
            asset=asset,
            timeframe=timeframe,
            total_windows=num_windows,
            peak_balance=self.initial_balance,
        )
        balance = self.initial_balance
        window_sec = 300 if timeframe == "5M" else 900
        rng = np.random.default_rng(42)
        base_prices = {"BTC": 84000, "ETH": 2300, "SOL": 135}
        base_price = base_prices.get(asset, 50000)
        num_ticks = 100
        tick_sigma = avg_volatility_pct / 100 / math.sqrt(num_ticks)

        for i in range(num_windows):
            start_price = base_price * (1 + rng.normal(0, 0.02))
            returns = rng.normal(0, tick_sigma, num_ticks)
            prices = [start_price]
            for r in returns:
                prices.append(prices[-1] * (1 + r))
            end_price = prices[-1]
            actual_direction = "UP" if end_price > start_price else "DOWN"

            # Evaluate at multiple points in the window
            for eval_frac in (0.3, 0.5, 0.7):
                eval_idx = int(num_ticks * eval_frac)
                eval_price = prices[eval_idx]
                time_remaining = window_sec * (1 - eval_frac)
                change_pct = (eval_price - start_price) / start_price * 100

                # Simulate Polymarket price (lagging behind reality)
                if abs(change_pct) > 0.05:
                    lag_factor = 0.3  # Polymarket adjusts at 30% of actual
                    if change_pct > 0:
                        sim_poly_up = 0.50 + abs(change_pct) * lag_factor * 10
                        sim_poly_up = min(sim_poly_up, 0.75)
                        sim_poly_down = max(0.25, 1.0 - sim_poly_up - rng.uniform(0, 0.04))
                    else:
                        sim_poly_down = 0.50 + abs(change_pct) * lag_factor * 10
                        sim_poly_down = min(sim_poly_down, 0.75)
                        sim_poly_up = max(0.25, 1.0 - sim_poly_down - rng.uniform(0, 0.04))
                else:
                    sim_poly_up = 0.50 + rng.uniform(-0.02, 0.02)
                    sim_poly_down = 0.50 + rng.uniform(-0.02, 0.02)

                strategy.update_balance(balance)
                signal = await strategy.evaluate(
                    symbol=asset,
                    cex_price=eval_price,
                    window_start_price=start_price,
                    window_end_time=time.time() + time_remaining,
                    poly_up_ask=sim_poly_up,
                    poly_down_ask=sim_poly_down,
                    up_token_id=f"up_{i}",
                    down_token_id=f"down_{i}",
                    timeframe=timeframe,
                )
                if signal is None:
                    continue

                balance = self._settle_trade(
                    result,
                    signal,
                    window_idx=i,
                    actual_direction=actual_direction,
                    balance=balance,
                )
                break  # Only take one trade per window
        return result

    # ------------------------------------------------------------------
    # Historical backtest (Binance klines)
    # ------------------------------------------------------------------
    async def fetch_binance_klines(
        self,
        symbol: str,
        interval: str = "1m",
        days: int = 7,
    ) -> list[dict]:
        """Fetch historical kline data from the Binance REST API.

        Pages forward in batches of up to 1000 klines from ``days`` ago
        until now. A non-200 response is logged and ends the fetch early,
        returning whatever was collected so far.

        Args:
            symbol: Base asset; the ``{symbol}USDT`` pair is queried.
            interval: Kline interval string passed to the API.
            days: How many days of history to request.

        Returns:
            Dicts with ``open_time``, ``open``, ``high``, ``low``, ``close``,
            ``volume`` and ``close_time``, in the order the API returned
            them. Candles that had not closed by the time of the request
            are dropped.
        """
        import aiohttp

        pair = f"{symbol}USDT"
        url = "https://api.binance.com/api/v3/klines"
        end_time = int(time.time() * 1000)
        start_time = end_time - (days * 24 * 60 * 60 * 1000)
        all_klines: list[dict] = []
        current_start = start_time

        async with aiohttp.ClientSession() as session:
            while current_start < end_time:
                params = {
                    "symbol": pair,
                    "interval": interval,
                    "startTime": current_start,
                    "endTime": end_time,
                    "limit": 1000,
                }
                async with session.get(url, params=params) as resp:
                    if resp.status != 200:
                        log.error("binance_klines_error", status=resp.status)
                        break
                    data = await resp.json()
                if not data:
                    break
                for k in data:
                    if k[6] > end_time:
                        # Candle was still forming when the fetch started;
                        # its "close" is not final, so leave it out.
                        continue
                    all_klines.append({
                        "open_time": k[0],
                        "open": float(k[1]),
                        "high": float(k[2]),
                        "low": float(k[3]),
                        "close": float(k[4]),
                        "volume": float(k[5]),
                        "close_time": k[6],
                    })
                current_start = data[-1][6] + 1  # Next ms after last close
                await asyncio.sleep(0.1)  # Rate limiting

        log.info("klines_fetched", symbol=symbol, count=len(all_klines), days=days)
        return all_klines

    async def run_historical(
        self,
        asset: str = "BTC",
        timeframe: str = "15M",
        days: int = 7,
    ) -> BacktestResult:
        """Run a backtest using real Binance historical data.

        One-minute klines are grouped into consecutive, non-overlapping
        windows of 5 ("5M") or 15 (any other timeframe) candles; trailing
        klines that do not fill a window are ignored. The strategy is
        evaluated once per window, at the close of the middle candle, and
        the time remaining it is given is measured from that close.

        Args:
            asset: Asset symbol to fetch and trade.
            timeframe: "5M" or another label (treated as 15 minutes).
            days: Days of history to fetch.

        Returns:
            The populated BacktestResult; it has no windows when no klines
            could be fetched.
        """
        strategy = self._make_strategy(self.initial_balance)
        result = BacktestResult(
            mode="historical",
            asset=asset,
            timeframe=timeframe,
            peak_balance=self.initial_balance,
        )
        balance = self.initial_balance

        # Fetch 1-minute klines
        klines = await self.fetch_binance_klines(asset, interval="1m", days=days)
        if not klines:
            log.error("no_klines_data", asset=asset)
            return result

        window_minutes = 5 if timeframe == "5M" else 15
        mid_idx = window_minutes // 2
        # The evaluation price is the CLOSE of candle mid_idx, i.e. mid_idx + 1
        # minutes into the window. Claiming half the window remains would
        # overstate the time left (150s instead of 120s for 5M).
        time_remaining = self._seconds_left_after_candle(window_minutes, mid_idx)

        # Group klines into windows
        window_starts = range(0, len(klines) - window_minutes + 1, window_minutes)
        for window_idx, first in enumerate(window_starts):
            window_klines = klines[first:first + window_minutes]
            start_price = window_klines[0]["open"]
            end_price = window_klines[-1]["close"]
            actual_direction = "UP" if end_price > start_price else "DOWN"
            result.total_windows += 1

            # Simulate evaluation at mid-point
            mid_price = window_klines[mid_idx]["close"]
            change_pct = (mid_price - start_price) / start_price * 100

            # Simulate Polymarket prices based on actual market behavior
            # More conservative lag simulation for historical
            if abs(change_pct) > 0.05:
                lag = 0.25  # Market adjusts at ~25% speed
                if change_pct > 0:
                    poly_up = 0.50 + abs(change_pct) * lag * 8
                    poly_up = min(poly_up, 0.72)
                    poly_down = max(0.28, 1.0 - poly_up - 0.02)
                else:
                    poly_down = 0.50 + abs(change_pct) * lag * 8
                    poly_down = min(poly_down, 0.72)
                    poly_up = max(0.28, 1.0 - poly_down - 0.02)
            else:
                poly_up = 0.50
                poly_down = 0.50

            strategy.update_balance(balance)
            signal = await strategy.evaluate(
                symbol=asset,
                cex_price=mid_price,
                window_start_price=start_price,
                window_end_time=time.time() + time_remaining,
                poly_up_ask=poly_up,
                poly_down_ask=poly_down,
                up_token_id=f"hist_up_{window_idx}",
                down_token_id=f"hist_down_{window_idx}",
                timeframe=timeframe,
            )
            if signal:
                balance = self._settle_trade(
                    result,
                    signal,
                    window_idx=window_idx,
                    actual_direction=actual_direction,
                    balance=balance,
                    timestamp=window_klines[0]["open_time"] / 1000,
                )
        return result
# ---------------------------------------------------------------------------
# Output
# ---------------------------------------------------------------------------
def print_results(result: BacktestResult) -> None:
    """Write a human-readable summary of one backtest run to stdout."""
    rule = "=" * 65
    rows = [
        "\n" + rule,
        f" BACKTEST RESULTS — {result.asset} {result.timeframe} ({result.mode})",
        rule,
        f" Windows Tested: {result.total_windows}",
        f" Total Trades: {result.total_trades}",
        f" Wins / Losses: {result.wins} / {result.losses}",
        f" Win Rate: {result.win_rate:.1f}%",
        f" Total PnL: ${result.total_pnl:+,.2f}",
        f" Average PnL: ${result.avg_pnl:+,.2f}",
        f" Total Fees: ${result.total_fees:,.2f}",
        f" Total Volume: ${result.total_volume:,.0f}",
        f" Profit Factor: {result.profit_factor:.2f}",
        f" Sharpe Ratio: {result.sharpe_ratio:.2f}",
        f" Max Drawdown: ${result.max_drawdown:,.2f}",
        f" Best Trade: ${result.best_trade:+,.2f}",
        f" Worst Trade: ${result.worst_trade:+,.2f}",
        f" Max Consec. Losses: {result.max_consecutive_losses}",
        rule,
    ]
    # One print per row keeps the output identical to printing each line
    # individually (every row is newline-terminated).
    for row in rows:
        print(row)
def save_results(results: list[BacktestResult], output_path: str = "backtest_results.json") -> None:
    """Serialise every result's summary dict to a JSON array on disk.

    The file at ``output_path`` is overwritten, and a confirmation line is
    printed afterwards.
    """
    summaries = []
    for entry in results:
        summaries.append(entry.to_dict())
    # Serialise fully before touching the file so a failure leaves it untouched.
    document = json.dumps(summaries, indent=2)
    destination = Path(output_path)
    destination.write_text(document)
    print(f"\nResults saved to {output_path}")
# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------
async def main() -> None:
    """Parse CLI options, run the requested backtests and report them.

    Every (asset, timeframe) combination is run for each selected mode;
    each result is printed as it completes, then all results are saved to
    the output file and a combined summary is printed.
    """
    cli = argparse.ArgumentParser(description="Polymarket Arb Bot Backtester")
    cli.add_argument(
        "--mode",
        choices=["synthetic", "historical", "both"],
        default="synthetic",
        help="Backtest mode",
    )
    cli.add_argument("--asset", nargs="+", default=["BTC", "ETH", "SOL"], help="Assets to backtest")
    cli.add_argument("--timeframe", nargs="+", default=["5M", "15M"], help="Timeframes")
    cli.add_argument("--windows", type=int, default=1000, help="Number of windows for synthetic mode")
    cli.add_argument("--days", type=int, default=7, help="Days of history for historical mode")
    cli.add_argument("--balance", type=float, default=10000.0, help="Starting balance")
    cli.add_argument("--output", default="backtest_results.json", help="Output JSON file")
    opts = cli.parse_args()

    backtester = Backtester(balance=opts.balance)
    collected = []
    # Asset-major ordering: every timeframe of one asset before the next asset.
    combos = [(asset, tf) for asset in opts.asset for tf in opts.timeframe]

    if opts.mode in ("synthetic", "both"):
        print("\n>>> SYNTHETIC BACKTEST <<<\n")
        volatility_by_asset = {"BTC": 0.3, "ETH": 0.4, "SOL": 0.6}
        for asset, tf in combos:
            outcome = await backtester.run_synthetic(
                asset=asset,
                timeframe=tf,
                num_windows=opts.windows,
                avg_volatility_pct=volatility_by_asset.get(asset, 0.3),
            )
            print_results(outcome)
            collected.append(outcome)

    if opts.mode in ("historical", "both"):
        print("\n>>> HISTORICAL BACKTEST <<<\n")
        for asset, tf in combos:
            print(f"Fetching {opts.days} days of {asset} data...")
            outcome = await backtester.run_historical(
                asset=asset,
                timeframe=tf,
                days=opts.days,
            )
            print_results(outcome)
            collected.append(outcome)

    if not collected:
        return

    save_results(collected, opts.output)

    # Summary
    rule = "=" * 65
    total_pnl = sum(r.total_pnl for r in collected)
    total_trades = sum(r.total_trades for r in collected)
    total_wins = sum(r.wins for r in collected)
    print("\n" + rule)
    print(" COMBINED SUMMARY")
    print(rule)
    print(f" Total Trades: {total_trades}")
    print(f" Overall PnL: ${total_pnl:+,.2f}")
    if total_trades > 0:
        print(f" Overall WR: {total_wins / total_trades * 100:.1f}%")
    else:
        print(" N/A")
    print(rule)
# Run the async CLI driver only when executed as a script, not on import.
if __name__ == "__main__":
    asyncio.run(main())