Celery and Twisted

I’ve been working with Celery, Twisted, and Cyclone recently on a side project. The integration between Celery and Twisted is not ideal for asynchronous programming, since Celery hands back a result object that you poll or block on rather than a Deferred, which prompted me to jot down some notes on what I’ve worked out. Hopefully this will be useful to someone, and if anyone wants to offer a better way, I’m all ears :) .

Celery has some built-in methods that let you check whether a remote job has completed, for example by calling the successful() method on the returned AsyncResult object.  You can do something like this…

From the Python interpreter…  (BTW, the add task has a sleep in it for testing purposes)

>>> from celery.execute import send_task
>>> result=send_task('tasks.add', [4,5])
>>> result.successful()
False
>>> result.successful()
False
>>> result.successful()
True
>>> result.get()
9

So the question was how to wrap this Celery mechanism in something I could use from Twisted and Cyclone. Here is what I came up with…

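from twisted.internet import task

def monitor_task(celery_jobResult, reactor):
    # Poll the Celery AsyncResult; once the task has succeeded, return its value.
    if celery_jobResult.successful():
        result = celery_jobResult.get()
        print(result)
        return result
    # Not finished yet: schedule another check in 0.5 seconds. Returning the
    # Deferred from deferLater makes the callback chain wait for the next poll.
    return task.deferLater(reactor, 0.5, monitor_task, celery_jobResult, reactor)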

This allowed me to call Celery jobs and poll periodically to see if the task had finished before returning the result.

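def printResult(result):
    print(result)

from twisted.internet import reactor
from twisted.internet.threads import deferToThread
from celery.execute import send_task as send_celery_task

# send_task talks to the broker synchronously, so run it in a thread;
# the callbacks then poll for the result and print it.
deferred = deferToThread(send_celery_task, 'tasks.add', [4, 5])
deferred.addCallback(monitor_task, reactor)
deferred.addCallback(printResult)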

This is a bit simplified and doesn’t take any error handling into account (a task that fails is never successful(), so it would be polled forever), but this is the gist of it. If you’ve found a better, more elegant way, please leave a comment.
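For what it’s worth, here is a rough sketch of how the missing error handling might look. It is only one possible approach, not a tested recipe: the names check_job, interval and attempts_left are made up for this example, and it assumes the AsyncResult’s ready() turns True for failed tasks as well as successful ones, with get() re-raising the task’s exception in the failure case.

```python
def check_job(job_result):
    """Look at a Celery AsyncResult once, without waiting.

    Returns (False, None) while the task is still running and
    (True, value) once it has finished. If the task failed, get()
    is assumed to re-raise the task's exception.
    """
    if not job_result.ready():
        return (False, None)
    return (True, job_result.get())


def monitor_task(job_result, reactor, interval=0.5, attempts_left=20):
    """Poll job_result every `interval` seconds, giving up after
    `attempts_left` further polls (both are hypothetical parameters)."""
    done, value = check_job(job_result)
    if done:
        return value
    if attempts_left <= 0:
        # Raising inside a callback sends the Deferred down its errback chain.
        raise RuntimeError("gave up waiting for the Celery task")
    # Imported here so check_job can be used and tested without Twisted.
    from twisted.internet import task
    return task.deferLater(reactor, interval, monitor_task,
                           job_result, reactor, interval, attempts_left - 1)
```

Because any exception raised in a callback ends up in the errback chain, adding something like deferred.addErrback(handleError) after the addCallback calls should catch both a failed task and the timeout (handleError being whatever error callback you write).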
