"""Paper trading wrapper for the Polymarket Temporal Arbitrage Bot. Runs the same event loop as ``src.main`` but forces paper-trading mode, adds a simulated order execution layer that tracks virtual positions and PnL, and prints periodic performance summaries. Usage:: python paper_trade.py """ from __future__ import annotations import asyncio import signal import sys import time import uuid from collections import deque from dataclasses import dataclass, field from typing import Optional import structlog from src.config import ( BinanceConfig, Config, FeesConfig, GeneralConfig, NotificationConfig, PolymarketConfig, RiskConfig, SpreadCaptureConfig, SumToOneConfig, TemporalArbConfig, load_config, ) from src.data import TradeDB from src.data.models import ( ActiveMarket, Asset, Direction, OrderBookSnapshot, Signal, Timeframe, Trade, TradeStatus, WindowState, ) from src.feeds import BinanceFeed, PolymarketFeed from src.market import MarketDiscovery, WindowTracker from src.market.oracle import OracleMonitor from src.strategy import SignalAggregator from src.utils import get_logger, setup_logging # --------------------------------------------------------------------------- # Constants # --------------------------------------------------------------------------- _STATUS_INTERVAL_S: float = 30.0 _SUMMARY_INTERVAL_S: float = 300.0 # 5 minutes _DISCOVERY_INTERVAL_S: float = 30.0 _BANNER = r""" ____ _____ _ | _ \ __ _ _ __ ___ _ __ |_ _| __ __ _ __| | ___ | |_) / _` | '_ \ / _ \ '__| | || '__/ _` |/ _` |/ _ \ | __/ (_| | |_) | __/ | | || | | (_| | (_| | __/ |_| \__,_| .__/ \___|_| |_||_| \__,_|\__,_|\___| |_| """ # --------------------------------------------------------------------------- # Virtual position tracking # --------------------------------------------------------------------------- @dataclass class VirtualPosition: """A simulated open position.""" position_id: str asset: Asset timeframe: Timeframe direction: Direction token_id: str entry_price: float size: int opened_at: float = field(default_factory=time.time) @dataclass class VirtualTradeRecord: """A completed (closed) virtual trade.""" trade_id: str asset: Asset timeframe: Timeframe direction: Direction token_id: str entry_price: float exit_price: float size: int fee: float pnl: float opened_at: float closed_at: float = field(default_factory=time.time) class PaperExecutionEngine: """Simulated order execution that tracks virtual positions and PnL. Accepts trade signals, creates virtual positions, and resolves them when the window expires. All trades are logged to SQLite via :class:`TradeDB`. """ def __init__(self, db: TradeDB, fees: FeesConfig) -> None: self._db = db self._fees = fees self._log = get_logger("paper_engine") self._positions: dict[str, VirtualPosition] = {} self._closed_trades: deque[VirtualTradeRecord] = deque(maxlen=10000) self._total_pnl: float = 0.0 self._win_count: int = 0 self._loss_count: int = 0 # ------------------------------------------------------------------ # Public API # ------------------------------------------------------------------ @property def active_positions(self) -> list[VirtualPosition]: return list(self._positions.values()) @property def total_pnl(self) -> float: return self._total_pnl @property def trade_count(self) -> int: return len(self._closed_trades) @property def win_rate(self) -> float: total = self._win_count + self._loss_count if total == 0: return 0.0 return self._win_count / total def open_position( self, asset: Asset, timeframe: Timeframe, direction: Direction, token_id: str, entry_price: float, size: int, edge: float, estimated_prob: float, ) -> str: """Simulate opening a position. Returns the position ID.""" position_id = uuid.uuid4().hex[:12] pos = VirtualPosition( position_id=position_id, asset=asset, timeframe=timeframe, direction=direction, token_id=token_id, entry_price=entry_price, size=size, ) self._positions[position_id] = pos # Log the pending trade to DB sig = Signal( direction=direction, asset=asset, timeframe=timeframe, token_id=token_id, price=entry_price, size=size, edge=edge, estimated_prob=estimated_prob, ) trade = Trade( id=f"paper-{position_id}", signal=sig, status=TradeStatus.FILLED, fill_price=entry_price, fill_size=size, fee=0.0, pnl=0.0, ) self._db.log_trade(trade) self._log.info( "virtual_position_opened", position_id=position_id, asset=asset.value, timeframe=timeframe.value, direction=direction.value, entry_price=entry_price, size=size, ) return position_id def close_position(self, position_id: str, exit_price: float) -> Optional[VirtualTradeRecord]: """Simulate closing a position at *exit_price*. Returns the trade record.""" pos = self._positions.pop(position_id, None) if pos is None: self._log.warning("close_unknown_position", position_id=position_id) return None # Calculate fee based on timeframe fee_rate = self._fees.fee_for_timeframe(pos.timeframe.value) notional = pos.size * pos.entry_price fee = notional * fee_rate # PnL: if direction is UP and price went up, the binary resolves to 1 # Simplified: pnl = size * (exit_price - entry_price) - fee pnl = pos.size * (exit_price - pos.entry_price) - fee record = VirtualTradeRecord( trade_id=f"paper-{pos.position_id}", asset=pos.asset, timeframe=pos.timeframe, direction=pos.direction, token_id=pos.token_id, entry_price=pos.entry_price, exit_price=exit_price, size=pos.size, fee=fee, pnl=pnl, opened_at=pos.opened_at, ) self._closed_trades.append(record) self._total_pnl += pnl if pnl > 0: self._win_count += 1 else: self._loss_count += 1 # Update the trade record in DB self._db.update_trade( record.trade_id, fill_price=exit_price, fee=fee, pnl=pnl, status=TradeStatus.FILLED.value, ) self._log.info( "virtual_position_closed", position_id=pos.position_id, asset=pos.asset.value, direction=pos.direction.value, entry=pos.entry_price, exit=exit_price, pnl=round(pnl, 4), fee=round(fee, 4), total_pnl=round(self._total_pnl, 4), ) return record def print_summary(self) -> None: """Print a performance summary to the log.""" self._log.info( "paper_trade_summary", total_pnl=round(self._total_pnl, 4), trade_count=self.trade_count, win_rate=round(self.win_rate * 100, 1), active_positions=len(self._positions), positions=[ { "id": p.position_id, "asset": p.asset.value, "dir": p.direction.value, "entry": p.entry_price, "size": p.size, } for p in self._positions.values() ], ) # --------------------------------------------------------------------------- # Paper trading bot # --------------------------------------------------------------------------- class PaperTradingBot: """Orchestrates Phase 1 components with a paper execution layer.""" def __init__(self, config: Config) -> None: self._cfg = config self._log = get_logger("paper_bot") self._starting_balance = config.general.starting_balance self._balance = config.general.starting_balance # Components self._db: Optional[TradeDB] = None self._engine: Optional[PaperExecutionEngine] = None self._discovery: Optional[MarketDiscovery] = None self._tracker: Optional[WindowTracker] = None self._binance_feed: Optional[BinanceFeed] = None self._poly_feed: Optional[PolymarketFeed] = None self._signal_agg: Optional[SignalAggregator] = None self._oracle: Optional[OracleMonitor] = None # Discovered markets cache (keyed by condition_id to prevent duplicates) self._active_markets: dict[str, ActiveMarket] = {} # Live orderbook state (token_id → snapshot) self._orderbooks: dict[str, OrderBookSnapshot] = {} # Queue drop counter for monitoring self._queue_drop_count: int = 0 # Pending strategy evaluations (initialized in start()) self._eval_queue: Optional[asyncio.Queue] = None # Shutdown coordination self._shutdown_event = asyncio.Event() # ------------------------------------------------------------------ # Callbacks # ------------------------------------------------------------------ def _on_binance_price( self, symbol: str, price: float, timestamp: float, volume: float ) -> None: """Process each Binance trade tick — also triggers strategy evaluation.""" self._log.debug( "binance_price", symbol=symbol, price=price, volume=volume, ) if self._tracker is not None: self._tracker.update_price(symbol, price, timestamp) # Trigger strategy evaluation for each timeframe if self._signal_agg is None or self._tracker is None: return for tf_str in self._cfg.general.timeframes: window = self._tracker.get_window(symbol, tf_str) if window is None or window.start_price is None: continue # If no real Polymarket orderbooks, simulate them every tick orderbooks = self._orderbooks is_simulated = window.market is not None and window.market.condition_id.startswith("sim_") if window.market is None or is_simulated: orderbooks = self._simulate_orderbooks(symbol, price, window) # Now window.market is guaranteed to be set if window.market is not None and self._eval_queue is not None: try: self._eval_queue.put_nowait((symbol, price, window, orderbooks)) except asyncio.QueueFull: self._queue_drop_count += 1 if self._queue_drop_count % 100 == 1: self._log.warning("eval_queue_full", drops=self._queue_drop_count) except Exception: pass def _on_orderbook_update( self, token_id: str, snapshot: OrderBookSnapshot ) -> None: """Process each Polymarket orderbook update — store for strategy use.""" self._orderbooks[token_id] = snapshot self._log.debug( "poly_orderbook", token_id=token_id[:12], best_bid=snapshot.best_bid, best_ask=snapshot.best_ask, ) def _on_window_change(self, window: WindowState) -> None: """Fired when a price window transitions. *window* is the COMPLETED window (with final current_price and price_change_pct). Resolve open positions and snapshot to DB. """ self._log.info( "window_completed", asset=window.asset.value, timeframe=window.timeframe.value, start_price=window.start_price, end_price=window.current_price, change_pct=window.price_change_pct, window_start=window.window_start_time, window_end=window.window_end_time, ) # Resolve positions from the completed window self._on_window_change_resolve(window) # Snapshot completed window to DB if self._db is not None and window.start_price is not None: try: self._db.log_window(window) except Exception: self._log.exception("window_snapshot_failed") def _on_signal(self, signal: Signal) -> None: """Handle a trading signal from the strategy engine — open a virtual position.""" if self._engine is None: return # Check balance cost = signal.price * signal.size if cost > self._balance * 0.5: # Don't risk more than 50% on one trade self._log.warning("signal_rejected_balance", cost=round(cost, 2), balance=round(self._balance, 2)) return position_id = self._engine.open_position( asset=signal.asset, timeframe=signal.timeframe, direction=signal.direction, token_id=signal.token_id, entry_price=signal.price, size=signal.size, edge=signal.edge, estimated_prob=signal.estimated_prob, ) self._log.info( "paper_trade_opened", position_id=position_id, asset=signal.asset.value, direction=signal.direction.value, price=signal.price, size=signal.size, edge=round(signal.edge, 4), ) def _on_window_change_resolve(self, window: WindowState) -> None: """When a window changes, resolve any open positions from the PREVIOUS window. If the price went UP and we held UP, we win ($1 payout). If the price went DOWN and we held DOWN, we win ($1 payout). Otherwise, we lose ($0 payout). """ if self._engine is None: return # Check if any active positions match this asset+timeframe for pos in list(self._engine.active_positions): if pos.asset == window.asset and pos.timeframe == window.timeframe: # Determine outcome from the COMPLETED window's price change if window.price_change_pct is not None: actual_up = window.price_change_pct > 0 position_won = ( (pos.direction == Direction.UP and actual_up) or (pos.direction == Direction.DOWN and not actual_up) ) exit_price = 1.0 if position_won else 0.0 else: # No price data — treat as loss exit_price = 0.0 record = self._engine.close_position(pos.position_id, exit_price) if record: # Update balance self._balance = self._starting_balance + self._engine.total_pnl self._log.info( "paper_trade_resolved", asset=pos.asset.value, direction=pos.direction.value, pnl=round(record.pnl, 4), balance=round(self._balance, 2), won=record.pnl > 0, ) def _simulate_orderbooks( self, symbol: str, cex_price: float, window: WindowState ) -> dict[str, OrderBookSnapshot]: """Generate simulated Polymarket orderbooks when no real market exists. Simulates the oracle-lag effect: Polymarket odds lag behind the CEX price. A bigger price move = higher true prob, but the simulated Polymarket price adjusts more slowly. """ import random from src.data.models import OrderBookLevel if window.start_price is None or window.start_price <= 0: return {} change_pct = (cex_price - window.start_price) / window.start_price * 100 # Simulate Polymarket's lagging price # Real prob might be 85%, but Polymarket shows ~55-60% (the arb opportunity) lag_factor = 0.35 # Polymarket adjusts at ~35% of true movement speed noise = random.uniform(-0.02, 0.02) if abs(change_pct) < 0.05: # Flat — both sides near 50% up_ask = 0.50 + noise down_ask = 0.50 - noise else: # Up direction confirmed on CEX # True prob: ~70-90%, but Polymarket shows ~52-65% poly_adjustment = abs(change_pct) * lag_factor if change_pct > 0: up_ask = min(0.65, 0.50 + poly_adjustment + noise) down_ask = max(0.35, 1.0 - up_ask - 0.02) else: down_ask = min(0.65, 0.50 + poly_adjustment + noise) up_ask = max(0.35, 1.0 - down_ask - 0.02) up_ask = round(max(0.01, min(0.99, up_ask)), 2) down_ask = round(max(0.01, min(0.99, down_ask)), 2) # Create synthetic token IDs up_token = f"sim_up_{symbol}_{window.timeframe.value}" down_token = f"sim_down_{symbol}_{window.timeframe.value}" # Ensure window has a simulated market for signal generation if window.market is None: sim_market = ActiveMarket( condition_id=f"sim_{symbol}_{window.timeframe.value}", up_token_id=up_token, down_token_id=down_token, asset=window.asset, timeframe=window.timeframe, end_date="", question=f"Simulated {symbol} {window.timeframe.value}", ) window.market = sim_market return { up_token: OrderBookSnapshot( token_id=up_token, asks=[OrderBookLevel(price=up_ask, size=10000)], bids=[OrderBookLevel(price=up_ask - 0.02, size=10000)], ), down_token: OrderBookSnapshot( token_id=down_token, asks=[OrderBookLevel(price=down_ask, size=10000)], bids=[OrderBookLevel(price=down_ask - 0.02, size=10000)], ), } def _on_new_markets(self, markets: list[ActiveMarket]) -> None: """Callback when MarketDiscovery finds new markets.""" for mkt in markets: self._active_markets[mkt.condition_id] = mkt if self._tracker is not None: for mkt in markets: self._tracker.link_market( asset=mkt.asset.value, timeframe=mkt.timeframe.value, market=mkt, ) if self._poly_feed is not None: for mkt in markets: self._poly_feed.subscribe_market(mkt.up_token_id) self._poly_feed.subscribe_market(mkt.down_token_id) # ------------------------------------------------------------------ # Status and summary loops # ------------------------------------------------------------------ async def _strategy_eval_loop(self) -> None: """Consume price ticks from the queue and run strategy evaluations.""" eval_count = 0 while not self._shutdown_event.is_set(): try: symbol, price, window, orderbooks = await asyncio.wait_for( self._eval_queue.get(), timeout=1.0 ) except asyncio.TimeoutError: continue except Exception: continue eval_count += 1 if eval_count % 100 == 1: self._log.info( "strategy_eval_tick", eval_count=eval_count, symbol=symbol, price=price, change_pct=round(window.price_change_pct or 0, 4), has_market=window.market is not None, orderbook_tokens=list(orderbooks.keys())[:2], ) if self._signal_agg is not None: try: await self._signal_agg.on_price_tick( symbol=symbol, cex_price=price, window=window, orderbooks=orderbooks, ) except Exception: self._log.exception("strategy_eval_error") async def _status_loop(self) -> None: """Log a status summary every ``_STATUS_INTERVAL_S`` seconds.""" while not self._shutdown_event.is_set(): try: await asyncio.wait_for( self._shutdown_event.wait(), timeout=_STATUS_INTERVAL_S, ) break except asyncio.TimeoutError: pass windows = ( self._tracker.get_all_active_windows() if self._tracker else [] ) latest_prices: dict[str, Optional[float]] = {} if self._binance_feed is not None: for sym in self._cfg.general.assets: latest_prices[sym] = self._binance_feed.get_latest_price(sym) # Update balance from PnL pnl = self._engine.total_pnl if self._engine else 0.0 self._balance = self._starting_balance + pnl # Record balance snapshot to DB if self._db is not None: self._db.log_balance(self._balance, pnl, event="periodic") self._log.info( "status", mode="paper", balance=round(self._balance, 2), pnl=round(pnl, 2), binance_connected=( self._binance_feed.is_connected if self._binance_feed else False ), polymarket_connected=( self._poly_feed.is_connected if self._poly_feed else False ), active_windows=len(windows), active_markets=len(self._active_markets), latest_prices=latest_prices, ) async def _oracle_snapshot_loop(self) -> None: """Log oracle vs CEX deviation to DB every 30 seconds.""" while not self._shutdown_event.is_set(): try: await asyncio.wait_for( self._shutdown_event.wait(), timeout=30.0, ) break except asyncio.TimeoutError: pass if self._oracle is None or self._binance_feed is None or self._db is None: continue for asset in self._cfg.general.assets: cex_price = self._binance_feed.get_latest_price(asset) oracle_price = self._oracle._last_oracle_prices.get(asset) if cex_price is None or oracle_price is None: continue deviation = self._oracle.get_oracle_vs_cex_deviation(asset, cex_price) lag = self._oracle.get_estimated_lag(asset) round_id = self._oracle._last_oracle_round_ids.get(asset, 0) if deviation is not None and lag is not None: self._db.log_oracle( asset=asset, oracle_price=oracle_price, cex_price=cex_price, deviation_pct=deviation, oracle_lag_sec=lag, oracle_round_id=round_id, ) self._log.info( "oracle_snapshot", stats=self._oracle.get_stats(), ) async def _summary_loop(self) -> None: """Print paper trading summary every ``_SUMMARY_INTERVAL_S`` seconds.""" while not self._shutdown_event.is_set(): try: await asyncio.wait_for( self._shutdown_event.wait(), timeout=_SUMMARY_INTERVAL_S, ) break except asyncio.TimeoutError: pass if self._engine is not None: self._engine.print_summary() if self._db is not None: self._log.info( "db_summary", today_trades=self._db.get_today_trade_count(), today_pnl=round(self._db.get_today_pnl(), 4), total_pnl=round(self._db.get_total_pnl(), 4), ) # Periodic DB maintenance (prune old data, WAL checkpoint) try: self._db.periodic_maintenance(retention_days=7) except Exception: self._log.exception("db_maintenance_error") # ------------------------------------------------------------------ # Lifecycle # ------------------------------------------------------------------ async def start(self) -> None: """Initialise all components, then run feeds concurrently.""" self._log.info("paper_bot_initialising") # 0. Initialize eval queue (must be in async context) self._eval_queue = asyncio.Queue(maxsize=1000) # 1. Database (paper-specific path) self._db = TradeDB(db_path="paper_trades.db") # 2. Paper execution engine self._engine = PaperExecutionEngine( db=self._db, fees=self._cfg.fees, ) # Record starting balance self._db.log_balance(self._balance, 0.0, event="start") self._log.info("starting_balance", balance=self._balance) # 2b. Signal aggregator (strategy engine) self._signal_agg = SignalAggregator( config=self._cfg, balance=self._balance, ) self._signal_agg.on_signal(self._on_signal) self._log.info("strategy_engine_ready", temporal_arb=self._cfg.temporal_arb.enabled, sum_to_one=self._cfg.sum_to_one.enabled) # 3. Market discovery self._discovery = MarketDiscovery( on_new_markets=self._on_new_markets, ) self._log.info("running_initial_discovery") initial_markets = await self._discovery.discover() self._active_markets = {m.condition_id: m for m in initial_markets} self._log.info( "initial_discovery_complete", count=len(initial_markets), ) # 4. Window tracker assets = [Asset(a) for a in self._cfg.general.assets] timeframes = [Timeframe(t) for t in self._cfg.general.timeframes] self._tracker = WindowTracker(assets=assets, timeframes=timeframes) self._tracker.on_window_change(self._on_window_change) # Link initial markets to windows for mkt in initial_markets: self._tracker.link_market( asset=mkt.asset.value, timeframe=mkt.timeframe.value, market=mkt, ) # 4b. Chainlink oracle monitor self._oracle = OracleMonitor() oracle_ok = await self._oracle.initialize() self._log.info("oracle_init", success=oracle_ok) # 5. Binance feed ws_url = self._cfg.binance.ws_url streams = "/".join( f"{s}@trade" for s in self._cfg.binance.symbols ) full_url = f"{ws_url}?streams={streams}" self._binance_feed = BinanceFeed(url=full_url) self._binance_feed.subscribe(self._on_binance_price) # 6. Polymarket feed self._poly_feed = PolymarketFeed(url=self._cfg.polymarket.ws_url) self._poly_feed.on_orderbook_update(self._on_orderbook_update) for mkt in initial_markets: self._poly_feed.subscribe_market(mkt.up_token_id) self._poly_feed.subscribe_market(mkt.down_token_id) self._log.info("paper_bot_starting_feeds") # 7. Run all tasks concurrently tasks = [ self._binance_feed.start(), self._poly_feed.start(), self._discovery.discover_loop(interval_sec=_DISCOVERY_INTERVAL_S), self._strategy_eval_loop(), self._status_loop(), self._summary_loop(), ] if self._oracle and self._oracle._initialized: tasks.append(self._oracle.poll_loop(interval=5.0, shutdown_event=self._shutdown_event)) tasks.append(self._oracle_snapshot_loop()) try: await asyncio.gather(*tasks) except asyncio.CancelledError: self._log.info("gather_cancelled") async def shutdown(self) -> None: """Gracefully stop all components.""" self._log.info("paper_bot_shutting_down") self._shutdown_event.set() if self._binance_feed is not None: await self._binance_feed.stop() if self._poly_feed is not None: await self._poly_feed.stop() # Close discovery session if self._discovery is not None: await self._discovery._close_session_if_owned() # Print final summary if self._engine is not None: self._engine.print_summary() # Close database connection if self._db is not None: self._db.close() self._log.info("paper_bot_stopped") # --------------------------------------------------------------------------- # Config override helper # --------------------------------------------------------------------------- def _force_paper_config(base: Config) -> Config: """Return a new Config with ``general.mode`` forced to ``'paper'``.""" return Config( general=GeneralConfig( mode="paper", assets=base.general.assets, timeframes=base.general.timeframes, log_level=base.general.log_level, starting_balance=base.general.starting_balance, ), temporal_arb=base.temporal_arb, sum_to_one=base.sum_to_one, spread_capture=base.spread_capture, risk=base.risk, fees=base.fees, binance=base.binance, polymarket=base.polymarket, notifications=base.notifications, ) # --------------------------------------------------------------------------- # Signal handling and entry point # --------------------------------------------------------------------------- def _install_signal_handlers( loop: asyncio.AbstractEventLoop, bot: PaperTradingBot ) -> None: """Register SIGINT / SIGTERM handlers that trigger graceful shutdown.""" for sig in (signal.SIGINT, signal.SIGTERM): try: loop.add_signal_handler( sig, lambda: asyncio.ensure_future(bot.shutdown()), ) except NotImplementedError: pass async def async_main() -> None: """Async entry point for the paper trading wrapper.""" base_config = load_config() config = _force_paper_config(base_config) setup_logging(config.general.log_level) print(_BANNER) print(f" Mode: {config.general.mode} (forced)") print(f" Assets: {', '.join(config.general.assets)}") print(f" Timeframes: {', '.join(config.general.timeframes)}") print(f" Log level: {config.general.log_level}") print(f" Balance: ${config.general.starting_balance:,.2f}") print(f" DB: paper_trades.db") print() bot = PaperTradingBot(config) loop = asyncio.get_running_loop() _install_signal_handlers(loop, bot) try: await bot.start() except KeyboardInterrupt: pass finally: await bot.shutdown() def main() -> None: """Synchronous entry point for ``python paper_trade.py``.""" try: asyncio.run(async_main()) except KeyboardInterrupt: print("\nPaper trading shutdown complete.") sys.exit(0) if __name__ == "__main__": main()