|
|
@@ -0,0 +1,100 @@ |
|
|
|
#include "MultiPseudoStack.h" |
|
|
|
#include <cassert> |
|
|
|
#include <cstdio> |
|
|
|
|
|
|
|
#define INVALID ((int32_t)0xffffffff) |
|
|
|
#define MAX ((int32_t)0x7fffffff) |
|
|
|
QueueListItem::QueueListItem(void) |
|
|
|
:memory(0), size(0) |
|
|
|
{ |
|
|
|
} |
|
|
|
|
|
|
|
LockFreeQueue::LockFreeQueue(qli_t *data_, int n) |
|
|
|
:data(data_), elms(n), next_r(0), next_w(0), avail(0) |
|
|
|
{ |
|
|
|
tag = new std::atomic<uint32_t>[n]; |
|
|
|
for(int i=0; i<n; ++i) |
|
|
|
tag[i] = INVALID; |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
qli_t *LockFreeQueue::read(void) { |
|
|
|
retry: |
|
|
|
int8_t free_elms = avail.load(); |
|
|
|
if(free_elms <= 0) |
|
|
|
return 0; |
|
|
|
|
|
|
|
int32_t next_tag = next_r.load(); |
|
|
|
int32_t next_next_tag = (next_tag+1)&MAX; |
|
|
|
|
|
|
|
assert(next_tag != INVALID); |
|
|
|
|
|
|
|
for(int i=0; i<elms; ++i) { |
|
|
|
uint32_t elm_tag = tag[i].load(); |
|
|
|
|
|
|
|
//attempt to remove tagged element |
|
|
|
//if and only if it's next |
|
|
|
if(((uint32_t)next_tag) == elm_tag) { |
|
|
|
if(!tag[i].compare_exchange_strong(elm_tag, INVALID)) |
|
|
|
goto retry; |
|
|
|
|
|
|
|
//Ok, now there is no element that can be removed from the list |
|
|
|
//Effectively there's mutual exclusion over other readers here |
|
|
|
|
|
|
|
//Set the next element |
|
|
|
int sane_read = next_r.compare_exchange_strong(next_tag, next_next_tag); |
|
|
|
assert(sane_read && "No double read on a single tag"); |
|
|
|
|
|
|
|
//Decrement available elements |
|
|
|
int32_t free_elms_next = avail.load(); |
|
|
|
while(!avail.compare_exchange_strong(free_elms_next, free_elms_next-1)); |
|
|
|
|
|
|
|
//printf("r%d ", free_elms_next-1); |
|
|
|
|
|
|
|
return &data[i]; |
|
|
|
} |
|
|
|
} |
|
|
|
goto retry; |
|
|
|
} |
|
|
|
|
|
|
|
//Insert Node Q |
|
|
|
void LockFreeQueue::write(qli_t *Q) { |
|
|
|
retry: |
|
|
|
if(!Q) |
|
|
|
return; |
|
|
|
int32_t write_tag = next_w.load(); |
|
|
|
int32_t next_write_tag = (write_tag+1)&MAX; |
|
|
|
if(!next_w.compare_exchange_strong(write_tag, next_write_tag)) |
|
|
|
goto retry; |
|
|
|
|
|
|
|
uint32_t invalid_tag = INVALID; |
|
|
|
|
|
|
|
//Update tag |
|
|
|
int sane_write = tag[Q-data].compare_exchange_strong(invalid_tag, write_tag); |
|
|
|
assert(sane_write); |
|
|
|
|
|
|
|
//Increment available elements |
|
|
|
int32_t free_elms = avail.load(); |
|
|
|
while(!avail.compare_exchange_strong(free_elms, free_elms+1)) |
|
|
|
assert(free_elms <= 32); |
|
|
|
//printf("w%d ", free_elms+1); |
|
|
|
} |
|
|
|
|
|
|
|
MultiQueue::MultiQueue(void) |
|
|
|
:pool(new qli_t[32]), m_free(pool, 32), m_msgs(pool, 32) |
|
|
|
{ |
|
|
|
//32 instances of 2kBi memory chunks |
|
|
|
for(int i=0; i<32; ++i) { |
|
|
|
qli_t &ptr = pool[i]; |
|
|
|
ptr.size = 2048; |
|
|
|
ptr.memory = new char[2048]; |
|
|
|
free(&ptr); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
MultiQueue::~MultiQueue(void) |
|
|
|
{ |
|
|
|
for(int i=0; i<32; ++i) |
|
|
|
delete [] pool[i].memory; |
|
|
|
delete [] pool; |
|
|
|
} |