Files
polymarket-arb-bot/paper_trade.py

930 lines
32 KiB
Python
Raw Permalink Normal View History

2026-03-22 09:28:14 +09:00
"""Paper trading wrapper for the Polymarket Temporal Arbitrage Bot.
Runs the same event loop as ``src.main`` but forces paper-trading mode,
adds a simulated order execution layer that tracks virtual positions
and PnL, and prints periodic performance summaries.
Usage::
python paper_trade.py
"""
from __future__ import annotations
import asyncio
import signal
import sys
import time
import uuid
2026-03-28 11:38:53 +09:00
from collections import deque
2026-03-22 09:28:14 +09:00
from dataclasses import dataclass, field
from typing import Optional
import structlog
from src.config import (
BinanceConfig,
Config,
FeesConfig,
GeneralConfig,
NotificationConfig,
PolymarketConfig,
RiskConfig,
SpreadCaptureConfig,
SumToOneConfig,
TemporalArbConfig,
load_config,
)
from src.data import TradeDB
from src.data.models import (
ActiveMarket,
Asset,
Direction,
OrderBookSnapshot,
Signal,
Timeframe,
Trade,
TradeStatus,
WindowState,
)
from src.feeds import BinanceFeed, PolymarketFeed
from src.market import MarketDiscovery, WindowTracker
from src.market.oracle import OracleMonitor
from src.strategy import SignalAggregator
from src.utils import get_logger, setup_logging
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
_STATUS_INTERVAL_S: float = 30.0
_SUMMARY_INTERVAL_S: float = 300.0 # 5 minutes
_DISCOVERY_INTERVAL_S: float = 30.0
_BANNER = r"""
____ _____ _
| _ \ __ _ _ __ ___ _ __ |_ _| __ __ _ __| | ___
| |_) / _` | '_ \ / _ \ '__| | || '__/ _` |/ _` |/ _ \
| __/ (_| | |_) | __/ | | || | | (_| | (_| | __/
|_| \__,_| .__/ \___|_| |_||_| \__,_|\__,_|\___|
|_|
"""
# ---------------------------------------------------------------------------
# Virtual position tracking
# ---------------------------------------------------------------------------
@dataclass
class VirtualPosition:
"""A simulated open position."""
position_id: str
asset: Asset
timeframe: Timeframe
direction: Direction
token_id: str
entry_price: float
size: int
opened_at: float = field(default_factory=time.time)
@dataclass
class VirtualTradeRecord:
"""A completed (closed) virtual trade."""
trade_id: str
asset: Asset
timeframe: Timeframe
direction: Direction
token_id: str
entry_price: float
exit_price: float
size: int
fee: float
pnl: float
opened_at: float
closed_at: float = field(default_factory=time.time)
class PaperExecutionEngine:
"""Simulated order execution that tracks virtual positions and PnL.
Accepts trade signals, creates virtual positions, and resolves them
when the window expires. All trades are logged to SQLite via
:class:`TradeDB`.
"""
def __init__(self, db: TradeDB, fees: FeesConfig) -> None:
self._db = db
self._fees = fees
self._log = get_logger("paper_engine")
self._positions: dict[str, VirtualPosition] = {}
2026-03-28 11:38:53 +09:00
self._closed_trades: deque[VirtualTradeRecord] = deque(maxlen=10000)
2026-03-22 09:28:14 +09:00
self._total_pnl: float = 0.0
2026-03-28 11:38:53 +09:00
self._win_count: int = 0
self._loss_count: int = 0
2026-03-22 09:28:14 +09:00
# ------------------------------------------------------------------
# Public API
# ------------------------------------------------------------------
@property
def active_positions(self) -> list[VirtualPosition]:
return list(self._positions.values())
@property
def total_pnl(self) -> float:
return self._total_pnl
@property
def trade_count(self) -> int:
return len(self._closed_trades)
@property
def win_rate(self) -> float:
2026-03-28 11:38:53 +09:00
total = self._win_count + self._loss_count
if total == 0:
2026-03-22 09:28:14 +09:00
return 0.0
2026-03-28 11:38:53 +09:00
return self._win_count / total
2026-03-22 09:28:14 +09:00
def open_position(
self,
asset: Asset,
timeframe: Timeframe,
direction: Direction,
token_id: str,
entry_price: float,
size: int,
edge: float,
estimated_prob: float,
) -> str:
"""Simulate opening a position. Returns the position ID."""
position_id = uuid.uuid4().hex[:12]
pos = VirtualPosition(
position_id=position_id,
asset=asset,
timeframe=timeframe,
direction=direction,
token_id=token_id,
entry_price=entry_price,
size=size,
)
self._positions[position_id] = pos
# Log the pending trade to DB
sig = Signal(
direction=direction,
asset=asset,
timeframe=timeframe,
token_id=token_id,
price=entry_price,
size=size,
edge=edge,
estimated_prob=estimated_prob,
)
trade = Trade(
id=f"paper-{position_id}",
signal=sig,
status=TradeStatus.FILLED,
fill_price=entry_price,
fill_size=size,
fee=0.0,
pnl=0.0,
)
self._db.log_trade(trade)
self._log.info(
"virtual_position_opened",
position_id=position_id,
asset=asset.value,
timeframe=timeframe.value,
direction=direction.value,
entry_price=entry_price,
size=size,
)
return position_id
def close_position(self, position_id: str, exit_price: float) -> Optional[VirtualTradeRecord]:
"""Simulate closing a position at *exit_price*. Returns the trade record."""
pos = self._positions.pop(position_id, None)
if pos is None:
self._log.warning("close_unknown_position", position_id=position_id)
return None
# Calculate fee based on timeframe
fee_rate = self._fees.fee_for_timeframe(pos.timeframe.value)
notional = pos.size * pos.entry_price
fee = notional * fee_rate
# PnL: if direction is UP and price went up, the binary resolves to 1
# Simplified: pnl = size * (exit_price - entry_price) - fee
pnl = pos.size * (exit_price - pos.entry_price) - fee
record = VirtualTradeRecord(
trade_id=f"paper-{pos.position_id}",
asset=pos.asset,
timeframe=pos.timeframe,
direction=pos.direction,
token_id=pos.token_id,
entry_price=pos.entry_price,
exit_price=exit_price,
size=pos.size,
fee=fee,
pnl=pnl,
opened_at=pos.opened_at,
)
self._closed_trades.append(record)
self._total_pnl += pnl
2026-03-28 11:38:53 +09:00
if pnl > 0:
self._win_count += 1
else:
self._loss_count += 1
2026-03-22 09:28:14 +09:00
# Update the trade record in DB
self._db.update_trade(
record.trade_id,
fill_price=exit_price,
fee=fee,
pnl=pnl,
status=TradeStatus.FILLED.value,
)
self._log.info(
"virtual_position_closed",
position_id=pos.position_id,
asset=pos.asset.value,
direction=pos.direction.value,
entry=pos.entry_price,
exit=exit_price,
pnl=round(pnl, 4),
fee=round(fee, 4),
total_pnl=round(self._total_pnl, 4),
)
return record
def print_summary(self) -> None:
"""Print a performance summary to the log."""
self._log.info(
"paper_trade_summary",
total_pnl=round(self._total_pnl, 4),
trade_count=self.trade_count,
win_rate=round(self.win_rate * 100, 1),
active_positions=len(self._positions),
positions=[
{
"id": p.position_id,
"asset": p.asset.value,
"dir": p.direction.value,
"entry": p.entry_price,
"size": p.size,
}
for p in self._positions.values()
],
)
# ---------------------------------------------------------------------------
# Paper trading bot
# ---------------------------------------------------------------------------
class PaperTradingBot:
"""Orchestrates Phase 1 components with a paper execution layer."""
def __init__(self, config: Config) -> None:
self._cfg = config
self._log = get_logger("paper_bot")
self._starting_balance = config.general.starting_balance
self._balance = config.general.starting_balance
# Components
self._db: Optional[TradeDB] = None
self._engine: Optional[PaperExecutionEngine] = None
self._discovery: Optional[MarketDiscovery] = None
self._tracker: Optional[WindowTracker] = None
self._binance_feed: Optional[BinanceFeed] = None
self._poly_feed: Optional[PolymarketFeed] = None
self._signal_agg: Optional[SignalAggregator] = None
self._oracle: Optional[OracleMonitor] = None
2026-03-28 11:38:53 +09:00
# Discovered markets cache (keyed by condition_id to prevent duplicates)
self._active_markets: dict[str, ActiveMarket] = {}
2026-03-22 09:28:14 +09:00
# Live orderbook state (token_id → snapshot)
self._orderbooks: dict[str, OrderBookSnapshot] = {}
2026-03-28 11:38:53 +09:00
# Queue drop counter for monitoring
self._queue_drop_count: int = 0
2026-03-22 09:28:14 +09:00
# Pending strategy evaluations (initialized in start())
self._eval_queue: Optional[asyncio.Queue] = None
# Shutdown coordination
self._shutdown_event = asyncio.Event()
# ------------------------------------------------------------------
# Callbacks
# ------------------------------------------------------------------
def _on_binance_price(
self, symbol: str, price: float, timestamp: float, volume: float
) -> None:
"""Process each Binance trade tick — also triggers strategy evaluation."""
self._log.debug(
"binance_price",
symbol=symbol,
price=price,
volume=volume,
)
if self._tracker is not None:
self._tracker.update_price(symbol, price, timestamp)
# Trigger strategy evaluation for each timeframe
if self._signal_agg is None or self._tracker is None:
return
for tf_str in self._cfg.general.timeframes:
window = self._tracker.get_window(symbol, tf_str)
if window is None or window.start_price is None:
continue
# If no real Polymarket orderbooks, simulate them every tick
orderbooks = self._orderbooks
is_simulated = window.market is not None and window.market.condition_id.startswith("sim_")
if window.market is None or is_simulated:
orderbooks = self._simulate_orderbooks(symbol, price, window)
# Now window.market is guaranteed to be set
if window.market is not None and self._eval_queue is not None:
try:
self._eval_queue.put_nowait((symbol, price, window, orderbooks))
2026-03-28 11:38:53 +09:00
except asyncio.QueueFull:
self._queue_drop_count += 1
if self._queue_drop_count % 100 == 1:
self._log.warning("eval_queue_full", drops=self._queue_drop_count)
except Exception:
2026-03-22 09:28:14 +09:00
pass
def _on_orderbook_update(
self, token_id: str, snapshot: OrderBookSnapshot
) -> None:
"""Process each Polymarket orderbook update — store for strategy use."""
self._orderbooks[token_id] = snapshot
self._log.debug(
"poly_orderbook",
token_id=token_id[:12],
best_bid=snapshot.best_bid,
best_ask=snapshot.best_ask,
)
def _on_window_change(self, window: WindowState) -> None:
"""Fired when a price window transitions.
*window* is the COMPLETED window (with final current_price and
price_change_pct). Resolve open positions and snapshot to DB.
"""
self._log.info(
"window_completed",
asset=window.asset.value,
timeframe=window.timeframe.value,
start_price=window.start_price,
end_price=window.current_price,
change_pct=window.price_change_pct,
window_start=window.window_start_time,
window_end=window.window_end_time,
)
# Resolve positions from the completed window
self._on_window_change_resolve(window)
# Snapshot completed window to DB
if self._db is not None and window.start_price is not None:
try:
self._db.log_window(window)
except Exception:
self._log.exception("window_snapshot_failed")
def _on_signal(self, signal: Signal) -> None:
"""Handle a trading signal from the strategy engine — open a virtual position."""
if self._engine is None:
return
# Check balance
cost = signal.price * signal.size
if cost > self._balance * 0.5: # Don't risk more than 50% on one trade
self._log.warning("signal_rejected_balance", cost=round(cost, 2), balance=round(self._balance, 2))
return
position_id = self._engine.open_position(
asset=signal.asset,
timeframe=signal.timeframe,
direction=signal.direction,
token_id=signal.token_id,
entry_price=signal.price,
size=signal.size,
edge=signal.edge,
estimated_prob=signal.estimated_prob,
)
self._log.info(
"paper_trade_opened",
position_id=position_id,
asset=signal.asset.value,
direction=signal.direction.value,
price=signal.price,
size=signal.size,
edge=round(signal.edge, 4),
)
def _on_window_change_resolve(self, window: WindowState) -> None:
"""When a window changes, resolve any open positions from the PREVIOUS window.
If the price went UP and we held UP, we win ($1 payout).
If the price went DOWN and we held DOWN, we win ($1 payout).
Otherwise, we lose ($0 payout).
"""
if self._engine is None:
return
# Check if any active positions match this asset+timeframe
for pos in list(self._engine.active_positions):
if pos.asset == window.asset and pos.timeframe == window.timeframe:
# Determine outcome from the COMPLETED window's price change
if window.price_change_pct is not None:
actual_up = window.price_change_pct > 0
position_won = (
(pos.direction == Direction.UP and actual_up) or
(pos.direction == Direction.DOWN and not actual_up)
)
exit_price = 1.0 if position_won else 0.0
else:
# No price data — treat as loss
exit_price = 0.0
record = self._engine.close_position(pos.position_id, exit_price)
if record:
# Update balance
self._balance = self._starting_balance + self._engine.total_pnl
self._log.info(
"paper_trade_resolved",
asset=pos.asset.value,
direction=pos.direction.value,
pnl=round(record.pnl, 4),
balance=round(self._balance, 2),
won=record.pnl > 0,
)
def _simulate_orderbooks(
self, symbol: str, cex_price: float, window: WindowState
) -> dict[str, OrderBookSnapshot]:
"""Generate simulated Polymarket orderbooks when no real market exists.
Simulates the oracle-lag effect: Polymarket odds lag behind the CEX
price. A bigger price move = higher true prob, but the simulated
Polymarket price adjusts more slowly.
"""
import random
from src.data.models import OrderBookLevel
if window.start_price is None or window.start_price <= 0:
return {}
change_pct = (cex_price - window.start_price) / window.start_price * 100
# Simulate Polymarket's lagging price
# Real prob might be 85%, but Polymarket shows ~55-60% (the arb opportunity)
lag_factor = 0.35 # Polymarket adjusts at ~35% of true movement speed
noise = random.uniform(-0.02, 0.02)
if abs(change_pct) < 0.05:
# Flat — both sides near 50%
up_ask = 0.50 + noise
down_ask = 0.50 - noise
else:
# Up direction confirmed on CEX
# True prob: ~70-90%, but Polymarket shows ~52-65%
poly_adjustment = abs(change_pct) * lag_factor
if change_pct > 0:
up_ask = min(0.65, 0.50 + poly_adjustment + noise)
down_ask = max(0.35, 1.0 - up_ask - 0.02)
else:
down_ask = min(0.65, 0.50 + poly_adjustment + noise)
up_ask = max(0.35, 1.0 - down_ask - 0.02)
up_ask = round(max(0.01, min(0.99, up_ask)), 2)
down_ask = round(max(0.01, min(0.99, down_ask)), 2)
# Create synthetic token IDs
up_token = f"sim_up_{symbol}_{window.timeframe.value}"
down_token = f"sim_down_{symbol}_{window.timeframe.value}"
# Ensure window has a simulated market for signal generation
if window.market is None:
sim_market = ActiveMarket(
condition_id=f"sim_{symbol}_{window.timeframe.value}",
up_token_id=up_token,
down_token_id=down_token,
asset=window.asset,
timeframe=window.timeframe,
end_date="",
question=f"Simulated {symbol} {window.timeframe.value}",
)
window.market = sim_market
return {
up_token: OrderBookSnapshot(
token_id=up_token,
asks=[OrderBookLevel(price=up_ask, size=10000)],
bids=[OrderBookLevel(price=up_ask - 0.02, size=10000)],
),
down_token: OrderBookSnapshot(
token_id=down_token,
asks=[OrderBookLevel(price=down_ask, size=10000)],
bids=[OrderBookLevel(price=down_ask - 0.02, size=10000)],
),
}
def _on_new_markets(self, markets: list[ActiveMarket]) -> None:
"""Callback when MarketDiscovery finds new markets."""
2026-03-28 11:38:53 +09:00
for mkt in markets:
self._active_markets[mkt.condition_id] = mkt
2026-03-22 09:28:14 +09:00
if self._tracker is not None:
for mkt in markets:
self._tracker.link_market(
asset=mkt.asset.value,
timeframe=mkt.timeframe.value,
market=mkt,
)
if self._poly_feed is not None:
for mkt in markets:
self._poly_feed.subscribe_market(mkt.up_token_id)
self._poly_feed.subscribe_market(mkt.down_token_id)
# ------------------------------------------------------------------
# Status and summary loops
# ------------------------------------------------------------------
async def _strategy_eval_loop(self) -> None:
"""Consume price ticks from the queue and run strategy evaluations."""
eval_count = 0
while not self._shutdown_event.is_set():
try:
symbol, price, window, orderbooks = await asyncio.wait_for(
self._eval_queue.get(), timeout=1.0
)
except asyncio.TimeoutError:
continue
except Exception:
continue
eval_count += 1
if eval_count % 100 == 1:
self._log.info(
"strategy_eval_tick",
eval_count=eval_count,
symbol=symbol,
price=price,
change_pct=round(window.price_change_pct or 0, 4),
has_market=window.market is not None,
orderbook_tokens=list(orderbooks.keys())[:2],
)
if self._signal_agg is not None:
try:
await self._signal_agg.on_price_tick(
symbol=symbol,
cex_price=price,
window=window,
orderbooks=orderbooks,
)
except Exception:
self._log.exception("strategy_eval_error")
async def _status_loop(self) -> None:
"""Log a status summary every ``_STATUS_INTERVAL_S`` seconds."""
while not self._shutdown_event.is_set():
try:
await asyncio.wait_for(
self._shutdown_event.wait(),
timeout=_STATUS_INTERVAL_S,
)
break
except asyncio.TimeoutError:
pass
windows = (
self._tracker.get_all_active_windows()
if self._tracker
else []
)
latest_prices: dict[str, Optional[float]] = {}
if self._binance_feed is not None:
for sym in self._cfg.general.assets:
latest_prices[sym] = self._binance_feed.get_latest_price(sym)
# Update balance from PnL
pnl = self._engine.total_pnl if self._engine else 0.0
self._balance = self._starting_balance + pnl
# Record balance snapshot to DB
if self._db is not None:
self._db.log_balance(self._balance, pnl, event="periodic")
self._log.info(
"status",
mode="paper",
balance=round(self._balance, 2),
pnl=round(pnl, 2),
binance_connected=(
self._binance_feed.is_connected
if self._binance_feed
else False
),
polymarket_connected=(
self._poly_feed.is_connected
if self._poly_feed
else False
),
active_windows=len(windows),
active_markets=len(self._active_markets),
latest_prices=latest_prices,
)
async def _oracle_snapshot_loop(self) -> None:
"""Log oracle vs CEX deviation to DB every 30 seconds."""
while not self._shutdown_event.is_set():
try:
await asyncio.wait_for(
self._shutdown_event.wait(), timeout=30.0,
)
break
except asyncio.TimeoutError:
pass
if self._oracle is None or self._binance_feed is None or self._db is None:
continue
for asset in self._cfg.general.assets:
cex_price = self._binance_feed.get_latest_price(asset)
oracle_price = self._oracle._last_oracle_prices.get(asset)
if cex_price is None or oracle_price is None:
continue
deviation = self._oracle.get_oracle_vs_cex_deviation(asset, cex_price)
lag = self._oracle.get_estimated_lag(asset)
round_id = self._oracle._last_oracle_round_ids.get(asset, 0)
if deviation is not None and lag is not None:
self._db.log_oracle(
asset=asset,
oracle_price=oracle_price,
cex_price=cex_price,
deviation_pct=deviation,
oracle_lag_sec=lag,
oracle_round_id=round_id,
)
self._log.info(
"oracle_snapshot",
stats=self._oracle.get_stats(),
)
async def _summary_loop(self) -> None:
"""Print paper trading summary every ``_SUMMARY_INTERVAL_S`` seconds."""
while not self._shutdown_event.is_set():
try:
await asyncio.wait_for(
self._shutdown_event.wait(),
timeout=_SUMMARY_INTERVAL_S,
)
break
except asyncio.TimeoutError:
pass
if self._engine is not None:
self._engine.print_summary()
if self._db is not None:
self._log.info(
"db_summary",
today_trades=self._db.get_today_trade_count(),
today_pnl=round(self._db.get_today_pnl(), 4),
total_pnl=round(self._db.get_total_pnl(), 4),
)
2026-03-28 11:38:53 +09:00
# Periodic DB maintenance (prune old data, WAL checkpoint)
try:
self._db.periodic_maintenance(retention_days=7)
except Exception:
self._log.exception("db_maintenance_error")
2026-03-22 09:28:14 +09:00
# ------------------------------------------------------------------
# Lifecycle
# ------------------------------------------------------------------
async def start(self) -> None:
"""Initialise all components, then run feeds concurrently."""
self._log.info("paper_bot_initialising")
# 0. Initialize eval queue (must be in async context)
self._eval_queue = asyncio.Queue(maxsize=1000)
# 1. Database (paper-specific path)
self._db = TradeDB(db_path="paper_trades.db")
# 2. Paper execution engine
self._engine = PaperExecutionEngine(
db=self._db, fees=self._cfg.fees,
)
# Record starting balance
self._db.log_balance(self._balance, 0.0, event="start")
self._log.info("starting_balance", balance=self._balance)
# 2b. Signal aggregator (strategy engine)
self._signal_agg = SignalAggregator(
config=self._cfg, balance=self._balance,
)
self._signal_agg.on_signal(self._on_signal)
self._log.info("strategy_engine_ready",
temporal_arb=self._cfg.temporal_arb.enabled,
sum_to_one=self._cfg.sum_to_one.enabled)
# 3. Market discovery
self._discovery = MarketDiscovery(
on_new_markets=self._on_new_markets,
)
self._log.info("running_initial_discovery")
initial_markets = await self._discovery.discover()
2026-03-28 11:38:53 +09:00
self._active_markets = {m.condition_id: m for m in initial_markets}
2026-03-22 09:28:14 +09:00
self._log.info(
"initial_discovery_complete",
count=len(initial_markets),
)
# 4. Window tracker
assets = [Asset(a) for a in self._cfg.general.assets]
timeframes = [Timeframe(t) for t in self._cfg.general.timeframes]
self._tracker = WindowTracker(assets=assets, timeframes=timeframes)
self._tracker.on_window_change(self._on_window_change)
# Link initial markets to windows
for mkt in initial_markets:
self._tracker.link_market(
asset=mkt.asset.value,
timeframe=mkt.timeframe.value,
market=mkt,
)
# 4b. Chainlink oracle monitor
self._oracle = OracleMonitor()
oracle_ok = await self._oracle.initialize()
self._log.info("oracle_init", success=oracle_ok)
# 5. Binance feed
ws_url = self._cfg.binance.ws_url
streams = "/".join(
f"{s}@trade" for s in self._cfg.binance.symbols
)
full_url = f"{ws_url}?streams={streams}"
self._binance_feed = BinanceFeed(url=full_url)
self._binance_feed.subscribe(self._on_binance_price)
# 6. Polymarket feed
self._poly_feed = PolymarketFeed(url=self._cfg.polymarket.ws_url)
self._poly_feed.on_orderbook_update(self._on_orderbook_update)
for mkt in initial_markets:
self._poly_feed.subscribe_market(mkt.up_token_id)
self._poly_feed.subscribe_market(mkt.down_token_id)
self._log.info("paper_bot_starting_feeds")
# 7. Run all tasks concurrently
tasks = [
self._binance_feed.start(),
self._poly_feed.start(),
self._discovery.discover_loop(interval_sec=_DISCOVERY_INTERVAL_S),
self._strategy_eval_loop(),
self._status_loop(),
self._summary_loop(),
]
if self._oracle and self._oracle._initialized:
2026-03-28 11:38:53 +09:00
tasks.append(self._oracle.poll_loop(interval=5.0, shutdown_event=self._shutdown_event))
2026-03-22 09:28:14 +09:00
tasks.append(self._oracle_snapshot_loop())
try:
await asyncio.gather(*tasks)
except asyncio.CancelledError:
self._log.info("gather_cancelled")
async def shutdown(self) -> None:
"""Gracefully stop all components."""
self._log.info("paper_bot_shutting_down")
self._shutdown_event.set()
if self._binance_feed is not None:
await self._binance_feed.stop()
if self._poly_feed is not None:
await self._poly_feed.stop()
2026-03-28 11:38:53 +09:00
# Close discovery session
if self._discovery is not None:
await self._discovery._close_session_if_owned()
2026-03-22 09:28:14 +09:00
# Print final summary
if self._engine is not None:
self._engine.print_summary()
2026-03-28 11:38:53 +09:00
# Close database connection
if self._db is not None:
self._db.close()
2026-03-22 09:28:14 +09:00
self._log.info("paper_bot_stopped")
# ---------------------------------------------------------------------------
# Config override helper
# ---------------------------------------------------------------------------
def _force_paper_config(base: Config) -> Config:
"""Return a new Config with ``general.mode`` forced to ``'paper'``."""
return Config(
general=GeneralConfig(
mode="paper",
assets=base.general.assets,
timeframes=base.general.timeframes,
log_level=base.general.log_level,
starting_balance=base.general.starting_balance,
),
temporal_arb=base.temporal_arb,
sum_to_one=base.sum_to_one,
spread_capture=base.spread_capture,
risk=base.risk,
fees=base.fees,
binance=base.binance,
polymarket=base.polymarket,
notifications=base.notifications,
)
# ---------------------------------------------------------------------------
# Signal handling and entry point
# ---------------------------------------------------------------------------
def _install_signal_handlers(
loop: asyncio.AbstractEventLoop, bot: PaperTradingBot
) -> None:
"""Register SIGINT / SIGTERM handlers that trigger graceful shutdown."""
for sig in (signal.SIGINT, signal.SIGTERM):
try:
loop.add_signal_handler(
sig,
lambda: asyncio.ensure_future(bot.shutdown()),
)
except NotImplementedError:
pass
async def async_main() -> None:
"""Async entry point for the paper trading wrapper."""
base_config = load_config()
config = _force_paper_config(base_config)
setup_logging(config.general.log_level)
print(_BANNER)
print(f" Mode: {config.general.mode} (forced)")
print(f" Assets: {', '.join(config.general.assets)}")
print(f" Timeframes: {', '.join(config.general.timeframes)}")
print(f" Log level: {config.general.log_level}")
print(f" Balance: ${config.general.starting_balance:,.2f}")
print(f" DB: paper_trades.db")
print()
bot = PaperTradingBot(config)
loop = asyncio.get_running_loop()
_install_signal_handlers(loop, bot)
try:
await bot.start()
except KeyboardInterrupt:
pass
finally:
await bot.shutdown()
def main() -> None:
"""Synchronous entry point for ``python paper_trade.py``."""
try:
asyncio.run(async_main())
except KeyboardInterrupt:
print("\nPaper trading shutdown complete.")
sys.exit(0)
if __name__ == "__main__":
main()