"""Main bot orchestrator. Ties together all modules into the main trading loop: DataFeed -> ICTEngine -> MTFAnalyzer -> Confluence -> Signal -> Risk -> Order -> Position """ from __future__ import annotations import asyncio import signal as sys_signal from datetime import datetime from typing import Optional from loguru import logger from config import settings from core.data_feed import DataFeed from core.event_bus import event_bus from database.models import init_db from database.repository import TradingRepository, PositionRecord from execution.exchange_client import ExchangeClient from execution.order_manager import OrderManager from execution.position_manager import PositionManager from indicators.ict_engine import ICTEngine from indicators.multi_timeframe import MultiTimeframeAnalyzer from indicators.confluence import ConfluenceChecker from notification.alert_manager import AlertManager from notification.telegram_bot import TelegramNotifier from risk.risk_manager import RiskManager from risk.drawdown_monitor import DrawdownMonitor from strategy.entry_rules import EntryRules from strategy.exit_rules import ExitRules from strategy.signal_generator import SignalGenerator, TradeSignal class ICTBot: """ICT Smart Money Concepts trading bot orchestrator. Lifecycle: 1. Initialize all components 2. Connect to exchange 3. Enter main loop 4. Each iteration: a. Fetch/update data for all symbols b. Run ICT analysis per symbol c. Generate signals d. Execute approved trades e. Monitor open positions f. Send notifications 5. Graceful shutdown on SIGINT/SIGTERM """ def __init__(self, paper_mode: bool = False): # Core - use paper exchange if in paper/sandbox mode if paper_mode or settings.SANDBOX_MODE: from execution.paper_exchange import PaperExchangeClient self.exchange_client = PaperExchangeClient(initial_balance=300.0) logger.info("Using PAPER exchange client (simulated orders)") else: self.exchange_client = ExchangeClient() self.data_feed = DataFeed(self.exchange_client) # Indicators self.ict_engine = ICTEngine() self.mtf_analyzer = MultiTimeframeAnalyzer(self.ict_engine) self.confluence_checker = ConfluenceChecker() # Strategy self.entry_rules = EntryRules() self.exit_rules = ExitRules() self.signal_generator = SignalGenerator( ict_engine=self.ict_engine, mtf_analyzer=self.mtf_analyzer, confluence_checker=self.confluence_checker, entry_rules=self.entry_rules, exit_rules=self.exit_rules, ) # Risk self.risk_manager = RiskManager() self.drawdown_monitor = DrawdownMonitor( max_drawdown_limit=settings.MAX_DRAWDOWN ) # Execution self.order_manager = OrderManager(self.exchange_client, self.risk_manager) self.position_manager = PositionManager( self.order_manager, self.risk_manager, self.exit_rules ) # Database self.repo = TradingRepository() # Notification self.notifier = TelegramNotifier() self.alert_manager = AlertManager([self.notifier]) # State self._running = False self._loop_interval = 60 # seconds between analysis cycles # ------------------------------------------------------------------ # Lifecycle # ------------------------------------------------------------------ async def start(self) -> None: """Initialise all components and start the main loop.""" logger.info("=" * 60) logger.info(" ICT Smart Money Concepts Trading Bot") logger.info(" Exchange : {}", settings.EXCHANGE_ID) logger.info(" Sandbox : {}", settings.SANDBOX_MODE) logger.info(" Pairs : {}", settings.TRADING_PAIRS) logger.info("=" * 60) # Database self.repo.connect() # Exchange await self.exchange_client.connect() await self.data_feed.connect() # Futures initialization: set leverage and margin mode per symbol for symbol in settings.TRADING_PAIRS: await self.exchange_client.set_leverage(symbol, settings.DEFAULT_LEVERAGE) await self.exchange_client.set_margin_mode(symbol, "isolated") # Warm up data for symbol in settings.TRADING_PAIRS: logger.info("Warming up data for {}", symbol) await self.data_feed.fetch_multi_timeframe(symbol) self._running = True logger.info("Bot started -- entering main loop") try: await self._main_loop() except asyncio.CancelledError: logger.info("Bot cancelled") except Exception as e: logger.exception("Unhandled error in main loop: {}", e) await self.alert_manager.notify_error(str(e)) self.risk_manager.emergency_stop() finally: await self.stop() async def stop(self) -> None: """Graceful shutdown.""" self._running = False logger.info("Shutting down...") # Close all positions if emergency if self.risk_manager.is_stopped: closed = await self.position_manager.close_all("EMERGENCY") for p in closed: await self.alert_manager.notify_emergency( f"Emergency close: {p.symbol} PnL={p.realized_pnl:.2f}" ) await self.data_feed.disconnect() self.repo.close() logger.info("Bot stopped") # ------------------------------------------------------------------ # Main loop # ------------------------------------------------------------------ async def _main_loop(self) -> None: """Core trading loop.""" while self._running: try: for symbol in settings.TRADING_PAIRS: await self._process_symbol(symbol) # Check drawdown balance_info = await self.exchange_client.fetch_balance() equity = float(balance_info.get("total", {}).get("USDT", 0)) dd_state = self.drawdown_monitor.update(equity) self.risk_manager.update_equity(equity) if self.drawdown_monitor.is_breached(): logger.critical("Max drawdown breached! Emergency stop.") self.risk_manager.emergency_stop() await self.alert_manager.notify_emergency( f"Max drawdown {dd_state.max_drawdown:.2%} exceeded limit" ) break except Exception as e: logger.error("Loop iteration error: {}", e) await self.alert_manager.notify_error(str(e)) await asyncio.sleep(self._loop_interval) async def _process_symbol(self, symbol: str) -> None: """Run the full analysis and execution pipeline for one symbol.""" # 1. Update data await self.data_feed.fetch_multi_timeframe(symbol) # 2. Check existing positions for exit ltf = settings.LTF_TIMEFRAME try: ltf_df = self.data_feed.get_dataframe(symbol, ltf) current_price = float(ltf_df["close"].iloc[-1]) ltf_signals = self.ict_engine.analyze(ltf_df) closed = await self.position_manager.update_positions( symbol, current_price, ltf_signals ) for pos in closed: await self._on_position_closed(pos) except Exception as e: logger.error("Position update error for {}: {}", symbol, e) # 3. Generate new signals if self.risk_manager.is_stopped: return try: signal = await self.signal_generator.generate(symbol, self.data_feed) if signal: await self._on_signal(signal) except Exception as e: logger.exception("Signal generation error for {}: {}", symbol, e) async def _on_signal(self, signal: TradeSignal) -> None: """Handle a new trade signal.""" # Notify await self.alert_manager.notify_signal({ "symbol": signal.symbol, "direction": signal.direction.value, "entry_price": signal.entry_price, "stop_loss": signal.stop_loss, "take_profit": signal.take_profit, "confluence": signal.confidence, "reasons": signal.reasons, }) # Execute balance_info = await self.exchange_client.fetch_balance() balance = float(balance_info.get("free", {}).get("USDT", 0)) order = await self.order_manager.execute_signal(signal, balance) if order is None: return # Track position position = self.position_manager.open_position(signal, order) # Persist self.repo.save_position(PositionRecord( id=position.id, symbol=position.symbol, direction=position.direction.value, entry_price=position.entry_price, amount=position.amount, stop_loss=position.stop_loss, take_profit=position.take_profit, status="OPEN", opened_at=position.opened_at.isoformat(), confluence_score=position.confluence_score, entry_reasons=str(position.entry_reasons), )) await self.alert_manager.notify_fill({ "symbol": signal.symbol, "side": "BUY" if signal.direction.value == "LONG" else "SELL", "amount": position.amount, "price": position.entry_price, }) async def _on_position_closed(self, position) -> None: """Handle a closed position -- persist and notify.""" self.repo.save_position(PositionRecord( id=position.id, symbol=position.symbol, direction=position.direction.value, entry_price=position.entry_price, amount=position.amount, stop_loss=position.stop_loss, take_profit=position.take_profit, realized_pnl=position.realized_pnl, status="CLOSED", opened_at=position.opened_at.isoformat(), closed_at=position.closed_at.isoformat() if position.closed_at else None, close_reason=position.close_reason, confluence_score=position.confluence_score, entry_reasons=str(position.entry_reasons), )) self.repo.update_daily_performance( pnl=position.realized_pnl, is_win=position.realized_pnl > 0, max_dd=self.drawdown_monitor.max_drawdown, ) await self.alert_manager.notify_close({ "symbol": position.symbol, "direction": position.direction.value, "entry_price": position.entry_price, "exit_price": position.current_price, "pnl": position.realized_pnl, "reason": position.close_reason or "UNKNOWN", }) await event_bus.publish("position_closed", position)