A simple, customisable distributed job/worker in Go

Tasqueue


Tasqueue is a simple, lightweight distributed job/worker implementation in Go.

Installation

go get -u github.com/kalbhor/tasqueue/v2

Concepts

  • tasqueue.Broker is a generic interface to enqueue and consume messages from a single queue. Currently supported brokers are redis and nats-jetstream. Note: the broker (or your own enqueue/consume implementation) must guarantee atomicity, because Tasqueue does not provide locking to ensure that each job is consumed only once.
  • tasqueue.Results is a generic interface to store the status and results of jobs. Currently supported result stores are redis and nats-jetstream.
  • tasqueue.Task is a pre-registered job handler. It stores the handler function that is called to process a job, along with any callbacks (set through options) that are executed as the job moves through different states.
  • tasqueue.Job represents a unit of work pushed to a queue for consumption. It holds:
    • []byte payload (encoded in any manner, if required)
    • task name used to identify the pre-registered task that will process the job.

Server

A tasqueue server is the main component that holds the broker and results implementations. It also acts as a hub to register tasks.

Server Options

Server options are used to configure the server. Broker and Results are mandatory, while the logger and the OpenTelemetry trace provider are optional. Refer to the in-memory example for an OpenTelemetry setup.

type ServerOpts struct {
	// Mandatory results & broker implementations.
	Broker        Broker
	Results       Results

	// Optional logger and telemetry provider.
	Logger        logf.Logger
	TraceProvider *trace.TracerProvider
}

Usage

package main

import (
	"context"
	"log"

	"github.com/kalbhor/tasqueue/v2"
	rb "github.com/kalbhor/tasqueue/v2/brokers/redis"
	rr "github.com/kalbhor/tasqueue/v2/results/redis"
	"github.com/zerodha/logf"
)

func main() {
	lo := logf.New(logf.Opts{})

	// Redis-backed broker and results store, both on the same local instance.
	broker := rb.New(rb.Options{
		Addrs:    []string{"127.0.0.1:6379"},
		Password: "",
		DB:       0,
	}, lo)
	results := rr.New(rr.Options{
		Addrs:    []string{"127.0.0.1:6379"},
		Password: "",
		DB:       0,
	}, lo)

	srv, err := tasqueue.NewServer(tasqueue.ServerOpts{
		Broker:  broker,
		Results: results,
		Logger:  lo,
	})
	if err != nil {
		log.Fatal(err)
	}

	// Register tasks here (see "Registering tasks"), then start the server.
	// Start is blocking.
	srv.Start(context.Background())
}

Task Options

Concurrency is the number of processors run for this task. Queue is the queue this task consumes from. Task options also contain callbacks that are executed on a state change of a job.

type TaskOpts struct {
	Concurrency  uint32
	Queue        string
	SuccessCB    func(JobCtx)
	ProcessingCB func(JobCtx)
	RetryingCB   func(JobCtx)
	FailedCB     func(JobCtx)
}

Registering tasks

A task can be registered by supplying a name, handler and options. Jobs can be processed using a task registered by a particular name. A handler is a function with the signature func([]byte, JobCtx) error. It is the responsibility of the handler to deal with the []byte payload in whatever manner (decode, if required).

package tasks

import (
	"encoding/json"

	"github.com/kalbhor/tasqueue/v2"
)

type SumPayload struct {
	Arg1 int `json:"arg1"`
	Arg2 int `json:"arg2"`
}

type SumResult struct {
	Result int `json:"result"`
}

// SumProcessor computes the sum of two integer arguments and saves it as the job result.
func SumProcessor(b []byte, m tasqueue.JobCtx) error {
	var pl SumPayload
	if err := json.Unmarshal(b, &pl); err != nil {
		return err
	}

	rs, err := json.Marshal(SumResult{Result: pl.Arg1 + pl.Arg2})
	if err != nil {
		return err
	}

	// Save stores the result so it can later be fetched with srv.GetResult.
	return m.Save(rs)
}

// Elsewhere, after creating srv:
srv.RegisterTask("add", tasks.SumProcessor, tasqueue.TaskOpts{Concurrency: 5})
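Conceptually, registration maps a task name to a handler, and a worker looks up the job's task name to decide which handler receives the payload. Below is a stdlib-only sketch of that lookup. `registry`, `handlerFunc` and `dispatch` are hypothetical names (tasqueue's internals may differ), and the simplified handler returns its result instead of calling JobCtx.Save.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// handlerFunc is a simplified, hypothetical handler: it takes the raw
// payload and returns the result to be saved. Real tasqueue handlers
// receive a JobCtx and save results via JobCtx.Save instead.
type handlerFunc func(payload []byte) ([]byte, error)

// registry maps task names to handlers.
type registry map[string]handlerFunc

// dispatch routes a payload to the handler registered under name.
func (r registry) dispatch(name string, payload []byte) ([]byte, error) {
	h, ok := r[name]
	if !ok {
		return nil, fmt.Errorf("no task registered as %q", name)
	}
	return h(payload)
}

// sumPayload has the same JSON shape as SumPayload above.
type sumPayload struct {
	Arg1 int `json:"arg1"`
	Arg2 int `json:"arg2"`
}

// sum decodes the payload and encodes the result, like SumProcessor.
func sum(b []byte) ([]byte, error) {
	var pl sumPayload
	if err := json.Unmarshal(b, &pl); err != nil {
		return nil, err
	}
	return json.Marshal(map[string]int{"result": pl.Arg1 + pl.Arg2})
}

func main() {
	r := registry{"add": sum}
	out, err := r.dispatch("add", []byte(`{"arg1":5,"arg2":4}`))
	fmt.Println(string(out), err) // {"result":9} <nil>
}
```

Because the handler only sees bytes, the payload encoding is entirely the caller's choice; JSON here is just one option.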

Start server

Start() starts the job consumer and processors. It is a blocking function: it listens for jobs on the queue and spawns processor goroutines.

srv.Start(ctx)

Job

A tasqueue job represents a unit of work pushed onto the queue, that requires processing using a registered Task. It holds a []byte payload, a task name (which will process the payload) and various options.

Job Options

// JobOpts holds the various options available to configure a job.
type JobOpts struct {
	// Optional ID passed by client. If empty, Tasqueue generates it.
	ID string

	Queue      string
	MaxRetries uint32
	Schedule   string
	Timeout    time.Duration
}

Creating a job

NewJob returns a job with the supplied payload. It accepts the name of the task, the payload and a JobOpts value.

b, _ := json.Marshal(tasks.SumPayload{Arg1: 5, Arg2: 4})
job, err := tasqueue.NewJob("add", b, tasqueue.JobOpts{})
if err != nil {
	log.Fatal(err)
}

Enqueuing a job

Once a job is created, it can be enqueued via the server for processing. Calling srv.Enqueue returns a job id which can be used to query the status of the job.

id, err := srv.Enqueue(ctx, job)
if err != nil {
	log.Fatal(err)
}

Getting a job message

To query the details of a job that was enqueued, we can use srv.GetJob. It returns a JobMessage which contains details related to a job.

jobMsg, err := srv.GetJob(ctx, id)
if err != nil {
	log.Fatal(err)
}

Fields available in a JobMessage (embeds Meta):

// Meta contains fields related to a job. These are updated when a task is consumed.
type Meta struct {
	ID          string
	OnSuccessID string
	Status      string
	Queue       string
	Schedule    string
	MaxRetry    uint32
	Retried     uint32
	PrevErr     string
	ProcessedAt time.Time

	// PrevJobResult contains any job result set by the previous job in a chain.
	// This will be nil if the previous job doesn't set the results on JobCtx.
	PrevJobResult []byte
}

JobCtx

JobCtx is passed to handler functions and callbacks. It can be used to view the job's meta information (JobCtx embeds Meta) and also to save arbitrary results for a job using func (c *JobCtx) Save(b []byte) error

Group

A tasqueue group holds multiple jobs and pushes them all onto the queue simultaneously. The group is considered successful only if all of its jobs finish successfully.

Creating a group

NewGroup returns a Group holding the jobs passed.

var group []tasqueue.Job

for i := 0; i < 3; i++ {
	b, _ := json.Marshal(tasks.SumPayload{Arg1: i, Arg2: 4})
	job, err := tasqueue.NewJob("add", b, tasqueue.JobOpts{})
	if err != nil {
		log.Fatal(err)
	}
	group = append(group, job)
}

grp, err := tasqueue.NewGroup(group, tasqueue.GroupOpts{})
if err != nil {
	log.Fatal(err)
}

Enqueuing a group

Once a group is created, it can be enqueued via the server for processing. Calling srv.EnqueueGroup returns a group id which can be used to query the status of the group.

groupID, err := srv.EnqueueGroup(ctx, grp)
if err != nil {
	log.Fatal(err)
}

Getting a group message

To query the details of a group that was enqueued, we can use srv.GetGroup. It returns a GroupMessage which contains details related to a group.

groupMsg, err := srv.GetGroup(ctx, groupID)
if err != nil {
	log.Fatal(err)
}

Fields available in a GroupMessage (embeds GroupMeta):

// GroupMeta contains fields related to a group job. These are updated when a task is consumed.
type GroupMeta struct {
	ID     string
	Status string
	// JobStatus is a map of individual job id -> status
	JobStatus map[string]string
}
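Because a group succeeds only if every job in it succeeds, the overall status can be derived from the JobStatus map. The sketch below shows one such reduction. `groupStatus` is a hypothetical function, the status strings are assumptions, and this is not necessarily how tasqueue computes GroupMeta.Status.

```go
package main

import "fmt"

// Status values assumed for this sketch.
const (
	statusProcessing = "processing"
	statusSuccessful = "successful"
	statusFailed     = "failed"
)

// groupStatus reduces per-job statuses to a group status: any failed job
// fails the group, all successful jobs make it successful, and anything
// else means the group is still in progress.
func groupStatus(jobStatus map[string]string) string {
	done := 0
	for _, st := range jobStatus {
		switch st {
		case statusFailed:
			return statusFailed
		case statusSuccessful:
			done++
		}
	}
	if done == len(jobStatus) {
		return statusSuccessful
	}
	return statusProcessing
}

func main() {
	fmt.Println(groupStatus(map[string]string{"a": "successful", "b": "processing"})) // processing
	fmt.Println(groupStatus(map[string]string{"a": "successful", "b": "successful"})) // successful
	fmt.Println(groupStatus(map[string]string{"a": "failed", "b": "successful"}))     // failed
}
```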

Chain

A tasqueue chain holds multiple jobs and pushes them onto the queue one after the other, each only after the previous job succeeds. The chain is considered successful only if the final job completes successfully.

Creating a chain

NewChain returns a chain holding the jobs passed, in the order given.

var chain []tasqueue.Job

for i := 0; i < 3; i++ {
	b, _ := json.Marshal(tasks.SumPayload{Arg1: i, Arg2: 4})
	job, err := tasqueue.NewJob("add", b, tasqueue.JobOpts{})
	if err != nil {
		log.Fatal(err)
	}
	chain = append(chain, job)
}

chn, err := tasqueue.NewChain(chain, tasqueue.ChainOpts{})
if err != nil {
	log.Fatal(err)
}

Enqueuing a chain

Once a chain is created, it can be enqueued via the server for processing. Calling srv.EnqueueChain returns a chain id which can be used to query the status of the chain.

chainID, err := srv.EnqueueChain(ctx, chn)
if err != nil {
	log.Fatal(err)
}

Getting results of previous job in a chain

A job in the chain can access the result of the previous job through JobCtx.Meta.PrevJobResult. It contains any result the previous job saved with JobCtx.Save(), and is nil if that job saved nothing.

Getting a chain message

To query the details of a chain that was enqueued, we can use srv.GetChain. It returns a ChainMessage which contains details related to a chain.

chainMsg, err := srv.GetChain(ctx, chainID)
if err != nil {
	log.Fatal(err)
}

Fields available in a ChainMessage (embeds ChainMeta):

// ChainMeta contains fields related to a chain job.
type ChainMeta struct {
	ID string
	// Status of the overall chain
	Status string
	// ID of the current job part of chain
	JobID string
	// List of IDs of completed jobs
	PrevJobs []string
}

Result

A result is arbitrary []byte data saved by a handler or callback via JobCtx.Save().

Get Result

b, err := srv.GetResult(ctx, jobID)
if err != nil {
	log.Fatal(err)
}

Delete Result

DeleteResult removes the job's saved data from the results store.

err := srv.DeleteResult(ctx, jobID)
if err != nil {
	log.Fatal(err)
}

Credits

  • @knadh for the logo & feature suggestions

License

BSD-2-Clause-FreeBSD
