import httpx import logging from datetime import datetime, timedelta, timezone logger = logging.getLogger(__name__) class NewsClient: CRYPTOPANIC_URL = "https://cryptopanic.com/api/free/v1/posts/" NEWSAPI_URL = "https://newsapi.org/v2/everything" def __init__(self, cryptopanic_key: str, newsapi_key: str = ""): self.cryptopanic_key = cryptopanic_key self.newsapi_key = newsapi_key self._cache: list[dict] = [] self._cache_time: datetime | None = None def fetch_cryptopanic(self) -> list[dict]: try: resp = httpx.get( self.CRYPTOPANIC_URL, params={"auth_token": self.cryptopanic_key, "filter": "hot", "public": "true"}, timeout=10, ) resp.raise_for_status() return self.parse_cryptopanic(resp.json()) except Exception as e: logger.warning(f"CryptoPanic fetch failed: {e}") return self._cache def parse_cryptopanic(self, raw: dict) -> list[dict]: articles = [] for item in raw.get("results", []): currencies = item.get("currencies") or [] for cur in currencies: articles.append({ "coin": cur.get("code", ""), "title": item.get("title", ""), "published_at": item.get("published_at", ""), "kind": item.get("kind", "news"), "sentiment_votes": item.get("votes", {}), }) return articles def fetch_newsapi(self, query: str = "cryptocurrency") -> list[dict]: if not self.newsapi_key: return [] try: since = (datetime.now(timezone.utc) - timedelta(days=1)).strftime("%Y-%m-%d") resp = httpx.get( self.NEWSAPI_URL, params={"q": query, "from": since, "sortBy": "publishedAt", "apiKey": self.newsapi_key, "language": "en", "pageSize": 50}, timeout=10, ) resp.raise_for_status() return [ {"coin": "", "title": a["title"], "published_at": a["publishedAt"], "kind": "news", "sentiment_votes": {}} for a in resp.json().get("articles", []) ] except Exception as e: logger.warning(f"NewsAPI fetch failed: {e}") return [] def fetch_all(self) -> list[dict]: articles = self.fetch_cryptopanic() + self.fetch_newsapi() self._cache = articles self._cache_time = datetime.now(timezone.utc) return articles def filter_by_coin(self, articles: list[dict], coin_symbol: str) -> list[dict]: symbol = coin_symbol.replace("USDT", "") return [a for a in articles if a["coin"].upper() == symbol.upper()]