RabbitMQ Broker Deployment and Message Routing Patterns on Ubuntu

Core Installation and Service Configuration

Execute the package manager to deploy the RabbitMQ server on Ubuntu:

sudo apt update
sudo apt install -y rabbitmq-server
sudo systemctl enable --now rabbitmq-server

Once the service is active, provision an administrative account and assign comprehensive access rights:

sudo rabbitmqctl add_user sys_admin Str0ng!MQpass
sudo rabbitmqctl set_permissions sys_admin ".*" ".*" ".*"
sudo rabbitmqctl set_user_tags sys_admin administrator

Activate the management interface to expose operational metrics and queue cotnrols via HTTP:

sudo rabbitmq-plugins enable rabbitmq_management

Navigate to http://<host-ip>:15672 and authenticate using the newly created credentials to monitor connections, chennels, and virtual hosts.

Client Integration with Pika

Install the Python AMQP library:

pip install pika

Asynchronous Task Processing

The producer establishes a durable channel and pushes a payload into the workflow queue. Durable queues survive broker restarts, and delivery_mode=2 ensures messages are written to disk.

import pika

def dispatch_workload(task_payload):
    params = pika.ConnectionParameters(host='localhost')
    conn = pika.BlockingConnection(params)
    ch = conn.channel()
    
    ch.queue_declare(queue='task_pipeline', durable=True)
    
    ch.basic_publish(
        exchange='',
        routing_key='task_pipeline',
        body=task_payload,
        properties=pika.BasicProperties(delivery_mode=2)
    )
    print(f"Dispatched: {task_payload}")
    conn.close()

The consumer processes incoming tasks sequentially. Configuring prefetch_count=1 prevents worker saturation by limiting unacknowledged deliveries per consumer instance.

import pika

def execute_callback(channel, method, props, payload):
    print(f"Executing: {payload.decode()}")
    # Simulate processing time
    import time
    time.sleep(1)
    channel.basic_ack(delivery_tag=method.delivery_tag)

def run_worker():
    params = pika.ConnectionParameters(host='localhost')
    conn = pika.BlockingConnection(params)
    ch = conn.channel()
    
    ch.queue_declare(queue='task_pipeline', durable=True)
    ch.basic_qos(prefetch_count=1)
    ch.basic_consume(queue='task_pipeline', on_message_callback=execute_callback)
    
    print("Worker initialized. Press CTRL+C to exit.")
    ch.start_consuming()

Secure Remote Connectivity

Remote clients require explicit credential validation. The PlainCredentials object encapsulates authentication data before establishing the TCP connection.

import pika

remote_creds = pika.PlainCredentials('sys_admin', 'Str0ng!MQpass')
remote_params = pika.ConnectionParameters(
    host='192.168.50.10',
    port=5672,
    virtual_host='/',
    credentials=remote_creds
)

with pika.BlockingConnection(remote_params) as conn:
    channel = conn.channel()
    # Channel operations follow...

Exchange Routing Architectures

Exchanges decouple publishers from queues, directing messages based on binding rules.

Broadcast Pattern (Fanout)

Messages are duplicated and routed to every queue attached to the exchange. Temporary, exclusive queues ensure cleanup upon client disconnect.

# Fanout Publisher
import pika

conn = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
ch = conn.channel()
ch.exchange_declare(exchange='event_broadcast', exchange_type='fanout')

ch.basic_publish(exchange='event_broadcast', routing_key='', body='System alert: high load')
conn.close()

# Fanout Subscriber
conn = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
ch = conn.channel()
ch.exchange_declare(exchange='event_broadcast', exchange_type='fanout')

result = ch.queue_declare(queue='', exclusive=True)
queue_id = result.method.queue
ch.queue_bind(exchange='event_broadcast', queue=queue_id)

def on_event(ch, method, props, body):
    print(f"Received broadcast: {body.decode()}")

ch.basic_consume(queue=queue_id, on_message_callback=on_event, auto_ack=True)
ch.start_consuming()

Key-Based Filtering (Direct)

Routing relies on exact string matching between the routing_key and queue bindings.

# Direct Publisher
import sys
import pika

conn = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
ch = conn.channel()
ch.exchange_declare(exchange='log_router', exchange_type='direct')

level = sys.argv[1] if len(sys.argv) > 1 else 'info'
log_msg = ' '.join(sys.argv[2:]) or 'Default log entry'
ch.basic_publish(exchange='log_router', routing_key=level, body=log_msg)
conn.close()

# Direct Subscriber
import pika
import sys

conn = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
ch = conn.channel()
ch.exchange_declare(exchange='log_router', exchange_type='direct')

result = ch.queue_declare(queue='', exclusive=True)
q_name = result.method.queue

for severity in sys.argv[1:]:
    ch.queue_bind(exchange='log_router', queue=q_name, routing_key=severity)

def handle_log(ch, method, props, body):
    print(f"[{method.routing_key}] {body.decode()}")

ch.basic_consume(queue=q_name, on_message_callback=handle_log, auto_ack=True)
ch.start_consuming()

Pattern Matching (Topic)

Topic exchanges evaluate routing keys using wildcard syntax: * matches exactly one word, while # matches zero or more words separated by dots.

# Topic Publisher
import sys
import pika

conn = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
ch = conn.channel()
ch.exchange_declare(exchange='metric_stream', exchange_type='topic')

routing_pattern = sys.argv[1] if len(sys.argv) > 1 else 'app.default.info'
data = ' '.join(sys.argv[2:]) or 'Metric payload'
ch.basic_publish(exchange='metric_stream', routing_key=routing_pattern, body=data)
conn.close()

# Topic Subscriber
import pika
import sys

conn = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
ch = conn.channel()
ch.exchange_declare(exchange='metric_stream', exchange_type='topic')

result = ch.queue_declare(queue='', exclusive=True)
q_name = result.method.queue

for pattern in sys.argv[1:]:
    ch.queue_bind(exchange='metric_stream', queue=q_name, routing_key=pattern)

def ingest_metric(ch, method, props, body):
    print(f"Route: {method.routing_key} | Data: {body.decode()}")

ch.basic_consume(queue=q_name, on_message_callback=ingest_metric, auto_ack=True)
ch.start_consuming()

Remote Procedure Call Implementation

RPC enables synchronous commend execution across distributed nodes. The client publishes to a request queue and awaits a reply on a temporary callback queue.

# RPC Server
import pika

def calculate_fibonacci(n):
    a, b = 0, 1
    for _ in range(n):
        a, b = b, a + b
    return a

def on_rpc_call(ch, method, props, body):
    num = int(body.decode())
    print(f"Processing fib({num})")
    result = calculate_fibonacci(num)
    
    ch.basic_publish(
        exchange='',
        routing_key=props.reply_to,
        properties=pika.BasicProperties(correlation_id=props.correlation_id),
        body=str(result)
    )
    ch.basic_ack(delivery_tag=method.delivery_tag)

conn = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
ch = conn.channel()
ch.queue_declare(queue='compute_queue')
ch.basic_qos(prefetch_count=1)
ch.basic_consume(queue='compute_queue', on_message_callback=on_rpc_call)
print("RPC server ready.")
ch.start_consuming()

# RPC Client
import pika
import uuid

def execute_remote_call(n):
    conn = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    ch = conn.channel()
    
    callback_res = ch.queue_declare(queue='', exclusive=True)
    callback_q = callback_res.method.queue
    corr_id = str(uuid.uuid4())
    
    def handle_reply(ch, method, props, body):
        if props.correlation_id == corr_id:
            nonlocal response
            response = int(body.decode())
    
    ch.basic_consume(queue=callback_q, on_message_callback=handle_reply, auto_ack=True)
    
    response = None
    ch.basic_publish(
        exchange='',
        routing_key='compute_queue',
        properties=pika.BasicProperties(
            reply_to=callback_q,
            correlation_id=corr_id
        ),
        body=str(n)
    )
    
    while response is None:
        conn.process_data_events(time_limit=1)
    
    conn.close()
    return response

print(f"Remote result: {execute_remote_call(30)}")

Tags: RabbitMQ message-queue Ubuntu python pika

Posted on Sun, 10 May 2026 02:57:49 +0000 by keithschm