import json from typing import Callable, Dict, List, Optional import aio_pika from .rabbit_base import RabbitBase from .config import settings # Maps FuncName -> handler coroutine Handler = Callable[[dict], "awaitable[None]"] class RabbitListenerBase(RabbitBase): def __init__(self, service_id: str, handlers: Dict[str, Handler]): super().__init__() self._service_id = service_id self._handlers = handlers self._consumers: List[aio_pika.abc.AbstractRobustQueue] = [] def _qname(self, exchange: str, routing_keys: List[str]) -> str: rk_part = "-".join(sorted([rk for rk in (routing_keys or [""]) if rk != ""])) or "" suffix = f"-{rk_part}" if rk_part else "" return f"{settings.RABBIT_INSTANCE_NAME}-{exchange}{suffix}" async def start(self, declarations: List[dict]): """ declarations: list of {ExchangeName, FuncName, MessageTimeout, Type?, RoutingKeys?} """ for d in declarations: exch = d["ExchangeName"] rks = d.get("RoutingKeys") or [settings.RABBIT_ROUTING_KEY] ttl = d.get("MessageTimeout") or None q = await self.declare_queue_bind(exchange=exch, queue_name=self._qname(exch, rks), routing_keys=rks, ttl_ms=ttl) await q.consume(self._make_consumer(d["FuncName"])) self._consumers.append(q) def _make_consumer(self, func_name: str): handler = self._handlers.get(func_name) async def _on_msg(msg: aio_pika.IncomingMessage): async with msg.process(): try: # Expect CloudEvent JSON envelope = json.loads(msg.body.decode("utf-8")) data = envelope.get("data") if handler: await handler(data) except Exception as e: # swallow to avoid nack loops; your logger can capture details pass return _on_msg