"""CCXT-based exchange client wrapper for FUTURES trading. Provides a unified async interface to any CCXT-supported exchange with automatic sandbox/testnet support. Configured for futures (perpetual swaps) with leverage and margin mode management. """ from __future__ import annotations import asyncio from typing import Any, Dict, List, Optional import ccxt.pro as ccxtpro import pandas as pd from loguru import logger from config import settings class ExchangeClient: """Async exchange client wrapping ccxt.pro for live and sandbox trading.""" def __init__( self, exchange_id: str | None = None, api_key: str | None = None, api_secret: str | None = None, sandbox: bool | None = None, ): self._exchange_id = exchange_id or settings.EXCHANGE_ID self._api_key = api_key or settings.API_KEY self._api_secret = api_secret or settings.API_SECRET self._sandbox = sandbox if sandbox is not None else settings.SANDBOX_MODE self._exchange: Optional[ccxtpro.Exchange] = None # ------------------------------------------------------------------ # Connection lifecycle # ------------------------------------------------------------------ async def connect(self) -> None: """Initialise and authenticate the exchange connection.""" exchange_class = getattr(ccxtpro, self._exchange_id, None) if exchange_class is None: raise ValueError(f"Exchange '{self._exchange_id}' is not supported by ccxt.pro") self._exchange = exchange_class( { "apiKey": self._api_key, "secret": self._api_secret, "enableRateLimit": True, "options": {"defaultType": "future"}, } ) if self._sandbox: self._exchange.set_sandbox_mode(True) logger.info("Exchange connected in SANDBOX mode ({})", self._exchange_id) else: logger.info("Exchange connected in LIVE mode ({})", self._exchange_id) async def disconnect(self) -> None: """Gracefully close the exchange connection.""" if self._exchange: await self._exchange.close() logger.info("Exchange disconnected") async def is_connected(self) -> bool: """Return True if the exchange instance is initialised.""" return self._exchange is not None @property def exchange(self) -> ccxtpro.Exchange: if self._exchange is None: raise RuntimeError("Exchange not connected. Call connect() first.") return self._exchange # ------------------------------------------------------------------ # Market data # ------------------------------------------------------------------ async def watch_ohlcv(self, symbol: str, timeframe: str = "1m") -> List: """Watch real-time OHLCV candles via WebSocket.""" return await self.exchange.watch_ohlcv(symbol, timeframe) async def fetch_ohlcv( self, symbol: str, timeframe: str = "1h", since: int | None = None, limit: int = 500, ) -> pd.DataFrame: """Fetch historical OHLCV data and return as a DataFrame.""" data = await self.exchange.fetch_ohlcv(symbol, timeframe, since=since, limit=limit) df = pd.DataFrame(data, columns=["timestamp", "open", "high", "low", "close", "volume"]) df["timestamp"] = pd.to_datetime(df["timestamp"], unit="ms") df.set_index("timestamp", inplace=True) return df async def fetch_balance(self) -> Dict[str, Any]: """Fetch account balance.""" return await self.exchange.fetch_balance() async def fetch_ticker(self, symbol: str) -> Dict[str, Any]: """Fetch current ticker information.""" return await self.exchange.fetch_ticker(symbol) # ------------------------------------------------------------------ # Order management # ------------------------------------------------------------------ async def create_limit_buy(self, symbol: str, amount: float, price: float) -> Dict: """Place a limit buy order.""" logger.info("LIMIT BUY {} {} @ {}", symbol, amount, price) return await self.exchange.create_limit_buy_order(symbol, amount, price) async def create_limit_sell(self, symbol: str, amount: float, price: float) -> Dict: """Place a limit sell order.""" logger.info("LIMIT SELL {} {} @ {}", symbol, amount, price) return await self.exchange.create_limit_sell_order(symbol, amount, price) async def create_market_buy(self, symbol: str, amount: float) -> Dict: """Place a market buy order.""" logger.info("MARKET BUY {} {}", symbol, amount) return await self.exchange.create_market_buy_order(symbol, amount) async def create_market_sell(self, symbol: str, amount: float) -> Dict: """Place a market sell order.""" logger.info("MARKET SELL {} {}", symbol, amount) return await self.exchange.create_market_sell_order(symbol, amount) async def create_stop_loss( self, symbol: str, side: str, amount: float, stop_price: float ) -> Dict: """Place a stop-loss order (stop-market).""" logger.info("STOP {} {} {} trigger={}", side, symbol, amount, stop_price) params = {"stopPrice": stop_price} return await self.exchange.create_order( symbol, "stop", side, amount, stop_price, params ) async def cancel_order(self, order_id: str, symbol: str) -> Dict: """Cancel an existing order.""" logger.info("CANCEL order {} on {}", order_id, symbol) return await self.exchange.cancel_order(order_id, symbol) async def fetch_order(self, order_id: str, symbol: str) -> Dict: """Fetch a single order by id.""" return await self.exchange.fetch_order(order_id, symbol) async def fetch_open_orders(self, symbol: str | None = None) -> List[Dict]: """Fetch all open orders, optionally filtered by symbol.""" return await self.exchange.fetch_open_orders(symbol) # ------------------------------------------------------------------ # Futures-specific # ------------------------------------------------------------------ async def set_leverage(self, symbol: str, leverage: int) -> None: """Set leverage for a futures symbol.""" try: await self.exchange.set_leverage(leverage, symbol) logger.info("Set leverage {} = {}x", symbol, leverage) except Exception as e: logger.warning("set_leverage failed for {} (may already be set): {}", symbol, e) async def set_margin_mode(self, symbol: str, mode: str = "isolated") -> None: """Set margin mode (isolated/cross) for a futures symbol.""" try: await self.exchange.set_margin_mode(mode, symbol) logger.info("Set margin mode {} = {}", symbol, mode) except Exception as e: logger.warning("set_margin_mode failed for {} (may already be set): {}", symbol, e)