Files

66 lines
2.4 KiB
Python
Raw Permalink Normal View History

import json
import logging
from anthropic import Anthropic
logger = logging.getLogger(__name__)
class AIAgent:
def __init__(self, api_key: str):
self.client = Anthropic(api_key=api_key) if api_key else None
def analyze_batch(self, coins_data: list[dict]) -> dict[str, dict]:
if not coins_data or not self.client:
return {c.get("symbol",""): {"score": 50, "summary": "AI not configured"} for c in coins_data}
prompt = self._build_prompt(coins_data)
try:
response = self.client.messages.create(
model="claude-sonnet-4-20250514",
max_tokens=2000,
messages=[{"role": "user", "content": prompt}],
)
return self._parse_response(response.content[0].text, coins_data)
except Exception as e:
logger.error(f"AI analysis failed: {e}")
return {c["symbol"]: {"score": 50, "summary": "Analysis unavailable"} for c in coins_data}
def _build_prompt(self, coins_data: list[dict]) -> str:
coins_text = ""
for coin in coins_data:
coins_text += f"""
Coin: {coin['symbol']}
- Price: ${coin.get('price', 'N/A')}
- 24h Change: {coin.get('change_pct', 'N/A')}%
- Technical Score: {coin.get('technical_score', 'N/A')}/100
- News Sentiment: {coin.get('news_score', 'N/A')}/100
- Social Sentiment: {coin.get('social_score', 'N/A')}/100
- Recent Headlines: {', '.join(coin.get('headlines', [])[:3])}
"""
return f"""You are a crypto market analyst. Analyze these coins for short-term (24h) spot trading potential.
For each coin, provide:
1. A score from 0-100 (0=strong sell, 50=neutral, 100=strong buy)
2. A brief 1-2 sentence summary explaining your reasoning
{coins_text}
Respond in JSON format:
{{
"SYMBOL": {{"score": NUMBER, "summary": "TEXT"}},
...
}}
Only output the JSON, no other text."""
def _parse_response(self, text: str, coins_data: list[dict]) -> dict[str, dict]:
try:
text = text.strip()
if text.startswith("```"):
text = text.split("```")[1]
if text.startswith("json"):
text = text[4:]
return json.loads(text)
except (json.JSONDecodeError, IndexError) as e:
logger.warning(f"Failed to parse AI response: {e}")
return {c["symbol"]: {"score": 50, "summary": "Parse error"} for c in coins_data}