This is just a quick port from Relacy to C++. The collector has the ability to allow a lock-free stack, queues, etc. to be used without any DWCAS on a pop operation. Also, it takes care of memory management issues with the nodes...
Well here is my crude test code. Keep in mind that the g_debug_alloc/free globals are only there for debugging purposes at this early stage. They can be removed. This is akin to a poor man's RCU, in a sense... Well, can you compile and run it when you get some really free time to burn?
Thanks everybody. :^)
_________________________________________
#include <cassert>
#include <iostream>
#include <atomic>
#include <thread>
#define CT_READERS 10
#define CT_WRITERS 10
#define CT_THREADS (CT_WRITERS + CT_READERS)
#define CT_ITERS 10000000
#define CT_COLLECT 10240
// These are for debug and some statistics only...
// Global tallies of node constructions/destructions; compared in main()
// as a leak / double-free sanity check. Debug only — removable.
static std::atomic<unsigned long> g_debug_alloc(0); // nodes constructed
static std::atomic<unsigned long> g_debug_free(0); // nodes destroyed
// Chris M. Thomassons single target proxy collector:
//
// https://pastebin.com/raw/f71480694
namespace ct_proxy
{
// Intrusive list node. Doubles as the payload node of the test stack:
// `m_next` links it into the live stack, `m_defer_next` links it into a
// collector's deferred-destruction list once it has been popped.
struct node
{
    std::atomic<node*> m_next;  // next node in the live structure
    node* m_defer_next;         // next node in a deferred-free list

    node()
    // Fix: both links were previously left uninitialized (indeterminate
    // until the first store); zero them so a node is always in a sane state.
    : m_next(nullptr),
      m_defer_next(nullptr)
    {
        // debug only! relaxed is sufficient for a pure statistics counter.
        g_debug_alloc.fetch_add(1, std::memory_order_relaxed);
    }

    ~node()
    {
        // debug only!
        g_debug_free.fetch_add(1, std::memory_order_relaxed);
    }
};
// Single-target proxy garbage collector (a poor man's RCU).
//
// Encoding used throughout (ground truth is the masks below):
//  * m_current: low 4 bits (& 0xFU) = index of the current collector
//    (only 0/1 are used); each acquire() adds 0x20, so the bits above
//    the low nibble hold an outstanding-reference tally in steps of 0x20.
//  * collector::m_count: 0x10 bit set = this collector is quiescing
//    ("odd" count); release() subtracts 0x20 per dropped reference.
// A collector's deferred nodes are destroyed only after every thread
// that could have seen them has released its reference.
class proxy
{
// Walk a singly-linked defer list (via m_defer_next) and delete every node.
static void prv_destroy(node* n)
{
while (n)
{
node* next = n->m_defer_next;
delete n;
n = next;
}
}
public:
// One "generation" of deferred nodes plus its reference count.
class collector
{
friend class proxy;
private:
std::atomic<node*> m_defer;            // head of this generation's defer list
std::atomic<unsigned int> m_count;     // refs (x 0x20) | quiesce flag (0x10)
public:
collector()
: m_defer(nullptr),
m_count(0)
{
}
~collector()
{
// Any nodes still deferred when the collector dies are reclaimed here.
prv_destroy(m_defer.load(std::memory_order_relaxed));
}
};
private:
std::atomic<unsigned int> m_current;   // current collector index + acquire tally
std::atomic<bool> m_quiesce;           // true while a quiesce cycle is in flight
node* m_defer;                         // previous generation's nodes (back link)
collector m_collectors[2];             // double-buffered generations
public:
proxy()
: m_current(0),
m_quiesce(false),
m_defer(nullptr)
{
}
~proxy()
{
// Reclaim the back-linked generation; m_collectors' dtors handle the rest.
prv_destroy(m_defer);
}
private:
void prv_quiesce_begin()
{
// Try to begin the quiescence process; exchange doubles as a try-lock.
if (! m_quiesce.exchange(true, std::memory_order_acquire))
{
// advance the current collector and grab the old one.
// The exchange swaps in the new index with a zeroed reference tally;
// outstanding references of the old generation ride along in `old`.
unsigned int old = m_current.load(std::memory_order_relaxed) & 0xFU;
old = m_current.exchange((old + 1) & 1, std::memory_order_acq_rel);
collector& c = m_collectors[old & 0xFU];
// decode reference count.
unsigned int refs = old & 0xFFFFFFF0U;
// verify reference count and previous collector index.
assert(! (refs & 0x10U) && (old & 0xFU) == (&c - m_collectors));
// increment and generate an odd reference count.
// Transfers the outstanding refs into c.m_count and sets the 0x10
// quiesce flag. If the prior count already equals -refs (all those
// acquirers have since released), the generation is quiescent now.
if (c.m_count.fetch_add(refs + 0x10U, std::memory_order_release) == -(int)refs)
{
// odd reference count and drop-to-zero condition detected!
prv_quiesce_complete(c);
}
}
}
void prv_quiesce_complete(collector& c)
{
// the collector `c' is now in a quiescent state! :^)
std::atomic_thread_fence(std::memory_order_acquire);
// maintain the back link and obtain "fresh" objects from
// this collection. Nodes are destroyed one full cycle late:
// we reclaim the PREVIOUS generation and park this one in m_defer.
node* n = m_defer;
m_defer = c.m_defer.load(std::memory_order_relaxed);
c.m_defer.store(0, std::memory_order_relaxed);
// verify and reset the reference count (only the quiesce flag remains).
assert(c.m_count.load(std::memory_order_relaxed) == 0x10U);
c.m_count.store(0, std::memory_order_relaxed);
// release the quiesce lock.
m_quiesce.store(false, std::memory_order_release);
// destroy nodes.
prv_destroy(n);
}
public:
// Pin the current generation; pairs with release().
collector& acquire()
{
// increment the master count _and_ obtain current collector.
unsigned int current =
m_current.fetch_add(0x20U, std::memory_order_acquire);
// decode the collector index.
return m_collectors[current & 0xFU];
}
// Drop a reference obtained via acquire(); may finish a quiesce cycle.
void release(collector& c)
{
// decrement the collector.
unsigned int count =
c.m_count.fetch_sub(0x20U, std::memory_order_release);
// check for the completion of the quiescence process: prior value
// 0x30 = one remaining reference (0x20) + quiesce flag (0x10).
if ((count & 0xFFFFFFF0U) == 0x30U)
{
// odd reference count and drop-to-zero condition detected!
prv_quiesce_complete(c);
}
}
// Swap a stale collector reference for the current one, if `c' is quiescing.
collector& sync(collector& c)
{
// check if the `c' is in the middle of a quiescence process.
if (c.m_count.load(std::memory_order_relaxed) & 0x10U)
{
// drop `c' and get the next collector.
release(c);
return acquire();
}
return c;
}
// Kick off a quiescence cycle (no-op if one is already running).
void collect()
{
prv_quiesce_begin();
}
// Defer destruction of `n' until collector `c's generation quiesces.
// NOTE(review): `n' is published via the exchange before m_defer_next is
// written — safe here because the list is only walked after quiescence.
void collect(collector& c, node* n)
{
if (!n) return;
// link node into the defer list.
node* prev = c.m_defer.exchange(n, std::memory_order_relaxed);
n->m_defer_next = prev;
}
};
}
// your basic lock-free stack...
// well, minus ABA counter and DWCAS of course! ;^)
class ct_stack
{
std::atomic<ct_proxy::node*> m_head;
public:
ct_stack()
: m_head(nullptr)
{
}
public:
void push(ct_proxy::node* n)
{
ct_proxy::node* head = m_head.load(std::memory_order_relaxed);
do
{
n->m_next.store(head, std::memory_order_relaxed);
}
while (! m_head.compare_exchange_weak(
head,
n,
std::memory_order_release));
}
ct_proxy::node* flush()
{
return m_head.exchange(nullptr, std::memory_order_acquire);
}
ct_proxy::node* get_head()
{
return m_head.load(std::memory_order_acquire);
}
ct_proxy::node* pop()
{
ct_proxy::node* head = m_head.load(std::memory_order_acquire);
ct_proxy::node* xchg;
do
{
if (! head) return nullptr;
xchg = head->m_next.load(std::memory_order_relaxed);
}
while (! m_head.compare_exchange_weak(
head,
xchg,
std::memory_order_acquire));
return head;
}
};
// Test Threads...
// State shared by every test thread: the proxy collector instance and
// the lock-free stack it protects.
struct ct_shared
{
ct_proxy::proxy m_proxy; // single-target proxy collector
ct_stack m_stack; // the stack under test
};
// Push and pop nodes in the lock-free stack...
// Writer thread: each iteration pushes a fresh node, then pops one node
// (possibly another thread's) under a collector reference and defers its
// destruction through the proxy. Every CT_COLLECT iterations it kicks
// off a quiescence cycle so deferred nodes actually get reclaimed.
void
ct_thread_writer(
ct_shared& shared
) {
    for (unsigned long iter = 0; iter < CT_ITERS; ++iter)
    {
        // push a brand-new node
        shared.m_stack.push(new ct_proxy::node());

        std::this_thread::yield();

        // pop under a collector reference and defer the delete
        {
            ct_proxy::proxy::collector& c = shared.m_proxy.acquire();
            ct_proxy::node* victim = shared.m_stack.pop();
            shared.m_proxy.collect(c, victim);
            shared.m_proxy.release(c);
        }

        // periodically nudge the collector
        if ((iter + 1) % CT_COLLECT == 0)
        {
            shared.m_proxy.collect();
        }
    }
}
// Iterate through the lock-free stack...
// Reader thread: walks the live stack while holding a collector
// reference, which keeps every node it can see from being deleted
// out from under it. Yields occasionally to shake up the schedule.
void
ct_thread_reader(
ct_shared& shared
) {
    for (unsigned long iter = 0; iter < CT_ITERS; ++iter)
    {
        // pin the current generation for the whole traversal
        ct_proxy::proxy::collector& c = shared.m_proxy.acquire();

        for (ct_proxy::node* cursor = shared.m_stack.get_head();
             cursor != nullptr;
             cursor = cursor->m_next.load(std::memory_order_relaxed))
        {
            if ((iter + 1) % 256 == 0)
            {
                std::this_thread::yield();
            }
        }

        shared.m_proxy.release(c);
    }
}
// Get things going...
// Entry point: launches CT_WRITERS writer threads and CT_READERS reader
// threads hammering the shared proxy-protected stack, joins them, then
// verifies that every node allocated was also freed.
// Returns 0 on success, 1 if the alloc/free tallies disagree.
int
main()
{
    std::cout << "ctProxy Test...\n\n";
    {
        ct_shared shared;
        std::thread threads[CT_THREADS];

        // Create threads: writers occupy the first CT_WRITERS slots.
        std::cout << "launching threads..." << std::endl;
        for (unsigned long i = 0; i < CT_THREADS; ++i)
        {
            if (i < CT_WRITERS)
            {
                threads[i] = std::thread(ct_thread_writer, std::ref(shared));
            }
            else
            {
                threads[i] = std::thread(ct_thread_reader, std::ref(shared));
            }
        }
        std::cout << "processing...\n" << std::endl;

        // Join threads...
        for (unsigned long i = 0; i < CT_THREADS; ++i)
        {
            threads[i].join();
        }
        // `shared' is destroyed here; the proxy/collector destructors
        // reclaim any nodes still parked on the defer lists.
    }
    std::cout << "\nctProxy Test Complete! :^D\n\n";

    // Sanity check: report the tallies and fail loudly on a mismatch.
    // (Previously the mismatch was printed but main still returned 0
    // and the actual counts were never shown.)
    {
        unsigned long allocs = g_debug_alloc.load();
        unsigned long frees = g_debug_free.load();
        std::cout << "allocs: " << allocs << "\n"
                  << "frees: " << frees << std::endl;
        if (allocs != frees)
        {
            std::cout << "\nUMMM... THIS IS NOT GOOD!!!! god damn it!\n\n";
            return 1; // propagate the leak/double-free to the caller
        }
    }
    return 0;
}
__________________________________________
Any thoughts?