Skip to content

Conversation

@dadodimauro
Copy link

This PR implements an asynchronous interface for the rabbitmq-amqp-python-client as proposed in #49.

The idea is to create a set of separate classes that wrap the syncronous ones, implementig the blocking operation in a separate thread (using run_in_executor).

Changes

Added a new rabbitmq_amqp_python_client.asyncio module that provides async/await compatible interfaces for AMQP operations.

Key components

  • AsyncConnection
  • AsyncPublisher
  • AsyncConsumer
  • AsyncManagment
  • AsyncEnviroment

Implementation details

The async classes act as facades that:

  1. Wrap the corresponding synchronous classes
  2. Execute blocking operations in a thread pool executor using run_in_executor
  3. Coordinate concurrent access using asyncio.Lock
  4. Implement proper async context managers (async with) for resource management
  5. Maintain API compatibility with the synchronous version

Checklist

  • Add async classes
  • Add tests for async classes
  • Update examples
  • Add complete docstrings to async classes
  • Performance benchmarks
  • Validate thread safety

@dadodimauro
Copy link
Author

Changes to existing codebase

I also made two small changes to the existing codebase. I write this comment to higlight this and also to ask if it was outside the scope of this PR and if you prefer me to propose this changes in a separate PR.

  • 18e80d6: This is done to fix a problem where a queue is deleted before the link is closed, this caused the test tests/asyncio/test_publisher.py::test_connection_context_manager_async to be flaky.

  • 78db3e7: this changes the is_open property behaviour, please let me know if I misinterpreted how this should work.

@dadodimauro
Copy link
Author

One thing that worries me about the current implementation of the async interfaces was this segmentation fault error i got when running the test: https://gist.github.com/dadodimauro/dc70490623ae16e5007251557e5cfba5

I tried to reproduce it with, but I was unsuccessful:

for i in {1..100}; do echo "=== Run $i ==="; poetry run pytest tests/asyncio/ -x || echo "FAILED"; done

@Gsantomaggio
Copy link
Member

interfaces was this segmentation fault error i got when running the test

segmentation fault comes from the C part. I am almost sure that the C part does not handle the multi-threaded access.

@dadodimauro
Copy link
Author

interfaces was this segmentation fault error i got when running the test

segmentation fault comes from the C part. I am almost sure that the C part does not handle the multi-threaded access.

I think so too, that's why I am using asyncio.Lock to avoid this, but I'm not sure it's the correct approach.

@dadodimauro
Copy link
Author

I’ve pushed some fixes and improved the docstrings.

However, there’s still an issue with the consumer.run() and consumer.stop() methods. At the moment, the only clean way to interrupt the consumer is by raising an exception inside the message handler.

I tried several approaches to start and stop the consumer gracefully, but couldn’t find a reliable solution. The script below reproduces the issue and demonstrates the behavior I encountered (btw, the error raised is not always the same, sometime is a segmentation fault, other times a connection error, I didn't understand why).

I’d really appreciate any guidance or suggestions on how to handle this. Thank you!

import asyncio
from rabbitmq_amqp_python_client import (  
    AddressHelper,
    AMQPMessagingHandler,
    Converter,
    AsyncEnvironment,
    Event,
    ExchangeSpecification,
    ExchangeToQueueBindingSpecification,
    Message,
    OutcomeState,
    QuorumQueueSpecification,
)

MESSAGES_TO_PUBLISH = 100


class MyMessageHandler(AMQPMessagingHandler):

    def __init__(self):
        super().__init__()
        self._count = 0

    def on_amqp_message(self, event: Event):
        print(
            "received message: {} ".format(
                Converter.bytes_to_string(event.message.body)
            )
        )

        self.delivery_context.accept(event)
        self._count = self._count + 1
        print("count " + str(self._count))

    def on_connection_closed(self, event: Event):
        print("connection closed")

    def on_link_closed(self, event: Event) -> None:
        print("link closed")


async def main():
    exchange_name = "test-exchange"
    queue_name = "example-queue"
    routing_key = "routing-key"

    print("connection to amqp server")

    async with AsyncEnvironment(
        uri="amqp://guest:guest@localhost:5672/"
    ) as environment:
        connection = await environment.connection()
        await connection.dial()
        management = await connection.management()

        print("declaring exchange and queue")
        await management.declare_exchange(ExchangeSpecification(name=exchange_name))

        await management.declare_queue(
            QuorumQueueSpecification(name=queue_name)
        )

        print("binding queue to exchange")
        bind_name = await management.bind(
            ExchangeToQueueBindingSpecification(
                source_exchange=exchange_name,
                destination_queue=queue_name,
                binding_key=routing_key,
            )
        )

        addr = AddressHelper.exchange_address(exchange_name, routing_key)
        addr_queue = AddressHelper.queue_address(queue_name)

        print("create a publisher and publish a test message")
        publisher = await connection.publisher(addr)

        print("purging the queue")
        messages_purged = await management.purge_queue(queue_name)
        print("messages purged: " + str(messages_purged))

        # publish messages
        for i in range(MESSAGES_TO_PUBLISH):
            status = await publisher.publish(
                Message(body=Converter.string_to_bytes("test message {} ".format(i)))
            )
            if status.remote_state == OutcomeState.ACCEPTED:
                print("message accepted")

        await publisher.close()

        print("create a consumer and consume the test message - press control + c to terminate to consume")
        
        handler = MyMessageHandler()
        consumer = await connection.consumer(addr_queue, message_handler=handler)

        # Create a task for the consumer
        consumer_task = asyncio.create_task(consumer.run())

        try:
            # Wait indefinitely until interrupted
            await consumer_task
        except KeyboardInterrupt:
            print("consumption interrupted by user")
            # Cancel the consumer task
            consumer_task.cancel()
            try:
                await consumer_task
            except asyncio.CancelledError:
                pass
        finally:
            print("cleanup")
            try:
                await consumer.close()
            except Exception as e:
                print(f"Error during cleanup: {e}")

        print("unbind")
        await management.unbind(bind_name)

        print("delete queue")
        await management.delete_queue(queue_name)

        print("delete exchange")
        await management.delete_exchange(exchange_name)

        print("closing connections")
        await management.close()


if __name__ == "__main__":
    asyncio.run(main())

@dadodimauro dadodimauro marked this pull request as ready for review November 8, 2025 22:33
@Gsantomaggio
Copy link
Member

Thank you @dadodimauro ,
Can you reproduce the segmentation fault without the asynchronous part?
I'm wondering if it is a bug/problem in the clib or related to the async part in some way.

@dadodimauro
Copy link
Author

dadodimauro commented Nov 9, 2025

I wasn't able to reproduce it in the sync version. Moreover running the script I provided the segmentation fault isn't deterministic (50% of the times). I think the problem is in how the task running on the separate thread is interrupted, so it shouldn't be a problem with the library.

@Gsantomaggio
Copy link
Member

Gsantomaggio commented Nov 10, 2025

@dadodimauro forgot to say that our company did some changes around open source contribution, and now it's required that you sign a contributor license agreement (CLA) before we can accept your PR.

The process is explained here in this repo README: https://github.com/rabbitmq/cla

Would you review and sign this CLA?

@Gsantomaggio
Copy link
Member

I think the problem is in how the task running on the separate thread is interrupted,

The PR's code does not impact the standard library, so we could mark the feature as experimental and then wait for further feedback. The library is not currently popular, so we have time to understand the problem.

We can add some documentation to the README.

WDYT @Zerpet @lukebakken

@dadodimauro
Copy link
Author

Instead of using the sync Consumer run() and stop() method, I reimplemented them in the AsyncConsumer class to allow the consumer to stop gracefully (as shown in the example).

I’m still not completely satisfied with the current implementation, introducing something like a `loop_forever() method on the consumer that handles signals and signal handlers could make the library’s usage easier and more Pythonic. However, this should be sufficient for now.


@Gsantomaggio I signed the CLA

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants