"""Real-time and historical market data feed. Wraps ExchangeClient to manage multi-timeframe DataFrames and provide a streaming data interface for the strategy engine. """ from __future__ import annotations import asyncio from typing import Any, Dict, List, Optional import pandas as pd from loguru import logger from config import settings from execution.exchange_client import ExchangeClient class DataFeed: """Manages market data collection for one or more symbols/timeframes.""" def __init__(self, exchange_client: ExchangeClient): self._client = exchange_client # Cache: {(symbol, timeframe): pd.DataFrame} self._dataframes: Dict[tuple, pd.DataFrame] = {} self._running = False # ------------------------------------------------------------------ # Connection # ------------------------------------------------------------------ async def connect(self) -> None: """Ensure the underlying exchange is connected.""" if not await self._client.is_connected(): await self._client.connect() logger.info("DataFeed ready") async def disconnect(self) -> None: """Stop feeds and disconnect.""" self._running = False await self._client.disconnect() logger.info("DataFeed disconnected") # ------------------------------------------------------------------ # Historical data (REST) # ------------------------------------------------------------------ async def fetch_ohlcv( self, symbol: str, timeframe: str, since: int | None = None, limit: int = 500, ) -> pd.DataFrame: """Fetch historical OHLCV and cache the result.""" df = await self._client.fetch_ohlcv(symbol, timeframe, since=since, limit=limit) self._dataframes[(symbol, timeframe)] = df logger.debug("Fetched {} candles for {} {}", len(df), symbol, timeframe) return df async def fetch_multi_timeframe( self, symbol: str, timeframes: List[str] | None = None, limit: int = 500 ) -> Dict[str, pd.DataFrame]: """Fetch OHLCV for multiple timeframes concurrently.""" tfs = timeframes or [ settings.HTF_TIMEFRAME, settings.MTF_TIMEFRAME, settings.LTF_TIMEFRAME, ] tasks = [self.fetch_ohlcv(symbol, tf, limit=limit) for tf in tfs] results = await asyncio.gather(*tasks) return dict(zip(tfs, results)) # ------------------------------------------------------------------ # Real-time data (WebSocket) # ------------------------------------------------------------------ async def watch_ohlcv(self, symbol: str, timeframe: str) -> List: """Watch live OHLCV candles and update the internal DataFrame.""" candles = await self._client.watch_ohlcv(symbol, timeframe) self._update_dataframe(symbol, timeframe, candles) return candles async def watch_ticker(self, symbol: str) -> Dict[str, Any]: """Watch the live ticker for a symbol.""" return await self._client.exchange.watch_ticker(symbol) async def watch_order_book(self, symbol: str) -> Dict[str, Any]: """Watch the live order book for a symbol.""" return await self._client.exchange.watch_order_book(symbol) async def start_streaming( self, symbols: List[str], timeframe: str = "1m", callback=None ) -> None: """Continuously stream OHLCV data for multiple symbols.""" self._running = True logger.info("Streaming started for {} on {}", symbols, timeframe) while self._running: for symbol in symbols: try: candles = await self.watch_ohlcv(symbol, timeframe) if callback: await callback(symbol, timeframe, candles) except Exception as e: logger.error("Streaming error for {}: {}", symbol, e) await asyncio.sleep(1) def stop_streaming(self) -> None: """Signal the streaming loop to stop.""" self._running = False # ------------------------------------------------------------------ # DataFrame management # ------------------------------------------------------------------ def get_dataframe(self, symbol: str, timeframe: str) -> pd.DataFrame: """Return the cached DataFrame for a symbol/timeframe pair.""" key = (symbol, timeframe) if key not in self._dataframes: raise KeyError(f"No data cached for {symbol} {timeframe}. Fetch first.") return self._dataframes[key] def _update_dataframe( self, symbol: str, timeframe: str, candles: List ) -> None: """Merge incoming WebSocket candles into the cached DataFrame.""" if not candles: return new_df = pd.DataFrame( candles, columns=["timestamp", "open", "high", "low", "close", "volume"] ) new_df["timestamp"] = pd.to_datetime(new_df["timestamp"], unit="ms") new_df.set_index("timestamp", inplace=True) key = (symbol, timeframe) if key in self._dataframes: existing = self._dataframes[key] combined = pd.concat([existing, new_df]) combined = combined[~combined.index.duplicated(keep="last")] self._dataframes[key] = combined.sort_index() else: self._dataframes[key] = new_df