Kale: Distributed task worker from Nextdoor


Kale is a python task worker library that supports priority queues on Amazon SQS.

Check out our blog post - Nextdoor Taskworker: Simple, Efficient & Scalable

How does it work?

(Figure: Kale-based Taskworker)

Like other distributed task queue systems, Kale has publishers send task messages to queues and workers fetch messages from those queues. For now, Kale supports only Amazon SQS as the queue backend.

Publisher

A publisher can be any Python program that imports a Kale-based task class and calls the class's publish() method. For example, if a task class looks like this:

    # tasks.py

    from kale import task


    class MyTask(task.Task):
        def run_task(self, arg1, arg2, *args, **kwargs):
            # Do something with arg1 and arg2 here.
            pass

then a publisher can publish a task to Amazon SQS like this; the call normally takes tens of milliseconds to return:

    import tasks

    arg1, arg2 = 'foo', 'bar'  # example argument values
    tasks.MyTask.publish(None, arg1, arg2)

publish() is a static method of the task class. Apart from the first parameter, which can usually be None, it has the same signature as the run_task() method. A worker process, which may run on a different machine, picks up the message and executes the task's run_task() method.

While ndkale is usable out of the box, the first parameter of publish(app_data, *args, **kwargs) is designed for more complex situations where some state needs to be passed outside the actual task parameters; passing the environment is one example. app_data must be pickleable so that it can be encoded and inserted into the SQS message.

The default task object will be populated with an app_data attribute, but the default worker will not use it. You will need to extend the default Worker or Task class to take advantage of app_data.
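To make the publish()/run_task() contract concrete, here is a self-contained mock that does not use Kale at all. Everything in it (MockTask, EnvAwareTask, the in-memory QUEUE list, TASK_CLASSES, work_one, and pickling the whole message) is a hypothetical stand-in. It only illustrates three points from the text: publish() takes app_data followed by run_task()'s own arguments, the message (including app_data) must be pickleable, and a worker has to be written to read app_data before it has any effect. The README describes publish() as a static method; the mock uses a classmethod so it knows which class is being published.

```python
import pickle

QUEUE = []  # hypothetical in-memory stand-in for an SQS queue


class MockTask:
    """Hypothetical stand-in for a Kale task base class (not Kale's real API)."""

    app_data = None

    @classmethod
    def publish(cls, app_data, *args, **kwargs):
        # Everything in the message, including app_data, must be pickleable.
        body = pickle.dumps((cls.__name__, app_data, args, kwargs))
        QUEUE.append(body)

    def run_task(self, *args, **kwargs):
        raise NotImplementedError


class EnvAwareTask(MockTask):
    """Example task whose behaviour depends on app_data."""

    def run_task(self, name, *args, **kwargs):
        env = (self.app_data or {}).get('env', 'unknown')
        return 'hello %s from %s' % (name, env)


TASK_CLASSES = {'EnvAwareTask': EnvAwareTask}


def work_one():
    """Pop one message, rebuild the task, attach app_data, and run it."""
    task_name, app_data, args, kwargs = pickle.loads(QUEUE.pop(0))
    task = TASK_CLASSES[task_name]()
    task.app_data = app_data  # a custom worker decides how app_data is used
    return task.run_task(*args, **kwargs)
```

For example, EnvAwareTask.publish({'env': 'staging'}, 'world') followed by work_one() returns 'hello world from staging', while passing an unpicklable app_data such as a lock makes publish() raise before anything is queued.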

Worker

(Figure: task lifecycle)

A worker process runs an infinite loop. In each iteration, it does the following:

  1. It runs a queue selection algorithm (select_queue) to decide which queue to fetch tasks from;
  2. It fetches a batch of tasks from that queue (get_messages);
  3. It runs the tasks in the batch one by one (run_task);
  4. It finishes up:
    1. If a task succeeds, it is deleted from the queue;
    2. If a task fails or runs too long (exceeding time_limit, a per-task property that sets the task SLA), it is put back on the queue so that other workers can pick it up later (if retries are allowed);
    3. If the whole batch runs too long (exceeding the visibility timeout, a per-queue property that sets the batch SLA), the unfinished tasks are put back on the queue so that other workers can pick them up later.
  5. It ends the iteration, starts the next one, and repeats the steps above.
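The loop above can be sketched in plain Python. This is a simplified, hypothetical model, not Kale's worker: InMemoryQueue stands in for SQS, a message is a (function, args, retries_left) tuple, the time limit is only checked after a task returns (a real worker would have to interrupt a runaway task), and unfinished messages are re-added explicitly, whereas SQS makes them visible again on its own once the visibility timeout expires.

```python
import time
from collections import deque


class InMemoryQueue:
    """Hypothetical stand-in for an SQS queue; messages are (func, args, retries_left)."""

    def __init__(self, name, batch_size=10):
        self.name = name
        self.batch_size = batch_size
        self.messages = deque()

    def get_messages(self):
        count = min(self.batch_size, len(self.messages))
        return [self.messages.popleft() for _ in range(count)]


def run_iteration(queues, select_queue, time_limit_sec, visibility_timeout_sec):
    """Run one worker iteration and return (num_succeeded, num_requeued)."""
    queue = select_queue(queues)                 # 1. choose a queue
    batch = queue.get_messages()                 # 2. fetch a batch
    batch_start = time.monotonic()
    succeeded = requeued = 0
    for i, (func, args, retries_left) in enumerate(batch):  # 3. run one by one
        if time.monotonic() - batch_start > visibility_timeout_sec:
            # 4.3 batch SLA exceeded: unfinished tasks go back to the queue.
            queue.messages.extend(batch[i:])
            requeued += len(batch) - i
            break
        start = time.monotonic()
        try:
            func(*args)
            ok = time.monotonic() - start <= time_limit_sec
        except Exception:
            ok = False
        if ok:
            succeeded += 1                       # 4.1 success: delete (drop) it
        elif retries_left > 0:
            # 4.2 failed or too slow: put it back for another worker to retry.
            queue.messages.append((func, args, retries_left - 1))
            requeued += 1
    return succeeded, requeued                   # 5. the caller loops forever
```

A real worker would call run_iteration inside a `while True:` loop, passing a queue selection function such as ReducedLottery.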

Queue Selection Algorithm

Code: kale/queue_selector.py

A good queue selection algorithm has these requirements:

  1. higher priority queues should have more chances to be selected than lower priority queues;
  2. it should not starve low priority queues;
  3. it should not send many requests to SQS that retrieve no tasks, which wastes compute resources and money, since Amazon charges per request;
  4. it should not wait on empty queues for too long, which also wastes compute resources.

We experimented with and benchmarked quite a few queue selection algorithms, and ended up using ReducedLottery, an improved version of the lottery algorithm that fulfills all of the above requirements.

ReducedLottery works like this:

    Initialize the lottery pool with all queues
    while the lottery pool is not empty:
        Run a lottery weighted by queue priority to pick a winning queue
        Short poll SQS to check whether the winning queue is empty
        if the winning queue is not empty:
            return the winning queue
        else:
            Remove this queue from the lottery pool
    Reset the lottery pool with all queues
    Run one more lottery and return the winning queue
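A minimal Python sketch of the pseudocode above, assuming each queue holds as many lottery tickets as its priority value and that a caller-supplied is_empty function stands in for the SQS short poll. It is not the code in kale/queue_selector.py, only an illustration of the same steps using the standard library's random.choices for the weighted draw.

```python
import random


def reduced_lottery(queues, is_empty, rng=random):
    """Pick a queue name following the ReducedLottery steps.

    queues:   list of (name, priority) pairs; priority acts as the ticket count
    is_empty: callable(name) -> bool, a stand-in for a short poll of SQS
    rng:      the random module or a random.Random instance (for seeding)
    """
    pool = list(queues)
    while pool:
        # Draw a winner with probability proportional to its priority.
        winner = rng.choices(pool, weights=[p for _, p in pool], k=1)[0]
        if not is_empty(winner[0]):
            return winner[0]
        pool.remove(winner)  # known empty: exclude it from this round
    # Every queue looked empty: reset the pool and return one more winner;
    # the worker will then long poll that queue.
    winner = rng.choices(queues, weights=[p for _, p in queues], k=1)[0]
    return winner[0]
```

With queues = [('high_priority', 100), ('default', 40), ('low_priority', 5)] and only low_priority holding messages, the function always ends up returning 'low_priority', because the two empty queues are removed from the pool as soon as they win a draw.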

The beauty of ReducedLottery:

  • It prefers higher priority queues, as higher priority queues get more lottery tickets and have higher chances to win the jackpot. Thus, requirement 1 is fulfilled.
  • It uses randomness to avoid starvation. Lower priority queues still have a chance to win the jackpot. Thus, requirement 2 is fulfilled.
  • If the selected queue is empty, the worker long polls it (SQS holds the receive request open until a message arrives or the long-poll time runs out) instead of sending many short polls. Thus, requirement 3 is fulfilled.
  • It excludes known empty queues from the lottery pool, so it returns an empty queue only when all queues are empty. It is therefore unlikely to long poll an empty queue. Thus, requirement 4 is fulfilled.
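To put numbers on requirements 1 and 2, assume each queue's tickets are proportional to its priority p_q (the text only says higher priority means more tickets). With the priorities from the queue config example (100, 40 and 5), the odds of winning a draw work out as:

```latex
\begin{aligned}
P(q) &= \frac{p_q}{\sum_{r \in \text{pool}} p_r} \\
\text{full pool: } P(\text{high}) &= \tfrac{100}{145} \approx 0.690,\;
  P(\text{default}) = \tfrac{40}{145} \approx 0.276,\;
  P(\text{low}) = \tfrac{5}{145} \approx 0.034 \\
\text{high removed (empty): } P(\text{default}) &= \tfrac{40}{45} \approx 0.889,\;
  P(\text{low}) = \tfrac{5}{45} \approx 0.111
\end{aligned}
```

Under this assumption even the lowest-priority queue wins about 3% of full-pool draws, which keeps it from starving, and every empty queue removed from the pool raises the odds of the remaining ones.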

Settings

There are two types of settings, worker config and queue config.

Worker config

Settings are defined in settings modules and cover AWS credentials, the queue config, the queue selection algorithm, and so on.

Settings modules are loaded in this order:

  • kale.default_settings
  • the module named by the KALE_SETTINGS_MODULE environment variable

Here's an example:

    import os

    AWS_REGION = 'us-west-2'

    #
    # Production settings
    # (use this for prod to talk to Amazon SQS)

    # MESSAGE_QUEUE_USE_PROXY = False
    # AWS_ACCESS_KEY_ID = 'AWS KEY ID'
    # AWS_SECRET_ACCESS_KEY = 'AWS SECRET KEY'

    #
    # Development settings
    # (use this for dev to talk to ElasticMQ, an SQS emulator)

    # Using elasticmq to emulate SQS locally
    MESSAGE_QUEUE_USE_PROXY = True
    MESSAGE_QUEUE_PROXY_PORT = 9324
    MESSAGE_QUEUE_PROXY_HOST = os.getenv('MESSAGE_QUEUE_PROXY_HOST', '0.0.0.0')
    AWS_ACCESS_KEY_ID = 'x'
    AWS_SECRET_ACCESS_KEY = 'x'

    QUEUE_CONFIG = 'taskworker/queue_config.yaml'

    # SQS limits per message size, bytes
    # It can be set anywhere from 1024 bytes (1KB), up to 262144 bytes (256KB).
    # See http://aws.amazon.com/sqs/faqs/
    SQS_TASK_SIZE_LIMIT = 256000

    QUEUE_SELECTOR = 'kale.queue_selector.ReducedLottery'

Settings in later-loaded modules override those in earlier-loaded modules.
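The load order can be pictured with a small, hypothetical helper. It is not Kale's actual settings loader; it assumes settings are the UPPER_CASE attributes of each module and merges them in load order, so values from the KALE_SETTINGS_MODULE module replace the defaults.

```python
import importlib
import os


def load_settings(default_module='kale.default_settings',
                  env_var='KALE_SETTINGS_MODULE'):
    """Merge settings modules in load order; later modules override earlier ones.

    Hypothetical helper: only UPPER_CASE module attributes count as settings.
    """
    module_names = [default_module]
    override = os.environ.get(env_var)
    if override:
        module_names.append(override)

    settings = {}
    for module_name in module_names:
        module = importlib.import_module(module_name)
        for attr in dir(module):
            if attr.isupper():
                settings[attr] = getattr(module, attr)
    return settings
```

For instance, if the default module sets AWS_REGION = 'us-east-1' and the module named by KALE_SETTINGS_MODULE sets AWS_REGION = 'us-west-2', the merged value is 'us-west-2', while settings only present in the defaults are kept.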

Queue config

All queues and their properties are defined in a queue config YAML file, whose path is set by QUEUE_CONFIG in the settings modules above.

Here's an example:

    # task SLA: visibility_timeout_sec / batch_size = 60 / 10 = 6 seconds
    high_priority:
        name: high_priority
        priority: 100
        batch_size: 10
        visibility_timeout_sec: 60
        long_poll_time_sec: 1
        num_iterations: 10

    # task SLA: 60 / 10 = 6 seconds
    default:
        name: default
        priority: 40
        batch_size: 10
        visibility_timeout_sec: 60
        long_poll_time_sec: 1
        num_iterations: 5

    # task SLA: 60 / 10 = 6 seconds
    low_priority:
        name: low_priority
        priority: 5
        batch_size: 10
        visibility_timeout_sec: 60
        long_poll_time_sec: 5
        num_iterations: 5
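The task SLA in the YAML comments is the queue's visibility timeout divided by its batch size. A small sketch of that arithmetic, with the example config transcribed into a Python dict (a YAML parser would produce the same structure from the file); fits_sla() is a hypothetical check, not part of Kale, that a task's time_limit stays within its queue's per-task SLA.

```python
# The queue config example above, transcribed into a Python dict.
QUEUE_CONFIG = {
    'high_priority': {'name': 'high_priority', 'priority': 100, 'batch_size': 10,
                      'visibility_timeout_sec': 60, 'long_poll_time_sec': 1,
                      'num_iterations': 10},
    'default': {'name': 'default', 'priority': 40, 'batch_size': 10,
                'visibility_timeout_sec': 60, 'long_poll_time_sec': 1,
                'num_iterations': 5},
    'low_priority': {'name': 'low_priority', 'priority': 5, 'batch_size': 10,
                     'visibility_timeout_sec': 60, 'long_poll_time_sec': 5,
                     'num_iterations': 5},
}


def task_sla_sec(queue):
    """Per-task SLA: the batch visibility timeout shared across the batch."""
    return queue['visibility_timeout_sec'] / queue['batch_size']


def fits_sla(time_limit_sec, queue):
    """Hypothetical check: can a full batch of such tasks finish in time?"""
    return time_limit_sec <= task_sla_sec(queue)
```

All three example queues come out at 6.0 seconds per task, so a task with a 5-second time_limit fits any of them, while a 7-second one could push a full batch past the visibility timeout.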

How to implement a distributed task worker system using Kale

Install kale

From source code

    python setup.py install

Using pip (from GitHub repo)

Put one of these lines in requirements.txt, then run pip install -r requirements.txt:

    # If you want the latest build
    git+https://github.com/Nextdoor/ndkale.git#egg=ndkale

    # Or this, if you want a specific commit
    git+https://github.com/Nextdoor/ndkale.git@67f873ed7b0a8131cc8d72453d749ffb389d695f#egg=ndkale

Or install directly from the command line:

    pip install -e git+https://github.com/Nextdoor/ndkale.git#egg=ndkale

(We'll upload the package to PyPI soon.)

Example implementation

See code in the example/ directory.
