I’ve been working with Celery, Twisted and Cyclone recently on a side project I have going on, however the integration between Celery and Twisted is not ideal for asynchronous programming which prompted me to jot down some notes for what I’ve worked out. Hopefully this will be useful to someone, also if anyone wants to offer a better way I’m all ears .
Celery has some built in methods to allow you to check the completeness of a remote job by calling the successful() method on a returned AsyncResult object. For example you can do something like this…
From Python interpreter… (BTW, the add task as a sleep in it for testing purposes)
>>> from celery.execute import send_task >>> result=send_task('tasks.add', [4,5]) >>> result.successful() False >>> result.successful() False >> result.successful() True >>> result.get() 9
So the question was how to wrap this Celery mechanism in something I could use from Twisted and Cyclone. Here is what I came up…
from twisted.internet import task def monitor_task(self, celery_jobResult, reactor): if celery_jobResult.successful(): self.result = celery_jobResult.get() print self.result return self.result else: self.result = None return task.deferLater(reactor, 0.5, self.monitor_task, celery_jobResult, reactor)
This allowed me to call Celery jobs and poll periodically to see if the task had finished before returning the result.
def printResult(result): print result from twisted.internet.threads import deferToThread from celery.execute import send_task as send_celery_task deferred = deferToThread(send_celery_task, 'tasks.add', [4,5]).\ addCallback(monitor_task, reactor).\ addCallback(printResult)
This is a bit simplified and doesn’t take into account any error handling but this is the jist of it. If you’ve found a better more elegant way please leave a comment.by