Show HN: Your Own Task Queue for Python (github.com/josetomastocino)
75 points by omegote on June 14, 2017 | hide | past | favorite | 49 comments


Oh wow, I was so close to embarking on a Python task queue the other day because Celery was driving me nuts. I just wanted various apps/processes to be able to fire and listen for events, something that AMQP with a topic exchange seemed great for[1]. The thing that really tripped me up was the overlap in terminology between the AMQP protocol and Celery. Sometimes a queue means a queue, but sometimes it means an exchange. How do task names and routing keys interact? Why does the celery worker die when you listen to more than 3 queues on python 3.6 [2]?

Don't get me wrong, for basic stuff celery is great, albeit with a bit of a learning curve for the beginner. (And who knows, maybe I'm just missing something when it comes to more complex applications.) I also think the devs have done a great job and are super responsive. I think they also made a wise choice to chop features a little while back.

What I would really love to see (create?) is a cut-down Python task queue which deals solely and specifically with some kind of AMQP (or similar) broker. Also with a task scheduler with a UI; that is something I'm loath to lose.

Maybe this seems ridiculous; I get the impression that everyone is talking about microservices, ZeroMQ, service discovery, billions of messages per picosecond etc. I'm talking about medium-sized projects where maybe a dozen apps just need to coordinate a bit. I feel like this middle ground lacks solutions.

Sorry for the ranty comment. I suspect I'm just trying to talk myself into creating this at some point.

[1] https://www.rabbitmq.com/tutorials/tutorial-five-python.html

[2] https://github.com/celery/kombu/issues/675


Couldn't agree more.

I wanted to create a task for my web app to run every 15 minutes and I didn't realize how complicated it was going to be. I'm using AWS beanstalk and I wanted the task to run on 0,15,30,45. The "simplest" way to do that was to add a worker box and create a cron job (it uses AWS SQS as an intermediary). I want to make sending email tasks as well but that means I'm going to have to move deeper into SQS integration and I don't want to be stuck with AWS. My only other option seems to be setting up and maintaining rabbitmq/celery and that looks like a nightmare. :/


The other "simplest" way is to run the cron job on the web workers, and put something in the job to abort unless this host is the first web-worker instance. (Otherwise your jobs would run multiple times, once on each instance.) That's what we're doing to kick off scheduled tasks for Django and it works well. It'd be very messy if the scheduled tasks created load, because backend load in the web layer would really confuse autoscaling, but ours don't.

I'm also running little celery instances on each web worker, using Django as the backend. No SQS/Redis/RabbitMQ, no huge AWS lock in, no having to set up a backend worker layer. In general it's been fine for a year. You don't get good visibility into processing queue length etc but we usually have four bone-idle web workers so there is no queue length. If that might be a fit for your situation I could bang together a blog post or something.
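The "first web worker" check described above can be sketched in a couple of lines — how the list of instance IDs is obtained (e.g. from the EC2 API) is left out, and the function names are hypothetical:

```python
def i_am_first_worker(my_instance_id, web_worker_ids):
    """Return True only on the lexicographically first web worker,
    so a cron job scheduled on every instance actually runs on exactly one."""
    return my_instance_id == min(web_worker_ids)

# Inside the cron-invoked management command you'd then do something like:
#   if not i_am_first_worker(me, ids):
#       sys.exit(0)
```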


Oh interesting. I also recently came across Huey and TaskTiger that seem to be doing similar things. Have you looked into those?


You could create a stepped lambda function that emits a topic every minute with the date and time in it, something like YY.MM.DD.h.m, and then register one function handler for multiple topics, i.e. .0, .15, .30, .45. I use this concept extensively in a few edge devices for which I don't want to build dedicated/centralized infrastructure. It works rather well. There seems to be some drift and the timing is not 100% accurate, but it's OK in my setup.


I can't, or rather, I don't want to use lambda because my functions need access to both the DB and the open internet. The only way I can get that working is to add a NAT Gateway to my VPC which has a relatively high fixed monthly cost. :/


You emit a topic from the lambda function, your handlers can be hosted anywhere and can access anything they need. I think you misunderstood


RabbitMQ is super simple to setup and use (from my experience). I've been quite pleased with it.


That has been my experience too. I've set up a small RabbitMQ cluster on Kubernetes and it was pretty straightforward. It also comes with a handy web UI, which was more than I expected.

The hardest part seems to be answering the "Why isn't X receiving messages from Y?" question. Although it didn't help that I got '*' and '#' switched around in my head (seriously, '#' is the wildcard?!)
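For reference, the AMQP topic-matching rules ('*' matches exactly one dot-separated word, '#' matches zero or more words) can be sketched in a few lines of Python — handy for sanity-checking your bindings before blaming the broker:

```python
def topic_matches(binding, routing_key):
    """Rough re-implementation of AMQP topic-exchange matching:
    '*' matches exactly one word, '#' matches zero or more words."""
    return _match(binding.split('.'), routing_key.split('.'))

def _match(b, k):
    if not b:
        return not k  # binding exhausted: match only if the key is too
    if b[0] == '#':
        # '#' absorbs zero words, or one word and we try again
        return _match(b[1:], k) or (bool(k) and _match(b, k[1:]))
    if not k:
        return False
    if b[0] == '*' or b[0] == k[0]:
        return _match(b[1:], k[1:])
    return False
```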


>I wanted to create a task for my web app to run every 15 minutes

Cron would probably be a better bet then.

I love celery but I'm not a fan of celery beat.


As a proof of concept, this is pretty neat!

This bit of code in worker.py makes me uncomfortable though:

    # Deserialize the task
    d_fun, d_args = dill.loads(data)

    # Run the task
    d_fun(*d_args)

Basically the function (as in the Python object that holds the function's code, etc) to run gets pulled out of redis along with its arguments, then it gets executed.

If the entire system is trusted, this is a really cool way to let workers run code without needing to deploy it to them. Likewise, you wouldn't even need to restart the workers for a new version of the function to run - just start feeding the new code into the queue and the workers will run it. The more I think about this, the cooler it seems.

On the other hand, from the worker's perspective, this is basically a wide-open "feed me any code and I'll run it" thing.
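The producer side is the mirror image of that worker snippet — a minimal round-trip sketch (assuming dill is installed; the Redis transport in between is left out):

```python
import dill

def add(a, b):
    return a + b

# Producer: serialize the function object itself together with its arguments
data = dill.dumps((add, (2, 3)))

# Worker (as in worker.py): deserialize and execute whatever arrived
d_fun, d_args = dill.loads(data)
result = d_fun(*d_args)
```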


That sort of behaviour definitely comes with some caveats, but probably works pretty well in a closed ecosystem.

A few years back we were using a service called PiCloud that effectively allowed you to do that. It went a whole level deeper and actually bundled and shipped your local code to the remote runner, so you could run a single function from your codebase remotely. It was one of the best products I've used, but a) their service was unreliable and b) they got acqui-hired (by Dropbox, I think).

It was something simple like:

    import cloud

    def my_long_fun():
        .....

    cloud.run(my_long_fun)
and all the deps and everything would be shipped. Next time you called a function, if it already had everything it would be able to run remotely without shipping again.


> PiCloud ... one of the best products I've used

It was so wonderful. And I was so disappointed that Multyvac (http://www.multyvac.com/) dropped the ball. I thought I'd be able to continue using the service ...

BTW, Multyvac has one of the more hilarious "trusted by" sections I've seen on the internet.


Aaron told me they were shutting down the service shortly before I was due to go on holiday. I looked at the Multyvac thing and decided it was just too risky to assume they would actually pick it up (especially given their Trusted By section!)

I spent a couple of weeks replacing it with celery, and then later RQ. It was the correct call, but you never know with these things. As far as it goes it was a pretty cheap lesson in not relying on external providers. Having said that, I've spent a fortune on EC2 since because I can't have micro billing like I could with picloud!


This still exists as cloudpickle, I believe?

    import cloudpickle

    pkl = cloudpickle.dumps(my_long_fun)
    # Ship it somewhere
    new_fun = cloudpickle.loads(pkl)
    new_fun(...)
https://github.com/cloudpipe/cloudpickle


Holy Moly, thanks for pointing that out!


Yep, you definitely need to secure your system in order to avoid the worker running malicious code. Now that you mention it, I've left a test vps running this out in the open, and now I'm lying in bed about to fall asleep, too tired to shut it down. Hope no one detects the redis server running and sends some simple task with os.system('rm -rf /') :D


I'm curious as to what was so difficult with picking up Celery? I found it very straightforward, not appreciably more complex than what you've laid out.

Not that Celery is the only way to go. This is good work - I'm just interested in the why.


We used Celery and it was a major pain point for us.

At the time (it was a couple of years back) there was no way to stop it from taking 2 tasks at a time from the queue. We have a small number of long-running tasks and it made it very difficult to scale out the work. Also, the monitoring story was pretty rubbish and debugging was hard.

Now we use RQ, which is mostly pretty good, but we've been slowly adjusting parts of its behaviour to make it more suitable for us. Recently, we did some work around how it does its TTL so we could more quickly see where workers had terminated abnormally.

We've also changed it so a given worker instance will become increasingly unlikely to take on new work as the machine it's running on (AWS) nears the end of the hour since it was started. As the work load lowers, we can remove workers from the system in the most cost effective way.
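That kind of policy can be sketched as a simple taper — the 15-minute window and the linear falloff here are invented for illustration, not the poster's actual numbers:

```python
import time

def accept_probability(started_at, now=None, billing_period=3600):
    """Probability that a worker takes on a new job, falling to 0 as the
    machine nears the end of its billed hour, so idle workers can be
    removed cheaply at the hour boundary."""
    now = time.time() if now is None else now
    into_period = (now - started_at) % billing_period
    remaining = billing_period - into_period
    if remaining > 900:      # more than 15 minutes left: always accept
        return 1.0
    return remaining / 900   # then taper linearly toward zero
```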


Certainly, the EC2 instance's time until the next billing hour should be integrated into virtually every service which queues or has priority.


The number of options celery has is overwhelming. Also, the concepts used by rabbitmq and celery (the most common setup) are not straightforward to understand: when you mix queues, exchanges and routing keys you'd better stop and get a good grasp of those elements before going any further.


Yep. Celery + RabbitMQ is a pain in the ass for very simple cases. RabbitMQ is more sophisticated than most basic projects require.


I have yet to encounter a legit use case for RabbitMQ.

I'm sure they exist in theory, but it's overkill in every situation that I've seen it deployed.


well, depends on your load! at work, we have millions of messages per minute (our whole infrastructure is based on rabbitmq). rabbitmq can handle that load without any issues.

also, rabbitmq has an amazing admin interface, monitoring is really easy (most things you use for monitoring support it) and the whole thing has really sane defaults (imho).

all in all, it's one of my favorite pieces of software ever!


honestly, i always found it quite easy to set up. do you remember your issue?

(btw, i remember people having problems with celery because of the way they imported the tasks. if you always do full imports (as in, from a.b.c import tasks instead of from . import tasks) you shouldn't have any issues).


Celery + Redis is way simpler.


I actually found the RabbitMQ tutorial docs[1] to be an excellent introduction to the concepts, some of the best I've seen. Half an hour to an hour of reading and I had a decent grasp of what was going on. I think it would be overkill for a single application, but with multiple applications it provides some useful concepts and features. It also comes with a web UI which can be handy.

[1] https://www.rabbitmq.com/getstarted.html

PS. I'll stop evangelising soon, I promise.


Hi HN. I'm a relatively new coder so I have only just begun to play around with more sophisticated issues like running code on clusters, networking (ports and stuff like that).

I was hoping someone could kindly explain what a task queue is used for. If I understand it correctly, a task queue would be used for running automation code that is written during the development of a python project? But that doesn't sound right.

Can someone please tell me what a "task queue for python" means in this context?


A task queue or message queue is generally used to delegate jobs to worker processes.

Imagine you are GitHub and a user comments on an issue that has 1000 people participating. As soon as the user makes a comment you want to send an email to every user that there are new comments.

The straightforward way is to do that in the web application when the comment is posted, but that takes a lot of time and leaves the commenter staring at a loading screen until everyone is notified. (There are other reasons as well.)

With a message/task queue you would create 1000 messages (each saying "send e-mail about this issue to this user"). Then you have worker processes that, in a loop, take one message at a time and send an e-mail. When the message is sent, the worker marks (acks) that message as handled and it is removed from the message/task queue.
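In miniature, with an in-memory deque standing in for the broker and a list append standing in for actually sending mail:

```python
from collections import deque

queue = deque()

# Producer: the web request enqueues one message per subscriber and returns
# immediately, instead of sending 1000 e-mails inline
for user in ["alice", "bob", "carol"]:
    queue.append({"task": "send_email", "to": user})

# Worker: takes one message at a time; the "ack" is removing the message
# only after it has been handled successfully
sent = []
while queue:
    msg = queue[0]           # peek without removing
    sent.append(msg["to"])   # stand-in for actually sending the e-mail
    queue.popleft()          # ack: handled, now it leaves the queue
```

A real broker adds the parts this sketch skips: persistence, delivery to workers on other machines, and redelivery when a worker dies before acking.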



The retry and locking mechanisms look pretty handy - I've had to implement those on my own whenever I've used Celery.

What's your experience been using this and Redis for your queue backend?


We wrote it for ourselves at Close.io and do use Redis for its backend... been happy with it.


If you are interested in rolling your own queue and would consider backing the queue with a postgres database then a relevant "skip locked" feature was added to postgres 9.5:

https://blog.2ndquadrant.com/what-is-select-skip-locked-for-...

assuming you are happy to serialise things as JSON or pickle then this is pretty easy to integrate with python
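A sketch of the pattern from that post — each worker atomically claims one row, skipping rows that other transactions hold locked. The table and column names are made up, and the connection handling is left to the caller:

```python
import json

CLAIM_SQL = """
    DELETE FROM tasks
    WHERE id = (
        SELECT id FROM tasks
        ORDER BY id
        FOR UPDATE SKIP LOCKED
        LIMIT 1
    )
    RETURNING payload;
"""

def claim_task(cursor):
    """Claim and remove one task; concurrent workers never block each other
    because SKIP LOCKED makes them skip rows already claimed in-flight."""
    cursor.execute(CLAIM_SQL)
    row = cursor.fetchone()
    return json.loads(row[0]) if row else None
```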


Thanks for sharing! It's great to see Postgres continue to push out new features.


This looks nice. Taking this one step further, is there a solution in Python where I can describe a set of tasks and their inter-dependencies and the "co-ordinator" can execute tasks as per the dependencies (including taking defined outputs of one task and feed it to the next child task). This sort of "orchestration" would be very useful e.g. in scenarios like gateway apis which need to pull data from multiple sources and combine the results as a single response.
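The core of such a co-ordinator is surprisingly small — a sketch of dependency-ordered execution where each task's inputs are the outputs of its upstream tasks (no parallelism, no cycle detection; the dedicated tools below add those):

```python
def run_pipeline(tasks, deps):
    """tasks: name -> zero-or-more-arg callable; deps: name -> upstream names.
    Runs each task after its dependencies, feeding their results in as
    positional arguments, and returns every task's result by name."""
    results = {}

    def run(name):
        if name in results:
            return results[name]
        inputs = [run(d) for d in deps.get(name, [])]
        results[name] = tasks[name](*inputs)
        return results[name]

    for name in tasks:
        run(name)
    return results
```

For the gateway-API scenario, "fetch source A" and "fetch source B" would be independent tasks and "combine" would depend on both.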


Luigi from Spotify is very nice. We use it for computational pipelines in our neuroscience research.

https://github.com/spotify/luigi/blob/master/README.rst


To add another option, you might also want to check out dask.

The various options all have pros and cons, and fit different problems better or worse. It's worth trying a few different ones out to see what's best for you.

http://dask.pydata.org/en/latest/


Maybe something like Common Workflow Language[0]?

[0] http://www.commonwl.org/v1.0/UserGuide.html


airflow, pegasus/htcondor, luigi, pinball.


> Sure you could check celery, and after three months trying to understand the basic configuration options you'll be good to go.

A minimal celery deployment is one or two configuration options. Celery does have a metric ton of moving parts, but distributed systems are hard and celery can do a lot of things.

This isn't the first time I have seen this sentiment. Perhaps there is a documentation problem to be solved and/or a basic API that could be provided by Celery?


It's called feature bloat and seems to be in the spirit of some projects (large web frameworks are an example). IMO, it's desirable to avoid unneeded complexity until you actually need it.


This is really really cool!

Quick questions:

Can the worker push tasks back onto the queue, e.g. using r.lpush('tasks', mytask)? This would be useful in a try/except where the worker fails its task and needs to post it back.

Can the worker send its results back to Redis, it is just r.lpush('finished',...)?
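Both ideas together can be sketched like this — the handler, the retry cap of 3, and the list names are all hypothetical, and `r` is any redis-py-style client:

```python
import json

def run_task(r, raw, handler):
    """Run one task; on success report to a 'finished' list, on failure
    push the task back onto 'tasks' with an incremented retry count."""
    task = json.loads(raw)
    try:
        handler(task)
        r.lpush('finished', raw)                  # results/ack channel
    except Exception:
        task['retries'] = task.get('retries', 0) + 1
        if task['retries'] <= 3:                  # hypothetical retry cap
            r.lpush('tasks', json.dumps(task))    # back on the queue
```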


I agree that Celery is too complex, but this solution is overly simple. If your worker process fails after the BRPOP and before running the task, then the task is lost.
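Redis has RPOPLPUSH (and the blocking BRPOPLPUSH) for exactly this gap: atomically move the task onto a per-worker "processing" list, and delete it only after the work completes. An in-memory sketch of the pattern, with deques standing in for the two Redis lists:

```python
from collections import deque

tasks = deque(["t1", "t2"])   # main queue (redis list 'tasks')
processing = deque()          # in-flight list (redis: RPOPLPUSH target)

def take():
    """Atomically move one task to the processing list before running it."""
    task = tasks.pop()
    processing.appendleft(task)
    return task

def ack(task):
    """Task finished: only now does it really disappear."""
    processing.remove(task)

def recover():
    """If a worker crashes, a reaper moves its in-flight tasks back."""
    while processing:
        tasks.appendleft(processing.pop())
```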


You're right. This is just a proof of concept to discover the basics of task queues. You know, it's good to reinvent the wheel to know how it works. It's not meant to be used in production environments ootb.


> it's good to reinvent the wheel to know how it works

Very true :-)


rq is a library in this spirit, so if you're looking for something very simple, written in python, and using redis as a broker, I think rq is it.

And if you just want a simple broker that does the right thing, take a look at beanstalkd - I built a system to mimic GAE's queues awhile back and beanstalkd was very nice -- designed in spirit of memcached where it has a very simple api and aims to do one thing well.



Holy cow. Didn't know you could serialize functions. Nice. Also the name "dill" made me chuckle, once I understood it after a minute :)


I was excited when I saw this too, but it's pretty limited. From memory you can't do methods, functions have to be globally defined, and there are more limitations that make it hard to use in any general way, I think.

Edit: no closures, I think



