Add ThreadedWebsocketManager-based BinanceWSClient for real-time price streaming, NewsClient for CryptoPanic/NewsAPI fetching with coin filtering, and SocialClient for Reddit post retrieval with keyword filtering and simple keyword-based sentiment scoring. Includes unit tests for news and social clients (4/4 passing). Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
74 lines
2.8 KiB
Python
74 lines
2.8 KiB
Python
import httpx
|
|
import logging
|
|
from datetime import datetime, timedelta, timezone
|
|
|
|
# Module-level logger named after this module; NewsClient uses it to emit
# warnings when a remote fetch fails.
logger = logging.getLogger(__name__)
|
|
|
|
class NewsClient:
    """Fetch crypto news from CryptoPanic and (optionally) NewsAPI.

    Every article is normalised to a dict with the keys ``coin``, ``title``,
    ``published_at``, ``kind`` and ``sentiment_votes``.  Network failures are
    logged as warnings and never propagate to the caller.
    """

    CRYPTOPANIC_URL = "https://cryptopanic.com/api/free/v1/posts/"
    NEWSAPI_URL = "https://newsapi.org/v2/everything"

    def __init__(self, cryptopanic_key: str, newsapi_key: str = ""):
        """Store the API keys and start with empty caches.

        Args:
            cryptopanic_key: Auth token sent to CryptoPanic.
            newsapi_key: API key for NewsAPI; when empty, NewsAPI is skipped.
        """
        self.cryptopanic_key = cryptopanic_key
        self.newsapi_key = newsapi_key
        # Combined result of the most recent fetch_all() call.
        self._cache: list[dict] = []
        self._cache_time: datetime | None = None
        # Last successful CryptoPanic result only.  Kept apart from _cache so
        # that a failed CryptoPanic request never re-serves NewsAPI articles.
        self._cryptopanic_cache: list[dict] = []

    def fetch_cryptopanic(self) -> list[dict]:
        """Return "hot" CryptoPanic posts as normalised article dicts.

        On any failure a warning is logged and a copy of the last successful
        CryptoPanic result is returned (an empty list if there was none).
        """
        try:
            resp = httpx.get(
                self.CRYPTOPANIC_URL,
                params={
                    "auth_token": self.cryptopanic_key,
                    "filter": "hot",
                    "public": "true",
                },
                timeout=10,
            )
            resp.raise_for_status()
            articles = self.parse_cryptopanic(resp.json())
        except Exception as e:
            # Deliberately broad: this is a best-effort fetch boundary.
            # NOTE(review): the error text may include the request URL, which
            # carries the auth token as a query parameter - confirm that logs
            # are not exposed to untrusted readers.
            logger.warning("CryptoPanic fetch failed: %s", e)
            # Copy so callers cannot mutate the cached list.
            return list(self._cryptopanic_cache)
        self._cryptopanic_cache = articles
        return articles

    def parse_cryptopanic(self, raw: dict) -> list[dict]:
        """Convert a CryptoPanic response payload into article dicts.

        One article is emitted per (post, currency) pair, so a post tagged
        with several currencies appears once for each of them and a post with
        no currencies produces nothing.

        Args:
            raw: Decoded JSON payload; posts are read from its ``results`` key.

        Returns:
            The list of normalised article dicts.
        """
        articles = []
        # "or []" also covers keys that are present but null.
        for item in raw.get("results") or []:
            for cur in item.get("currencies") or []:
                articles.append({
                    "coin": cur.get("code", ""),
                    "title": item.get("title", ""),
                    "published_at": item.get("published_at", ""),
                    "kind": item.get("kind", "news"),
                    "sentiment_votes": item.get("votes", {}),
                })
        return articles

    def parse_newsapi(self, raw: dict) -> list[dict]:
        """Convert a NewsAPI response payload into article dicts.

        NewsAPI articles carry no coin tag or votes, so ``coin`` is always
        ``""`` and ``sentiment_votes`` is always empty.  Missing fields default
        to ``""`` so one incomplete article does not discard the whole batch.

        Args:
            raw: Decoded JSON payload; articles are read from ``articles``.

        Returns:
            The list of normalised article dicts.
        """
        return [
            {
                "coin": "",
                "title": a.get("title", ""),
                "published_at": a.get("publishedAt", ""),
                "kind": "news",
                "sentiment_votes": {},
            }
            for a in raw.get("articles") or []
        ]

    def fetch_newsapi(self, query: str = "cryptocurrency") -> list[dict]:
        """Return English NewsAPI articles matching *query* since yesterday.

        Args:
            query: Search expression passed as the ``q`` parameter.

        Returns:
            Normalised article dicts; an empty list when no NewsAPI key is
            configured or when the request fails (a warning is logged).
        """
        if not self.newsapi_key:
            return []
        try:
            since = (datetime.now(timezone.utc) - timedelta(days=1)).strftime("%Y-%m-%d")
            resp = httpx.get(
                self.NEWSAPI_URL,
                params={
                    "q": query,
                    "from": since,
                    "sortBy": "publishedAt",
                    "apiKey": self.newsapi_key,
                    "language": "en",
                    "pageSize": 50,
                },
                timeout=10,
            )
            resp.raise_for_status()
            return self.parse_newsapi(resp.json())
        except Exception as e:
            # Deliberately broad: this is a best-effort fetch boundary.
            logger.warning("NewsAPI fetch failed: %s", e)
            return []

    def fetch_all(self) -> list[dict]:
        """Fetch from both sources, cache the combined list and return it.

        CryptoPanic articles come first, followed by NewsAPI articles.  The
        cache timestamp is refreshed on every call.
        """
        articles = self.fetch_cryptopanic() + self.fetch_newsapi()
        self._cache = articles
        self._cache_time = datetime.now(timezone.utc)
        return articles

    def filter_by_coin(self, articles: list[dict], coin_symbol: str) -> list[dict]:
        """Return the articles whose ``coin`` matches *coin_symbol*.

        Matching is case-insensitive.  A trailing ``USDT`` is stripped, so the
        pair ``"BTCUSDT"`` (or ``"btcusdt"``) selects ``BTC`` articles.

        Args:
            articles: Normalised article dicts.
            coin_symbol: Coin code or USDT-quoted pair symbol.

        Returns:
            The matching articles, in their original order.
        """
        symbol = coin_symbol.upper()
        # A bare "USDT" names the coin itself; stripping it would leave ""
        # and wrongly match every untagged article.
        target = symbol.removesuffix("USDT") or symbol
        return [a for a in articles if (a.get("coin") or "").upper() == target]
|