diff --git a/scheduler/jobs.py b/scheduler/jobs.py new file mode 100644 index 0000000..212c7cc --- /dev/null +++ b/scheduler/jobs.py @@ -0,0 +1,151 @@ +import logging +from apscheduler.schedulers.background import BackgroundScheduler +from data.binance_rest import BinanceRestClient +from data.news_client import NewsClient +from data.social_client import SocialClient +from agents.technical import TechnicalAgent +from agents.news import NewsAgent +from agents.social import SocialAgent +from agents.ai_analyst import AIAgent +from engine.signal import SignalEngine +from engine.surge import SurgeDetector +from engine.portfolio import PortfolioManager +from data.db import Database + +logger = logging.getLogger(__name__) + +class AnalysisJob: + def __init__(self, binance: BinanceRestClient, news_client: NewsClient, + social_client: SocialClient, ai_agent: AIAgent, + signal_engine: SignalEngine, surge_detector: SurgeDetector, + portfolio: PortfolioManager, db: Database, + monitored_coins: list[str]): + self.binance = binance + self.news_client = news_client + self.social_client = social_client + self.ai_agent = ai_agent + self.tech_agent = TechnicalAgent() + self.news_agent = NewsAgent() + self.social_agent = SocialAgent() + self.signal_engine = signal_engine + self.surge_detector = surge_detector + self.portfolio = portfolio + self.db = db + self.monitored_coins = monitored_coins + self.latest_results: dict = {} + self.ai_summaries: dict[str, str] = {} + + def run_analysis(self): + logger.info("Starting analysis cycle...") + try: + # 0. Surge detection + try: + all_tickers = self.binance.client.get_ticker() + usdt_tickers = [t for t in all_tickers if t["symbol"].endswith("USDT")] + avg_volumes = {t["symbol"]: float(t["quoteVolume"]) * 0.7 for t in usdt_tickers} + surged = self.surge_detector.detect(usdt_tickers, avg_volumes) + for s in surged: + if s not in self.monitored_coins: + self.monitored_coins.append(s) + logger.info(f"Surge-added: {s}") + except Exception as e: + logger.warning(f"Surge detection failed: {e}") + + # 1. Fetch data + all_news = self.news_client.fetch_all() + all_social = self.social_client.fetch_reddit() + prices = self.binance.get_all_prices() + + coins_scores = {} + coins_for_ai = [] + + for symbol in self.monitored_coins: + try: + # Technical (multi-timeframe) + df_1h = self.binance.get_ohlcv(symbol, interval="1h", limit=100) + score_1h = self.tech_agent.analyze(df_1h) + try: + df_4h = self.binance.get_ohlcv(symbol, interval="4h", limit=100) + score_4h = self.tech_agent.analyze(df_4h) + df_1d = self.binance.get_ohlcv(symbol, interval="1d", limit=200) + score_1d = self.tech_agent.analyze(df_1d) + tech_score = score_1h * 0.5 + score_4h * 0.3 + score_1d * 0.2 + except Exception: + tech_score = score_1h + + # News + coin_news = self.news_client.filter_by_coin(all_news, symbol) + news_score = self.news_agent.analyze(coin_news) + + # Social + coin_posts = self.social_client.filter_posts_by_coin(all_social, symbol) + sentiment = self.social_client.simple_sentiment(coin_posts) + social_score = self.social_agent.analyze(sentiment, mention_trend=1.0) + + coins_scores[symbol] = { + "technical": tech_score, + "news": news_score, + "social": social_score, + "ai": 50, + } + coins_for_ai.append({ + "symbol": symbol.replace("USDT", ""), + "price": prices.get(symbol, 0), + "change_pct": 0, + "technical_score": tech_score, + "news_score": news_score, + "social_score": social_score, + "headlines": [a["title"] for a in coin_news[:3]], + }) + except Exception as e: + logger.warning(f"Analysis failed for {symbol}: {e}") + coins_scores[symbol] = {"technical": 50, "news": 50, "social": 50, "ai": 50} + + # 2. AI batch analysis + if coins_for_ai: + ai_results = self.ai_agent.analyze_batch(coins_for_ai) + for symbol in coins_scores: + short = symbol.replace("USDT", "") + if short in ai_results: + coins_scores[symbol]["ai"] = ai_results[short].get("score", 50) + self.ai_summaries[symbol] = ai_results[short].get("summary", "") + + # 3. Compute signals + ranked = self.signal_engine.rank_coins(coins_scores) + self.latest_results = {r["symbol"]: r for r in ranked} + + # 4. Store signals + for r in ranked: + self.db.insert_signal( + r["symbol"], r["technical"], r["news"], r["social"], r["ai"], + r["composite"], r["signal"], + ) + + # 5. Portfolio actions + for r in ranked: + symbol = r["symbol"] + price = prices.get(symbol, 0) + if price <= 0: + continue + self.portfolio.check_exit(symbol, price) + if r["signal"] == "BUY" and symbol not in self.portfolio.positions: + self.portfolio.buy(symbol, price, r["composite"]) + elif r["signal"] == "SELL" and symbol in self.portfolio.positions: + self.portfolio.sell(symbol, price, reason="signal-sell") + + # 6. Save portfolio snapshot + pv = self.portfolio.get_portfolio_value(prices) + self.db.insert_portfolio_snapshot( + pv["total_value"], pv["cash"], pv["total_pnl"], pv["pnl_pct"] + ) + + logger.info(f"Analysis complete. {len(ranked)} coins scored. Portfolio: ${pv['total_value']:.2f}") + + except Exception as e: + logger.error(f"Analysis cycle failed: {e}") + + +def create_scheduler(job: AnalysisJob, interval_minutes: int = 15) -> BackgroundScheduler: + scheduler = BackgroundScheduler() + scheduler.add_job(job.run_analysis, "interval", minutes=interval_minutes, id="analysis") + return scheduler