• Stars
    star
    110
  • Rank 310,732 (Top 7 %)
  • Language
    C#
  • License
    Other
  • Created over 11 years ago
  • Updated 7 months ago

Reviews

There are no reviews yet. Be the first to send feedback to the community and the maintainers!

Repository Details

A robust, thread-safe, and multi-process persistent queue library for dotnet

DiskQueue

A robust, thread-safe, and multi-process persistent queue.

Based very heavily on http://ayende.com/blog/3479/rhino-queues-storage-disk

Requirements and Environment

Works on dotnet standard 2.0 platforms

Requires access to filesystem storage.

The file system is used to hold locks, so any bug in your file system may cause issues with DiskQueue -- although it tries to work around them.

Thanks to

These kind folks have helped in the development of DiskQueue

Basic Usage

  • PersistentQueue.WaitFor(...) is the main entry point. This will attempt to gain an exclusive lock on the given storage location. On first use, a directory will be created with the required files inside it.
  • This queue object can be shared among threads. Each thread should call OpenSession() to get its own session object.
  • Both IPersistentQueues and IPersistentQueueSessions should be wrapped in using() clauses, or otherwise disposed of properly. Failure to do this will result in lock contention -- you will get errors that the queue is still in use.

There is also a generic-typed PersistentQueue<T>(...); which will handle the serialisation and deserialization of elements in the queue, as long at the type is decorated with [Serializable]. You can also inject your own ISerializationStrategy<T> into your PersistentQueueSession<T> if you wish to have more granular control over Serialization/Deserialization, or if you wish to use your own serializer (e.g Json.NET).

Use new PersistentQueue<T>(...) in place of new PersistentQueue(...) or PersistentQueue.WaitFor<T>(...) in place of PersistentQueue.WaitFor(...) in any of the examples below.

Note: BinaryFormatter was removed from the default serializer. See https://learn.microsoft.com/en-us/dotnet/standard/serialization/binaryformatter-security-guide.

Example

Queue on one thread, consume on another; retry some exceptions.

Note this is one queue being shared between two sessions. You should not open two queue instances for one storage location at once.

IPersistentQueue queue = new PersistentQueue("queue_a");
var t1 = new Thread(() =>
{
	while (HaveWork())
	{
		using (var session = queue.OpenSession())
		{
			session.Enqueue(NextWorkItem());
			session.Flush();
		}
	}
});
var t2 = new Thread(()=> {
	while (true) {
		using (var session = queue.OpenSession()) {
			var data = session.Dequeue();
			if (data == null) {Thread.Sleep(100); continue;}
			
			try {
				MaybeDoWork(data)
				session.Flush();
			} catch (RetryException) {
				continue;
			} catch {
				session.Flush();
			}
		}
	}
});

t1.Start();
t2.Start();

Example

Batch up a load of work and have another thread work through it.

IPersistentQueue queue = new PersistentQueue("batchQueue");
var worker = new Thread(()=> {
	using (var session = queue.OpenSession()) {
		byte[] data;
		while ((data = session.Dequeue()) != null) {
			MaybeDoWork(data)
			session.Flush();
		}
	}
});

using (var session = queue.OpenSession()) {
	foreach (var item in LoadsOfStuff()) {
		session.Enqueue(item);
	}
	session.Flush();
}

worker.IsBackground = true; // anything not complete when we close will be left on the queue for next time.
worker.Start();

Transactions

Each session is a transaction. Any Enqueues or Dequeues will be rolled back when the session is disposed unless you call session.Flush(). Data will only be visible between threads once it has been flushed. Each flush incurs a performance penalty. By default, each flush is persisted to disk before continuing. You can get more speed at a safety cost by setting queue.ParanoidFlushing = false;

Data loss and transaction truncation

By default, DiskQueue will silently discard transaction blocks that have been truncated; it will throw an InvalidOperationException when transaction block markers are overwritten (this happens if more than one process is using the queue by mistake. It can also happen with some kinds of disk corruption). If you construct your queue with throwOnConflict: false, all recoverable transaction errors will be silently truncated. This should only be used when uptime is more important than data consistency.

using (var queue = new PersistentQueue(path, Constants._32Megabytes, throwOnConflict: false)) {
    . . .
}

Global default settings

Each instance of a PersistentQueue has it's own settings for flush levels and corruption behaviour. You can set these individually after creating an instance, or globally with PersistentQueue.DefaultSettings. Default settings are applied to all queue instances in the same process created after the setting is changed.

For example, if performance is more important than crash safety:

PersistentQueue.DefaultSettings.ParanoidFlushing = false;
PersistentQueue.DefaultSettings.TrimTransactionLogOnDispose = false;

Or if up-time is more important than detecting corruption early (often the case for embedded systems):

PersistentQueue.DefaultSettings.AllowTruncatedEntries = true;
PersistentQueue.DefaultSettings.ParanoidFlushing = true;

Removing or resetting queues

Queues create a directory and set of files for storage. You can remove all files for a queue with the HardDelete method. If you give true as the reset parameter, the directory will be written again.

This WILL delete ANY AND ALL files inside the queue directory. You should not call this method in normal use. If you start a queue with the same path as an existing directory, this method will delete the entire directory, not just the queue files.

var subject = new PersistentQueue("queue_a");
subject.HardDelete(true); // wipe any existing data and start again

Multi-Process Usage

Each IPersistentQueue gives exclusive access to the storage until it is disposed. There is a static helper method PersistentQueue.WaitFor("path", TimeSpan...) which will wait to gain access until other processes release the lock or the timeout expires. If each process uses the lock for a short time and wait long enough, they can share a storage location.

E.g.

...
void AddToQueue(byte[] data) {
	Thread.Sleep(150);
	using (var queue = PersistentQueue.WaitFor(SharedStorage, TimeSpan.FromSeconds(30)))
	using (var session = queue.OpenSession()) {
		session.Enqueue(data);
		session.Flush();
	}
}

byte[] ReadQueue() {
	Thread.Sleep(150);
	using (var queue = PersistentQueue.WaitFor(SharedStorage, TimeSpan.FromSeconds(30)))
	using (var session = queue.OpenSession()) {
		var data = session.Dequeue();
		session.Flush();
		return data;
	}
}
...

Cross-process Locking

DiskQueue tries very hard to make sure the lock files are managed correctly. You can use this as an inter-process lock if required. Simply open a session to acquire the lock, and dispose of the session to release it.

If you need the transaction semantics of sessions across multiple processes, try a more robust solution like https://github.com/i-e-b/SevenDigital.Messaging

More Repositories

1

jQueryFileUpload.Net

[Out of Date] .Net handler for http://aquantum-demo.appspot.com/file-upload
JavaScript
68
star
2

MefExperiments

A few quick demo projects showing Dependency injection, run-time dynamic plugins, and hot-swap code using MEF.
C#
17
star
3

RunProcess

Replacement for System.Diagnostics.Process
C#
17
star
4

SevenDigital.Messaging

A distributed broadcast/subscribe event framework based on BearBones-Messaging
C#
12
star
5

SignalR-TypeSafeClient

A type safe client interface for SignalR in C#
C#
11
star
6

SdfCad

CAD program for 3D printing using a signed-distance-field as the model
C#
9
star
7

RawIsapi

Example of a raw ISAPI module communicating with a .Net assembly
C++
8
star
8

HLS---Smooth-Encoder

An ffmpeg based live and offline encoder capable of pushing to HLS and SmoothStreaming end points.
C#
8
star
9

node-tree-surgeon

Tools for editing tree structures using a relational model
JavaScript
5
star
10

piLibs

MIRROR of piLibs
C++
5
star
11

TransactionalFileManager

A copy of http://transactionalfilemgr.codeplex.com/ for my own purposes
C#
5
star
12

Waili

[Mirror] Wavelets with Integer Lifting
C
4
star
13

Shift-it

Collection of low-level HTTP and FTP file transfer tools
C#
4
star
14

windows-ssh-server

From https://launchpad.net/windows-ssh-server. I didn't write this.
C#
4
star
15

ImageTools

A few image algorithms in C# for my own research
C#
3
star
16

Elm-Jump-Man

A very simple SMB game in Elm
Elm
3
star
17

VirtualVpn

An IKEv2/IPSEC VPN gateway that presents an application as if it was on a private network
C#
3
star
18

HomelessShelter

Some scripts for building Mono environments through Vagant (for Windows & Debian)
Ruby
3
star
19

Huygens

Cut down Cassini server for internally hosting IIS sites
C#
3
star
20

LilyGo_Tsim_A7670X

T-SIM A7670 E development setup, tests, and notes
C++
3
star
21

Form8sn

Generate PDF based templates, and fill them in from data objects
C#
2
star
22

DynamicPropertyObject

A tool to drive WinForms PropertyGrid from dynamic data
C#
2
star
23

FluentBdd

[Working, On Hold] Framework for writing BDD specs in C# 3.5+ running under NUnit
C#
2
star
24

DBSS

[Working] A toy spread-sheet engine using SQL for backing
C#
2
star
25

GitBuildPlatform

[Working, Template] A flexible multi-solution build and dependency management platform for using Powershell
PowerShell
2
star
26

blueballs

The old sonic special stage, to play with Love2d shaders
Lua
2
star
27

LessStupidPath

Like .Net's 'Path' class, but with a sane API and semantics
C#
2
star
28

Dave

A mono-space programming font
2
star
29

CsharpVideoSynthesiser

Programmatically output .mp4 files from C# based on a range of algorithms
C#
2
star
30

ImageCompress

Very simple texture compression experiment
C#
1
star
31

Phantom2

[In Progress] Lexer-less LALR parser in C# with an odd semi-fluent interface.
C#
1
star
32

FFmpegControl

[In Progress] A simplified interface for driving encoding and decoding with libavcodec
C
1
star
33

FontReaderCs

A dead simple font reader and renderer, for experimentation
C#
1
star
34

Krypton

Copy of http://krypton.codeplex.com/ with cleaned up code to be integrated with Box2dx at some point...
C#
1
star
35

metaret-npm

A node-ready version of https://github.com/glathoud/js.metaret packaged for NPM
JavaScript
1
star
36

tinyQuickIO

A minimal implementation of https://github.com/SchwabenCode/QuickIO
C#
1
star
37

WindowsJedi

Experiments in making windows more usable
C#
1
star
38

PoshMSpec

[Working] Automated MSpec file watcher and test runner
PowerShell
1
star
39

SymbolSourceSane

A copy of SymbolSource.Community, but hopefully working.
C#
1
star
40

SnivellingGit

An experimental GUI for the Git SCM.
C#
1
star
41

T.g

A simple C# generator for HTML/XML
C#
1
star
42

String_Extensions

A big pile of string manipulation methods for C#
C#
1
star
43

DiffTools

[Working, In Progress] My own diff engine, plus Neil Fraser's diff-match-patch
C#
1
star
44

Microauth

A very small OAuth server for testing
C#
1
star
45

LiteProxy

Tiny object proxying and mocking tools for C#
C#
1
star
46

HaskellKatas

A few simple little things in Haskell
Haskell
1
star
47

DispatchSharp

A library to make multi-threaded code more testable
C#
1
star
48

TypescriptServiceExperiment

Trying to unpick typescript as-a-service
JavaScript
1
star
49

HammerCalc

A calculator for every kind of nail
Java
1
star
50

SaxonNode

Saxon-CE running under Node.js; Experimental
JavaScript
1
star
51

WixBacktraceExtension

.Net exe and website extension for the WiX installer builder, with examples.
C#
1
star
52

NanBoxing

A simple example of using IEEE 754 NaNs to tag and encode data
C#
1
star
53

KVK

Experiments in relational document datastores. Key-Value-Key.
C#
1
star