19 tasks covering: project setup, DB layer, Binance REST/WS, news/social clients, 4 analysis agents, signal engine, surge detector, portfolio simulator, Streamlit dashboard (sidebar, detail, portfolio), scheduler, and integration test. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
81 KiB
Crypto Signal Dashboard Implementation Plan
For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (
- [ ]) syntax for tracking.
Goal: Build a Streamlit dashboard that analyzes Binance spot coins via technical indicators, news, social media, and AI to produce buy/sell signals with a virtual $200 portfolio simulator.
Architecture: Single Streamlit process imports all modules directly. APScheduler runs analysis agents every 15 minutes in a background thread. Binance WebSocket streams real-time prices. SQLite (WAL mode) stores signals, trades, and portfolio state.
Tech Stack: Python 3.11+, Streamlit, APScheduler, python-binance, pandas, ta (technical analysis), plotly, anthropic SDK, praw (Reddit), httpx, SQLite
Spec: docs/superpowers/specs/2026-03-20-crypto-signal-dashboard-design.md
File Structure
crypto_news_trading/
├── run.py # Streamlit entry point
├── config.py # Settings, env loading, defaults
├── requirements.txt # Dependencies
├── .env.example # Template for API keys
├── .gitignore
│
├── db/
│ └── schema.sql # SQLite DDL
│
├── data/
│ ├── __init__.py
│ ├── db.py # SQLite manager (WAL mode, CRUD)
│ ├── binance_rest.py # Binance REST: OHLCV, top coins, ticker
│ ├── binance_ws.py # Binance WebSocket: real-time prices
│ ├── news_client.py # CryptoPanic + NewsAPI client
│ └── social_client.py # Reddit (+ optional Twitter) client
│
├── agents/
│ ├── __init__.py
│ ├── technical.py # RSI, MACD, BB, volume, patterns → 0-100
│ ├── news.py # News sentiment → 0-100
│ ├── social.py # Social sentiment → 0-100
│ └── ai_analyst.py # Claude analysis → 0-100 + summary
│
├── engine/
│ ├── __init__.py
│ ├── signal.py # Score aggregator + BUY/HOLD/SELL classifier
│ ├── surge.py # Volume surge detector
│ └── portfolio.py # Portfolio sim: entry/exit/P&L tracking
│
├── dashboard/
│ ├── app.py # Main Streamlit layout, session state init
│ ├── sidebar.py # Coin list with scores
│ ├── detail.py # Chart, news, social, AI panels
│ └── portfolio_view.py # Portfolio summary, holdings, trade history
│
├── scheduler/
│ └── jobs.py # APScheduler job definitions
│
├── tests/
│ ├── __init__.py
│ ├── conftest.py # sys.path setup for test imports
│ ├── test_db.py
│ ├── test_binance_rest.py
│ ├── test_technical.py
│ ├── test_news_client.py
│ ├── test_news_agent.py
│ ├── test_social_client.py
│ ├── test_social_agent.py
│ ├── test_signal.py
│ ├── test_surge.py
│ └── test_portfolio.py
│
└── logs/ # Runtime logs (gitignored)
Task 1: Project Setup & Dependencies
Files:
-
Create:
requirements.txt -
Create:
.env.example -
Create:
.gitignore -
Create:
config.py -
Step 1: Create requirements.txt
streamlit>=1.30.0
python-binance>=1.0.19
pandas>=2.1.0
ta>=0.11.0
plotly>=5.18.0
anthropic>=0.40.0
praw>=7.7.0
httpx>=0.27.0
apscheduler>=3.10.0
python-dotenv>=1.0.0
pytest>=8.0.0
- Step 2: Create .env.example
BINANCE_API_KEY=
BINANCE_SECRET=
CRYPTOPANIC_API_KEY=
NEWS_API_KEY=
TWITTER_BEARER_TOKEN=
REDDIT_CLIENT_ID=
REDDIT_SECRET=
REDDIT_USER_AGENT=crypto_signal_bot/1.0
ANTHROPIC_API_KEY=
- Step 3: Create .gitignore
.env
__pycache__/
*.pyc
*.db
logs/
.superpowers/
- Step 4: Create config.py
import os
from dotenv import load_dotenv
load_dotenv()
# API Keys
BINANCE_API_KEY = os.getenv("BINANCE_API_KEY", "")
BINANCE_SECRET = os.getenv("BINANCE_SECRET", "")
CRYPTOPANIC_API_KEY = os.getenv("CRYPTOPANIC_API_KEY", "")
NEWS_API_KEY = os.getenv("NEWS_API_KEY", "")
TWITTER_BEARER_TOKEN = os.getenv("TWITTER_BEARER_TOKEN", "")
REDDIT_CLIENT_ID = os.getenv("REDDIT_CLIENT_ID", "")
REDDIT_SECRET = os.getenv("REDDIT_SECRET", "")
REDDIT_USER_AGENT = os.getenv("REDDIT_USER_AGENT", "crypto_signal_bot/1.0")
ANTHROPIC_API_KEY = os.getenv("ANTHROPIC_API_KEY", "")
# Defaults
DB_PATH = os.path.join(os.path.dirname(__file__), "crypto_signals.db")
LOG_DIR = os.path.join(os.path.dirname(__file__), "logs")
TOP_N_COINS = 50
SURGE_VOLUME_MULTIPLIER = 3.0 # 300%
MAX_POSITIONS = 5
INITIAL_CAPITAL = 200.0
STOP_LOSS_PCT = -0.08
TAKE_PROFIT_1_PCT = 0.15
TAKE_PROFIT_2_PCT = 0.25
MIN_POSITION_USD = 15.0
ANALYSIS_INTERVAL_MINUTES = 15
# Default weights
DEFAULT_WEIGHTS = {
"technical": 0.6,
"news": 0.2,
"social": 0.1,
"ai": 0.1,
}
- Step 5: Create tests/conftest.py for import resolution
# tests/conftest.py
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
- Step 6: Install dependencies and commit
Run: pip install -r requirements.txt
git add requirements.txt .env.example .gitignore config.py tests/conftest.py
git commit -m "feat: project setup with dependencies and config"
Task 2: Database Layer
Files:
-
Create:
db/schema.sql -
Create:
data/__init__.py -
Create:
data/db.py -
Create:
tests/__init__.py -
Create:
tests/test_db.py -
Step 1: Create db/schema.sql
CREATE TABLE IF NOT EXISTS signals (
id INTEGER PRIMARY KEY AUTOINCREMENT,
coin TEXT NOT NULL,
timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
technical_score REAL DEFAULT 50,
news_score REAL DEFAULT 50,
social_score REAL DEFAULT 50,
ai_score REAL DEFAULT 50,
composite_score REAL DEFAULT 50,
signal TEXT DEFAULT 'HOLD'
);
CREATE TABLE IF NOT EXISTS trades (
id INTEGER PRIMARY KEY AUTOINCREMENT,
coin TEXT NOT NULL,
side TEXT NOT NULL,
price REAL NOT NULL,
quantity REAL NOT NULL,
amount_usd REAL NOT NULL,
timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
reason TEXT
);
CREATE TABLE IF NOT EXISTS positions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
coin TEXT NOT NULL,
entry_price REAL NOT NULL,
quantity REAL NOT NULL,
invested_usd REAL NOT NULL,
status TEXT DEFAULT 'OPEN',
opened_at DATETIME DEFAULT CURRENT_TIMESTAMP,
closed_at DATETIME
);
CREATE TABLE IF NOT EXISTS portfolio (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
total_value REAL NOT NULL,
cash REAL NOT NULL,
pnl REAL NOT NULL,
pnl_pct REAL NOT NULL
);
CREATE TABLE IF NOT EXISTS settings (
key TEXT PRIMARY KEY,
value TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_signals_coin_ts ON signals(coin, timestamp);
CREATE INDEX IF NOT EXISTS idx_positions_status ON positions(status);
CREATE INDEX IF NOT EXISTS idx_trades_coin ON trades(coin);
- Step 2: Write failing tests for db.py
# tests/test_db.py
import os
import pytest
from data.db import Database
@pytest.fixture
def db(tmp_path):
db_path = str(tmp_path / "test.db")
database = Database(db_path)
database.init()
yield database
database.close()
def test_init_creates_tables(db):
tables = db.execute("SELECT name FROM sqlite_master WHERE type='table'").fetchall()
names = {r[0] for r in tables}
assert "signals" in names
assert "trades" in names
assert "positions" in names
assert "portfolio" in names
assert "settings" in names
def test_insert_signal(db):
db.insert_signal("BTCUSDT", 75.0, 60.0, 55.0, 70.0, 68.5, "HOLD")
rows = db.get_latest_signals()
assert len(rows) == 1
assert rows[0]["coin"] == "BTCUSDT"
assert rows[0]["composite_score"] == 68.5
def test_insert_trade(db):
db.insert_trade("ETHUSDT", "BUY", 3500.0, 0.01, 35.0, "signal")
trades = db.get_trades()
assert len(trades) == 1
assert trades[0]["coin"] == "ETHUSDT"
def test_open_close_position(db):
db.open_position("SOLUSDT", 140.0, 0.5, 70.0)
positions = db.get_open_positions()
assert len(positions) == 1
db.close_position(positions[0]["id"])
assert len(db.get_open_positions()) == 0
def test_save_load_setting(db):
db.save_setting("weights", '{"technical": 0.7}')
val = db.load_setting("weights")
assert val == '{"technical": 0.7}'
def test_portfolio_snapshot(db):
db.insert_portfolio_snapshot(210.0, 50.0, 10.0, 5.0)
snaps = db.get_portfolio_history()
assert len(snaps) == 1
assert snaps[0]["total_value"] == 210.0
- Step 3: Run tests to verify they fail
Run: cd crypto_news_trading && python -m pytest tests/test_db.py -v
Expected: FAIL — ModuleNotFoundError: No module named 'data.db'
- Step 4: Implement data/db.py
# data/__init__.py
# (empty)
# data/db.py
import sqlite3
import os
class Database:
def __init__(self, db_path: str):
self.db_path = db_path
self.conn = None
def init(self):
self.conn = sqlite3.connect(self.db_path, check_same_thread=False)
self.conn.execute("PRAGMA journal_mode=WAL")
self.conn.execute("PRAGMA busy_timeout=5000")
self.conn.row_factory = sqlite3.Row
schema_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), "db", "schema.sql")
with open(schema_path) as f:
self.conn.executescript(f.read())
def close(self):
if self.conn:
self.conn.close()
def execute(self, sql, params=()):
return self.conn.execute(sql, params)
def insert_signal(self, coin, technical, news, social, ai, composite, signal):
self.conn.execute(
"INSERT INTO signals (coin, technical_score, news_score, social_score, ai_score, composite_score, signal) VALUES (?,?,?,?,?,?,?)",
(coin, technical, news, social, ai, composite, signal),
)
self.conn.commit()
def get_latest_signals(self, limit=50):
return self.conn.execute(
"SELECT * FROM signals WHERE id IN (SELECT MAX(id) FROM signals GROUP BY coin) ORDER BY composite_score DESC LIMIT ?",
(limit,),
).fetchall()
def insert_trade(self, coin, side, price, quantity, amount_usd, reason):
self.conn.execute(
"INSERT INTO trades (coin, side, price, quantity, amount_usd, reason) VALUES (?,?,?,?,?,?)",
(coin, side, price, quantity, amount_usd, reason),
)
self.conn.commit()
def get_trades(self, limit=100):
return self.conn.execute(
"SELECT * FROM trades ORDER BY timestamp DESC LIMIT ?", (limit,)
).fetchall()
def open_position(self, coin, entry_price, quantity, invested_usd):
self.conn.execute(
"INSERT INTO positions (coin, entry_price, quantity, invested_usd) VALUES (?,?,?,?)",
(coin, entry_price, quantity, invested_usd),
)
self.conn.commit()
def get_open_positions(self):
return self.conn.execute(
"SELECT * FROM positions WHERE status='OPEN'"
).fetchall()
def close_position(self, position_id):
self.conn.execute(
"UPDATE positions SET status='CLOSED', closed_at=CURRENT_TIMESTAMP WHERE id=?",
(position_id,),
)
self.conn.commit()
def update_position_quantity(self, position_id, new_quantity):
self.conn.execute(
"UPDATE positions SET quantity=? WHERE id=?",
(new_quantity, position_id),
)
self.conn.commit()
def insert_portfolio_snapshot(self, total_value, cash, pnl, pnl_pct):
self.conn.execute(
"INSERT INTO portfolio (total_value, cash, pnl, pnl_pct) VALUES (?,?,?,?)",
(total_value, cash, pnl, pnl_pct),
)
self.conn.commit()
def get_portfolio_history(self, limit=100):
return self.conn.execute(
"SELECT * FROM portfolio ORDER BY timestamp DESC LIMIT ?", (limit,)
).fetchall()
def save_setting(self, key, value):
self.conn.execute(
"INSERT OR REPLACE INTO settings (key, value) VALUES (?,?)",
(key, value),
)
self.conn.commit()
def load_setting(self, key):
row = self.conn.execute(
"SELECT value FROM settings WHERE key=?", (key,)
).fetchone()
return row["value"] if row else None
- Step 5: Run tests to verify they pass
Run: cd crypto_news_trading && python -m pytest tests/test_db.py -v
Expected: All 6 tests PASS
- Step 6: Commit
git add db/ data/ tests/
git commit -m "feat: database layer with SQLite WAL mode"
Task 3: Binance REST Client
Files:
-
Create:
data/binance_rest.py -
Create:
tests/test_binance_rest.py -
Step 1: Write tests with mocked responses
# tests/test_binance_rest.py
import pytest
from unittest.mock import patch, MagicMock
from data.binance_rest import BinanceRestClient
@pytest.fixture
def client():
return BinanceRestClient(api_key="test", api_secret="test")
def test_get_top_coins_returns_symbols(client):
mock_tickers = [
{"symbol": "BTCUSDT", "quoteVolume": "1000000"},
{"symbol": "ETHUSDT", "quoteVolume": "500000"},
{"symbol": "BTCETH", "quoteVolume": "200000"}, # non-USDT, should be excluded
]
with patch.object(client.client, "get_ticker", return_value=mock_tickers):
result = client.get_top_coins(limit=2)
assert "BTCUSDT" in result
assert "ETHUSDT" in result
assert "BTCETH" not in result
def test_get_ohlcv_returns_dataframe(client):
mock_klines = [
[1700000000000, "40000", "41000", "39000", "40500", "100",
1700003599999, "4050000", 500, "50", "2025000", "0"]
]
with patch.object(client.client, "get_klines", return_value=mock_klines):
df = client.get_ohlcv("BTCUSDT", interval="1h", limit=1)
assert len(df) == 1
assert "close" in df.columns
assert df.iloc[0]["close"] == 40500.0
def test_get_all_tickers(client):
mock_prices = [
{"symbol": "BTCUSDT", "price": "40000.00"},
{"symbol": "ETHUSDT", "price": "3500.00"},
]
with patch.object(client.client, "get_all_tickers", return_value=mock_prices):
result = client.get_all_prices()
assert result["BTCUSDT"] == 40000.0
- Step 2: Run tests to verify they fail
Run: cd crypto_news_trading && python -m pytest tests/test_binance_rest.py -v
Expected: FAIL
- Step 3: Implement data/binance_rest.py
import pandas as pd
from binance.client import Client
import logging
logger = logging.getLogger(__name__)
class BinanceRestClient:
def __init__(self, api_key: str, api_secret: str):
self.client = Client(api_key, api_secret)
def get_top_coins(self, limit: int = 50) -> list[str]:
tickers = self.client.get_ticker()
usdt_pairs = [
t for t in tickers
if t["symbol"].endswith("USDT") and not t["symbol"].startswith("USDT")
]
usdt_pairs.sort(key=lambda x: float(x["quoteVolume"]), reverse=True)
return [t["symbol"] for t in usdt_pairs[:limit]]
def get_ohlcv(self, symbol: str, interval: str = "1h", limit: int = 100) -> pd.DataFrame:
klines = self.client.get_klines(symbol=symbol, interval=interval, limit=limit)
df = pd.DataFrame(klines, columns=[
"timestamp", "open", "high", "low", "close", "volume",
"close_time", "quote_volume", "trades", "taker_buy_base",
"taker_buy_quote", "ignore",
])
for col in ["open", "high", "low", "close", "volume", "quote_volume"]:
df[col] = df[col].astype(float)
df["timestamp"] = pd.to_datetime(df["timestamp"], unit="ms")
return df[["timestamp", "open", "high", "low", "close", "volume", "quote_volume"]]
def get_all_prices(self) -> dict[str, float]:
tickers = self.client.get_all_tickers()
return {t["symbol"]: float(t["price"]) for t in tickers}
def get_24h_volume(self, symbol: str) -> dict:
ticker = self.client.get_ticker(symbol=symbol)
return {
"volume": float(ticker["volume"]),
"quote_volume": float(ticker["quoteVolume"]),
"price_change_pct": float(ticker["priceChangePercent"]),
}
- Step 4: Run tests, verify pass, commit
Run: cd crypto_news_trading && python -m pytest tests/test_binance_rest.py -v
git add data/binance_rest.py tests/test_binance_rest.py
git commit -m "feat: Binance REST client for OHLCV and top coins"
Task 4: Binance WebSocket Client
Files:
-
Create:
data/binance_ws.py -
Step 1: Implement data/binance_ws.py
import threading
import logging
from binance import ThreadedWebsocketManager
logger = logging.getLogger(__name__)
class BinanceWSClient:
def __init__(self, api_key: str, api_secret: str):
self.api_key = api_key
self.api_secret = api_secret
self.twm = None
self.prices: dict[str, float] = {}
self._lock = threading.Lock()
def start(self, symbols: list[str]):
self.twm = ThreadedWebsocketManager(
api_key=self.api_key, api_secret=self.api_secret
)
self.twm.start()
streams = [s.lower() + "@miniTicker" for s in symbols]
self.twm.start_multiplex_socket(
callback=self._handle_message, streams=streams
)
logger.info(f"WebSocket started for {len(symbols)} symbols")
def _handle_message(self, msg):
if msg.get("e") == "error":
logger.error(f"WebSocket error: {msg}")
return
data = msg.get("data", msg)
if "s" in data and "c" in data:
with self._lock:
self.prices[data["s"]] = float(data["c"])
def get_price(self, symbol: str) -> float | None:
with self._lock:
return self.prices.get(symbol)
def get_all_prices(self) -> dict[str, float]:
with self._lock:
return dict(self.prices)
def update_symbols(self, symbols: list[str]):
if self.twm:
self.stop()
self.start(symbols)
def stop(self):
if self.twm:
self.twm.stop()
logger.info("WebSocket stopped")
- Step 2: Commit
git add data/binance_ws.py
git commit -m "feat: Binance WebSocket client for real-time prices"
Task 5: News Client
Files:
-
Create:
data/news_client.py -
Create:
tests/test_news_client.py -
Step 1: Write failing tests
# tests/test_news_client.py
import pytest
from unittest.mock import patch, AsyncMock, MagicMock
from data.news_client import NewsClient
@pytest.fixture
def client():
return NewsClient(cryptopanic_key="test", newsapi_key="test")
def test_parse_cryptopanic_response(client):
raw = {
"results": [
{"title": "Bitcoin hits new high", "published_at": "2026-03-20T10:00:00Z",
"currencies": [{"code": "BTC"}], "kind": "news",
"votes": {"positive": 10, "negative": 2}},
{"title": "ETH upgrade coming", "published_at": "2026-03-20T09:00:00Z",
"currencies": [{"code": "ETH"}], "kind": "news",
"votes": {"positive": 5, "negative": 1}},
]
}
articles = client.parse_cryptopanic(raw)
assert len(articles) == 2
assert articles[0]["coin"] == "BTC"
assert articles[0]["title"] == "Bitcoin hits new high"
def test_get_news_for_coin_filters(client):
articles = [
{"coin": "BTC", "title": "BTC news", "sentiment_votes": {"positive": 5, "negative": 1}},
{"coin": "ETH", "title": "ETH news", "sentiment_votes": {"positive": 3, "negative": 3}},
]
btc_news = client.filter_by_coin(articles, "BTC")
assert len(btc_news) == 1
assert btc_news[0]["coin"] == "BTC"
- Step 2: Run tests to verify they fail
Run: cd crypto_news_trading && python -m pytest tests/test_news_client.py -v
Expected: FAIL
- Step 3: Implement data/news_client.py
import httpx
import logging
from datetime import datetime, timedelta, timezone
logger = logging.getLogger(__name__)
class NewsClient:
CRYPTOPANIC_URL = "https://cryptopanic.com/api/free/v1/posts/"
NEWSAPI_URL = "https://newsapi.org/v2/everything"
def __init__(self, cryptopanic_key: str, newsapi_key: str = ""):
self.cryptopanic_key = cryptopanic_key
self.newsapi_key = newsapi_key
self._cache: list[dict] = []
self._cache_time: datetime | None = None
def fetch_cryptopanic(self) -> list[dict]:
try:
resp = httpx.get(
self.CRYPTOPANIC_URL,
params={"auth_token": self.cryptopanic_key, "filter": "hot", "public": "true"},
timeout=10,
)
resp.raise_for_status()
return self.parse_cryptopanic(resp.json())
except Exception as e:
logger.warning(f"CryptoPanic fetch failed: {e}")
return self._cache
def parse_cryptopanic(self, raw: dict) -> list[dict]:
articles = []
for item in raw.get("results", []):
currencies = item.get("currencies") or []
for cur in currencies:
articles.append({
"coin": cur.get("code", ""),
"title": item.get("title", ""),
"published_at": item.get("published_at", ""),
"kind": item.get("kind", "news"),
"sentiment_votes": item.get("votes", {}),
})
return articles
def fetch_newsapi(self, query: str = "cryptocurrency") -> list[dict]:
if not self.newsapi_key:
return []
try:
since = (datetime.now(timezone.utc) - timedelta(days=1)).strftime("%Y-%m-%d")
resp = httpx.get(
self.NEWSAPI_URL,
params={"q": query, "from": since, "sortBy": "publishedAt",
"apiKey": self.newsapi_key, "language": "en", "pageSize": 50},
timeout=10,
)
resp.raise_for_status()
return [
{"coin": "", "title": a["title"], "published_at": a["publishedAt"],
"kind": "news", "sentiment_votes": {}}
for a in resp.json().get("articles", [])
]
except Exception as e:
logger.warning(f"NewsAPI fetch failed: {e}")
return []
def fetch_all(self) -> list[dict]:
articles = self.fetch_cryptopanic() + self.fetch_newsapi()
self._cache = articles
self._cache_time = datetime.now(timezone.utc)
return articles
def filter_by_coin(self, articles: list[dict], coin_symbol: str) -> list[dict]:
symbol = coin_symbol.replace("USDT", "")
return [a for a in articles if a["coin"].upper() == symbol.upper()]
- Step 4: Run tests, verify pass, commit
Run: cd crypto_news_trading && python -m pytest tests/test_news_client.py -v
git add data/news_client.py tests/test_news.py
git commit -m "feat: news client for CryptoPanic and NewsAPI"
Task 6: Social Media Client
Files:
-
Create:
data/social_client.py -
Create:
tests/test_social_client.py -
Step 1: Write failing tests
# tests/test_social_client.py
import pytest
from unittest.mock import MagicMock, patch
from data.social_client import SocialClient
def test_analyze_reddit_posts():
client = SocialClient(reddit_client_id="x", reddit_secret="x", reddit_user_agent="x")
posts = [
{"title": "Bitcoin is amazing! Going to the moon!", "score": 100, "num_comments": 50},
{"title": "BTC crash incoming, sell everything now", "score": 20, "num_comments": 10},
{"title": "Bitcoin steady at 40k, not bad", "score": 50, "num_comments": 30},
]
result = client.simple_sentiment(posts)
assert "positive" in result
assert "negative" in result
assert "neutral" in result
assert result["positive"] + result["negative"] + result["neutral"] == len(posts)
def test_keyword_match():
client = SocialClient(reddit_client_id="x", reddit_secret="x", reddit_user_agent="x")
posts = [
{"title": "Bitcoin BTC going up"},
{"title": "Ethereum news today"},
{"title": "BTC and ETH analysis"},
]
filtered = client.filter_posts_by_coin(posts, "BTC")
assert len(filtered) == 2
-
Step 2: Run tests to verify fail, then implement
-
Step 3: Implement data/social_client.py
import logging
from datetime import datetime
logger = logging.getLogger(__name__)
# Keyword-based sentiment (simple, no ML dependency)
POSITIVE_WORDS = {"moon", "bullish", "pump", "rally", "amazing", "great", "buy", "long", "up", "high", "profit", "gain", "surge", "breakout"}
NEGATIVE_WORDS = {"crash", "bearish", "dump", "sell", "short", "down", "low", "loss", "scam", "drop", "fear", "panic", "rekt"}
class SocialClient:
def __init__(self, reddit_client_id: str, reddit_secret: str, reddit_user_agent: str,
twitter_bearer: str = ""):
self.reddit_client_id = reddit_client_id
self.reddit_secret = reddit_secret
self.reddit_user_agent = reddit_user_agent
self.twitter_bearer = twitter_bearer
self._reddit = None
self._cache: list[dict] = []
def _get_reddit(self):
if self._reddit is None:
import praw
self._reddit = praw.Reddit(
client_id=self.reddit_client_id,
client_secret=self.reddit_secret,
user_agent=self.reddit_user_agent,
)
return self._reddit
def fetch_reddit(self, subreddits=("cryptocurrency", "binance", "CryptoMarkets"), limit=50) -> list[dict]:
try:
reddit = self._get_reddit()
posts = []
for sub_name in subreddits:
sub = reddit.subreddit(sub_name)
for post in sub.hot(limit=limit):
posts.append({
"title": post.title,
"score": post.score,
"num_comments": post.num_comments,
"created_utc": post.created_utc,
"subreddit": sub_name,
})
self._cache = posts
return posts
except Exception as e:
logger.warning(f"Reddit fetch failed: {e}")
return self._cache
def filter_posts_by_coin(self, posts: list[dict], coin_symbol: str) -> list[dict]:
symbol = coin_symbol.replace("USDT", "").upper()
# Map common names
name_map = {"BTC": ["bitcoin", "btc"], "ETH": ["ethereum", "eth"],
"SOL": ["solana", "sol"], "BNB": ["bnb", "binance coin"],
"XRP": ["xrp", "ripple"], "DOGE": ["doge", "dogecoin"],
"ADA": ["ada", "cardano"], "AVAX": ["avax", "avalanche"]}
keywords = name_map.get(symbol, [symbol.lower()])
return [p for p in posts if any(kw in p["title"].lower() for kw in keywords)]
def simple_sentiment(self, posts: list[dict]) -> dict:
positive = 0
negative = 0
neutral = 0
for p in posts:
words = set(p["title"].lower().split())
pos_count = len(words & POSITIVE_WORDS)
neg_count = len(words & NEGATIVE_WORDS)
if pos_count > neg_count:
positive += 1
elif neg_count > pos_count:
negative += 1
else:
neutral += 1
return {"positive": positive, "negative": negative, "neutral": neutral, "total": len(posts)}
- Step 4: Run tests, verify pass, commit
Run: cd crypto_news_trading && python -m pytest tests/test_social_client.py -v
git add data/social_client.py tests/test_social.py
git commit -m "feat: social media client with Reddit and sentiment analysis"
Task 7: Technical Analysis Agent
Files:
-
Create:
agents/__init__.py -
Create:
agents/technical.py -
Create:
tests/test_technical.py -
Step 1: Write failing tests
# tests/test_technical.py
import pytest
import pandas as pd
import numpy as np
from agents.technical import TechnicalAgent
def make_ohlcv(n=100, base_price=100.0):
"""Generate synthetic OHLCV data."""
np.random.seed(42)
closes = base_price + np.cumsum(np.random.randn(n) * 2)
df = pd.DataFrame({
"timestamp": pd.date_range("2026-01-01", periods=n, freq="1h"),
"open": closes - np.random.rand(n),
"high": closes + np.abs(np.random.randn(n) * 2),
"low": closes - np.abs(np.random.randn(n) * 2),
"close": closes,
"volume": np.random.randint(100, 10000, n).astype(float),
"quote_volume": np.random.randint(100000, 1000000, n).astype(float),
})
return df
def test_score_returns_0_to_100():
agent = TechnicalAgent()
df = make_ohlcv(100)
score = agent.analyze(df)
assert 0 <= score <= 100
def test_score_with_uptrend():
agent = TechnicalAgent()
n = 100
closes = np.linspace(100, 200, n) # strong uptrend
df = pd.DataFrame({
"timestamp": pd.date_range("2026-01-01", periods=n, freq="1h"),
"open": closes - 1, "high": closes + 2,
"low": closes - 2, "close": closes,
"volume": np.full(n, 5000.0),
"quote_volume": np.full(n, 500000.0),
})
score = agent.analyze(df)
assert score >= 60 # uptrend should score high
def test_score_with_downtrend():
agent = TechnicalAgent()
n = 100
closes = np.linspace(200, 100, n) # strong downtrend
df = pd.DataFrame({
"timestamp": pd.date_range("2026-01-01", periods=n, freq="1h"),
"open": closes + 1, "high": closes + 2,
"low": closes - 2, "close": closes,
"volume": np.full(n, 5000.0),
"quote_volume": np.full(n, 500000.0),
})
score = agent.analyze(df)
assert score <= 40 # downtrend should score low
def test_insufficient_data_returns_50():
agent = TechnicalAgent()
df = make_ohlcv(5) # too few candles
score = agent.analyze(df)
assert score == 50
-
Step 2: Run tests to verify fail
-
Step 3: Implement agents/technical.py
import pandas as pd
import ta
import logging
logger = logging.getLogger(__name__)
class TechnicalAgent:
MIN_CANDLES = 30
def analyze(self, df: pd.DataFrame) -> float:
if len(df) < self.MIN_CANDLES:
return 50.0
try:
signals = []
# RSI (14)
rsi = ta.momentum.RSIIndicator(df["close"], window=14).rsi().iloc[-1]
if rsi < 30:
signals.append(90) # oversold = bullish
elif rsi < 45:
signals.append(70)
elif rsi < 55:
signals.append(50)
elif rsi < 70:
signals.append(35)
else:
signals.append(15) # overbought = bearish
# MACD
macd_ind = ta.trend.MACD(df["close"])
macd_line = macd_ind.macd().iloc[-1]
signal_line = macd_ind.macd_signal().iloc[-1]
macd_diff = macd_ind.macd_diff().iloc[-1]
prev_diff = macd_ind.macd_diff().iloc[-2]
if macd_diff > 0 and prev_diff <= 0:
signals.append(85) # bullish crossover
elif macd_diff < 0 and prev_diff >= 0:
signals.append(15) # bearish crossover
elif macd_diff > 0:
signals.append(65)
else:
signals.append(35)
# Bollinger Bands
bb = ta.volatility.BollingerBands(df["close"])
bb_high = bb.bollinger_hband().iloc[-1]
bb_low = bb.bollinger_lband().iloc[-1]
price = df["close"].iloc[-1]
bb_pct = (price - bb_low) / (bb_high - bb_low) if (bb_high - bb_low) > 0 else 0.5
if bb_pct < 0.2:
signals.append(80) # near lower band = bullish
elif bb_pct > 0.8:
signals.append(20) # near upper band = bearish
else:
signals.append(50)
# SMA trend (20 vs 50)
sma20 = df["close"].rolling(20).mean().iloc[-1]
sma50 = df["close"].rolling(50).mean().iloc[-1] if len(df) >= 50 else sma20
if price > sma20 > sma50:
signals.append(80) # strong uptrend
elif price > sma20:
signals.append(65)
elif price < sma20 < sma50:
signals.append(20) # strong downtrend
else:
signals.append(40)
# SMA 200 (long-term trend)
if len(df) >= 200:
sma200 = df["close"].rolling(200).mean().iloc[-1]
if price > sma200:
signals.append(70)
else:
signals.append(30)
# Volume trend (current vs 20-period average)
vol_avg = df["volume"].rolling(20).mean().iloc[-1]
vol_current = df["volume"].iloc[-1]
vol_ratio = vol_current / vol_avg if vol_avg > 0 else 1.0
if vol_ratio > 2.0 and price > sma20:
signals.append(80) # high volume + uptrend
elif vol_ratio > 2.0:
signals.append(40) # high volume + downtrend
else:
signals.append(50)
# OBV (On-Balance Volume) trend
obv = ta.volume.OnBalanceVolumeIndicator(df["close"], df["volume"]).on_balance_volume()
obv_sma = obv.rolling(20).mean()
if obv.iloc[-1] > obv_sma.iloc[-1]:
signals.append(65)
else:
signals.append(35)
# Candlestick patterns (simplified)
last = df.iloc[-1]
prev = df.iloc[-2]
body = last["close"] - last["open"]
prev_body = prev["close"] - prev["open"]
# Bullish engulfing
if prev_body < 0 and body > 0 and body > abs(prev_body):
signals.append(80)
# Bearish engulfing
elif prev_body > 0 and body < 0 and abs(body) > prev_body:
signals.append(20)
# Doji (indecision)
elif abs(body) < (last["high"] - last["low"]) * 0.1:
signals.append(50)
else:
signals.append(50)
return round(sum(signals) / len(signals), 1)
except Exception as e:
logger.error(f"Technical analysis error: {e}")
return 50.0
- Step 4: Run tests, verify pass, commit
Run: cd crypto_news_trading && python -m pytest tests/test_technical.py -v
git add agents/ tests/test_technical.py
git commit -m "feat: technical analysis agent with RSI, MACD, BB, SMA, volume"
Task 8: News Sentiment Agent
Files:
-
Create:
agents/news.py -
Create:
tests/test_news_agent.py(rename test file to avoid conflict) -
Step 1: Write failing tests
# tests/test_news_agent.py
import pytest
from agents.news import NewsAgent
def test_score_positive_news():
agent = NewsAgent()
articles = [
{"title": "Bitcoin surges 10%", "sentiment_votes": {"positive": 10, "negative": 1}},
{"title": "BTC adoption grows", "sentiment_votes": {"positive": 8, "negative": 2}},
{"title": "Bitcoin rally continues", "sentiment_votes": {"positive": 15, "negative": 0}},
]
score = agent.analyze(articles)
assert score >= 70
def test_score_negative_news():
agent = NewsAgent()
articles = [
{"title": "Bitcoin crashes hard", "sentiment_votes": {"positive": 1, "negative": 10}},
{"title": "Crypto market in fear", "sentiment_votes": {"positive": 0, "negative": 15}},
]
score = agent.analyze(articles)
assert score <= 35
def test_score_no_news_returns_50():
agent = NewsAgent()
score = agent.analyze([])
assert score == 50
def test_score_mixed_news():
agent = NewsAgent()
articles = [
{"title": "BTC up", "sentiment_votes": {"positive": 5, "negative": 5}},
]
score = agent.analyze(articles)
assert 40 <= score <= 60
- Step 2: Implement agents/news.py
import logging
logger = logging.getLogger(__name__)
POSITIVE_WORDS = {"surge", "rally", "bullish", "high", "gain", "profit", "up", "grows", "adoption", "breakout", "soar", "record"}
NEGATIVE_WORDS = {"crash", "drop", "bearish", "low", "loss", "fear", "down", "dump", "scam", "hack", "ban", "panic", "plunge"}
class NewsAgent:
def analyze(self, articles: list[dict]) -> float:
if not articles:
return 50.0
try:
scores = []
for article in articles:
vote_score = self._vote_sentiment(article.get("sentiment_votes", {}))
text_score = self._text_sentiment(article.get("title", ""))
# Weighted: votes matter more if available
if vote_score is not None:
scores.append(vote_score * 0.6 + text_score * 0.4)
else:
scores.append(text_score)
return round(max(0, min(100, sum(scores) / len(scores))), 1)
except Exception as e:
logger.error(f"News analysis error: {e}")
return 50.0
def _vote_sentiment(self, votes: dict) -> float | None:
pos = votes.get("positive", 0)
neg = votes.get("negative", 0)
total = pos + neg
if total == 0:
return None
ratio = pos / total # 0.0 to 1.0
return ratio * 100
def _text_sentiment(self, text: str) -> float:
words = set(text.lower().split())
pos = len(words & POSITIVE_WORDS)
neg = len(words & NEGATIVE_WORDS)
total = pos + neg
if total == 0:
return 50.0
return (pos / total) * 100
- Step 3: Run tests, verify pass, commit
Run: cd crypto_news_trading && python -m pytest tests/test_news_agent.py -v
git add agents/news.py tests/test_news_agent.py
git commit -m "feat: news sentiment agent with vote and text analysis"
Task 9: Social Sentiment Agent
Files:
-
Create:
agents/social.py -
Create:
tests/test_social_agent.py -
Step 1: Write failing tests
# tests/test_social_agent.py
import pytest
from agents.social import SocialAgent
def test_score_bullish_social():
agent = SocialAgent()
sentiment = {"positive": 8, "negative": 1, "neutral": 1, "total": 10}
mention_trend = 2.5 # mentions up 150%
score = agent.analyze(sentiment, mention_trend)
assert score >= 70
def test_score_bearish_social():
agent = SocialAgent()
sentiment = {"positive": 1, "negative": 8, "neutral": 1, "total": 10}
mention_trend = 0.5
score = agent.analyze(sentiment, mention_trend)
assert score <= 35
def test_no_data_returns_50():
agent = SocialAgent()
score = agent.analyze({"positive": 0, "negative": 0, "neutral": 0, "total": 0}, 1.0)
assert score == 50
- Step 2: Implement agents/social.py
import logging
logger = logging.getLogger(__name__)
class SocialAgent:
def analyze(self, sentiment: dict, mention_trend: float = 1.0) -> float:
total = sentiment.get("total", 0)
if total == 0:
return 50.0
try:
pos = sentiment.get("positive", 0)
neg = sentiment.get("negative", 0)
# Sentiment ratio score (0-100)
ratio = pos / total
sentiment_score = ratio * 100
# Mention trend bonus/penalty
# trend > 1 = growing mentions, < 1 = declining
if mention_trend > 2.0:
trend_bonus = 15
elif mention_trend > 1.5:
trend_bonus = 10
elif mention_trend > 1.0:
trend_bonus = 5
elif mention_trend < 0.5:
trend_bonus = -10
else:
trend_bonus = 0
# If sentiment is negative and mentions are surging, it's bearish
if ratio < 0.4 and mention_trend > 1.5:
trend_bonus = -15
score = sentiment_score + trend_bonus
return round(max(0, min(100, score)), 1)
except Exception as e:
logger.error(f"Social analysis error: {e}")
return 50.0
- Step 3: Run tests, verify pass, commit
Run: cd crypto_news_trading && python -m pytest tests/test_social_agent.py -v
git add agents/social.py tests/test_social_agent.py
git commit -m "feat: social sentiment agent with mention trend analysis"
Task 10: AI Analysis Agent
Files:
-
Create:
agents/ai_analyst.py -
Step 1: Implement agents/ai_analyst.py
import json
import logging
from anthropic import Anthropic
logger = logging.getLogger(__name__)
class AIAgent:
def __init__(self, api_key: str):
self.client = Anthropic(api_key=api_key)
def analyze_batch(self, coins_data: list[dict]) -> dict[str, dict]:
"""Analyze multiple coins in one API call. Returns {symbol: {score, summary}}."""
if not coins_data:
return {}
prompt = self._build_prompt(coins_data)
try:
response = self.client.messages.create(
model="claude-sonnet-4-20250514",
max_tokens=2000,
messages=[{"role": "user", "content": prompt}],
)
return self._parse_response(response.content[0].text, coins_data)
except Exception as e:
logger.error(f"AI analysis failed: {e}")
return {c["symbol"]: {"score": 50, "summary": "Analysis unavailable"} for c in coins_data}
def _build_prompt(self, coins_data: list[dict]) -> str:
coins_text = ""
for coin in coins_data:
coins_text += f"""
Coin: {coin['symbol']}
- Price: ${coin.get('price', 'N/A')}
- 24h Change: {coin.get('change_pct', 'N/A')}%
- Technical Score: {coin.get('technical_score', 'N/A')}/100
- News Sentiment: {coin.get('news_score', 'N/A')}/100
- Social Sentiment: {coin.get('social_score', 'N/A')}/100
- Recent Headlines: {', '.join(coin.get('headlines', [])[:3])}
"""
return f"""You are a crypto market analyst. Analyze these coins for short-term (24h) spot trading potential.
For each coin, provide:
1. A score from 0-100 (0=strong sell, 50=neutral, 100=strong buy)
2. A brief 1-2 sentence summary explaining your reasoning
{coins_text}
Respond in JSON format:
{{
"SYMBOL": {{"score": NUMBER, "summary": "TEXT"}},
...
}}
Only output the JSON, no other text."""
def _parse_response(self, text: str, coins_data: list[dict]) -> dict[str, dict]:
try:
# Extract JSON from response
text = text.strip()
if text.startswith("```"):
text = text.split("```")[1]
if text.startswith("json"):
text = text[4:]
return json.loads(text)
except (json.JSONDecodeError, IndexError) as e:
logger.warning(f"Failed to parse AI response: {e}")
return {c["symbol"]: {"score": 50, "summary": "Parse error"} for c in coins_data}
- Step 2: Commit
git add agents/ai_analyst.py
git commit -m "feat: AI analysis agent using Claude API"
Task 11: Signal Engine
Files:
-
Create:
engine/__init__.py -
Create:
engine/signal.py -
Create:
tests/test_signal.py -
Step 1: Write failing tests
# tests/test_signal.py
import pytest
from engine.signal import SignalEngine
@pytest.fixture
def engine():
return SignalEngine()
def test_compute_score_default_weights(engine):
score = engine.compute_score(80, 60, 70, 50)
# 80*0.6 + 60*0.2 + 70*0.1 + 50*0.1 = 48+12+7+5 = 72
assert score == 72.0
def test_classify_buy(engine):
assert engine.classify(75) == "BUY"
def test_classify_hold(engine):
assert engine.classify(55) == "HOLD"
def test_classify_sell(engine):
assert engine.classify(30) == "SELL"
def test_custom_weights(engine):
engine.set_weights({"technical": 0.4, "news": 0.3, "social": 0.2, "ai": 0.1})
score = engine.compute_score(80, 60, 70, 50)
# 80*0.4 + 60*0.3 + 70*0.2 + 50*0.1 = 32+18+14+5 = 69
assert score == 69.0
def test_weights_must_sum_to_one(engine):
with pytest.raises(ValueError):
engine.set_weights({"technical": 0.5, "news": 0.3, "social": 0.1, "ai": 0.2})
def test_rank_coins(engine):
coins = {
"BTC": {"technical": 80, "news": 70, "social": 60, "ai": 75},
"ETH": {"technical": 90, "news": 80, "social": 70, "ai": 85},
"DOGE": {"technical": 30, "news": 25, "social": 40, "ai": 20},
}
ranked = engine.rank_coins(coins)
assert ranked[0]["symbol"] == "ETH"
assert ranked[-1]["symbol"] == "DOGE"
assert ranked[-1]["signal"] == "SELL"
- Step 2: Implement engine/signal.py
from config import DEFAULT_WEIGHTS
class SignalEngine:
def __init__(self):
self.weights = dict(DEFAULT_WEIGHTS)
def set_weights(self, weights: dict):
total = sum(weights.values())
if abs(total - 1.0) > 0.01:
raise ValueError(f"Weights must sum to 1.0, got {total}")
self.weights = weights
def compute_score(self, technical: float, news: float, social: float, ai: float) -> float:
score = (
technical * self.weights["technical"]
+ news * self.weights["news"]
+ social * self.weights["social"]
+ ai * self.weights["ai"]
)
return round(score, 1)
def classify(self, score: float) -> str:
if score >= 70:
return "BUY"
elif score >= 40:
return "HOLD"
return "SELL"
def rank_coins(self, coins: dict[str, dict]) -> list[dict]:
results = []
for symbol, scores in coins.items():
composite = self.compute_score(
scores["technical"], scores["news"], scores["social"], scores["ai"]
)
results.append({
"symbol": symbol,
"technical": scores["technical"],
"news": scores["news"],
"social": scores["social"],
"ai": scores["ai"],
"composite": composite,
"signal": self.classify(composite),
})
results.sort(key=lambda x: x["composite"], reverse=True)
return results
- Step 3: Run tests, verify pass, commit
Run: cd crypto_news_trading && python -m pytest tests/test_signal.py -v
git add engine/ tests/test_signal.py
git commit -m "feat: signal engine with weighted scoring and ranking"
Task 12: Surge Detector
Files:
-
Create:
engine/surge.py -
Create:
tests/test_surge.py -
Step 1: Write failing tests
# tests/test_surge.py
import pytest
from engine.surge import SurgeDetector
def test_detect_surge():
detector = SurgeDetector(multiplier=3.0)
tickers = [
{"symbol": "BTCUSDT", "quoteVolume": "1000000", "volume": "100"},
{"symbol": "NEWUSDT", "quoteVolume": "5000000", "volume": "500"}, # surge
{"symbol": "ETHUSDT", "quoteVolume": "800000", "volume": "80"},
]
avg_volumes = {"BTCUSDT": 900000, "NEWUSDT": 1000000, "ETHUSDT": 750000}
surged = detector.detect(tickers, avg_volumes)
assert "NEWUSDT" in surged
assert "BTCUSDT" not in surged
def test_no_surge():
detector = SurgeDetector(multiplier=3.0)
tickers = [{"symbol": "BTCUSDT", "quoteVolume": "1000000", "volume": "100"}]
avg_volumes = {"BTCUSDT": 900000}
surged = detector.detect(tickers, avg_volumes)
assert len(surged) == 0
- Step 2: Implement engine/surge.py
import logging
logger = logging.getLogger(__name__)
class SurgeDetector:
def __init__(self, multiplier: float = 3.0):
self.multiplier = multiplier
def detect(self, tickers: list[dict], avg_volumes: dict[str, float]) -> list[str]:
surged = []
for t in tickers:
symbol = t["symbol"]
if not symbol.endswith("USDT"):
continue
current_vol = float(t.get("quoteVolume", 0))
avg_vol = avg_volumes.get(symbol, 0)
if avg_vol > 0 and current_vol >= avg_vol * self.multiplier:
logger.info(f"Surge detected: {symbol} volume {current_vol:.0f} vs avg {avg_vol:.0f} ({current_vol/avg_vol:.1f}x)")
surged.append(symbol)
return surged
- Step 3: Run tests, verify pass, commit
Run: cd crypto_news_trading && python -m pytest tests/test_surge.py -v
git add engine/surge.py tests/test_surge.py
git commit -m "feat: volume surge detector"
Task 13: Portfolio Simulator
Files:
-
Create:
engine/portfolio.py -
Create:
tests/test_portfolio.py -
Step 1: Write failing tests
# tests/test_portfolio.py
import pytest
from engine.portfolio import PortfolioManager
@pytest.fixture
def pm():
return PortfolioManager(initial_capital=200.0)
def test_initial_state(pm):
assert pm.cash == 200.0
assert pm.positions == {}
assert pm.trades == []
def test_buy(pm):
pm.buy("BTCUSDT", price=40000.0, score=85)
assert "BTCUSDT" in pm.positions
assert pm.cash < 200.0
assert len(pm.trades) == 1
assert pm.trades[0]["side"] == "BUY"
def test_buy_size_by_score(pm):
# Score 70-79: 15% of cash = $30
pm.buy("SOLUSDT", price=140.0, score=75)
assert abs(pm.trades[0]["amount_usd"] - 30.0) < 0.01
def test_buy_respects_max_positions(pm):
for i, coin in enumerate(["A", "B", "C", "D", "E"]):
pm.buy(f"{coin}USDT", price=10.0, score=80)
pm.buy("FUSDT", price=10.0, score=80) # 6th should be rejected
assert len(pm.positions) == 5
def test_buy_respects_min_position(pm):
pm.cash = 10.0 # below MIN_POSITION_USD=15
pm.buy("BTCUSDT", price=40000.0, score=85)
assert "BTCUSDT" not in pm.positions
def test_sell_full(pm):
pm.buy("ETHUSDT", price=3500.0, score=80)
invested = pm.positions["ETHUSDT"]["invested_usd"]
pm.sell("ETHUSDT", price=3800.0, reason="signal")
assert "ETHUSDT" not in pm.positions
assert pm.cash > 200.0 - invested # got money back + profit
def test_stop_loss(pm):
pm.buy("DOGEUSDT", price=0.10, score=80)
pm.check_exit("DOGEUSDT", current_price=0.091) # -9% < -8%
assert "DOGEUSDT" not in pm.positions
def test_take_profit_partial(pm):
pm.buy("SOLUSDT", price=100.0, score=80)
qty_before = pm.positions["SOLUSDT"]["quantity"]
pm.check_exit("SOLUSDT", current_price=116.0) # +16% > +15%
assert pm.positions["SOLUSDT"]["quantity"] < qty_before # partial sell
def test_take_profit_full(pm):
pm.buy("SOLUSDT", price=100.0, score=80)
pm.check_exit("SOLUSDT", current_price=126.0) # +26% > +25%
assert "SOLUSDT" not in pm.positions
def test_pnl_calculation(pm):
pm.buy("ETHUSDT", price=3500.0, score=80)
pnl = pm.get_portfolio_value({"ETHUSDT": 3800.0})
assert pnl["total_pnl"] > 0
assert pnl["total_value"] > 200.0
- Step 2: Implement engine/portfolio.py
import logging
from datetime import datetime, timezone
from config import MAX_POSITIONS, MIN_POSITION_USD, STOP_LOSS_PCT, TAKE_PROFIT_1_PCT, TAKE_PROFIT_2_PCT
logger = logging.getLogger(__name__)
class PortfolioManager:
def __init__(self, initial_capital: float = 200.0):
self.initial_capital = initial_capital
self.cash = initial_capital
self.positions: dict[str, dict] = {} # symbol -> {entry_price, quantity, invested_usd, tp1_hit}
self.trades: list[dict] = []
def buy(self, symbol: str, price: float, score: float) -> bool:
if symbol in self.positions:
return False
if len(self.positions) >= MAX_POSITIONS:
logger.info(f"Max positions reached, cannot buy {symbol}")
return False
amount = self._position_size(score)
if amount < MIN_POSITION_USD:
logger.info(f"Position too small ({amount:.2f}), skipping {symbol}")
return False
if amount > self.cash:
amount = self.cash
if amount < MIN_POSITION_USD:
return False
quantity = amount / price
self.cash -= amount
self.positions[symbol] = {
"entry_price": price,
"quantity": quantity,
"invested_usd": amount,
"tp1_hit": False,
"opened_at": datetime.now(timezone.utc).isoformat(),
}
self.trades.append({
"coin": symbol, "side": "BUY", "price": price,
"quantity": quantity, "amount_usd": amount,
"timestamp": datetime.now(timezone.utc).isoformat(), "reason": "signal",
})
logger.info(f"BUY {symbol}: ${amount:.2f} @ ${price:.4f} (qty: {quantity:.6f})")
return True
def sell(self, symbol: str, price: float, reason: str = "signal", partial: float = 1.0):
if symbol not in self.positions:
return
pos = self.positions[symbol]
sell_qty = pos["quantity"] * partial
sell_usd = sell_qty * price
self.cash += sell_usd
self.trades.append({
"coin": symbol, "side": "SELL", "price": price,
"quantity": sell_qty, "amount_usd": sell_usd,
"timestamp": datetime.now(timezone.utc).isoformat(), "reason": reason,
})
if partial >= 1.0:
del self.positions[symbol]
logger.info(f"SELL ALL {symbol}: ${sell_usd:.2f} @ ${price:.4f} ({reason})")
else:
pos["quantity"] -= sell_qty
logger.info(f"SELL {partial*100:.0f}% {symbol}: ${sell_usd:.2f} @ ${price:.4f} ({reason})")
def check_exit(self, symbol: str, current_price: float):
if symbol not in self.positions:
return
pos = self.positions[symbol]
entry = pos["entry_price"]
change_pct = (current_price - entry) / entry
# Stop-loss
if change_pct <= STOP_LOSS_PCT:
self.sell(symbol, current_price, reason="stop-loss")
return
# Take-profit 2 (full exit at +25%)
if change_pct >= TAKE_PROFIT_2_PCT:
self.sell(symbol, current_price, reason="take-profit-2")
return
# Take-profit 1 (50% exit at +15%)
if change_pct >= TAKE_PROFIT_1_PCT and not pos["tp1_hit"]:
pos["tp1_hit"] = True
self.sell(symbol, current_price, reason="take-profit-1", partial=0.5)
def _position_size(self, score: float) -> float:
if score >= 90:
pct = 0.30
elif score >= 80:
pct = 0.20
else:
pct = 0.15
return round(self.cash * pct, 2)
def get_portfolio_value(self, current_prices: dict[str, float]) -> dict:
holdings_value = sum(
pos["quantity"] * current_prices.get(sym, pos["entry_price"])
for sym, pos in self.positions.items()
)
total_value = self.cash + holdings_value
total_pnl = total_value - self.initial_capital
pnl_pct = (total_pnl / self.initial_capital) * 100 if self.initial_capital > 0 else 0
winning = sum(1 for t in self.trades if t["side"] == "SELL" and self._trade_pnl(t) > 0)
total_sells = sum(1 for t in self.trades if t["side"] == "SELL")
win_rate = (winning / total_sells * 100) if total_sells > 0 else 0
return {
"total_value": round(total_value, 2),
"cash": round(self.cash, 2),
"holdings_value": round(holdings_value, 2),
"total_pnl": round(total_pnl, 2),
"pnl_pct": round(pnl_pct, 2),
"win_rate": round(win_rate, 1),
"open_positions": len(self.positions),
}
def _trade_pnl(self, sell_trade: dict) -> float:
# Find most recent buy for this coin before the sell
matching_buys = [
t for t in self.trades
if t["coin"] == sell_trade["coin"] and t["side"] == "BUY"
and t["timestamp"] <= sell_trade["timestamp"]
]
if matching_buys:
latest_buy = matching_buys[-1]
return (sell_trade["price"] - latest_buy["price"]) * sell_trade["quantity"]
return 0
- Step 3: Run tests, verify pass, commit
Run: cd crypto_news_trading && python -m pytest tests/test_portfolio.py -v
git add engine/portfolio.py tests/test_portfolio.py
git commit -m "feat: portfolio simulator with entry/exit rules and P&L tracking"
Task 14: Scheduler
Files:
-
Create:
scheduler/__init__.py(empty) -
Create:
scheduler/jobs.py -
Step 1: Implement scheduler/jobs.py
import logging
from apscheduler.schedulers.background import BackgroundScheduler
from data.binance_rest import BinanceRestClient
from data.news_client import NewsClient
from data.social_client import SocialClient
from agents.technical import TechnicalAgent
from agents.news import NewsAgent
from agents.social import SocialAgent
from agents.ai_analyst import AIAgent
from engine.signal import SignalEngine
from engine.surge import SurgeDetector
from engine.portfolio import PortfolioManager
from data.db import Database
logger = logging.getLogger(__name__)
class AnalysisJob:
def __init__(self, binance: BinanceRestClient, news_client: NewsClient,
social_client: SocialClient, ai_agent: AIAgent,
signal_engine: SignalEngine, surge_detector: SurgeDetector,
portfolio: PortfolioManager, db: Database,
monitored_coins: list[str]):
self.binance = binance
self.news_client = news_client
self.social_client = social_client
self.ai_agent = ai_agent
self.tech_agent = TechnicalAgent()
self.news_agent = NewsAgent()
self.social_agent = SocialAgent()
self.signal_engine = signal_engine
self.surge_detector = surge_detector
self.portfolio = portfolio
self.db = db
self.monitored_coins = monitored_coins
self.latest_results: dict = {}
self.ai_summaries: dict[str, str] = {} # symbol -> AI summary text
def run_analysis(self):
logger.info("Starting analysis cycle...")
try:
# 0. Surge detection — add surging coins to monitored list
try:
all_tickers = self.binance.client.get_ticker()
usdt_tickers = [t for t in all_tickers if t["symbol"].endswith("USDT")]
avg_volumes = {t["symbol"]: float(t["quoteVolume"]) * 0.7 for t in usdt_tickers}
surged = self.surge_detector.detect(usdt_tickers, avg_volumes)
for s in surged:
if s not in self.monitored_coins:
self.monitored_coins.append(s)
logger.info(f"Surge-added: {s}")
except Exception as e:
logger.warning(f"Surge detection failed: {e}")
# 1. Fetch data
all_news = self.news_client.fetch_all()
all_social = self.social_client.fetch_reddit()
prices = self.binance.get_all_prices()
coins_scores = {}
coins_for_ai = []
for symbol in self.monitored_coins:
try:
# Technical (multi-timeframe: 1h primary, 4h and 1d secondary)
df_1h = self.binance.get_ohlcv(symbol, interval="1h", limit=100)
score_1h = self.tech_agent.analyze(df_1h)
try:
df_4h = self.binance.get_ohlcv(symbol, interval="4h", limit=100)
score_4h = self.tech_agent.analyze(df_4h)
df_1d = self.binance.get_ohlcv(symbol, interval="1d", limit=200)
score_1d = self.tech_agent.analyze(df_1d)
tech_score = score_1h * 0.5 + score_4h * 0.3 + score_1d * 0.2
except Exception:
tech_score = score_1h
# News
coin_news = self.news_client.filter_by_coin(all_news, symbol)
news_score = self.news_agent.analyze(coin_news)
# Social
coin_posts = self.social_client.filter_posts_by_coin(all_social, symbol)
sentiment = self.social_client.simple_sentiment(coin_posts)
social_score = self.social_agent.analyze(sentiment, mention_trend=1.0)
coins_scores[symbol] = {
"technical": tech_score,
"news": news_score,
"social": social_score,
"ai": 50, # placeholder until AI runs
}
coins_for_ai.append({
"symbol": symbol.replace("USDT", ""),
"price": prices.get(symbol, 0),
"change_pct": 0,
"technical_score": tech_score,
"news_score": news_score,
"social_score": social_score,
"headlines": [a["title"] for a in coin_news[:3]],
})
except Exception as e:
logger.warning(f"Analysis failed for {symbol}: {e}")
coins_scores[symbol] = {"technical": 50, "news": 50, "social": 50, "ai": 50}
# 2. AI batch analysis
if coins_for_ai:
ai_results = self.ai_agent.analyze_batch(coins_for_ai)
for symbol in coins_scores:
short = symbol.replace("USDT", "")
if short in ai_results:
coins_scores[symbol]["ai"] = ai_results[short].get("score", 50)
self.ai_summaries[symbol] = ai_results[short].get("summary", "")
# 3. Compute signals
ranked = self.signal_engine.rank_coins(coins_scores)
self.latest_results = {r["symbol"]: r for r in ranked}
# 4. Store signals
for r in ranked:
self.db.insert_signal(
r["symbol"], r["technical"], r["news"], r["social"], r["ai"],
r["composite"], r["signal"],
)
# 5. Portfolio actions
for r in ranked:
symbol = r["symbol"]
price = prices.get(symbol, 0)
if price <= 0:
continue
# Check exits first
self.portfolio.check_exit(symbol, price)
# Then entries
if r["signal"] == "BUY" and symbol not in self.portfolio.positions:
self.portfolio.buy(symbol, price, r["composite"])
elif r["signal"] == "SELL" and symbol in self.portfolio.positions:
self.portfolio.sell(symbol, price, reason="signal-sell")
# 6. Save portfolio snapshot
pv = self.portfolio.get_portfolio_value(prices)
self.db.insert_portfolio_snapshot(
pv["total_value"], pv["cash"], pv["total_pnl"], pv["pnl_pct"]
)
logger.info(f"Analysis complete. {len(ranked)} coins scored. Portfolio: ${pv['total_value']:.2f}")
except Exception as e:
logger.error(f"Analysis cycle failed: {e}")
def create_scheduler(job: AnalysisJob, interval_minutes: int = 15) -> BackgroundScheduler:
scheduler = BackgroundScheduler()
scheduler.add_job(job.run_analysis, "interval", minutes=interval_minutes, id="analysis")
return scheduler
- Step 2: Commit
git add scheduler/
git commit -m "feat: APScheduler analysis job orchestrating all agents"
Task 15: Streamlit Dashboard — Sidebar
Files:
-
Create:
dashboard/__init__.py(empty) -
Create:
dashboard/sidebar.py -
Step 1: Implement dashboard/sidebar.py
import streamlit as st
import json
from config import DEFAULT_WEIGHTS
def render_sidebar(latest_results: dict, db):
st.sidebar.title("Crypto Signals")
# Navigation
page = st.sidebar.radio("View", ["Signals", "Portfolio"], label_visibility="collapsed")
st.sidebar.divider()
# Coin list sorted by score
if latest_results:
coins = sorted(latest_results.values(), key=lambda x: x["composite"], reverse=True)
selected_coin = None
for coin in coins:
signal = coin["signal"]
color = {"BUY": "green", "HOLD": "orange", "SELL": "red"}.get(signal, "gray")
symbol_short = coin["symbol"].replace("USDT", "")
label = f":{color}[{signal}] **{symbol_short}** — {coin['composite']:.0f}"
if st.sidebar.button(label, key=coin["symbol"], use_container_width=True):
st.session_state["selected_coin"] = coin["symbol"]
st.sidebar.divider()
# Weight sliders
st.sidebar.subheader("Signal Weights")
weights = _load_weights(db)
new_weights = {}
new_weights["technical"] = st.sidebar.slider("Technical", 0.0, 1.0, weights["technical"], 0.05)
new_weights["news"] = st.sidebar.slider("News", 0.0, 1.0, weights["news"], 0.05)
new_weights["social"] = st.sidebar.slider("Social", 0.0, 1.0, weights["social"], 0.05)
new_weights["ai"] = st.sidebar.slider("AI", 0.0, 1.0, weights["ai"], 0.05)
total = sum(new_weights.values())
if abs(total - 1.0) > 0.01:
st.sidebar.warning(f"Weights sum: {total:.2f} (must be 1.0)")
else:
if new_weights != weights:
db.save_setting("weights", json.dumps(new_weights))
st.session_state["weights_changed"] = True
return page
def _load_weights(db) -> dict:
raw = db.load_setting("weights")
if raw:
try:
return json.loads(raw)
except json.JSONDecodeError:
pass
return dict(DEFAULT_WEIGHTS)
- Step 2: Commit
git add dashboard/
git commit -m "feat: Streamlit sidebar with coin list and weight sliders"
Task 16: Streamlit Dashboard — Detail Panels
Files:
-
Create:
dashboard/detail.py -
Step 1: Implement dashboard/detail.py
import streamlit as st
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import pandas as pd
import ta
def render_detail(symbol: str, coin_data: dict, ohlcv_df: pd.DataFrame,
news_articles: list, social_sentiment: dict, ai_summary: str):
symbol_short = symbol.replace("USDT", "")
signal = coin_data.get("signal", "HOLD")
color = {"BUY": "green", "HOLD": "orange", "SELL": "red"}.get(signal, "gray")
st.header(f"{symbol_short}/USDT")
cols = st.columns(5)
cols[0].metric("Signal", signal)
cols[1].metric("Score", f"{coin_data.get('composite', 0):.0f}/100")
cols[2].metric("Technical", f"{coin_data.get('technical', 0):.0f}")
cols[3].metric("News", f"{coin_data.get('news', 0):.0f}")
cols[4].metric("Social / AI", f"{coin_data.get('social', 0):.0f} / {coin_data.get('ai', 0):.0f}")
# Tabs for detail panels
tab_chart, tab_news, tab_social, tab_ai = st.tabs(["Chart", "News", "Social", "AI Analysis"])
with tab_chart:
_render_chart(ohlcv_df, symbol_short)
with tab_news:
_render_news(news_articles)
with tab_social:
_render_social(social_sentiment)
with tab_ai:
_render_ai(ai_summary)
def _render_chart(df: pd.DataFrame, symbol: str):
if df is None or df.empty:
st.info("No chart data available")
return
# Add indicators
df = df.copy()
df["rsi"] = ta.momentum.RSIIndicator(df["close"]).rsi()
macd = ta.trend.MACD(df["close"])
df["macd"] = macd.macd()
df["macd_signal"] = macd.macd_signal()
bb = ta.volatility.BollingerBands(df["close"])
df["bb_high"] = bb.bollinger_hband()
df["bb_low"] = bb.bollinger_lband()
df["sma20"] = df["close"].rolling(20).mean()
fig = make_subplots(
rows=3, cols=1, shared_xaxes=True,
vertical_spacing=0.05,
row_heights=[0.6, 0.2, 0.2],
subplot_titles=(f"{symbol} Price", "RSI", "MACD"),
)
# Candlestick
fig.add_trace(go.Candlestick(
x=df["timestamp"], open=df["open"], high=df["high"],
low=df["low"], close=df["close"], name="Price",
), row=1, col=1)
# Bollinger Bands
fig.add_trace(go.Scatter(x=df["timestamp"], y=df["bb_high"], line=dict(color="rgba(173,216,230,0.3)"), name="BB Upper"), row=1, col=1)
fig.add_trace(go.Scatter(x=df["timestamp"], y=df["bb_low"], line=dict(color="rgba(173,216,230,0.3)"), fill="tonexty", name="BB Lower"), row=1, col=1)
fig.add_trace(go.Scatter(x=df["timestamp"], y=df["sma20"], line=dict(color="orange", width=1), name="SMA20"), row=1, col=1)
# RSI
fig.add_trace(go.Scatter(x=df["timestamp"], y=df["rsi"], line=dict(color="purple"), name="RSI"), row=2, col=1)
fig.add_hline(y=70, line_dash="dash", line_color="red", row=2, col=1)
fig.add_hline(y=30, line_dash="dash", line_color="green", row=2, col=1)
# MACD
fig.add_trace(go.Scatter(x=df["timestamp"], y=df["macd"], line=dict(color="blue"), name="MACD"), row=3, col=1)
fig.add_trace(go.Scatter(x=df["timestamp"], y=df["macd_signal"], line=dict(color="red"), name="Signal"), row=3, col=1)
fig.update_layout(
template="plotly_dark", height=700, showlegend=False,
xaxis_rangeslider_visible=False,
)
st.plotly_chart(fig, use_container_width=True)
def _render_news(articles: list):
if not articles:
st.info("No recent news for this coin")
return
for article in articles[:10]:
votes = article.get("sentiment_votes", {})
pos = votes.get("positive", 0)
neg = votes.get("negative", 0)
if pos > neg:
icon = ":green_circle:"
elif neg > pos:
icon = ":red_circle:"
else:
icon = ":white_circle:"
st.markdown(f"{icon} **{article['title']}**")
st.caption(f"Published: {article.get('published_at', 'N/A')} | +{pos} -{neg}")
st.divider()
def _render_social(sentiment: dict):
if not sentiment or sentiment.get("total", 0) == 0:
st.info("No social data available")
return
total = sentiment["total"]
cols = st.columns(3)
cols[0].metric("Positive", f"{sentiment['positive']}/{total}", f"{sentiment['positive']/total*100:.0f}%")
cols[1].metric("Negative", f"{sentiment['negative']}/{total}", f"-{sentiment['negative']/total*100:.0f}%")
cols[2].metric("Neutral", f"{sentiment['neutral']}/{total}")
def _render_ai(summary: str):
if not summary:
st.info("AI analysis not available")
return
st.markdown(summary)
- Step 2: Commit
git add dashboard/detail.py
git commit -m "feat: detail panels with chart, news, social, AI views"
Task 17: Streamlit Dashboard — Portfolio View
Files:
-
Create:
dashboard/portfolio_view.py -
Step 1: Implement dashboard/portfolio_view.py
import streamlit as st
import plotly.graph_objects as go
import pandas as pd
def render_portfolio(portfolio_manager, current_prices: dict, db):
st.header("Portfolio Simulator")
pv = portfolio_manager.get_portfolio_value(current_prices)
# Summary bar
cols = st.columns(5)
cols[0].metric("Initial Capital", f"${portfolio_manager.initial_capital:.2f}")
cols[1].metric("Current Value", f"${pv['total_value']:.2f}")
pnl_delta = f"{pv['pnl_pct']:+.1f}%"
cols[2].metric("Total P&L", f"${pv['total_pnl']:+.2f}", pnl_delta)
cols[3].metric("Win Rate", f"{pv['win_rate']:.0f}%")
cols[4].metric("Available Cash", f"${pv['cash']:.2f}")
st.divider()
col_left, col_right = st.columns([3, 2])
with col_left:
# Current holdings
st.subheader("Current Holdings")
if portfolio_manager.positions:
rows = []
for sym, pos in portfolio_manager.positions.items():
price = current_prices.get(sym, pos["entry_price"])
value = pos["quantity"] * price
pnl = value - pos["invested_usd"]
pnl_pct = (pnl / pos["invested_usd"] * 100) if pos["invested_usd"] > 0 else 0
rows.append({
"Coin": sym.replace("USDT", ""),
"Invested": f"${pos['invested_usd']:.2f}",
"Qty": f"{pos['quantity']:.6f}",
"Entry": f"${pos['entry_price']:.4f}",
"Current": f"${price:.4f}",
"P&L": f"${pnl:+.2f} ({pnl_pct:+.1f}%)",
})
st.dataframe(pd.DataFrame(rows), use_container_width=True, hide_index=True)
else:
st.info("No open positions")
with col_right:
# Allocation chart
st.subheader("Allocation")
if portfolio_manager.positions:
labels = [s.replace("USDT", "") for s in portfolio_manager.positions]
values = [p["quantity"] * current_prices.get(s, p["entry_price"])
for s, p in portfolio_manager.positions.items()]
labels.append("Cash")
values.append(pv["cash"])
fig = go.Figure(data=[go.Pie(labels=labels, values=values, hole=0.4)])
fig.update_layout(template="plotly_dark", height=300, margin=dict(t=20, b=20))
st.plotly_chart(fig, use_container_width=True)
else:
st.info("100% Cash")
# Trade history
st.divider()
st.subheader("Trade History")
if portfolio_manager.trades:
trade_rows = []
for t in reversed(portfolio_manager.trades[-20:]):
trade_rows.append({
"Time": t["timestamp"][:16],
"Coin": t["coin"].replace("USDT", ""),
"Side": t["side"],
"Price": f"${t['price']:.4f}",
"Amount": f"${t['amount_usd']:.2f}",
"Reason": t["reason"],
})
st.dataframe(pd.DataFrame(trade_rows), use_container_width=True, hide_index=True)
else:
st.info("No trades yet")
- Step 2: Commit
git add dashboard/portfolio_view.py
git commit -m "feat: portfolio view with holdings, allocation, and trade history"
Task 18: Main Streamlit App (run.py)
Files:
-
Create:
run.py -
Step 1: Implement run.py
import streamlit as st
import logging
from logging.handlers import RotatingFileHandler
import os
import json
from config import (
BINANCE_API_KEY, BINANCE_SECRET, CRYPTOPANIC_API_KEY, NEWS_API_KEY,
TWITTER_BEARER_TOKEN, REDDIT_CLIENT_ID, REDDIT_SECRET, REDDIT_USER_AGENT,
ANTHROPIC_API_KEY, DB_PATH, LOG_DIR, TOP_N_COINS, INITIAL_CAPITAL,
ANALYSIS_INTERVAL_MINUTES, DEFAULT_WEIGHTS, SURGE_VOLUME_MULTIPLIER,
)
from data.db import Database
from data.binance_rest import BinanceRestClient
from data.binance_ws import BinanceWSClient
from data.news_client import NewsClient
from data.social_client import SocialClient
from agents.ai_analyst import AIAgent
from engine.signal import SignalEngine
from engine.surge import SurgeDetector
from engine.portfolio import PortfolioManager
from scheduler.jobs import AnalysisJob, create_scheduler
from dashboard.sidebar import render_sidebar
from dashboard.detail import render_detail
from dashboard.portfolio_view import render_portfolio
# --- Logging ---
os.makedirs(LOG_DIR, exist_ok=True)
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
handlers=[
RotatingFileHandler(os.path.join(LOG_DIR, "app.log"), maxBytes=10_000_000, backupCount=5),
logging.StreamHandler(),
],
)
logger = logging.getLogger(__name__)
# --- Page config ---
st.set_page_config(page_title="Crypto Signal Dashboard", layout="wide", page_icon="📊")
# --- Init session state ---
if "initialized" not in st.session_state:
st.session_state.initialized = False
st.session_state.selected_coin = None
@st.cache_resource
def init_services():
db = Database(DB_PATH)
db.init()
binance_rest = BinanceRestClient(BINANCE_API_KEY, BINANCE_SECRET)
binance_ws = BinanceWSClient(BINANCE_API_KEY, BINANCE_SECRET)
news_client = NewsClient(CRYPTOPANIC_API_KEY, NEWS_API_KEY)
social_client = SocialClient(REDDIT_CLIENT_ID, REDDIT_SECRET, REDDIT_USER_AGENT, TWITTER_BEARER_TOKEN)
ai_agent = AIAgent(ANTHROPIC_API_KEY)
signal_engine = SignalEngine()
surge_detector = SurgeDetector(SURGE_VOLUME_MULTIPLIER)
portfolio = PortfolioManager(INITIAL_CAPITAL)
# Load saved weights
saved_weights = db.load_setting("weights")
if saved_weights:
try:
signal_engine.set_weights(json.loads(saved_weights))
except (json.JSONDecodeError, ValueError):
pass
# Get monitored coins
try:
monitored = binance_rest.get_top_coins(TOP_N_COINS)
except Exception:
monitored = ["BTCUSDT", "ETHUSDT", "BNBUSDT", "SOLUSDT", "XRPUSDT"]
logger.warning("Failed to fetch top coins, using defaults")
# Create analysis job
job = AnalysisJob(
binance=binance_rest, news_client=news_client,
social_client=social_client, ai_agent=ai_agent,
signal_engine=signal_engine, surge_detector=surge_detector,
portfolio=portfolio, db=db, monitored_coins=monitored,
)
# Start scheduler
scheduler = create_scheduler(job, ANALYSIS_INTERVAL_MINUTES)
scheduler.start()
# Start WebSocket
try:
binance_ws.start(monitored[:20]) # limit WS to top 20
except Exception as e:
logger.warning(f"WebSocket start failed: {e}")
# Run first analysis
job.run_analysis()
return {
"db": db, "binance_rest": binance_rest, "binance_ws": binance_ws,
"news_client": news_client, "social_client": social_client,
"signal_engine": signal_engine, "portfolio": portfolio,
"job": job, "monitored": monitored,
}
# --- Main ---
def main():
services = init_services()
db = services["db"]
job = services["job"]
portfolio = services["portfolio"]
binance_rest = services["binance_rest"]
binance_ws = services["binance_ws"]
news_client = services["news_client"]
social_client = services["social_client"]
# Get current prices (prefer WS, fallback REST)
current_prices = binance_ws.get_all_prices()
if not current_prices:
try:
current_prices = binance_rest.get_all_prices()
except Exception:
current_prices = {}
# Sidebar
page = render_sidebar(job.latest_results, db)
if page == "Signals":
selected = st.session_state.get("selected_coin")
if selected and selected in job.latest_results:
coin_data = job.latest_results[selected]
try:
ohlcv = binance_rest.get_ohlcv(selected, interval="1h", limit=100)
except Exception:
ohlcv = None
coin_news = news_client.filter_by_coin(news_client._cache, selected)
coin_posts = social_client.filter_posts_by_coin(social_client._cache, selected)
sentiment = social_client.simple_sentiment(coin_posts)
ai_summary = job.ai_summaries.get(selected, "AI analysis not yet available.")
render_detail(selected, coin_data, ohlcv, coin_news, sentiment, ai_summary)
else:
st.title("Crypto Signal Dashboard")
st.info("Select a coin from the sidebar to view detailed analysis.")
if job.latest_results:
_render_overview(job.latest_results)
elif page == "Portfolio":
render_portfolio(portfolio, current_prices, db)
def _render_overview(results: dict):
import pandas as pd
st.subheader("Signal Overview")
buy_coins = [r for r in results.values() if r["signal"] == "BUY"]
sell_coins = [r for r in results.values() if r["signal"] == "SELL"]
col1, col2 = st.columns(2)
with col1:
st.markdown("### :green[BUY Signals]")
if buy_coins:
rows = [{"Coin": c["symbol"].replace("USDT",""), "Score": f"{c['composite']:.0f}",
"Tech": f"{c['technical']:.0f}", "News": f"{c['news']:.0f}"}
for c in sorted(buy_coins, key=lambda x: x["composite"], reverse=True)]
st.dataframe(pd.DataFrame(rows), use_container_width=True, hide_index=True)
else:
st.info("No BUY signals currently")
with col2:
st.markdown("### :red[SELL Signals]")
if sell_coins:
rows = [{"Coin": c["symbol"].replace("USDT",""), "Score": f"{c['composite']:.0f}",
"Tech": f"{c['technical']:.0f}", "News": f"{c['news']:.0f}"}
for c in sorted(sell_coins, key=lambda x: x["composite"])]
st.dataframe(pd.DataFrame(rows), use_container_width=True, hide_index=True)
else:
st.info("No SELL signals currently")
if __name__ == "__main__":
main()
- Step 2: Commit
git add run.py
git commit -m "feat: main Streamlit app with dashboard orchestration"
Task 19: Integration Test & First Run
- Step 1: Run all unit tests
Run: cd crypto_news_trading && python -m pytest tests/ -v
Expected: All tests pass
- Step 2: Create .env file with real API keys
Copy .env.example to .env and fill in real API keys.
- Step 3: Test launch
Run: cd crypto_news_trading && streamlit run run.py
Expected: Dashboard opens in browser at http://localhost:8501
-
Step 4: Verify each component
-
Sidebar shows coin list with scores
-
Clicking a coin shows chart, news, social, AI panels
-
Portfolio tab shows $200 initial capital
-
Trades begin appearing after first analysis cycle
-
Step 5: Final commit
git add -A
git commit -m "feat: complete crypto signal dashboard v1.0"