diff --git a/.omc/state/hud-state.json b/.omc/state/hud-state.json index afbff59..bba6379 100644 --- a/.omc/state/hud-state.json +++ b/.omc/state/hud-state.json @@ -1,6 +1,6 @@ { - "timestamp": "2026-03-22T00:28:24.567Z", + "timestamp": "2026-03-28T02:17:33.052Z", "backgroundTasks": [], - "sessionStartTimestamp": "2026-03-22T00:24:25.089Z", - "sessionId": "f97eeeba-f610-41c4-9533-51eb495908b8" + "sessionStartTimestamp": "2026-03-28T02:15:55.812Z", + "sessionId": "a9e3d3e8-6b91-4c7a-9c52-a928a3b1fb98" } \ No newline at end of file diff --git a/.omc/state/hud-stdin-cache.json b/.omc/state/hud-stdin-cache.json index 36968e8..45fed51 100644 --- a/.omc/state/hud-stdin-cache.json +++ b/.omc/state/hud-stdin-cache.json @@ -1 +1 @@ -{"session_id":"f97eeeba-f610-41c4-9533-51eb495908b8","transcript_path":"C:\\Users\\User\\.claude\\projects\\D--PRJ-poly-company-dtr2-poly\\f97eeeba-f610-41c4-9533-51eb495908b8.jsonl","cwd":"D:\\PRJ\\poly_company\\dtr2_poly\\polymarket-arb-bot","model":{"id":"claude-opus-4-6[1m]","display_name":"Opus 4.6 (1M context)"},"workspace":{"current_dir":"D:\\PRJ\\poly_company\\dtr2_poly\\polymarket-arb-bot","project_dir":"D:\\PRJ\\poly_company\\dtr2_poly","added_dirs":["D:/PRJ/poly_company/dtr2_poly"]},"version":"2.1.78","output_style":{"name":"default"},"cost":{"total_cost_usd":0.6991065,"total_duration_ms":317862,"total_api_duration_ms":61650,"total_lines_added":7,"total_lines_removed":0},"context_window":{"total_input_tokens":33,"total_output_tokens":2230,"context_window_size":1000000,"current_usage":{"input_tokens":3,"output_tokens":148,"cache_creation_input_tokens":408,"cache_read_input_tokens":58264},"used_percentage":6,"remaining_percentage":94},"exceeds_200k_tokens":false} \ No newline at end of file +{"session_id":"a9e3d3e8-6b91-4c7a-9c52-a928a3b1fb98","transcript_path":"C:\\Users\\User\\.claude\\projects\\D--PRJ-poly-company-dtr2-poly\\a9e3d3e8-6b91-4c7a-9c52-a928a3b1fb98.jsonl","cwd":"D:\\PRJ\\poly_company\\dtr2_poly\\polymarket-arb-bot","model":{"id":"claude-opus-4-6[1m]","display_name":"Opus 4.6 (1M context)"},"workspace":{"current_dir":"D:\\PRJ\\poly_company\\dtr2_poly\\polymarket-arb-bot","project_dir":"D:\\PRJ\\poly_company\\dtr2_poly","added_dirs":["D:/PRJ/poly_company/dtr2_poly"]},"version":"2.1.78","output_style":{"name":"default"},"cost":{"total_cost_usd":7.426863299999999,"total_duration_ms":1802055,"total_api_duration_ms":717171,"total_lines_added":126,"total_lines_removed":36},"context_window":{"total_input_tokens":166,"total_output_tokens":44526,"context_window_size":1000000,"current_usage":{"input_tokens":3,"output_tokens":125,"cache_creation_input_tokens":104255,"cache_read_input_tokens":24263},"used_percentage":13,"remaining_percentage":87},"exceeds_200k_tokens":false} \ No newline at end of file diff --git a/.omc/state/idle-notif-cooldown.json b/.omc/state/idle-notif-cooldown.json new file mode 100644 index 0000000..94b270f --- /dev/null +++ b/.omc/state/idle-notif-cooldown.json @@ -0,0 +1,3 @@ +{ + "lastSentAt": "2026-03-28T02:24:21.283Z" +} \ No newline at end of file diff --git a/.omc/state/last-tool-error.json b/.omc/state/last-tool-error.json new file mode 100644 index 0000000..051e55f --- /dev/null +++ b/.omc/state/last-tool-error.json @@ -0,0 +1,7 @@ +{ + "tool_name": "Bash", + "tool_input_preview": "{\"command\":\".venv/Scripts/python.exe -c \\\"\\nimport sqlite3, tempfile, os\\nfrom pathlib import Path\\nimport pandas as pd\\nprint('=== Test 5: Dashboard query_db ===')\\n\\ndef query_db(db_path, sql):\\n ...", + "error": "Exit code 1\nTraceback (most recent call last):\r\n File \u001b[35m\"\"\u001b[0m, line \u001b[35m29\u001b[0m, in \u001b[35m\u001b[0m\r\n \u001b[31mos.unlink\u001b[0m\u001b[1;31m(tmp_path)\u001b[0m\r\n \u001b[31m~~~~~~~~~\u001b[0m\u001b[1;31m^^^^^^^^^^\u001b[0m\r\n\u001b[1;35mPermissionError\u001b[0m: \u001b[35m[WinError 32] �ٸ� ���μ����� ������ ��� ���̱� ������ ���μ����� �׼��� �� �� �����ϴ�: 'C:\\\\Users\\\\User\\\\AppData\\\\Local\\\\Temp\\\\tmpe_93w69z.db'\u001b[0m\r\n=== Test 5: Dashboard query_db ===\r\n Normal query OK\r\n Exception handling OK\r\n Nonexistent DB OK", + "timestamp": "2026-03-28T02:23:24.339Z", + "retry_count": 1 +} \ No newline at end of file diff --git a/dashboard/app.py b/dashboard/app.py index 0ad4900..9210077 100644 --- a/dashboard/app.py +++ b/dashboard/app.py @@ -352,10 +352,8 @@ def query_db(db_path: Path, sql: str) -> pd.DataFrame: if not db_path.exists(): return pd.DataFrame() try: - conn = sqlite3.connect(str(db_path)) - df = pd.read_sql_query(sql, conn) - conn.close() - return df + with sqlite3.connect(str(db_path)) as conn: + return pd.read_sql_query(sql, conn) except Exception: return pd.DataFrame() @@ -389,11 +387,11 @@ refresh_rate = st.sidebar.selectbox("Interval (sec)", [5, 10, 30], index=1, labe auto_refresh = st.sidebar.checkbox("Auto-refresh", value=True) # Load data -trades_df = query_db(db_path, "SELECT * FROM trades ORDER BY created_at DESC") -windows_df = query_db(db_path, "SELECT * FROM window_snapshots ORDER BY created_at DESC") -daily_df = query_db(db_path, "SELECT * FROM daily_summary ORDER BY date DESC") -balance_df = query_db(db_path, "SELECT * FROM balance_history ORDER BY timestamp ASC") -oracle_df = query_db(db_path, "SELECT * FROM oracle_snapshots ORDER BY timestamp DESC") +trades_df = query_db(db_path, "SELECT * FROM trades ORDER BY created_at DESC LIMIT 500") +windows_df = query_db(db_path, "SELECT * FROM window_snapshots ORDER BY created_at DESC LIMIT 1000") +daily_df = query_db(db_path, "SELECT * FROM daily_summary ORDER BY date DESC LIMIT 90") +balance_df = query_db(db_path, "SELECT * FROM (SELECT * FROM balance_history ORDER BY timestamp DESC LIMIT 2000) ORDER BY timestamp ASC") +oracle_df = query_db(db_path, "SELECT * FROM (SELECT * FROM oracle_snapshots ORDER BY timestamp DESC LIMIT 2000) ORDER BY timestamp ASC") # ================================================================== diff --git a/paper_trade.py b/paper_trade.py index 3494d4a..d6a3b16 100644 --- a/paper_trade.py +++ b/paper_trade.py @@ -16,6 +16,7 @@ import signal import sys import time import uuid +from collections import deque from dataclasses import dataclass, field from typing import Optional @@ -120,8 +121,10 @@ class PaperExecutionEngine: self._fees = fees self._log = get_logger("paper_engine") self._positions: dict[str, VirtualPosition] = {} - self._closed_trades: list[VirtualTradeRecord] = [] + self._closed_trades: deque[VirtualTradeRecord] = deque(maxlen=10000) self._total_pnl: float = 0.0 + self._win_count: int = 0 + self._loss_count: int = 0 # ------------------------------------------------------------------ # Public API @@ -141,10 +144,10 @@ class PaperExecutionEngine: @property def win_rate(self) -> float: - if not self._closed_trades: + total = self._win_count + self._loss_count + if total == 0: return 0.0 - wins = sum(1 for t in self._closed_trades if t.pnl > 0) - return wins / len(self._closed_trades) + return self._win_count / total def open_position( self, @@ -234,6 +237,10 @@ class PaperExecutionEngine: ) self._closed_trades.append(record) self._total_pnl += pnl + if pnl > 0: + self._win_count += 1 + else: + self._loss_count += 1 # Update the trade record in DB self._db.update_trade( @@ -302,12 +309,15 @@ class PaperTradingBot: self._signal_agg: Optional[SignalAggregator] = None self._oracle: Optional[OracleMonitor] = None - # Discovered markets cache - self._active_markets: list[ActiveMarket] = [] + # Discovered markets cache (keyed by condition_id to prevent duplicates) + self._active_markets: dict[str, ActiveMarket] = {} # Live orderbook state (token_id → snapshot) self._orderbooks: dict[str, OrderBookSnapshot] = {} + # Queue drop counter for monitoring + self._queue_drop_count: int = 0 + # Pending strategy evaluations (initialized in start()) self._eval_queue: Optional[asyncio.Queue] = None @@ -349,7 +359,11 @@ class PaperTradingBot: if window.market is not None and self._eval_queue is not None: try: self._eval_queue.put_nowait((symbol, price, window, orderbooks)) - except (asyncio.QueueFull, Exception): + except asyncio.QueueFull: + self._queue_drop_count += 1 + if self._queue_drop_count % 100 == 1: + self._log.warning("eval_queue_full", drops=self._queue_drop_count) + except Exception: pass def _on_orderbook_update( @@ -532,7 +546,8 @@ class PaperTradingBot: def _on_new_markets(self, markets: list[ActiveMarket]) -> None: """Callback when MarketDiscovery finds new markets.""" - self._active_markets.extend(markets) + for mkt in markets: + self._active_markets[mkt.condition_id] = mkt if self._tracker is not None: for mkt in markets: self._tracker.link_market( @@ -697,6 +712,11 @@ class PaperTradingBot: today_pnl=round(self._db.get_today_pnl(), 4), total_pnl=round(self._db.get_total_pnl(), 4), ) + # Periodic DB maintenance (prune old data, WAL checkpoint) + try: + self._db.periodic_maintenance(retention_days=7) + except Exception: + self._log.exception("db_maintenance_error") # ------------------------------------------------------------------ # Lifecycle @@ -736,7 +756,7 @@ class PaperTradingBot: ) self._log.info("running_initial_discovery") initial_markets = await self._discovery.discover() - self._active_markets = initial_markets + self._active_markets = {m.condition_id: m for m in initial_markets} self._log.info( "initial_discovery_complete", count=len(initial_markets), @@ -790,7 +810,7 @@ class PaperTradingBot: self._summary_loop(), ] if self._oracle and self._oracle._initialized: - tasks.append(self._oracle.poll_loop(interval=5.0)) + tasks.append(self._oracle.poll_loop(interval=5.0, shutdown_event=self._shutdown_event)) tasks.append(self._oracle_snapshot_loop()) try: await asyncio.gather(*tasks) @@ -808,10 +828,18 @@ class PaperTradingBot: if self._poly_feed is not None: await self._poly_feed.stop() + # Close discovery session + if self._discovery is not None: + await self._discovery._close_session_if_owned() + # Print final summary if self._engine is not None: self._engine.print_summary() + # Close database connection + if self._db is not None: + self._db.close() + self._log.info("paper_bot_stopped") diff --git a/src/data/db.py b/src/data/db.py index 3cfeef5..2717981 100644 --- a/src/data/db.py +++ b/src/data/db.py @@ -34,6 +34,26 @@ class TradeDB: self._ensure_tables() self._log.info("database_ready") + # ------------------------------------------------------------------ + # Lifecycle + # ------------------------------------------------------------------ + + def close(self) -> None: + """Flush WAL and close the underlying SQLite connection.""" + try: + if self._db and hasattr(self._db, "conn") and self._db.conn: + self._db.conn.execute("PRAGMA wal_checkpoint(TRUNCATE)") + self._db.conn.close() + self._log.info("database_closed") + except Exception: + self._log.exception("database_close_error") + + def __enter__(self): + return self + + def __exit__(self, *exc): + self.close() + # ------------------------------------------------------------------ # Schema # ------------------------------------------------------------------ @@ -321,3 +341,23 @@ class TradeDB: [today], ).fetchone() return int(row[0]) + + # ------------------------------------------------------------------ + # Maintenance + # ------------------------------------------------------------------ + + def periodic_maintenance(self, retention_days: int = 7) -> None: + """Prune old time-series data and checkpoint WAL.""" + cutoff = time.time() - retention_days * 86400 + pruned = {} + for table in ("oracle_snapshots", "balance_history"): + if table in self._db.table_names(): + before = self._db.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0] + self._db.execute(f"DELETE FROM {table} WHERE timestamp < ?", [cutoff]) + after = self._db.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0] + pruned[table] = before - after + try: + self._db.conn.execute("PRAGMA wal_checkpoint(TRUNCATE)") + except Exception: + pass + self._log.info("db_maintenance", pruned=pruned, retention_days=retention_days) diff --git a/src/main.py b/src/main.py index 5a9f827..a561eea 100644 --- a/src/main.py +++ b/src/main.py @@ -147,8 +147,8 @@ class ArbBot: self._signal_aggregator: Optional[SignalAggregator] = None self._telegram: Optional[TelegramNotifier] = None - # Discovered markets cache - self._active_markets: list[ActiveMarket] = [] + # Discovered markets cache (keyed by condition_id to prevent duplicates) + self._active_markets: dict[str, ActiveMarket] = {} # Orderbook state self._orderbooks: dict[str, OrderBookSnapshot] = {} @@ -302,7 +302,8 @@ class ArbBot: def _on_new_markets(self, markets: list[ActiveMarket]) -> None: """Callback invoked when MarketDiscovery finds new markets.""" - self._active_markets.extend(markets) + for mkt in markets: + self._active_markets[mkt.condition_id] = mkt if self._tracker is not None: _link_markets_to_tracker(markets, self._tracker, self._log) if self._poly_feed is not None: @@ -573,7 +574,7 @@ class ArbBot: ) self._log.info("running_initial_discovery") initial_markets = await self._discovery.discover() - self._active_markets = initial_markets + self._active_markets = {m.condition_id: m for m in initial_markets} self._log.info( "initial_discovery_complete", count=len(initial_markets), @@ -706,6 +707,14 @@ class ArbBot: ) await self._telegram.close() + # Close discovery session + if self._discovery is not None: + await self._discovery._close_session_if_owned() + + # Close database connection + if self._db is not None: + self._db.close() + self._log.info("bot_stopped") diff --git a/src/market/oracle.py b/src/market/oracle.py index 6b7b392..09a6d57 100644 --- a/src/market/oracle.py +++ b/src/market/oracle.py @@ -54,6 +54,7 @@ class OracleMonitor: } def __init__(self, rpc_url: Optional[str] = None) -> None: + from collections import deque self._rpc_url = rpc_url or "https://polygon.drpc.org" self._w3 = None self._contracts: dict[str, object] = {} @@ -61,10 +62,11 @@ class OracleMonitor: self._last_oracle_prices: dict[str, float] = {} self._last_oracle_timestamps: dict[str, float] = {} self._last_oracle_round_ids: dict[str, int] = {} - self._update_intervals: dict[str, list[float]] = { - "BTC": [], "ETH": [], "SOL": [] + self._update_intervals: dict[str, deque[float]] = { + asset: deque(maxlen=100) for asset in self.FEEDS } self._initialized = False + self._shutdown_event: Optional[asyncio.Event] = None async def initialize(self) -> bool: """Initialize web3 connection and contract instances.""" @@ -106,7 +108,10 @@ class OracleMonitor: try: contract = self._contracts[asset] - result = contract.functions.latestRoundData().call() + loop = asyncio.get_event_loop() + result = await loop.run_in_executor( + None, contract.functions.latestRoundData().call + ) round_id, answer, started_at, updated_at, answered_in_round = result decimals = self._decimals.get(asset, 8) @@ -127,11 +132,7 @@ class OracleMonitor: prev_ts = self._last_oracle_timestamps.get(asset) if prev_ts is not None and timestamp > prev_ts: interval = timestamp - prev_ts - intervals = self._update_intervals[asset] - intervals.append(interval) - # Keep last 100 intervals - if len(intervals) > 100: - intervals.pop(0) + self._update_intervals[asset].append(interval) self._last_oracle_prices[asset] = price self._last_oracle_timestamps[asset] = timestamp @@ -159,19 +160,33 @@ class OracleMonitor: return None return (cex_price - oracle_price) / oracle_price * 100 - async def poll_loop(self, interval: float = 5.0) -> None: + async def poll_loop( + self, interval: float = 5.0, shutdown_event: Optional[asyncio.Event] = None + ) -> None: """Continuously poll oracle prices.""" if not self._initialized: log.warning("oracle_poll_not_initialized") return - while True: + if shutdown_event is not None: + self._shutdown_event = shutdown_event + + while not (self._shutdown_event and self._shutdown_event.is_set()): for asset in self.FEEDS: try: await self.get_oracle_price(asset) except Exception: log.exception("oracle_poll_error", asset=asset) - await asyncio.sleep(interval) + try: + if self._shutdown_event: + await asyncio.wait_for( + self._shutdown_event.wait(), timeout=interval + ) + break + else: + await asyncio.sleep(interval) + except asyncio.TimeoutError: + pass def get_stats(self) -> dict: return {