Skip to content

Message Broker


Client

AsyncRabbitClient

Asynchronous RabbitMQ client with robust connection and retries

Source code in src/broker/client.py
class AsyncRabbitClient:
    """Asynchronous RabbitMQ client with robust connection and retries"""

    def __init__(
            self,
            connection_string: str,
            queue_name: str,
            name: str = "async_rabbit_client"
    ) -> None:
        self.connection_string = connection_string
        self.queue_name = queue_name
        self._connection: Optional[aio_pika.RobustConnection] = None
        self._channel: Optional[aio_pika.RobustChannel] = None
        self._lock = asyncio.Lock()
        self._queue = None
        self._connection_error_message = "The client is not started yet"
        self.name = name

    @property
    def queue(self) -> aio_pika.RobustQueue:
        """The queue the client is connected to"""
        return self._queue

    @property
    def connection_error_message(self) -> str:
        """The last known connection message. Empty if connected"""
        if self.is_connected:
            return ""
        return self._connection_error_message

    async def start(self, warmup_seconds: int = 2):
        """Start the client by ensuring connection and starting the reconnect task.
         Connection attempts will be retried in the background to not interfere with the main loop."""
        await asyncio.sleep(warmup_seconds)  # Give some time for RabbitMQ to be ready
        logging.info("Starting RabbitMQ client...")
        await self._connect()

    async def _connect(self):
        """Establish a robust connection to RabbitMQ and declare the queue"""
        self._connection = await aio_pika.connect_robust(self.connection_string)
        self._connection.close_callbacks.add(callback=self._get_connection_error)
        self._channel = await self._connection.channel(publisher_confirms=True)
        self._queue = await self._channel.declare_queue(self.queue_name, durable=True)
        logger.info("Connected to RabbitMQ and declared queue '%s'.", self.queue_name)

    def _get_connection_error(self, connection, exc):
        """A callback to get the"""
        self._connection_error_message = str(exc)

    async def stop(self):
        """Gracefully close the connection to RabbitMQ"""
        if self.is_connected:
            await self._connection.close()
        logger.info(f"Stopped RabbitMQ connection to {self.queue_name}")

    @timing
    async def consume(self):
        """
        Consume a message from the queue

        Returns
        -------
        out: decoded and decompressed message content
        """
        message = await self.queue.get(no_ack=False)
        if message.content_encoding is None or message.content_encoding == CompressionMethod.UNCOMPRESSED:
            body = message.body
        elif message.content_encoding == CompressionMethod.GZIP:
            body = gzip.decompress(message.body)
        else:
            raise NotImplementedError(f"Decompression method '{message.content_encoding}' is not implemented.")
        decoded = body.decode("utf-8")
        logger.info(f"Consumed message: {message.message_id} of size {message.body_size} bytes")
        await message.ack()
        return json.loads(decoded)

    def _prepare_payload(self,
                         payload: Any,
                         compression: CompressionMethod = CompressionMethod.UNCOMPRESSED) -> Tuple[bytes, "gzip"]:
        """
        Prepare the payload for sending to queue. First encode the data, then compress with the provided method

        Parameters
        ----------
        payload: Any
            The data to encode
        compression: CompressionMethod = CompressionMethod.UNCOMPRESSED
            The compression method to use
        Returns
        -------
        body: bytes
            Encoded data
        encoding: str
            Encoding method to set in message properties
        """
        if isinstance(payload, bytes):
            body = payload
        elif isinstance(payload, str):
            body = payload.encode("utf-8")
        else:
            body = json.dumps(payload, default=str).encode("utf-8")

        body, encoding = self.compress(body, compression)
        return body, encoding

    def compress(self, body: bytes, compression: CompressionMethod) -> tuple[bytes, Any]:
        """Compress the body with a given compression method"""
        if compression == CompressionMethod.UNCOMPRESSED:
            encoding = None
        elif compression == CompressionMethod.GZIP:
            body = gzip.compress(body)
            encoding = 'gzip'
        else:
            raise NotImplementedError(f"Compression method '{compression}' is not implemented.")
        return body, encoding

    async def _publish(
            self,
            payload: Any,
            delivery_mode: DeliveryMode = DeliveryMode.PERSISTENT,
            routing_key: Optional[str] = None,
            compression: CompressionMethod = CompressionMethod.UNCOMPRESSED,
            **kwargs
    ) -> PublishResults:
        """
        Publish bytes to the RabbitMQ queue, ensuring connection before sending.

        Parameters
        ----------
        payload: Any
            data to be sent
        delivery_mode: DeliveryMode
            delivery mode for the message (default is PERSISTENT)
        routing_key: Optional[str]
            routing key for the message
        compression: CompressionMethod
            whether to compress the message

        Returns
        -------
        PublishResults
            status of the publish operation
        """
        routing_key = routing_key or self.queue_name
        connected, error = await self.async_check_connection()
        if not connected:
            logger.error(error)
            return PublishResults(published=False, error=error)
        body, content_encoding = self._prepare_payload(payload, compression=compression)
        message = Message(body, delivery_mode=delivery_mode, content_encoding=content_encoding)
        await self._channel.default_exchange.publish(message, routing_key=routing_key, mandatory=True, **kwargs)
        logger.info(f"Published message of size %s to '%s'.", message.body_size, routing_key)
        return PublishResults(published=True, error=None)

    @timing
    async def publish(self,
                      payload: Any,
                      delivery_mode: DeliveryMode = DeliveryMode.PERSISTENT,
                      routing_key: Optional[str] = None,
                      compression: CompressionMethod = CompressionMethod.UNCOMPRESSED,
                      **kwargs) -> PublishResults:
        """
        Publish a dictionary as a JSON-encoded message.

        Parameters
        ----------
        payload: Any
            The data to publish
        delivery_mode: DeliveryMode
            delivery mode for the message (default is PERSISTENT)
        routing_key: Optional[str]
            routing key for the message
        compression: CompressionMethod = CompressionMethod.UNCOMPRESSED
            whether to compress the message

        Returns
        -------
        PublishResults
            status of the publish operation
        """
        try:
            return await self._publish(payload=payload,
                                       delivery_mode=delivery_mode,
                                       routing_key=routing_key,
                                       compression=compression,
                                       **kwargs)
        except Exception as exc:
            error = str(exc)
            logger.error("Failed to publish: %s", error)
            return PublishResults(published=False, error=error)

    @property
    def is_connected(self) -> bool:
        """Indicate if there's a working connection to RabbitMQ"""
        has_connection = self._connection is not None and not self._connection.is_closed
        has_channel = self._channel is not None and not self._channel.is_closed
        return has_connection and has_channel

    async def async_check_connection(self) -> Tuple[bool, str]:
        """Perform a health check by ensuring connection. Returns (status, error message)."""
        return self.is_connected, self.connection_error_message

queue: aio_pika.RobustQueue property

The queue the client is connected to

connection_error_message: str property

The last known connection message. Empty if connected

is_connected: bool property

Indicate if there's a working connection to RabbitMQ

start(warmup_seconds=2) async

Start the client by ensuring connection and starting the reconnect task. Connection attempts will be retried in the background to not interfere with the main loop.

Source code in src/broker/client.py
async def start(self, warmup_seconds: int = 2):
    """Start the client by ensuring connection and starting the reconnect task.
     Connection attempts will be retried in the background to not interfere with the main loop."""
    await asyncio.sleep(warmup_seconds)  # Give some time for RabbitMQ to be ready
    logging.info("Starting RabbitMQ client...")
    await self._connect()

stop() async

Gracefully close the connection to RabbitMQ

Source code in src/broker/client.py
async def stop(self):
    """Gracefully close the connection to RabbitMQ"""
    if self.is_connected:
        await self._connection.close()
    logger.info(f"Stopped RabbitMQ connection to {self.queue_name}")

consume() async

Consume a message from the queue

Returns:

Name Type Description
out decoded and decompressed message content
Source code in src/broker/client.py
@timing
async def consume(self):
    """
    Consume a message from the queue

    Returns
    -------
    out: decoded and decompressed message content
    """
    message = await self.queue.get(no_ack=False)
    if message.content_encoding is None or message.content_encoding == CompressionMethod.UNCOMPRESSED:
        body = message.body
    elif message.content_encoding == CompressionMethod.GZIP:
        body = gzip.decompress(message.body)
    else:
        raise NotImplementedError(f"Decompression method '{message.content_encoding}' is not implemented.")
    decoded = body.decode("utf-8")
    logger.info(f"Consumed message: {message.message_id} of size {message.body_size} bytes")
    await message.ack()
    return json.loads(decoded)

compress(body, compression)

Compress the body with a given compression method

Source code in src/broker/client.py
def compress(self, body: bytes, compression: CompressionMethod) -> tuple[bytes, Any]:
    """Compress the body with a given compression method"""
    if compression == CompressionMethod.UNCOMPRESSED:
        encoding = None
    elif compression == CompressionMethod.GZIP:
        body = gzip.compress(body)
        encoding = 'gzip'
    else:
        raise NotImplementedError(f"Compression method '{compression}' is not implemented.")
    return body, encoding

publish(payload, delivery_mode=DeliveryMode.PERSISTENT, routing_key=None, compression=CompressionMethod.UNCOMPRESSED, **kwargs) async

Publish a dictionary as a JSON-encoded message.

Parameters:

Name Type Description Default
payload Any

The data to publish

required
delivery_mode DeliveryMode

delivery mode for the message (default is PERSISTENT)

PERSISTENT
routing_key Optional[str]

routing key for the message

None
compression CompressionMethod

whether to compress the message

UNCOMPRESSED

Returns:

Type Description
PublishResults

status of the publish operation

Source code in src/broker/client.py
@timing
async def publish(self,
                  payload: Any,
                  delivery_mode: DeliveryMode = DeliveryMode.PERSISTENT,
                  routing_key: Optional[str] = None,
                  compression: CompressionMethod = CompressionMethod.UNCOMPRESSED,
                  **kwargs) -> PublishResults:
    """
    Publish a dictionary as a JSON-encoded message.

    Parameters
    ----------
    payload: Any
        The data to publish
    delivery_mode: DeliveryMode
        delivery mode for the message (default is PERSISTENT)
    routing_key: Optional[str]
        routing key for the message
    compression: CompressionMethod = CompressionMethod.UNCOMPRESSED
        whether to compress the message

    Returns
    -------
    PublishResults
        status of the publish operation
    """
    try:
        return await self._publish(payload=payload,
                                   delivery_mode=delivery_mode,
                                   routing_key=routing_key,
                                   compression=compression,
                                   **kwargs)
    except Exception as exc:
        error = str(exc)
        logger.error("Failed to publish: %s", error)
        return PublishResults(published=False, error=error)

async_check_connection() async

Perform a health check by ensuring connection. Returns (status, error message).

Source code in src/broker/client.py
async def async_check_connection(self) -> Tuple[bool, str]:
    """Perform a health check by ensuring connection. Returns (status, error message)."""
    return self.is_connected, self.connection_error_message

Schemas

PhotoEmbedding

Bases: BaseModel

Vector embedding of a single photo

Source code in src/broker/schemas.py
class PhotoEmbedding(BaseModel):
    """Vector embedding of a single photo"""
    photo_id: int
    embedding: list[float]

ListingEmbeddings

Bases: BaseModel

Vector embeddings of all photos of a house

Source code in src/broker/schemas.py
class ListingEmbeddings(BaseModel):
    """Vector embeddings of all photos of a house"""
    core_listing_id: int
    embeddings: list[PhotoEmbedding]

PublishResults

Bases: BaseModel

The result of a message publish request

Source code in src/broker/schemas.py
class PublishResults(BaseModel):
    """The result of a message publish request"""
    published: bool
    error: Optional[str] = None

CompressionMethod

Bases: str, Enum

Compression methods for message payloads

Source code in src/broker/schemas.py
class CompressionMethod(str, Enum):
    """Compression methods for message payloads"""
    UNCOMPRESSED = "uncompressed"
    GZIP = "gzip"