279 lines
9.4 KiB
Python
279 lines
9.4 KiB
Python
"""ICT entry rules.
|
|
|
|
Evaluates bullish and bearish entry conditions and calculates
|
|
stop-loss / take-profit levels based on market structure.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass, field
|
|
from typing import List, Optional
|
|
|
|
import pandas as pd
|
|
from loguru import logger
|
|
|
|
from indicators.ict_engine import ICTSignals
|
|
from indicators.multi_timeframe import TradeDirection
|
|
|
|
|
|
@dataclass
|
|
class EntryResult:
|
|
"""Result of an entry rule evaluation."""
|
|
|
|
is_valid: bool
|
|
direction: TradeDirection
|
|
conditions_met: List[str] = field(default_factory=list)
|
|
conditions_failed: List[str] = field(default_factory=list)
|
|
|
|
|
|
class EntryRules:
|
|
"""ICT-based entry rule evaluation.
|
|
|
|
Bullish entry (LONG):
|
|
1. HTF: Higher Highs & Higher Lows
|
|
2. Liquidity Sweep: previous low swept then bounce
|
|
3. Order Block: price enters bullish OB zone
|
|
4. FVG: price returns to bullish FVG
|
|
5. BOS: upward break of structure
|
|
|
|
Bearish entry (SHORT): mirror logic.
|
|
"""
|
|
|
|
def check_bullish_entry(
|
|
self, signals: ICTSignals, price: float
|
|
) -> EntryResult:
|
|
"""Evaluate bullish (LONG) entry conditions."""
|
|
met: List[str] = []
|
|
failed: List[str] = []
|
|
|
|
# 1. BOS bullish
|
|
if signals.latest_bos == 1:
|
|
met.append("BOS bullish")
|
|
else:
|
|
failed.append("BOS bullish")
|
|
|
|
# 2. Order Block -- price in bullish OB zone
|
|
obs = signals.active_order_blocks
|
|
ob_hit = False
|
|
if not obs.empty and "OB" in obs.columns:
|
|
bullish_obs = obs[obs["OB"] == 1]
|
|
for _, row in bullish_obs.iterrows():
|
|
bottom = row.get("Bottom", 0)
|
|
top = row.get("Top", 0)
|
|
if pd.notna(bottom) and pd.notna(top) and bottom <= price <= top:
|
|
ob_hit = True
|
|
break
|
|
if ob_hit:
|
|
met.append("Order Block")
|
|
else:
|
|
failed.append("Order Block")
|
|
|
|
# 3. FVG -- price in bullish FVG
|
|
fvgs = signals.active_fvg
|
|
fvg_hit = False
|
|
if not fvgs.empty and "FVG" in fvgs.columns:
|
|
bullish_fvg = fvgs[fvgs["FVG"] == 1]
|
|
for _, row in bullish_fvg.iterrows():
|
|
bottom = row.get("Bottom", 0)
|
|
top = row.get("Top", 0)
|
|
if pd.notna(bottom) and pd.notna(top) and bottom <= price <= top:
|
|
fvg_hit = True
|
|
break
|
|
if fvg_hit:
|
|
met.append("FVG")
|
|
else:
|
|
failed.append("FVG")
|
|
|
|
# 4. Liquidity swept (recent bearish liquidity = trap)
|
|
liq_swept = False
|
|
try:
|
|
if not signals.liquidity.empty:
|
|
liq_col = signals.liquidity.get("Liquidity", pd.Series(dtype=float))
|
|
recent = liq_col.dropna().tail(3)
|
|
if len(recent) > 0 and (recent == -1).any():
|
|
liq_swept = True
|
|
except (ValueError, TypeError):
|
|
pass
|
|
if liq_swept:
|
|
met.append("Liquidity Sweep")
|
|
else:
|
|
failed.append("Liquidity Sweep")
|
|
|
|
# 5. CHOCH bullish (optional extra confirmation)
|
|
if signals.latest_choch == 1:
|
|
met.append("CHOCH bullish")
|
|
else:
|
|
failed.append("CHOCH bullish")
|
|
|
|
return EntryResult(
|
|
is_valid=len(met) >= 3,
|
|
direction=TradeDirection.LONG,
|
|
conditions_met=met,
|
|
conditions_failed=failed,
|
|
)
|
|
|
|
def check_bearish_entry(
|
|
self, signals: ICTSignals, price: float
|
|
) -> EntryResult:
|
|
"""Evaluate bearish (SHORT) entry conditions."""
|
|
met: List[str] = []
|
|
failed: List[str] = []
|
|
|
|
# 1. BOS bearish
|
|
if signals.latest_bos == -1:
|
|
met.append("BOS bearish")
|
|
else:
|
|
failed.append("BOS bearish")
|
|
|
|
# 2. Order Block -- price in bearish OB
|
|
obs = signals.active_order_blocks
|
|
ob_hit = False
|
|
if not obs.empty and "OB" in obs.columns:
|
|
bearish_obs = obs[obs["OB"] == -1]
|
|
for _, row in bearish_obs.iterrows():
|
|
bottom = row.get("Bottom", 0)
|
|
top = row.get("Top", 0)
|
|
if pd.notna(bottom) and pd.notna(top) and bottom <= price <= top:
|
|
ob_hit = True
|
|
break
|
|
if ob_hit:
|
|
met.append("Order Block")
|
|
else:
|
|
failed.append("Order Block")
|
|
|
|
# 3. FVG bearish
|
|
fvgs = signals.active_fvg
|
|
fvg_hit = False
|
|
if not fvgs.empty and "FVG" in fvgs.columns:
|
|
bearish_fvg = fvgs[fvgs["FVG"] == -1]
|
|
for _, row in bearish_fvg.iterrows():
|
|
bottom = row.get("Bottom", 0)
|
|
top = row.get("Top", 0)
|
|
if pd.notna(bottom) and pd.notna(top) and bottom <= price <= top:
|
|
fvg_hit = True
|
|
break
|
|
if fvg_hit:
|
|
met.append("FVG")
|
|
else:
|
|
failed.append("FVG")
|
|
|
|
# 4. Liquidity swept (bullish liquidity = trap)
|
|
liq_swept = False
|
|
try:
|
|
if not signals.liquidity.empty:
|
|
liq_col = signals.liquidity.get("Liquidity", pd.Series(dtype=float))
|
|
recent = liq_col.dropna().tail(3)
|
|
if len(recent) > 0 and (recent == 1).any():
|
|
liq_swept = True
|
|
except (ValueError, TypeError):
|
|
pass
|
|
if liq_swept:
|
|
met.append("Liquidity Sweep")
|
|
else:
|
|
failed.append("Liquidity Sweep")
|
|
|
|
# 5. CHOCH bearish
|
|
if signals.latest_choch == -1:
|
|
met.append("CHOCH bearish")
|
|
else:
|
|
failed.append("CHOCH bearish")
|
|
|
|
return EntryResult(
|
|
is_valid=len(met) >= 3,
|
|
direction=TradeDirection.SHORT,
|
|
conditions_met=met,
|
|
conditions_failed=failed,
|
|
)
|
|
|
|
def calculate_stop_loss(
|
|
self,
|
|
direction: TradeDirection,
|
|
signals: ICTSignals,
|
|
entry_price: float,
|
|
) -> float:
|
|
"""Calculate stop-loss based on OB boundary or recent swing high/low.
|
|
|
|
For LONG: SL below the nearest bullish OB bottom or swing low.
|
|
For SHORT: SL above the nearest bearish OB top or swing high.
|
|
"""
|
|
buffer_pct = 0.002 # 0.2% buffer
|
|
|
|
if direction == TradeDirection.LONG:
|
|
# Try OB bottom first
|
|
obs = signals.active_order_blocks
|
|
if not obs.empty and "OB" in obs.columns:
|
|
bullish_obs = obs[obs["OB"] == 1]
|
|
if not bullish_obs.empty:
|
|
lowest_bottom = bullish_obs["Bottom"].dropna().min()
|
|
if pd.notna(lowest_bottom):
|
|
return float(lowest_bottom) * (1 - buffer_pct)
|
|
|
|
# Fallback: recent swing low
|
|
swing = signals.swing_highs_lows
|
|
if "Level" in swing.columns and "HighLow" in swing.columns:
|
|
lows = swing[swing["HighLow"] == -1]["Level"].dropna()
|
|
if len(lows) > 0:
|
|
return float(lows.iloc[-1]) * (1 - buffer_pct)
|
|
|
|
# Last resort: fixed percentage
|
|
return entry_price * (1 - 0.02)
|
|
|
|
else: # SHORT
|
|
obs = signals.active_order_blocks
|
|
if not obs.empty and "OB" in obs.columns:
|
|
bearish_obs = obs[obs["OB"] == -1]
|
|
if not bearish_obs.empty:
|
|
highest_top = bearish_obs["Top"].dropna().max()
|
|
if pd.notna(highest_top):
|
|
return float(highest_top) * (1 + buffer_pct)
|
|
|
|
swing = signals.swing_highs_lows
|
|
if "Level" in swing.columns and "HighLow" in swing.columns:
|
|
highs = swing[swing["HighLow"] == 1]["Level"].dropna()
|
|
if len(highs) > 0:
|
|
return float(highs.iloc[-1]) * (1 + buffer_pct)
|
|
|
|
return entry_price * (1 + 0.02)
|
|
|
|
def calculate_take_profit(
|
|
self,
|
|
direction: TradeDirection,
|
|
signals: ICTSignals,
|
|
entry_price: float,
|
|
stop_loss: float,
|
|
) -> float:
|
|
"""Calculate take-profit targeting opposite OB/FVG or 2:1 R:R minimum.
|
|
|
|
For LONG: TP at the nearest bearish OB/FVG above entry, or 2x risk.
|
|
For SHORT: TP at the nearest bullish OB/FVG below entry, or 2x risk.
|
|
"""
|
|
risk = abs(entry_price - stop_loss)
|
|
min_tp_distance = risk * 2 # ensure at least 2:1 R:R
|
|
|
|
if direction == TradeDirection.LONG:
|
|
# Look for bearish OB above price
|
|
obs = signals.active_order_blocks
|
|
if not obs.empty and "OB" in obs.columns:
|
|
bearish_obs = obs[obs["OB"] == -1]
|
|
bottom_vals = bearish_obs["Bottom"].dropna()
|
|
above = bottom_vals[bottom_vals > entry_price]
|
|
if len(above) > 0:
|
|
tp = float(above.min())
|
|
if tp - entry_price >= min_tp_distance:
|
|
return tp
|
|
|
|
return entry_price + min_tp_distance
|
|
|
|
else: # SHORT
|
|
obs = signals.active_order_blocks
|
|
if not obs.empty and "OB" in obs.columns:
|
|
bullish_obs = obs[obs["OB"] == 1]
|
|
top_vals = bullish_obs["Top"].dropna()
|
|
below = top_vals[top_vals < entry_price]
|
|
if len(below) > 0:
|
|
tp = float(below.max())
|
|
if entry_price - tp >= min_tp_distance:
|
|
return tp
|
|
|
|
return entry_price - min_tp_distance
|