315 lines
9.5 KiB
Python
315 lines
9.5 KiB
Python
|
"""Multi-timeframe ICT analysis.

Combines Higher, Middle, and Lower timeframe signals to determine
market bias, key zones, and precise entry points.
"""
|
||
|
|
|
||
|
|
from __future__ import annotations
|
||
|
|
|
||
|
|
from dataclasses import dataclass, field
|
||
|
|
from enum import Enum
|
||
|
|
from typing import List, Optional
|
||
|
|
|
||
|
|
import pandas as pd
|
||
|
|
from loguru import logger
|
||
|
|
|
||
|
|
from config import settings
|
||
|
|
from core.data_feed import DataFeed
|
||
|
|
from indicators.ict_engine import ICTEngine, ICTSignals
|
||
|
|
|
||
|
|
|
||
|
|
class MarketBias(str, Enum):
    """Overall market direction; members are also plain strings."""

    BULLISH = "BULLISH"
    BEARISH = "BEARISH"
    NEUTRAL = "NEUTRAL"
|
||
|
|
|
||
|
|
|
||
|
|
class TradeDirection(str, Enum):
    """Side of a trade; members are also plain strings."""

    LONG = "LONG"
    SHORT = "SHORT"
    NONE = "NONE"
|
||
|
|
|
||
|
|
|
||
|
|
@dataclass
class TradeZone:
    """A price zone of interest (OB or FVG)."""

    zone_type: str  # "OB" or "FVG"
    direction: int  # 1 = bullish, -1 = bearish
    top: float  # upper price bound of the zone
    bottom: float  # lower price bound of the zone
    timeframe: str  # timeframe identifier the zone was found on
    strength: float = 1.0  # relative weight of the zone; defaults to 1.0

    @property
    def mid(self) -> float:
        """Return the price halfway between ``bottom`` and ``top``."""
        return (self.top + self.bottom) / 2

    def contains_price(self, price: float) -> bool:
        """Return True when ``bottom <= price <= top`` (both bounds inclusive)."""
        return self.bottom <= price <= self.top
|
||
|
|
|
||
|
|
|
||
|
|
@dataclass
class EntryPoint:
    """Precise entry details derived from LTF analysis."""

    direction: TradeDirection  # side of the trade
    price: float  # entry price
    stop_loss: float  # stop-loss price
    take_profit: float  # take-profit price
    zone: TradeZone  # zone the entry was taken in
    timeframe: str  # timeframe identifier the entry was confirmed on
|
||
|
|
|
||
|
|
|
||
|
|
@dataclass
class MTFAnalysis:
    """Result of multi-timeframe analysis."""

    htf_bias: MarketBias  # directional bias taken from the higher timeframe
    # Zones found on the middle timeframe; each instance gets its own list.
    mtf_zones: List[TradeZone] = field(default_factory=list)
    ltf_entry: Optional[EntryPoint] = None  # None when no entry was found
    confluence_score: int = 0  # number of aligned ICT conditions
|
||
|
|
|
||
|
|
|
||
|
|
class MultiTimeframeAnalyzer:
    """Analyze market structure across Higher, Middle, and Lower timeframes.

    The Higher Timeframe (HTF) supplies the directional bias, the Middle
    Timeframe (MTF) supplies Order Block / FVG zones, and the Lower
    Timeframe (LTF) confirms an entry inside one of those zones.
    """

    # Timeframe identifiers, read from ``settings`` when the class is created.
    TIMEFRAMES = {
        "htf": settings.HTF_TIMEFRAME,
        "mtf": settings.MTF_TIMEFRAME,
        "ltf": settings.LTF_TIMEFRAME,
    }

    # Stop-loss placement: the zone edge is multiplied by these factors so
    # the stop sits a small buffer beyond the zone.
    LONG_STOP_FACTOR = 0.998
    SHORT_STOP_FACTOR = 1.002
    # Take-profit distance expressed as a multiple of the risk (2:1).
    REWARD_TO_RISK = 2

    def __init__(self, ict_engine: ICTEngine):
        """Store the ICT engine used to analyse each timeframe's data."""
        self.engine = ict_engine

    async def analyze_all(
        self, data_feed: DataFeed, symbol: str
    ) -> MTFAnalysis:
        """Run ICT analysis on all three timeframes and synthesise results.

        Args:
            data_feed: Source of market data; its ``fetch_multi_timeframe``
                coroutine is awaited with the symbol and the three
                timeframe identifiers.
            symbol: Instrument to analyse.

        Returns:
            An ``MTFAnalysis`` holding the HTF bias, the MTF zones, the LTF
            entry (or ``None``) and the confluence score.
        """
        # Fetch data for all timeframes in one call; the result is indexed
        # by timeframe identifier.
        tf_data = await data_feed.fetch_multi_timeframe(
            symbol, list(self.TIMEFRAMES.values())
        )

        htf_df = tf_data[self.TIMEFRAMES["htf"]]
        mtf_df = tf_data[self.TIMEFRAMES["mtf"]]
        ltf_df = tf_data[self.TIMEFRAMES["ltf"]]

        # Analyze each timeframe
        htf_signals = self.engine.analyze(htf_df)
        mtf_signals = self.engine.analyze(mtf_df)
        ltf_signals = self.engine.analyze(ltf_df)

        # Determine bias from HTF
        bias = self.get_htf_bias(htf_signals)

        # Find key zones from MTF
        zones = self.find_mtf_zones(mtf_signals, self.TIMEFRAMES["mtf"])

        # Find precise entry from LTF, using the last LTF close as the
        # current price.
        current_price = float(ltf_df["close"].iloc[-1])
        entry = self.find_ltf_entry(ltf_signals, bias, zones, current_price)

        # Calculate confluence
        score = self._calculate_confluence(
            htf_signals, mtf_signals, ltf_signals, bias
        )

        result = MTFAnalysis(
            htf_bias=bias,
            mtf_zones=zones,
            ltf_entry=entry,
            confluence_score=score,
        )
        logger.info(
            "MTF Analysis for {}: bias={}, zones={}, score={}",
            symbol, bias.value, len(zones), score,
        )
        return result

    def get_htf_bias(self, htf_signals: ICTSignals) -> MarketBias:
        """Determine overall market direction from the Higher Timeframe.

        The latest CHOCH is checked first and the latest BOS second; the
        first of the two that equals 1 or -1 decides the bias.

        Returns:
            ``BULLISH`` for 1, ``BEARISH`` for -1, ``NEUTRAL`` when neither
            signal is 1 or -1.
        """
        latest_bos = htf_signals.latest_bos
        latest_choch = htf_signals.latest_choch

        # CHOCH takes precedence (trend reversal); BOS confirms the
        # existing trend when there is no CHOCH.
        for signal in (latest_choch, latest_bos):
            if signal == 1:
                return MarketBias.BULLISH
            if signal == -1:
                return MarketBias.BEARISH

        return MarketBias.NEUTRAL

    def find_mtf_zones(
        self, mtf_signals: ICTSignals, timeframe: str
    ) -> List[TradeZone]:
        """Extract active Order Block and FVG zones from the Middle Timeframe.

        Order Block zones come first in the returned list, followed by Fair
        Value Gap zones. Order Blocks take their strength from the
        ``OBVolume`` column when it holds a value; every other zone has
        strength 1.0.
        """
        zones = self._zones_from_frame(
            mtf_signals.active_order_blocks,
            direction_column="OB",
            zone_type="OB",
            timeframe=timeframe,
            strength_column="OBVolume",
        )
        zones += self._zones_from_frame(
            mtf_signals.active_fvg,
            direction_column="FVG",
            zone_type="FVG",
            timeframe=timeframe,
        )
        return zones

    @staticmethod
    def _zones_from_frame(
        frame: pd.DataFrame,
        direction_column: str,
        zone_type: str,
        timeframe: str,
        strength_column: Optional[str] = None,
    ) -> List[TradeZone]:
        """Build one TradeZone per usable row of ``frame``.

        Returns an empty list when the frame is empty or lacks a ``Top`` or
        ``Bottom`` column. Rows whose direction, top or bottom is missing
        are skipped.
        """
        if (
            frame.empty
            or "Top" not in frame.columns
            or "Bottom" not in frame.columns
        ):
            return []

        zones: List[TradeZone] = []
        for _, row in frame.iterrows():
            # A missing direction column falls back to 0.
            direction = row.get(direction_column, 0)
            top = row.get("Top")
            bottom = row.get("Bottom")
            if pd.isna(direction) or pd.isna(top) or pd.isna(bottom):
                continue
            volume = row.get(strength_column) if strength_column else None
            zones.append(
                TradeZone(
                    zone_type=zone_type,
                    direction=int(direction),
                    top=float(top),
                    bottom=float(bottom),
                    timeframe=timeframe,
                    strength=float(volume) if pd.notna(volume) else 1.0,
                )
            )
        return zones

    def find_ltf_entry(
        self,
        ltf_signals: ICTSignals,
        bias: MarketBias,
        zones: List[TradeZone],
        current_price: float,
    ) -> Optional[EntryPoint]:
        """Search for a precise entry on the Lower Timeframe.

        Only looks for entries aligned with the HTF bias within
        identified MTF zones.

        Returns:
            An ``EntryPoint`` at ``current_price`` for the first zone that
            matches the bias direction and contains the price, provided the
            latest LTF BOS points the same way; otherwise ``None``.
        """
        if bias == MarketBias.NEUTRAL:
            return None

        target_dir = 1 if bias == MarketBias.BULLISH else -1

        # First zone aligned with the bias that currently holds the price.
        zone = next(
            (
                candidate
                for candidate in zones
                if candidate.direction == target_dir
                and candidate.contains_price(current_price)
            ),
            None,
        )
        # The LTF BOS confirmation does not depend on the zone, so one
        # check decides the outcome for every candidate zone.
        if zone is None or ltf_signals.latest_bos != target_dir:
            return None

        # SL beyond the zone, TP at REWARD_TO_RISK times the risk.
        if bias == MarketBias.BULLISH:
            direction = TradeDirection.LONG
            stop_loss = zone.bottom * self.LONG_STOP_FACTOR
            risk = current_price - stop_loss
            take_profit = current_price + risk * self.REWARD_TO_RISK
        else:
            direction = TradeDirection.SHORT
            stop_loss = zone.top * self.SHORT_STOP_FACTOR
            risk = stop_loss - current_price
            take_profit = current_price - risk * self.REWARD_TO_RISK

        return EntryPoint(
            direction=direction,
            price=current_price,
            stop_loss=stop_loss,
            take_profit=take_profit,
            zone=zone,
            # Same lookup analyze_all uses for the LTF data it fetches.
            timeframe=self.TIMEFRAMES["ltf"],
        )

    def _calculate_confluence(
        self,
        htf: ICTSignals,
        mtf: ICTSignals,
        ltf: ICTSignals,
        bias: MarketBias,
    ) -> int:
        """Count how many ICT conditions align (0-6).

        A neutral bias scores 0. Otherwise one point each is given for:
        a directional bias, the last MTF liquidity value matching the bias,
        an MTF Order Block in the bias direction, an MTF FVG in the bias
        direction, the latest LTF BOS matching the bias, and the latest
        CHOCH on MTF or LTF matching the bias.
        """
        if bias == MarketBias.BULLISH:
            target = 1
        elif bias == MarketBias.BEARISH:
            target = -1
        else:
            return 0

        # 1. Market structure (HTF bias exists) - always true past the
        # neutral early return above.
        score = 1

        # 2. Liquidity sweep detected on MTF
        try:
            if not mtf.liquidity.empty:
                liq = mtf.liquidity.get("Liquidity", pd.Series(dtype=float))
                swept = liq.dropna()
                if len(swept) > 0 and int(swept.iloc[-1]) == target:
                    score += 1
        except (ValueError, TypeError) as exc:
            # Best-effort check: unusable data simply earns no point.
            logger.debug("Skipping MTF liquidity confluence check: {}", exc)

        # 3. Order Block present on MTF
        try:
            if not mtf.active_order_blocks.empty:
                ob_dirs = mtf.active_order_blocks.get(
                    "OB", pd.Series(dtype=float)
                )
                if (ob_dirs.dropna() == target).any():
                    score += 1
        except (ValueError, TypeError) as exc:
            logger.debug("Skipping MTF order block confluence check: {}", exc)

        # 4. FVG present on MTF
        try:
            if not mtf.active_fvg.empty:
                fvg_dirs = mtf.active_fvg.get("FVG", pd.Series(dtype=float))
                if (fvg_dirs.dropna() == target).any():
                    score += 1
        except (ValueError, TypeError) as exc:
            logger.debug("Skipping MTF FVG confluence check: {}", exc)

        # 5. BOS on LTF (target is 1 or -1, so a None value never matches)
        if ltf.latest_bos == target:
            score += 1

        # 6. CHOCH on MTF or LTF
        mtf_choch = mtf.latest_choch
        ltf_choch = ltf.latest_choch
        if mtf_choch == target or ltf_choch == target:
            score += 1

        return score
|