|  | /*
  ZynAddSubFX - a software synthesizer
  MultiPseudoStack.cpp - Multiple-Writer Lock Free Datastructure
  Copyright (C) 2016 Mark McCurry
  This program is free software; you can redistribute it and/or
  modify it under the terms of the GNU General Public License
  as published by the Free Software Foundation; either version 2
  of the License, or (at your option) any later version.
*/
#include "MultiPseudoStack.h"
#include <cassert>
#include <cstdio>
#define INVALID ((int32_t)0xffffffff)
#define MAX     ((int32_t)0x7fffffff)
namespace zyncarla {
QueueListItem::QueueListItem(void)
    :memory(0), size(0)
{
}
LockFreeQueue::LockFreeQueue(qli_t *data_, int n)
    :data(data_), elms(n), next_r(0), next_w(0), avail(0)
{
    tag  = new std::atomic<uint32_t>[n];
    for(int i=0; i<n; ++i)
        tag[i] = INVALID;
}
LockFreeQueue::~LockFreeQueue(void)
{
    delete [] tag;
}
qli_t *LockFreeQueue::read(void) {
retry:
    int8_t free_elms = avail.load();
    if(free_elms <= 0)
        return 0;
    int32_t next_tag      = next_r.load();
    int32_t next_next_tag = (next_tag+1)&MAX;
    assert(next_tag != INVALID);
    for(int i=0; i<elms; ++i) {
        uint32_t elm_tag = tag[i].load();
        //attempt to remove tagged element
        //if and only if it's next
        if(((uint32_t)next_tag) == elm_tag) {
            if(!tag[i].compare_exchange_strong(elm_tag, INVALID))
                goto retry;
            //Ok, now there is no element that can be removed from the list
            //Effectively there's mutual exclusion over other readers here
            //Set the next element
            int sane_read = next_r.compare_exchange_strong(next_tag, next_next_tag);
            assert(sane_read && "No double read on a single tag");
            //Decrement available elements
            int32_t free_elms_next = avail.load();
            while(!avail.compare_exchange_strong(free_elms_next, free_elms_next-1));
            //printf("r%d ", free_elms_next-1);
            return &data[i];
        }
    }
    goto retry;
}
//Insert Node Q
void LockFreeQueue::write(qli_t *Q) {
retry:
    if(!Q)
        return;
    int32_t write_tag = next_w.load();
    int32_t next_write_tag = (write_tag+1)&MAX;
    if(!next_w.compare_exchange_strong(write_tag, next_write_tag))
        goto retry;
    uint32_t invalid_tag = INVALID;
    //Update tag
    int sane_write = tag[Q-data].compare_exchange_strong(invalid_tag, write_tag);
    assert(sane_write);
    //Increment available elements
    int32_t free_elms = avail.load();
    while(!avail.compare_exchange_strong(free_elms, free_elms+1))
        assert(free_elms <= 32);
    //printf("w%d ", free_elms+1);
}
MultiQueue::MultiQueue(void)
    :pool(new qli_t[32]), m_free(pool, 32), m_msgs(pool, 32)
{
    //32 instances of 2kBi memory chunks
    for(int i=0; i<32; ++i) {
        qli_t &ptr  = pool[i];
        ptr.size   = 2048;
        ptr.memory = new char[2048];
        free(&ptr);
    }
}
MultiQueue::~MultiQueue(void)
{
    for(int i=0; i<32; ++i)
        delete [] pool[i].memory;
    delete [] pool;
}
}
 |