a disk-backed queue for clojure

This library implements a disk-backed task queue, allowing for queues that can survive processes dying, and whose size is bounded by available disk rather than memory. It is a small, purely-Clojure implementation focused entirely on the in-process use case, meaning that it is both simpler and more easily embedded than network-aware queue implementations such as Kafka and ActiveMQ.

usage

[factual/durable-queue "0.1.5"]

To interact with queues, first create a queues object by specifying a directory in the filesystem and an options map:

> (require '[durable-queue :refer :all])
nil
> (def q (queues "/tmp" {}))
#'q

This allows us to put! and take! tasks from named queues. take! is a blocking read, and will only return once a task is available or, if a timeout is defined (in milliseconds), once the timeout elapses:

> (take! q :foo 10 :timed-out!)
:timed-out!
> (put! q :foo "a task")
true
> (take! q :foo)
< :in-progress | "a task" >
> (deref *1)
"a task"

Notice that the task has a value describing its progress, and a value describing the task itself. We can get the task descriptor by dereferencing the returned task. Note that since the task is persisted to disk and anything on disk may be corrupted, this involves a checksum which may fail and throw an IOException. Any software which wants to be robust to all failure modes should always dereference within a try/catch clause.
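A minimal sketch of the defensive dereference described above. `safe-deref` is a hypothetical helper, not part of durable-queue; it only assumes that a task can be dereferenced and that a failed checksum surfaces as an IOException, as stated above. The queue calls are kept inside a `comment` form so the helper can be loaded without a queue on disk.

```clojure
(import 'java.io.IOException)

(defn safe-deref
  "Dereferences `task`, returning `fallback` if the read throws an
  IOException (for example, a failed checksum). Hypothetical helper,
  not part of durable-queue."
  [task fallback]
  (try
    @task
    (catch IOException _
      fallback)))

(comment
  ;; REPL usage against a real queue, using only the calls shown above.
  (require '[durable-queue :refer [queues put! take!]])
  (def q (queues "/tmp" {}))
  (put! q :foo "a task")
  (safe-deref (take! q :foo) ::corrupt))
```

Returning a sentinel such as `::corrupt` lets the caller decide what to do with an unreadable task instead of letting the exception unwind a worker thread.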

Calling take! removes the task from the queue, but taking a task doesn't mean we've completed the action associated with it. To make sure the task isn't retried on restart, we must mark it as finished with complete!.

> (put! q :foo "another task")
true
> (take! q :foo)
< :in-progress | "another task" >
> (complete! *1)
true

If our task fails and we want to re-enqueue it to be tried again, we can instead call (retry! task). Tasks which are marked for retry are added to the end of the current queue.
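The complete-or-retry decision can be wrapped in one place, as in the sketch below. `settle!` is a hypothetical helper, not part of durable-queue. It receives `complete!` and `retry!` as arguments purely so it can be tried out without a queue on disk; in real use you would pass the library's own functions, as in the `comment` form.

```clojure
(defn settle!
  "Runs `handler` on the task's payload, then calls `complete!` on success
  or `retry!` if the handler throws. Returns :completed or :retried.
  Hypothetical helper, not part of durable-queue."
  [{:keys [complete! retry!]} handler task]
  ;; Dereference outside the try block: an IOException from a corrupted
  ;; task propagates to the caller rather than triggering a retry.
  (let [payload @task]
    (try
      (handler payload)
      (complete! task)
      :completed
      (catch Exception _
        (retry! task)
        :retried))))

(comment
  ;; REPL usage against a real queue.
  (require '[durable-queue :as dq])
  (def q (dq/queues "/tmp" {}))
  (dq/put! q :foo "another task")
  (settle! {:complete! dq/complete! :retry! dq/retry!}
           println
           (dq/take! q :foo)))
```

Because a retried task goes to the end of the queue, a handler that always fails will keep cycling; a production worker would likely want some cap on attempts, which this sketch does not implement.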

To get a description of the current state of the queue, we can use stats, which returns a map of queue names onto various counts:

> (stats q)
{:enqueued 2,
 :retried 0,
 :completed 1,
 :in-progress 1,
 :num-slabs 1,
 :num-active-slabs 1}

| field | description |
|-------|-------------|
| :enqueued | the number of tasks which have been enqueued via put!, including any pre-existing tasks on-disk when the queues were initialized |
| :retried | the number of tasks which have been retried via retry! |
| :completed | the number of tasks which have been completed via complete! |
| :in-progress | the number of tasks which have been consumed via take!, but are not yet complete |
| :num-slabs | the number of underlying files which are being used to store tasks |
| :num-active-slabs | the number of underlying files which are currently open and mapped into memory |

configuring the queues

The queues function accepts a number of options, which affect performance and correctness.

By default, it is assumed all tasks are idempotent. This is necessary, since the process can die at any time and leave an in-progress task in an undefined state. If your tasks are not idempotent, a :complete? predicate can be defined which, on instantiation of the queues object, will scan through all pre-existing task descriptors and remove those for which the predicate returns true.
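As a rough illustration, a :complete? predicate might consult an external record of finished work. Everything in this sketch is an assumption: the `:id` key, the `processed-ids` atom, and the idea that the predicate is called with the task descriptor (the value originally passed to put!). In practice the record would need to live somewhere that survives a restart, such as a database.

```clojure
(def processed-ids
  "Hypothetical record of finished work, keyed by task id."
  (atom #{}))

(defn already-processed?
  "Returns true if the descriptor's :id has been recorded as finished.
  Assumes descriptors are maps carrying an :id key."
  [descriptor]
  (contains? @processed-ids (:id descriptor)))

(comment
  ;; Pre-existing tasks for which the predicate returns true are removed
  ;; when the queues object is created.
  (require '[durable-queue :refer [queues]])
  (def q (queues "/tmp" {:complete? already-processed?})))
```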

A complete list of options is as follows:

| name | description |
|------|-------------|
| :complete? | A predicate for identifying already completed tasks. Defaults to always returning false. |
| :max-queue-size | The maximum number of elements that can be in the queue before put! blocks. Defaults to Integer/MAX_VALUE. |
| :slab-size | The size, in bytes, of the backing files for the queue. A serialized task cannot be larger than this. Defaults to 64 MB. |
| :fsync-put? | Whether an fsync should be performed for each put!. Defaults to true. |
| :fsync-take? | Whether an fsync should be performed for each take!. Defaults to false. |
| :fsync-threshold | The maximum number of writes (puts, takes, retries, completes) that can be performed before an fsync is performed. |
| :fsync-interval | The maximum amount of time, in milliseconds, that can elapse before an fsync is performed. |

Disabling :fsync-put? risks losing tasks if a process dies. Disabling :fsync-take? increases the chance of a task being re-run when a process dies. Disabling both will increase throughput of the queue by at least an order of magnitude: the default configuration handles ~1.5k tasks/sec on rotating disks and ~6k tasks/sec on SSD, while with fsync completely disabled it handles ~100k tasks/sec independent of hardware.

Writes can be batched using :fsync-threshold and/or :fsync-interval, or by explicitly calling (durable-queue/fsync q). Setting :fsync-threshold to 10 allows for ~25k tasks/sec on SSD, and still enforces a small upper bound on how much data can be lost when the process dies. An exception will be thrown if both per-task and batch sync options are set.
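A batched configuration might look like the options map below. Since mixing per-task and batch sync options throws an exception and :fsync-put? defaults to true, this sketch turns both per-task flags off explicitly. The name `batched-options` and the threshold of 10 are only illustrative.

```clojure
(def batched-options
  "Hypothetical options map: fsync once per 10 writes instead of per task."
  {:fsync-put?      false   ; per-task sync off, so batching is allowed
   :fsync-take?     false
   :fsync-threshold 10})

(comment
  (require '[durable-queue :as dq])
  (def q (dq/queues "/tmp" batched-options))
  ;; An explicit sync is still available at any point.
  (dq/fsync q))
```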

license

Copyright © 2015 Factual Inc

Distributed under the Eclipse Public License 1.0
