Custom Celery Tasks Including the Enqueuing Requestid to a Taskid in Django

Debugging background tasks can be challenging, given they’re not necessarily executed in the order they’re enqueued (assuming you have multiple workers processing them), may be retried if they fail the first time through, and have no direct link to the web request that enqueued them.

Including the id of the web request that enqueued them in the id of the task itself can help alleviate a lot of these debugging issues, and deobfuscate some of the “what happened, and why?” questions that may arise.

I’m using the celery library with Django — this implementation is specific to that combination.

The primary implementation outlined below was first written up in this Rover blog — I found this article super helpful, but still had a couple hurdles to figure out, so thought I’d go into more detail here! Specifically, things that weren’t covered in that blog post that I’ll cover here include:

  • What if you’re enqueuing your tasks via .delay instead of .apply_async?
  • Options for the id of background tasks that are themselves enqueued by another background task.
  • How to use the custom task class you’ve defined

The apply_async function of a celery Task takes a keyword argument called task_id, which it then passes on to the send_task method. If a task_id is not provided, within send_task, we see:

task_id = task_id or uuid()

In order to generalize our tasks to ensure that they always define their own task_id that also includes the id of the request that enqueued it, we can define a custom task class that handles this for us. Let’s look at the parts and prerequisites for this.

Of course, this strategy assumes you already have a request_id attribute available to you on all of your request objects. I covered this in some detail in a previous blog post — assuming you’ve done something similar, you can create a helper function such as:

import threading

local = threading.local()

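def get_local_request_id():
    # Default to None so callers can test for a missing id instead of
    # hitting an AttributeError outside of a web request.
    return getattr(local, 'request_id', None)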
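
For context, here is a rough sketch of the kind of middleware the helper above relies on. It is not the implementation from the earlier post: the class name RequestIdMiddleware and the choice of a uuid4 string as the request id are assumptions made for illustration. It follows the plain callable-class shape Django middleware uses, so it runs without Django installed.

```python
import threading
import uuid

local = threading.local()


def get_local_request_id():
    # Returns None when no request id has been stored on this thread.
    return getattr(local, "request_id", None)


class RequestIdMiddleware:
    """Hypothetical Django-style middleware: one id per incoming request."""

    def __init__(self, get_response):
        self.get_response = get_response

    def __call__(self, request):
        # Assumption: the request id is a fresh uuid4 string.
        request.request_id = str(uuid.uuid4())
        local.request_id = request.request_id
        try:
            return self.get_response(request)
        finally:
            # Clear the id so a reused thread never leaks it
            # into the next request.
            local.request_id = None
```

Clearing the value in the finally block matters because web servers typically reuse threads, and a stale id would otherwise be attached to tasks enqueued by a later request.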

Given this, within our custom task class, we can have a helper function that does something like this:

from uuid import uuid4
from celery import task

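def _generate_task_id(self):
    # Defined on the custom task class, so it takes self.
    request_id = get_local_request_id()
    task_id = str(uuid4())
    if not request_id and task.current and hasattr(task.current, 'request'):
        # Enqueued from another task: fall back to that task's own id.
        request_id = task.current.request.id
    if request_id:
        # Keep only the originating web request's id as the prefix.
        request_id = request_id.split("_")[0]
        task_id = '_'.join((request_id, task_id))
    return task_id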

Let’s talk through each of those pieces, one by one:

  • request_id = get_local_request_id(): as discussed above, this retrieves the request_id from thread-local storage, assuming a piece of middleware set it there.
  • task_id = str(uuid4()): this generates our unique task_id in the same way the celery library would if we didn’t do it ourselves.
  • if not request_id and task.current and hasattr(task.current, 'request'): this handles the scenario where a background task was enqueued by another background task. In that case nothing has gone through our middleware to set the request_id on the local thread. However, the id of the currently running task already starts with the id of the web request that enqueued it, so we can fall back to that: request_id = task.current.request.id
  • request_id = request_id.split("_")[0]: this is needed for the same reason as the step above, namely that tasks can be enqueued from other background tasks. With this step, each task_id is prefixed only by the id of the original web request that led to its enqueuing, and not by the id of the task that directly enqueued it, if there was one. If you want to be able to trace the entire chain of tasks back to the beginning, you may consider skipping this step. We chose to keep it because, especially once retries are taken into account, the task_id can get very long without it. That makes the id less useful, and depending on how deep the enqueuing of tasks from other tasks goes in your application, you could overrun the max length of 255 for a task_id!
  • task_id = '_'.join((request_id, task_id)): this is the final step of prepending the request_id to our uniquely generated task_id! Because we’re joining them with an _, which is not a character found in either of the ids themselves, we know we will always be able to separate one id from the other.
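
To make the effect of these steps concrete, here is a small stand-alone sketch. It is not the task-class method itself: make_task_id and split_task_id are hypothetical helpers, the thread-local and current-task lookups are replaced by explicit parameters, and the web request id is assumed to be a uuid4 string.

```python
from uuid import uuid4


def make_task_id(request_id=None, parent_task_id=None):
    """Stand-in for _generate_task_id with the lookups passed in explicitly."""
    task_id = str(uuid4())
    if not request_id and parent_task_id:
        # Enqueued from another task: fall back to that task's id.
        request_id = parent_task_id
    if request_id:
        # Keep only the root web request id, dropping the parent's own uuid.
        request_id = request_id.split("_")[0]
        task_id = "_".join((request_id, task_id))
    return task_id


def split_task_id(task_id):
    """Return (request_id, unique_part); request_id is None if unprefixed."""
    request_id, sep, unique_part = task_id.partition("_")
    if not sep:
        return None, task_id
    return request_id, unique_part


web_request_id = str(uuid4())                # assumed to come from middleware
first = make_task_id(request_id=web_request_id)
nested = make_task_id(parent_task_id=first)   # task enqueued by a task
deeper = make_task_id(parent_task_id=nested)  # and one level further down
```

Under these assumptions every id in the chain has the same length (two uuid4 strings joined by an underscore), and split_task_id(deeper)[0] is still the original web request id, however deep the nesting goes.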

Now that we have our custom task_id…let’s use it! We’ll rely on the fact that celery’s apply_async takes in a task_id as a keyword argument. However, this can present a problem if you’re enqueuing your tasks using .delay instead. You could of course change all of your usages of .delay to be apply_async, but that strategy would require both a larger code change than necessary, and also means that any future additions of calls to .delay by engineers who aren’t aware of the custom task implementation will lose the custom task_id functionality. Better if we could handle both. Good news — we can!

In celery’s base Task class, the docstring on the delay method says this:

Star argument version of :meth:`apply_async`

And the method itself is a handy one-liner that does this:

return self.apply_async(args, kwargs)

So we can apply a similar strategy, but move it up the stack a bit. The apply_async method on our custom task class calls our helper method to generate a custom task_id, then passes that id to super’s apply_async, which hands off to celery’s own implementation.

def apply_async(self, args=None, kwargs=None, task_id=None, **options):        
    task_id = task_id or self._generate_task_id()        
    return super(CustomTask, self).apply_async(args=args, kwargs=kwargs, task_id=task_id, **options)

And then, our custom task class’s implementation of .delay will do the same thing! Instead of calling super on the delay function, we’ll call straight into celery’s apply_async from our delay function. That’ll look like this:

def delay(self, *args, **kwargs):        
    task_id = self._generate_task_id()        
    return super(CustomTask, self).apply_async(args, kwargs, task_id=task_id)
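
To check that both entry points end up with a prefixed id without needing a broker, here is a minimal sketch of the two overrides assembled on one class. StandInTask is a hypothetical stand-in for celery’s Task: it only returns the task_id it receives (the real apply_async returns a result object), and the request_id attribute replaces the thread-local lookup.

```python
from uuid import uuid4


class StandInTask:
    """Hypothetical stand-in for celery's Task; it echoes the id it was given."""

    def apply_async(self, args=None, kwargs=None, task_id=None, **options):
        return task_id or str(uuid4())

    def delay(self, *args, **kwargs):
        return self.apply_async(args, kwargs)


class CustomTask(StandInTask):
    request_id = None  # stands in for get_local_request_id()

    def _generate_task_id(self):
        task_id = str(uuid4())
        if self.request_id:
            prefix = self.request_id.split("_")[0]
            task_id = "_".join((prefix, task_id))
        return task_id

    def apply_async(self, args=None, kwargs=None, task_id=None, **options):
        # An explicitly supplied task_id still wins over the generated one.
        task_id = task_id or self._generate_task_id()
        return super().apply_async(
            args=args, kwargs=kwargs, task_id=task_id, **options
        )

    def delay(self, *args, **kwargs):
        task_id = self._generate_task_id()
        return super().apply_async(args, kwargs, task_id=task_id)


task = CustomTask()
task.request_id = "req123"  # made-up request id for the example
via_delay = task.delay(1, 2)
via_apply_async = task.apply_async(args=(1, 2))
explicit = task.apply_async(args=(1, 2), task_id="my-own-id")
```

Here via_delay and via_apply_async both start with "req123_", while explicit keeps the id the caller passed in.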

Once your custom task is defined, you can use it by adding it as the base to any of your already defined tasks. So something that used to be defined as:

@shared_task(ignore_result=True)

becomes:

@shared_task(ignore_result=True, base=CustomTask)

And we’re off!