  • Language
    Ruby
  • License
    Apache License 2.0
  • Created over 13 years ago
  • Updated about 2 years ago

Repository Details

Highly available distributed queue built on RDBMS

PerfectQueue

PerfectQueue is a highly available distributed queue built on top of an RDBMS. It provides an API similar to Amazon SQS, but focuses on reliability and flexible scheduling rather than scalability.

PerfectQueue introduces the following concepts:

  • At-least-once semantics: Even if a worker node fails while processing a task, another worker takes over the task.
  • Multiuser-aware fair scheduling: PerfectQueue gives priority to tasks submitted by users who have a larger resource assignment.
  • Decider: Decider is a simple mechanism to implement complex workflows on top of queues while keeping loose coupling.
  • Graceful and live restarting: PerfectQueue keeps processing long-running tasks even while restarting.
  • Idempotent task submission: All tasks have a unique identifier, and PerfectQueue prevents storing the same task twice.
    • Note: client applications should make sure they always generate the same string for (semantically) the same task.
  • Idempotent task processing support: Workers can use a constant unique identifier to process tasks idempotently.

The only thing you have to take care of is making your worker programs idempotent. PerfectQueue handles the other problems.
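The note on idempotent task submission leaves it to the client to produce the same identifier for the same task. One possible approach, shown here only as a sketch, is to hash a canonical form of the task payload. The helper names `canonicalize` and `task_id_for` are hypothetical and not part of PerfectQueue, and the 16-character truncation is an arbitrary choice.

```ruby
require 'digest'
require 'json'

# Recursively sort hash keys so that logically equal payloads serialize
# to the same JSON string regardless of key insertion order.
def canonicalize(value)
  case value
  when Hash
    value.map { |k, v| [k.to_s, canonicalize(v)] }.sort_by(&:first).to_h
  when Array
    value.map { |v| canonicalize(v) }
  else
    value
  end
end

# Hypothetical helper (not part of PerfectQueue): derive a stable task ID
# from the task type and its payload.
def task_id_for(type, data)
  digest = Digest::SHA256.hexdigest(JSON.generate(canonicalize(data)))
  "#{type}-#{digest[0, 16]}"
end

a = task_id_for('type1', { 'uid' => 1, 'day' => '2012-05-18' })
b = task_id_for('type1', { 'day' => '2012-05-18', 'uid' => 1 })
puts a == b  # true: same task, same ID
```

The resulting string could then be passed as the first argument of Queue#submit. Whether a hash, a natural key, or something else is the right identifier depends on what "the same task" means in your application.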

API overview

# open a queue
PerfectQueue.open(config, &block)  #=> #<Queue>

# submit a task
Queue#submit(task_id, type, data, options={})

# poll a task
# (you don't have to call this method directly; see the following sections)
Queue#poll  #=> #<AcquiredTask>

# get data associated with a task
AcquiredTask#data  #=> #<Hash>

# finish a task
AcquiredTask#finish!

# retry a task
AcquiredTask#retry!

# create a task reference
Queue#[](key)  #=> #<Task>

# check the existence of the task
Task#exists?

# force finish a task
# be aware that worker programs can't detect it
Task#force_finish!

Error classes

TaskError

##
# Workers may get these errors:
#

AlreadyFinishedError < TaskError

PreemptedError < TaskError

ProcessStopError < RuntimeError

ImmediateProcessStopError < ProcessStopError

GracefulProcessStopError < ProcessStopError

##
# Client or other situation:
#

ConfigError < RuntimeError

NotFoundError < TaskError

AlreadyExistsError < TaskError

NotSupportedError < TaskError
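The hierarchy above has two separate branches: task-level errors under TaskError and process-level errors under ProcessStopError. The sketch below uses stand-in classes defined locally under a hypothetical `Sketch` module, only to show how that split affects `rescue` and `case` matching. The parent class of TaskError is not stated above, so StandardError is an assumption, and the returned labels are illustrative only.

```ruby
# Stand-in classes that only mirror the hierarchy listed above; in a real
# worker these come from the perfectqueue gem, not from this file.
module Sketch
  class TaskError < StandardError; end          # parent class is an assumption
  class AlreadyFinishedError < TaskError; end
  class PreemptedError < TaskError; end
  class ProcessStopError < RuntimeError; end
  class GracefulProcessStopError < ProcessStopError; end
end

# Classify an exception by which branch of the hierarchy it belongs to.
# A handler for TaskError never matches ProcessStopError, and vice versa.
def classify(error)
  case error
  when Sketch::TaskError        then :task_level
  when Sketch::ProcessStopError then :process_level
  else :unexpected
  end
end

puts classify(Sketch::PreemptedError.new)            # task_level
puts classify(Sketch::GracefulProcessStopError.new)  # process_level
```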

Example

# submit tasks
PerfectQueue.open(config) {|queue|
  data = {'key'=>"value"}
  queue.submit("task-id", "type1", data)
}

Writing a worker application

1. Implement PerfectQueue::Application::Base

class TestHandler < PerfectQueue::Application::Base
  # implement run method
  def run
    # do something ...
    puts "acquired task: #{task.inspect}"

    # call task.finish!, task.retry! or task.release!
    task.finish!
  end
end

2. Implement PerfectQueue::Application::Dispatch

class Dispatch < PerfectQueue::Application::Dispatch
  # describe routing
  route "type1" => TestHandler
  route /^regexp-.*$/ => :TestHandler  # String or Regexp => Class or Symbol
end

3. Run the worker

In a launcher script or rake file:

system('perfectqueue run -I. -rapp/workers/dispatch Dispatch')

or:

require 'yaml'
require 'perfectqueue'
require 'app/workers/dispatch'

PerfectQueue::Worker.run(Dispatch) {
  # this block is called each time the worker process is (re)started
  # and must return the configuration for the current environment
  raw = File.read('config/perfectqueue.yml')
  yml = YAML.load(raw)
  yml[ENV['RAILS_ENV'] || 'development']
}

Signal handlers

  • TERM: graceful shutdown
  • QUIT: immediate shutdown
  • USR1: graceful restart
  • HUP: immediate restart
  • USR2: reopen log files
  • INT: detach process for live restarting
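As a rough sketch of how a deploy script might drive these handlers from Ruby, the snippet below maps action names to the signals listed above and sends them with `Process.kill`. The action names, the `signal_worker` helper, and the pid file path in the comment are all hypothetical.

```ruby
# Signal names taken from the list above, keyed by hypothetical action names.
WORKER_SIGNALS = {
  graceful_shutdown:  'TERM',
  immediate_shutdown: 'QUIT',
  graceful_restart:   'USR1',
  immediate_restart:  'HUP',
  reopen_logs:        'USR2',
  detach:             'INT'
}.freeze

# Send the signal for +action+ to the worker process with the given pid.
# Returns the signal name that was sent.
def signal_worker(pid, action)
  signal = WORKER_SIGNALS.fetch(action) do
    raise ArgumentError, "unknown action: #{action.inspect}"
  end
  Process.kill(signal, pid)
  signal
end

# Example (the pid file path is hypothetical):
# signal_worker(Integer(File.read('tmp/pids/perfectqueue.pid')), :graceful_restart)
```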

Configuration

  • type: backend type (required; see the following sections)
  • log: log file path (default: use stderr)
  • processors: number of child processes (default: 1)
  • processor_type: type of processor ('process' or 'thread') (default: 'process')
  • poll_interval: interval to poll tasks in seconds (default: 1.0 sec)
  • retention_time: duration to retain finished tasks (default: 300 sec)
  • task_heartbeat_interval: interval to send heartbeat requests (default: 2 sec)
  • alive_time: duration to continue a heartbeat request (default: 300 sec)
  • retry_wait: duration to wait before running a retried task again (default: 300 sec)
  • child_kill_interval: interval to send signals to a child process (default: 2.0 sec)
  • child_graceful_kill_limit: threshold time to switch SIGTERM to SIGKILL (default: never)
  • child_heartbeat_interval: interval to send heartbeat packets to a child process (default: 2 sec)
  • child_heartbeat_limit: threshold time to detect freeze of a child process (default: 10.0 sec)
  • detach_wait: number of seconds to sleep before detaching the process for live restarting (default: 10.0 sec)

Backend types

rdb_compat

additional configuration:

  • url: URL to the RDBMS (example: 'mysql://user:password@host:port/database')
  • table: name of the table to use

rdb

Not implemented yet.

config/perfectqueue.yml Example

development:
  type: rdb_compat
  url: mysql2://root:@localhost:3306/perfectqueue
  table: queues
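
A file like this is keyed by environment, so the application has to pick one section before handing it to PerfectQueue. The following is a minimal sketch of that step using only the Ruby standard library; `load_queue_config` is a hypothetical helper, and the YAML text is inlined here so the snippet runs on its own.

```ruby
require 'yaml'

# Load the PerfectQueue settings for one environment from YAML text.
# Raises KeyError if the environment has no section.
def load_queue_config(yaml_text, env = ENV['RAILS_ENV'] || 'development')
  YAML.safe_load(yaml_text).fetch(env)
end

yaml_text = <<~YAML
  development:
    type: rdb_compat
    url: mysql2://root:@localhost:3306/perfectqueue
    table: queues
YAML

config = load_queue_config(yaml_text, 'development')
puts config['type']   # rdb_compat
puts config['table']  # queues
```

Using `fetch` instead of `[]` makes a missing environment fail loudly rather than passing `nil` on as the configuration.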

Command line management tool

Usage: perfectqueue [options] <command>

commands:
    list                             Show list of tasks
    submit <key> <type> <data>       Submit a new task
    force_finish <key>               Force finish a task
    run <class>                      Run a worker process
    init                             Initialize a backend database

options:
    -e, --environment ENV            Framework environment (default: development)
    -c, --config PATH.yml            Path to a configuration file (default: config/perfectqueue.yml)

options for submit:
    -u, --user USER                  Set user
    -t, --time UNIXTIME              Set time to run the task

options for run:
    -I, --include PATH               Add $LOAD_PATH directory
    -r, --require PATH               Require files before starting

initializing a database

# assume that the config/perfectqueue.yml exists
$ perfectqueue init

submitting a task

$ perfectqueue submit k1 user_task '{"uid":1}' -u user_1
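
When the command is assembled from a script rather than typed by hand, building it as an argument list avoids shell-quoting problems with the JSON payload. This sketch uses only the `submit` options shown in the usage text (`-u` and `-t`); `submit_command` is a hypothetical helper, not part of the gem.

```ruby
require 'json'
require 'shellwords'

# Build the argument list for `perfectqueue submit`, using only the
# options shown in the usage text above (-u and -t).
def submit_command(key, type, data, user: nil, run_at: nil)
  cmd = ['perfectqueue', 'submit', key, type, JSON.generate(data)]
  cmd += ['-u', user] if user
  cmd += ['-t', run_at.to_i.to_s] if run_at  # -t expects a UNIX timestamp
  cmd
end

cmd = submit_command('k1', 'user_task', { 'uid' => 1 }, user: 'user_1')
puts Shellwords.join(cmd)  # shell-escaped form, for logging
# To actually run it: system(*cmd)
```

Passing a `Time` as `run_at:` (for example `Time.now + 3600`) would schedule the task for later via `-t`.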

listing tasks

$ perfectqueue list
                           key            type               user             status                   created_at                      timeout   data
                            k1       user_task             user_1            waiting    2012-05-18 13:05:31 -0700    2012-05-18 14:27:36 -0700   {"uid"=>1, "type"=>"user_task"}
                            k2       user_task             user_2            waiting    2012-05-18 13:35:33 -0700    2012-05-18 14:35:33 -0700   {"uid"=>2, "type"=>"user_task"}
                            k3     system_task                               waiting    2012-05-18 14:04:02 -0700    2012-05-22 15:04:02 -0700   {"task_id"=>32, "type"=>"system_task"}
3 entries.

force finishing a task

$ perfectqueue force_finish k2

running a worker

$ perfectqueue run -I. -Ilib -rconfig/boot.rb -rapps/workers/task_dispatch.rb TaskDispatch

Test

The specs use 'mysql2://root:@localhost/perfectqueue_test' as the connection string. Install a MySQL server on localhost, then create the test database:

$ mysql -h localhost -u root -e 'create database perfectqueue_test;'

Then run the specs:

$ bundle exec rake spec
