• Stars
    star
    800
  • Rank 54,647 (Top 2 %)
  • Language
    C++
  • License
    MIT License
  • Created about 8 years ago
  • Updated 4 months ago

Reviews

There are no reviews yet. Be the first to send feedback to the community and the maintainers!

Repository Details

A bounded single-producer single-consumer wait-free and lock-free queue written in C++11

SPSCQueue.h

C/C++ CI License

A single producer single consumer wait-free and lock-free fixed size queue written in C++11. This implementation is faster than both boost::lockfree::spsc and folly::ProducerConsumerQueue.

Example

SPSCQueue<int> q(1);
auto t = std::thread([&] {
  while (!q.front());
  std::cout << *q.front() << std::endl;
  q.pop();
});
q.push(1);
t.join();

See src/SPSCQueueExample.cpp for the full example.

Usage

  • SPSCQueue<T>(size_t capacity);

    Create a SPSCqueue holding items of type T with capacity capacity. Capacity needs to be at least 1.

  • void emplace(Args &&... args);

    Enqueue an item using inplace construction. Blocks if queue is full.

  • bool try_emplace(Args &&... args);

    Try to enqueue an item using inplace construction. Returns true on success and false if queue is full.

  • void push(const T &v);

    Enqueue an item using copy construction. Blocks if queue is full.

  • template <typename P> void push(P &&v);

    Enqueue an item using move construction. Participates in overload resolution only if std::is_constructible<T, P&&>::value == true. Blocks if queue is full.

  • bool try_push(const T &v);

    Try to enqueue an item using copy construction. Returns true on success and false if queue is full.

  • template <typename P> bool try_push(P &&v);

    Try to enqueue an item using move construction. Returns true on success and false if queue is full. Participates in overload resolution only if std::is_constructible<T, P&&>::value == true.

  • T *front();

    Return pointer to front of queue. Returns nullptr if queue is empty.

  • void pop();

    Dequeue first item of queue. You must ensure that the queue is non-empty before calling pop. This means that front() must have returned a non-nullptr before each call to pop(). Requires std::is_nothrow_destructible<T>::value == true.

  • size_t size();

    Return the number of items available in the queue.

  • bool empty();

    Return true if queue is currently empty.

Only a single writer thread can perform enqueue operations and only a single reader thread can perform dequeue operations. Any other usage is invalid.

Huge page support

In addition to supporting custom allocation through the standard custom allocator interface this library also supports standard proposal P0401R3 Providing size feedback in the Allocator interface. This allows convenient use of huge pages without wasting any allocated space. Using size feedback is only supported when C++17 is enabled.

The library currently doesn't include a huge page allocator since the APIs for allocating huge pages are platform dependent and handling of huge page size and NUMA awareness is application specific.

Below is an example huge page allocator for Linux:

#include <sys/mman.h>

template <typename T> struct Allocator {
  using value_type = T;

  struct AllocationResult {
    T *ptr;
    size_t count;
  };

  size_t roundup(size_t n) { return (((n - 1) >> 21) + 1) << 21; }

  AllocationResult allocate_at_least(size_t n) {
    size_t count = roundup(sizeof(T) * n);
    auto p = static_cast<T *>(mmap(nullptr, count, PROT_READ | PROT_WRITE,
                                   MAP_PRIVATE | MAP_ANONYMOUS | MAP_HUGETLB,
                                   -1, 0));
    if (p == MAP_FAILED) {
      throw std::bad_alloc();
    }
    return {p, count / sizeof(T)};
  }

  void deallocate(T *p, size_t n) { munmap(p, roundup(sizeof(T) * n)); }
};

See src/SPSCQueueExampleHugepages.cpp for the full example on how to use huge pages on Linux.

Implementation

Memory layout

The underlying implementation is based on a ring buffer.

Care has been taken to make sure to avoid any issues with false sharing. The head and tail indices are aligned and padded to the false sharing range (cache line size). Additionally the slots buffer is padded with the false sharing range at the beginning and end, this prevents false sharing with any adjacent allocations.

This implementation has higher throughput than a typical concurrent ring buffer by locally caching the head and tail indices in the writer and reader respectively. The caching increases throughput by reducing the amount of cache coherency traffic.

To understand how that works first consider a read operation in absence of caching: the head index (read index) needs to be updated and thus that cache line is loaded into the L1 cache in exclusive state. The tail (write index) needs to be read in order to check that the queue is not empty and is thus loaded into the L1 cache in shared state. Since a queue write operation needs to read the head index it's likely that a write operation requires some cache coherency traffic to bring the head index cache line back into exclusive state. In the worst case there will be one cache line transition from shared to exclusive for every read and write operation.

Next consider a queue reader that caches the tail index: if the cached tail index indicates that the queue is empty, then load the tail index into the cached tail index. If the queue was non-empty multiple read operations up until the cached tail index can complete without stealing the writer's tail index cache line's exclusive state. Cache coherency traffic is therefore reduced. An analogous argument can be made for the queue write operation.

This implementation allows for arbitrary non-power of two capacities, instead allocating a extra queue slot to indicate full queue. If you don't want to waste storage for a extra queue slot you should use a different implementation.

References:

Testing

Testing lock-free algorithms is hard. I'm using two approaches to test the implementation:

  • A single threaded test that the functionality works as intended, including that the item constructor and destructor is invoked correctly.
  • A multi-threaded fuzz test verifies that all items are enqueued and dequeued correctly under heavy contention.

Benchmarks

Throughput benchmark measures throughput between 2 threads for a queue of int items.

Latency benchmark measures round trip time between 2 threads communicating using 2 queues of int items.

Benchmark results for a AMD Ryzen 9 3900X 12-Core Processor, the 2 threads are running on different cores on the same chiplet:

Queue Throughput (ops/ms) Latency RTT (ns)
SPSCQueue 362723 133
boost::lockfree::spsc 209877 222
folly::ProducerConsumerQueue 148818 147

Cited by

SPSCQueue have been cited by the following papers:

  • Peizhao Ou and Brian Demsky. 2018. Towards understanding the costs of avoiding out-of-thin-air results. Proc. ACM Program. Lang. 2, OOPSLA, Article 136 (October 2018), 29 pages. DOI: https://doi.org/10.1145/3276506

About

This project was created by Erik Rigtorp <[email protected]>.

More Repositories

1

awesome-modern-cpp

A collection of resources on modern C++
HTML
11,209
star
2

awesome-lockfree

A collection of resources on wait-free and lock-free programming
1,634
star
3

MPMCQueue

A bounded multi-producer multi-consumer concurrent queue written in C++11
C++
1,048
star
4

ipc-bench

Latency benchmarks of Unix IPC mechanisms
C
531
star
5

spartan

A collection of High-Frequency trading components
C++
250
star
6

udpreplay

Replay UDP packets from a pcap file
C++
242
star
7

HashMap

An open addressing linear probing hash table, tuned for delete heavy workloads
C++
183
star
8

Seqlock

An implementation of Seqlock in C++11
C++
167
star
9

nanomq

Ultra low latency messaging kernel
C++
151
star
10

c2clat

A tool to measure CPU core to core latency
C++
104
star
11

hiccups

Measures the system induced jitter ("hiccups") a CPU bound thread experiences
C++
85
star
12

efvicap

erfvicap is a packet capture tool for network adapters from Solarflare
C++
53
star
13

statkit

Statistics toolkit for JavaScript
JavaScript
51
star
14

Function

Heap allocation free version of C++11 std::function
C++
49
star
15

TokenBucket

Lock-free implementation of the token bucket algorithm in C++
C++
44
star
16

isatomic

Test if AVX vector loads and stores are atomic
C++
20
star
17

CharConv

Fast integer to string and string to integer conversion functions
C++
16
star
18

BinarySemaphore

Binary semaphore using futexes.
C++
8
star
19

RingMap

Hybrid data structure that acts like a ring buffer and map
C++
7
star
20

dotemacs

My emacs config
Emacs Lisp
6
star
21

unats

A simple single threaded NATS client for C++.
C++
4
star
22

nordpool

Data preprocessing / munging scripts for Nordpool power market data
HTML
3
star
23

openonload

git import of https://www.openonload.org/
Shell
3
star
24

go-pikchr

Pikchr wrapped for Go using WebAssembly. No cgo required.
Go
3
star
25

sfpl

Simple functional programming language
Python
2
star
26

spark-dht11

Spark app to read a DHT11 sensor and publish humidity and temperature as Spark variables
C++
2
star
27

gmbackup

Simple tool to backup a Gmail account
Go
2
star
28

t-amp

Tripath Class-T audio amplifier
1
star
29

anything-git

Anything sources for Git
Emacs Lisp
1
star
30

go-graphviz

Graphviz wrapped for Go using WebAssembly. No cgo required.
CMake
1
star
31

chipamp

A hand crafted power amplifier of the Gainclone type
1
star
32

sysjitter2

1
star
33

gccsense

Fork of http://cx4a.org/repo/gccsense.git
Emacs Lisp
1
star