Battle of the Queues

RQ - Redis Queue (http://python-rq.org/docs/)

Austin Godber (@godber)

Feb 2015

One Queue to Rule Them All

Not really

RQ is Python-specific; messages are serialized with pickle.

Redis is a key value store, not a highly available message broker.

RQ Workers fork new child processes for every task.

But it is quick and easy!

Redis Queue - Install

  • Dependencies
    • Redis >= 2.6
    • Python
    • That's it
  • Install
    • pip install rq

Redis Queue - Usage

Real Quick

Take any Python function

import requests

def count_words_at_url(url):
    resp = requests.get(url)
    return len(resp.text.split())

Create a queue and enqueue the function reference and arguments

from redis import Redis
from rq import Queue
from count import count_words_at_url

q = Queue(connection=Redis())
q.enqueue(
    count_words_at_url, 'http://uberhip.com'
)

Run the worker on the command line

rqworker

Run the client

python rq1.py

The worker's log shows the job being picked up, executed, and its result returned.

Remember, all we did was enqueue a job, specifying only the function and its arguments.

q = Queue(connection=Redis())
q.enqueue(
    count_words_at_url, 'http://uberhip.com'
)

No queue names, no priorities, no handling of the return values.

Queues

To enqueue something on a specific queue, instantiate Queue() with the queue name as an argument.

low_q = Queue('low', connection=Redis())
low_q.enqueue(
    count_words_at_url, 'http://uberhip.com'
)

You can use any queue name.

Workers can be started on specific queues by providing queue names as command line arguments to rqworker.

rqworker low
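
Workers can also watch several queues at once; RQ empties them in the order given, so the order acts as a priority:

rqworker high default low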

Other enqueue() keyword arguments include: timeout, result_ttl, ttl, depends_on, job_id, and at_front. See http://python-rq.org/docs/ for details.

The task passed to enqueue() can be EITHER a function reference OR a string, so the workers can be implemented in a separate code base from the enqueueing code.
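
For example, a client that never imports the worker's code could enqueue by dotted path (a sketch, assuming count_words_at_url lives in count.py as above):

q.enqueue('count.count_words_at_url', 'http://uberhip.com')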

Queue() has the following methods and attributes:

  • len(q) - Number of jobs in queue
  • q.job_ids - List of enqueued job IDs
  • q.jobs - List of enqueued job instances
  • job = q.fetch_job('my_id') - Returns job having ID: my_id
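
A quick sketch of these in action (the job ID here is made up):

from redis import Redis
from rq import Queue

q = Queue(connection=Redis())
print(len(q))       # number of jobs waiting in the queue
print(q.job_ids)    # list of job ID strings (UUIDs)
print(q.jobs)       # list of enqueued Job instances
job = q.fetch_job('my_id')   # look up a single job by its ID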

Workers

Config file: rqworker can read its settings from a Python module.

Burst Mode: process everything queued, then exit instead of blocking.

Lifecycle: 1. Boot -> 2. Register -> 3. Listen -> 4. Prepare execution -> 5. fork() -> 6. Process -> 7. Cleanup -> GOTO 3
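
Both in concrete form (a sketch; settings.py is a filename I'm assuming, though REDIS_URL and QUEUES are RQ's documented setting names):

rqworker --burst low

# settings.py, consumed via: rqworker -c settings
REDIS_URL = 'redis://localhost:6379/0'
QUEUES = ['high', 'default', 'low']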

Custom Workers

Read the docs; we don't have time for this!

Results

Want results from your job? enqueue() returns a job instance; save it and ask it for its result before the default result_ttl of 500 seconds expires.

import time

from redis import Redis
from rq import Queue
from count import count_words_at_url

q = Queue(connection=Redis())
job = q.enqueue(
    count_words_at_url, 'http://uberhip.com'
)
time.sleep(2)
print(job.result)

Jobs

What? If we don't have time for Workers, we certainly don't have time for Jobs!

Well, other than to emphasize that there are two times-to-live:

  • result_ttl - result time to live
  • ttl - job time to live
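
Both can be set per job at enqueue time (a sketch; the values are arbitrary):

job = q.enqueue(
    count_words_at_url, 'http://uberhip.com',
    ttl=600,          # give up if not picked up within 10 minutes
    result_ttl=3600,  # keep the return value around for an hour
)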

And ... there's a meta property to which you can attach arbitrary data using dictionary syntax.
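
Something like this (a sketch based on the docs; the key and value are arbitrary, and job.save() persists the change to Redis):

job = q.enqueue(count_words_at_url, 'http://uberhip.com')
job.meta['requested_by'] = 'demo'   # any picklable data
job.save()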

Monitoring

  • Command Line - rqinfo
  • Web - rq-dashboard
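
Both ship as commands: rqinfo comes with RQ itself, and the dashboard is a separate package (the port below is rq-dashboard's documented default):

rqinfo
pip install rq-dashboard
rq-dashboard    # web UI, by default at http://localhost:9181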

DEMO!!!!!!!

Connections, Exceptions, and Tests, Oh My!

  • Sure, you can connect to multiple Redis instances.

(If that sounds a little fancy, maybe you want to try a different queueing system.)
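
If you do need it, RQ's Connection context manager scopes Queue() instances to a particular Redis server (a sketch; the hostnames are made up):

from redis import Redis
from rq import Connection, Queue

with Connection(Redis('redis-host-1')):
    fast_q = Queue('fast')    # lives on redis-host-1
with Connection(Redis('redis-host-2')):
    slow_q = Queue('slow')    # lives on redis-host-2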

  • Exceptions end up in the failed queue; rq-dashboard is handy for reviewing them.
  • Tests! Use SimpleWorker, otherwise you might run into trouble with fork(). See the sketch below.
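
A test sketch along those lines (assuming the count module from earlier; SimpleWorker is RQ's non-forking worker class):

from redis import Redis
from rq import Queue, SimpleWorker

from count import count_words_at_url

def test_count_words():
    q = Queue(connection=Redis())
    job = q.enqueue(count_words_at_url, 'http://uberhip.com')
    # runs the job in this process -- no fork()
    SimpleWorker([q], connection=q.connection).work(burst=True)
    assert job.result > 0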

Other Cool Things We Don't Have Time For

  • Job Dependencies: depends_on
  • @job decorator
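
Both in one sketch (assuming add lives in a module the worker can import; the queue name and values are arbitrary):

from redis import Redis
from rq import Queue
from rq.decorators import job

redis_conn = Redis()

# @job attaches a .delay() method that enqueues instead of calling
@job('low', connection=redis_conn, timeout=180)
def add(x, y):
    return x + y

first = add.delay(3, 4)    # enqueued on the 'low' queue

# depends_on chains jobs: this one waits for `first` to succeed
q = Queue('low', connection=redis_conn)
second = q.enqueue(add, 5, 6, depends_on=first)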

Tips

  • Be careful to weigh your timeouts and runtimes appropriately. The default job timeout is 180s; if your job's runtime is expected to exceed 3 minutes, you should raise the timeout (see the sketch below).
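
For example (a sketch; the ten-minute value is arbitrary):

q.enqueue(
    count_words_at_url, 'http://uberhip.com',
    timeout=600,   # allow up to 10 minutes of runtime
)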