174 lines
5.3 KiB
Python
174 lines
5.3 KiB
Python
|
|
"""Confluence checker for ICT signals.

Evaluates whether enough ICT conditions align (minimum 3 out of 6,
as configured by ``settings.MIN_CONFLUENCE_SCORE``) to produce a valid
trade signal.
"""
|
||
|
|
|
||
|
|
from __future__ import annotations
|
||
|
|
|
||
|
|
from dataclasses import dataclass, field
|
||
|
|
from typing import List
|
||
|
|
|
||
|
|
import pandas as pd
|
||
|
|
from loguru import logger
|
||
|
|
|
||
|
|
from config import settings
|
||
|
|
from indicators.ict_engine import ICTSignals
|
||
|
|
from indicators.multi_timeframe import (
|
||
|
|
MarketBias,
|
||
|
|
MTFAnalysis,
|
||
|
|
TradeDirection,
|
||
|
|
TradeZone,
|
||
|
|
)
|
||
|
|
|
||
|
|
|
||
|
|
@dataclass
class ConditionResult:
    """Result for a single confluence condition.

    Attributes:
        name: Human-readable name of the condition (e.g. ``"BOS"``).
        met: Whether the condition was satisfied.
        detail: Optional free-text explanation; defaults to an empty string.
    """

    name: str
    met: bool
    detail: str = ""
|
||
|
|
|
||
|
|
|
||
|
|
@dataclass
class ConfluenceResult:
    """Aggregated confluence evaluation.

    Attributes:
        score: Number of conditions that were met.
        conditions: Per-condition results, in evaluation order.
        is_valid: Whether the evaluation qualifies as a trade signal.
        direction: Trade direction implied by the evaluation;
            ``TradeDirection.NONE`` when there is none.
    """

    score: int
    conditions: List[ConditionResult] = field(default_factory=list)
    is_valid: bool = False
    direction: TradeDirection = TradeDirection.NONE

    def summary(self) -> str:
        """Return a one-line summary: the score and the names of met conditions."""
        met = [c.name for c in self.conditions if c.met]
        return f"Score {self.score}/6 ({', '.join(met)})"
|
||
|
|
|
||
|
|
|
||
|
|
class ConfluenceChecker:
    """Check if enough ICT conditions align for a trade entry.

    Six conditions are evaluated:
        1. Market Structure (HTF bias alignment)
        2. Liquidity Sweep (sweep then reversal)
        3. Order Block (price in OB zone)
        4. Fair Value Gap (price in FVG)
        5. BOS (break of structure confirmation)
        6. CHOCH (change of character confirmation)

    A result is valid when at least ``MIN_CONFLUENCE`` conditions are met
    and the HTF bias maps to a trade direction (LONG or SHORT).
    """

    MIN_CONFLUENCE = settings.MIN_CONFLUENCE_SCORE

    # Proximity tolerance around a zone: the larger of this fraction of the
    # zone's height and this fraction of the current price.
    _ZONE_MARGIN_FRACTION = 0.5
    _PRICE_MARGIN_FRACTION = 0.005

    def check(
        self,
        mtf: MTFAnalysis,
        current_price: float,
        htf_signals: ICTSignals | None = None,
        mtf_signals: ICTSignals | None = None,
        ltf_signals: ICTSignals | None = None,
    ) -> ConfluenceResult:
        """Evaluate all six conditions and return the result.

        Args:
            mtf: Multi-timeframe analysis; ``htf_bias`` and ``mtf_zones``
                are read from it.
            current_price: Price tested against order-block / FVG zones.
            htf_signals: Accepted for interface compatibility; not used by
                the current checks.
            mtf_signals: Mid-timeframe signals; source of the liquidity
                sweep, and of CHOCH when ``ltf_signals`` is not provided.
            ltf_signals: Low-timeframe signals; source of BOS and CHOCH.

        Returns:
            A ``ConfluenceResult`` holding the per-condition results, the
            number of met conditions, the trade direction derived from the
            HTF bias, and whether the score reaches ``MIN_CONFLUENCE`` with
            a non-NONE direction.
        """
        bias = mtf.htf_bias
        if bias == MarketBias.BULLISH:
            target, direction = 1, TradeDirection.LONG
        elif bias == MarketBias.BEARISH:
            target, direction = -1, TradeDirection.SHORT
        else:
            target, direction = 0, TradeDirection.NONE

        # Without a bullish/bearish bias the target is 0. Comparing signal
        # values against 0 could flag a condition as "met" even though there
        # is no direction to align with, so directional checks are skipped.
        directional = target != 0

        conditions: List[ConditionResult] = []

        # 1. Market Structure
        conditions.append(
            ConditionResult(
                "Market Structure",
                bias != MarketBias.NEUTRAL,
                f"HTF bias: {bias.value}",
            )
        )

        # 2. Liquidity Sweep
        liq_met = directional and self._liquidity_swept(mtf_signals, target)
        conditions.append(ConditionResult("Liquidity Sweep", liq_met))

        # 3. Order Block (price in or near aligned OB)
        ob_met = directional and self._price_at_zone(
            mtf.mtf_zones, "OB", target, current_price
        )
        conditions.append(
            ConditionResult(
                "Order Block",
                ob_met,
                "Price in/near aligned OB" if ob_met else "",
            )
        )

        # 4. Fair Value Gap (price in or near aligned FVG)
        fvg_met = directional and self._price_at_zone(
            mtf.mtf_zones, "FVG", target, current_price
        )
        conditions.append(
            ConditionResult(
                "Fair Value Gap",
                fvg_met,
                "Price in/near aligned FVG" if fvg_met else "",
            )
        )

        # 5. BOS confirmation (low timeframe only)
        bos_met = False
        if directional and ltf_signals:
            bos_met = ltf_signals.latest_bos == target
        conditions.append(ConditionResult("BOS", bos_met))

        # 6. CHOCH confirmation (low timeframe, falling back to mid timeframe)
        choch_met = False
        if directional:
            if ltf_signals:
                choch_met = ltf_signals.latest_choch == target
            elif mtf_signals:
                choch_met = mtf_signals.latest_choch == target
        conditions.append(ConditionResult("CHOCH", choch_met))

        score = sum(1 for c in conditions if c.met)
        is_valid = score >= self.MIN_CONFLUENCE and direction != TradeDirection.NONE

        result = ConfluenceResult(
            score=score,
            conditions=conditions,
            is_valid=is_valid,
            direction=direction,
        )
        logger.info("Confluence check: {}", result.summary())
        return result

    @staticmethod
    def _liquidity_swept(signals: ICTSignals | None, target: int) -> bool:
        """Return True if the last non-null "Liquidity" value equals ``target``."""
        if not signals or signals.liquidity.empty:
            return False
        column = signals.liquidity.get("Liquidity")
        if column is None:
            return False
        swept = column.dropna()
        if len(swept) == 0:
            return False
        try:
            return int(swept.iloc[-1]) == target
        except (ValueError, TypeError, OverflowError) as exc:
            # Best effort: an unusable value counts as "no sweep", but is
            # logged rather than silently discarded.
            logger.debug("Liquidity sweep value could not be interpreted: {}", exc)
            return False

    @classmethod
    def _price_at_zone(cls, zones, zone_type: str, target: int, price: float) -> bool:
        """Return True if ``price`` is inside or near an aligned zone.

        ``zones`` is an iterable of objects exposing ``zone_type``,
        ``direction``, ``top``, ``bottom`` and ``contains_price()``. Only
        zones whose type is ``zone_type`` and whose direction equals
        ``target`` are considered.
        """
        for zone in zones:
            if zone.zone_type != zone_type or zone.direction != target:
                continue
            if zone.contains_price(price):
                return True
            # Proximity: widen the zone on both sides by the larger of half
            # its height and 0.5% of the current price.
            margin = max(
                (zone.top - zone.bottom) * cls._ZONE_MARGIN_FRACTION,
                price * cls._PRICE_MARGIN_FRACTION,
            )
            if (zone.bottom - margin) <= price <= (zone.top + margin):
                return True
        return False
|