Files
polymarket-arb-bot/paper_trade.py
2026-03-28 11:38:53 +09:00

930 lines
32 KiB
Python

"""Paper trading wrapper for the Polymarket Temporal Arbitrage Bot.
Runs the same event loop as ``src.main`` but forces paper-trading mode,
adds a simulated order execution layer that tracks virtual positions
and PnL, and prints periodic performance summaries.
Usage::
python paper_trade.py
"""
from __future__ import annotations
import asyncio
import signal
import sys
import time
import uuid
from collections import deque
from dataclasses import dataclass, field
from typing import Optional
import structlog
from src.config import (
BinanceConfig,
Config,
FeesConfig,
GeneralConfig,
NotificationConfig,
PolymarketConfig,
RiskConfig,
SpreadCaptureConfig,
SumToOneConfig,
TemporalArbConfig,
load_config,
)
from src.data import TradeDB
from src.data.models import (
ActiveMarket,
Asset,
Direction,
OrderBookSnapshot,
Signal,
Timeframe,
Trade,
TradeStatus,
WindowState,
)
from src.feeds import BinanceFeed, PolymarketFeed
from src.market import MarketDiscovery, WindowTracker
from src.market.oracle import OracleMonitor
from src.strategy import SignalAggregator
from src.utils import get_logger, setup_logging
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
# Timeout (seconds) used by PaperTradingBot._status_loop between "status" log
# entries and periodic balance snapshots.
_STATUS_INTERVAL_S: float = 30.0
# Timeout (seconds) used by PaperTradingBot._summary_loop between performance
# summaries and DB maintenance passes.
_SUMMARY_INTERVAL_S: float = 300.0 # 5 minutes
# Passed as ``interval_sec`` to MarketDiscovery.discover_loop in start().
_DISCOVERY_INTERVAL_S: float = 30.0
# ASCII-art banner printed once by async_main() at startup.
# NOTE(review): the art depends on exact leading whitespace inside the raw
# string; confirm it still renders as intended.
_BANNER = r"""
____ _____ _
| _ \ __ _ _ __ ___ _ __ |_ _| __ __ _ __| | ___
| |_) / _` | '_ \ / _ \ '__| | || '__/ _` |/ _` |/ _ \
| __/ (_| | |_) | __/ | | || | | (_| | (_| | __/
|_| \__,_| .__/ \___|_| |_||_| \__,_|\__,_|\___|
|_|
"""
# ---------------------------------------------------------------------------
# Virtual position tracking
# ---------------------------------------------------------------------------
@dataclass
class VirtualPosition:
    """In-memory record of a simulated position that is still open.

    Instances are created by ``PaperExecutionEngine.open_position`` and
    removed again by ``PaperExecutionEngine.close_position``.
    """

    # Short identifier generated by the engine when the position is opened.
    position_id: str
    # Underlying asset / window timeframe / side of the bet.
    asset: Asset
    timeframe: Timeframe
    direction: Direction
    # Outcome token the position was opened on.
    token_id: str
    # Price paid per unit at entry.
    entry_price: float
    # Quantity held.
    size: int
    # Creation time as a ``time.time()`` epoch float; filled in automatically.
    opened_at: float = field(default_factory=time.time)
@dataclass
class VirtualTradeRecord:
    """Immutable-by-convention summary of a virtual trade after it was closed.

    Built by ``PaperExecutionEngine.close_position`` from the position being
    closed plus the resolved exit price.
    """

    # Identifier of the trade (the engine uses ``"paper-<position_id>"``).
    trade_id: str
    # Copied from the originating position.
    asset: Asset
    timeframe: Timeframe
    direction: Direction
    token_id: str
    # Per-unit prices at entry and at resolution.
    entry_price: float
    exit_price: float
    # Quantity that was held.
    size: int
    # Fee charged for the trade and the resulting profit/loss (fee included).
    fee: float
    pnl: float
    # Epoch timestamps; ``closed_at`` defaults to the moment of construction.
    opened_at: float
    closed_at: float = field(default_factory=time.time)
class PaperExecutionEngine:
    """Simulated order execution that tracks virtual positions and PnL.

    Accepts trade signals, creates virtual positions, and resolves them
    when the window expires. All trades are logged to SQLite via
    :class:`TradeDB`.
    """

    def __init__(self, db: TradeDB, fees: FeesConfig) -> None:
        """Create an engine that persists to *db* and prices fees via *fees*."""
        self._db = db
        self._fees = fees
        self._log = get_logger("paper_engine")
        # Open positions keyed by position ID.
        self._positions: dict[str, VirtualPosition] = {}
        # Bounded history: once 10000 records are stored the oldest are
        # evicted, so this must not be used for lifetime statistics.
        self._closed_trades: deque[VirtualTradeRecord] = deque(maxlen=10000)
        self._total_pnl: float = 0.0
        self._win_count: int = 0
        self._loss_count: int = 0

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    @property
    def active_positions(self) -> list[VirtualPosition]:
        """Return a snapshot list of the currently open positions."""
        return list(self._positions.values())

    @property
    def total_pnl(self) -> float:
        """Return the cumulative PnL over all closed trades."""
        return self._total_pnl

    @property
    def trade_count(self) -> int:
        """Return the number of trades closed so far.

        Derived from the win/loss counters rather than the bounded history
        deque, so the count keeps growing past the deque's ``maxlen`` and
        stays consistent with :attr:`win_rate`.
        """
        return self._win_count + self._loss_count

    @property
    def win_rate(self) -> float:
        """Return wins / closed trades as a fraction, or 0.0 with no trades."""
        total = self._win_count + self._loss_count
        if total == 0:
            return 0.0
        return self._win_count / total

    def open_position(
        self,
        asset: Asset,
        timeframe: Timeframe,
        direction: Direction,
        token_id: str,
        entry_price: float,
        size: int,
        edge: float,
        estimated_prob: float,
    ) -> str:
        """Simulate opening a position. Returns the position ID.

        The position is stored in memory and a ``Trade`` with status
        ``FILLED`` (zero fee and PnL) is written to the DB under the ID
        ``"paper-<position_id>"``.
        """
        position_id = uuid.uuid4().hex[:12]
        pos = VirtualPosition(
            position_id=position_id,
            asset=asset,
            timeframe=timeframe,
            direction=direction,
            token_id=token_id,
            entry_price=entry_price,
            size=size,
        )
        self._positions[position_id] = pos

        # Log the pending trade to DB
        sig = Signal(
            direction=direction,
            asset=asset,
            timeframe=timeframe,
            token_id=token_id,
            price=entry_price,
            size=size,
            edge=edge,
            estimated_prob=estimated_prob,
        )
        trade = Trade(
            id=f"paper-{position_id}",
            signal=sig,
            status=TradeStatus.FILLED,
            fill_price=entry_price,
            fill_size=size,
            fee=0.0,
            pnl=0.0,
        )
        self._db.log_trade(trade)
        self._log.info(
            "virtual_position_opened",
            position_id=position_id,
            asset=asset.value,
            timeframe=timeframe.value,
            direction=direction.value,
            entry_price=entry_price,
            size=size,
        )
        return position_id

    def close_position(
        self, position_id: str, exit_price: float
    ) -> Optional[VirtualTradeRecord]:
        """Simulate closing a position at *exit_price*.

        Returns the resulting trade record, or ``None`` (after logging a
        warning) when *position_id* is not an open position.
        """
        pos = self._positions.pop(position_id, None)
        if pos is None:
            self._log.warning("close_unknown_position", position_id=position_id)
            return None

        # Calculate fee based on timeframe; charged on the entry notional.
        fee_rate = self._fees.fee_for_timeframe(pos.timeframe.value)
        notional = pos.size * pos.entry_price
        fee = notional * fee_rate

        # PnL: if direction is UP and price went up, the binary resolves to 1
        # Simplified: pnl = size * (exit_price - entry_price) - fee
        pnl = pos.size * (exit_price - pos.entry_price) - fee

        record = VirtualTradeRecord(
            trade_id=f"paper-{pos.position_id}",
            asset=pos.asset,
            timeframe=pos.timeframe,
            direction=pos.direction,
            token_id=pos.token_id,
            entry_price=pos.entry_price,
            exit_price=exit_price,
            size=pos.size,
            fee=fee,
            pnl=pnl,
            opened_at=pos.opened_at,
        )
        self._closed_trades.append(record)
        self._total_pnl += pnl
        # A break-even trade (pnl == 0) is counted as a loss.
        if pnl > 0:
            self._win_count += 1
        else:
            self._loss_count += 1

        # Update the trade record in DB
        self._db.update_trade(
            record.trade_id,
            fill_price=exit_price,
            fee=fee,
            pnl=pnl,
            status=TradeStatus.FILLED.value,
        )
        self._log.info(
            "virtual_position_closed",
            position_id=pos.position_id,
            asset=pos.asset.value,
            direction=pos.direction.value,
            entry=pos.entry_price,
            exit=exit_price,
            pnl=round(pnl, 4),
            fee=round(fee, 4),
            total_pnl=round(self._total_pnl, 4),
        )
        return record

    def print_summary(self) -> None:
        """Print a performance summary to the log."""
        self._log.info(
            "paper_trade_summary",
            total_pnl=round(self._total_pnl, 4),
            trade_count=self.trade_count,
            win_rate=round(self.win_rate * 100, 1),
            active_positions=len(self._positions),
            positions=[
                {
                    "id": p.position_id,
                    "asset": p.asset.value,
                    "dir": p.direction.value,
                    "entry": p.entry_price,
                    "size": p.size,
                }
                for p in self._positions.values()
            ],
        )
# ---------------------------------------------------------------------------
# Paper trading bot
# ---------------------------------------------------------------------------
class PaperTradingBot:
    """Orchestrates Phase 1 components with a paper execution layer.

    Wires the Binance and Polymarket feeds, market discovery, the window
    tracker, the oracle monitor and the signal aggregator together, and
    routes resulting signals into a :class:`PaperExecutionEngine` instead
    of a real exchange.
    """

    def __init__(self, config: Config) -> None:
        """Store *config* and prepare empty state; components start in ``start()``."""
        self._cfg = config
        self._log = get_logger("paper_bot")
        self._starting_balance = config.general.starting_balance
        self._balance = config.general.starting_balance
        # Components
        self._db: Optional[TradeDB] = None
        self._engine: Optional[PaperExecutionEngine] = None
        self._discovery: Optional[MarketDiscovery] = None
        self._tracker: Optional[WindowTracker] = None
        self._binance_feed: Optional[BinanceFeed] = None
        self._poly_feed: Optional[PolymarketFeed] = None
        self._signal_agg: Optional[SignalAggregator] = None
        self._oracle: Optional[OracleMonitor] = None
        # Discovered markets cache (keyed by condition_id to prevent duplicates)
        self._active_markets: dict[str, ActiveMarket] = {}
        # Live orderbook state (token_id → snapshot)
        self._orderbooks: dict[str, OrderBookSnapshot] = {}
        # Queue drop counter for monitoring
        self._queue_drop_count: int = 0
        # Pending strategy evaluations (initialized in start())
        self._eval_queue: Optional[asyncio.Queue] = None
        # Shutdown coordination
        self._shutdown_event = asyncio.Event()
        # shutdown() can be requested both by the signal handler and by the
        # entry point's ``finally``; the lock serialises those calls and the
        # flag makes every call after the first successful one a no-op.
        self._shutdown_lock = asyncio.Lock()
        self._stopped: bool = False

    # ------------------------------------------------------------------
    # Callbacks
    # ------------------------------------------------------------------
    def _on_binance_price(
        self, symbol: str, price: float, timestamp: float, volume: float
    ) -> None:
        """Process each Binance trade tick — also triggers strategy evaluation.

        Updates the window tracker, then for every configured timeframe
        enqueues ``(symbol, price, window, orderbooks)`` for the strategy
        loop. Simulated orderbooks are used when the window has no market
        or only a simulated (``sim_``-prefixed) one.
        """
        self._log.debug(
            "binance_price",
            symbol=symbol,
            price=price,
            volume=volume,
        )
        if self._tracker is not None:
            self._tracker.update_price(symbol, price, timestamp)

        # Trigger strategy evaluation for each timeframe
        if self._signal_agg is None or self._tracker is None:
            return

        for tf_str in self._cfg.general.timeframes:
            window = self._tracker.get_window(symbol, tf_str)
            if window is None or window.start_price is None:
                continue

            # If no real Polymarket orderbooks, simulate them every tick
            orderbooks = self._orderbooks
            is_simulated = (
                window.market is not None
                and window.market.condition_id.startswith("sim_")
            )
            if window.market is None or is_simulated:
                orderbooks = self._simulate_orderbooks(symbol, price, window)

            # The simulation normally attaches a market, but it bails out
            # without doing so for a non-positive start price, hence the check.
            if window.market is not None and self._eval_queue is not None:
                try:
                    self._eval_queue.put_nowait((symbol, price, window, orderbooks))
                except asyncio.QueueFull:
                    self._queue_drop_count += 1
                    # Rate-limit the warning to one per 100 drops.
                    if self._queue_drop_count % 100 == 1:
                        self._log.warning(
                            "eval_queue_full", drops=self._queue_drop_count
                        )
                except Exception:
                    # Never let a tick callback crash the feed, but do not
                    # hide the failure either.
                    self._log.exception("eval_enqueue_failed")

    def _on_orderbook_update(
        self, token_id: str, snapshot: OrderBookSnapshot
    ) -> None:
        """Process each Polymarket orderbook update — store for strategy use."""
        self._orderbooks[token_id] = snapshot
        self._log.debug(
            "poly_orderbook",
            token_id=token_id[:12],
            best_bid=snapshot.best_bid,
            best_ask=snapshot.best_ask,
        )

    def _on_window_change(self, window: WindowState) -> None:
        """Fired when a price window transitions.

        *window* is the COMPLETED window (with final current_price and
        price_change_pct). Resolve open positions and snapshot to DB.
        """
        self._log.info(
            "window_completed",
            asset=window.asset.value,
            timeframe=window.timeframe.value,
            start_price=window.start_price,
            end_price=window.current_price,
            change_pct=window.price_change_pct,
            window_start=window.window_start_time,
            window_end=window.window_end_time,
        )
        # Resolve positions from the completed window
        self._on_window_change_resolve(window)

        # Snapshot completed window to DB
        if self._db is not None and window.start_price is not None:
            try:
                self._db.log_window(window)
            except Exception:
                self._log.exception("window_snapshot_failed")

    def _on_signal(self, signal: Signal) -> None:
        """Handle a trading signal from the strategy engine — open a virtual position.

        Signals whose cost (``price * size``) exceeds half of the current
        balance are rejected with a warning.
        """
        if self._engine is None:
            return

        # Check balance
        cost = signal.price * signal.size
        if cost > self._balance * 0.5:  # Don't risk more than 50% on one trade
            self._log.warning(
                "signal_rejected_balance",
                cost=round(cost, 2),
                balance=round(self._balance, 2),
            )
            return

        position_id = self._engine.open_position(
            asset=signal.asset,
            timeframe=signal.timeframe,
            direction=signal.direction,
            token_id=signal.token_id,
            entry_price=signal.price,
            size=signal.size,
            edge=signal.edge,
            estimated_prob=signal.estimated_prob,
        )
        self._log.info(
            "paper_trade_opened",
            position_id=position_id,
            asset=signal.asset.value,
            direction=signal.direction.value,
            price=signal.price,
            size=signal.size,
            edge=round(signal.edge, 4),
        )

    def _on_window_change_resolve(self, window: WindowState) -> None:
        """When a window changes, resolve any open positions from the PREVIOUS window.

        If the price went UP and we held UP, we win ($1 payout).
        If the price went DOWN and we held DOWN, we win ($1 payout).
        Otherwise, we lose ($0 payout).

        A change of exactly zero is treated as DOWN, and a window without a
        ``price_change_pct`` resolves every matching position as a loss.
        """
        if self._engine is None:
            return

        # Check if any active positions match this asset+timeframe
        for pos in list(self._engine.active_positions):
            if pos.asset == window.asset and pos.timeframe == window.timeframe:
                # Determine outcome from the COMPLETED window's price change
                if window.price_change_pct is not None:
                    actual_up = window.price_change_pct > 0
                    position_won = (
                        (pos.direction == Direction.UP and actual_up)
                        or (pos.direction == Direction.DOWN and not actual_up)
                    )
                    exit_price = 1.0 if position_won else 0.0
                else:
                    # No price data — treat as loss
                    exit_price = 0.0

                record = self._engine.close_position(pos.position_id, exit_price)
                if record:
                    # Update balance
                    self._balance = self._starting_balance + self._engine.total_pnl
                    self._log.info(
                        "paper_trade_resolved",
                        asset=pos.asset.value,
                        direction=pos.direction.value,
                        pnl=round(record.pnl, 4),
                        balance=round(self._balance, 2),
                        won=record.pnl > 0,
                    )

    def _simulate_orderbooks(
        self, symbol: str, cex_price: float, window: WindowState
    ) -> dict[str, OrderBookSnapshot]:
        """Generate simulated Polymarket orderbooks when no real market exists.

        Simulates the oracle-lag effect: Polymarket odds lag behind the CEX
        price. A bigger price move = higher true prob, but the simulated
        Polymarket price adjusts more slowly.

        Side effect: attaches a synthetic ``ActiveMarket`` to *window* when it
        has none. Returns an empty dict when the window's start price is
        missing or non-positive.
        """
        import random

        from src.data.models import OrderBookLevel

        if window.start_price is None or window.start_price <= 0:
            return {}

        change_pct = (cex_price - window.start_price) / window.start_price * 100

        # Simulate Polymarket's lagging price
        # Real prob might be 85%, but Polymarket shows ~55-60% (the arb opportunity)
        lag_factor = 0.35  # Polymarket adjusts at ~35% of true movement speed
        noise = random.uniform(-0.02, 0.02)

        if abs(change_pct) < 0.05:
            # Flat — both sides near 50%
            up_ask = 0.50 + noise
            down_ask = 0.50 - noise
        else:
            # Direction confirmed on CEX: the favoured side is capped at 0.65
            # and the other side is floored at 0.35.
            poly_adjustment = abs(change_pct) * lag_factor
            if change_pct > 0:
                up_ask = min(0.65, 0.50 + poly_adjustment + noise)
                down_ask = max(0.35, 1.0 - up_ask - 0.02)
            else:
                down_ask = min(0.65, 0.50 + poly_adjustment + noise)
                up_ask = max(0.35, 1.0 - down_ask - 0.02)

        up_ask = round(max(0.01, min(0.99, up_ask)), 2)
        down_ask = round(max(0.01, min(0.99, down_ask)), 2)
        # Round the bids as well so they stay on the same two-decimal grid as
        # the asks instead of carrying float noise (e.g. 0.48000000000000004).
        up_bid = round(up_ask - 0.02, 2)
        down_bid = round(down_ask - 0.02, 2)

        # Create synthetic token IDs
        up_token = f"sim_up_{symbol}_{window.timeframe.value}"
        down_token = f"sim_down_{symbol}_{window.timeframe.value}"

        # Ensure window has a simulated market for signal generation
        if window.market is None:
            sim_market = ActiveMarket(
                condition_id=f"sim_{symbol}_{window.timeframe.value}",
                up_token_id=up_token,
                down_token_id=down_token,
                asset=window.asset,
                timeframe=window.timeframe,
                end_date="",
                question=f"Simulated {symbol} {window.timeframe.value}",
            )
            window.market = sim_market

        return {
            up_token: OrderBookSnapshot(
                token_id=up_token,
                asks=[OrderBookLevel(price=up_ask, size=10000)],
                bids=[OrderBookLevel(price=up_bid, size=10000)],
            ),
            down_token: OrderBookSnapshot(
                token_id=down_token,
                asks=[OrderBookLevel(price=down_ask, size=10000)],
                bids=[OrderBookLevel(price=down_bid, size=10000)],
            ),
        }

    def _on_new_markets(self, markets: list[ActiveMarket]) -> None:
        """Callback when MarketDiscovery finds new markets.

        Caches the markets, links them to their tracker windows and
        subscribes the Polymarket feed to both outcome tokens.
        """
        for mkt in markets:
            self._active_markets[mkt.condition_id] = mkt

        if self._tracker is not None:
            for mkt in markets:
                self._tracker.link_market(
                    asset=mkt.asset.value,
                    timeframe=mkt.timeframe.value,
                    market=mkt,
                )

        if self._poly_feed is not None:
            for mkt in markets:
                self._poly_feed.subscribe_market(mkt.up_token_id)
                self._poly_feed.subscribe_market(mkt.down_token_id)

    # ------------------------------------------------------------------
    # Status and summary loops
    # ------------------------------------------------------------------
    async def _strategy_eval_loop(self) -> None:
        """Consume price ticks from the queue and run strategy evaluations.

        Runs until the shutdown event is set. Returns immediately (with an
        error log) if the queue was never created, because polling a missing
        queue would otherwise spin without ever yielding to the event loop.
        """
        queue = self._eval_queue
        if queue is None:
            self._log.error("eval_queue_not_initialised")
            return

        eval_count = 0
        while not self._shutdown_event.is_set():
            try:
                # The 1s timeout lets the loop re-check the shutdown event.
                symbol, price, window, orderbooks = await asyncio.wait_for(
                    queue.get(), timeout=1.0
                )
            except asyncio.TimeoutError:
                continue
            except Exception:
                # e.g. a malformed queue item; skip it but leave a trace.
                self._log.exception("eval_queue_read_error")
                continue

            eval_count += 1
            # Sample the tick log: first evaluation, then every 100th.
            if eval_count % 100 == 1:
                self._log.info(
                    "strategy_eval_tick",
                    eval_count=eval_count,
                    symbol=symbol,
                    price=price,
                    change_pct=round(window.price_change_pct or 0, 4),
                    has_market=window.market is not None,
                    orderbook_tokens=list(orderbooks.keys())[:2],
                )

            if self._signal_agg is not None:
                try:
                    await self._signal_agg.on_price_tick(
                        symbol=symbol,
                        cex_price=price,
                        window=window,
                        orderbooks=orderbooks,
                    )
                except Exception:
                    self._log.exception("strategy_eval_error")

    async def _status_loop(self) -> None:
        """Log a status summary every ``_STATUS_INTERVAL_S`` seconds."""
        while not self._shutdown_event.is_set():
            try:
                # Doubles as an interruptible sleep: returns early on shutdown.
                await asyncio.wait_for(
                    self._shutdown_event.wait(),
                    timeout=_STATUS_INTERVAL_S,
                )
                break
            except asyncio.TimeoutError:
                pass

            windows = (
                self._tracker.get_all_active_windows()
                if self._tracker
                else []
            )
            latest_prices: dict[str, Optional[float]] = {}
            if self._binance_feed is not None:
                for sym in self._cfg.general.assets:
                    latest_prices[sym] = self._binance_feed.get_latest_price(sym)

            # Update balance from PnL
            pnl = self._engine.total_pnl if self._engine else 0.0
            self._balance = self._starting_balance + pnl

            # Record balance snapshot to DB
            if self._db is not None:
                self._db.log_balance(self._balance, pnl, event="periodic")

            self._log.info(
                "status",
                mode="paper",
                balance=round(self._balance, 2),
                pnl=round(pnl, 2),
                binance_connected=(
                    self._binance_feed.is_connected
                    if self._binance_feed
                    else False
                ),
                polymarket_connected=(
                    self._poly_feed.is_connected
                    if self._poly_feed
                    else False
                ),
                active_windows=len(windows),
                active_markets=len(self._active_markets),
                latest_prices=latest_prices,
            )

    async def _oracle_snapshot_loop(self) -> None:
        """Log oracle vs CEX deviation to DB every 30 seconds."""
        while not self._shutdown_event.is_set():
            try:
                await asyncio.wait_for(
                    self._shutdown_event.wait(), timeout=30.0,
                )
                break
            except asyncio.TimeoutError:
                pass

            if self._oracle is None or self._binance_feed is None or self._db is None:
                continue

            for asset in self._cfg.general.assets:
                cex_price = self._binance_feed.get_latest_price(asset)
                oracle_price = self._oracle._last_oracle_prices.get(asset)
                if cex_price is None or oracle_price is None:
                    continue
                deviation = self._oracle.get_oracle_vs_cex_deviation(asset, cex_price)
                lag = self._oracle.get_estimated_lag(asset)
                round_id = self._oracle._last_oracle_round_ids.get(asset, 0)
                if deviation is not None and lag is not None:
                    self._db.log_oracle(
                        asset=asset,
                        oracle_price=oracle_price,
                        cex_price=cex_price,
                        deviation_pct=deviation,
                        oracle_lag_sec=lag,
                        oracle_round_id=round_id,
                    )

            self._log.info(
                "oracle_snapshot",
                stats=self._oracle.get_stats(),
            )

    async def _summary_loop(self) -> None:
        """Print paper trading summary every ``_SUMMARY_INTERVAL_S`` seconds."""
        while not self._shutdown_event.is_set():
            try:
                await asyncio.wait_for(
                    self._shutdown_event.wait(),
                    timeout=_SUMMARY_INTERVAL_S,
                )
                break
            except asyncio.TimeoutError:
                pass

            if self._engine is not None:
                self._engine.print_summary()

            if self._db is not None:
                self._log.info(
                    "db_summary",
                    today_trades=self._db.get_today_trade_count(),
                    today_pnl=round(self._db.get_today_pnl(), 4),
                    total_pnl=round(self._db.get_total_pnl(), 4),
                )
                # Periodic DB maintenance (prune old data, WAL checkpoint)
                try:
                    self._db.periodic_maintenance(retention_days=7)
                except Exception:
                    self._log.exception("db_maintenance_error")

    # ------------------------------------------------------------------
    # Lifecycle
    # ------------------------------------------------------------------
    async def start(self) -> None:
        """Initialise all components, then run feeds concurrently.

        Returns when every concurrent task has finished or the gather is
        cancelled.
        """
        self._log.info("paper_bot_initialising")

        # 0. Initialize eval queue (must be in async context)
        self._eval_queue = asyncio.Queue(maxsize=1000)

        # 1. Database (paper-specific path)
        self._db = TradeDB(db_path="paper_trades.db")

        # 2. Paper execution engine
        self._engine = PaperExecutionEngine(
            db=self._db, fees=self._cfg.fees,
        )
        # Record starting balance
        self._db.log_balance(self._balance, 0.0, event="start")
        self._log.info("starting_balance", balance=self._balance)

        # 2b. Signal aggregator (strategy engine)
        self._signal_agg = SignalAggregator(
            config=self._cfg, balance=self._balance,
        )
        self._signal_agg.on_signal(self._on_signal)
        self._log.info(
            "strategy_engine_ready",
            temporal_arb=self._cfg.temporal_arb.enabled,
            sum_to_one=self._cfg.sum_to_one.enabled,
        )

        # 3. Market discovery
        self._discovery = MarketDiscovery(
            on_new_markets=self._on_new_markets,
        )
        self._log.info("running_initial_discovery")
        initial_markets = await self._discovery.discover()
        self._active_markets = {m.condition_id: m for m in initial_markets}
        self._log.info(
            "initial_discovery_complete",
            count=len(initial_markets),
        )

        # 4. Window tracker
        assets = [Asset(a) for a in self._cfg.general.assets]
        timeframes = [Timeframe(t) for t in self._cfg.general.timeframes]
        self._tracker = WindowTracker(assets=assets, timeframes=timeframes)
        self._tracker.on_window_change(self._on_window_change)
        # Link initial markets to windows
        for mkt in initial_markets:
            self._tracker.link_market(
                asset=mkt.asset.value,
                timeframe=mkt.timeframe.value,
                market=mkt,
            )

        # 4b. Chainlink oracle monitor
        self._oracle = OracleMonitor()
        oracle_ok = await self._oracle.initialize()
        self._log.info("oracle_init", success=oracle_ok)

        # 5. Binance feed
        ws_url = self._cfg.binance.ws_url
        streams = "/".join(
            f"{s}@trade" for s in self._cfg.binance.symbols
        )
        full_url = f"{ws_url}?streams={streams}"
        self._binance_feed = BinanceFeed(url=full_url)
        self._binance_feed.subscribe(self._on_binance_price)

        # 6. Polymarket feed
        self._poly_feed = PolymarketFeed(url=self._cfg.polymarket.ws_url)
        self._poly_feed.on_orderbook_update(self._on_orderbook_update)
        for mkt in initial_markets:
            self._poly_feed.subscribe_market(mkt.up_token_id)
            self._poly_feed.subscribe_market(mkt.down_token_id)

        self._log.info("paper_bot_starting_feeds")

        # 7. Run all tasks concurrently
        tasks = [
            self._binance_feed.start(),
            self._poly_feed.start(),
            self._discovery.discover_loop(interval_sec=_DISCOVERY_INTERVAL_S),
            self._strategy_eval_loop(),
            self._status_loop(),
            self._summary_loop(),
        ]
        # Oracle polling and snapshots only run when the monitor initialised.
        if self._oracle and self._oracle._initialized:
            tasks.append(
                self._oracle.poll_loop(
                    interval=5.0, shutdown_event=self._shutdown_event
                )
            )
            tasks.append(self._oracle_snapshot_loop())

        try:
            await asyncio.gather(*tasks)
        except asyncio.CancelledError:
            self._log.info("gather_cancelled")

    async def shutdown(self) -> None:
        """Gracefully stop all components.

        Safe to call more than once and from concurrent tasks: calls are
        serialised, and once one call has completed every later call
        returns without touching the (already closed) components again.
        """
        async with self._shutdown_lock:
            if self._stopped:
                return

            self._log.info("paper_bot_shutting_down")
            self._shutdown_event.set()

            if self._binance_feed is not None:
                await self._binance_feed.stop()
            if self._poly_feed is not None:
                await self._poly_feed.stop()

            # Close discovery session
            if self._discovery is not None:
                await self._discovery._close_session_if_owned()

            # Print final summary
            if self._engine is not None:
                self._engine.print_summary()

            # Close database connection
            if self._db is not None:
                self._db.close()

            # Only mark as stopped after every step succeeded, so a failed
            # attempt can still be retried by a later call.
            self._stopped = True
            self._log.info("paper_bot_stopped")
# ---------------------------------------------------------------------------
# Config override helper
# ---------------------------------------------------------------------------
def _force_paper_config(base: Config) -> Config:
    """Build a copy of *base* whose ``general.mode`` is ``'paper'``.

    The general section is rebuilt from the listed fields of *base*; every
    other section object is passed through to the new ``Config`` unchanged.
    """
    original_general = base.general
    paper_general = GeneralConfig(
        mode="paper",
        assets=original_general.assets,
        timeframes=original_general.timeframes,
        log_level=original_general.log_level,
        starting_balance=original_general.starting_balance,
    )
    # All remaining sections are reused as-is (same objects, not copies).
    passthrough = {
        "temporal_arb": base.temporal_arb,
        "sum_to_one": base.sum_to_one,
        "spread_capture": base.spread_capture,
        "risk": base.risk,
        "fees": base.fees,
        "binance": base.binance,
        "polymarket": base.polymarket,
        "notifications": base.notifications,
    }
    return Config(general=paper_general, **passthrough)
# ---------------------------------------------------------------------------
# Signal handling and entry point
# ---------------------------------------------------------------------------
def _install_signal_handlers(
    loop: asyncio.AbstractEventLoop, bot: PaperTradingBot
) -> None:
    """Register SIGINT / SIGTERM handlers that trigger graceful shutdown.

    The first signal schedules ``bot.shutdown()`` on the running loop;
    further signals are ignored so that only one shutdown is ever started
    from here. Loops that do not implement ``add_signal_handler`` are left
    without handlers.
    """
    # The event loop only keeps weak references to tasks, so the scheduled
    # shutdown is held here to keep it from being garbage-collected before
    # it finishes. It also serves as the "already requested" marker.
    shutdown_task: Optional[asyncio.Future] = None

    def _request_shutdown() -> None:
        nonlocal shutdown_task
        if shutdown_task is None:
            shutdown_task = asyncio.ensure_future(bot.shutdown())

    for sig in (signal.SIGINT, signal.SIGTERM):
        try:
            loop.add_signal_handler(sig, _request_shutdown)
        except NotImplementedError:
            # Deliberate best-effort: this loop/platform has no signal
            # handler support, so rely on KeyboardInterrupt instead.
            pass
async def async_main() -> None:
    """Load the config, force paper mode, print the banner and run the bot.

    ``bot.shutdown()`` is always awaited on the way out, whether the bot
    stopped normally, raised, or was interrupted from the keyboard.
    """
    config = _force_paper_config(load_config())
    general = config.general
    setup_logging(general.log_level)

    # Startup banner followed by the effective settings.
    print(_BANNER)
    print(f" Mode: {general.mode} (forced)")
    print(f" Assets: {', '.join(general.assets)}")
    print(f" Timeframes: {', '.join(general.timeframes)}")
    print(f" Log level: {general.log_level}")
    print(f" Balance: ${general.starting_balance:,.2f}")
    print(" DB: paper_trades.db")
    print()

    bot = PaperTradingBot(config)
    _install_signal_handlers(asyncio.get_running_loop(), bot)

    try:
        await bot.start()
    except KeyboardInterrupt:
        # Swallowed on purpose; cleanup happens in ``finally``.
        pass
    finally:
        await bot.shutdown()
def main() -> None:
    """Run :func:`async_main` to completion from synchronous code.

    A ``KeyboardInterrupt`` escaping the event loop is reported with a short
    message and turned into a clean exit (status 0).
    """
    try:
        asyncio.run(async_main())
    except KeyboardInterrupt:
        pass
    else:
        # Normal completion: nothing more to do.
        return

    print("\nPaper trading shutdown complete.")
    sys.exit(0)


if __name__ == "__main__":
    main()