Browse Source

Improve thread safety of ring buffers.

tags/v2.0.0
Andrew Belt 4 years ago
parent
commit
83b7c68cae
1 changed files with 52 additions and 34 deletions
  1. +52
    -34
      include/dsp/ringbuffer.hpp

+ 52
- 34
include/dsp/ringbuffer.hpp View File

@@ -1,4 +1,6 @@
#pragma once #pragma once
#include <atomic>

#include <dsp/common.hpp> #include <dsp/common.hpp>




@@ -6,25 +8,30 @@ namespace rack {
namespace dsp { namespace dsp {




/** A simple cyclic buffer.
S must be a power of 2.
Thread-safe for single producers and consumers.
/** Lock-free queue with fixed size and no allocations.
If S is not a power of 2, performance might be reduced, and the index could overflow in a thousand years, but it should usually be fine for your purposes.

Supports only a single producer and consumer.
To my knowledge, nobody has invented a 100% correct multiple producer/consumer lock-free ring buffer for x86_64.
*/ */
template <typename T, size_t S> template <typename T, size_t S>
struct RingBuffer { struct RingBuffer {
std::atomic<size_t> start{0};
std::atomic<size_t> end{0};
T data[S]; T data[S];
size_t start = 0;
size_t end = 0;


size_t mask(size_t i) const {
return i & (S - 1);
}
/** Adds an element to the end of the buffer.
*/
void push(T t) { void push(T t) {
size_t i = mask(end++);
size_t i = end % S;
data[i] = t; data[i] = t;
end++;
} }
/** Copies an array to the end of the buffer.
`n` must be at most S.
*/
void pushBuffer(const T* t, int n) { void pushBuffer(const T* t, int n) {
size_t i = mask(end);
size_t i = end % S;
size_t e1 = i + n; size_t e1 = i + n;
size_t e2 = (e1 < S) ? e1 : S; size_t e2 = (e1 < S) ? e1 : S;
std::memcpy(&data[i], t, sizeof(T) * (e2 - i)); std::memcpy(&data[i], t, sizeof(T) * (e2 - i));
@@ -33,11 +40,19 @@ struct RingBuffer {
} }
end += n; end += n;
} }
/** Removes and returns an element from the start of the buffer.
*/
T shift() { T shift() {
return data[mask(start++)];
size_t i = start % S;
T t = data[i];
start++;
return t;
} }
/** Removes and copies an array from the start of the buffer.
`n` must be at most S.
*/
void shiftBuffer(T* t, size_t n) { void shiftBuffer(T* t, size_t n) {
size_t i = mask(start);
size_t i = start % S;
size_t s1 = i + n; size_t s1 = i + n;
size_t s2 = (s1 < S) ? s1 : S; size_t s2 = (s1 < S) ? s1 : S;
std::memcpy(t, &data[i], sizeof(T) * (s2 - i)); std::memcpy(t, &data[i], sizeof(T) * (s2 - i));
@@ -47,13 +62,13 @@ struct RingBuffer {
start += n; start += n;
} }
void clear() { void clear() {
start = end;
start = end.load();
} }
bool empty() const { bool empty() const {
return start == end;
return start >= end;
} }
bool full() const { bool full() const {
return end - start == S;
return end - start >= S;
} }
size_t size() const { size_t size() const {
return end - start; return end - start;
@@ -64,34 +79,34 @@ struct RingBuffer {
}; };


/** A cyclic buffer which maintains a valid linear array of size S by keeping a copy of the buffer in adjacent memory. /** A cyclic buffer which maintains a valid linear array of size S by keeping a copy of the buffer in adjacent memory.
S must be a power of 2.
Thread-safe for single producers and consumers?
This is not thread-safe.
*/ */
template <typename T, size_t S> template <typename T, size_t S>
struct DoubleRingBuffer { struct DoubleRingBuffer {
T data[S * 2];
size_t start = 0;
size_t end = 0;
std::atomic<size_t> start{0};
std::atomic<size_t> end{0};
T data[2 * S];


size_t mask(size_t i) const {
return i & (S - 1);
}
void push(T t) { void push(T t) {
size_t i = mask(end++);
size_t i = end % S;
data[i] = t; data[i] = t;
data[i + S] = t; data[i + S] = t;
end++;
} }
T shift() { T shift() {
return data[mask(start++)];
size_t i = start % S;
T t = data[i];
start++;
return t;
} }
void clear() { void clear() {
start = end;
start = end.load();
} }
bool empty() const { bool empty() const {
return start == end;
return start >= end;
} }
bool full() const { bool full() const {
return end - start == S;
return end - start >= S;
} }
size_t size() const { size_t size() const {
return end - start; return end - start;
@@ -104,14 +119,15 @@ struct DoubleRingBuffer {
Pointer is invalidated when any other method is called. Pointer is invalidated when any other method is called.
*/ */
T* endData() { T* endData() {
return &data[mask(end)];
size_t i = end % S;
return &data[i];
} }
void endIncr(size_t n) { void endIncr(size_t n) {
size_t e = mask(end);
size_t e1 = e + n;
size_t i = end % S;
size_t e1 = i + n;
size_t e2 = (e1 < S) ? e1 : S; size_t e2 = (e1 < S) ? e1 : S;
// Copy data forward // Copy data forward
std::memcpy(&data[S + e], &data[e], sizeof(T) * (e2 - e));
std::memcpy(&data[S + i], &data[i], sizeof(T) * (e2 - i));


if (e1 > S) { if (e1 > S) {
// Copy data backward from the doubled block to the main block // Copy data backward from the doubled block to the main block
@@ -123,7 +139,8 @@ struct DoubleRingBuffer {
If any data is consumed, call startIncr afterwards. If any data is consumed, call startIncr afterwards.
*/ */
const T* startData() const { const T* startData() const {
return &data[mask(start)];
size_t i = start % S;
return &data[i];
} }
void startIncr(size_t n) { void startIncr(size_t n) {
start += n; start += n;
@@ -131,6 +148,7 @@ struct DoubleRingBuffer {
}; };


/** A cyclic buffer which maintains a valid linear array of size S by sliding along a larger block of size N. /** A cyclic buffer which maintains a valid linear array of size S by sliding along a larger block of size N.
This is not thread-safe.
The linear array of S elements are moved back to the start of the block once it outgrows past the end. The linear array of S elements are moved back to the start of the block once it outgrows past the end.
This happens every N - S pushes, so the push() time is O(1 + S / (N - S)). This happens every N - S pushes, so the push() time is O(1 + S / (N - S)).
For example, a float buffer of size 64 in a block of size 1024 is nearly as efficient as RingBuffer. For example, a float buffer of size 64 in a block of size 1024 is nearly as efficient as RingBuffer.
@@ -138,9 +156,9 @@ Not thread-safe.
*/ */
template <typename T, size_t S, size_t N> template <typename T, size_t S, size_t N>
struct AppleRingBuffer { struct AppleRingBuffer {
T data[N];
size_t start = 0; size_t start = 0;
size_t end = 0; size_t end = 0;
T data[N];


void returnBuffer() { void returnBuffer() {
// move end block to beginning // move end block to beginning


Loading…
Cancel
Save