Add ThreadedWebsocketManager-based BinanceWSClient for real-time price streaming, NewsClient for CryptoPanic/NewsAPI fetching with coin filtering, and SocialClient for Reddit post retrieval with keyword filtering and simple keyword-based sentiment scoring. Includes unit tests for news and social clients (4/4 passing). Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
26 lines
1.0 KiB
Python
26 lines
1.0 KiB
Python
import pytest
|
|
from data.social_client import SocialClient
|
|
|
|
def test_analyze_reddit_posts():
    """Check the shape of the result returned by ``SocialClient.simple_sentiment``.

    The result must expose the keys ``positive``, ``negative`` and
    ``neutral``, and the three values must add up to the number of posts
    that were passed in.
    """
    # Placeholder credentials: the test only exercises simple_sentiment on
    # in-memory post dicts built below.
    social = SocialClient(reddit_client_id="x", reddit_secret="x", reddit_user_agent="x")
    sample_posts = [
        {"title": "Bitcoin is amazing! Going to the moon!", "score": 100, "num_comments": 50},
        {"title": "BTC crash incoming, sell everything now", "score": 20, "num_comments": 10},
        {"title": "Bitcoin steady at 40k, not bad", "score": 50, "num_comments": 30},
    ]

    sentiment = social.simple_sentiment(sample_posts)

    # Every sentiment bucket has to be present in the result.
    for label in ("positive", "negative", "neutral"):
        assert label in sentiment

    # Each post is expected to be counted in exactly one bucket.
    bucket_total = sentiment["positive"] + sentiment["negative"] + sentiment["neutral"]
    assert bucket_total == len(sample_posts)
|
|
|
|
def test_keyword_match():
    """Check that ``filter_posts_by_coin`` keeps two of three posts for "BTC".

    Two of the sample titles contain the text "BTC"; the remaining one
    mentions only Ethereum.
    """
    social = SocialClient(reddit_client_id="x", reddit_secret="x", reddit_user_agent="x")
    titles = (
        "Bitcoin BTC going up",
        "Ethereum news today",
        "BTC and ETH analysis",
    )
    sample_posts = [{"title": title} for title in titles]

    btc_posts = social.filter_posts_by_coin(sample_posts, "BTC")

    assert len(btc_posts) == 2
|