• Stars
    star
    160
  • Rank 234,703 (Top 5 %)
  • Language
    Python
  • License
    BSD 2-Clause "Sim...
  • Created over 10 years ago
  • Updated 5 months ago

Reviews

There are no reviews yet. Be the first to send feedback to the community and the maintainers!

Repository Details

Simple database-backed job queue

django-db-queue

pypi release

Simple database-backed job queue. Jobs are defined in your settings, and are processed by management commands.

Asynchronous tasks are run via a job queue. This system is designed to support multi-step job workflows.

Supported and tested against:

  • Django 3.2, 4.0, 4.1
  • Python 3.7, 3.8, 3.9, 3.10, 3.11

Getting Started

Installation

Install from PIP

pip install django-db-queue

Add django_dbq to your installed apps

INSTALLED_APPS = [
    ...,
    "django_dbq",
]

Run migrations

manage.py migrate

Upgrading from 1.x to 2.x

Note that version 2.x only supports Django 3.1 or newer. If you need support for Django 2.2, please stick with the latest 1.x release.

Describe your job

In e.g. project.common.jobs:

import time


def my_task(job):
    logger.info("Working hard...")
    time.sleep(10)
    logger.info("Job's done!")

Set up your job

In project.settings:

JOBS = {
    "my_job": {
        "tasks": ["project.common.jobs.my_task"],
    },
}

Hooks

Failure Hooks

When an unhandled exception is raised by a job, a failure hook will be called if one exists enabling you to clean up any state left behind by your failed job. Failure hooks are run in your worker process (if your job fails).

A failure hook receives the failed Job instance along with the unhandled exception raised by your failed job as its arguments. Here's an example:

def my_task_failure_hook(job, e):
    ...  # clean up after failed job

To ensure this hook gets run, simply add a failure_hook key to your job config like so:

JOBS = {
    "my_job": {
        "tasks": ["project.common.jobs.my_task"],
        "failure_hook": "project.common.jobs.my_task_failure_hook",
    },
}

Creation Hooks

You can also run creation hooks, which happen just after the creation of your Job instances and are executed in the process in which the job was created, not the worker process.

A creation hook receives your Job instance as its only argument. Here's an example:

def my_task_creation_hook(job):
    ...  # configure something before running your job

To ensure this hook gets run, simply add a creation_hook key to your job config like so:

JOBS = {
    "my_job": {
        "tasks": ["project.common.jobs.my_task"],
        "creation_hook": "project.common.jobs.my_task_creation_hook",
    },
}

Start the worker

In another terminal:

python manage.py worker

Create a job

Using the name you configured for your job in your settings, create an instance of Job.

Job.objects.create(name="my_job")

Prioritising jobs

Sometimes it is necessary for certain jobs to take precedence over others. For example; you may have a worker which has a primary purpose of dispatching somewhat important emails to users. However, once an hour, you may need to run a really important job which needs to be done on time and cannot wait in the queue for dozens of emails to be dispatched before it can begin.

In order to make sure that an important job is run before others, you can set the priority field to an integer higher than 0 (the default). For example:

Job.objects.create(name="normal_job")
Job.objects.create(name="important_job", priority=1)
Job.objects.create(name="critical_job", priority=2)

Jobs will be ordered by their priority (highest to lowest) and then the time which they were created (oldest to newest) and processed in that order.

Scheduling jobs

If you'd like to create a job but have it run at some time in the future, you can use the run_after field on the Job model:

Job.objects.create(
    name="scheduled_job",
    run_after=(timezone.now() + timedelta(minutes=10)),
)

Of course, the scheduled job will only be run if your python manage.py worker process is running at the time when the job is scheduled to run. Otherwise, it will run the next time you start your worker process after that time has passed.

It's also worth noting that, by default, scheduled jobs run as part of the same queue as all other jobs, and so if a job is already being processed at the time when your scheduled job is due to run, it won't run until that job has finished. If increased precision is important, you might consider using the queue_name feature to run a separate worker dedicated to only running scheduled jobs.

Terminology

Job

The top-level abstraction of a standalone piece of work. Jobs are stored in the database (ie they are represented as Django model instances).

Task

Jobs are processed to completion by tasks. These are simply Python functions, which must take a single argument - the Job instance being processed. A single job will often require processing by more than one task to be completed fully. Creating the task functions is the responsibility of the developer. For example:

def my_task(job):
    logger.info("Doing some hard work")
    do_some_hard_work()

Workspace

The workspace is an area that can be used 1) to provide additional arguments to task functions, and 2) to categorize jobs with additional metadata. It is implemented as a Python dictionary, available on the job instance passed to tasks as job.workspace. The initial workspace of a job can be empty, or can contain some parameters that the tasks require (for example, API access tokens, account IDs etc).

When creating a Job, the workspace is passed as a keyword argument:

Job.objects.create(name="my_job", workspace={"key": value})

Then, the task function can access the workspace to get the data it needs to perform its task:

def my_task(job):
    cats_import = CatsImport.objects.get(pk=job.workspace["cats_import_id"])

Tasks within a single job can use the workspace to communicate with each other. A single task can edit the workspace, and the modified workspace will be passed on to the next task in the sequence. For example:

def my_first_task(job):
    job.workspace['message'] = 'Hello, task 2!'

def my_second_task(job):
    logger.info("Task 1 says: %s" % job.workspace['message'])

The workspace can be queried like any JSONField. For instance, if you wanted to display a list of jobs that a certain user had initiated, add user_id to the workspace when creating the job:

Job.objects.create(name="foo", workspace={"user_id": request.user.id})

Then filter the query with it in the view that renders the list:

user_jobs = Job.objects.filter(workspace__user_id=request.user.id)

Worker process

A worker process is a long-running process, implemented as a Django management command, which is responsible for executing the tasks associated with a job. There may be many worker processes running concurrently in the final system. Worker processes wait for a new job to be created in the database, and call the each associated task in the correct sequeunce.. A worker can be started using python manage.py worker, and a single worker instance is included in the development procfile.

Configuration

Jobs are configured in the Django settings.py file. The JOBS setting is a dictionary mapping a job name (eg import_cats) to a list of one or more task function paths. For example:

JOBS = {
    'import_cats': ['apps.cat_importer.import_cats.step_one', 'apps.cat_importer.import_cats.step_two'],
}

Job states

Jobs have a state field which can have one of the following values:

  • NEW (has been created, waiting for a worker process to run the next task)
  • READY (has run a task before, awaiting a worker process to run the next task)
  • PROCESSING (a task is currently being processed by a worker)
  • STOPPING (the worker process has received a signal from the OS requesting it to exit)
  • COMPLETED (all job tasks have completed successfully)
  • FAILED (a job task failed)

State diagram

state diagram

API

Model methods

Job.get_queue_depths

If you need to programatically get the depth of any queue you can run the following:

from django_dbq.models import Job

...

Job.objects.create(name="do_work", workspace={})
Job.objects.create(name="do_other_work", queue_name="other_queue", workspace={})

queue_depths = Job.get_queue_depths()
print(queue_depths)  # {"default": 1, "other_queue": 1}

Important: When checking queue depths, do not assume that the key for your queue will always be available. Queue depths of zero won't be included in the dict returned by this method.

Management commands

manage.py delete_old_jobs

There is a management command, manage.py delete_old_jobs, which deletes any jobs from the database which are in state COMPLETE or FAILED and were created more than 24 hours ago. This could be run, for example, as a cron task, to ensure the jobs table remains at a reasonable size.

manage.py worker

To start a worker:

manage.py worker [queue_name] [--rate_limit]
  • queue_name is optional, and will default to default
  • The --rate_limit flag is optional, and will default to 1. It is the minimum number of seconds that must have elapsed before a subsequent job can be run.
manage.py queue_depth

If you'd like to check your queue depth from the command line, you can run manage.py queue_depth [queue_name [queue_name ...]] and any jobs in the "NEW" or "READY" states will be returned.

Important: If you misspell or provide a queue name which does not have any jobs, a depth of 0 will always be returned.

Testing

It may be necessary to supply a DATABASE_PORT environment variable.

Windows support

Windows is supported on a best-effort basis only, and is not covered by automated or manual testing.

Code of conduct

For guidelines regarding the code of conduct when contributing to this repository please review https://www.dabapps.com/open-source/code-of-conduct/

More Repositories

1

django-log-request-id

Django middleware and log filter to attach a unique ID to every log message generated as part of a request
Python
368
star
2

django-zen-queries

Explicit control over database query execution in Django applications
Python
366
star
3

django-email-as-username

DEPRECATED: User authentication with email addresses instead of usernames.
Python
261
star
4

django-readers

A lightweight function-oriented toolkit for better organisation of business logic and efficient selection and projection of data in Django projects.
Python
184
star
5

django-user-roles

DEPRECATED: Simple role-based user permissions for Django.
Python
125
star
6

django-forms-dynamic

Resolve form field arguments dynamically when a form is instantiated
Python
117
star
7

django-user-streams

DEPRECATED: Simple, fast user news feeds for Django
Python
52
star
8

periodical

NO LONGER MAINTAINED A library for working with time and date series in Python
Python
47
star
9

django-reusable-app

DEPRECATED
Python
45
star
10

django-geosimple

DEPRECATED/NO LONGER MAINTAINED
Python
38
star
11

django-private-views

DEPRECATED: Site-wide login protection
Python
37
star
12

crab

🦀 a simple unix toolkit for working with local development environments.
Python
32
star
13

postcodeserver

A tiny JSON server for UK postcode lookups
Python
22
star
14

django-enforce-host

Middleware to redirect requests to a canonical host
Python
17
star
15

betta

Simple Bootstrap Variable Editor
JavaScript
16
star
16

django-rest-framework-serialization-spec

DEPRECATED, see https://github.com/dabapps/django-readers instead
Python
11
star
17

django-wrapwith

A Django template tag for wrapping a template block in a reusable enclosing template
Python
5
star
18

django-s3-file-upload-server

Upload files from the browser to S3 - server side implementation
Python
5
star
19

roe

DabApps' Project Development Kit
TypeScript
4
star
20

django-auth-base-template

NO LONGER MAINTAINED Switchable logged in/logged out base templates for Django
Python
4
star
21

csv-wrangler

Statically typed Python 3 CSV generation helpers
Python
3
star
22

django-s3-file-upload-client

Upload files from the browser to S3 - client side implementation
TypeScript
3
star
23

react-set-props

Store arbitrary state in redux with a similar API to setState
TypeScript
2
star
24

django-simple-robots

Simple robots.txt file serving for web applications
Python
2
star
25

django-topography

EXPERIMENTAL: Map the URL structure of a Django application
Python
1
star
26

dablab

DEPRECATED - DabApps Laboratories
HTML
1
star
27

heroku-buildpack-catfish

Compatibility shim to help build catfish-compatible repositories on Heroku
Shell
1
star
28

simple-records

Simple Readonly TypeScript Records
TypeScript
1
star
29

heroku-buildpack-cleanup

Cleanup many things from a Heroku slug after build
Shell
1
star
30

create-webpack-config

A utility for creating webpack configs with common settings
JavaScript
1
star
31

brightonbrains-2011

DEAD ☠
CSS
1
star
32

redux-create-reducer

A utility to create redux reducers from a set of handlers
TypeScript
1
star
33

heroku-buildpack-rds-certificate

Heroku buildpack to download the RDS SSL certificate and store it in the root of your app
Shell
1
star
34

django-readers-debug

A pretty-printer for debugging django-readers queryset functions
Python
1
star