Add ThreadedWebsocketManager-based BinanceWSClient for real-time price streaming, NewsClient for CryptoPanic/NewsAPI fetching with coin filtering, and SocialClient for Reddit post retrieval with keyword filtering and simple keyword-based sentiment scoring. Includes unit tests for news and social clients (4/4 passing). Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
73 lines
3.0 KiB
Python
73 lines
3.0 KiB
Python
import logging
|
|
from datetime import datetime
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
POSITIVE_WORDS = {"moon", "bullish", "pump", "rally", "amazing", "great", "buy", "long", "up", "high", "profit", "gain", "surge", "breakout"}
|
|
NEGATIVE_WORDS = {"crash", "bearish", "dump", "sell", "short", "down", "low", "loss", "scam", "drop", "fear", "panic", "rekt"}
|
|
|
|
class SocialClient:
|
|
def __init__(self, reddit_client_id: str, reddit_secret: str, reddit_user_agent: str,
|
|
twitter_bearer: str = ""):
|
|
self.reddit_client_id = reddit_client_id
|
|
self.reddit_secret = reddit_secret
|
|
self.reddit_user_agent = reddit_user_agent
|
|
self.twitter_bearer = twitter_bearer
|
|
self._reddit = None
|
|
self._cache: list[dict] = []
|
|
|
|
def _get_reddit(self):
|
|
if self._reddit is None:
|
|
import praw
|
|
self._reddit = praw.Reddit(
|
|
client_id=self.reddit_client_id,
|
|
client_secret=self.reddit_secret,
|
|
user_agent=self.reddit_user_agent,
|
|
)
|
|
return self._reddit
|
|
|
|
def fetch_reddit(self, subreddits=("cryptocurrency", "binance", "CryptoMarkets"), limit=50) -> list[dict]:
|
|
try:
|
|
reddit = self._get_reddit()
|
|
posts = []
|
|
for sub_name in subreddits:
|
|
sub = reddit.subreddit(sub_name)
|
|
for post in sub.hot(limit=limit):
|
|
posts.append({
|
|
"title": post.title,
|
|
"score": post.score,
|
|
"num_comments": post.num_comments,
|
|
"created_utc": post.created_utc,
|
|
"subreddit": sub_name,
|
|
})
|
|
self._cache = posts
|
|
return posts
|
|
except Exception as e:
|
|
logger.warning(f"Reddit fetch failed: {e}")
|
|
return self._cache
|
|
|
|
def filter_posts_by_coin(self, posts: list[dict], coin_symbol: str) -> list[dict]:
|
|
symbol = coin_symbol.replace("USDT", "").upper()
|
|
name_map = {"BTC": ["bitcoin", "btc"], "ETH": ["ethereum", "eth"],
|
|
"SOL": ["solana", "sol"], "BNB": ["bnb", "binance coin"],
|
|
"XRP": ["xrp", "ripple"], "DOGE": ["doge", "dogecoin"],
|
|
"ADA": ["ada", "cardano"], "AVAX": ["avax", "avalanche"]}
|
|
keywords = name_map.get(symbol, [symbol.lower()])
|
|
return [p for p in posts if any(kw in p["title"].lower() for kw in keywords)]
|
|
|
|
def simple_sentiment(self, posts: list[dict]) -> dict:
|
|
positive = 0
|
|
negative = 0
|
|
neutral = 0
|
|
for p in posts:
|
|
words = set(p["title"].lower().split())
|
|
pos_count = len(words & POSITIVE_WORDS)
|
|
neg_count = len(words & NEGATIVE_WORDS)
|
|
if pos_count > neg_count:
|
|
positive += 1
|
|
elif neg_count > pos_count:
|
|
negative += 1
|
|
else:
|
|
neutral += 1
|
|
return {"positive": positive, "negative": negative, "neutral": neutral, "total": len(posts)}
|