  • Stars: 93
  • Rank: 359,906 (Top 8%)
  • Language: C++
  • License: MIT License
  • Created about 2 years ago
  • Updated about 2 years ago

Repository Details

Simple, reliable & efficient distributed task queue for C++17

Overview

cppq is a simple, reliable & efficient distributed task queue for C++17.

cppq is a C++ library for queueing tasks and processing them asynchronously with workers. It's backed by Redis and is designed to be scalable and easy to get started with.

High-level overview of how cppq works:

  • Client puts tasks on a queue
  • Server pulls tasks off queues and starts a thread for each task
  • Tasks are processed concurrently by multiple workers

Task queues are a mechanism for distributing work across multiple machines. A system can consist of multiple worker servers and brokers, which enables high availability and horizontal scaling.
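
The flow above can be illustrated with a minimal in-memory sketch. It is not how cppq is implemented: `InMemoryQueue` and `drainQueue` are hypothetical names, a mutex-guarded deque stands in for the Redis-backed queue, and the server drains the queue once instead of looping forever.

```cpp
#include <atomic>
#include <cstddef>
#include <deque>
#include <mutex>
#include <optional>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical in-memory stand-in for the Redis-backed pending queue.
class InMemoryQueue {
 public:
  // Client side: put a task payload on the queue.
  void push(std::string payload) {
    std::lock_guard<std::mutex> lock(mutex_);
    items_.push_back(std::move(payload));
  }

  // Server side: take the oldest payload, or nothing if the queue is empty.
  std::optional<std::string> pop() {
    std::lock_guard<std::mutex> lock(mutex_);
    if (items_.empty()) return std::nullopt;
    std::string front = std::move(items_.front());
    items_.pop_front();
    return front;
  }

 private:
  std::mutex mutex_;
  std::deque<std::string> items_;
};

// Drains the queue, starting one thread per task, and returns how many
// tasks were handled. A real server would keep polling instead of returning.
std::size_t drainQueue(InMemoryQueue& queue) {
  std::atomic<std::size_t> handled{0};
  std::vector<std::thread> workers;
  while (auto payload = queue.pop()) {
    workers.emplace_back([&handled, p = *payload] {
      (void)p;  // a real handler would act on the payload here
      ++handled;
    });
  }
  for (auto& worker : workers) worker.join();
  return handled.load();
}
```

Pushing three payloads and then calling `drainQueue` handles all three on separate threads and leaves the queue empty. Because the queue is the only shared state, producers and the server could live in different processes once the deque is replaced by a network-accessible store such as Redis.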

Features

  • Guaranteed at-least-once execution of a task
  • Retries of failed tasks
  • Automatic recovery of tasks in the event of a worker crash
  • Low latency to add a task since writes are fast in Redis
  • Queue priorities
  • Scheduling of tasks
  • Periodic tasks
  • Ability to pause a queue to stop processing its tasks
  • Web UI to inspect and control queues and tasks
  • CLI to inspect and control queues and tasks
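
To make the retry feature concrete, here is a small sketch of a retry loop. It rests on assumptions: `runWithRetries` and `RunOutcome` are hypothetical names, the retry limit is read as "one initial attempt plus up to maxRetry further attempts", and cppq's own bookkeeping (which is kept in Redis) may count attempts differently.

```cpp
#include <exception>
#include <functional>

// Outcome of running one task under the retry policy sketched here.
struct RunOutcome {
  bool succeeded;  // true if some attempt returned without throwing
  int attempts;    // total number of attempts made
};

// Runs `handler` once, then retries up to `maxRetry` more times if it throws.
// Hypothetical helper, not part of cppq's API.
RunOutcome runWithRetries(const std::function<void()>& handler, int maxRetry) {
  int attempts = 0;
  while (attempts <= maxRetry) {
    ++attempts;
    try {
      handler();
      return {true, attempts};
    } catch (const std::exception&) {
      // Swallow the failure and fall through to the next attempt.
    }
  }
  return {false, attempts};
}
```

A handler that throws on its first two calls and then succeeds finishes with `attempts == 3`. Since a task may run more than once under this policy, handlers should be idempotent, which is the usual consequence of at-least-once execution.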

Quickstart

cppq is a header-only library with 2 dependencies: libuuid and hiredis.

Just include the header: #include "cppq.hpp" and add these flags to your build: -luuid -lhiredis.

libuuid and hiredis can be installed using your distro's package manager.

For Arch Linux that'd be: sudo pacman -S hiredis util-linux-libs

Example

#include "cppq.hpp"

#include <chrono>
#include <iostream>
#include <string>

#include <nlohmann/json.hpp>

// Specify task type name
const std::string TypeEmailDelivery = "email:deliver";

// Define a payload type for your task
struct EmailDeliveryPayload {
  int UserID;
  std::string TemplateID;
};

// Provide conversion to JSON (optional, you can use any kind of payload)
void to_json(nlohmann::json& j, const EmailDeliveryPayload& p) {
  j = nlohmann::json{{"UserID", p.UserID}, {"TemplateID", p.TemplateID}};
}

// Helper function to create a new task with the given payload
cppq::Task NewEmailDeliveryTask(EmailDeliveryPayload payload) {
  nlohmann::json j = payload;
  // "10" is maxRetry -- the number of times the task will be retried on exception
  return cppq::Task{TypeEmailDelivery, j.dump(), 10};
}

// The actual task code
void HandleEmailDeliveryTask(cppq::Task& task) {
  // Fetch the parameters
  nlohmann::json parsedPayload = nlohmann::json::parse(task.payload);
  int userID = parsedPayload["UserID"];
  std::string templateID = parsedPayload["TemplateID"];

  // Send the email...

  // Return a result
  nlohmann::json r;
  r["Sent"] = true;
  task.result = r.dump();
  return;
}

int main(int argc, char *argv[]) {
  // Register task types and handlers
  cppq::registerHandler(TypeEmailDelivery, &HandleEmailDeliveryTask);

  // Create a Redis connection for enqueuing, you can reuse this for subsequent enqueues
  redisOptions redisOpts = {0};
  REDIS_OPTIONS_SET_TCP(&redisOpts, "127.0.0.1", 6379);
  redisContext *c = redisConnectWithOptions(&redisOpts);
  if (c == NULL || c->err) {
    std::cerr << "Failed to connect to Redis" << std::endl;
    return 1;
  }

  // Create tasks (positional aggregate initialization: UserID, TemplateID;
  // designated initializers are standard only from C++20)
  cppq::Task task = NewEmailDeliveryTask(EmailDeliveryPayload{666, "AH"});
  cppq::Task task2 = NewEmailDeliveryTask(EmailDeliveryPayload{606, "BH"});
  cppq::Task task3 = NewEmailDeliveryTask(EmailDeliveryPayload{666, "CH"});

  // Enqueue a task on default queue
  cppq::enqueue(c, task, "default");
  // Enqueue a task on high priority queue
  cppq::enqueue(c, task2, "high");
  // Enqueue a task on default queue to be run at exactly 1 minute from now
  cppq::enqueue(
    c,
    task3,
    "default",
    cppq::scheduleOptions(std::chrono::system_clock::now() + std::chrono::minutes(1))
  );

  // Pause queue to stop processing tasks from it
  cppq::pause(c, "default");
  // Unpause queue to continue processing tasks from it
  cppq::unpause(c, "default");

  // This call will loop forever checking the pending queue and processing tasks in the thread pool.
  // Second argument defines queues and their priorities.
  // Third argument is time in seconds that task can be alive in active queue
  // before being pushed back to pending queue (i.e. when worker dies in middle of execution).
  cppq::runServer(redisOpts, {{"low", 5}, {"default", 10}, {"high", 20}}, 1000);
}
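
The queue-to-priority map passed to runServer could be interpreted as in the sketch below. This is a guess at the semantics, not cppq's actual scheduling: `pollingOrder` is a hypothetical helper, and it assumes that a larger number means the queue is checked earlier.

```cpp
#include <algorithm>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Returns queue names ordered from highest to lowest priority value.
// Assumption: a larger number means the queue is polled earlier.
std::vector<std::string> pollingOrder(const std::map<std::string, int>& priorities) {
  std::vector<std::pair<std::string, int>> entries(priorities.begin(), priorities.end());
  // stable_sort keeps the map's alphabetical order among equal priorities.
  std::stable_sort(entries.begin(), entries.end(),
                   [](const auto& a, const auto& b) { return a.second > b.second; });
  std::vector<std::string> order;
  order.reserve(entries.size());
  for (const auto& entry : entries) order.push_back(entry.first);
  return order;
}
```

Under this assumption, the map from the example above, {{"low", 5}, {"default", 10}, {"high", 20}}, yields the order high, default, low.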

Web UI

On Linux, the web UI can be started by running: cd web && ./start.sh

The web UI is made with React/TypeScript and Flask/Python. It is still a work in progress.

CLI

The CLI can be run with: cd cli && pip3 install -r requirements && python3 main.py

The CLI is made with Python. It is still a work in progress.

usage: main.py [-h] [--redis_uri REDIS_URI] [--queues] [--stats QUEUE] [--list QUEUE STATE] [--task QUEUE UUID] [--pause QUEUE] [--unpause QUEUE]

cppq CLI

options:
  -h, --help            show this help message and exit
  --redis_uri REDIS_URI
  --queues              print queues, priorities, and pause status
  --stats QUEUE         print queue statistics
  --list QUEUE STATE    list task UUIDs in queue
  --task QUEUE UUID     get task details
  --pause QUEUE         pause a queue
  --unpause QUEUE       unpause a queue

License

cppq is MIT-licensed.

Thread pooling functionality is retrofitted from https://github.com/bshoshany/thread-pool
