Files
crypto_news/indicators/multi_timeframe.py
2026-03-20 07:49:42 +09:00

315 lines
9.5 KiB
Python

"""Multi-timeframe ICT analysis.
Combines Higher, Middle, and Lower timeframe signals to determine
market bias, key zones, and precise entry points.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from enum import Enum
from typing import List, Optional
import pandas as pd
from loguru import logger
from config import settings
from core.data_feed import DataFeed
from indicators.ict_engine import ICTEngine, ICTSignals
class MarketBias(str, Enum):
BULLISH = "BULLISH"
BEARISH = "BEARISH"
NEUTRAL = "NEUTRAL"
class TradeDirection(str, Enum):
LONG = "LONG"
SHORT = "SHORT"
NONE = "NONE"
@dataclass
class TradeZone:
"""A price zone of interest (OB or FVG)."""
zone_type: str # "OB" or "FVG"
direction: int # 1 = bullish, -1 = bearish
top: float
bottom: float
timeframe: str
strength: float = 1.0
@property
def mid(self) -> float:
return (self.top + self.bottom) / 2
def contains_price(self, price: float) -> bool:
return self.bottom <= price <= self.top
@dataclass
class EntryPoint:
"""Precise entry details derived from LTF analysis."""
direction: TradeDirection
price: float
stop_loss: float
take_profit: float
zone: TradeZone
timeframe: str
@dataclass
class MTFAnalysis:
"""Result of multi-timeframe analysis."""
htf_bias: MarketBias
mtf_zones: List[TradeZone] = field(default_factory=list)
ltf_entry: Optional[EntryPoint] = None
confluence_score: int = 0
class MultiTimeframeAnalyzer:
"""Analyze market structure across Higher, Middle, and Lower timeframes."""
TIMEFRAMES = {
"htf": settings.HTF_TIMEFRAME,
"mtf": settings.MTF_TIMEFRAME,
"ltf": settings.LTF_TIMEFRAME,
}
def __init__(self, ict_engine: ICTEngine):
self.engine = ict_engine
async def analyze_all(
self, data_feed: DataFeed, symbol: str
) -> MTFAnalysis:
"""Run ICT analysis on all three timeframes and synthesise results."""
# Fetch data for all timeframes
tf_data = await data_feed.fetch_multi_timeframe(
symbol, list(self.TIMEFRAMES.values())
)
htf_df = tf_data[self.TIMEFRAMES["htf"]]
mtf_df = tf_data[self.TIMEFRAMES["mtf"]]
ltf_df = tf_data[self.TIMEFRAMES["ltf"]]
# Analyze each timeframe
htf_signals = self.engine.analyze(htf_df)
mtf_signals = self.engine.analyze(mtf_df)
ltf_signals = self.engine.analyze(ltf_df)
# Determine bias from HTF
bias = self.get_htf_bias(htf_signals)
# Find key zones from MTF
zones = self.find_mtf_zones(mtf_signals, self.TIMEFRAMES["mtf"])
# Find precise entry from LTF
current_price = float(ltf_df["close"].iloc[-1])
entry = self.find_ltf_entry(ltf_signals, bias, zones, current_price)
# Calculate confluence
score = self._calculate_confluence(htf_signals, mtf_signals, ltf_signals, bias)
result = MTFAnalysis(
htf_bias=bias,
mtf_zones=zones,
ltf_entry=entry,
confluence_score=score,
)
logger.info(
"MTF Analysis for {}: bias={}, zones={}, score={}",
symbol, bias.value, len(zones), score,
)
return result
def get_htf_bias(self, htf_signals: ICTSignals) -> MarketBias:
"""Determine overall market direction from the Higher Timeframe.
Uses BOS/CHOCH and swing structure to decide bias.
"""
latest_bos = htf_signals.latest_bos
latest_choch = htf_signals.latest_choch
# CHOCH takes precedence (trend reversal)
if latest_choch == 1:
return MarketBias.BULLISH
if latest_choch == -1:
return MarketBias.BEARISH
# BOS confirms existing trend
if latest_bos == 1:
return MarketBias.BULLISH
if latest_bos == -1:
return MarketBias.BEARISH
return MarketBias.NEUTRAL
def find_mtf_zones(
self, mtf_signals: ICTSignals, timeframe: str
) -> List[TradeZone]:
"""Extract active Order Block and FVG zones from the Middle Timeframe."""
zones: List[TradeZone] = []
# Order Blocks
obs = mtf_signals.active_order_blocks
if not obs.empty and "Top" in obs.columns and "Bottom" in obs.columns:
for _, row in obs.iterrows():
ob_val = row.get("OB", 0)
top_val = row.get("Top")
bottom_val = row.get("Bottom")
if pd.isna(ob_val) or pd.isna(top_val) or pd.isna(bottom_val):
continue
zones.append(
TradeZone(
zone_type="OB",
direction=int(ob_val),
top=float(top_val),
bottom=float(bottom_val),
timeframe=timeframe,
strength=float(row.get("OBVolume", 1.0)) if pd.notna(row.get("OBVolume")) else 1.0,
)
)
# Fair Value Gaps
fvgs = mtf_signals.active_fvg
if not fvgs.empty and "Top" in fvgs.columns and "Bottom" in fvgs.columns:
for _, row in fvgs.iterrows():
fvg_val = row.get("FVG", 0)
top_val = row.get("Top")
bottom_val = row.get("Bottom")
if pd.isna(fvg_val) or pd.isna(top_val) or pd.isna(bottom_val):
continue
zones.append(
TradeZone(
zone_type="FVG",
direction=int(fvg_val),
top=float(top_val),
bottom=float(bottom_val),
timeframe=timeframe,
)
)
return zones
def find_ltf_entry(
self,
ltf_signals: ICTSignals,
bias: MarketBias,
zones: List[TradeZone],
current_price: float,
) -> Optional[EntryPoint]:
"""Search for a precise entry on the Lower Timeframe.
Only looks for entries aligned with the HTF bias within
identified MTF zones.
"""
if bias == MarketBias.NEUTRAL:
return None
target_dir = 1 if bias == MarketBias.BULLISH else -1
# Check if price is inside any zone aligned with the bias
for zone in zones:
if zone.direction != target_dir:
continue
if not zone.contains_price(current_price):
continue
# Confirm with LTF BOS in the same direction
ltf_bos = ltf_signals.latest_bos
if ltf_bos != target_dir:
continue
direction = (
TradeDirection.LONG
if bias == MarketBias.BULLISH
else TradeDirection.SHORT
)
# SL beyond the zone, TP at 2:1 ratio
if direction == TradeDirection.LONG:
stop_loss = zone.bottom * 0.998 # small buffer below zone
risk = current_price - stop_loss
take_profit = current_price + risk * 2
else:
stop_loss = zone.top * 1.002
risk = stop_loss - current_price
take_profit = current_price - risk * 2
return EntryPoint(
direction=direction,
price=current_price,
stop_loss=stop_loss,
take_profit=take_profit,
zone=zone,
timeframe=settings.LTF_TIMEFRAME,
)
return None
def _calculate_confluence(
self,
htf: ICTSignals,
mtf: ICTSignals,
ltf: ICTSignals,
bias: MarketBias,
) -> int:
"""Count how many ICT conditions align (0-6)."""
score = 0
target = 1 if bias == MarketBias.BULLISH else (-1 if bias == MarketBias.BEARISH else 0)
if target == 0:
return 0
# 1. Market structure (HTF bias exists)
if bias != MarketBias.NEUTRAL:
score += 1
# 2. Liquidity sweep detected on MTF
try:
if not mtf.liquidity.empty:
liq = mtf.liquidity.get("Liquidity", pd.Series(dtype=float))
swept = liq.dropna()
if len(swept) > 0:
val = swept.iloc[-1]
if pd.notna(val) and int(val) == target:
score += 1
except (ValueError, TypeError):
pass
# 3. Order Block present on MTF
try:
if not mtf.active_order_blocks.empty:
ob_dirs = mtf.active_order_blocks.get("OB", pd.Series(dtype=float))
valid_obs = ob_dirs.dropna()
if (valid_obs == target).any():
score += 1
except (ValueError, TypeError):
pass
# 4. FVG present on MTF
try:
if not mtf.active_fvg.empty:
fvg_dirs = mtf.active_fvg.get("FVG", pd.Series(dtype=float))
valid_fvgs = fvg_dirs.dropna()
if (valid_fvgs == target).any():
score += 1
except (ValueError, TypeError):
pass
# 5. BOS on LTF
if ltf.latest_bos is not None and ltf.latest_bos == target:
score += 1
# 6. CHOCH on MTF or LTF
mtf_choch = mtf.latest_choch
ltf_choch = ltf.latest_choch
if (mtf_choch is not None and mtf_choch == target) or \
(ltf_choch is not None and ltf_choch == target):
score += 1
return score