On 9/23/2024 3:32 PM, MitchAlsup1 wrote:
On Mon, 23 Sep 2024 22:19:37 +0000, Chris M. Thomasson wrote:
[...]
Getting around ABA and/or DWCAS. A way, in a portable C++11 program. One of my proxy collectors can do it. It's not the only tool in the box for sure! So, well, that's that. Here is some code. Can you compile and give it a run?
Setting the missing `#include <functional>` aside for a moment, does it compile on your end?
_____________
#include <cassert>
#include <iostream>
#include <atomic>
#include <thread>
#define CT_READERS 10
#define CT_WRITERS 5
#define CT_THREADS (CT_WRITERS + CT_READERS)
#define CT_ITERS 10000000
#define CT_COLLECT 1024
// These are for debug and some statistics only...
static std::atomic<unsigned long> g_debug_alloc(0);
static std::atomic<unsigned long> g_debug_free(0);
// Chris M. Thomasson's single-target proxy collector:
//
// https://pastebin.com/raw/f71480694
namespace ct_proxy
{
// A singly-linked node. `m_next` links nodes while they live inside the
// lock-free stack; `m_defer_next` links nodes on a collector's
// deferred-free list. The two global counters only feed the final
// alloc/free balance check in main().
struct node
{
std::atomic<node*> m_next;
node* m_defer_next;
node()
// FIX: both pointers were previously left uninitialized
// (indeterminate until first store); initialize them so a freshly
// constructed node never exposes garbage pointer values.
: m_next(nullptr),
m_defer_next(nullptr)
{
// debug only!
g_debug_alloc.fetch_add(1);
}
~node()
{
// debug only!
g_debug_free.fetch_add(1);
}
};
// Proxy garbage collector guarding readers of the lock-free stack.
//
// Packed-word encoding (as used by the code below):
// - m_current: low 4 bits (mask 0xFU) = index of the current collector;
//   upper bits count outstanding acquire() calls, 0x20U per reference.
// - collector::m_count: same 0x20U-per-reference count, plus bit 0x10U
//   which is set ("odd") once a quiescence cycle has begun for that
//   collector; counts rely on unsigned wraparound, since releases on a
//   collector can outnumber the references later transferred to it.
// NOTE(review): only two collectors exist; the 0xFU index mask is wider
// than needed — the index is advanced with (old + 1) & 1 and so never
// exceeds 1.
class proxy
{
// Walk a deferred list through m_defer_next and delete every node.
static void prv_destroy(node* n)
{
while (n)
{
node* next = n->m_defer_next;
delete n;
n = next;
}
}
public:
// One collection epoch: a deferred-free list plus the packed
// reference count described on the class comment above.
class collector
{
friend class proxy;
private:
std::atomic<node*> m_defer;
std::atomic<unsigned int> m_count;
public:
collector()
: m_defer(nullptr),
m_count(0)
{
}
~collector()
{
// Reclaim anything still deferred when the collector dies.
prv_destroy(m_defer.load(std::memory_order_relaxed));
}
};
private:
std::atomic<unsigned int> m_current;  // packed index + reference count
std::atomic<bool> m_quiesce;          // quiescence-cycle lock
node* m_defer;                        // back link: nodes held one extra cycle
collector m_collectors[2];            // the two alternating epochs
public:
proxy()
: m_current(0),
m_quiesce(false),
m_defer(nullptr)
{
}
~proxy()
{
prv_destroy(m_defer);
}
private:
// Start a quiescence cycle for the current collector, unless one is
// already in flight.
void prv_quiesce_begin()
{
// Try to begin the quiescence process.
if (! m_quiesce.exchange(true, std::memory_order_acquire))
{
// advance the current collector and grab the old one.
unsigned int old = m_current.load(std::memory_order_relaxed) & 0xFU;
old = m_current.exchange((old + 1) & 1, std::memory_order_acq_rel);
collector& c = m_collectors[old & 0xFU];
// decode reference count.
unsigned int refs = old & 0xFFFFFFF0U;
// verify reference count and previous collector index.
assert(! (refs & 0x10U) && (old & 0xFU) == (&c - m_collectors));
// increment and generate an odd reference count.
// This single RMW transfers the outstanding references (refs)
// onto the old collector and sets the 0x10U odd bit. If the
// value before the add was -refs (unsigned wraparound: every
// acquired reference was already released), the collector is
// already quiescent and we can complete right here.
if (c.m_count.fetch_add(refs + 0x10U, std::memory_order_release) == -(int)refs)
{
// odd reference count and drop-to-zero condition detected!
prv_quiesce_complete(c);
}
}
}
// Finish a quiescence cycle: recycle the previous epoch's nodes and
// reopen the quiesce lock. Called by whichever thread observed the
// drop-to-zero condition.
void prv_quiesce_complete(collector& c)
{
// the collector `c' is now in a quiescent state! :^)
std::atomic_thread_fence(std::memory_order_acquire);
// maintain the back link and obtain "fresh" objects from
// this collection.
// Nodes deferred in the PREVIOUS cycle (m_defer) are the ones
// destroyed now; this cycle's nodes wait one more round.
node* n = m_defer;
m_defer = c.m_defer.load(std::memory_order_relaxed);
c.m_defer.store(0, std::memory_order_relaxed);
// verify and reset the reference count.
assert(c.m_count.load(std::memory_order_relaxed) == 0x10U);
c.m_count.store(0, std::memory_order_relaxed);
// release the quiesce lock.
m_quiesce.store(false, std::memory_order_release);
// destroy nodes.
prv_destroy(n);
}
public:
// Enter a protected region: bump the packed reference count and hand
// back the collector the caller is now pinned to.
collector& acquire()
{
// increment the master count _and_ obtain current collector.
unsigned int current =
m_current.fetch_add(0x20U, std::memory_order_acquire);
// decode the collector index.
return m_collectors[current & 0xFU];
}
// Leave a protected region previously entered via acquire().
void release(collector& c)
{
// decrement the collector.
unsigned int count =
c.m_count.fetch_sub(0x20U, std::memory_order_release);
// check for the completion of the quiescence process.
// 0x30U == the 0x10U odd bit plus the one reference we are
// about to drop, i.e. we are the last one out.
if ((count & 0xFFFFFFF0U) == 0x30U)
{
// odd reference count and drop-to-zero condition detected!
prv_quiesce_complete(c);
}
}
// Re-pin to the freshest collector if `c' is mid-quiescence;
// otherwise keep the current one.
collector& sync(collector& c)
{
// check if the `c' is in the middle of a quiescence process.
if (c.m_count.load(std::memory_order_relaxed) & 0x10U)
{
// drop `c' and get the next collector.
release(c);
return acquire();
}
return c;
}
// Manually kick off a quiescence (collection) cycle.
void collect()
{
prv_quiesce_begin();
}
// Defer destruction of `n' until the collector's epoch quiesces.
void collect(collector& c, node* n)
{
if (!n) return;
// link node into the defer list.
node* prev = c.m_defer.exchange(n, std::memory_order_relaxed);
n->m_defer_next = prev;
}
};
}
// your basic lock-free stack...
// well, minus ABA counter and DWCAS of course! ;^)
class ct_stack
{
std::atomic<ct_proxy::node*> m_head;
public:
ct_stack()
: m_head(nullptr)
{
}
public:
void push(ct_proxy::node* n)
{
ct_proxy::node* head = m_head.load(std::memory_order_relaxed);
do
{
n->m_next.store(head, std::memory_order_relaxed);
}
while (! m_head.compare_exchange_weak(
head,
n,
std::memory_order_release));
}
ct_proxy::node* flush()
{
return m_head.exchange(nullptr, std::memory_order_acquire);
}
ct_proxy::node* get_head()
{
return m_head.load(std::memory_order_acquire);
}
ct_proxy::node* pop()
{
ct_proxy::node* head = m_head.load(std::memory_order_acquire);
ct_proxy::node* xchg;
do
{
if (! head) return nullptr;
xchg = head->m_next.load(std::memory_order_relaxed);
}
while (! m_head.compare_exchange_weak(
head,
xchg,
std::memory_order_acquire));
return head;
}
};
// Test Threads...
// State shared by all test threads: the proxy collector plus the
// lock-free stack whose nodes it protects.
struct ct_shared
{
ct_proxy::proxy m_proxy;
ct_stack m_stack;
};
// Push and pop nodes in the lock-free stack...
// Writer thread: alternately pushes freshly allocated nodes onto the
// shared stack and pops nodes off, routing each popped node through the
// proxy so its deletion is deferred past any concurrent readers.
void
ct_thread_writer(
    ct_shared& shared
) {
    for (unsigned long iter = 0; iter < CT_ITERS; ++iter)
    {
        // Publish a brand-new node.
        shared.m_stack.push(new ct_proxy::node());

        std::this_thread::yield();

        // Pop one node (possibly pushed by another thread) and hand it
        // to the collector for deferred destruction.
        ct_proxy::proxy::collector& col = shared.m_proxy.acquire();
        shared.m_proxy.collect(col, shared.m_stack.pop());
        shared.m_proxy.release(col);

        // Periodically kick off a quiescence cycle.
        if ((iter + 1) % CT_COLLECT == 0)
        {
            shared.m_proxy.collect();
        }
    }
}
// Iterate through the lock-free stack...
// Reader thread: repeatedly walks the stack from head to tail while
// pinned to a proxy collector, so no node it traverses can be deleted
// underneath it.
void
ct_thread_reader(
    ct_shared& shared
) {
    for (unsigned long iter = 0; iter < CT_ITERS; ++iter)
    {
        ct_proxy::proxy::collector& col = shared.m_proxy.acquire();

        for (ct_proxy::node* cur = shared.m_stack.get_head();
             cur != nullptr;
             cur = cur->m_next.load(std::memory_order_relaxed))
        {
            // Occasionally give other threads a turn mid-traversal.
            if ((iter + 1) % 256 == 0)
            {
                std::this_thread::yield();
            }
        }

        shared.m_proxy.release(col);
    }
}
// Get things going...
// Entry point: launch CT_WRITERS writer threads and CT_READERS reader
// threads against one shared proxy+stack, join them all, then verify
// that every allocated node was eventually freed.
int
main()
{
    std::cout << "ctProxy Test...\n\n";
    {
        ct_shared shared;
        std::thread threads[CT_THREADS];

        std::cout << "launching threads..." << std::endl;

        // The first CT_WRITERS slots are writers; the rest are readers.
        for (unsigned long t = 0; t < CT_THREADS; ++t)
        {
            threads[t] = (t < CT_WRITERS)
                ? std::thread(ct_thread_writer, std::ref(shared))
                : std::thread(ct_thread_reader, std::ref(shared));
        }

        std::cout << "processing...\n" << std::endl;

        for (std::thread& t : threads)
        {
            t.join();
        }
    }
    std::cout << "\nctProxy Test Complete! :^D\n\n";

    // Sanity check: allocations must balance frees once everything has
    // been torn down.
    if (g_debug_alloc.load() != g_debug_free.load())
    {
        std::cout << "\nUMMM... THIS IS NOT GOOD!!!! god damn it!\n\n";
    }
    return 0;
}
____________________