diff --git a/agents/ai_analyst.py b/agents/ai_analyst.py new file mode 100644 index 0000000..e614027 --- /dev/null +++ b/agents/ai_analyst.py @@ -0,0 +1,65 @@ +import json +import logging +from anthropic import Anthropic + +logger = logging.getLogger(__name__) + +class AIAgent: + def __init__(self, api_key: str): + self.client = Anthropic(api_key=api_key) if api_key else None + + def analyze_batch(self, coins_data: list[dict]) -> dict[str, dict]: + if not coins_data or not self.client: + return {c.get("symbol",""): {"score": 50, "summary": "AI not configured"} for c in coins_data} + + prompt = self._build_prompt(coins_data) + try: + response = self.client.messages.create( + model="claude-sonnet-4-20250514", + max_tokens=2000, + messages=[{"role": "user", "content": prompt}], + ) + return self._parse_response(response.content[0].text, coins_data) + except Exception as e: + logger.error(f"AI analysis failed: {e}") + return {c["symbol"]: {"score": 50, "summary": "Analysis unavailable"} for c in coins_data} + + def _build_prompt(self, coins_data: list[dict]) -> str: + coins_text = "" + for coin in coins_data: + coins_text += f""" +Coin: {coin['symbol']} +- Price: ${coin.get('price', 'N/A')} +- 24h Change: {coin.get('change_pct', 'N/A')}% +- Technical Score: {coin.get('technical_score', 'N/A')}/100 +- News Sentiment: {coin.get('news_score', 'N/A')}/100 +- Social Sentiment: {coin.get('social_score', 'N/A')}/100 +- Recent Headlines: {', '.join(coin.get('headlines', [])[:3])} +""" + return f"""You are a crypto market analyst. Analyze these coins for short-term (24h) spot trading potential. + +For each coin, provide: +1. A score from 0-100 (0=strong sell, 50=neutral, 100=strong buy) +2. A brief 1-2 sentence summary explaining your reasoning + +{coins_text} + +Respond in JSON format: +{{ + "SYMBOL": {{"score": NUMBER, "summary": "TEXT"}}, + ... +}} + +Only output the JSON, no other text.""" + + def _parse_response(self, text: str, coins_data: list[dict]) -> dict[str, dict]: + try: + text = text.strip() + if text.startswith("```"): + text = text.split("```")[1] + if text.startswith("json"): + text = text[4:] + return json.loads(text) + except (json.JSONDecodeError, IndexError) as e: + logger.warning(f"Failed to parse AI response: {e}") + return {c["symbol"]: {"score": 50, "summary": "Parse error"} for c in coins_data} diff --git a/engine/portfolio.py b/engine/portfolio.py new file mode 100644 index 0000000..73a9471 --- /dev/null +++ b/engine/portfolio.py @@ -0,0 +1,114 @@ +import logging +from datetime import datetime, timezone +from config import MAX_POSITIONS, MIN_POSITION_USD, STOP_LOSS_PCT, TAKE_PROFIT_1_PCT, TAKE_PROFIT_2_PCT + +logger = logging.getLogger(__name__) + +class PortfolioManager: + def __init__(self, initial_capital: float = 200.0): + self.initial_capital = initial_capital + self.cash = initial_capital + self.positions: dict[str, dict] = {} + self.trades: list[dict] = [] + + def buy(self, symbol: str, price: float, score: float) -> bool: + if symbol in self.positions: + return False + if len(self.positions) >= MAX_POSITIONS: + return False + amount = self._position_size(score) + if amount < MIN_POSITION_USD: + return False + if amount > self.cash: + amount = self.cash + if amount < MIN_POSITION_USD: + return False + quantity = amount / price + self.cash -= amount + self.positions[symbol] = { + "entry_price": price, "quantity": quantity, + "invested_usd": amount, "tp1_hit": False, + "opened_at": datetime.now(timezone.utc).isoformat(), + } + self.trades.append({ + "coin": symbol, "side": "BUY", "price": price, + "quantity": quantity, "amount_usd": amount, + "timestamp": datetime.now(timezone.utc).isoformat(), "reason": "signal", + }) + return True + + def sell(self, symbol: str, price: float, reason: str = "signal", partial: float = 1.0): + if symbol not in self.positions: + return + pos = self.positions[symbol] + sell_qty = pos["quantity"] * partial + sell_usd = sell_qty * price + self.cash += sell_usd + self.trades.append({ + "coin": symbol, "side": "SELL", "price": price, + "quantity": sell_qty, "amount_usd": sell_usd, + "timestamp": datetime.now(timezone.utc).isoformat(), "reason": reason, + }) + if partial >= 1.0: + del self.positions[symbol] + else: + pos["quantity"] -= sell_qty + + def check_exit(self, symbol: str, current_price: float): + if symbol not in self.positions: + return + pos = self.positions[symbol] + entry = pos["entry_price"] + change_pct = (current_price - entry) / entry + if change_pct <= STOP_LOSS_PCT: + self.sell(symbol, current_price, reason="stop-loss") + return + if change_pct >= TAKE_PROFIT_2_PCT: + self.sell(symbol, current_price, reason="take-profit-2") + return + if change_pct >= TAKE_PROFIT_1_PCT and not pos["tp1_hit"]: + pos["tp1_hit"] = True + self.sell(symbol, current_price, reason="take-profit-1", partial=0.5) + + def _position_size(self, score: float) -> float: + if score >= 90: + pct = 0.30 + elif score >= 80: + pct = 0.20 + else: + pct = 0.15 + return round(self.cash * pct, 2) + + def get_portfolio_value(self, current_prices: dict[str, float]) -> dict: + holdings_value = sum( + pos["quantity"] * current_prices.get(sym, pos["entry_price"]) + for sym, pos in self.positions.items() + ) + total_value = self.cash + holdings_value + total_pnl = total_value - self.initial_capital + pnl_pct = (total_pnl / self.initial_capital) * 100 + + winning = sum(1 for t in self.trades if t["side"] == "SELL" and self._trade_pnl(t) > 0) + total_sells = sum(1 for t in self.trades if t["side"] == "SELL") + win_rate = (winning / total_sells * 100) if total_sells > 0 else 0 + + return { + "total_value": round(total_value, 2), + "cash": round(self.cash, 2), + "holdings_value": round(holdings_value, 2), + "total_pnl": round(total_pnl, 2), + "pnl_pct": round(pnl_pct, 2), + "win_rate": round(win_rate, 1), + "open_positions": len(self.positions), + } + + def _trade_pnl(self, sell_trade: dict) -> float: + matching_buys = [ + t for t in self.trades + if t["coin"] == sell_trade["coin"] and t["side"] == "BUY" + and t["timestamp"] <= sell_trade["timestamp"] + ] + if matching_buys: + latest_buy = matching_buys[-1] + return (sell_trade["price"] - latest_buy["price"]) * sell_trade["quantity"] + return 0 diff --git a/engine/signal.py b/engine/signal.py new file mode 100644 index 0000000..6dd287e --- /dev/null +++ b/engine/signal.py @@ -0,0 +1,45 @@ +from config import DEFAULT_WEIGHTS + +class SignalEngine: + def __init__(self): + self.weights = dict(DEFAULT_WEIGHTS) + + def set_weights(self, weights: dict): + total = sum(weights.values()) + if abs(total - 1.0) > 0.01: + raise ValueError(f"Weights must sum to 1.0, got {total}") + self.weights = weights + + def compute_score(self, technical: float, news: float, social: float, ai: float) -> float: + score = ( + technical * self.weights["technical"] + + news * self.weights["news"] + + social * self.weights["social"] + + ai * self.weights["ai"] + ) + return round(score, 1) + + def classify(self, score: float) -> str: + if score >= 70: + return "BUY" + elif score >= 40: + return "HOLD" + return "SELL" + + def rank_coins(self, coins: dict[str, dict]) -> list[dict]: + results = [] + for symbol, scores in coins.items(): + composite = self.compute_score( + scores["technical"], scores["news"], scores["social"], scores["ai"] + ) + results.append({ + "symbol": symbol, + "technical": scores["technical"], + "news": scores["news"], + "social": scores["social"], + "ai": scores["ai"], + "composite": composite, + "signal": self.classify(composite), + }) + results.sort(key=lambda x: x["composite"], reverse=True) + return results diff --git a/engine/surge.py b/engine/surge.py new file mode 100644 index 0000000..f79963b --- /dev/null +++ b/engine/surge.py @@ -0,0 +1,20 @@ +import logging + +logger = logging.getLogger(__name__) + +class SurgeDetector: + def __init__(self, multiplier: float = 3.0): + self.multiplier = multiplier + + def detect(self, tickers: list[dict], avg_volumes: dict[str, float]) -> list[str]: + surged = [] + for t in tickers: + symbol = t["symbol"] + if not symbol.endswith("USDT"): + continue + current_vol = float(t.get("quoteVolume", 0)) + avg_vol = avg_volumes.get(symbol, 0) + if avg_vol > 0 and current_vol >= avg_vol * self.multiplier: + logger.info(f"Surge detected: {symbol} volume {current_vol:.0f} vs avg {avg_vol:.0f}") + surged.append(symbol) + return surged diff --git a/tests/test_portfolio.py b/tests/test_portfolio.py new file mode 100644 index 0000000..9b52886 --- /dev/null +++ b/tests/test_portfolio.py @@ -0,0 +1,62 @@ +import pytest +from engine.portfolio import PortfolioManager + +@pytest.fixture +def pm(): + return PortfolioManager(initial_capital=200.0) + +def test_initial_state(pm): + assert pm.cash == 200.0 + assert pm.positions == {} + assert pm.trades == [] + +def test_buy(pm): + pm.buy("BTCUSDT", price=40000.0, score=85) + assert "BTCUSDT" in pm.positions + assert pm.cash < 200.0 + assert len(pm.trades) == 1 + assert pm.trades[0]["side"] == "BUY" + +def test_buy_size_by_score(pm): + pm.buy("SOLUSDT", price=140.0, score=75) + assert abs(pm.trades[0]["amount_usd"] - 30.0) < 0.01 + +def test_buy_respects_max_positions(pm): + for i, coin in enumerate(["A", "B", "C", "D", "E"]): + pm.buy(f"{coin}USDT", price=10.0, score=80) + pm.buy("FUSDT", price=10.0, score=80) + assert len(pm.positions) == 5 + +def test_buy_respects_min_position(pm): + pm.cash = 10.0 + pm.buy("BTCUSDT", price=40000.0, score=85) + assert "BTCUSDT" not in pm.positions + +def test_sell_full(pm): + pm.buy("ETHUSDT", price=3500.0, score=80) + invested = pm.positions["ETHUSDT"]["invested_usd"] + pm.sell("ETHUSDT", price=3800.0, reason="signal") + assert "ETHUSDT" not in pm.positions + assert pm.cash > 200.0 - invested + +def test_stop_loss(pm): + pm.buy("DOGEUSDT", price=0.10, score=80) + pm.check_exit("DOGEUSDT", current_price=0.091) + assert "DOGEUSDT" not in pm.positions + +def test_take_profit_partial(pm): + pm.buy("SOLUSDT", price=100.0, score=80) + qty_before = pm.positions["SOLUSDT"]["quantity"] + pm.check_exit("SOLUSDT", current_price=116.0) + assert pm.positions["SOLUSDT"]["quantity"] < qty_before + +def test_take_profit_full(pm): + pm.buy("SOLUSDT", price=100.0, score=80) + pm.check_exit("SOLUSDT", current_price=126.0) + assert "SOLUSDT" not in pm.positions + +def test_pnl_calculation(pm): + pm.buy("ETHUSDT", price=3500.0, score=80) + pnl = pm.get_portfolio_value({"ETHUSDT": 3800.0}) + assert pnl["total_pnl"] > 0 + assert pnl["total_value"] > 200.0 diff --git a/tests/test_signal.py b/tests/test_signal.py new file mode 100644 index 0000000..477ecb2 --- /dev/null +++ b/tests/test_signal.py @@ -0,0 +1,39 @@ +import pytest +from engine.signal import SignalEngine + +@pytest.fixture +def engine(): + return SignalEngine() + +def test_compute_score_default_weights(engine): + score = engine.compute_score(80, 60, 70, 50) + assert score == 72.0 + +def test_classify_buy(engine): + assert engine.classify(75) == "BUY" + +def test_classify_hold(engine): + assert engine.classify(55) == "HOLD" + +def test_classify_sell(engine): + assert engine.classify(30) == "SELL" + +def test_custom_weights(engine): + engine.set_weights({"technical": 0.4, "news": 0.3, "social": 0.2, "ai": 0.1}) + score = engine.compute_score(80, 60, 70, 50) + assert score == 69.0 + +def test_weights_must_sum_to_one(engine): + with pytest.raises(ValueError): + engine.set_weights({"technical": 0.5, "news": 0.3, "social": 0.1, "ai": 0.2}) + +def test_rank_coins(engine): + coins = { + "BTC": {"technical": 80, "news": 70, "social": 60, "ai": 75}, + "ETH": {"technical": 90, "news": 80, "social": 70, "ai": 85}, + "DOGE": {"technical": 30, "news": 25, "social": 40, "ai": 20}, + } + ranked = engine.rank_coins(coins) + assert ranked[0]["symbol"] == "ETH" + assert ranked[-1]["symbol"] == "DOGE" + assert ranked[-1]["signal"] == "SELL" diff --git a/tests/test_surge.py b/tests/test_surge.py new file mode 100644 index 0000000..8556181 --- /dev/null +++ b/tests/test_surge.py @@ -0,0 +1,21 @@ +import pytest +from engine.surge import SurgeDetector + +def test_detect_surge(): + detector = SurgeDetector(multiplier=3.0) + tickers = [ + {"symbol": "BTCUSDT", "quoteVolume": "1000000"}, + {"symbol": "NEWUSDT", "quoteVolume": "5000000"}, + {"symbol": "ETHUSDT", "quoteVolume": "800000"}, + ] + avg_volumes = {"BTCUSDT": 900000, "NEWUSDT": 1000000, "ETHUSDT": 750000} + surged = detector.detect(tickers, avg_volumes) + assert "NEWUSDT" in surged + assert "BTCUSDT" not in surged + +def test_no_surge(): + detector = SurgeDetector(multiplier=3.0) + tickers = [{"symbol": "BTCUSDT", "quoteVolume": "1000000"}] + avg_volumes = {"BTCUSDT": 900000} + surged = detector.detect(tickers, avg_volumes) + assert len(surged) == 0