feat: AI agent, signal engine, surge detector, portfolio simulator

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-20 17:52:05 +09:00
parent adad553a65
commit 46e06df131
7 changed files with 366 additions and 0 deletions

65
agents/ai_analyst.py Normal file
View File

@@ -0,0 +1,65 @@
import json
import logging
from anthropic import Anthropic
logger = logging.getLogger(__name__)
class AIAgent:
def __init__(self, api_key: str):
self.client = Anthropic(api_key=api_key) if api_key else None
def analyze_batch(self, coins_data: list[dict]) -> dict[str, dict]:
if not coins_data or not self.client:
return {c.get("symbol",""): {"score": 50, "summary": "AI not configured"} for c in coins_data}
prompt = self._build_prompt(coins_data)
try:
response = self.client.messages.create(
model="claude-sonnet-4-20250514",
max_tokens=2000,
messages=[{"role": "user", "content": prompt}],
)
return self._parse_response(response.content[0].text, coins_data)
except Exception as e:
logger.error(f"AI analysis failed: {e}")
return {c["symbol"]: {"score": 50, "summary": "Analysis unavailable"} for c in coins_data}
def _build_prompt(self, coins_data: list[dict]) -> str:
coins_text = ""
for coin in coins_data:
coins_text += f"""
Coin: {coin['symbol']}
- Price: ${coin.get('price', 'N/A')}
- 24h Change: {coin.get('change_pct', 'N/A')}%
- Technical Score: {coin.get('technical_score', 'N/A')}/100
- News Sentiment: {coin.get('news_score', 'N/A')}/100
- Social Sentiment: {coin.get('social_score', 'N/A')}/100
- Recent Headlines: {', '.join(coin.get('headlines', [])[:3])}
"""
return f"""You are a crypto market analyst. Analyze these coins for short-term (24h) spot trading potential.
For each coin, provide:
1. A score from 0-100 (0=strong sell, 50=neutral, 100=strong buy)
2. A brief 1-2 sentence summary explaining your reasoning
{coins_text}
Respond in JSON format:
{{
"SYMBOL": {{"score": NUMBER, "summary": "TEXT"}},
...
}}
Only output the JSON, no other text."""
def _parse_response(self, text: str, coins_data: list[dict]) -> dict[str, dict]:
try:
text = text.strip()
if text.startswith("```"):
text = text.split("```")[1]
if text.startswith("json"):
text = text[4:]
return json.loads(text)
except (json.JSONDecodeError, IndexError) as e:
logger.warning(f"Failed to parse AI response: {e}")
return {c["symbol"]: {"score": 50, "summary": "Parse error"} for c in coins_data}

114
engine/portfolio.py Normal file
View File

@@ -0,0 +1,114 @@
import logging
from datetime import datetime, timezone
from config import MAX_POSITIONS, MIN_POSITION_USD, STOP_LOSS_PCT, TAKE_PROFIT_1_PCT, TAKE_PROFIT_2_PCT
logger = logging.getLogger(__name__)
class PortfolioManager:
def __init__(self, initial_capital: float = 200.0):
self.initial_capital = initial_capital
self.cash = initial_capital
self.positions: dict[str, dict] = {}
self.trades: list[dict] = []
def buy(self, symbol: str, price: float, score: float) -> bool:
if symbol in self.positions:
return False
if len(self.positions) >= MAX_POSITIONS:
return False
amount = self._position_size(score)
if amount < MIN_POSITION_USD:
return False
if amount > self.cash:
amount = self.cash
if amount < MIN_POSITION_USD:
return False
quantity = amount / price
self.cash -= amount
self.positions[symbol] = {
"entry_price": price, "quantity": quantity,
"invested_usd": amount, "tp1_hit": False,
"opened_at": datetime.now(timezone.utc).isoformat(),
}
self.trades.append({
"coin": symbol, "side": "BUY", "price": price,
"quantity": quantity, "amount_usd": amount,
"timestamp": datetime.now(timezone.utc).isoformat(), "reason": "signal",
})
return True
def sell(self, symbol: str, price: float, reason: str = "signal", partial: float = 1.0):
if symbol not in self.positions:
return
pos = self.positions[symbol]
sell_qty = pos["quantity"] * partial
sell_usd = sell_qty * price
self.cash += sell_usd
self.trades.append({
"coin": symbol, "side": "SELL", "price": price,
"quantity": sell_qty, "amount_usd": sell_usd,
"timestamp": datetime.now(timezone.utc).isoformat(), "reason": reason,
})
if partial >= 1.0:
del self.positions[symbol]
else:
pos["quantity"] -= sell_qty
def check_exit(self, symbol: str, current_price: float):
if symbol not in self.positions:
return
pos = self.positions[symbol]
entry = pos["entry_price"]
change_pct = (current_price - entry) / entry
if change_pct <= STOP_LOSS_PCT:
self.sell(symbol, current_price, reason="stop-loss")
return
if change_pct >= TAKE_PROFIT_2_PCT:
self.sell(symbol, current_price, reason="take-profit-2")
return
if change_pct >= TAKE_PROFIT_1_PCT and not pos["tp1_hit"]:
pos["tp1_hit"] = True
self.sell(symbol, current_price, reason="take-profit-1", partial=0.5)
def _position_size(self, score: float) -> float:
if score >= 90:
pct = 0.30
elif score >= 80:
pct = 0.20
else:
pct = 0.15
return round(self.cash * pct, 2)
def get_portfolio_value(self, current_prices: dict[str, float]) -> dict:
holdings_value = sum(
pos["quantity"] * current_prices.get(sym, pos["entry_price"])
for sym, pos in self.positions.items()
)
total_value = self.cash + holdings_value
total_pnl = total_value - self.initial_capital
pnl_pct = (total_pnl / self.initial_capital) * 100
winning = sum(1 for t in self.trades if t["side"] == "SELL" and self._trade_pnl(t) > 0)
total_sells = sum(1 for t in self.trades if t["side"] == "SELL")
win_rate = (winning / total_sells * 100) if total_sells > 0 else 0
return {
"total_value": round(total_value, 2),
"cash": round(self.cash, 2),
"holdings_value": round(holdings_value, 2),
"total_pnl": round(total_pnl, 2),
"pnl_pct": round(pnl_pct, 2),
"win_rate": round(win_rate, 1),
"open_positions": len(self.positions),
}
def _trade_pnl(self, sell_trade: dict) -> float:
matching_buys = [
t for t in self.trades
if t["coin"] == sell_trade["coin"] and t["side"] == "BUY"
and t["timestamp"] <= sell_trade["timestamp"]
]
if matching_buys:
latest_buy = matching_buys[-1]
return (sell_trade["price"] - latest_buy["price"]) * sell_trade["quantity"]
return 0

45
engine/signal.py Normal file
View File

@@ -0,0 +1,45 @@
from config import DEFAULT_WEIGHTS
class SignalEngine:
def __init__(self):
self.weights = dict(DEFAULT_WEIGHTS)
def set_weights(self, weights: dict):
total = sum(weights.values())
if abs(total - 1.0) > 0.01:
raise ValueError(f"Weights must sum to 1.0, got {total}")
self.weights = weights
def compute_score(self, technical: float, news: float, social: float, ai: float) -> float:
score = (
technical * self.weights["technical"]
+ news * self.weights["news"]
+ social * self.weights["social"]
+ ai * self.weights["ai"]
)
return round(score, 1)
def classify(self, score: float) -> str:
if score >= 70:
return "BUY"
elif score >= 40:
return "HOLD"
return "SELL"
def rank_coins(self, coins: dict[str, dict]) -> list[dict]:
results = []
for symbol, scores in coins.items():
composite = self.compute_score(
scores["technical"], scores["news"], scores["social"], scores["ai"]
)
results.append({
"symbol": symbol,
"technical": scores["technical"],
"news": scores["news"],
"social": scores["social"],
"ai": scores["ai"],
"composite": composite,
"signal": self.classify(composite),
})
results.sort(key=lambda x: x["composite"], reverse=True)
return results

20
engine/surge.py Normal file
View File

@@ -0,0 +1,20 @@
import logging
logger = logging.getLogger(__name__)
class SurgeDetector:
def __init__(self, multiplier: float = 3.0):
self.multiplier = multiplier
def detect(self, tickers: list[dict], avg_volumes: dict[str, float]) -> list[str]:
surged = []
for t in tickers:
symbol = t["symbol"]
if not symbol.endswith("USDT"):
continue
current_vol = float(t.get("quoteVolume", 0))
avg_vol = avg_volumes.get(symbol, 0)
if avg_vol > 0 and current_vol >= avg_vol * self.multiplier:
logger.info(f"Surge detected: {symbol} volume {current_vol:.0f} vs avg {avg_vol:.0f}")
surged.append(symbol)
return surged

62
tests/test_portfolio.py Normal file
View File

@@ -0,0 +1,62 @@
import pytest
from engine.portfolio import PortfolioManager
@pytest.fixture
def pm():
return PortfolioManager(initial_capital=200.0)
def test_initial_state(pm):
assert pm.cash == 200.0
assert pm.positions == {}
assert pm.trades == []
def test_buy(pm):
pm.buy("BTCUSDT", price=40000.0, score=85)
assert "BTCUSDT" in pm.positions
assert pm.cash < 200.0
assert len(pm.trades) == 1
assert pm.trades[0]["side"] == "BUY"
def test_buy_size_by_score(pm):
pm.buy("SOLUSDT", price=140.0, score=75)
assert abs(pm.trades[0]["amount_usd"] - 30.0) < 0.01
def test_buy_respects_max_positions(pm):
for i, coin in enumerate(["A", "B", "C", "D", "E"]):
pm.buy(f"{coin}USDT", price=10.0, score=80)
pm.buy("FUSDT", price=10.0, score=80)
assert len(pm.positions) == 5
def test_buy_respects_min_position(pm):
pm.cash = 10.0
pm.buy("BTCUSDT", price=40000.0, score=85)
assert "BTCUSDT" not in pm.positions
def test_sell_full(pm):
pm.buy("ETHUSDT", price=3500.0, score=80)
invested = pm.positions["ETHUSDT"]["invested_usd"]
pm.sell("ETHUSDT", price=3800.0, reason="signal")
assert "ETHUSDT" not in pm.positions
assert pm.cash > 200.0 - invested
def test_stop_loss(pm):
pm.buy("DOGEUSDT", price=0.10, score=80)
pm.check_exit("DOGEUSDT", current_price=0.091)
assert "DOGEUSDT" not in pm.positions
def test_take_profit_partial(pm):
pm.buy("SOLUSDT", price=100.0, score=80)
qty_before = pm.positions["SOLUSDT"]["quantity"]
pm.check_exit("SOLUSDT", current_price=116.0)
assert pm.positions["SOLUSDT"]["quantity"] < qty_before
def test_take_profit_full(pm):
pm.buy("SOLUSDT", price=100.0, score=80)
pm.check_exit("SOLUSDT", current_price=126.0)
assert "SOLUSDT" not in pm.positions
def test_pnl_calculation(pm):
pm.buy("ETHUSDT", price=3500.0, score=80)
pnl = pm.get_portfolio_value({"ETHUSDT": 3800.0})
assert pnl["total_pnl"] > 0
assert pnl["total_value"] > 200.0

39
tests/test_signal.py Normal file
View File

@@ -0,0 +1,39 @@
import pytest
from engine.signal import SignalEngine
@pytest.fixture
def engine():
return SignalEngine()
def test_compute_score_default_weights(engine):
score = engine.compute_score(80, 60, 70, 50)
assert score == 72.0
def test_classify_buy(engine):
assert engine.classify(75) == "BUY"
def test_classify_hold(engine):
assert engine.classify(55) == "HOLD"
def test_classify_sell(engine):
assert engine.classify(30) == "SELL"
def test_custom_weights(engine):
engine.set_weights({"technical": 0.4, "news": 0.3, "social": 0.2, "ai": 0.1})
score = engine.compute_score(80, 60, 70, 50)
assert score == 69.0
def test_weights_must_sum_to_one(engine):
with pytest.raises(ValueError):
engine.set_weights({"technical": 0.5, "news": 0.3, "social": 0.1, "ai": 0.2})
def test_rank_coins(engine):
coins = {
"BTC": {"technical": 80, "news": 70, "social": 60, "ai": 75},
"ETH": {"technical": 90, "news": 80, "social": 70, "ai": 85},
"DOGE": {"technical": 30, "news": 25, "social": 40, "ai": 20},
}
ranked = engine.rank_coins(coins)
assert ranked[0]["symbol"] == "ETH"
assert ranked[-1]["symbol"] == "DOGE"
assert ranked[-1]["signal"] == "SELL"

21
tests/test_surge.py Normal file
View File

@@ -0,0 +1,21 @@
import pytest
from engine.surge import SurgeDetector
def test_detect_surge():
detector = SurgeDetector(multiplier=3.0)
tickers = [
{"symbol": "BTCUSDT", "quoteVolume": "1000000"},
{"symbol": "NEWUSDT", "quoteVolume": "5000000"},
{"symbol": "ETHUSDT", "quoteVolume": "800000"},
]
avg_volumes = {"BTCUSDT": 900000, "NEWUSDT": 1000000, "ETHUSDT": 750000}
surged = detector.detect(tickers, avg_volumes)
assert "NEWUSDT" in surged
assert "BTCUSDT" not in surged
def test_no_surge():
detector = SurgeDetector(multiplier=3.0)
tickers = [{"symbol": "BTCUSDT", "quoteVolume": "1000000"}]
avg_volumes = {"BTCUSDT": 900000}
surged = detector.detect(tickers, avg_volumes)
assert len(surged) == 0