• Stars: 374
• Rank: 114,346 (Top 3%)
• Language: Python
• Created: about 11 years ago
• Updated: 4 months ago

Repository Details

A PostgreSQL job queueing system

PQ

A transactional queue system for PostgreSQL written in Python.

PQ does the job!

It allows you to push items into and pop items out of a queue in various ways, and it also provides two scheduling options: delayed processing and prioritization.

The system uses a single table that holds all jobs across queues; the specifics are easy to customize.

The system currently supports only the psycopg2 database driver - or psycopg2cffi for PyPy.

The basic queue implementation is similar to Ryan Smith's queue_classic library written in Ruby, but uses SKIP LOCKED for concurrency control.

In terms of performance, the implementation clocks in at about 1,000 operations per second. Using the PyPy interpreter, this scales linearly with the number of cores available.

Getting started

All functionality is encapsulated in a single class PQ.

class PQ(conn=None, pool=None, table="queue", schema=None)

The optional schema argument can be used to qualify the table with a schema if necessary.

Example usage:

from psycopg2 import connect
from pq import PQ

conn = connect('dbname=example user=postgres')
pq = PQ(conn)
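
To qualify the table with a schema as mentioned above, pass the schema argument; a minimal sketch (the schema name 'jobs' is illustrative):

pq = PQ(conn, table='queue', schema='jobs')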

For multi-threaded operation, use a connection pool such as psycopg2.pool.ThreadedConnectionPool.
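
For example, a minimal sketch that hands a pool to the constructor's pool argument (connection parameters and pool size are illustrative):

from psycopg2.pool import ThreadedConnectionPool
from pq import PQ

# Up to ten connections, shared across threads.
pool = ThreadedConnectionPool(1, 10, 'dbname=example user=postgres')
pq = PQ(pool=pool)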

You probably want to make sure your database is created with the utf-8 encoding.

To create and configure the queue table, call the create() method.

pq.create()

Queues

The pq object exposes queues through Python's dictionary interface:

queue = pq['apples']

The queue object provides get and put methods as explained below; in addition, it works as a context manager that manages a transaction:

with queue as cursor:
    ...

The statements inside the context manager are either committed as a transaction or rolled back, atomically. This is useful when a queue is used to manage jobs, because it allows you to retrieve a job from the queue, perform the job, and write a result, all with transactional semantics.
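
A hedged sketch of that pattern, assuming a get() call made inside the block joins the transaction opened by the context manager; the results table and process() helper are hypothetical:

def process(data):
    # Stand-in for real work on the payload.
    return 'processed %r' % data

with queue as cursor:
    job = queue.get()
    if job is not None:
        result = process(job.data)
        # Written in the same transaction as the job retrieval, so both
        # are committed together or rolled back together.
        cursor.execute(
            "INSERT INTO results (job_id, result) VALUES (%s, %s)",
            (job.id, result),
        )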

Methods

Use the put(data) method to insert an item into the queue. It takes a JSON-compatible object such as a Python dictionary:

queue.put({'kind': 'Cox'})
queue.put({'kind': 'Arthur Turner'})
queue.put({'kind': 'Golden Delicious'})

Items are pulled out of the queue using get(block=True). The default behavior is to block until an item is available, with a default timeout of one second, after which None is returned.

def eat(kind):
    print('umm, %s apples taste good.' % kind)

job = queue.get()
eat(**job.data)

The job object provides metadata in addition to the data attribute, as illustrated by its string representation:

>>> job
<pq.Job id=77709 size=1 enqueued_at="2014-02-21T16:22:06Z" schedule_at=None>

The get operation is also available through iteration:

for job in queue:
    if job is None:
        break

    eat(**job.data)

The iterator blocks if no item is available. Again, there is a default timeout of one second, after which the iterator yields a value of None.

An application can then choose to break out of the loop, or wait again for an item to be ready.

for job in queue:
    if job is not None:
        eat(**job.data)

    # This is an infinite loop!

Scheduling

Items can be scheduled such that they're not pulled until a later time:

queue.put({'kind': 'Cox'}, '5m')

In this example, the item is ready for work five minutes later. The method also accepts datetime and timedelta objects.
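
The same schedule expressed with a timedelta, plus an absolute datetime variant (the timestamp is illustrative; timezone handling depends on your database configuration):

from datetime import datetime, timedelta

# Equivalent to the '5m' string: ready for work in five minutes.
queue.put({'kind': 'Cox'}, timedelta(minutes=5))

# Ready for work at a fixed point in time.
queue.put({'kind': 'Cox'}, datetime(2025, 1, 1, 8, 0))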

Priority

If some items are more important than others, a time expectation can be expressed:

queue.put({'kind': 'Cox'}, expected_at='5m')

This tells the queue processor to give priority to this item over items expected at a later time and, conversely, to prefer items with an earlier expected time. Note that items without a set priority are pulled last.

The scheduling and priority options can be combined:

queue.put({'kind': 'Cox'}, '1h', '2h')

This item won't be pulled out until after one hour, and even then, it's only processed subject to its priority of two hours.

Encoding and decoding

The task data is encoded and decoded into JSON using the built-in json module. If you want to use a different implementation or need to configure this, pass encode and/or decode arguments to the PQ constructor.
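
A minimal sketch, assuming encode and decode are callables that turn an item into a string and back again (the exact contract beyond that is an assumption here):

import json

from pq import PQ

pq = PQ(
    conn,
    encode=lambda data: json.dumps(data, sort_keys=True),
    decode=json.loads,
)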

Pickles

If a queue name is provided as <name>/pickle (e.g. 'jobs/pickle'), items are automatically pickled and unpickled using Python's built-in cPickle module:

queue = pq['apples/pickle']

class Apple(object):
    def __init__(self, kind):
        self.kind = kind

queue.put(Apple('Cox'))

This allows you to store most objects without having to add any further serialization code.

The old pickle protocol 0 is used to ensure the pickled data is encoded as ASCII, which should be compatible with any database encoding. Note that the pickle data is still wrapped as a JSON string at the database level.

While using the pickle protocol is an easy way to serialize objects, for advanced users it might be better to use JSON serialization directly on the objects, for example using the object hook mechanism in the built-in json module or subclassing JSONEncoder <https://docs.python.org/2/library/json.html#json.JSONEncoder>.
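
A hedged sketch of that approach for the Apple class above, wired through the encode and decode arguments described earlier (the tagging convention is just one possible choice):

import json

class AppleEncoder(json.JSONEncoder):
    # Serialize Apple instances as tagged dictionaries.
    def default(self, obj):
        if isinstance(obj, Apple):
            return {'__apple__': True, 'kind': obj.kind}
        return super().default(obj)

def apple_hook(d):
    # Reconstruct Apple instances from tagged dictionaries.
    if d.get('__apple__'):
        return Apple(d['kind'])
    return d

pq = PQ(
    conn,
    encode=AppleEncoder().encode,
    decode=lambda s: json.loads(s, object_hook=apple_hook),
)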

Tasks

pq comes with a higher-level API that helps manage tasks.

from pq.tasks import PQ

pq = PQ(...)

queue = pq['default']

@queue.task(schedule_at='1h')
def eat(job_id, kind):
    print('umm, %s apples taste good.' % kind)

eat('Cox')

queue.work()

A task's jobs can optionally be re-scheduled on failure:

@queue.task(schedule_at='1h', max_retries=2, retry_in='10s')
def eat(job_id, kind):
    # ...

Time expectations can be overridden when the task is called:

eat('Cox', _expected_at='2m', _schedule_at='1m')

**NOTE**: The first positional argument passed to the task function is the job id, i.e. the primary key of the job's record in PostgreSQL.

Thread-safety

All objects are thread-safe as long as a connection pool is provided where each thread receives its own database connection.
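
For instance, a sketch of a multi-threaded consumer built on ThreadedConnectionPool (the handler, pool size, and thread count are illustrative):

import threading

from psycopg2.pool import ThreadedConnectionPool
from pq import PQ

pool = ThreadedConnectionPool(4, 8, 'dbname=example user=postgres')
pq = PQ(pool=pool)
queue = pq['apples']

def handle(data):
    # Stand-in for real work on the payload.
    print('handling %r' % data)

def worker():
    # Each thread pulls jobs independently; the pool hands each
    # thread its own connection.
    for job in queue:
        if job is None:
            break
        handle(job.data)

threads = [threading.Thread(target=worker) for _ in range(4)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()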

More Repositories

1. chameleon (Python, 175 stars): Fast HTML/XML template engine for Python
2. ts-postgres (TypeScript, 107 stars): Non-blocking PostgreSQL client for Node.js written in TypeScript.
3. macfsevents (Python, 66 stars): Thread-based interface to file system observation primitives.
4. skipdict (C, 19 stars): A skip dict is a Python dictionary which is permanently sorted by value.
5. dobbin (Python, 16 stars): Dobbin is a transactional object database for Python (2.6+). It's a fast and convenient way to persist Python objects on disk.
6. google-protobuf (C++, 12 stars): Protocol Buffers - Google's data interchange format
7. otto (Python, 11 stars): WSGI-compliant HTTP publisher
8. appendlayer (Python, 7 stars): Append a tarball to an existing image in a container registry – without having to pull down the image locally.
9. mdplayer (Python, 5 stars): Spotlight-based music scheduler for Mac OS X. Playlists are text files with track queries.
10. bs-virtualdom (OCaml, 5 stars): A virtual DOM library written in OCaml/BuckleScript with a focus on ease-of-use, immutability and performance.
11. ugandasms (Python, 3 stars): SMS platform based on Python and the Django Web Framework
12. static-container-registry (Shell, 2 stars): Tools to maintain a static container registry
13. requests-xauth (Python, 2 stars): XAuth Support for Python-Requests!
14. translationrecorder (Python, 2 stars): Utility to record gettext translations based on calls to a wrapped translation function.
15. quadtree-cd (Rust, 1 star): Quadtree library for Rust for detecting collisions between shapes in bounded 2D space.
16. cvs (Python, 1 star): Community Vulnerability Surveillance using SMS
17. sourcecodegen (Python, 1 star): Python (legacy, 2.4 and below) AST source code generation library
18. django-process-tools (Python, 1 star): Process-based WSGI application host for Django.
19. lptimesheet (Python, 1 star): LiquidPlanner Timesheet Extraction and Formatting Tool
20. audreyc (1 star): Audrey Compiler
21. s3proxy (Rust, 1 star): S3 HTTP proxy server in Rust