feat: APScheduler analysis job orchestrating all agents
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
151
scheduler/jobs.py
Normal file
151
scheduler/jobs.py
Normal file
@@ -0,0 +1,151 @@
|
|||||||
|
import logging
|
||||||
|
from apscheduler.schedulers.background import BackgroundScheduler
|
||||||
|
from data.binance_rest import BinanceRestClient
|
||||||
|
from data.news_client import NewsClient
|
||||||
|
from data.social_client import SocialClient
|
||||||
|
from agents.technical import TechnicalAgent
|
||||||
|
from agents.news import NewsAgent
|
||||||
|
from agents.social import SocialAgent
|
||||||
|
from agents.ai_analyst import AIAgent
|
||||||
|
from engine.signal import SignalEngine
|
||||||
|
from engine.surge import SurgeDetector
|
||||||
|
from engine.portfolio import PortfolioManager
|
||||||
|
from data.db import Database
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
class AnalysisJob:
|
||||||
|
def __init__(self, binance: BinanceRestClient, news_client: NewsClient,
|
||||||
|
social_client: SocialClient, ai_agent: AIAgent,
|
||||||
|
signal_engine: SignalEngine, surge_detector: SurgeDetector,
|
||||||
|
portfolio: PortfolioManager, db: Database,
|
||||||
|
monitored_coins: list[str]):
|
||||||
|
self.binance = binance
|
||||||
|
self.news_client = news_client
|
||||||
|
self.social_client = social_client
|
||||||
|
self.ai_agent = ai_agent
|
||||||
|
self.tech_agent = TechnicalAgent()
|
||||||
|
self.news_agent = NewsAgent()
|
||||||
|
self.social_agent = SocialAgent()
|
||||||
|
self.signal_engine = signal_engine
|
||||||
|
self.surge_detector = surge_detector
|
||||||
|
self.portfolio = portfolio
|
||||||
|
self.db = db
|
||||||
|
self.monitored_coins = monitored_coins
|
||||||
|
self.latest_results: dict = {}
|
||||||
|
self.ai_summaries: dict[str, str] = {}
|
||||||
|
|
||||||
|
def run_analysis(self):
|
||||||
|
logger.info("Starting analysis cycle...")
|
||||||
|
try:
|
||||||
|
# 0. Surge detection
|
||||||
|
try:
|
||||||
|
all_tickers = self.binance.client.get_ticker()
|
||||||
|
usdt_tickers = [t for t in all_tickers if t["symbol"].endswith("USDT")]
|
||||||
|
avg_volumes = {t["symbol"]: float(t["quoteVolume"]) * 0.7 for t in usdt_tickers}
|
||||||
|
surged = self.surge_detector.detect(usdt_tickers, avg_volumes)
|
||||||
|
for s in surged:
|
||||||
|
if s not in self.monitored_coins:
|
||||||
|
self.monitored_coins.append(s)
|
||||||
|
logger.info(f"Surge-added: {s}")
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning(f"Surge detection failed: {e}")
|
||||||
|
|
||||||
|
# 1. Fetch data
|
||||||
|
all_news = self.news_client.fetch_all()
|
||||||
|
all_social = self.social_client.fetch_reddit()
|
||||||
|
prices = self.binance.get_all_prices()
|
||||||
|
|
||||||
|
coins_scores = {}
|
||||||
|
coins_for_ai = []
|
||||||
|
|
||||||
|
for symbol in self.monitored_coins:
|
||||||
|
try:
|
||||||
|
# Technical (multi-timeframe)
|
||||||
|
df_1h = self.binance.get_ohlcv(symbol, interval="1h", limit=100)
|
||||||
|
score_1h = self.tech_agent.analyze(df_1h)
|
||||||
|
try:
|
||||||
|
df_4h = self.binance.get_ohlcv(symbol, interval="4h", limit=100)
|
||||||
|
score_4h = self.tech_agent.analyze(df_4h)
|
||||||
|
df_1d = self.binance.get_ohlcv(symbol, interval="1d", limit=200)
|
||||||
|
score_1d = self.tech_agent.analyze(df_1d)
|
||||||
|
tech_score = score_1h * 0.5 + score_4h * 0.3 + score_1d * 0.2
|
||||||
|
except Exception:
|
||||||
|
tech_score = score_1h
|
||||||
|
|
||||||
|
# News
|
||||||
|
coin_news = self.news_client.filter_by_coin(all_news, symbol)
|
||||||
|
news_score = self.news_agent.analyze(coin_news)
|
||||||
|
|
||||||
|
# Social
|
||||||
|
coin_posts = self.social_client.filter_posts_by_coin(all_social, symbol)
|
||||||
|
sentiment = self.social_client.simple_sentiment(coin_posts)
|
||||||
|
social_score = self.social_agent.analyze(sentiment, mention_trend=1.0)
|
||||||
|
|
||||||
|
coins_scores[symbol] = {
|
||||||
|
"technical": tech_score,
|
||||||
|
"news": news_score,
|
||||||
|
"social": social_score,
|
||||||
|
"ai": 50,
|
||||||
|
}
|
||||||
|
coins_for_ai.append({
|
||||||
|
"symbol": symbol.replace("USDT", ""),
|
||||||
|
"price": prices.get(symbol, 0),
|
||||||
|
"change_pct": 0,
|
||||||
|
"technical_score": tech_score,
|
||||||
|
"news_score": news_score,
|
||||||
|
"social_score": social_score,
|
||||||
|
"headlines": [a["title"] for a in coin_news[:3]],
|
||||||
|
})
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning(f"Analysis failed for {symbol}: {e}")
|
||||||
|
coins_scores[symbol] = {"technical": 50, "news": 50, "social": 50, "ai": 50}
|
||||||
|
|
||||||
|
# 2. AI batch analysis
|
||||||
|
if coins_for_ai:
|
||||||
|
ai_results = self.ai_agent.analyze_batch(coins_for_ai)
|
||||||
|
for symbol in coins_scores:
|
||||||
|
short = symbol.replace("USDT", "")
|
||||||
|
if short in ai_results:
|
||||||
|
coins_scores[symbol]["ai"] = ai_results[short].get("score", 50)
|
||||||
|
self.ai_summaries[symbol] = ai_results[short].get("summary", "")
|
||||||
|
|
||||||
|
# 3. Compute signals
|
||||||
|
ranked = self.signal_engine.rank_coins(coins_scores)
|
||||||
|
self.latest_results = {r["symbol"]: r for r in ranked}
|
||||||
|
|
||||||
|
# 4. Store signals
|
||||||
|
for r in ranked:
|
||||||
|
self.db.insert_signal(
|
||||||
|
r["symbol"], r["technical"], r["news"], r["social"], r["ai"],
|
||||||
|
r["composite"], r["signal"],
|
||||||
|
)
|
||||||
|
|
||||||
|
# 5. Portfolio actions
|
||||||
|
for r in ranked:
|
||||||
|
symbol = r["symbol"]
|
||||||
|
price = prices.get(symbol, 0)
|
||||||
|
if price <= 0:
|
||||||
|
continue
|
||||||
|
self.portfolio.check_exit(symbol, price)
|
||||||
|
if r["signal"] == "BUY" and symbol not in self.portfolio.positions:
|
||||||
|
self.portfolio.buy(symbol, price, r["composite"])
|
||||||
|
elif r["signal"] == "SELL" and symbol in self.portfolio.positions:
|
||||||
|
self.portfolio.sell(symbol, price, reason="signal-sell")
|
||||||
|
|
||||||
|
# 6. Save portfolio snapshot
|
||||||
|
pv = self.portfolio.get_portfolio_value(prices)
|
||||||
|
self.db.insert_portfolio_snapshot(
|
||||||
|
pv["total_value"], pv["cash"], pv["total_pnl"], pv["pnl_pct"]
|
||||||
|
)
|
||||||
|
|
||||||
|
logger.info(f"Analysis complete. {len(ranked)} coins scored. Portfolio: ${pv['total_value']:.2f}")
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Analysis cycle failed: {e}")
|
||||||
|
|
||||||
|
|
||||||
|
def create_scheduler(job: AnalysisJob, interval_minutes: int = 15) -> BackgroundScheduler:
|
||||||
|
scheduler = BackgroundScheduler()
|
||||||
|
scheduler.add_job(job.run_analysis, "interval", minutes=interval_minutes, id="analysis")
|
||||||
|
return scheduler
|
||||||
Reference in New Issue
Block a user