Migrating RabbitMQ Tasks with Kombu

It’s not common, but every now and then you may need to move tasks that have already been enqueued from one RabbitMQ instance to another — for example, if you need to upgrade your plan, or if you’re switching providers. I’ve had to do this a couple times, and the process is usually something like this:

  • Spin up a new plan/instance and start routing new tasks to this as they’re produced
  • Migrate tasks that didn’t get consumed over to the new plan for consumption
  • Remove the old plan

We’re going to focus on that second piece here. We’ve used Kombu for this, the messaging library that underlies Celery, the task queuing system we use for our Django app.

If you’re new to RabbitMQ and some of the terminology, I found this post super helpful. Let’s dive in and migrate tasks!

Cheers to my colleague Omar for writing the majority of this code.

This was written as a Python script — here are the docs for argparse, if you haven’t created a Python script with command line arguments before.

import argparse
import socket
import sys

from kombu import Connection, Consumer, Exchange, Producer, Queue


def _get_cli_input():
    parser = argparse.ArgumentParser()
    parser.add_argument('source_broker')
    parser.add_argument('destination_broker')
    parser.add_argument('--queues', required=True, help='comma-separated list of queues')
    args = parser.parse_args()
    brokers = [args.source_broker, args.destination_broker]
    queues = args.queues.split(',')
    return brokers, queues

The first arguments are the source (where you’re migrating from) and destination (where you’re migrating to) brokers. These are expected to be formatted like amqp://user:password@domain:port/vhost, and adding some validation here to ensure that they are wouldn’t be a bad idea. I’ll leave the details to you, but there’s a minimal sketch below.

The final argument is a comma separated list of which queues you want to migrate tasks from.

The function then returns a list of the brokers, and a list of the queues.
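
Here’s that validation sketch: a minimal check using urllib.parse. _validate_broker_url is a hypothetical helper, and restricting to amqp/amqps schemes is an assumption about what your broker URLs look like, so adjust as needed.

from urllib.parse import urlparse

def _validate_broker_url(url):
    # Require an amqp:// or amqps:// scheme with a host present.
    parsed = urlparse(url)
    if parsed.scheme not in ('amqp', 'amqps') or not parsed.hostname:
        raise ValueError('Invalid broker URL: {}'.format(url))

You’d call this on both broker arguments before returning from _get_cli_input.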

BROKERS, QUEUES = _get_cli_input()
SRC_BROKER = BROKERS[0]
DEST_BROKER = BROKERS[1]

src_conn = Connection(SRC_BROKER)
dest_conn = Connection(DEST_BROKER)

This just does the basic setup needed to do any producing or consuming of messages. Connection takes the broker URL as its argument.
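
Kombu connections are lazy, so nothing actually connects until the connection is used. If you’d rather fail fast when one of the URLs is wrong, you can check reachability up front; a quick sketch using Kombu’s ensure_connection:

# Fail fast if either broker is unreachable, rather than partway
# through the migration; ensure_connection retries a few times first.
for conn in (src_conn, dest_conn):
    conn.ensure_connection(max_retries=3)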

First the code:

for q in QUEUES:
    # Source side: a channel, exchange, and queue to consume from
    src_channel = src_conn.channel()
    src_exchange = Exchange(q, 'direct')
    src_queue = Queue(q, exchange=src_exchange, routing_key=q)

    # Destination side: a channel and exchange to publish to
    dest_channel = dest_conn.channel()
    dest_exchange = Exchange(q, 'direct')
    producer = Producer(dest_channel, exchange=dest_exchange,
                        serializer='json', routing_key=q)

    consumer = Consumer(src_channel, src_queue,
                        callbacks=[_process_message(producer, q)])
    consumer.consume()

For each queue we need to migrate tasks from, we need to set up an exchange. If you’re curious about exchange types, I found this to be a helpful explanation. We also need a consumer to take messages off the source broker, and a producer to add messages to the destination broker.

The consumer is created with a callback, which we’ll talk about next; this is how each consumed message is handled. It takes the destination producer and the queue name as arguments so that we have access to them in the callback.

The signature of a callback is always a body and a message. Because we also need access to the producer for the destination broker (and the queue name), we’ve defined this as a class that’s initialized with both; the __call__ method then receives the standard (body, message) arguments each time a message is consumed.

class _process_message:
    def __init__(self, producer, queue_name):
        self.producer = producer
        self.queue_name = queue_name

    def __call__(self, body, message):
        # Pulled out in case you want to log which tasks are moving
        task = body.get('task')
        eta = body.get('eta')
        kwargs = body.get('kwargs')

        dest_queue = Queue(self.queue_name, exchange=self.producer.exchange,
                           routing_key=self.queue_name)
        self.producer.publish(body, declare=[dest_queue])
        message.ack()

Creating an instance of a Queue here and declaring it when we publish the new message means that if a queue with the name given to your producer as the routing key doesn’t already exist on your broker, it will be created. If it does already exist, the existing one will be used.
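
If you’d rather declare the destination queues once up front instead of on every publish, you can bind a Queue to a channel and declare it directly. A sketch of what that could look like inside the per-queue loop above:

# Same effect as declare=[dest_queue] at publish time, done once per
# queue; declaring is a no-op if the queue already exists.
dest_queue = Queue(q, exchange=dest_exchange, routing_key=q)
dest_queue(dest_channel).declare()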

Calling message.ack() removes a message from the source broker — if you skip this step, once your migration is complete, the tasks will exist on both the source and destination queues. Leaving this off can be useful if you’re testing and don’t want to risk losing messages, but be careful to add it if there’s a chance messages will be consumed from both brokers and your tasks aren’t idempotent.
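
If you’d rather make that a switch than a code edit, one option (purely illustrative, not part of the original script) is to give the callback an ack flag and skip the acknowledgement during a dry run:

class _process_message:
    def __init__(self, producer, queue_name, ack=True):
        self.producer = producer
        self.queue_name = queue_name
        self.ack = ack  # set ack=False for a dry run

    def __call__(self, body, message):
        dest_queue = Queue(self.queue_name, exchange=self.producer.exchange,
                           routing_key=self.queue_name)
        self.producer.publish(body, declare=[dest_queue])
        if self.ack:
            # Only remove the message from the source when migrating for real
            message.ack()

You could wire this up to a hypothetical --dry-run argparse flag and construct the callback with ack=not args.dry_run.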

Finally, we create a loop that times out after 10 seconds and exits if there aren’t any more messages to move over. This of course assumes new messages aren’t being published to your source broker anymore — the script won’t ever exit if they are!

timeout = 10
while True:
    try:
        # Drain messages from the source; our consumer callbacks
        # republish each one to the destination as it arrives
        src_conn.drain_events(timeout=timeout)
    except socket.timeout:
        # No messages for `timeout` seconds means we're done
        src_conn.release()
        dest_conn.release()
        sys.exit(0)
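
That’s the whole script. Assuming it’s saved as migrate_tasks.py (a name I’ve made up), running it looks something like this, with placeholder credentials and queue names:

python migrate_tasks.py \
    'amqp://user:password@old-host:5672/vhost' \
    'amqp://user:password@new-host:5672/vhost' \
    --queues default,emails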

Happy migrating!