"""Simple async event bus for inter-module communication.""" from __future__ import annotations import asyncio from collections import defaultdict from typing import Any, Callable, Coroutine, Dict, List from loguru import logger class EventBus: """Publish-subscribe event bus using asyncio.""" def __init__(self): self._subscribers: Dict[str, List[Callable]] = defaultdict(list) def subscribe(self, event_type: str, handler: Callable[..., Coroutine]) -> None: """Register an async handler for an event type.""" self._subscribers[event_type].append(handler) logger.debug("Subscribed to '{}': {}", event_type, handler.__name__) def unsubscribe(self, event_type: str, handler: Callable) -> None: """Remove a handler from an event type.""" self._subscribers[event_type] = [ h for h in self._subscribers[event_type] if h != handler ] async def publish(self, event_type: str, data: Any = None) -> None: """Publish an event, calling all registered handlers concurrently.""" handlers = self._subscribers.get(event_type, []) if not handlers: return logger.debug("Publishing '{}' to {} handler(s)", event_type, len(handlers)) await asyncio.gather(*(h(data) for h in handlers), return_exceptions=True) # Module-level singleton event_bus = EventBus()