
RabbitMQ: Use-Cases and Best Practices

Yaniv Ben Hemo February 16, 2023 6 min read

What Exactly Is RabbitMQ?

RabbitMQ is an open-source message broker software that facilitates communication between applications or systems. It acts as an intermediary that allows applications to send and receive messages asynchronously.

For example, imagine you have a web application that processes customer orders. When an order is placed, it is sent to RabbitMQ, which then passes it to a background worker process for fulfillment. The worker process can take its time to complete the fulfillment process, and once it is done, it sends a message back to RabbitMQ, which delivers it to the web application to update the order status. This way, the web application can continue to accept new orders without being blocked by the order fulfillment process.

In this scenario, RabbitMQ acts as a message broker that ensures that the message is delivered to its intended recipient, even if they are not available at the time the message was sent. It also provides features such as message persistence, delivery acknowledgment, and message routing, which allow for more complex and robust communication between applications.
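The order flow described above can be sketched roughly as follows. This is a minimal illustration, not a production setup: the queue names `orders` and `order_status`, the `order_id` field, and the `RecordingChannel` stand-in are all assumptions made for the example. In a real application, `channel` would be a pika channel connected to a RabbitMQ server.

```python
import json

ORDERS_QUEUE = "orders"          # hypothetical queue name
STATUS_QUEUE = "order_status"    # hypothetical queue name


def publish_order(channel, order):
    """Web application side: hand the order to the broker and return at once."""
    channel.basic_publish(
        exchange="",             # default exchange routes by queue name
        routing_key=ORDERS_QUEUE,
        body=json.dumps(order),
    )


def handle_order(ch, method, properties, body):
    """Worker side: fulfil the order, report the new status, then acknowledge."""
    order = json.loads(body)
    status = {"order_id": order["order_id"], "status": "fulfilled"}
    ch.basic_publish(exchange="", routing_key=STATUS_QUEUE, body=json.dumps(status))
    ch.basic_ack(delivery_tag=method.delivery_tag)


class RecordingChannel:
    """Stand-in for a pika channel so the sketch runs without a broker."""

    def __init__(self):
        self.published = []
        self.acked = []

    def basic_publish(self, exchange, routing_key, body, properties=None):
        self.published.append((routing_key, body))

    def basic_ack(self, delivery_tag):
        self.acked.append(delivery_tag)
```

The web application only ever calls `publish_order`, so it never waits for fulfillment; the worker reports back through a second queue when it is done.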


Why Queues?

Message Queues are used in applications to ensure that separate components or microservices can communicate with each other asynchronously, decoupled from each other, and without any direct connection. Here are some reasons why we should use message queues:

  1. Decoupled Communication: Message queues allow independent processes to communicate with each other without tightly coupling the processes. This enables them to evolve and scale separately without affecting the other components.
  2. Asynchronous Communication: Message queues allow asynchronous communication between processes, meaning that processes do not have to wait for a response from each other. This can improve the overall responsiveness and reliability of an application.
  3. Load Balancing: Message queues can help distribute workloads evenly across multiple processing units, which can help improve performance and scalability.
  4. Durability: Message queues typically provide a persistent store for messages, meaning that messages can survive system failures and be retrieved when the system restarts.

Here’s an example in Python using the queue module:

import queue

# Create a message queue
q = queue.Queue()
# Put a message in the queue
q.put("Hello World!")
# Get a message from the queue
message = q.get()
print(message)  # Output: Hello World!

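This simple example demonstrates the basic put/get pattern of a message queue. Note that Python's queue.Queue lives inside a single process and is shared between threads; a broker such as RabbitMQ applies the same pattern across separate processes and machines.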
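To make the decoupling and load-balancing points more concrete, here is a small sketch that uses only the standard library. Two worker threads pull from one shared queue, so the work is spread across them without the producer knowing which worker handles which message. The message names and the `STOP` sentinel are assumptions made for the illustration.

```python
import queue
import threading

tasks = queue.Queue()
results = queue.Queue()
STOP = object()  # sentinel telling a worker to exit


def worker(name):
    """Take messages off the shared queue until the sentinel arrives."""
    while True:
        item = tasks.get()
        if item is STOP:
            break
        results.put((name, item.upper()))


workers = [threading.Thread(target=worker, args=(f"worker-{i}",)) for i in range(2)]
for t in workers:
    t.start()

# The producer only talks to the queue, never to a worker directly
for message in ["order-1", "order-2", "order-3", "order-4"]:
    tasks.put(message)
for _ in workers:
    tasks.put(STOP)
for t in workers:
    t.join()

# Drain the results; which worker handled which message may vary between runs
processed = []
while not results.empty():
    processed.append(results.get())
print(sorted(item for _, item in processed))
```

Each message is handled exactly once, whichever worker happens to pick it up.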


Retry Queues

Retry queues in RabbitMQ handle failed message processing by holding a failed message for a short delay and then delivering it again. RabbitMQ builds this pattern from two queue features: a per-queue message TTL and dead-lettering. It helps ensure that messages are not lost or ignored because of temporary processing failures. The steps below set up a retry queue in Python with the pika library:

1. Declare the retry queue: First, declare the retry queue in RabbitMQ using the queue_declare method in the pika library. The x-message-ttl argument sets how long a message waits in the retry queue, in milliseconds. When that time expires, the two x-dead-letter arguments tell RabbitMQ where to send the message next, which here is the main queue.

channel.queue_declare(queue = 'retry_queue', arguments = {
  'x-dead-letter-exchange': '',
  'x-dead-letter-routing-key': 'queue',
  'x-message-ttl': 5000  # retry after 5 seconds
})

2. Declare the main queue: Next, declare the main queue, which receives messages back from the retry queue once their TTL has expired.

channel.queue_declare(queue='queue')

3. Route expired messages back to the main queue: The retry queue's x-dead-letter-exchange is the empty string, which names the default exchange. Every queue is bound to the default exchange automatically under its own name, so the x-dead-letter-routing-key value 'queue' is enough to deliver expired messages to the main queue. No queue_bind call is needed, and RabbitMQ does not allow explicit bindings on the default exchange.

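4. Send failed messages to the retry queue: The consumer reads from the main queue. When processing fails, it publishes a copy of the message to the retry queue and acknowledges the original. The copy waits there for the x-message-ttl interval, is then dead-lettered back to the main queue, and is delivered to the consumer again. Calling ch.basic_reject with requeue=True would not add a delay, because it puts the message straight back on the queue it came from.

def callback(ch, method, properties, body):
    try:
        # Your processing logic here
        print("Processing message:", body)
    except Exception as e:
        # If processing fails, park a copy of the message in the retry queue
        print("Message processing failed:", e)
        ch.basic_publish(exchange='', routing_key='retry_queue',
                         body=body, properties=properties)
    # Acknowledge the original delivery in both cases
    ch.basic_ack(delivery_tag=method.delivery_tag)

# Consume from the main queue; the retry queue has no consumer of its own
channel.basic_consume(queue='queue', on_message_callback=callback, auto_ack=False)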

By using retry queues in RabbitMQ, you can ensure that important messages are not lost or ignored due to temporary processing failures. This helps to ensure the reliability and robustness of your messaging system.
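A retry loop with no upper bound will redeliver a message forever if the message can never succeed. One possible way to cap retries is to carry an attempt counter in the message headers. The sketch below is an assumption-laden illustration: the header name `x-retry-count` and the limit of 3 are made up for this example and are not RabbitMQ features.

```python
MAX_RETRIES = 3                  # hypothetical limit
RETRY_HEADER = "x-retry-count"   # hypothetical application-level header


def next_retry_headers(headers, max_retries=MAX_RETRIES):
    """Return headers for one more retry, or None when the budget is spent."""
    headers = dict(headers or {})            # copy; headers may be None
    attempts = int(headers.get(RETRY_HEADER, 0))
    if attempts >= max_retries:
        return None
    headers[RETRY_HEADER] = attempts + 1
    return headers
```

In a consumer's failure handler, this could be called with `properties.headers`. If it returns a dictionary, the message is republished to the retry queue with `pika.BasicProperties(headers=...)`; if it returns None, the consumer can log the message or move it to a separate queue for inspection instead of retrying again.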


How to Use Stateful Consumers

Stateful consumers in RabbitMQ are consumers that maintain a state or context about their processing, allowing them to keep track of their progress even across multiple message deliveries. This can be useful for implementing long-running tasks or for processing messages in a specific order. Here is a Python code example using the pika library that demonstrates how to use stateful consumers with RabbitMQ:

import pika

# Connect to RabbitMQ server
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Declare exchange and queue
channel.exchange_declare(exchange='test_exchange', exchange_type='fanout')
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='test_exchange', queue=queue_name)

# Stateful consumer callback
def stateful_consumer(ch, method, properties, body):
    print(f'Received message: {body}')
    ch.basic_ack(delivery_tag=method.delivery_tag)

# Set prefetch count
channel.basic_qos(prefetch_count=1)

# Start stateful consumer
channel.basic_consume(queue=queue_name, on_message_callback=stateful_consumer)
print('Stateful consumer started, waiting for messages...')
channel.start_consuming()

In this example, we use the pika library to interact with RabbitMQ. First, we connect to the RabbitMQ server and declare an exchange and queue. We then define a callback function, stateful_consumer, which will be called for each message received from the queue. In this function, we print the received message and acknowledge it using the basic_ack method.

Next, we set the prefetch count using the basic_qos method. The prefetch count caps the number of unacknowledged messages that the broker will deliver to the consumer at once. With prefetch_count=1, the consumer receives the next message only after it has acknowledged the current one. This is useful in stateful consumers, as it ensures that the consumer processes one message at a time and can update its state between messages.

Finally, we start the stateful consumer by calling the basic_consume method and providing the callback function. The consumer will start waiting for messages and call the callback function for each message received. The start_consuming method will block until the consumer is stopped.
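The callback above prints and acknowledges each message but does not itself store anything between deliveries. One way to hold state, sketched here under assumptions, is to make the callback a method of an object. The `CountingConsumer` class and its counter are hypothetical, and `FakeChannel` and `FakeMethod` exist only so the sketch runs without a broker.

```python
class CountingConsumer:
    """Keeps a running tally across deliveries (hypothetical example state)."""

    def __init__(self):
        self.seen = 0
        self.last_body = None

    def on_message(self, ch, method, properties, body):
        # State lives on the instance, so it survives from one delivery to the next
        self.seen += 1
        self.last_body = body
        print(f"Message {self.seen}: {body!r}")
        ch.basic_ack(delivery_tag=method.delivery_tag)


class FakeChannel:
    """Stand-in for a pika channel that records acknowledgements."""

    def __init__(self):
        self.acked = []

    def basic_ack(self, delivery_tag):
        self.acked.append(delivery_tag)


class FakeMethod:
    """Stand-in for the delivery metadata passed to a callback."""

    def __init__(self, delivery_tag):
        self.delivery_tag = delivery_tag


consumer = CountingConsumer()
fake = FakeChannel()
consumer.on_message(fake, FakeMethod(1), None, b"first")
consumer.on_message(fake, FakeMethod(2), None, b"second")
```

With a real connection, the bound method would be registered in place of a plain function, for example `channel.basic_consume(queue=queue_name, on_message_callback=consumer.on_message)`.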


How to Use Locking

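Locking in RabbitMQ is a way to synchronize access to a shared resource, such as a message, when multiple consumers compete for it. RabbitMQ does not expose a lock API as such; the locking effect comes from how it delivers messages. When several consumers read from the same queue, each message is delivered to only one of them, and while that delivery is unacknowledged no other consumer receives it. Acknowledging the message releases it for good, and if the consumer's channel closes first, the broker requeues the message for another consumer. This ensures that only one consumer at a time works on a given message and prevents race conditions.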

Here’s a code example in Python that demonstrates how to use locks in RabbitMQ:

import pika
import time

# Connect to RabbitMQ server
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Declare a shared, named queue so that several consumers can compete for it
# ('shared_tasks' is an illustrative name)
queue_name = 'shared_tasks'
channel.queue_declare(queue=queue_name)

# Deliver at most one unacknowledged message to this consumer at a time
channel.basic_qos(prefetch_count=1)

# Locking callback
def lock_callback(ch, method, properties, body):
    # An unacknowledged delivery acts as the lock: the broker will not hand
    # this message to any other consumer until it is acked or the channel closes
    print(f'Lock acquired, processing message: {body}')
    time.sleep(10)  # Simulate slow work
    ch.basic_ack(delivery_tag=method.delivery_tag)
    print('Lock released, message processed')

# Start locking consumer
channel.basic_consume(queue=queue_name, on_message_callback=lock_callback)
print('Locking consumer started, waiting for messages...')
channel.start_consuming()
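
When the goal is that only one consumer at a time reads from a queue at all, recent RabbitMQ versions offer a single active consumer option, set through the `x-single-active-consumer` queue argument. The helper below is a minimal sketch under assumptions: the function name, the queue name `invoices`, and the `RecordingChannel` stand-in are hypothetical, and `channel` would be a pika channel in real use.

```python
def declare_single_consumer_queue(channel, name):
    """Declare a queue on which the broker keeps only one consumer active."""
    return channel.queue_declare(
        queue=name,
        durable=True,
        arguments={"x-single-active-consumer": True},
    )


class RecordingChannel:
    """Stand-in for a pika channel so the sketch runs without a broker."""

    def __init__(self):
        self.declared = []

    def queue_declare(self, queue, durable=False, arguments=None):
        self.declared.append((queue, durable, arguments))


recorder = RecordingChannel()
declare_single_consumer_queue(recorder, "invoices")  # hypothetical queue name
```

Other consumers may still subscribe to such a queue; the broker keeps them on standby and promotes one of them if the active consumer goes away.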