From adad553a651ccfcdb2f345760b241681ef3d365c Mon Sep 17 00:00:00 2001 From: choijaewook Date: Fri, 20 Mar 2026 17:48:12 +0900 Subject: [PATCH] feat: Binance WS, news client, and social client Add ThreadedWebsocketManager-based BinanceWSClient for real-time price streaming, NewsClient for CryptoPanic/NewsAPI fetching with coin filtering, and SocialClient for Reddit post retrieval with keyword filtering and simple keyword-based sentiment scoring. Includes unit tests for news and social clients (4/4 passing). Co-Authored-By: Claude Sonnet 4.6 --- data/binance_ws.py | 51 ++++++++++++++++++++++++++ data/news_client.py | 73 +++++++++++++++++++++++++++++++++++++ data/social_client.py | 72 ++++++++++++++++++++++++++++++++++++ tests/test_news_client.py | 31 ++++++++++++++++ tests/test_social_client.py | 25 +++++++++++++ 5 files changed, 252 insertions(+) create mode 100644 data/binance_ws.py create mode 100644 data/news_client.py create mode 100644 data/social_client.py create mode 100644 tests/test_news_client.py create mode 100644 tests/test_social_client.py diff --git a/data/binance_ws.py b/data/binance_ws.py new file mode 100644 index 0000000..3bf3131 --- /dev/null +++ b/data/binance_ws.py @@ -0,0 +1,51 @@ +import threading +import logging +from binance import ThreadedWebsocketManager + +logger = logging.getLogger(__name__) + +class BinanceWSClient: + def __init__(self, api_key: str, api_secret: str): + self.api_key = api_key + self.api_secret = api_secret + self.twm = None + self.prices: dict[str, float] = {} + self._lock = threading.Lock() + + def start(self, symbols: list[str]): + self.twm = ThreadedWebsocketManager( + api_key=self.api_key, api_secret=self.api_secret + ) + self.twm.start() + streams = [s.lower() + "@miniTicker" for s in symbols] + self.twm.start_multiplex_socket( + callback=self._handle_message, streams=streams + ) + logger.info(f"WebSocket started for {len(symbols)} symbols") + + def _handle_message(self, msg): + if msg.get("e") == "error": + logger.error(f"WebSocket error: {msg}") + return + data = msg.get("data", msg) + if "s" in data and "c" in data: + with self._lock: + self.prices[data["s"]] = float(data["c"]) + + def get_price(self, symbol: str) -> float | None: + with self._lock: + return self.prices.get(symbol) + + def get_all_prices(self) -> dict[str, float]: + with self._lock: + return dict(self.prices) + + def update_symbols(self, symbols: list[str]): + if self.twm: + self.stop() + self.start(symbols) + + def stop(self): + if self.twm: + self.twm.stop() + logger.info("WebSocket stopped") diff --git a/data/news_client.py b/data/news_client.py new file mode 100644 index 0000000..85fa8be --- /dev/null +++ b/data/news_client.py @@ -0,0 +1,73 @@ +import httpx +import logging +from datetime import datetime, timedelta, timezone + +logger = logging.getLogger(__name__) + +class NewsClient: + CRYPTOPANIC_URL = "https://cryptopanic.com/api/free/v1/posts/" + NEWSAPI_URL = "https://newsapi.org/v2/everything" + + def __init__(self, cryptopanic_key: str, newsapi_key: str = ""): + self.cryptopanic_key = cryptopanic_key + self.newsapi_key = newsapi_key + self._cache: list[dict] = [] + self._cache_time: datetime | None = None + + def fetch_cryptopanic(self) -> list[dict]: + try: + resp = httpx.get( + self.CRYPTOPANIC_URL, + params={"auth_token": self.cryptopanic_key, "filter": "hot", "public": "true"}, + timeout=10, + ) + resp.raise_for_status() + return self.parse_cryptopanic(resp.json()) + except Exception as e: + logger.warning(f"CryptoPanic fetch failed: {e}") + return self._cache + + def parse_cryptopanic(self, raw: dict) -> list[dict]: + articles = [] + for item in raw.get("results", []): + currencies = item.get("currencies") or [] + for cur in currencies: + articles.append({ + "coin": cur.get("code", ""), + "title": item.get("title", ""), + "published_at": item.get("published_at", ""), + "kind": item.get("kind", "news"), + "sentiment_votes": item.get("votes", {}), + }) + return articles + + def fetch_newsapi(self, query: str = "cryptocurrency") -> list[dict]: + if not self.newsapi_key: + return [] + try: + since = (datetime.now(timezone.utc) - timedelta(days=1)).strftime("%Y-%m-%d") + resp = httpx.get( + self.NEWSAPI_URL, + params={"q": query, "from": since, "sortBy": "publishedAt", + "apiKey": self.newsapi_key, "language": "en", "pageSize": 50}, + timeout=10, + ) + resp.raise_for_status() + return [ + {"coin": "", "title": a["title"], "published_at": a["publishedAt"], + "kind": "news", "sentiment_votes": {}} + for a in resp.json().get("articles", []) + ] + except Exception as e: + logger.warning(f"NewsAPI fetch failed: {e}") + return [] + + def fetch_all(self) -> list[dict]: + articles = self.fetch_cryptopanic() + self.fetch_newsapi() + self._cache = articles + self._cache_time = datetime.now(timezone.utc) + return articles + + def filter_by_coin(self, articles: list[dict], coin_symbol: str) -> list[dict]: + symbol = coin_symbol.replace("USDT", "") + return [a for a in articles if a["coin"].upper() == symbol.upper()] diff --git a/data/social_client.py b/data/social_client.py new file mode 100644 index 0000000..33b43e1 --- /dev/null +++ b/data/social_client.py @@ -0,0 +1,72 @@ +import logging +from datetime import datetime + +logger = logging.getLogger(__name__) + +POSITIVE_WORDS = {"moon", "bullish", "pump", "rally", "amazing", "great", "buy", "long", "up", "high", "profit", "gain", "surge", "breakout"} +NEGATIVE_WORDS = {"crash", "bearish", "dump", "sell", "short", "down", "low", "loss", "scam", "drop", "fear", "panic", "rekt"} + +class SocialClient: + def __init__(self, reddit_client_id: str, reddit_secret: str, reddit_user_agent: str, + twitter_bearer: str = ""): + self.reddit_client_id = reddit_client_id + self.reddit_secret = reddit_secret + self.reddit_user_agent = reddit_user_agent + self.twitter_bearer = twitter_bearer + self._reddit = None + self._cache: list[dict] = [] + + def _get_reddit(self): + if self._reddit is None: + import praw + self._reddit = praw.Reddit( + client_id=self.reddit_client_id, + client_secret=self.reddit_secret, + user_agent=self.reddit_user_agent, + ) + return self._reddit + + def fetch_reddit(self, subreddits=("cryptocurrency", "binance", "CryptoMarkets"), limit=50) -> list[dict]: + try: + reddit = self._get_reddit() + posts = [] + for sub_name in subreddits: + sub = reddit.subreddit(sub_name) + for post in sub.hot(limit=limit): + posts.append({ + "title": post.title, + "score": post.score, + "num_comments": post.num_comments, + "created_utc": post.created_utc, + "subreddit": sub_name, + }) + self._cache = posts + return posts + except Exception as e: + logger.warning(f"Reddit fetch failed: {e}") + return self._cache + + def filter_posts_by_coin(self, posts: list[dict], coin_symbol: str) -> list[dict]: + symbol = coin_symbol.replace("USDT", "").upper() + name_map = {"BTC": ["bitcoin", "btc"], "ETH": ["ethereum", "eth"], + "SOL": ["solana", "sol"], "BNB": ["bnb", "binance coin"], + "XRP": ["xrp", "ripple"], "DOGE": ["doge", "dogecoin"], + "ADA": ["ada", "cardano"], "AVAX": ["avax", "avalanche"]} + keywords = name_map.get(symbol, [symbol.lower()]) + return [p for p in posts if any(kw in p["title"].lower() for kw in keywords)] + + def simple_sentiment(self, posts: list[dict]) -> dict: + positive = 0 + negative = 0 + neutral = 0 + for p in posts: + words = set(p["title"].lower().split()) + pos_count = len(words & POSITIVE_WORDS) + neg_count = len(words & NEGATIVE_WORDS) + if pos_count > neg_count: + positive += 1 + elif neg_count > pos_count: + negative += 1 + else: + neutral += 1 + return {"positive": positive, "negative": negative, "neutral": neutral, "total": len(posts)} diff --git a/tests/test_news_client.py b/tests/test_news_client.py new file mode 100644 index 0000000..a6c13d8 --- /dev/null +++ b/tests/test_news_client.py @@ -0,0 +1,31 @@ +import pytest +from data.news_client import NewsClient + +@pytest.fixture +def client(): + return NewsClient(cryptopanic_key="test", newsapi_key="test") + +def test_parse_cryptopanic_response(client): + raw = { + "results": [ + {"title": "Bitcoin hits new high", "published_at": "2026-03-20T10:00:00Z", + "currencies": [{"code": "BTC"}], "kind": "news", + "votes": {"positive": 10, "negative": 2}}, + {"title": "ETH upgrade coming", "published_at": "2026-03-20T09:00:00Z", + "currencies": [{"code": "ETH"}], "kind": "news", + "votes": {"positive": 5, "negative": 1}}, + ] + } + articles = client.parse_cryptopanic(raw) + assert len(articles) == 2 + assert articles[0]["coin"] == "BTC" + assert articles[0]["title"] == "Bitcoin hits new high" + +def test_get_news_for_coin_filters(client): + articles = [ + {"coin": "BTC", "title": "BTC news", "sentiment_votes": {"positive": 5, "negative": 1}}, + {"coin": "ETH", "title": "ETH news", "sentiment_votes": {"positive": 3, "negative": 3}}, + ] + btc_news = client.filter_by_coin(articles, "BTC") + assert len(btc_news) == 1 + assert btc_news[0]["coin"] == "BTC" diff --git a/tests/test_social_client.py b/tests/test_social_client.py new file mode 100644 index 0000000..747e8cc --- /dev/null +++ b/tests/test_social_client.py @@ -0,0 +1,25 @@ +import pytest +from data.social_client import SocialClient + +def test_analyze_reddit_posts(): + client = SocialClient(reddit_client_id="x", reddit_secret="x", reddit_user_agent="x") + posts = [ + {"title": "Bitcoin is amazing! Going to the moon!", "score": 100, "num_comments": 50}, + {"title": "BTC crash incoming, sell everything now", "score": 20, "num_comments": 10}, + {"title": "Bitcoin steady at 40k, not bad", "score": 50, "num_comments": 30}, + ] + result = client.simple_sentiment(posts) + assert "positive" in result + assert "negative" in result + assert "neutral" in result + assert result["positive"] + result["negative"] + result["neutral"] == len(posts) + +def test_keyword_match(): + client = SocialClient(reddit_client_id="x", reddit_secret="x", reddit_user_agent="x") + posts = [ + {"title": "Bitcoin BTC going up"}, + {"title": "Ethereum news today"}, + {"title": "BTC and ETH analysis"}, + ] + filtered = client.filter_posts_by_coin(posts, "BTC") + assert len(filtered) == 2