# Crypto Signal Dashboard Implementation Plan > **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. **Goal:** Build a Streamlit dashboard that analyzes Binance spot coins via technical indicators, news, social media, and AI to produce buy/sell signals with a virtual $200 portfolio simulator. **Architecture:** Single Streamlit process imports all modules directly. APScheduler runs analysis agents every 15 minutes in a background thread. Binance WebSocket streams real-time prices. SQLite (WAL mode) stores signals, trades, and portfolio state. **Tech Stack:** Python 3.11+, Streamlit, APScheduler, python-binance, pandas, ta (technical analysis), plotly, anthropic SDK, praw (Reddit), httpx, SQLite **Spec:** `docs/superpowers/specs/2026-03-20-crypto-signal-dashboard-design.md` --- ## File Structure ``` crypto_news_trading/ ├── run.py # Streamlit entry point ├── config.py # Settings, env loading, defaults ├── requirements.txt # Dependencies ├── .env.example # Template for API keys ├── .gitignore │ ├── db/ │ └── schema.sql # SQLite DDL │ ├── data/ │ ├── __init__.py │ ├── db.py # SQLite manager (WAL mode, CRUD) │ ├── binance_rest.py # Binance REST: OHLCV, top coins, ticker │ ├── binance_ws.py # Binance WebSocket: real-time prices │ ├── news_client.py # CryptoPanic + NewsAPI client │ └── social_client.py # Reddit (+ optional Twitter) client │ ├── agents/ │ ├── __init__.py │ ├── technical.py # RSI, MACD, BB, volume, patterns → 0-100 │ ├── news.py # News sentiment → 0-100 │ ├── social.py # Social sentiment → 0-100 │ └── ai_analyst.py # Claude analysis → 0-100 + summary │ ├── engine/ │ ├── __init__.py │ ├── signal.py # Score aggregator + BUY/HOLD/SELL classifier │ ├── surge.py # Volume surge detector │ └── portfolio.py # Portfolio sim: entry/exit/P&L tracking │ ├── dashboard/ │ ├── app.py # Main Streamlit 
layout, session state init │ ├── sidebar.py # Coin list with scores │ ├── detail.py # Chart, news, social, AI panels │ └── portfolio_view.py # Portfolio summary, holdings, trade history │ ├── scheduler/ │ └── jobs.py # APScheduler job definitions │ ├── tests/ │ ├── __init__.py │ ├── conftest.py # sys.path setup for test imports │ ├── test_db.py │ ├── test_binance_rest.py │ ├── test_technical.py │ ├── test_news_client.py │ ├── test_news_agent.py │ ├── test_social_client.py │ ├── test_social_agent.py │ ├── test_signal.py │ ├── test_surge.py │ └── test_portfolio.py │ └── logs/ # Runtime logs (gitignored) ``` --- ## Task 1: Project Setup & Dependencies **Files:** - Create: `requirements.txt` - Create: `.env.example` - Create: `.gitignore` - Create: `config.py` - Create: `tests/conftest.py` - [ ] **Step 1: Create requirements.txt** ``` streamlit>=1.30.0 python-binance>=1.0.19 pandas>=2.1.0 ta>=0.11.0 plotly>=5.18.0 anthropic>=0.40.0 praw>=7.7.0 httpx>=0.27.0 apscheduler>=3.10.0 python-dotenv>=1.0.0 pytest>=8.0.0 ``` - [ ] **Step 2: Create .env.example** ``` BINANCE_API_KEY= BINANCE_SECRET= CRYPTOPANIC_API_KEY= NEWS_API_KEY= TWITTER_BEARER_TOKEN= REDDIT_CLIENT_ID= REDDIT_SECRET= REDDIT_USER_AGENT=crypto_signal_bot/1.0 ANTHROPIC_API_KEY= ``` - [ ] **Step 3: Create .gitignore** ``` .env __pycache__/ *.pyc *.db logs/ .superpowers/ ``` - [ ] **Step 4: Create config.py** ```python import os from dotenv import load_dotenv load_dotenv() # API Keys BINANCE_API_KEY = os.getenv("BINANCE_API_KEY", "") BINANCE_SECRET = os.getenv("BINANCE_SECRET", "") CRYPTOPANIC_API_KEY = os.getenv("CRYPTOPANIC_API_KEY", "") NEWS_API_KEY = os.getenv("NEWS_API_KEY", "") TWITTER_BEARER_TOKEN = os.getenv("TWITTER_BEARER_TOKEN", "") REDDIT_CLIENT_ID = os.getenv("REDDIT_CLIENT_ID", "") REDDIT_SECRET = os.getenv("REDDIT_SECRET", "") REDDIT_USER_AGENT = os.getenv("REDDIT_USER_AGENT", "crypto_signal_bot/1.0") ANTHROPIC_API_KEY = os.getenv("ANTHROPIC_API_KEY", "") # Defaults DB_PATH = os.path.join(os.path.dirname(__file__), 
"crypto_signals.db") LOG_DIR = os.path.join(os.path.dirname(__file__), "logs") TOP_N_COINS = 50 SURGE_VOLUME_MULTIPLIER = 3.0 # 300% MAX_POSITIONS = 5 INITIAL_CAPITAL = 200.0 STOP_LOSS_PCT = -0.08 TAKE_PROFIT_1_PCT = 0.15 TAKE_PROFIT_2_PCT = 0.25 MIN_POSITION_USD = 15.0 ANALYSIS_INTERVAL_MINUTES = 15 # Default weights DEFAULT_WEIGHTS = { "technical": 0.6, "news": 0.2, "social": 0.1, "ai": 0.1, } ``` - [ ] **Step 5: Create tests/conftest.py for import resolution** ```python # tests/conftest.py import sys import os sys.path.insert(0, os.path.dirname(os.path.dirname(__file__))) ``` - [ ] **Step 6: Install dependencies and commit** Run: `pip install -r requirements.txt` ```bash git add requirements.txt .env.example .gitignore config.py tests/conftest.py git commit -m "feat: project setup with dependencies and config" ``` --- ## Task 2: Database Layer **Files:** - Create: `db/schema.sql` - Create: `data/__init__.py` - Create: `data/db.py` - Create: `tests/__init__.py` - Create: `tests/test_db.py` - [ ] **Step 1: Create db/schema.sql** ```sql CREATE TABLE IF NOT EXISTS signals ( id INTEGER PRIMARY KEY AUTOINCREMENT, coin TEXT NOT NULL, timestamp DATETIME DEFAULT CURRENT_TIMESTAMP, technical_score REAL DEFAULT 50, news_score REAL DEFAULT 50, social_score REAL DEFAULT 50, ai_score REAL DEFAULT 50, composite_score REAL DEFAULT 50, signal TEXT DEFAULT 'HOLD' ); CREATE TABLE IF NOT EXISTS trades ( id INTEGER PRIMARY KEY AUTOINCREMENT, coin TEXT NOT NULL, side TEXT NOT NULL, price REAL NOT NULL, quantity REAL NOT NULL, amount_usd REAL NOT NULL, timestamp DATETIME DEFAULT CURRENT_TIMESTAMP, reason TEXT ); CREATE TABLE IF NOT EXISTS positions ( id INTEGER PRIMARY KEY AUTOINCREMENT, coin TEXT NOT NULL, entry_price REAL NOT NULL, quantity REAL NOT NULL, invested_usd REAL NOT NULL, status TEXT DEFAULT 'OPEN', opened_at DATETIME DEFAULT CURRENT_TIMESTAMP, closed_at DATETIME ); CREATE TABLE IF NOT EXISTS portfolio ( id INTEGER PRIMARY KEY AUTOINCREMENT, timestamp DATETIME DEFAULT 
CURRENT_TIMESTAMP, total_value REAL NOT NULL, cash REAL NOT NULL, pnl REAL NOT NULL, pnl_pct REAL NOT NULL ); CREATE TABLE IF NOT EXISTS settings ( key TEXT PRIMARY KEY, value TEXT NOT NULL ); CREATE INDEX IF NOT EXISTS idx_signals_coin_ts ON signals(coin, timestamp); CREATE INDEX IF NOT EXISTS idx_positions_status ON positions(status); CREATE INDEX IF NOT EXISTS idx_trades_coin ON trades(coin); ``` - [ ] **Step 2: Write failing tests for db.py** ```python # tests/test_db.py import os import pytest from data.db import Database @pytest.fixture def db(tmp_path): db_path = str(tmp_path / "test.db") database = Database(db_path) database.init() yield database database.close() def test_init_creates_tables(db): tables = db.execute("SELECT name FROM sqlite_master WHERE type='table'").fetchall() names = {r[0] for r in tables} assert "signals" in names assert "trades" in names assert "positions" in names assert "portfolio" in names assert "settings" in names def test_insert_signal(db): db.insert_signal("BTCUSDT", 75.0, 60.0, 55.0, 70.0, 68.5, "HOLD") rows = db.get_latest_signals() assert len(rows) == 1 assert rows[0]["coin"] == "BTCUSDT" assert rows[0]["composite_score"] == 68.5 def test_insert_trade(db): db.insert_trade("ETHUSDT", "BUY", 3500.0, 0.01, 35.0, "signal") trades = db.get_trades() assert len(trades) == 1 assert trades[0]["coin"] == "ETHUSDT" def test_open_close_position(db): db.open_position("SOLUSDT", 140.0, 0.5, 70.0) positions = db.get_open_positions() assert len(positions) == 1 db.close_position(positions[0]["id"]) assert len(db.get_open_positions()) == 0 def test_save_load_setting(db): db.save_setting("weights", '{"technical": 0.7}') val = db.load_setting("weights") assert val == '{"technical": 0.7}' def test_portfolio_snapshot(db): db.insert_portfolio_snapshot(210.0, 50.0, 10.0, 5.0) snaps = db.get_portfolio_history() assert len(snaps) == 1 assert snaps[0]["total_value"] == 210.0 ``` - [ ] **Step 3: Run tests to verify they fail** Run: `cd 
crypto_news_trading && python -m pytest tests/test_db.py -v` Expected: FAIL — `ModuleNotFoundError: No module named 'data.db'` - [ ] **Step 4: Implement data/db.py** ```python # data/__init__.py # (empty) # data/db.py import sqlite3 import os class Database: def __init__(self, db_path: str): self.db_path = db_path self.conn = None def init(self): self.conn = sqlite3.connect(self.db_path, check_same_thread=False) self.conn.execute("PRAGMA journal_mode=WAL") self.conn.execute("PRAGMA busy_timeout=5000") self.conn.row_factory = sqlite3.Row schema_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), "db", "schema.sql") with open(schema_path) as f: self.conn.executescript(f.read()) def close(self): if self.conn: self.conn.close() def execute(self, sql, params=()): return self.conn.execute(sql, params) def insert_signal(self, coin, technical, news, social, ai, composite, signal): self.conn.execute( "INSERT INTO signals (coin, technical_score, news_score, social_score, ai_score, composite_score, signal) VALUES (?,?,?,?,?,?,?)", (coin, technical, news, social, ai, composite, signal), ) self.conn.commit() def get_latest_signals(self, limit=50): return self.conn.execute( "SELECT * FROM signals WHERE id IN (SELECT MAX(id) FROM signals GROUP BY coin) ORDER BY composite_score DESC LIMIT ?", (limit,), ).fetchall() def insert_trade(self, coin, side, price, quantity, amount_usd, reason): self.conn.execute( "INSERT INTO trades (coin, side, price, quantity, amount_usd, reason) VALUES (?,?,?,?,?,?)", (coin, side, price, quantity, amount_usd, reason), ) self.conn.commit() def get_trades(self, limit=100): return self.conn.execute( "SELECT * FROM trades ORDER BY timestamp DESC LIMIT ?", (limit,) ).fetchall() def open_position(self, coin, entry_price, quantity, invested_usd): self.conn.execute( "INSERT INTO positions (coin, entry_price, quantity, invested_usd) VALUES (?,?,?,?)", (coin, entry_price, quantity, invested_usd), ) self.conn.commit() def get_open_positions(self): 
return self.conn.execute( "SELECT * FROM positions WHERE status='OPEN'" ).fetchall() def close_position(self, position_id): self.conn.execute( "UPDATE positions SET status='CLOSED', closed_at=CURRENT_TIMESTAMP WHERE id=?", (position_id,), ) self.conn.commit() def update_position_quantity(self, position_id, new_quantity): self.conn.execute( "UPDATE positions SET quantity=? WHERE id=?", (new_quantity, position_id), ) self.conn.commit() def insert_portfolio_snapshot(self, total_value, cash, pnl, pnl_pct): self.conn.execute( "INSERT INTO portfolio (total_value, cash, pnl, pnl_pct) VALUES (?,?,?,?)", (total_value, cash, pnl, pnl_pct), ) self.conn.commit() def get_portfolio_history(self, limit=100): return self.conn.execute( "SELECT * FROM portfolio ORDER BY timestamp DESC LIMIT ?", (limit,) ).fetchall() def save_setting(self, key, value): self.conn.execute( "INSERT OR REPLACE INTO settings (key, value) VALUES (?,?)", (key, value), ) self.conn.commit() def load_setting(self, key): row = self.conn.execute( "SELECT value FROM settings WHERE key=?", (key,) ).fetchone() return row["value"] if row else None ``` - [ ] **Step 5: Run tests to verify they pass** Run: `cd crypto_news_trading && python -m pytest tests/test_db.py -v` Expected: All 6 tests PASS - [ ] **Step 6: Commit** ```bash git add db/ data/ tests/ git commit -m "feat: database layer with SQLite WAL mode" ``` --- ## Task 3: Binance REST Client **Files:** - Create: `data/binance_rest.py` - Create: `tests/test_binance_rest.py` - [ ] **Step 1: Write tests with mocked responses** ```python # tests/test_binance_rest.py import pytest from unittest.mock import patch, MagicMock from data.binance_rest import BinanceRestClient @pytest.fixture def client(): return BinanceRestClient(api_key="test", api_secret="test") def test_get_top_coins_returns_symbols(client): mock_tickers = [ {"symbol": "BTCUSDT", "quoteVolume": "1000000"}, {"symbol": "ETHUSDT", "quoteVolume": "500000"}, {"symbol": "BTCETH", "quoteVolume": "200000"}, # 
non-USDT, should be excluded ] with patch.object(client.client, "get_ticker", return_value=mock_tickers): result = client.get_top_coins(limit=2) assert "BTCUSDT" in result assert "ETHUSDT" in result assert "BTCETH" not in result def test_get_ohlcv_returns_dataframe(client): mock_klines = [ [1700000000000, "40000", "41000", "39000", "40500", "100", 1700003599999, "4050000", 500, "50", "2025000", "0"] ] with patch.object(client.client, "get_klines", return_value=mock_klines): df = client.get_ohlcv("BTCUSDT", interval="1h", limit=1) assert len(df) == 1 assert "close" in df.columns assert df.iloc[0]["close"] == 40500.0 def test_get_all_tickers(client): mock_prices = [ {"symbol": "BTCUSDT", "price": "40000.00"}, {"symbol": "ETHUSDT", "price": "3500.00"}, ] with patch.object(client.client, "get_all_tickers", return_value=mock_prices): result = client.get_all_prices() assert result["BTCUSDT"] == 40000.0 ``` - [ ] **Step 2: Run tests to verify they fail** Run: `cd crypto_news_trading && python -m pytest tests/test_binance_rest.py -v` Expected: FAIL - [ ] **Step 3: Implement data/binance_rest.py** ```python import pandas as pd from binance.client import Client import logging logger = logging.getLogger(__name__) class BinanceRestClient: def __init__(self, api_key: str, api_secret: str): self.client = Client(api_key, api_secret) def get_top_coins(self, limit: int = 50) -> list[str]: tickers = self.client.get_ticker() usdt_pairs = [ t for t in tickers if t["symbol"].endswith("USDT") and not t["symbol"].startswith("USDT") ] usdt_pairs.sort(key=lambda x: float(x["quoteVolume"]), reverse=True) return [t["symbol"] for t in usdt_pairs[:limit]] def get_ohlcv(self, symbol: str, interval: str = "1h", limit: int = 100) -> pd.DataFrame: klines = self.client.get_klines(symbol=symbol, interval=interval, limit=limit) df = pd.DataFrame(klines, columns=[ "timestamp", "open", "high", "low", "close", "volume", "close_time", "quote_volume", "trades", "taker_buy_base", "taker_buy_quote", 
"ignore", ]) for col in ["open", "high", "low", "close", "volume", "quote_volume"]: df[col] = df[col].astype(float) df["timestamp"] = pd.to_datetime(df["timestamp"], unit="ms") return df[["timestamp", "open", "high", "low", "close", "volume", "quote_volume"]] def get_all_prices(self) -> dict[str, float]: tickers = self.client.get_all_tickers() return {t["symbol"]: float(t["price"]) for t in tickers} def get_24h_volume(self, symbol: str) -> dict: ticker = self.client.get_ticker(symbol=symbol) return { "volume": float(ticker["volume"]), "quote_volume": float(ticker["quoteVolume"]), "price_change_pct": float(ticker["priceChangePercent"]), } ``` - [ ] **Step 4: Run tests, verify pass, commit** Run: `cd crypto_news_trading && python -m pytest tests/test_binance_rest.py -v` ```bash git add data/binance_rest.py tests/test_binance_rest.py git commit -m "feat: Binance REST client for OHLCV and top coins" ``` --- ## Task 4: Binance WebSocket Client **Files:** - Create: `data/binance_ws.py` - [ ] **Step 1: Implement data/binance_ws.py** ```python import threading import logging from binance import ThreadedWebsocketManager logger = logging.getLogger(__name__) class BinanceWSClient: def __init__(self, api_key: str, api_secret: str): self.api_key = api_key self.api_secret = api_secret self.twm = None self.prices: dict[str, float] = {} self._lock = threading.Lock() def start(self, symbols: list[str]): self.twm = ThreadedWebsocketManager( api_key=self.api_key, api_secret=self.api_secret ) self.twm.start() streams = [s.lower() + "@miniTicker" for s in symbols] self.twm.start_multiplex_socket( callback=self._handle_message, streams=streams ) logger.info(f"WebSocket started for {len(symbols)} symbols") def _handle_message(self, msg): if msg.get("e") == "error": logger.error(f"WebSocket error: {msg}") return data = msg.get("data", msg) if "s" in data and "c" in data: with self._lock: self.prices[data["s"]] = float(data["c"]) def get_price(self, symbol: str) -> float | None: with 
self._lock: return self.prices.get(symbol) def get_all_prices(self) -> dict[str, float]: with self._lock: return dict(self.prices) def update_symbols(self, symbols: list[str]): if self.twm: self.stop() self.start(symbols) def stop(self): if self.twm: self.twm.stop() logger.info("WebSocket stopped") ``` - [ ] **Step 2: Commit** ```bash git add data/binance_ws.py git commit -m "feat: Binance WebSocket client for real-time prices" ``` --- ## Task 5: News Client **Files:** - Create: `data/news_client.py` - Create: `tests/test_news_client.py` - [ ] **Step 1: Write failing tests** ```python # tests/test_news_client.py import pytest from unittest.mock import patch, AsyncMock, MagicMock from data.news_client import NewsClient @pytest.fixture def client(): return NewsClient(cryptopanic_key="test", newsapi_key="test") def test_parse_cryptopanic_response(client): raw = { "results": [ {"title": "Bitcoin hits new high", "published_at": "2026-03-20T10:00:00Z", "currencies": [{"code": "BTC"}], "kind": "news", "votes": {"positive": 10, "negative": 2}}, {"title": "ETH upgrade coming", "published_at": "2026-03-20T09:00:00Z", "currencies": [{"code": "ETH"}], "kind": "news", "votes": {"positive": 5, "negative": 1}}, ] } articles = client.parse_cryptopanic(raw) assert len(articles) == 2 assert articles[0]["coin"] == "BTC" assert articles[0]["title"] == "Bitcoin hits new high" def test_get_news_for_coin_filters(client): articles = [ {"coin": "BTC", "title": "BTC news", "sentiment_votes": {"positive": 5, "negative": 1}}, {"coin": "ETH", "title": "ETH news", "sentiment_votes": {"positive": 3, "negative": 3}}, ] btc_news = client.filter_by_coin(articles, "BTC") assert len(btc_news) == 1 assert btc_news[0]["coin"] == "BTC" ``` - [ ] **Step 2: Run tests to verify they fail** Run: `cd crypto_news_trading && python -m pytest tests/test_news_client.py -v` Expected: FAIL - [ ] **Step 3: Implement data/news_client.py** ```python import httpx import logging from datetime import datetime, 
timedelta, timezone logger = logging.getLogger(__name__) class NewsClient: CRYPTOPANIC_URL = "https://cryptopanic.com/api/free/v1/posts/" NEWSAPI_URL = "https://newsapi.org/v2/everything" def __init__(self, cryptopanic_key: str, newsapi_key: str = ""): self.cryptopanic_key = cryptopanic_key self.newsapi_key = newsapi_key self._cache: list[dict] = [] self._cache_time: datetime | None = None def fetch_cryptopanic(self) -> list[dict]: try: resp = httpx.get( self.CRYPTOPANIC_URL, params={"auth_token": self.cryptopanic_key, "filter": "hot", "public": "true"}, timeout=10, ) resp.raise_for_status() return self.parse_cryptopanic(resp.json()) except Exception as e: logger.warning(f"CryptoPanic fetch failed: {e}") return self._cache def parse_cryptopanic(self, raw: dict) -> list[dict]: articles = [] for item in raw.get("results", []): currencies = item.get("currencies") or [] for cur in currencies: articles.append({ "coin": cur.get("code", ""), "title": item.get("title", ""), "published_at": item.get("published_at", ""), "kind": item.get("kind", "news"), "sentiment_votes": item.get("votes", {}), }) return articles def fetch_newsapi(self, query: str = "cryptocurrency") -> list[dict]: if not self.newsapi_key: return [] try: since = (datetime.now(timezone.utc) - timedelta(days=1)).strftime("%Y-%m-%d") resp = httpx.get( self.NEWSAPI_URL, params={"q": query, "from": since, "sortBy": "publishedAt", "apiKey": self.newsapi_key, "language": "en", "pageSize": 50}, timeout=10, ) resp.raise_for_status() return [ {"coin": "", "title": a["title"], "published_at": a["publishedAt"], "kind": "news", "sentiment_votes": {}} for a in resp.json().get("articles", []) ] except Exception as e: logger.warning(f"NewsAPI fetch failed: {e}") return [] def fetch_all(self) -> list[dict]: articles = self.fetch_cryptopanic() + self.fetch_newsapi() self._cache = articles self._cache_time = datetime.now(timezone.utc) return articles def filter_by_coin(self, articles: list[dict], coin_symbol: str) -> 
list[dict]: symbol = coin_symbol.replace("USDT", "") return [a for a in articles if a["coin"].upper() == symbol.upper()] ``` - [ ] **Step 4: Run tests, verify pass, commit** Run: `cd crypto_news_trading && python -m pytest tests/test_news_client.py -v` ```bash git add data/news_client.py tests/test_news_client.py git commit -m "feat: news client for CryptoPanic and NewsAPI" ``` --- ## Task 6: Social Media Client **Files:** - Create: `data/social_client.py` - Create: `tests/test_social_client.py` - [ ] **Step 1: Write failing tests** ```python # tests/test_social_client.py import pytest from unittest.mock import MagicMock, patch from data.social_client import SocialClient def test_analyze_reddit_posts(): client = SocialClient(reddit_client_id="x", reddit_secret="x", reddit_user_agent="x") posts = [ {"title": "Bitcoin is amazing! Going to the moon!", "score": 100, "num_comments": 50}, {"title": "BTC crash incoming, sell everything now", "score": 20, "num_comments": 10}, {"title": "Bitcoin steady at 40k, not bad", "score": 50, "num_comments": 30}, ] result = client.simple_sentiment(posts) assert "positive" in result assert "negative" in result assert "neutral" in result assert result["positive"] + result["negative"] + result["neutral"] == len(posts) def test_keyword_match(): client = SocialClient(reddit_client_id="x", reddit_secret="x", reddit_user_agent="x") posts = [ {"title": "Bitcoin BTC going up"}, {"title": "Ethereum news today"}, {"title": "BTC and ETH analysis"}, ] filtered = client.filter_posts_by_coin(posts, "BTC") assert len(filtered) == 2 ``` - [ ] **Step 2: Run tests to verify fail, then implement** - [ ] **Step 3: Implement data/social_client.py** ```python import logging from datetime import datetime logger = logging.getLogger(__name__) # Keyword-based sentiment (simple, no ML dependency) POSITIVE_WORDS = {"moon", "bullish", "pump", "rally", "amazing", "great", "buy", "long", "up", "high", "profit", "gain", "surge", "breakout"} NEGATIVE_WORDS = {"crash", 
"bearish", "dump", "sell", "short", "down", "low", "loss", "scam", "drop", "fear", "panic", "rekt"} class SocialClient: def __init__(self, reddit_client_id: str, reddit_secret: str, reddit_user_agent: str, twitter_bearer: str = ""): self.reddit_client_id = reddit_client_id self.reddit_secret = reddit_secret self.reddit_user_agent = reddit_user_agent self.twitter_bearer = twitter_bearer self._reddit = None self._cache: list[dict] = [] def _get_reddit(self): if self._reddit is None: import praw self._reddit = praw.Reddit( client_id=self.reddit_client_id, client_secret=self.reddit_secret, user_agent=self.reddit_user_agent, ) return self._reddit def fetch_reddit(self, subreddits=("cryptocurrency", "binance", "CryptoMarkets"), limit=50) -> list[dict]: try: reddit = self._get_reddit() posts = [] for sub_name in subreddits: sub = reddit.subreddit(sub_name) for post in sub.hot(limit=limit): posts.append({ "title": post.title, "score": post.score, "num_comments": post.num_comments, "created_utc": post.created_utc, "subreddit": sub_name, }) self._cache = posts return posts except Exception as e: logger.warning(f"Reddit fetch failed: {e}") return self._cache def filter_posts_by_coin(self, posts: list[dict], coin_symbol: str) -> list[dict]: symbol = coin_symbol.replace("USDT", "").upper() # Map common names name_map = {"BTC": ["bitcoin", "btc"], "ETH": ["ethereum", "eth"], "SOL": ["solana", "sol"], "BNB": ["bnb", "binance coin"], "XRP": ["xrp", "ripple"], "DOGE": ["doge", "dogecoin"], "ADA": ["ada", "cardano"], "AVAX": ["avax", "avalanche"]} keywords = name_map.get(symbol, [symbol.lower()]) return [p for p in posts if any(kw in p["title"].lower() for kw in keywords)] def simple_sentiment(self, posts: list[dict]) -> dict: positive = 0 negative = 0 neutral = 0 for p in posts: words = set(p["title"].lower().split()) pos_count = len(words & POSITIVE_WORDS) neg_count = len(words & NEGATIVE_WORDS) if pos_count > neg_count: positive += 1 elif neg_count > pos_count: negative += 1 
else: neutral += 1 return {"positive": positive, "negative": negative, "neutral": neutral, "total": len(posts)} ``` - [ ] **Step 4: Run tests, verify pass, commit** Run: `cd crypto_news_trading && python -m pytest tests/test_social_client.py -v` ```bash git add data/social_client.py tests/test_social_client.py git commit -m "feat: social media client with Reddit and sentiment analysis" ``` --- ## Task 7: Technical Analysis Agent **Files:** - Create: `agents/__init__.py` - Create: `agents/technical.py` - Create: `tests/test_technical.py` - [ ] **Step 1: Write failing tests** ```python # tests/test_technical.py import pytest import pandas as pd import numpy as np from agents.technical import TechnicalAgent def make_ohlcv(n=100, base_price=100.0): """Generate synthetic OHLCV data.""" np.random.seed(42) closes = base_price + np.cumsum(np.random.randn(n) * 2) df = pd.DataFrame({ "timestamp": pd.date_range("2026-01-01", periods=n, freq="1h"), "open": closes - np.random.rand(n), "high": closes + np.abs(np.random.randn(n) * 2), "low": closes - np.abs(np.random.randn(n) * 2), "close": closes, "volume": np.random.randint(100, 10000, n).astype(float), "quote_volume": np.random.randint(100000, 1000000, n).astype(float), }) return df def test_score_returns_0_to_100(): agent = TechnicalAgent() df = make_ohlcv(100) score = agent.analyze(df) assert 0 <= score <= 100 def test_score_with_uptrend(): agent = TechnicalAgent() n = 100 closes = np.linspace(100, 200, n) # strong uptrend df = pd.DataFrame({ "timestamp": pd.date_range("2026-01-01", periods=n, freq="1h"), "open": closes - 1, "high": closes + 2, "low": closes - 2, "close": closes, "volume": np.full(n, 5000.0), "quote_volume": np.full(n, 500000.0), }) score = agent.analyze(df) assert score >= 60 # uptrend should score high def test_score_with_downtrend(): agent = TechnicalAgent() n = 100 closes = np.linspace(200, 100, n) # strong downtrend df = pd.DataFrame({ "timestamp": pd.date_range("2026-01-01", periods=n, freq="1h"), "open": 
closes + 1, "high": closes + 2, "low": closes - 2, "close": closes, "volume": np.full(n, 5000.0), "quote_volume": np.full(n, 500000.0), }) score = agent.analyze(df) assert score <= 40 # downtrend should score low def test_insufficient_data_returns_50(): agent = TechnicalAgent() df = make_ohlcv(5) # too few candles score = agent.analyze(df) assert score == 50 ``` - [ ] **Step 2: Run tests to verify fail** - [ ] **Step 3: Implement agents/technical.py** ```python import pandas as pd import ta import logging logger = logging.getLogger(__name__) class TechnicalAgent: MIN_CANDLES = 30 def analyze(self, df: pd.DataFrame) -> float: if len(df) < self.MIN_CANDLES: return 50.0 try: signals = [] # RSI (14) rsi = ta.momentum.RSIIndicator(df["close"], window=14).rsi().iloc[-1] if rsi < 30: signals.append(90) # oversold = bullish elif rsi < 45: signals.append(70) elif rsi < 55: signals.append(50) elif rsi < 70: signals.append(35) else: signals.append(15) # overbought = bearish # MACD macd_ind = ta.trend.MACD(df["close"]) macd_line = macd_ind.macd().iloc[-1] signal_line = macd_ind.macd_signal().iloc[-1] macd_diff = macd_ind.macd_diff().iloc[-1] prev_diff = macd_ind.macd_diff().iloc[-2] if macd_diff > 0 and prev_diff <= 0: signals.append(85) # bullish crossover elif macd_diff < 0 and prev_diff >= 0: signals.append(15) # bearish crossover elif macd_diff > 0: signals.append(65) else: signals.append(35) # Bollinger Bands bb = ta.volatility.BollingerBands(df["close"]) bb_high = bb.bollinger_hband().iloc[-1] bb_low = bb.bollinger_lband().iloc[-1] price = df["close"].iloc[-1] bb_pct = (price - bb_low) / (bb_high - bb_low) if (bb_high - bb_low) > 0 else 0.5 if bb_pct < 0.2: signals.append(80) # near lower band = bullish elif bb_pct > 0.8: signals.append(20) # near upper band = bearish else: signals.append(50) # SMA trend (20 vs 50) sma20 = df["close"].rolling(20).mean().iloc[-1] sma50 = df["close"].rolling(50).mean().iloc[-1] if len(df) >= 50 else sma20 if price > sma20 > sma50: 
signals.append(80) # strong uptrend elif price > sma20: signals.append(65) elif price < sma20 < sma50: signals.append(20) # strong downtrend else: signals.append(40) # SMA 200 (long-term trend) if len(df) >= 200: sma200 = df["close"].rolling(200).mean().iloc[-1] if price > sma200: signals.append(70) else: signals.append(30) # Volume trend (current vs 20-period average) vol_avg = df["volume"].rolling(20).mean().iloc[-1] vol_current = df["volume"].iloc[-1] vol_ratio = vol_current / vol_avg if vol_avg > 0 else 1.0 if vol_ratio > 2.0 and price > sma20: signals.append(80) # high volume + uptrend elif vol_ratio > 2.0: signals.append(40) # high volume + downtrend else: signals.append(50) # OBV (On-Balance Volume) trend obv = ta.volume.OnBalanceVolumeIndicator(df["close"], df["volume"]).on_balance_volume() obv_sma = obv.rolling(20).mean() if obv.iloc[-1] > obv_sma.iloc[-1]: signals.append(65) else: signals.append(35) # Candlestick patterns (simplified) last = df.iloc[-1] prev = df.iloc[-2] body = last["close"] - last["open"] prev_body = prev["close"] - prev["open"] # Bullish engulfing if prev_body < 0 and body > 0 and body > abs(prev_body): signals.append(80) # Bearish engulfing elif prev_body > 0 and body < 0 and abs(body) > prev_body: signals.append(20) # Doji (indecision) elif abs(body) < (last["high"] - last["low"]) * 0.1: signals.append(50) else: signals.append(50) return round(sum(signals) / len(signals), 1) except Exception as e: logger.error(f"Technical analysis error: {e}") return 50.0 ``` - [ ] **Step 4: Run tests, verify pass, commit** Run: `cd crypto_news_trading && python -m pytest tests/test_technical.py -v` ```bash git add agents/ tests/test_technical.py git commit -m "feat: technical analysis agent with RSI, MACD, BB, SMA, volume" ``` --- ## Task 8: News Sentiment Agent **Files:** - Create: `agents/news.py` - Create: `tests/test_news_agent.py` (rename test file to avoid conflict) - [ ] **Step 1: Write failing tests** ```python # tests/test_news_agent.py 
import pytest from agents.news import NewsAgent def test_score_positive_news(): agent = NewsAgent() articles = [ {"title": "Bitcoin surges 10%", "sentiment_votes": {"positive": 10, "negative": 1}}, {"title": "BTC adoption grows", "sentiment_votes": {"positive": 8, "negative": 2}}, {"title": "Bitcoin rally continues", "sentiment_votes": {"positive": 15, "negative": 0}}, ] score = agent.analyze(articles) assert score >= 70 def test_score_negative_news(): agent = NewsAgent() articles = [ {"title": "Bitcoin crashes hard", "sentiment_votes": {"positive": 1, "negative": 10}}, {"title": "Crypto market in fear", "sentiment_votes": {"positive": 0, "negative": 15}}, ] score = agent.analyze(articles) assert score <= 35 def test_score_no_news_returns_50(): agent = NewsAgent() score = agent.analyze([]) assert score == 50 def test_score_mixed_news(): agent = NewsAgent() articles = [ # Neutral title (no sentiment keywords) with evenly split votes, so the blended score is 50 {"title": "BTC trades sideways", "sentiment_votes": {"positive": 5, "negative": 5}}, ] score = agent.analyze(articles) assert 40 <= score <= 60 ``` - [ ] **Step 2: Implement agents/news.py** ```python import logging logger = logging.getLogger(__name__) POSITIVE_WORDS = {"surge", "rally", "bullish", "high", "gain", "profit", "up", "grows", "adoption", "breakout", "soar", "record"} NEGATIVE_WORDS = {"crash", "drop", "bearish", "low", "loss", "fear", "down", "dump", "scam", "hack", "ban", "panic", "plunge"} class NewsAgent: def analyze(self, articles: list[dict]) -> float: if not articles: return 50.0 try: scores = [] for article in articles: vote_score = self._vote_sentiment(article.get("sentiment_votes", {})) text_score = self._text_sentiment(article.get("title", "")) # Weighted: votes matter more if available if vote_score is not None: scores.append(vote_score * 0.6 + text_score * 0.4) else: scores.append(text_score) return round(max(0, min(100, sum(scores) / len(scores))), 1) except Exception as e: logger.error(f"News analysis error: {e}") return 50.0 def _vote_sentiment(self, votes: dict) -> float | None: pos = 
votes.get("positive", 0)
        neg = votes.get("negative", 0)
        total = pos + neg
        if total == 0:
            return None
        ratio = pos / total  # 0.0 to 1.0
        return ratio * 100

    def _text_sentiment(self, text: str) -> float:
        words = set(text.lower().split())
        pos = len(words & POSITIVE_WORDS)
        neg = len(words & NEGATIVE_WORDS)
        total = pos + neg
        if total == 0:
            return 50.0
        return (pos / total) * 100
```

- [ ] **Step 3: Run tests, verify pass, commit**

Run: `cd crypto_news_trading && python -m pytest tests/test_news_agent.py -v`

```bash
git add agents/news.py tests/test_news_agent.py
git commit -m "feat: news sentiment agent with vote and text analysis"
```

---

## Task 9: Social Sentiment Agent

**Files:**
- Create: `agents/social.py`
- Create: `tests/test_social_agent.py`

- [ ] **Step 1: Write failing tests**

```python
# tests/test_social_agent.py
import pytest
from agents.social import SocialAgent


def test_score_bullish_social():
    agent = SocialAgent()
    sentiment = {"positive": 8, "negative": 1, "neutral": 1, "total": 10}
    mention_trend = 2.5  # mentions up 150%
    score = agent.analyze(sentiment, mention_trend)
    assert score >= 70


def test_score_bearish_social():
    agent = SocialAgent()
    sentiment = {"positive": 1, "negative": 8, "neutral": 1, "total": 10}
    mention_trend = 0.5
    score = agent.analyze(sentiment, mention_trend)
    assert score <= 35


def test_no_data_returns_50():
    agent = SocialAgent()
    score = agent.analyze({"positive": 0, "negative": 0, "neutral": 0, "total": 0}, 1.0)
    assert score == 50
```

- [ ] **Step 2: Implement agents/social.py**

```python
import logging

logger = logging.getLogger(__name__)


class SocialAgent:
    def analyze(self, sentiment: dict, mention_trend: float = 1.0) -> float:
        total = sentiment.get("total", 0)
        if total == 0:
            return 50.0

        try:
            pos = sentiment.get("positive", 0)
            neg = sentiment.get("negative", 0)

            # Sentiment ratio score (0-100)
            ratio = pos / total
            sentiment_score = ratio * 100

            # Mention trend bonus/penalty
            # trend > 1 = growing mentions, < 1 = declining
            if mention_trend > 2.0:
                trend_bonus = 15
            elif mention_trend > 1.5:
                trend_bonus = 10
            elif mention_trend > 1.0:
                trend_bonus = 5
            elif mention_trend < 0.5:
                trend_bonus = -10
            else:
                trend_bonus = 0

            # If sentiment is negative and mentions are surging, it's bearish
            if ratio < 0.4 and mention_trend > 1.5:
                trend_bonus = -15

            score = sentiment_score + trend_bonus
            return round(max(0, min(100, score)), 1)
        except Exception as e:
            logger.error(f"Social analysis error: {e}")
            return 50.0
```

- [ ] **Step 3: Run tests, verify pass, commit**

Run: `cd crypto_news_trading && python -m pytest tests/test_social_agent.py -v`

```bash
git add agents/social.py tests/test_social_agent.py
git commit -m "feat: social sentiment agent with mention trend analysis"
```

---

## Task 10: AI Analysis Agent

**Files:**
- Create: `agents/ai_analyst.py`

- [ ] **Step 1: Implement agents/ai_analyst.py**

```python
import json
import logging
from anthropic import Anthropic

logger = logging.getLogger(__name__)


class AIAgent:
    def __init__(self, api_key: str):
        self.client = Anthropic(api_key=api_key)

    def analyze_batch(self, coins_data: list[dict]) -> dict[str, dict]:
        """Analyze multiple coins in one API call.
        Returns {symbol: {score, summary}}."""
        if not coins_data:
            return {}

        prompt = self._build_prompt(coins_data)

        try:
            response = self.client.messages.create(
                model="claude-sonnet-4-20250514",
                max_tokens=2000,
                messages=[{"role": "user", "content": prompt}],
            )
            return self._parse_response(response.content[0].text, coins_data)
        except Exception as e:
            logger.error(f"AI analysis failed: {e}")
            return {c["symbol"]: {"score": 50, "summary": "Analysis unavailable"} for c in coins_data}

    def _build_prompt(self, coins_data: list[dict]) -> str:
        coins_text = ""
        for coin in coins_data:
            coins_text += f"""
Coin: {coin['symbol']}
- Price: ${coin.get('price', 'N/A')}
- 24h Change: {coin.get('change_pct', 'N/A')}%
- Technical Score: {coin.get('technical_score', 'N/A')}/100
- News Sentiment: {coin.get('news_score', 'N/A')}/100
- Social Sentiment: {coin.get('social_score', 'N/A')}/100
- Recent Headlines: {', '.join(coin.get('headlines', [])[:3])}
"""

        return f"""You are a crypto market analyst. Analyze these coins for short-term (24h) spot trading potential.

For each coin, provide:
1. A score from 0-100 (0=strong sell, 50=neutral, 100=strong buy)
2. A brief 1-2 sentence summary explaining your reasoning

{coins_text}

Respond in JSON format:
{{
  "SYMBOL": {{"score": NUMBER, "summary": "TEXT"}},
  ...
}}

Only output the JSON, no other text."""

    def _parse_response(self, text: str, coins_data: list[dict]) -> dict[str, dict]:
        try:
            # Extract JSON from response (strip an optional Markdown code fence)
            text = text.strip()
            if text.startswith("```"):
                text = text.split("```")[1]
                if text.startswith("json"):
                    text = text[4:]
            return json.loads(text)
        except (json.JSONDecodeError, IndexError) as e:
            logger.warning(f"Failed to parse AI response: {e}")
            return {c["symbol"]: {"score": 50, "summary": "Parse error"} for c in coins_data}
```

- [ ] **Step 2: Commit**

```bash
git add agents/ai_analyst.py
git commit -m "feat: AI analysis agent using Claude API"
```

---

## Task 11: Signal Engine

**Files:**
- Create: `engine/__init__.py`
- Create: `engine/signal.py`
- Create: `tests/test_signal.py`

- [ ] **Step 1: Write failing tests**

```python
# tests/test_signal.py
import pytest
from engine.signal import SignalEngine


@pytest.fixture
def engine():
    return SignalEngine()


def test_compute_score_default_weights(engine):
    score = engine.compute_score(80, 60, 70, 50)
    # 80*0.6 + 60*0.2 + 70*0.1 + 50*0.1 = 48+12+7+5 = 72
    assert score == 72.0


def test_classify_buy(engine):
    assert engine.classify(75) == "BUY"


def test_classify_hold(engine):
    assert engine.classify(55) == "HOLD"


def test_classify_sell(engine):
    assert engine.classify(30) == "SELL"


def test_custom_weights(engine):
    engine.set_weights({"technical": 0.4, "news": 0.3, "social": 0.2, "ai": 0.1})
    score = engine.compute_score(80, 60, 70, 50)
    # 80*0.4 + 60*0.3 + 70*0.2 + 50*0.1 = 32+18+14+5 = 69
    assert score == 69.0


def test_weights_must_sum_to_one(engine):
    with pytest.raises(ValueError):
        engine.set_weights({"technical": 0.5, "news": 0.3, "social": 0.1, "ai": 0.2})


def test_rank_coins(engine):
    coins = {
        "BTC": {"technical": 80, "news": 70, "social": 60, "ai": 75},
        "ETH": {"technical": 90, "news": 80, "social": 70, "ai": 85},
        "DOGE": {"technical": 30, "news": 25, "social": 40, "ai": 20},
    }
    ranked = engine.rank_coins(coins)
    assert ranked[0]["symbol"] == "ETH"
    assert ranked[-1]["symbol"] == "DOGE"
    assert ranked[-1]["signal"] == "SELL"
```

- [ ] **Step 2: Implement engine/signal.py**

```python
from config import DEFAULT_WEIGHTS


class SignalEngine:
    def __init__(self):
        self.weights = dict(DEFAULT_WEIGHTS)

    def set_weights(self, weights: dict):
        total = sum(weights.values())
        if abs(total - 1.0) > 0.01:
            raise ValueError(f"Weights must sum to 1.0, got {total}")
        self.weights = weights

    def compute_score(self, technical: float, news: float,
                      social: float, ai: float) -> float:
        score = (
            technical * self.weights["technical"]
            + news * self.weights["news"]
            + social * self.weights["social"]
            + ai * self.weights["ai"]
        )
        return round(score, 1)

    def classify(self, score: float) -> str:
        if score >= 70:
            return "BUY"
        elif score >= 40:
            return "HOLD"
        return "SELL"

    def rank_coins(self, coins: dict[str, dict]) -> list[dict]:
        results = []
        for symbol, scores in coins.items():
            composite = self.compute_score(
                scores["technical"], scores["news"],
                scores["social"], scores["ai"]
            )
            results.append({
                "symbol": symbol,
                "technical": scores["technical"],
                "news": scores["news"],
                "social": scores["social"],
                "ai": scores["ai"],
                "composite": composite,
                "signal": self.classify(composite),
            })
        results.sort(key=lambda x: x["composite"], reverse=True)
        return results
```

- [ ] **Step 3: Run tests, verify pass, commit**

Run: `cd crypto_news_trading && python -m pytest tests/test_signal.py -v`

```bash
git add engine/ tests/test_signal.py
git commit -m "feat: signal engine with weighted scoring and ranking"
```

---

## Task 12: Surge Detector

**Files:**
- Create: `engine/surge.py`
- Create: `tests/test_surge.py`

- [ ] **Step 1: Write failing tests**

```python
# tests/test_surge.py
import pytest
from engine.surge import SurgeDetector


def test_detect_surge():
    detector = SurgeDetector(multiplier=3.0)
    tickers = [
        {"symbol": "BTCUSDT", "quoteVolume": "1000000", "volume": "100"},
        {"symbol": "NEWUSDT", "quoteVolume": "5000000", "volume": "500"},  # surge
        {"symbol": "ETHUSDT", "quoteVolume": "800000", "volume": "80"},
    ]
    avg_volumes = {"BTCUSDT": 900000, "NEWUSDT": 1000000, "ETHUSDT": 750000}
    surged = detector.detect(tickers, avg_volumes)
    assert "NEWUSDT" in surged
    assert "BTCUSDT" not in surged


def test_no_surge():
    detector = SurgeDetector(multiplier=3.0)
    tickers = [{"symbol": "BTCUSDT", "quoteVolume": "1000000", "volume": "100"}]
    avg_volumes = {"BTCUSDT": 900000}
    surged = detector.detect(tickers, avg_volumes)
    assert len(surged) == 0
```

- [ ] **Step 2: Implement engine/surge.py**

```python
import logging

logger = logging.getLogger(__name__)


class SurgeDetector:
    def __init__(self, multiplier: float = 3.0):
        self.multiplier = multiplier

    def detect(self, tickers: list[dict], avg_volumes: dict[str, float]) -> list[str]:
        surged = []
        for t in tickers:
            symbol = t["symbol"]
            if not symbol.endswith("USDT"):
                continue
            current_vol = float(t.get("quoteVolume", 0))
            avg_vol = avg_volumes.get(symbol, 0)
            if avg_vol > 0 and current_vol >= avg_vol * self.multiplier:
                logger.info(f"Surge detected: {symbol} volume {current_vol:.0f} vs avg {avg_vol:.0f} ({current_vol/avg_vol:.1f}x)")
                surged.append(symbol)
        return surged
```

- [ ] **Step 3: Run tests, verify pass, commit**

Run: `cd crypto_news_trading && python -m pytest tests/test_surge.py -v`

```bash
git add engine/surge.py tests/test_surge.py
git commit -m "feat: volume surge detector"
```

---

## Task 13: Portfolio Simulator

**Files:**
- Create: `engine/portfolio.py`
- Create: `tests/test_portfolio.py`

- [ ] **Step 1: Write failing tests**

```python
# tests/test_portfolio.py
import pytest
from engine.portfolio import PortfolioManager


@pytest.fixture
def pm():
    return PortfolioManager(initial_capital=200.0)


def test_initial_state(pm):
    assert pm.cash == 200.0
    assert pm.positions == {}
    assert pm.trades == []


def test_buy(pm):
    pm.buy("BTCUSDT", price=40000.0, score=85)
    assert "BTCUSDT" in pm.positions
    assert pm.cash < 200.0
    assert len(pm.trades) == 1
    assert pm.trades[0]["side"] == "BUY"


def test_buy_size_by_score(pm):
    # Score 70-79: 15% of cash = $30
    pm.buy("SOLUSDT", price=140.0, score=75)
    assert abs(pm.trades[0]["amount_usd"] - 30.0) < 0.01


def test_buy_respects_max_positions(pm):
    for coin in ["A", "B", "C", "D", "E"]:
        pm.buy(f"{coin}USDT", price=10.0, score=80)
    pm.buy("FUSDT", price=10.0, score=80)  # 6th should be rejected
    assert len(pm.positions) == 5


def test_buy_respects_min_position(pm):
    pm.cash = 10.0  # below MIN_POSITION_USD=15
    pm.buy("BTCUSDT", price=40000.0, score=85)
    assert "BTCUSDT" not in pm.positions


def test_sell_full(pm):
    pm.buy("ETHUSDT", price=3500.0, score=80)
    invested = pm.positions["ETHUSDT"]["invested_usd"]
    pm.sell("ETHUSDT", price=3800.0, reason="signal")
    assert "ETHUSDT" not in pm.positions
    assert pm.cash > 200.0 - invested  # got money back + profit


def test_stop_loss(pm):
    pm.buy("DOGEUSDT", price=0.10, score=80)
    pm.check_exit("DOGEUSDT", current_price=0.091)  # -9% < -8%
    assert "DOGEUSDT" not in pm.positions


def test_take_profit_partial(pm):
    pm.buy("SOLUSDT", price=100.0, score=80)
    qty_before = pm.positions["SOLUSDT"]["quantity"]
    pm.check_exit("SOLUSDT", current_price=116.0)  # +16% > +15%
    assert pm.positions["SOLUSDT"]["quantity"] < qty_before  # partial sell


def test_take_profit_full(pm):
    pm.buy("SOLUSDT", price=100.0, score=80)
    pm.check_exit("SOLUSDT", current_price=126.0)  # +26% > +25%
    assert "SOLUSDT" not in pm.positions


def test_pnl_calculation(pm):
    pm.buy("ETHUSDT", price=3500.0, score=80)
    pnl = pm.get_portfolio_value({"ETHUSDT": 3800.0})
    assert pnl["total_pnl"] > 0
    assert pnl["total_value"] > 200.0
```

- [ ] **Step 2: Implement engine/portfolio.py**

```python
import logging
from datetime import datetime, timezone
from config import MAX_POSITIONS, MIN_POSITION_USD, STOP_LOSS_PCT, TAKE_PROFIT_1_PCT, TAKE_PROFIT_2_PCT

logger = logging.getLogger(__name__)


class PortfolioManager:
    def __init__(self, initial_capital: float = 200.0):
        self.initial_capital = initial_capital
        self.cash = initial_capital
        self.positions: dict[str, dict] = {}  # symbol -> {entry_price, quantity, invested_usd, tp1_hit}
        self.trades: list[dict] = []

    def buy(self, symbol: str, price: float, score: float) -> bool:
        if symbol in self.positions:
            return False
        if len(self.positions) >= MAX_POSITIONS:
            logger.info(f"Max positions reached, cannot buy {symbol}")
            return False

        amount = self._position_size(score)
        if amount < MIN_POSITION_USD:
            logger.info(f"Position too small ({amount:.2f}), skipping {symbol}")
            return False
        if amount > self.cash:
            amount = self.cash
            if amount < MIN_POSITION_USD:
                return False

        quantity = amount / price
        self.cash -= amount
        self.positions[symbol] = {
            "entry_price": price,
            "quantity": quantity,
            "invested_usd": amount,
            "tp1_hit": False,
            "opened_at": datetime.now(timezone.utc).isoformat(),
        }
        self.trades.append({
            "coin": symbol,
            "side": "BUY",
            "price": price,
            "quantity": quantity,
            "amount_usd": amount,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "reason": "signal",
        })
        logger.info(f"BUY {symbol}: ${amount:.2f} @ ${price:.4f} (qty: {quantity:.6f})")
        return True

    def sell(self, symbol: str, price: float, reason: str = "signal", partial: float = 1.0):
        if symbol not in self.positions:
            return
        pos = self.positions[symbol]
        sell_qty = pos["quantity"] * partial
        sell_usd = sell_qty * price

        self.cash += sell_usd
        self.trades.append({
            "coin": symbol,
            "side": "SELL",
            "price": price,
            "quantity": sell_qty,
            "amount_usd": sell_usd,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "reason": reason,
        })

        if partial >= 1.0:
            del self.positions[symbol]
            logger.info(f"SELL ALL {symbol}: ${sell_usd:.2f} @ ${price:.4f} ({reason})")
        else:
            pos["quantity"] -= sell_qty
            # Shrink the cost basis with the quantity so per-position P&L stays correct
            pos["invested_usd"] *= (1 - partial)
            logger.info(f"SELL {partial*100:.0f}% {symbol}: ${sell_usd:.2f} @ ${price:.4f} ({reason})")

    def check_exit(self, symbol: str, current_price: float):
        if symbol not in self.positions:
            return
        pos = self.positions[symbol]
        entry = pos["entry_price"]
        change_pct = (current_price - entry) / entry

        # Stop-loss
        if change_pct <= STOP_LOSS_PCT:
            self.sell(symbol,
                      current_price, reason="stop-loss")
            return

        # Take-profit 2 (full exit at +25%)
        if change_pct >= TAKE_PROFIT_2_PCT:
            self.sell(symbol, current_price, reason="take-profit-2")
            return

        # Take-profit 1 (50% exit at +15%)
        if change_pct >= TAKE_PROFIT_1_PCT and not pos["tp1_hit"]:
            pos["tp1_hit"] = True
            self.sell(symbol, current_price, reason="take-profit-1", partial=0.5)

    def _position_size(self, score: float) -> float:
        if score >= 90:
            pct = 0.30
        elif score >= 80:
            pct = 0.20
        else:
            pct = 0.15
        return round(self.cash * pct, 2)

    def get_portfolio_value(self, current_prices: dict[str, float]) -> dict:
        holdings_value = sum(
            pos["quantity"] * current_prices.get(sym, pos["entry_price"])
            for sym, pos in self.positions.items()
        )
        total_value = self.cash + holdings_value
        total_pnl = total_value - self.initial_capital
        pnl_pct = (total_pnl / self.initial_capital) * 100 if self.initial_capital > 0 else 0

        winning = sum(1 for t in self.trades if t["side"] == "SELL" and self._trade_pnl(t) > 0)
        total_sells = sum(1 for t in self.trades if t["side"] == "SELL")
        win_rate = (winning / total_sells * 100) if total_sells > 0 else 0

        return {
            "total_value": round(total_value, 2),
            "cash": round(self.cash, 2),
            "holdings_value": round(holdings_value, 2),
            "total_pnl": round(total_pnl, 2),
            "pnl_pct": round(pnl_pct, 2),
            "win_rate": round(win_rate, 1),
            "open_positions": len(self.positions),
        }

    def _trade_pnl(self, sell_trade: dict) -> float:
        # Find most recent buy for this coin before the sell
        matching_buys = [
            t for t in self.trades
            if t["coin"] == sell_trade["coin"]
            and t["side"] == "BUY"
            and t["timestamp"] <= sell_trade["timestamp"]
        ]
        if matching_buys:
            latest_buy = matching_buys[-1]
            return (sell_trade["price"] - latest_buy["price"]) * sell_trade["quantity"]
        return 0
```

- [ ] **Step 3: Run tests, verify pass, commit**

Run: `cd crypto_news_trading && python -m pytest tests/test_portfolio.py -v`

```bash
git add engine/portfolio.py tests/test_portfolio.py
git commit -m "feat: portfolio simulator with entry/exit rules and P&L tracking"
```

---

## Task 14: Scheduler

**Files:**
- Create: `scheduler/__init__.py` (empty)
- Create: `scheduler/jobs.py`

- [ ] **Step 1: Implement scheduler/jobs.py**

```python
import logging
from apscheduler.schedulers.background import BackgroundScheduler
from data.binance_rest import BinanceRestClient
from data.news_client import NewsClient
from data.social_client import SocialClient
from agents.technical import TechnicalAgent
from agents.news import NewsAgent
from agents.social import SocialAgent
from agents.ai_analyst import AIAgent
from engine.signal import SignalEngine
from engine.surge import SurgeDetector
from engine.portfolio import PortfolioManager
from data.db import Database

logger = logging.getLogger(__name__)


class AnalysisJob:
    def __init__(self, binance: BinanceRestClient, news_client: NewsClient,
                 social_client: SocialClient, ai_agent: AIAgent,
                 signal_engine: SignalEngine, surge_detector: SurgeDetector,
                 portfolio: PortfolioManager, db: Database,
                 monitored_coins: list[str]):
        self.binance = binance
        self.news_client = news_client
        self.social_client = social_client
        self.ai_agent = ai_agent
        self.tech_agent = TechnicalAgent()
        self.news_agent = NewsAgent()
        self.social_agent = SocialAgent()
        self.signal_engine = signal_engine
        self.surge_detector = surge_detector
        self.portfolio = portfolio
        self.db = db
        self.monitored_coins = monitored_coins
        self.latest_results: dict = {}
        self.ai_summaries: dict[str, str] = {}  # symbol -> AI summary text

    def run_analysis(self):
        logger.info("Starting analysis cycle...")
        try:
            # 0. Surge detection: add surging coins to the monitored list
            change_pcts: dict[str, float] = {}  # symbol -> 24h change, reused in the AI prompt
            try:
                all_tickers = self.binance.client.get_ticker()
                usdt_tickers = [t for t in all_tickers if t["symbol"].endswith("USDT")]
                change_pcts = {t["symbol"]: float(t.get("priceChangePercent", 0)) for t in usdt_tickers}
                # NOTE: placeholder baseline. Because avg is 0.7 x the current reading,
                # current/avg is always ~1.43, so any multiplier above that never fires.
                # Swap in a real historical average before relying on surge detection.
                avg_volumes = {t["symbol"]: float(t["quoteVolume"]) * 0.7 for t in usdt_tickers}
                surged = self.surge_detector.detect(usdt_tickers, avg_volumes)
                for s in surged:
                    if s not in self.monitored_coins:
                        self.monitored_coins.append(s)
                        logger.info(f"Surge-added: {s}")
            except Exception as e:
                logger.warning(f"Surge detection failed: {e}")

            # 1. Fetch data
            all_news = self.news_client.fetch_all()
            all_social = self.social_client.fetch_reddit()
            prices = self.binance.get_all_prices()

            coins_scores = {}
            coins_for_ai = []

            for symbol in self.monitored_coins:
                try:
                    # Technical (multi-timeframe: 1h primary, 4h and 1d secondary)
                    df_1h = self.binance.get_ohlcv(symbol, interval="1h", limit=100)
                    score_1h = self.tech_agent.analyze(df_1h)
                    try:
                        df_4h = self.binance.get_ohlcv(symbol, interval="4h", limit=100)
                        score_4h = self.tech_agent.analyze(df_4h)
                        df_1d = self.binance.get_ohlcv(symbol, interval="1d", limit=200)
                        score_1d = self.tech_agent.analyze(df_1d)
                        tech_score = score_1h * 0.5 + score_4h * 0.3 + score_1d * 0.2
                    except Exception:
                        tech_score = score_1h

                    # News
                    coin_news = self.news_client.filter_by_coin(all_news, symbol)
                    news_score = self.news_agent.analyze(coin_news)

                    # Social
                    coin_posts = self.social_client.filter_posts_by_coin(all_social, symbol)
                    sentiment = self.social_client.simple_sentiment(coin_posts)
                    social_score = self.social_agent.analyze(sentiment, mention_trend=1.0)

                    coins_scores[symbol] = {
                        "technical": tech_score,
                        "news": news_score,
                        "social": social_score,
                        "ai": 50,  # placeholder until AI runs
                    }

                    coins_for_ai.append({
                        "symbol": symbol.replace("USDT", ""),
                        "price": prices.get(symbol, 0),
                        # Real 24h change when the ticker fetch succeeded, else "N/A"
                        "change_pct": change_pcts.get(symbol, "N/A"),
                        "technical_score": tech_score,
                        "news_score": news_score,
                        "social_score": social_score,
                        "headlines": [a["title"] for a in coin_news[:3]],
                    })
                except Exception as e:
                    logger.warning(f"Analysis failed for {symbol}: {e}")
                    coins_scores[symbol] = {"technical": 50, "news": 50, "social": 50, "ai": 50}

            # 2. AI batch analysis
            if coins_for_ai:
                ai_results = self.ai_agent.analyze_batch(coins_for_ai)
                for symbol in coins_scores:
                    short = symbol.replace("USDT", "")
                    if short in ai_results:
                        coins_scores[symbol]["ai"] = ai_results[short].get("score", 50)
                        self.ai_summaries[symbol] = ai_results[short].get("summary", "")

            # 3. Compute signals
            ranked = self.signal_engine.rank_coins(coins_scores)
            self.latest_results = {r["symbol"]: r for r in ranked}

            # 4. Store signals
            for r in ranked:
                self.db.insert_signal(
                    r["symbol"], r["technical"], r["news"],
                    r["social"], r["ai"], r["composite"], r["signal"],
                )

            # 5. Portfolio actions
            for r in ranked:
                symbol = r["symbol"]
                price = prices.get(symbol, 0)
                if price <= 0:
                    continue

                # Check exits first
                self.portfolio.check_exit(symbol, price)

                # Then entries
                if r["signal"] == "BUY" and symbol not in self.portfolio.positions:
                    self.portfolio.buy(symbol, price, r["composite"])
                elif r["signal"] == "SELL" and symbol in self.portfolio.positions:
                    self.portfolio.sell(symbol, price, reason="signal-sell")

            # 6. Save portfolio snapshot
            pv = self.portfolio.get_portfolio_value(prices)
            self.db.insert_portfolio_snapshot(
                pv["total_value"], pv["cash"], pv["total_pnl"], pv["pnl_pct"]
            )

            logger.info(f"Analysis complete. {len(ranked)} coins scored. Portfolio: ${pv['total_value']:.2f}")

        except Exception as e:
            logger.error(f"Analysis cycle failed: {e}")


def create_scheduler(job: AnalysisJob, interval_minutes: int = 15) -> BackgroundScheduler:
    scheduler = BackgroundScheduler()
    scheduler.add_job(job.run_analysis, "interval", minutes=interval_minutes, id="analysis")
    return scheduler
```

- [ ] **Step 2: Commit**

```bash
git add scheduler/
git commit -m "feat: APScheduler analysis job orchestrating all agents"
```

---

## Task 15: Streamlit Dashboard — Sidebar

**Files:**
- Create: `dashboard/__init__.py` (empty)
- Create: `dashboard/sidebar.py`

- [ ] **Step 1: Implement dashboard/sidebar.py**

```python
import streamlit as st
import json
from config import DEFAULT_WEIGHTS


def render_sidebar(latest_results: dict, db):
    st.sidebar.title("Crypto Signals")

    # Navigation
    page = st.sidebar.radio("View", ["Signals", "Portfolio"], label_visibility="collapsed")

    st.sidebar.divider()

    # Coin list sorted by score
    if latest_results:
        coins = sorted(latest_results.values(), key=lambda x: x["composite"], reverse=True)
        for coin in coins:
            signal = coin["signal"]
            color = {"BUY": "green", "HOLD": "orange", "SELL": "red"}.get(signal, "gray")
            symbol_short = coin["symbol"].replace("USDT", "")
            label = f":{color}[{signal}] **{symbol_short}** — {coin['composite']:.0f}"
            if st.sidebar.button(label, key=coin["symbol"], use_container_width=True):
                st.session_state["selected_coin"] = coin["symbol"]

    st.sidebar.divider()

    # Weight sliders
    st.sidebar.subheader("Signal Weights")
    weights = _load_weights(db)
    new_weights = {}
    new_weights["technical"] = st.sidebar.slider("Technical", 0.0, 1.0, weights["technical"], 0.05)
    new_weights["news"] = st.sidebar.slider("News", 0.0, 1.0, weights["news"], 0.05)
    new_weights["social"] = st.sidebar.slider("Social", 0.0, 1.0, weights["social"], 0.05)
    new_weights["ai"] = st.sidebar.slider("AI", 0.0, 1.0, weights["ai"], 0.05)

    total = sum(new_weights.values())
    if abs(total - 1.0) > 0.01:
        st.sidebar.warning(f"Weights sum: {total:.2f} (must be 1.0)")
    else:
        if new_weights != weights:
            db.save_setting("weights", json.dumps(new_weights))
            st.session_state["weights_changed"] = True

    return page


def _load_weights(db) -> dict:
    raw = db.load_setting("weights")
    if raw:
        try:
            return json.loads(raw)
        except json.JSONDecodeError:
            pass
    return dict(DEFAULT_WEIGHTS)
```

- [ ] **Step 2: Commit**

```bash
git add dashboard/
git commit -m "feat: Streamlit sidebar with coin list and weight sliders"
```

---

## Task 16: Streamlit Dashboard — Detail Panels

**Files:**
- Create: `dashboard/detail.py`

- [ ] **Step 1: Implement dashboard/detail.py**

```python
import streamlit as st
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import pandas as pd
import ta


def render_detail(symbol: str, coin_data: dict, ohlcv_df: pd.DataFrame,
                  news_articles: list, social_sentiment: dict, ai_summary: str):
    symbol_short = symbol.replace("USDT", "")
    signal = coin_data.get("signal", "HOLD")

    st.header(f"{symbol_short}/USDT")
    cols = st.columns(5)
    cols[0].metric("Signal", signal)
    cols[1].metric("Score", f"{coin_data.get('composite', 0):.0f}/100")
    cols[2].metric("Technical", f"{coin_data.get('technical', 0):.0f}")
    cols[3].metric("News", f"{coin_data.get('news', 0):.0f}")
    cols[4].metric("Social / AI", f"{coin_data.get('social', 0):.0f} / {coin_data.get('ai', 0):.0f}")

    # Tabs for detail panels
    tab_chart, tab_news, tab_social, tab_ai = st.tabs(["Chart", "News", "Social", "AI Analysis"])

    with tab_chart:
        _render_chart(ohlcv_df, symbol_short)

    with tab_news:
        _render_news(news_articles)

    with tab_social:
        _render_social(social_sentiment)

    with tab_ai:
        _render_ai(ai_summary)


def _render_chart(df: pd.DataFrame, symbol: str):
    if df is None or df.empty:
        st.info("No chart data available")
        return

    # Add indicators
    df = df.copy()
    df["rsi"] = ta.momentum.RSIIndicator(df["close"]).rsi()
    macd = ta.trend.MACD(df["close"])
    df["macd"] = macd.macd()
    df["macd_signal"] = macd.macd_signal()
    bb = ta.volatility.BollingerBands(df["close"])
    df["bb_high"] = bb.bollinger_hband()
    df["bb_low"] = bb.bollinger_lband()
    df["sma20"] = df["close"].rolling(20).mean()

    fig = make_subplots(
        rows=3, cols=1, shared_xaxes=True,
        vertical_spacing=0.05,
        row_heights=[0.6, 0.2, 0.2],
        subplot_titles=(f"{symbol} Price", "RSI", "MACD"),
    )

    # Candlestick
    fig.add_trace(go.Candlestick(
        x=df["timestamp"], open=df["open"], high=df["high"],
        low=df["low"], close=df["close"], name="Price",
    ), row=1, col=1)

    # Bollinger Bands
    fig.add_trace(go.Scatter(x=df["timestamp"], y=df["bb_high"],
                             line=dict(color="rgba(173,216,230,0.3)"), name="BB Upper"), row=1, col=1)
    fig.add_trace(go.Scatter(x=df["timestamp"], y=df["bb_low"],
                             line=dict(color="rgba(173,216,230,0.3)"), fill="tonexty", name="BB Lower"), row=1, col=1)
    fig.add_trace(go.Scatter(x=df["timestamp"], y=df["sma20"],
                             line=dict(color="orange", width=1), name="SMA20"), row=1, col=1)

    # RSI
    fig.add_trace(go.Scatter(x=df["timestamp"], y=df["rsi"],
                             line=dict(color="purple"), name="RSI"), row=2, col=1)
    fig.add_hline(y=70, line_dash="dash", line_color="red", row=2, col=1)
    fig.add_hline(y=30, line_dash="dash", line_color="green", row=2, col=1)

    # MACD
    fig.add_trace(go.Scatter(x=df["timestamp"], y=df["macd"],
                             line=dict(color="blue"), name="MACD"), row=3, col=1)
    fig.add_trace(go.Scatter(x=df["timestamp"], y=df["macd_signal"],
                             line=dict(color="red"), name="Signal"), row=3, col=1)

    fig.update_layout(
        template="plotly_dark",
        height=700,
        showlegend=False,
        xaxis_rangeslider_visible=False,
    )
    st.plotly_chart(fig, use_container_width=True)


def _render_news(articles: list):
    if not articles:
        st.info("No recent news for this coin")
        return
    for article in articles[:10]:
        votes = article.get("sentiment_votes", {})
        pos = votes.get("positive", 0)
        neg = votes.get("negative", 0)
        if pos > neg:
            icon = ":green_circle:"
        elif neg > pos:
            icon = ":red_circle:"
        else:
            icon = ":white_circle:"
        st.markdown(f"{icon} **{article['title']}**")
        st.caption(f"Published: {article.get('published_at', 'N/A')} | +{pos} -{neg}")
        st.divider()


def _render_social(sentiment: dict):
    if not sentiment or sentiment.get("total", 0) == 0:
        st.info("No social data available")
        return
    total = sentiment["total"]
    cols = st.columns(3)
    cols[0].metric("Positive", f"{sentiment['positive']}/{total}",
                   f"{sentiment['positive']/total*100:.0f}%")
    cols[1].metric("Negative", f"{sentiment['negative']}/{total}",
                   f"-{sentiment['negative']/total*100:.0f}%")
    cols[2].metric("Neutral", f"{sentiment['neutral']}/{total}")


def _render_ai(summary: str):
    if not summary:
        st.info("AI analysis not available")
        return
    st.markdown(summary)
```

- [ ] **Step 2: Commit**

```bash
git add dashboard/detail.py
git commit -m "feat: detail panels with chart, news, social, AI views"
```

---

## Task 17: Streamlit Dashboard — Portfolio View

**Files:**
- Create: `dashboard/portfolio_view.py`

- [ ] **Step 1: Implement dashboard/portfolio_view.py**

```python
import streamlit as st
import plotly.graph_objects as go
import pandas as pd


def render_portfolio(portfolio_manager, current_prices: dict, db):
    st.header("Portfolio Simulator")

    pv = portfolio_manager.get_portfolio_value(current_prices)

    # Summary bar
    cols = st.columns(5)
    cols[0].metric("Initial Capital", f"${portfolio_manager.initial_capital:.2f}")
    cols[1].metric("Current Value", f"${pv['total_value']:.2f}")
    pnl_delta = f"{pv['pnl_pct']:+.1f}%"
    cols[2].metric("Total P&L", f"${pv['total_pnl']:+.2f}", pnl_delta)
    cols[3].metric("Win Rate", f"{pv['win_rate']:.0f}%")
    cols[4].metric("Available Cash", f"${pv['cash']:.2f}")

    st.divider()

    col_left, col_right = st.columns([3, 2])

    with col_left:
        # Current holdings
        st.subheader("Current Holdings")
        if portfolio_manager.positions:
            rows = []
            for sym, pos in portfolio_manager.positions.items():
                price = current_prices.get(sym, pos["entry_price"])
                value = pos["quantity"] * price
                pnl = value - pos["invested_usd"]
                pnl_pct = (pnl / pos["invested_usd"] * 100) if pos["invested_usd"] > 0 else 0
                rows.append({
                    "Coin": sym.replace("USDT", ""),
                    "Invested": f"${pos['invested_usd']:.2f}",
                    "Qty": f"{pos['quantity']:.6f}",
                    "Entry": f"${pos['entry_price']:.4f}",
                    "Current": f"${price:.4f}",
                    "P&L": f"${pnl:+.2f} ({pnl_pct:+.1f}%)",
                })
            st.dataframe(pd.DataFrame(rows), use_container_width=True, hide_index=True)
        else:
            st.info("No open positions")

    with col_right:
        # Allocation chart
        st.subheader("Allocation")
        if portfolio_manager.positions:
            labels = [s.replace("USDT", "") for s in portfolio_manager.positions]
            values = [p["quantity"] * current_prices.get(s, p["entry_price"])
                      for s, p in portfolio_manager.positions.items()]
            labels.append("Cash")
            values.append(pv["cash"])
            fig = go.Figure(data=[go.Pie(labels=labels, values=values, hole=0.4)])
            fig.update_layout(template="plotly_dark", height=300, margin=dict(t=20, b=20))
            st.plotly_chart(fig, use_container_width=True)
        else:
            st.info("100% Cash")

    # Trade history
    st.divider()
    st.subheader("Trade History")
    if portfolio_manager.trades:
        trade_rows = []
        for t in reversed(portfolio_manager.trades[-20:]):
            trade_rows.append({
                "Time": t["timestamp"][:16],
                "Coin": t["coin"].replace("USDT", ""),
                "Side": t["side"],
                "Price": f"${t['price']:.4f}",
                "Amount": f"${t['amount_usd']:.2f}",
                "Reason": t["reason"],
            })
        st.dataframe(pd.DataFrame(trade_rows), use_container_width=True, hide_index=True)
    else:
        st.info("No trades yet")
```

- [ ] **Step 2: Commit**

```bash
git add dashboard/portfolio_view.py
git commit -m "feat: portfolio view with holdings, allocation, and trade history"
```

---

## Task 18: Main Streamlit App (run.py)

**Files:**
- Create: `run.py`

- [ ] **Step 1: Implement run.py**

```python
import streamlit as st
import logging
from logging.handlers import RotatingFileHandler
import os
import json

from config import (
    BINANCE_API_KEY, BINANCE_SECRET, CRYPTOPANIC_API_KEY, NEWS_API_KEY,
    TWITTER_BEARER_TOKEN, REDDIT_CLIENT_ID, REDDIT_SECRET, REDDIT_USER_AGENT,
    ANTHROPIC_API_KEY, DB_PATH, LOG_DIR, TOP_N_COINS,
    INITIAL_CAPITAL, ANALYSIS_INTERVAL_MINUTES, DEFAULT_WEIGHTS,
    SURGE_VOLUME_MULTIPLIER,
)
from data.db import Database
from data.binance_rest import BinanceRestClient
from data.binance_ws import BinanceWSClient
from data.news_client import NewsClient
from data.social_client import SocialClient
from agents.ai_analyst import AIAgent
from engine.signal import SignalEngine
from engine.surge import SurgeDetector
from engine.portfolio import PortfolioManager
from scheduler.jobs import AnalysisJob, create_scheduler
from dashboard.sidebar import render_sidebar
from dashboard.detail import render_detail
from dashboard.portfolio_view import render_portfolio

# --- Logging ---
os.makedirs(LOG_DIR, exist_ok=True)
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    handlers=[
        RotatingFileHandler(os.path.join(LOG_DIR, "app.log"), maxBytes=10_000_000, backupCount=5),
        logging.StreamHandler(),
    ],
)
logger = logging.getLogger(__name__)

# --- Page config ---
st.set_page_config(page_title="Crypto Signal Dashboard", layout="wide", page_icon="📊")

# --- Init session state ---
if "initialized" not in st.session_state:
    st.session_state.initialized = False
    st.session_state.selected_coin = None


@st.cache_resource
def init_services():
    db = Database(DB_PATH)
    db.init()

    binance_rest = BinanceRestClient(BINANCE_API_KEY, BINANCE_SECRET)
    binance_ws = BinanceWSClient(BINANCE_API_KEY, BINANCE_SECRET)
    news_client = NewsClient(CRYPTOPANIC_API_KEY, NEWS_API_KEY)
    social_client = SocialClient(REDDIT_CLIENT_ID, REDDIT_SECRET, REDDIT_USER_AGENT, TWITTER_BEARER_TOKEN)
    ai_agent = AIAgent(ANTHROPIC_API_KEY)
    signal_engine = SignalEngine()
    surge_detector = SurgeDetector(SURGE_VOLUME_MULTIPLIER)
    portfolio = PortfolioManager(INITIAL_CAPITAL)

    # Load saved weights
    saved_weights = db.load_setting("weights")
    if saved_weights:
        try:
            signal_engine.set_weights(json.loads(saved_weights))
        except (json.JSONDecodeError, ValueError):
            pass

    # Get monitored coins
    try:
        monitored = binance_rest.get_top_coins(TOP_N_COINS)
    except Exception:
        monitored = ["BTCUSDT", "ETHUSDT", "BNBUSDT", "SOLUSDT", "XRPUSDT"]
        logger.warning("Failed to fetch top coins, using defaults")

    # Create analysis job
    job = AnalysisJob(
        binance=binance_rest,
        news_client=news_client,
        social_client=social_client,
        ai_agent=ai_agent,
        signal_engine=signal_engine,
        surge_detector=surge_detector,
        portfolio=portfolio,
        db=db,
        monitored_coins=monitored,
    )

    # Start scheduler
    scheduler = create_scheduler(job, ANALYSIS_INTERVAL_MINUTES)
    scheduler.start()

    # Start WebSocket
    try:
        binance_ws.start(monitored[:20])  # limit WS to top 20
    except Exception as e:
        logger.warning(f"WebSocket start failed: {e}")

    # Run first analysis
    job.run_analysis()

    return {
        "db": db,
        "binance_rest": binance_rest,
        "binance_ws": binance_ws,
        "news_client": news_client,
        "social_client": social_client,
        "signal_engine": signal_engine,
        "portfolio": portfolio,
        "job": job,
        "monitored": monitored,
    }


# --- Main ---
def main():
    services = init_services()
    db = services["db"]
    job = services["job"]
    portfolio = services["portfolio"]
    binance_rest = services["binance_rest"]
    binance_ws = services["binance_ws"]
    news_client = services["news_client"]
    social_client = services["social_client"]

    # Get current prices (prefer WS, fallback REST)
    current_prices = binance_ws.get_all_prices()
    if not current_prices:
        try:
            current_prices = binance_rest.get_all_prices()
        except Exception:
            current_prices = {}

    # Sidebar
    page = render_sidebar(job.latest_results, db)

    # Apply weights saved by the sidebar to the running engine; without this the
    # "weights_changed" flag is never consumed and new weights only load on restart.
    # They take effect on the next analysis cycle.
    if st.session_state.get("weights_changed"):
        st.session_state["weights_changed"] = False
        saved = db.load_setting("weights")
        if saved:
            try:
                services["signal_engine"].set_weights(json.loads(saved))
            except (json.JSONDecodeError, ValueError):
                logger.warning("Ignoring invalid saved weights")

    if page == "Signals":
        selected = st.session_state.get("selected_coin")
        if selected and selected in job.latest_results:
            coin_data = job.latest_results[selected]
            try:
                ohlcv = binance_rest.get_ohlcv(selected, interval="1h", limit=100)
            except Exception:
                ohlcv = None

            coin_news = news_client.filter_by_coin(news_client._cache, selected)
            coin_posts = social_client.filter_posts_by_coin(social_client._cache, selected)
            sentiment = social_client.simple_sentiment(coin_posts)
            ai_summary = job.ai_summaries.get(selected, "AI analysis not yet available.")

            render_detail(selected, coin_data, ohlcv, coin_news, sentiment, ai_summary)
        else:
            st.title("Crypto Signal Dashboard")
            st.info("Select a coin from the sidebar to view detailed analysis.")
            if job.latest_results:
                _render_overview(job.latest_results)

    elif page == "Portfolio":
        render_portfolio(portfolio, current_prices, db)


def _render_overview(results: dict):
    import pandas as pd

    st.subheader("Signal Overview")
    buy_coins = [r for r in results.values() if r["signal"] == "BUY"]
    sell_coins = [r for r in results.values() if r["signal"] == "SELL"]

    col1, col2 = st.columns(2)
    with col1:
        st.markdown("### :green[BUY Signals]")
        if buy_coins:
            rows = [{"Coin": c["symbol"].replace("USDT", ""), "Score": f"{c['composite']:.0f}",
                     "Tech": f"{c['technical']:.0f}", "News": f"{c['news']:.0f}"}
                    for c in sorted(buy_coins, key=lambda x: x["composite"], reverse=True)]
            st.dataframe(pd.DataFrame(rows), use_container_width=True, hide_index=True)
        else:
            st.info("No BUY signals currently")

    with col2:
        st.markdown("### :red[SELL Signals]")
        if sell_coins:
            rows = [{"Coin": c["symbol"].replace("USDT", ""), "Score": f"{c['composite']:.0f}",
                     "Tech": f"{c['technical']:.0f}", "News": f"{c['news']:.0f}"}
                    for c in sorted(sell_coins, key=lambda x: x["composite"])]
            st.dataframe(pd.DataFrame(rows), use_container_width=True, hide_index=True)
        else:
            st.info("No SELL signals currently")


if __name__ == "__main__":
    main()
```

- [ ] **Step 2: Commit**

```bash
git add run.py
git commit -m "feat: main Streamlit app with dashboard orchestration"
```

---

## Task 19: Integration Test & First Run

- [ ] **Step 1: Run all unit tests**

Run: `cd crypto_news_trading && python -m pytest tests/ -v`
Expected: All tests pass

- [ ] **Step 2: Create .env file with real API keys**

Copy `.env.example` to `.env` and fill in real API keys.

- [ ] **Step 3: Test launch**

Run: `cd crypto_news_trading && streamlit run run.py`
Expected: Dashboard opens in browser at http://localhost:8501

- [ ] **Step 4: Verify each component**

- Sidebar shows coin list with scores
- Clicking a coin shows chart, news, social, AI panels
- Portfolio tab shows $200 initial capital
- Trades begin appearing after first analysis cycle

- [ ] **Step 5: Final commit**

```bash
git add -A
git commit -m "feat: complete crypto signal dashboard v1.0"
```
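
Before the test launch in Step 3, it may help to confirm that `.env` supplies a value for every key named in `.env.example`. The following is a minimal sketch, not part of the plan's file structure: the helper names `parse_env` and `missing_keys` are hypothetical, and it assumes both files contain only simple `KEY=VALUE` lines. A reported key is not necessarily an error, since the plan treats the Twitter client as optional.

```python
from pathlib import Path


def parse_env(text: str) -> dict[str, str]:
    """Parse simple KEY=VALUE lines; blank lines and # comments are skipped."""
    values = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        values[key.strip()] = value.strip()
    return values


def missing_keys(example_text: str, env_text: str) -> list[str]:
    """Return keys named in the example that are absent or empty in the env text."""
    env = parse_env(env_text)
    return [key for key in parse_env(example_text) if not env.get(key)]


if __name__ == "__main__":
    # Hypothetical usage: run from the project root, next to both files.
    example = Path(".env.example")
    env = Path(".env")
    if example.exists() and env.exists():
        for key in missing_keys(example.read_text(), env.read_text()):
            print(f"{key} is empty or missing in .env")
```

Note that a key with a default in `.env.example` (such as `REDDIT_USER_AGENT`) is still reported if it is absent from `.env`, even though `config.py` would fall back to its own default at runtime.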