Compare commits

...

10 Commits

Author SHA1 Message Date
e16d944985 feat: Streamlit dashboard with sidebar, detail, portfolio, and main app
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-20 17:56:31 +09:00
9cc8241e22 feat: APScheduler analysis job orchestrating all agents
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-20 17:54:55 +09:00
7e1d556385 feat: technical, news, and social analysis agents
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-20 17:52:51 +09:00
46e06df131 feat: AI agent, signal engine, surge detector, portfolio simulator
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-20 17:52:05 +09:00
adad553a65 feat: Binance WS, news client, and social client
Add ThreadedWebsocketManager-based BinanceWSClient for real-time price
streaming, NewsClient for CryptoPanic/NewsAPI fetching with coin filtering,
and SocialClient for Reddit post retrieval with keyword filtering and
simple keyword-based sentiment scoring. Includes unit tests for news and
social clients (4/4 passing).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-20 17:48:12 +09:00
ffada928f2 Add database layer with SQLite schema, ORM-style Database class, and tests
Implements Task 2: creates db/schema.sql with five tables (signals, trades,
positions, portfolio, settings) and indexes; data/db.py with a Database class
covering all CRUD operations; tests/test_db.py with 6 passing pytest tests.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-20 17:47:34 +09:00
45a39e78c4 Add Binance REST client with OHLCV, ticker, and volume methods
Implements BinanceRestClient wrapping python-binance with get_top_coins,
get_ohlcv, get_all_prices, and get_24h_volume. Includes mocked unit tests
for all public methods.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-20 17:47:04 +09:00
1a241e8b7b Add project setup: dependencies, config, and package scaffolding
- requirements.txt with all core dependencies (streamlit, binance, ta, anthropic, praw, etc.)
- .env.example template for all required API keys
- .gitignore covering secrets, caches, db files, and logs
- config.py loading env vars with trading defaults and signal weights
- tests/conftest.py with sys.path fix for imports
- Empty __init__.py files for tests, data, agents, engine, dashboard, scheduler packages

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-20 17:44:45 +09:00
9557a95409 Add implementation plan for crypto signal dashboard
19 tasks covering: project setup, DB layer, Binance REST/WS, news/social clients,
4 analysis agents, signal engine, surge detector, portfolio simulator,
Streamlit dashboard (sidebar, detail, portfolio), scheduler, and integration test.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-20 17:24:12 +09:00
f8355248b4 Add crypto signal dashboard design spec
Comprehensive spec covering: 4 analysis agents (technical, news, social, AI),
signal scoring engine, virtual $200 spot portfolio simulator with P&L tracking,
Streamlit dashboard with sidebar+detail layout, and failure handling strategy.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-20 17:12:39 +09:00
41 changed files with 4600 additions and 0 deletions

9
.env.example Normal file
View File

@@ -0,0 +1,9 @@
BINANCE_API_KEY=
BINANCE_SECRET=
CRYPTOPANIC_API_KEY=
NEWS_API_KEY=
TWITTER_BEARER_TOKEN=
REDDIT_CLIENT_ID=
REDDIT_SECRET=
REDDIT_USER_AGENT=crypto_signal_bot/1.0
ANTHROPIC_API_KEY=

6
.gitignore vendored Normal file
View File

@@ -0,0 +1,6 @@
.env
__pycache__/
*.pyc
*.db
logs/
.superpowers/

0
agents/__init__.py Normal file
View File

65
agents/ai_analyst.py Normal file
View File

@@ -0,0 +1,65 @@
import json
import logging
from anthropic import Anthropic
logger = logging.getLogger(__name__)
class AIAgent:
def __init__(self, api_key: str):
self.client = Anthropic(api_key=api_key) if api_key else None
def analyze_batch(self, coins_data: list[dict]) -> dict[str, dict]:
if not coins_data or not self.client:
return {c.get("symbol",""): {"score": 50, "summary": "AI not configured"} for c in coins_data}
prompt = self._build_prompt(coins_data)
try:
response = self.client.messages.create(
model="claude-sonnet-4-20250514",
max_tokens=2000,
messages=[{"role": "user", "content": prompt}],
)
return self._parse_response(response.content[0].text, coins_data)
except Exception as e:
logger.error(f"AI analysis failed: {e}")
return {c["symbol"]: {"score": 50, "summary": "Analysis unavailable"} for c in coins_data}
def _build_prompt(self, coins_data: list[dict]) -> str:
coins_text = ""
for coin in coins_data:
coins_text += f"""
Coin: {coin['symbol']}
- Price: ${coin.get('price', 'N/A')}
- 24h Change: {coin.get('change_pct', 'N/A')}%
- Technical Score: {coin.get('technical_score', 'N/A')}/100
- News Sentiment: {coin.get('news_score', 'N/A')}/100
- Social Sentiment: {coin.get('social_score', 'N/A')}/100
- Recent Headlines: {', '.join(coin.get('headlines', [])[:3])}
"""
return f"""You are a crypto market analyst. Analyze these coins for short-term (24h) spot trading potential.
For each coin, provide:
1. A score from 0-100 (0=strong sell, 50=neutral, 100=strong buy)
2. A brief 1-2 sentence summary explaining your reasoning
{coins_text}
Respond in JSON format:
{{
"SYMBOL": {{"score": NUMBER, "summary": "TEXT"}},
...
}}
Only output the JSON, no other text."""
def _parse_response(self, text: str, coins_data: list[dict]) -> dict[str, dict]:
try:
text = text.strip()
if text.startswith("```"):
text = text.split("```")[1]
if text.startswith("json"):
text = text[4:]
return json.loads(text)
except (json.JSONDecodeError, IndexError) as e:
logger.warning(f"Failed to parse AI response: {e}")
return {c["symbol"]: {"score": 50, "summary": "Parse error"} for c in coins_data}

41
agents/news.py Normal file
View File

@@ -0,0 +1,41 @@
import logging
logger = logging.getLogger(__name__)
POSITIVE_WORDS = {"surge", "rally", "bullish", "high", "gain", "profit", "grows", "adoption", "breakout", "soar", "record"}
NEGATIVE_WORDS = {"crash", "drop", "bearish", "low", "loss", "fear", "down", "dump", "scam", "hack", "ban", "panic", "plunge"}
class NewsAgent:
def analyze(self, articles: list[dict]) -> float:
if not articles:
return 50.0
try:
scores = []
for article in articles:
vote_score = self._vote_sentiment(article.get("sentiment_votes", {}))
text_score = self._text_sentiment(article.get("title", ""))
if vote_score is not None:
scores.append(vote_score * 0.6 + text_score * 0.4)
else:
scores.append(text_score)
return round(max(0, min(100, sum(scores) / len(scores))), 1)
except Exception as e:
logger.error(f"News analysis error: {e}")
return 50.0
def _vote_sentiment(self, votes: dict) -> float | None:
pos = votes.get("positive", 0)
neg = votes.get("negative", 0)
total = pos + neg
if total == 0:
return None
return (pos / total) * 100
def _text_sentiment(self, text: str) -> float:
words = set(text.lower().split())
pos = len(words & POSITIVE_WORDS)
neg = len(words & NEGATIVE_WORDS)
total = pos + neg
if total == 0:
return 50.0
return (pos / total) * 100

33
agents/social.py Normal file
View File

@@ -0,0 +1,33 @@
import logging
logger = logging.getLogger(__name__)
class SocialAgent:
def analyze(self, sentiment: dict, mention_trend: float = 1.0) -> float:
total = sentiment.get("total", 0)
if total == 0:
return 50.0
try:
pos = sentiment.get("positive", 0)
ratio = pos / total
sentiment_score = ratio * 100
if mention_trend > 2.0:
trend_bonus = 15
elif mention_trend > 1.5:
trend_bonus = 10
elif mention_trend > 1.0:
trend_bonus = 5
elif mention_trend < 0.5:
trend_bonus = -10
else:
trend_bonus = 0
if ratio < 0.4 and mention_trend > 1.5:
trend_bonus = -15
score = sentiment_score + trend_bonus
return round(max(0, min(100, score)), 1)
except Exception as e:
logger.error(f"Social analysis error: {e}")
return 50.0

113
agents/technical.py Normal file
View File

@@ -0,0 +1,113 @@
import pandas as pd
import ta
import logging
logger = logging.getLogger(__name__)
class TechnicalAgent:
MIN_CANDLES = 30
def analyze(self, df: pd.DataFrame) -> float:
if len(df) < self.MIN_CANDLES:
return 50.0
try:
signals = []
# RSI (14)
rsi = ta.momentum.RSIIndicator(df["close"], window=14).rsi().iloc[-1]
if rsi < 30:
signals.append(65)
elif rsi < 45:
signals.append(70)
elif rsi < 55:
signals.append(50)
elif rsi < 70:
signals.append(35)
else:
signals.append(40)
# MACD
macd_ind = ta.trend.MACD(df["close"])
macd_diff = macd_ind.macd_diff().iloc[-1]
prev_diff = macd_ind.macd_diff().iloc[-2]
if macd_diff > 0 and prev_diff <= 0:
signals.append(85)
elif macd_diff < 0 and prev_diff >= 0:
signals.append(15)
elif macd_diff > 0:
signals.append(65)
else:
signals.append(35)
# Bollinger Bands
bb = ta.volatility.BollingerBands(df["close"])
bb_high = bb.bollinger_hband().iloc[-1]
bb_low = bb.bollinger_lband().iloc[-1]
price = df["close"].iloc[-1]
bb_pct = (price - bb_low) / (bb_high - bb_low) if (bb_high - bb_low) > 0 else 0.5
if bb_pct < 0.2:
signals.append(55)
elif bb_pct > 0.8:
signals.append(40)
else:
signals.append(50)
# SMA trend (20 vs 50)
sma20 = df["close"].rolling(20).mean().iloc[-1]
sma50 = df["close"].rolling(50).mean().iloc[-1] if len(df) >= 50 else sma20
if price > sma20 > sma50:
signals.append(80)
elif price > sma20:
signals.append(65)
elif price < sma20 < sma50:
signals.append(20)
else:
signals.append(40)
# SMA 200 (long-term trend)
if len(df) >= 200:
sma200 = df["close"].rolling(200).mean().iloc[-1]
if price > sma200:
signals.append(70)
else:
signals.append(30)
# Volume trend
vol_avg = df["volume"].rolling(20).mean().iloc[-1]
vol_current = df["volume"].iloc[-1]
vol_ratio = vol_current / vol_avg if vol_avg > 0 else 1.0
if vol_ratio > 2.0 and price > sma20:
signals.append(80)
elif vol_ratio > 2.0:
signals.append(40)
else:
signals.append(50)
# OBV trend
obv = ta.volume.OnBalanceVolumeIndicator(df["close"], df["volume"]).on_balance_volume()
obv_sma = obv.rolling(20).mean()
if obv.iloc[-1] > obv_sma.iloc[-1]:
signals.append(65)
else:
signals.append(35)
# Candlestick patterns (simplified)
last = df.iloc[-1]
prev = df.iloc[-2]
body = last["close"] - last["open"]
prev_body = prev["close"] - prev["open"]
if prev_body < 0 and body > 0 and body > abs(prev_body):
signals.append(80) # bullish engulfing
elif prev_body > 0 and body < 0 and abs(body) > prev_body:
signals.append(20) # bearish engulfing
elif abs(body) < (last["high"] - last["low"]) * 0.1:
signals.append(50) # doji
else:
signals.append(50)
return round(sum(signals) / len(signals), 1)
except Exception as e:
logger.error(f"Technical analysis error: {e}")
return 50.0

36
config.py Normal file
View File

@@ -0,0 +1,36 @@
import os
from dotenv import load_dotenv
load_dotenv()
# API Keys
BINANCE_API_KEY = os.getenv("BINANCE_API_KEY", "")
BINANCE_SECRET = os.getenv("BINANCE_SECRET", "")
CRYPTOPANIC_API_KEY = os.getenv("CRYPTOPANIC_API_KEY", "")
NEWS_API_KEY = os.getenv("NEWS_API_KEY", "")
TWITTER_BEARER_TOKEN = os.getenv("TWITTER_BEARER_TOKEN", "")
REDDIT_CLIENT_ID = os.getenv("REDDIT_CLIENT_ID", "")
REDDIT_SECRET = os.getenv("REDDIT_SECRET", "")
REDDIT_USER_AGENT = os.getenv("REDDIT_USER_AGENT", "crypto_signal_bot/1.0")
ANTHROPIC_API_KEY = os.getenv("ANTHROPIC_API_KEY", "")
# Defaults
DB_PATH = os.path.join(os.path.dirname(__file__), "crypto_signals.db")
LOG_DIR = os.path.join(os.path.dirname(__file__), "logs")
TOP_N_COINS = 50
SURGE_VOLUME_MULTIPLIER = 3.0
MAX_POSITIONS = 5
INITIAL_CAPITAL = 200.0
STOP_LOSS_PCT = -0.08
TAKE_PROFIT_1_PCT = 0.15
TAKE_PROFIT_2_PCT = 0.25
MIN_POSITION_USD = 15.0
ANALYSIS_INTERVAL_MINUTES = 15
# Default weights
DEFAULT_WEIGHTS = {
"technical": 0.6,
"news": 0.2,
"social": 0.1,
"ai": 0.1,
}

0
dashboard/__init__.py Normal file
View File

93
dashboard/detail.py Normal file
View File

@@ -0,0 +1,93 @@
import streamlit as st
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import pandas as pd
import ta
def render_detail(symbol: str, coin_data: dict, ohlcv_df: pd.DataFrame,
news_articles: list, social_sentiment: dict, ai_summary: str):
symbol_short = symbol.replace("USDT", "")
signal = coin_data.get("signal", "HOLD")
st.header(f"{symbol_short}/USDT")
cols = st.columns(5)
cols[0].metric("Signal", signal)
cols[1].metric("Score", f"{coin_data.get('composite', 0):.0f}/100")
cols[2].metric("Technical", f"{coin_data.get('technical', 0):.0f}")
cols[3].metric("News", f"{coin_data.get('news', 0):.0f}")
cols[4].metric("Social / AI", f"{coin_data.get('social', 0):.0f} / {coin_data.get('ai', 0):.0f}")
tab_chart, tab_news, tab_social, tab_ai = st.tabs(["Chart", "News", "Social", "AI Analysis"])
with tab_chart:
_render_chart(ohlcv_df, symbol_short)
with tab_news:
_render_news(news_articles)
with tab_social:
_render_social(social_sentiment)
with tab_ai:
_render_ai(ai_summary)
def _render_chart(df: pd.DataFrame, symbol: str):
if df is None or df.empty:
st.info("No chart data available")
return
df = df.copy()
df["rsi"] = ta.momentum.RSIIndicator(df["close"]).rsi()
macd = ta.trend.MACD(df["close"])
df["macd"] = macd.macd()
df["macd_signal"] = macd.macd_signal()
bb = ta.volatility.BollingerBands(df["close"])
df["bb_high"] = bb.bollinger_hband()
df["bb_low"] = bb.bollinger_lband()
df["sma20"] = df["close"].rolling(20).mean()
fig = make_subplots(rows=3, cols=1, shared_xaxes=True, vertical_spacing=0.05,
row_heights=[0.6, 0.2, 0.2],
subplot_titles=(f"{symbol} Price", "RSI", "MACD"))
fig.add_trace(go.Candlestick(x=df["timestamp"], open=df["open"], high=df["high"],
low=df["low"], close=df["close"], name="Price"), row=1, col=1)
fig.add_trace(go.Scatter(x=df["timestamp"], y=df["bb_high"], line=dict(color="rgba(173,216,230,0.3)"), name="BB Upper"), row=1, col=1)
fig.add_trace(go.Scatter(x=df["timestamp"], y=df["bb_low"], line=dict(color="rgba(173,216,230,0.3)"), fill="tonexty", name="BB Lower"), row=1, col=1)
fig.add_trace(go.Scatter(x=df["timestamp"], y=df["sma20"], line=dict(color="orange", width=1), name="SMA20"), row=1, col=1)
fig.add_trace(go.Scatter(x=df["timestamp"], y=df["rsi"], line=dict(color="purple"), name="RSI"), row=2, col=1)
fig.add_hline(y=70, line_dash="dash", line_color="red", row=2, col=1)
fig.add_hline(y=30, line_dash="dash", line_color="green", row=2, col=1)
fig.add_trace(go.Scatter(x=df["timestamp"], y=df["macd"], line=dict(color="blue"), name="MACD"), row=3, col=1)
fig.add_trace(go.Scatter(x=df["timestamp"], y=df["macd_signal"], line=dict(color="red"), name="Signal"), row=3, col=1)
fig.update_layout(template="plotly_dark", height=700, showlegend=False, xaxis_rangeslider_visible=False)
st.plotly_chart(fig, use_container_width=True)
def _render_news(articles: list):
if not articles:
st.info("No recent news for this coin")
return
for article in articles[:10]:
votes = article.get("sentiment_votes", {})
pos = votes.get("positive", 0)
neg = votes.get("negative", 0)
if pos > neg:
icon = ":green_circle:"
elif neg > pos:
icon = ":red_circle:"
else:
icon = ":white_circle:"
st.markdown(f"{icon} **{article['title']}**")
st.caption(f"Published: {article.get('published_at', 'N/A')} | +{pos} -{neg}")
st.divider()
def _render_social(sentiment: dict):
if not sentiment or sentiment.get("total", 0) == 0:
st.info("No social data available")
return
total = sentiment["total"]
cols = st.columns(3)
cols[0].metric("Positive", f"{sentiment['positive']}/{total}", f"{sentiment['positive']/total*100:.0f}%")
cols[1].metric("Negative", f"{sentiment['negative']}/{total}", f"-{sentiment['negative']/total*100:.0f}%")
cols[2].metric("Neutral", f"{sentiment['neutral']}/{total}")
def _render_ai(summary: str):
if not summary:
st.info("AI analysis not available")
return
st.markdown(summary)

View File

@@ -0,0 +1,70 @@
import streamlit as st
import plotly.graph_objects as go
import pandas as pd
def render_portfolio(portfolio_manager, current_prices: dict, db):
st.header("Portfolio Simulator")
pv = portfolio_manager.get_portfolio_value(current_prices)
cols = st.columns(5)
cols[0].metric("Initial Capital", f"${portfolio_manager.initial_capital:.2f}")
cols[1].metric("Current Value", f"${pv['total_value']:.2f}")
pnl_delta = f"{pv['pnl_pct']:+.1f}%"
cols[2].metric("Total P&L", f"${pv['total_pnl']:+.2f}", pnl_delta)
cols[3].metric("Win Rate", f"{pv['win_rate']:.0f}%")
cols[4].metric("Available Cash", f"${pv['cash']:.2f}")
st.divider()
col_left, col_right = st.columns([3, 2])
with col_left:
st.subheader("Current Holdings")
if portfolio_manager.positions:
rows = []
for sym, pos in portfolio_manager.positions.items():
price = current_prices.get(sym, pos["entry_price"])
value = pos["quantity"] * price
pnl = value - pos["invested_usd"]
pnl_pct = (pnl / pos["invested_usd"] * 100) if pos["invested_usd"] > 0 else 0
rows.append({
"Coin": sym.replace("USDT", ""),
"Invested": f"${pos['invested_usd']:.2f}",
"Qty": f"{pos['quantity']:.6f}",
"Entry": f"${pos['entry_price']:.4f}",
"Current": f"${price:.4f}",
"P&L": f"${pnl:+.2f} ({pnl_pct:+.1f}%)",
})
st.dataframe(pd.DataFrame(rows), use_container_width=True, hide_index=True)
else:
st.info("No open positions")
with col_right:
st.subheader("Allocation")
if portfolio_manager.positions:
labels = [s.replace("USDT", "") for s in portfolio_manager.positions]
values = [p["quantity"] * current_prices.get(s, p["entry_price"])
for s, p in portfolio_manager.positions.items()]
labels.append("Cash")
values.append(pv["cash"])
fig = go.Figure(data=[go.Pie(labels=labels, values=values, hole=0.4)])
fig.update_layout(template="plotly_dark", height=300, margin=dict(t=20, b=20))
st.plotly_chart(fig, use_container_width=True)
else:
st.info("100% Cash")
st.divider()
st.subheader("Trade History")
if portfolio_manager.trades:
trade_rows = []
for t in reversed(portfolio_manager.trades[-20:]):
trade_rows.append({
"Time": t["timestamp"][:16],
"Coin": t["coin"].replace("USDT", ""),
"Side": t["side"],
"Price": f"${t['price']:.4f}",
"Amount": f"${t['amount_usd']:.2f}",
"Reason": t["reason"],
})
st.dataframe(pd.DataFrame(trade_rows), use_container_width=True, hide_index=True)
else:
st.info("No trades yet")

46
dashboard/sidebar.py Normal file
View File

@@ -0,0 +1,46 @@
import streamlit as st
import json
from config import DEFAULT_WEIGHTS
def render_sidebar(latest_results: dict, db):
st.sidebar.title("Crypto Signals")
page = st.sidebar.radio("View", ["Signals", "Portfolio"], label_visibility="collapsed")
st.sidebar.divider()
if latest_results:
coins = sorted(latest_results.values(), key=lambda x: x["composite"], reverse=True)
for coin in coins:
signal = coin["signal"]
color = {"BUY": "green", "HOLD": "orange", "SELL": "red"}.get(signal, "gray")
symbol_short = coin["symbol"].replace("USDT", "")
label = f":{color}[{signal}] **{symbol_short}** — {coin['composite']:.0f}"
if st.sidebar.button(label, key=coin["symbol"], use_container_width=True):
st.session_state["selected_coin"] = coin["symbol"]
st.sidebar.divider()
st.sidebar.subheader("Signal Weights")
weights = _load_weights(db)
new_weights = {}
new_weights["technical"] = st.sidebar.slider("Technical", 0.0, 1.0, weights["technical"], 0.05)
new_weights["news"] = st.sidebar.slider("News", 0.0, 1.0, weights["news"], 0.05)
new_weights["social"] = st.sidebar.slider("Social", 0.0, 1.0, weights["social"], 0.05)
new_weights["ai"] = st.sidebar.slider("AI", 0.0, 1.0, weights["ai"], 0.05)
total = sum(new_weights.values())
if abs(total - 1.0) > 0.01:
st.sidebar.warning(f"Weights sum: {total:.2f} (must be 1.0)")
else:
if new_weights != weights:
db.save_setting("weights", json.dumps(new_weights))
st.session_state["weights_changed"] = True
return page
def _load_weights(db) -> dict:
raw = db.load_setting("weights")
if raw:
try:
return json.loads(raw)
except json.JSONDecodeError:
pass
return dict(DEFAULT_WEIGHTS)

0
data/__init__.py Normal file
View File

42
data/binance_rest.py Normal file
View File

@@ -0,0 +1,42 @@
import pandas as pd
from binance.client import Client
import logging
logger = logging.getLogger(__name__)
class BinanceRestClient:
def __init__(self, api_key: str, api_secret: str):
self.client = Client(api_key, api_secret)
def get_top_coins(self, limit: int = 50) -> list[str]:
tickers = self.client.get_ticker()
usdt_pairs = [
t for t in tickers
if t["symbol"].endswith("USDT") and not t["symbol"].startswith("USDT")
]
usdt_pairs.sort(key=lambda x: float(x["quoteVolume"]), reverse=True)
return [t["symbol"] for t in usdt_pairs[:limit]]
def get_ohlcv(self, symbol: str, interval: str = "1h", limit: int = 100) -> pd.DataFrame:
klines = self.client.get_klines(symbol=symbol, interval=interval, limit=limit)
df = pd.DataFrame(klines, columns=[
"timestamp", "open", "high", "low", "close", "volume",
"close_time", "quote_volume", "trades", "taker_buy_base",
"taker_buy_quote", "ignore",
])
for col in ["open", "high", "low", "close", "volume", "quote_volume"]:
df[col] = df[col].astype(float)
df["timestamp"] = pd.to_datetime(df["timestamp"], unit="ms")
return df[["timestamp", "open", "high", "low", "close", "volume", "quote_volume"]]
def get_all_prices(self) -> dict[str, float]:
tickers = self.client.get_all_tickers()
return {t["symbol"]: float(t["price"]) for t in tickers}
def get_24h_volume(self, symbol: str) -> dict:
ticker = self.client.get_ticker(symbol=symbol)
return {
"volume": float(ticker["volume"]),
"quote_volume": float(ticker["quoteVolume"]),
"price_change_pct": float(ticker["priceChangePercent"]),
}

51
data/binance_ws.py Normal file
View File

@@ -0,0 +1,51 @@
import threading
import logging
from binance import ThreadedWebsocketManager
logger = logging.getLogger(__name__)
class BinanceWSClient:
def __init__(self, api_key: str, api_secret: str):
self.api_key = api_key
self.api_secret = api_secret
self.twm = None
self.prices: dict[str, float] = {}
self._lock = threading.Lock()
def start(self, symbols: list[str]):
self.twm = ThreadedWebsocketManager(
api_key=self.api_key, api_secret=self.api_secret
)
self.twm.start()
streams = [s.lower() + "@miniTicker" for s in symbols]
self.twm.start_multiplex_socket(
callback=self._handle_message, streams=streams
)
logger.info(f"WebSocket started for {len(symbols)} symbols")
def _handle_message(self, msg):
if msg.get("e") == "error":
logger.error(f"WebSocket error: {msg}")
return
data = msg.get("data", msg)
if "s" in data and "c" in data:
with self._lock:
self.prices[data["s"]] = float(data["c"])
def get_price(self, symbol: str) -> float | None:
with self._lock:
return self.prices.get(symbol)
def get_all_prices(self) -> dict[str, float]:
with self._lock:
return dict(self.prices)
def update_symbols(self, symbols: list[str]):
if self.twm:
self.stop()
self.start(symbols)
def stop(self):
if self.twm:
self.twm.stop()
logger.info("WebSocket stopped")

99
data/db.py Normal file
View File

@@ -0,0 +1,99 @@
import sqlite3
import os
class Database:
def __init__(self, db_path: str):
self.db_path = db_path
self.conn = None
def init(self):
self.conn = sqlite3.connect(self.db_path, check_same_thread=False)
self.conn.execute("PRAGMA journal_mode=WAL")
self.conn.execute("PRAGMA busy_timeout=5000")
self.conn.row_factory = sqlite3.Row
schema_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), "db", "schema.sql")
with open(schema_path) as f:
self.conn.executescript(f.read())
def close(self):
if self.conn:
self.conn.close()
def execute(self, sql, params=()):
return self.conn.execute(sql, params)
def insert_signal(self, coin, technical, news, social, ai, composite, signal):
self.conn.execute(
"INSERT INTO signals (coin, technical_score, news_score, social_score, ai_score, composite_score, signal) VALUES (?,?,?,?,?,?,?)",
(coin, technical, news, social, ai, composite, signal),
)
self.conn.commit()
def get_latest_signals(self, limit=50):
return self.conn.execute(
"SELECT * FROM signals WHERE id IN (SELECT MAX(id) FROM signals GROUP BY coin) ORDER BY composite_score DESC LIMIT ?",
(limit,),
).fetchall()
def insert_trade(self, coin, side, price, quantity, amount_usd, reason):
self.conn.execute(
"INSERT INTO trades (coin, side, price, quantity, amount_usd, reason) VALUES (?,?,?,?,?,?)",
(coin, side, price, quantity, amount_usd, reason),
)
self.conn.commit()
def get_trades(self, limit=100):
return self.conn.execute(
"SELECT * FROM trades ORDER BY timestamp DESC LIMIT ?", (limit,)
).fetchall()
def open_position(self, coin, entry_price, quantity, invested_usd):
self.conn.execute(
"INSERT INTO positions (coin, entry_price, quantity, invested_usd) VALUES (?,?,?,?)",
(coin, entry_price, quantity, invested_usd),
)
self.conn.commit()
def get_open_positions(self):
return self.conn.execute(
"SELECT * FROM positions WHERE status='OPEN'"
).fetchall()
def close_position(self, position_id):
self.conn.execute(
"UPDATE positions SET status='CLOSED', closed_at=CURRENT_TIMESTAMP WHERE id=?",
(position_id,),
)
self.conn.commit()
def update_position_quantity(self, position_id, new_quantity):
self.conn.execute(
"UPDATE positions SET quantity=? WHERE id=?",
(new_quantity, position_id),
)
self.conn.commit()
def insert_portfolio_snapshot(self, total_value, cash, pnl, pnl_pct):
self.conn.execute(
"INSERT INTO portfolio (total_value, cash, pnl, pnl_pct) VALUES (?,?,?,?)",
(total_value, cash, pnl, pnl_pct),
)
self.conn.commit()
def get_portfolio_history(self, limit=100):
return self.conn.execute(
"SELECT * FROM portfolio ORDER BY timestamp DESC LIMIT ?", (limit,)
).fetchall()
def save_setting(self, key, value):
self.conn.execute(
"INSERT OR REPLACE INTO settings (key, value) VALUES (?,?)",
(key, value),
)
self.conn.commit()
def load_setting(self, key):
row = self.conn.execute(
"SELECT value FROM settings WHERE key=?", (key,)
).fetchone()
return row["value"] if row else None

73
data/news_client.py Normal file
View File

@@ -0,0 +1,73 @@
import httpx
import logging
from datetime import datetime, timedelta, timezone
logger = logging.getLogger(__name__)
class NewsClient:
CRYPTOPANIC_URL = "https://cryptopanic.com/api/free/v1/posts/"
NEWSAPI_URL = "https://newsapi.org/v2/everything"
def __init__(self, cryptopanic_key: str, newsapi_key: str = ""):
self.cryptopanic_key = cryptopanic_key
self.newsapi_key = newsapi_key
self._cache: list[dict] = []
self._cache_time: datetime | None = None
def fetch_cryptopanic(self) -> list[dict]:
try:
resp = httpx.get(
self.CRYPTOPANIC_URL,
params={"auth_token": self.cryptopanic_key, "filter": "hot", "public": "true"},
timeout=10,
)
resp.raise_for_status()
return self.parse_cryptopanic(resp.json())
except Exception as e:
logger.warning(f"CryptoPanic fetch failed: {e}")
return self._cache
def parse_cryptopanic(self, raw: dict) -> list[dict]:
articles = []
for item in raw.get("results", []):
currencies = item.get("currencies") or []
for cur in currencies:
articles.append({
"coin": cur.get("code", ""),
"title": item.get("title", ""),
"published_at": item.get("published_at", ""),
"kind": item.get("kind", "news"),
"sentiment_votes": item.get("votes", {}),
})
return articles
def fetch_newsapi(self, query: str = "cryptocurrency") -> list[dict]:
if not self.newsapi_key:
return []
try:
since = (datetime.now(timezone.utc) - timedelta(days=1)).strftime("%Y-%m-%d")
resp = httpx.get(
self.NEWSAPI_URL,
params={"q": query, "from": since, "sortBy": "publishedAt",
"apiKey": self.newsapi_key, "language": "en", "pageSize": 50},
timeout=10,
)
resp.raise_for_status()
return [
{"coin": "", "title": a["title"], "published_at": a["publishedAt"],
"kind": "news", "sentiment_votes": {}}
for a in resp.json().get("articles", [])
]
except Exception as e:
logger.warning(f"NewsAPI fetch failed: {e}")
return []
def fetch_all(self) -> list[dict]:
articles = self.fetch_cryptopanic() + self.fetch_newsapi()
self._cache = articles
self._cache_time = datetime.now(timezone.utc)
return articles
def filter_by_coin(self, articles: list[dict], coin_symbol: str) -> list[dict]:
symbol = coin_symbol.replace("USDT", "")
return [a for a in articles if a["coin"].upper() == symbol.upper()]

72
data/social_client.py Normal file
View File

@@ -0,0 +1,72 @@
import logging
from datetime import datetime
logger = logging.getLogger(__name__)
POSITIVE_WORDS = {"moon", "bullish", "pump", "rally", "amazing", "great", "buy", "long", "up", "high", "profit", "gain", "surge", "breakout"}
NEGATIVE_WORDS = {"crash", "bearish", "dump", "sell", "short", "down", "low", "loss", "scam", "drop", "fear", "panic", "rekt"}
class SocialClient:
def __init__(self, reddit_client_id: str, reddit_secret: str, reddit_user_agent: str,
twitter_bearer: str = ""):
self.reddit_client_id = reddit_client_id
self.reddit_secret = reddit_secret
self.reddit_user_agent = reddit_user_agent
self.twitter_bearer = twitter_bearer
self._reddit = None
self._cache: list[dict] = []
def _get_reddit(self):
if self._reddit is None:
import praw
self._reddit = praw.Reddit(
client_id=self.reddit_client_id,
client_secret=self.reddit_secret,
user_agent=self.reddit_user_agent,
)
return self._reddit
def fetch_reddit(self, subreddits=("cryptocurrency", "binance", "CryptoMarkets"), limit=50) -> list[dict]:
try:
reddit = self._get_reddit()
posts = []
for sub_name in subreddits:
sub = reddit.subreddit(sub_name)
for post in sub.hot(limit=limit):
posts.append({
"title": post.title,
"score": post.score,
"num_comments": post.num_comments,
"created_utc": post.created_utc,
"subreddit": sub_name,
})
self._cache = posts
return posts
except Exception as e:
logger.warning(f"Reddit fetch failed: {e}")
return self._cache
def filter_posts_by_coin(self, posts: list[dict], coin_symbol: str) -> list[dict]:
symbol = coin_symbol.replace("USDT", "").upper()
name_map = {"BTC": ["bitcoin", "btc"], "ETH": ["ethereum", "eth"],
"SOL": ["solana", "sol"], "BNB": ["bnb", "binance coin"],
"XRP": ["xrp", "ripple"], "DOGE": ["doge", "dogecoin"],
"ADA": ["ada", "cardano"], "AVAX": ["avax", "avalanche"]}
keywords = name_map.get(symbol, [symbol.lower()])
return [p for p in posts if any(kw in p["title"].lower() for kw in keywords)]
def simple_sentiment(self, posts: list[dict]) -> dict:
positive = 0
negative = 0
neutral = 0
for p in posts:
words = set(p["title"].lower().split())
pos_count = len(words & POSITIVE_WORDS)
neg_count = len(words & NEGATIVE_WORDS)
if pos_count > neg_count:
positive += 1
elif neg_count > pos_count:
negative += 1
else:
neutral += 1
return {"positive": positive, "negative": negative, "neutral": neutral, "total": len(posts)}

51
db/schema.sql Normal file
View File

@@ -0,0 +1,51 @@
CREATE TABLE IF NOT EXISTS signals (
id INTEGER PRIMARY KEY AUTOINCREMENT,
coin TEXT NOT NULL,
timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
technical_score REAL DEFAULT 50,
news_score REAL DEFAULT 50,
social_score REAL DEFAULT 50,
ai_score REAL DEFAULT 50,
composite_score REAL DEFAULT 50,
signal TEXT DEFAULT 'HOLD'
);
CREATE TABLE IF NOT EXISTS trades (
id INTEGER PRIMARY KEY AUTOINCREMENT,
coin TEXT NOT NULL,
side TEXT NOT NULL,
price REAL NOT NULL,
quantity REAL NOT NULL,
amount_usd REAL NOT NULL,
timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
reason TEXT
);
CREATE TABLE IF NOT EXISTS positions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
coin TEXT NOT NULL,
entry_price REAL NOT NULL,
quantity REAL NOT NULL,
invested_usd REAL NOT NULL,
status TEXT DEFAULT 'OPEN',
opened_at DATETIME DEFAULT CURRENT_TIMESTAMP,
closed_at DATETIME
);
CREATE TABLE IF NOT EXISTS portfolio (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
total_value REAL NOT NULL,
cash REAL NOT NULL,
pnl REAL NOT NULL,
pnl_pct REAL NOT NULL
);
CREATE TABLE IF NOT EXISTS settings (
key TEXT PRIMARY KEY,
value TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_signals_coin_ts ON signals(coin, timestamp);
CREATE INDEX IF NOT EXISTS idx_positions_status ON positions(status);
CREATE INDEX IF NOT EXISTS idx_trades_coin ON trades(coin);

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,320 @@
# Crypto News Signal Dashboard — Design Spec
## Overview
Binance 현물(Spot) 코인을 대상으로 뉴스, 소셜미디어, 기술적 분석, AI 분석을 종합하여 매수/매도 시그널을 생성하고, 가상 $200 포트폴리오로 시뮬레이션하는 대시보드.
## Tech Stack
- **App**: Streamlit (단일 프로세스, Sidebar + Detail 레이아웃)
- **Scheduler**: APScheduler (in-process background thread)
- **Storage**: SQLite WAL mode (시그널 이력, 포트폴리오, 거래 기록) + JSON cache (API 응답)
- **Language**: Python 3.11+
## Application Topology
Streamlit이 메인 프로세스로 동작하며, FastAPI 없이 Python 모듈을 직접 import하여 사용한다.
```
[Streamlit Process (main)]
├── imports agents/* (직접 호출)
├── imports engine/* (직접 호출)
├── imports data/* (직접 호출)
├── APScheduler (in-process, background thread)
│ └── 15분마다 agents 실행 → engine 업데이트
└── Binance WebSocket (background thread)
└── 실시간 가격 업데이트 → st.session_state
```
- **단일 프로세스**: Streamlit이 모든 모듈을 직접 import. 별도 FastAPI 서버 불필요
- **Scheduler**: APScheduler가 Streamlit 프로세스 내 background thread로 동작
- **WebSocket**: python-binance의 ThreadedWebsocketManager로 background에서 가격 수신
- **Storage**: SQLite WAL 모드 (concurrent read 허용, single-writer)
## Architecture
### Data Sources
| Source | API | Purpose | Update Cycle |
|--------|-----|---------|-------------|
| Binance | REST + WebSocket | 가격, 거래량, OHLCV, 시가총액 순위 | 실시간 (WebSocket) |
| CryptoPanic / NewsAPI | REST | 크립토 뉴스 헤드라인 | 15분 |
| Twitter/X API | REST | 코인별 언급량, 감성 | 15분 |
| Reddit API | REST | r/cryptocurrency 등 감성 분석 | 15분 |
| Claude API | REST | 시장 상황 종합 분석, 뉴스 영향도 판단 | 15분 |
### Analysis Agents
4개의 독립적인 분석 에이전트가 각 코인에 대해 0~100 점수를 산출한다.
#### 1. Technical Agent (기본 가중치: 60%)
- **Indicators**: RSI(14), MACD(12,26,9), Bollinger Bands(20,2), 이동평균(SMA 20/50/200)
- **Volume Analysis**: 거래량 급증 감지, OBV(On-Balance Volume)
- **Candlestick Patterns**: Engulfing, Doji, Hammer 등 주요 패턴
- **Input**: Binance OHLCV 데이터 (1h, 4h, 1d 캔들)
- **Output**: 0~100 기술적 점수
#### 2. News Agent (기본 가중치: 20%)
- **Sources**: CryptoPanic API, NewsAPI
- **Analysis**: 헤드라인 감성 분석 (positive/negative/neutral)
- **Scoring**: 최근 24시간 뉴스의 감성 비율 + 뉴스 영향도(중요도) 가중
- **Output**: 0~100 뉴스 감성 점수
#### 3. Social Agent (기본 가중치: 10%)
- **Sources**: Twitter/X API, Reddit API (r/cryptocurrency, r/binance)
- **Metrics**: 코인별 언급량 추세, 감성 분석, 인플루언서 언급 가중치
- **Analysis**: 24시간 언급량 변화율 + 감성 비율
- **Output**: 0~100 소셜 감성 점수
#### 4. AI Agent (기본 가중치: 10%)
- **Provider**: Claude API (Anthropic)
- **Input**: 최근 뉴스 요약, 기술적 지표 요약, 소셜 감성 요약, 시장 전반 상황
- **Task**: 종합적 시장 컨텍스트 판단, 각 코인의 단기 전망 평가
- **Output**: 0~100 AI 종합 점수 + 텍스트 분석 요약
### Signal Engine
#### Score Aggregator
```
composite_score = (technical * w1) + (news * w2) + (social * w3) + (ai * w4)
where w1 + w2 + w3 + w4 = 1.0
defaults: w1=0.6, w2=0.2, w3=0.1, w4=0.1
```
#### Signal Classification
- **BUY**: composite_score >= 70
- **HOLD**: 40 <= composite_score < 70
- **SELL**: composite_score < 40
#### Surge Detector
- 바이낸스 전체 코인 중 거래량이 24h 평균 대비 300% 이상 급증한 코인을 자동 감지
- 감지된 코인을 모니터링 목록에 자동 추가
- Top 50에 없더라도 급등/급락 코인 포착 가능
### Coin Scope
- **Base**: 시가총액 Top 50 (USDT 페어 기준)
- **Auto-add**: 거래량 급증 코인 (Surge Detector)
- **User pin**: 사용자가 수동으로 추가한 코인
- **Pair**: 모든 코인은 USDT 현물 페어 기준
## Portfolio Simulator
### Rules
- **초기 자본**: $200 (가상)
- **거래 방식**: Binance 현물(Spot)만 사용. 레버리지/마진/선물/공매도 없음
- **최대 동시 포지션**: 5개
### Entry (매수)
- Signal score >= 70인 코인 중 점수가 높은 순서대로 매수
- Position size: 가용 현금의 15~30% (점수가 높을수록 큰 비중)
- Score 70~79: 15% of cash
- Score 80~89: 20% of cash
- Score 90~100: 30% of cash
- 가용 현금이 없으면 매수하지 않음
- **최소 포지션**: $15 미만이면 매수하지 않음 (바이낸스 최소 주문 규모 고려)
### Exit (매도)
- **Signal exit**: 보유 코인의 score가 39 이하로 하락 시 전량 매도
- **Stop-loss**: 진입가 대비 -8% 하락 시 자동 손절
- **Take-profit 1차**: 진입가 대비 +15% 상승 시 50% 부분 익절
- **Take-profit 2차**: 진입가 대비 +25% 상승 시 나머지 전량 익절
### Reinvestment (재투자)
- 매도 후 현금은 가용 풀로 복귀
- 다음 15분 분석 주기에 새로운 BUY 시그널 탐색
- 가장 높은 점수의 코인부터 우선 배분
### P&L Tracking
- **Per-trade P&L**: 각 거래의 실현 손익 (진입가 vs 퇴장가)
- **Per-position P&L**: 보유 중인 포지션의 미실현 손익 (진입가 vs 현재가)
- **Total portfolio P&L**: 초기 자본 대비 총 수익률
- **Win rate**: 수익 거래 / 전체 거래
- **Average gain/loss**: 평균 수익 / 평균 손실
## Dashboard (Streamlit)
### Layout: Sidebar + Detail
#### Sidebar (Left)
- 코인 목록 + 종합 점수
- 색상 코딩: BUY(green), HOLD(yellow), SELL(red)
- 코인 클릭 시 우측 Detail 영역 변경
- 정렬: 점수 높은 순 (BUY 순위)
- 하단: 가중치 슬라이더 (사용자 커스텀)
#### Detail Area (Right)
선택한 코인의 상세 정보를 4개 패널로 표시:
1. **Chart Panel**: 캔들스틱 차트 + RSI/MACD/BB 오버레이. plotly 사용
2. **News Panel**: 최근 뉴스 헤드라인 목록 + 감성 태그
3. **Social Panel**: Twitter/Reddit 언급량 추세 차트 + 주요 글
4. **AI Panel**: Claude 분석 요약 텍스트 + 시그널 근거
#### Portfolio Tab
- Summary bar: 초기자본, 현재가치, 총 P&L, 승률, 가용 현금
- Current Holdings: 보유 포지션 테이블 (코인, 배분금액, 수량, 평균진입가, 현재가, P&L)
- Trade History: 청산 거래 이력 (날짜, 코인, 진입/퇴장가, 사이즈, P&L, 사유)
- Allocation Chart: 원형 차트 (코인별 비중 + 현금)
## Update Cycle
| Component | Cycle | Method |
|-----------|-------|--------|
| Price data | Realtime | Binance WebSocket |
| Technical indicators | On price update | Calculated from OHLCV |
| News analysis | Every 15 min | APScheduler → CryptoPanic/NewsAPI |
| Social analysis | Every 15 min | APScheduler → Twitter/Reddit API |
| AI analysis | Every 15 min | APScheduler → Claude API |
| Signal scoring | Every 15 min | After all agents complete |
| Portfolio rebalance | Every 15 min | After signal scoring |
## Project Structure
```
crypto_news_trading/
├── run.py # Streamlit entry point (streamlit run run.py)
├── config.py # API keys, settings, defaults
├── requirements.txt
├── .env # API keys (gitignored)
├── agents/
│ ├── __init__.py
│ ├── technical.py # Technical analysis agent
│ ├── news.py # News sentiment agent
│ ├── social.py # Social media agent
│ └── ai_analyst.py # Claude AI agent
├── engine/
│ ├── __init__.py
│ ├── signal.py # Score aggregator + signal classifier
│ ├── surge.py # Volume surge detector
│ └── portfolio.py # Portfolio simulator + P&L tracker
├── data/
│ ├── __init__.py
│ ├── binance_ws.py # Binance WebSocket client
│ ├── binance_rest.py # Binance REST client
│ ├── news_client.py # CryptoPanic / NewsAPI client
│ ├── social_client.py # Twitter / Reddit client
│ └── db.py # SQLite DB manager
├── dashboard/
│ ├── app.py # Streamlit main app
│ ├── sidebar.py # Coin list sidebar
│ ├── detail.py # Detail panels (chart, news, social, ai)
│ └── portfolio_view.py # Portfolio dashboard
├── scheduler/
│ └── jobs.py # APScheduler job definitions
├── logs/ # Log files (gitignored)
└── db/
└── schema.sql # SQLite schema
```
## Database Schema (SQLite)
### signals
| Column | Type | Description |
|--------|------|-------------|
| id | INTEGER PK | Auto increment |
| coin | TEXT | Symbol (e.g., BTCUSDT) |
| timestamp | DATETIME | Signal time |
| technical_score | REAL | 0~100 |
| news_score | REAL | 0~100 |
| social_score | REAL | 0~100 |
| ai_score | REAL | 0~100 |
| composite_score | REAL | Weighted sum |
| signal | TEXT | BUY / HOLD / SELL |
### trades
| Column | Type | Description |
|--------|------|-------------|
| id | INTEGER PK | Auto increment |
| coin | TEXT | Symbol |
| side | TEXT | BUY / SELL |
| price | REAL | Execution price |
| quantity | REAL | Coin quantity |
| amount_usd | REAL | USD value |
| timestamp | DATETIME | Trade time |
| reason | TEXT | Signal/stop-loss/take-profit |
### positions
| Column | Type | Description |
|--------|------|-------------|
| id | INTEGER PK | Auto increment |
| coin | TEXT | Symbol |
| entry_price | REAL | Average entry price |
| quantity | REAL | Current quantity |
| invested_usd | REAL | Total USD invested |
| status | TEXT | OPEN / CLOSED |
| opened_at | DATETIME | Position open time |
| closed_at | DATETIME | Position close time (nullable) |
### portfolio
| Column | Type | Description |
|--------|------|-------------|
| id | INTEGER PK | Auto increment |
| timestamp | DATETIME | Snapshot time |
| total_value | REAL | Portfolio total value |
| cash | REAL | Available cash |
| pnl | REAL | Total P&L |
| pnl_pct | REAL | P&L percentage |
## Failure Handling
| Failure | Recovery |
|---------|----------|
| Binance WebSocket 끊김 | 자동 reconnect (python-binance 내장), 3회 실패 시 REST fallback |
| CryptoPanic/NewsAPI 실패 | 캐시된 마지막 결과 사용, 뉴스 점수를 50 (neutral)로 설정 |
| Twitter API 미사용/실패 | Reddit만으로 소셜 점수 산출 |
| Reddit API 실패 | 소셜 점수를 50 (neutral)로 설정 |
| Claude API 타임아웃 | AI 점수를 50으로 설정, 이전 분석 텍스트 유지 |
| Agent가 점수 미산출 | 해당 에이전트 점수 50, 나머지 에이전트로 가중 재배분 |
## Rate Limit & Cost
### API Call Budget (per 15-min cycle)
- **Binance REST**: ~5 calls (Top 50 목록 + surge scan). WebSocket은 별도
- **CryptoPanic**: ~1 call (전체 뉴스 조회 후 코인별 필터링)
- **Reddit**: ~3 calls (subreddit별 조회)
- **Twitter**: ~5 calls (있을 경우)
- **Claude API**: ~2-3 calls (코인을 배치로 묶어 분석. Top 10은 매 주기, 나머지는 1시간 간격)
### Claude API 예상 비용
- 배치 분석: ~2,000 input tokens + ~500 output tokens per call
- 하루 ~150 calls → 월 ~$5-15 (Sonnet 기준)
## Logging
- Python `logging` 모듈, `RotatingFileHandler` (10MB, 5 backups)
- 로그 레벨: 시그널 결정(INFO), 거래 실행(INFO), API 에러(WARNING), 시스템 에러(ERROR)
- 로그 파일: `logs/app.log`
## Data Retention
- signals 테이블: 90일 보관, 이후 일별 요약으로 집계 후 삭제
- trades/positions: 영구 보관
- portfolio 스냅샷: 90일 보관
## Database Schema Additions
### settings
| Column | Type | Description |
|--------|------|-------------|
| key | TEXT PK | Setting name |
| value | TEXT | JSON-encoded value |
사용자 가중치, surge 임계값, 포트폴리오 설정 등을 저장.
## API Keys Required
| Service | Env Variable | Required |
|---------|-------------|----------|
| Binance | BINANCE_API_KEY, BINANCE_SECRET | Yes |
| CryptoPanic | CRYPTOPANIC_API_KEY | Yes |
| NewsAPI | NEWS_API_KEY | Optional (fallback) |
| Twitter/X | TWITTER_BEARER_TOKEN | Optional (유료 API, 없으면 Reddit만 사용) |
| Reddit | REDDIT_CLIENT_ID, REDDIT_SECRET | Yes |
| Anthropic (Claude) | ANTHROPIC_API_KEY | Yes |

0
engine/__init__.py Normal file
View File

114
engine/portfolio.py Normal file
View File

@@ -0,0 +1,114 @@
import logging
from datetime import datetime, timezone
from config import MAX_POSITIONS, MIN_POSITION_USD, STOP_LOSS_PCT, TAKE_PROFIT_1_PCT, TAKE_PROFIT_2_PCT
logger = logging.getLogger(__name__)
class PortfolioManager:
def __init__(self, initial_capital: float = 200.0):
self.initial_capital = initial_capital
self.cash = initial_capital
self.positions: dict[str, dict] = {}
self.trades: list[dict] = []
def buy(self, symbol: str, price: float, score: float) -> bool:
if symbol in self.positions:
return False
if len(self.positions) >= MAX_POSITIONS:
return False
amount = self._position_size(score)
if amount < MIN_POSITION_USD:
return False
if amount > self.cash:
amount = self.cash
if amount < MIN_POSITION_USD:
return False
quantity = amount / price
self.cash -= amount
self.positions[symbol] = {
"entry_price": price, "quantity": quantity,
"invested_usd": amount, "tp1_hit": False,
"opened_at": datetime.now(timezone.utc).isoformat(),
}
self.trades.append({
"coin": symbol, "side": "BUY", "price": price,
"quantity": quantity, "amount_usd": amount,
"timestamp": datetime.now(timezone.utc).isoformat(), "reason": "signal",
})
return True
def sell(self, symbol: str, price: float, reason: str = "signal", partial: float = 1.0):
if symbol not in self.positions:
return
pos = self.positions[symbol]
sell_qty = pos["quantity"] * partial
sell_usd = sell_qty * price
self.cash += sell_usd
self.trades.append({
"coin": symbol, "side": "SELL", "price": price,
"quantity": sell_qty, "amount_usd": sell_usd,
"timestamp": datetime.now(timezone.utc).isoformat(), "reason": reason,
})
if partial >= 1.0:
del self.positions[symbol]
else:
pos["quantity"] -= sell_qty
def check_exit(self, symbol: str, current_price: float):
if symbol not in self.positions:
return
pos = self.positions[symbol]
entry = pos["entry_price"]
change_pct = (current_price - entry) / entry
if change_pct <= STOP_LOSS_PCT:
self.sell(symbol, current_price, reason="stop-loss")
return
if change_pct >= TAKE_PROFIT_2_PCT:
self.sell(symbol, current_price, reason="take-profit-2")
return
if change_pct >= TAKE_PROFIT_1_PCT and not pos["tp1_hit"]:
pos["tp1_hit"] = True
self.sell(symbol, current_price, reason="take-profit-1", partial=0.5)
def _position_size(self, score: float) -> float:
if score >= 90:
pct = 0.30
elif score >= 80:
pct = 0.20
else:
pct = 0.15
return round(self.cash * pct, 2)
def get_portfolio_value(self, current_prices: dict[str, float]) -> dict:
holdings_value = sum(
pos["quantity"] * current_prices.get(sym, pos["entry_price"])
for sym, pos in self.positions.items()
)
total_value = self.cash + holdings_value
total_pnl = total_value - self.initial_capital
pnl_pct = (total_pnl / self.initial_capital) * 100
winning = sum(1 for t in self.trades if t["side"] == "SELL" and self._trade_pnl(t) > 0)
total_sells = sum(1 for t in self.trades if t["side"] == "SELL")
win_rate = (winning / total_sells * 100) if total_sells > 0 else 0
return {
"total_value": round(total_value, 2),
"cash": round(self.cash, 2),
"holdings_value": round(holdings_value, 2),
"total_pnl": round(total_pnl, 2),
"pnl_pct": round(pnl_pct, 2),
"win_rate": round(win_rate, 1),
"open_positions": len(self.positions),
}
def _trade_pnl(self, sell_trade: dict) -> float:
matching_buys = [
t for t in self.trades
if t["coin"] == sell_trade["coin"] and t["side"] == "BUY"
and t["timestamp"] <= sell_trade["timestamp"]
]
if matching_buys:
latest_buy = matching_buys[-1]
return (sell_trade["price"] - latest_buy["price"]) * sell_trade["quantity"]
return 0

45
engine/signal.py Normal file
View File

@@ -0,0 +1,45 @@
from config import DEFAULT_WEIGHTS
class SignalEngine:
def __init__(self):
self.weights = dict(DEFAULT_WEIGHTS)
def set_weights(self, weights: dict):
total = sum(weights.values())
if abs(total - 1.0) > 0.01:
raise ValueError(f"Weights must sum to 1.0, got {total}")
self.weights = weights
def compute_score(self, technical: float, news: float, social: float, ai: float) -> float:
score = (
technical * self.weights["technical"]
+ news * self.weights["news"]
+ social * self.weights["social"]
+ ai * self.weights["ai"]
)
return round(score, 1)
def classify(self, score: float) -> str:
if score >= 70:
return "BUY"
elif score >= 40:
return "HOLD"
return "SELL"
def rank_coins(self, coins: dict[str, dict]) -> list[dict]:
results = []
for symbol, scores in coins.items():
composite = self.compute_score(
scores["technical"], scores["news"], scores["social"], scores["ai"]
)
results.append({
"symbol": symbol,
"technical": scores["technical"],
"news": scores["news"],
"social": scores["social"],
"ai": scores["ai"],
"composite": composite,
"signal": self.classify(composite),
})
results.sort(key=lambda x: x["composite"], reverse=True)
return results

20
engine/surge.py Normal file
View File

@@ -0,0 +1,20 @@
import logging
logger = logging.getLogger(__name__)
class SurgeDetector:
def __init__(self, multiplier: float = 3.0):
self.multiplier = multiplier
def detect(self, tickers: list[dict], avg_volumes: dict[str, float]) -> list[str]:
surged = []
for t in tickers:
symbol = t["symbol"]
if not symbol.endswith("USDT"):
continue
current_vol = float(t.get("quoteVolume", 0))
avg_vol = avg_volumes.get(symbol, 0)
if avg_vol > 0 and current_vol >= avg_vol * self.multiplier:
logger.info(f"Surge detected: {symbol} volume {current_vol:.0f} vs avg {avg_vol:.0f}")
surged.append(symbol)
return surged

11
requirements.txt Normal file
View File

@@ -0,0 +1,11 @@
streamlit>=1.30.0
python-binance>=1.0.19
pandas>=2.1.0
ta>=0.11.0
plotly>=5.18.0
anthropic>=0.40.0
praw>=7.7.0
httpx>=0.27.0
apscheduler>=3.10.0
python-dotenv>=1.0.0
pytest>=8.0.0

161
run.py Normal file
View File

@@ -0,0 +1,161 @@
import streamlit as st
import logging
from logging.handlers import RotatingFileHandler
import os
import json
from config import (
BINANCE_API_KEY, BINANCE_SECRET, CRYPTOPANIC_API_KEY, NEWS_API_KEY,
TWITTER_BEARER_TOKEN, REDDIT_CLIENT_ID, REDDIT_SECRET, REDDIT_USER_AGENT,
ANTHROPIC_API_KEY, DB_PATH, LOG_DIR, TOP_N_COINS, INITIAL_CAPITAL,
ANALYSIS_INTERVAL_MINUTES, DEFAULT_WEIGHTS, SURGE_VOLUME_MULTIPLIER,
)
from data.db import Database
from data.binance_rest import BinanceRestClient
from data.binance_ws import BinanceWSClient
from data.news_client import NewsClient
from data.social_client import SocialClient
from agents.ai_analyst import AIAgent
from engine.signal import SignalEngine
from engine.surge import SurgeDetector
from engine.portfolio import PortfolioManager
from scheduler.jobs import AnalysisJob, create_scheduler
from dashboard.sidebar import render_sidebar
from dashboard.detail import render_detail
from dashboard.portfolio_view import render_portfolio
os.makedirs(LOG_DIR, exist_ok=True)
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
handlers=[
RotatingFileHandler(os.path.join(LOG_DIR, "app.log"), maxBytes=10_000_000, backupCount=5),
logging.StreamHandler(),
],
)
logger = logging.getLogger(__name__)
st.set_page_config(page_title="Crypto Signal Dashboard", layout="wide", page_icon="📊")
if "initialized" not in st.session_state:
st.session_state.initialized = False
st.session_state.selected_coin = None
@st.cache_resource
def init_services():
db = Database(DB_PATH)
db.init()
binance_rest = BinanceRestClient(BINANCE_API_KEY, BINANCE_SECRET)
binance_ws = BinanceWSClient(BINANCE_API_KEY, BINANCE_SECRET)
news_client = NewsClient(CRYPTOPANIC_API_KEY, NEWS_API_KEY)
social_client = SocialClient(REDDIT_CLIENT_ID, REDDIT_SECRET, REDDIT_USER_AGENT, TWITTER_BEARER_TOKEN)
ai_agent = AIAgent(ANTHROPIC_API_KEY)
signal_engine = SignalEngine()
surge_detector = SurgeDetector(SURGE_VOLUME_MULTIPLIER)
portfolio = PortfolioManager(INITIAL_CAPITAL)
saved_weights = db.load_setting("weights")
if saved_weights:
try:
signal_engine.set_weights(json.loads(saved_weights))
except (json.JSONDecodeError, ValueError):
pass
try:
monitored = binance_rest.get_top_coins(TOP_N_COINS)
except Exception:
monitored = ["BTCUSDT", "ETHUSDT", "BNBUSDT", "SOLUSDT", "XRPUSDT"]
logger.warning("Failed to fetch top coins, using defaults")
job = AnalysisJob(
binance=binance_rest, news_client=news_client,
social_client=social_client, ai_agent=ai_agent,
signal_engine=signal_engine, surge_detector=surge_detector,
portfolio=portfolio, db=db, monitored_coins=monitored,
)
scheduler = create_scheduler(job, ANALYSIS_INTERVAL_MINUTES)
scheduler.start()
try:
binance_ws.start(monitored[:20])
except Exception as e:
logger.warning(f"WebSocket start failed: {e}")
job.run_analysis()
return {
"db": db, "binance_rest": binance_rest, "binance_ws": binance_ws,
"news_client": news_client, "social_client": social_client,
"signal_engine": signal_engine, "portfolio": portfolio,
"job": job, "monitored": monitored,
}
def main():
services = init_services()
db = services["db"]
job = services["job"]
portfolio = services["portfolio"]
binance_rest = services["binance_rest"]
binance_ws = services["binance_ws"]
news_client = services["news_client"]
social_client = services["social_client"]
current_prices = binance_ws.get_all_prices()
if not current_prices:
try:
current_prices = binance_rest.get_all_prices()
except Exception:
current_prices = {}
page = render_sidebar(job.latest_results, db)
if page == "Signals":
selected = st.session_state.get("selected_coin")
if selected and selected in job.latest_results:
coin_data = job.latest_results[selected]
try:
ohlcv = binance_rest.get_ohlcv(selected, interval="1h", limit=100)
except Exception:
ohlcv = None
coin_news = news_client.filter_by_coin(news_client._cache, selected)
coin_posts = social_client.filter_posts_by_coin(social_client._cache, selected)
sentiment = social_client.simple_sentiment(coin_posts)
ai_summary = job.ai_summaries.get(selected, "AI analysis not yet available.")
render_detail(selected, coin_data, ohlcv, coin_news, sentiment, ai_summary)
else:
st.title("Crypto Signal Dashboard")
st.info("Select a coin from the sidebar to view detailed analysis.")
if job.latest_results:
_render_overview(job.latest_results)
elif page == "Portfolio":
render_portfolio(portfolio, current_prices, db)
def _render_overview(results: dict):
import pandas as pd
st.subheader("Signal Overview")
buy_coins = [r for r in results.values() if r["signal"] == "BUY"]
sell_coins = [r for r in results.values() if r["signal"] == "SELL"]
col1, col2 = st.columns(2)
with col1:
st.markdown("### :green[BUY Signals]")
if buy_coins:
rows = [{"Coin": c["symbol"].replace("USDT",""), "Score": f"{c['composite']:.0f}",
"Tech": f"{c['technical']:.0f}", "News": f"{c['news']:.0f}"}
for c in sorted(buy_coins, key=lambda x: x["composite"], reverse=True)]
st.dataframe(pd.DataFrame(rows), use_container_width=True, hide_index=True)
else:
st.info("No BUY signals currently")
with col2:
st.markdown("### :red[SELL Signals]")
if sell_coins:
rows = [{"Coin": c["symbol"].replace("USDT",""), "Score": f"{c['composite']:.0f}",
"Tech": f"{c['technical']:.0f}", "News": f"{c['news']:.0f}"}
for c in sorted(sell_coins, key=lambda x: x["composite"])]
st.dataframe(pd.DataFrame(rows), use_container_width=True, hide_index=True)
else:
st.info("No SELL signals currently")
if __name__ == "__main__":
main()

0
scheduler/__init__.py Normal file
View File

151
scheduler/jobs.py Normal file
View File

@@ -0,0 +1,151 @@
import logging
from apscheduler.schedulers.background import BackgroundScheduler
from data.binance_rest import BinanceRestClient
from data.news_client import NewsClient
from data.social_client import SocialClient
from agents.technical import TechnicalAgent
from agents.news import NewsAgent
from agents.social import SocialAgent
from agents.ai_analyst import AIAgent
from engine.signal import SignalEngine
from engine.surge import SurgeDetector
from engine.portfolio import PortfolioManager
from data.db import Database
logger = logging.getLogger(__name__)
class AnalysisJob:
def __init__(self, binance: BinanceRestClient, news_client: NewsClient,
social_client: SocialClient, ai_agent: AIAgent,
signal_engine: SignalEngine, surge_detector: SurgeDetector,
portfolio: PortfolioManager, db: Database,
monitored_coins: list[str]):
self.binance = binance
self.news_client = news_client
self.social_client = social_client
self.ai_agent = ai_agent
self.tech_agent = TechnicalAgent()
self.news_agent = NewsAgent()
self.social_agent = SocialAgent()
self.signal_engine = signal_engine
self.surge_detector = surge_detector
self.portfolio = portfolio
self.db = db
self.monitored_coins = monitored_coins
self.latest_results: dict = {}
self.ai_summaries: dict[str, str] = {}
def run_analysis(self):
logger.info("Starting analysis cycle...")
try:
# 0. Surge detection
try:
all_tickers = self.binance.client.get_ticker()
usdt_tickers = [t for t in all_tickers if t["symbol"].endswith("USDT")]
avg_volumes = {t["symbol"]: float(t["quoteVolume"]) * 0.7 for t in usdt_tickers}
surged = self.surge_detector.detect(usdt_tickers, avg_volumes)
for s in surged:
if s not in self.monitored_coins:
self.monitored_coins.append(s)
logger.info(f"Surge-added: {s}")
except Exception as e:
logger.warning(f"Surge detection failed: {e}")
# 1. Fetch data
all_news = self.news_client.fetch_all()
all_social = self.social_client.fetch_reddit()
prices = self.binance.get_all_prices()
coins_scores = {}
coins_for_ai = []
for symbol in self.monitored_coins:
try:
# Technical (multi-timeframe)
df_1h = self.binance.get_ohlcv(symbol, interval="1h", limit=100)
score_1h = self.tech_agent.analyze(df_1h)
try:
df_4h = self.binance.get_ohlcv(symbol, interval="4h", limit=100)
score_4h = self.tech_agent.analyze(df_4h)
df_1d = self.binance.get_ohlcv(symbol, interval="1d", limit=200)
score_1d = self.tech_agent.analyze(df_1d)
tech_score = score_1h * 0.5 + score_4h * 0.3 + score_1d * 0.2
except Exception:
tech_score = score_1h
# News
coin_news = self.news_client.filter_by_coin(all_news, symbol)
news_score = self.news_agent.analyze(coin_news)
# Social
coin_posts = self.social_client.filter_posts_by_coin(all_social, symbol)
sentiment = self.social_client.simple_sentiment(coin_posts)
social_score = self.social_agent.analyze(sentiment, mention_trend=1.0)
coins_scores[symbol] = {
"technical": tech_score,
"news": news_score,
"social": social_score,
"ai": 50,
}
coins_for_ai.append({
"symbol": symbol.replace("USDT", ""),
"price": prices.get(symbol, 0),
"change_pct": 0,
"technical_score": tech_score,
"news_score": news_score,
"social_score": social_score,
"headlines": [a["title"] for a in coin_news[:3]],
})
except Exception as e:
logger.warning(f"Analysis failed for {symbol}: {e}")
coins_scores[symbol] = {"technical": 50, "news": 50, "social": 50, "ai": 50}
# 2. AI batch analysis
if coins_for_ai:
ai_results = self.ai_agent.analyze_batch(coins_for_ai)
for symbol in coins_scores:
short = symbol.replace("USDT", "")
if short in ai_results:
coins_scores[symbol]["ai"] = ai_results[short].get("score", 50)
self.ai_summaries[symbol] = ai_results[short].get("summary", "")
# 3. Compute signals
ranked = self.signal_engine.rank_coins(coins_scores)
self.latest_results = {r["symbol"]: r for r in ranked}
# 4. Store signals
for r in ranked:
self.db.insert_signal(
r["symbol"], r["technical"], r["news"], r["social"], r["ai"],
r["composite"], r["signal"],
)
# 5. Portfolio actions
for r in ranked:
symbol = r["symbol"]
price = prices.get(symbol, 0)
if price <= 0:
continue
self.portfolio.check_exit(symbol, price)
if r["signal"] == "BUY" and symbol not in self.portfolio.positions:
self.portfolio.buy(symbol, price, r["composite"])
elif r["signal"] == "SELL" and symbol in self.portfolio.positions:
self.portfolio.sell(symbol, price, reason="signal-sell")
# 6. Save portfolio snapshot
pv = self.portfolio.get_portfolio_value(prices)
self.db.insert_portfolio_snapshot(
pv["total_value"], pv["cash"], pv["total_pnl"], pv["pnl_pct"]
)
logger.info(f"Analysis complete. {len(ranked)} coins scored. Portfolio: ${pv['total_value']:.2f}")
except Exception as e:
logger.error(f"Analysis cycle failed: {e}")
def create_scheduler(job: AnalysisJob, interval_minutes: int = 15) -> BackgroundScheduler:
scheduler = BackgroundScheduler()
scheduler.add_job(job.run_analysis, "interval", minutes=interval_minutes, id="analysis")
return scheduler

0
tests/__init__.py Normal file
View File

3
tests/conftest.py Normal file
View File

@@ -0,0 +1,3 @@
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))

View File

@@ -0,0 +1,42 @@
import pytest
from unittest.mock import patch, MagicMock
from data.binance_rest import BinanceRestClient
@pytest.fixture
def client():
with patch("data.binance_rest.Client") as mock_client_cls:
mock_client_cls.return_value = MagicMock()
c = BinanceRestClient(api_key="test", api_secret="test")
return c
def test_get_top_coins_returns_symbols(client):
mock_tickers = [
{"symbol": "BTCUSDT", "quoteVolume": "1000000"},
{"symbol": "ETHUSDT", "quoteVolume": "500000"},
{"symbol": "BTCETH", "quoteVolume": "200000"},
]
with patch.object(client.client, "get_ticker", return_value=mock_tickers):
result = client.get_top_coins(limit=2)
assert "BTCUSDT" in result
assert "ETHUSDT" in result
assert "BTCETH" not in result
def test_get_ohlcv_returns_dataframe(client):
mock_klines = [
[1700000000000, "40000", "41000", "39000", "40500", "100",
1700003599999, "4050000", 500, "50", "2025000", "0"]
]
with patch.object(client.client, "get_klines", return_value=mock_klines):
df = client.get_ohlcv("BTCUSDT", interval="1h", limit=1)
assert len(df) == 1
assert "close" in df.columns
assert df.iloc[0]["close"] == 40500.0
def test_get_all_tickers(client):
mock_prices = [
{"symbol": "BTCUSDT", "price": "40000.00"},
{"symbol": "ETHUSDT", "price": "3500.00"},
]
with patch.object(client.client, "get_all_tickers", return_value=mock_prices):
result = client.get_all_prices()
assert result["BTCUSDT"] == 40000.0

51
tests/test_db.py Normal file
View File

@@ -0,0 +1,51 @@
import os
import pytest
from data.db import Database
@pytest.fixture
def db(tmp_path):
db_path = str(tmp_path / "test.db")
database = Database(db_path)
database.init()
yield database
database.close()
def test_init_creates_tables(db):
tables = db.execute("SELECT name FROM sqlite_master WHERE type='table'").fetchall()
names = {r[0] for r in tables}
assert "signals" in names
assert "trades" in names
assert "positions" in names
assert "portfolio" in names
assert "settings" in names
def test_insert_signal(db):
db.insert_signal("BTCUSDT", 75.0, 60.0, 55.0, 70.0, 68.5, "HOLD")
rows = db.get_latest_signals()
assert len(rows) == 1
assert rows[0]["coin"] == "BTCUSDT"
assert rows[0]["composite_score"] == 68.5
def test_insert_trade(db):
db.insert_trade("ETHUSDT", "BUY", 3500.0, 0.01, 35.0, "signal")
trades = db.get_trades()
assert len(trades) == 1
assert trades[0]["coin"] == "ETHUSDT"
def test_open_close_position(db):
db.open_position("SOLUSDT", 140.0, 0.5, 70.0)
positions = db.get_open_positions()
assert len(positions) == 1
db.close_position(positions[0]["id"])
assert len(db.get_open_positions()) == 0
def test_save_load_setting(db):
db.save_setting("weights", '{"technical": 0.7}')
val = db.load_setting("weights")
assert val == '{"technical": 0.7}'
def test_portfolio_snapshot(db):
db.insert_portfolio_snapshot(210.0, 50.0, 10.0, 5.0)
snaps = db.get_portfolio_history()
assert len(snaps) == 1
assert snaps[0]["total_value"] == 210.0

31
tests/test_news_agent.py Normal file
View File

@@ -0,0 +1,31 @@
import pytest
from agents.news import NewsAgent
def test_score_positive_news():
agent = NewsAgent()
articles = [
{"title": "Bitcoin surges 10%", "sentiment_votes": {"positive": 10, "negative": 1}},
{"title": "BTC adoption grows", "sentiment_votes": {"positive": 8, "negative": 2}},
{"title": "Bitcoin rally continues", "sentiment_votes": {"positive": 15, "negative": 0}},
]
score = agent.analyze(articles)
assert score >= 70
def test_score_negative_news():
agent = NewsAgent()
articles = [
{"title": "Bitcoin crashes hard", "sentiment_votes": {"positive": 1, "negative": 10}},
{"title": "Crypto market in fear", "sentiment_votes": {"positive": 0, "negative": 15}},
]
score = agent.analyze(articles)
assert score <= 35
def test_score_no_news_returns_50():
agent = NewsAgent()
assert agent.analyze([]) == 50
def test_score_mixed_news():
agent = NewsAgent()
articles = [{"title": "BTC up", "sentiment_votes": {"positive": 5, "negative": 5}}]
score = agent.analyze(articles)
assert 40 <= score <= 60

31
tests/test_news_client.py Normal file
View File

@@ -0,0 +1,31 @@
import pytest
from data.news_client import NewsClient
@pytest.fixture
def client():
return NewsClient(cryptopanic_key="test", newsapi_key="test")
def test_parse_cryptopanic_response(client):
raw = {
"results": [
{"title": "Bitcoin hits new high", "published_at": "2026-03-20T10:00:00Z",
"currencies": [{"code": "BTC"}], "kind": "news",
"votes": {"positive": 10, "negative": 2}},
{"title": "ETH upgrade coming", "published_at": "2026-03-20T09:00:00Z",
"currencies": [{"code": "ETH"}], "kind": "news",
"votes": {"positive": 5, "negative": 1}},
]
}
articles = client.parse_cryptopanic(raw)
assert len(articles) == 2
assert articles[0]["coin"] == "BTC"
assert articles[0]["title"] == "Bitcoin hits new high"
def test_get_news_for_coin_filters(client):
articles = [
{"coin": "BTC", "title": "BTC news", "sentiment_votes": {"positive": 5, "negative": 1}},
{"coin": "ETH", "title": "ETH news", "sentiment_votes": {"positive": 3, "negative": 3}},
]
btc_news = client.filter_by_coin(articles, "BTC")
assert len(btc_news) == 1
assert btc_news[0]["coin"] == "BTC"

62
tests/test_portfolio.py Normal file
View File

@@ -0,0 +1,62 @@
import pytest
from engine.portfolio import PortfolioManager
@pytest.fixture
def pm():
return PortfolioManager(initial_capital=200.0)
def test_initial_state(pm):
assert pm.cash == 200.0
assert pm.positions == {}
assert pm.trades == []
def test_buy(pm):
pm.buy("BTCUSDT", price=40000.0, score=85)
assert "BTCUSDT" in pm.positions
assert pm.cash < 200.0
assert len(pm.trades) == 1
assert pm.trades[0]["side"] == "BUY"
def test_buy_size_by_score(pm):
pm.buy("SOLUSDT", price=140.0, score=75)
assert abs(pm.trades[0]["amount_usd"] - 30.0) < 0.01
def test_buy_respects_max_positions(pm):
for i, coin in enumerate(["A", "B", "C", "D", "E"]):
pm.buy(f"{coin}USDT", price=10.0, score=80)
pm.buy("FUSDT", price=10.0, score=80)
assert len(pm.positions) == 5
def test_buy_respects_min_position(pm):
pm.cash = 10.0
pm.buy("BTCUSDT", price=40000.0, score=85)
assert "BTCUSDT" not in pm.positions
def test_sell_full(pm):
pm.buy("ETHUSDT", price=3500.0, score=80)
invested = pm.positions["ETHUSDT"]["invested_usd"]
pm.sell("ETHUSDT", price=3800.0, reason="signal")
assert "ETHUSDT" not in pm.positions
assert pm.cash > 200.0 - invested
def test_stop_loss(pm):
pm.buy("DOGEUSDT", price=0.10, score=80)
pm.check_exit("DOGEUSDT", current_price=0.091)
assert "DOGEUSDT" not in pm.positions
def test_take_profit_partial(pm):
pm.buy("SOLUSDT", price=100.0, score=80)
qty_before = pm.positions["SOLUSDT"]["quantity"]
pm.check_exit("SOLUSDT", current_price=116.0)
assert pm.positions["SOLUSDT"]["quantity"] < qty_before
def test_take_profit_full(pm):
pm.buy("SOLUSDT", price=100.0, score=80)
pm.check_exit("SOLUSDT", current_price=126.0)
assert "SOLUSDT" not in pm.positions
def test_pnl_calculation(pm):
pm.buy("ETHUSDT", price=3500.0, score=80)
pnl = pm.get_portfolio_value({"ETHUSDT": 3800.0})
assert pnl["total_pnl"] > 0
assert pnl["total_value"] > 200.0

39
tests/test_signal.py Normal file
View File

@@ -0,0 +1,39 @@
import pytest
from engine.signal import SignalEngine
@pytest.fixture
def engine():
return SignalEngine()
def test_compute_score_default_weights(engine):
score = engine.compute_score(80, 60, 70, 50)
assert score == 72.0
def test_classify_buy(engine):
assert engine.classify(75) == "BUY"
def test_classify_hold(engine):
assert engine.classify(55) == "HOLD"
def test_classify_sell(engine):
assert engine.classify(30) == "SELL"
def test_custom_weights(engine):
engine.set_weights({"technical": 0.4, "news": 0.3, "social": 0.2, "ai": 0.1})
score = engine.compute_score(80, 60, 70, 50)
assert score == 69.0
def test_weights_must_sum_to_one(engine):
with pytest.raises(ValueError):
engine.set_weights({"technical": 0.5, "news": 0.3, "social": 0.1, "ai": 0.2})
def test_rank_coins(engine):
coins = {
"BTC": {"technical": 80, "news": 70, "social": 60, "ai": 75},
"ETH": {"technical": 90, "news": 80, "social": 70, "ai": 85},
"DOGE": {"technical": 30, "news": 25, "social": 40, "ai": 20},
}
ranked = engine.rank_coins(coins)
assert ranked[0]["symbol"] == "ETH"
assert ranked[-1]["symbol"] == "DOGE"
assert ranked[-1]["signal"] == "SELL"

View File

@@ -0,0 +1,19 @@
import pytest
from agents.social import SocialAgent
def test_score_bullish_social():
agent = SocialAgent()
sentiment = {"positive": 8, "negative": 1, "neutral": 1, "total": 10}
score = agent.analyze(sentiment, mention_trend=2.5)
assert score >= 70
def test_score_bearish_social():
agent = SocialAgent()
sentiment = {"positive": 1, "negative": 8, "neutral": 1, "total": 10}
score = agent.analyze(sentiment, mention_trend=0.5)
assert score <= 35
def test_no_data_returns_50():
agent = SocialAgent()
score = agent.analyze({"positive": 0, "negative": 0, "neutral": 0, "total": 0}, 1.0)
assert score == 50

View File

@@ -0,0 +1,25 @@
import pytest
from data.social_client import SocialClient
def test_analyze_reddit_posts():
client = SocialClient(reddit_client_id="x", reddit_secret="x", reddit_user_agent="x")
posts = [
{"title": "Bitcoin is amazing! Going to the moon!", "score": 100, "num_comments": 50},
{"title": "BTC crash incoming, sell everything now", "score": 20, "num_comments": 10},
{"title": "Bitcoin steady at 40k, not bad", "score": 50, "num_comments": 30},
]
result = client.simple_sentiment(posts)
assert "positive" in result
assert "negative" in result
assert "neutral" in result
assert result["positive"] + result["negative"] + result["neutral"] == len(posts)
def test_keyword_match():
client = SocialClient(reddit_client_id="x", reddit_secret="x", reddit_user_agent="x")
posts = [
{"title": "Bitcoin BTC going up"},
{"title": "Ethereum news today"},
{"title": "BTC and ETH analysis"},
]
filtered = client.filter_posts_by_coin(posts, "BTC")
assert len(filtered) == 2

21
tests/test_surge.py Normal file
View File

@@ -0,0 +1,21 @@
import pytest
from engine.surge import SurgeDetector
def test_detect_surge():
detector = SurgeDetector(multiplier=3.0)
tickers = [
{"symbol": "BTCUSDT", "quoteVolume": "1000000"},
{"symbol": "NEWUSDT", "quoteVolume": "5000000"},
{"symbol": "ETHUSDT", "quoteVolume": "800000"},
]
avg_volumes = {"BTCUSDT": 900000, "NEWUSDT": 1000000, "ETHUSDT": 750000}
surged = detector.detect(tickers, avg_volumes)
assert "NEWUSDT" in surged
assert "BTCUSDT" not in surged
def test_no_surge():
detector = SurgeDetector(multiplier=3.0)
tickers = [{"symbol": "BTCUSDT", "quoteVolume": "1000000"}]
avg_volumes = {"BTCUSDT": 900000}
surged = detector.detect(tickers, avg_volumes)
assert len(surged) == 0

58
tests/test_technical.py Normal file
View File

@@ -0,0 +1,58 @@
import pytest
import pandas as pd
import numpy as np
from agents.technical import TechnicalAgent
def make_ohlcv(n=100, base_price=100.0):
np.random.seed(42)
closes = base_price + np.cumsum(np.random.randn(n) * 2)
df = pd.DataFrame({
"timestamp": pd.date_range("2026-01-01", periods=n, freq="1h"),
"open": closes - np.random.rand(n),
"high": closes + np.abs(np.random.randn(n) * 2),
"low": closes - np.abs(np.random.randn(n) * 2),
"close": closes,
"volume": np.random.randint(100, 10000, n).astype(float),
"quote_volume": np.random.randint(100000, 1000000, n).astype(float),
})
return df
def test_score_returns_0_to_100():
agent = TechnicalAgent()
df = make_ohlcv(100)
score = agent.analyze(df)
assert 0 <= score <= 100
def test_score_with_uptrend():
agent = TechnicalAgent()
n = 100
closes = np.linspace(100, 200, n)
df = pd.DataFrame({
"timestamp": pd.date_range("2026-01-01", periods=n, freq="1h"),
"open": closes - 1, "high": closes + 2,
"low": closes - 2, "close": closes,
"volume": np.full(n, 5000.0),
"quote_volume": np.full(n, 500000.0),
})
score = agent.analyze(df)
assert score >= 55 # uptrend should score above average
def test_score_with_downtrend():
agent = TechnicalAgent()
n = 100
closes = np.linspace(200, 100, n)
df = pd.DataFrame({
"timestamp": pd.date_range("2026-01-01", periods=n, freq="1h"),
"open": closes + 1, "high": closes + 2,
"low": closes - 2, "close": closes,
"volume": np.full(n, 5000.0),
"quote_volume": np.full(n, 500000.0),
})
score = agent.analyze(df)
assert score <= 45 # downtrend should score below average
def test_insufficient_data_returns_50():
agent = TechnicalAgent()
df = make_ohlcv(5)
score = agent.analyze(df)
assert score == 50