Ported my proxy collector...

Liste des GroupesRevenir à cl c++ 
Sujet : Ported my proxy collector...
De : chris.m.thomasson.1 (at) *nospam* gmail.com (Chris M. Thomasson)
Groupes : comp.lang.c++
Date : 17. Sep 2024, 01:08:51
Autres entêtes
Organisation : A noiseless patient Spider
Message-ID : <vcadq3$340q3$1@dont-email.me>
User-Agent : Mozilla Thunderbird
This is just a quick port from Relacy to C++. The collector allows lock-free stacks, queues, etc. to be used without any DWCAS on a pop operation. It also takes care of memory management issues with the nodes...
Well, here is my crude test code. Keep in mind that the g_debug_alloc/g_debug_free globals are only there for debugging purposes at this early stage. They can be removed. This is akin to a poor man's RCU, in a sense... Well, can you compile and run it when you get some really free time to burn?
Thanks everybody. :^)
_________________________________________
#include <cassert>
#include <iostream>
#include <atomic>
#include <thread>
// Test configuration. These are typed constants rather than macros so they
// obey normal scoping and type rules; `unsigned long` matches the loop
// counters they are compared against in the test threads and in main().
constexpr unsigned long CT_READERS = 10;        // threads that only traverse the stack
constexpr unsigned long CT_WRITERS = 10;        // threads that push and pop nodes
constexpr unsigned long CT_THREADS = CT_WRITERS + CT_READERS;
constexpr unsigned long CT_ITERS = 10000000;    // loop iterations per thread
constexpr unsigned long CT_COLLECT = 10240;     // writer iterations between collect() calls
// Debug/statistics counters only: node's constructor bumps g_debug_alloc and
// node's destructor bumps g_debug_free, so equal totals mean every node that
// was created has also been destroyed. They can be removed later.
static std::atomic<unsigned long> g_debug_alloc(0);
static std::atomic<unsigned long> g_debug_free(0);
// Chris M. Thomasson's single target proxy collector:
// https://pastebin.com/raw/f71480694
namespace ct_proxy
{
     // Intrusive node. It carries one link for the user's container and one
     // link for the collector's deferred-destruction lists.
     struct node
     {
         // Link used by the user's data structure (ct_stack in this file).
         std::atomic<node*> m_next;
         // Link used only once the node has been handed to proxy::collect().
         node* m_defer_next;
         // Both links start out null so a fresh node never exposes an
         // indeterminate pointer.
         node()
         :   m_next(nullptr),
             m_defer_next(nullptr)
         {
             // debug only!
             g_debug_alloc.fetch_add(1);
         }
         ~node()
         {
             // debug only!
             g_debug_free.fetch_add(1);
         }
     };
     // Proxy collector built from two `collector` slots.
     //
     // Usage, as exercised by the test threads in this file:
     //   collector& c = acquire();   // enter a protected region
     //   collect(c, n);              // optionally defer deletion of node n
     //   release(c);                 // leave the region
     //   collect();                  // now and then: try to start a quiescence
     //
     // Counter encoding:
     //   m_current          low 4 bits  = index of the current collector
     //                      higher bits = acquire() calls since the last
     //                                    switch, in units of 0x20
     //   collector::m_count release() subtracts 0x20; prv_quiesce_begin() adds
     //                      the acquire total plus the 0x10 flag. The value is
     //                      exactly 0x10 when quiescence was requested and
     //                      every acquired reference has been released.
     class proxy
     {
         static constexpr unsigned int k_index_mask = 0xFU;
         static constexpr unsigned int k_refs_mask = 0xFFFFFFF0U;
         static constexpr unsigned int k_quiesce_flag = 0x10U;
         static constexpr unsigned int k_ref_unit = 0x20U;
         // Deletes every node on the list linked through m_defer_next.
         static void prv_destroy(node* n)
         {
             while (n)
             {
                 node* const next = n->m_defer_next;
                 delete n;
                 n = next;
             }
         }
     public:
         // One collection slot: a list of deferred nodes plus the slot's
         // reference count. Only proxy touches its state.
         class collector
         {
             friend class proxy;
         private:
             std::atomic<node*> m_defer;
             std::atomic<unsigned int> m_count;
         public:
             collector()
             :   m_defer(nullptr),
                 m_count(0)
             {
             }
             collector(const collector&) = delete;
             collector& operator=(const collector&) = delete;
             // Deletes whatever is still on this slot's defer list.
             ~collector()
             {
                 prv_destroy(m_defer.load(std::memory_order_relaxed));
             }
         };
     private:
         std::atomic<unsigned int> m_current;
         // Acts as a try-lock: only one quiescence may be in flight.
         std::atomic<bool> m_quiesce;
         // Nodes harvested by the last completed quiescence; they are deleted
         // when the next one completes (or in ~proxy). Only accessed while
         // m_quiesce is held, or in the destructor.
         node* m_defer;
         collector m_collectors[2];
     public:
         proxy()
         :   m_current(0),
             m_quiesce(false),
             m_defer(nullptr)
         {
         }
         proxy(const proxy&) = delete;
         proxy& operator=(const proxy&) = delete;
         // Deletes the nodes parked by the last quiescence; the collector
         // members then delete their own defer lists.
         ~proxy()
         {
             prv_destroy(m_defer);
         }
     private:
         // Tries to start a quiescence: switches m_current to the other
         // collector and transfers the acquire total to the old collector.
         // Does nothing if a quiescence is already in flight.
         void prv_quiesce_begin()
         {
             // Try to begin the quiescence process.
             if (m_quiesce.exchange(true, std::memory_order_acquire))
             {
                 return;
             }
             // advance the current collector and grab the old one.
             unsigned int old = m_current.load(std::memory_order_relaxed) & k_index_mask;
             old = m_current.exchange((old + 1U) & 1U, std::memory_order_acq_rel);
             // verify the previous collector index *before* indexing with it.
             const unsigned int index = old & k_index_mask;
             assert(index < 2U);
             collector& c = m_collectors[index];
             // decode reference count.
             const unsigned int refs = old & k_refs_mask;
             // the master count must never carry the quiescence flag.
             assert(! (refs & k_quiesce_flag));
             // increment and generate an odd reference count. If every
             // acquired reference was already released, the previous value
             // is minus `refs` modulo 2^32. The negation is done in unsigned
             // arithmetic so there is no signed conversion or overflow.
             if (c.m_count.fetch_add(refs + k_quiesce_flag, std::memory_order_release) == 0U - refs)
             {
                 // odd reference count and drop-to-zero condition detected!
                 prv_quiesce_complete(c);
             }
         }
         // Finishes the quiescence of `c`: parks c's deferred nodes in
         // m_defer, resets c, drops the quiesce lock, and deletes the nodes
         // that were parked by the previous quiescence.
         void prv_quiesce_complete(collector& c)
         {
             // the collector `c' is now in a quiescent state! :^)
             std::atomic_thread_fence(std::memory_order_acquire);
             // maintain the back link and obtain "fresh" objects from
             // this collection.
             node* const stale = m_defer;
             m_defer = c.m_defer.load(std::memory_order_relaxed);
             c.m_defer.store(nullptr, std::memory_order_relaxed);
             // verify and reset the reference count.
             assert(c.m_count.load(std::memory_order_relaxed) == k_quiesce_flag);
             c.m_count.store(0, std::memory_order_relaxed);
             // release the quiesce lock.
             m_quiesce.store(false, std::memory_order_release);
             // destroy nodes.
             prv_destroy(stale);
         }
     public:
         // Enters a protected region and returns the collector that was
         // current at that moment. Pair every call with release() on the
         // returned collector.
         collector& acquire()
         {
             // increment the master count _and_ obtain current collector.
             const unsigned int current =
                 m_current.fetch_add(k_ref_unit, std::memory_order_acquire);
             // decode the collector index.
             return m_collectors[current & k_index_mask];
         }
         // Leaves the protected region entered on `c`. If this was the last
         // outstanding reference of a collector under quiescence, completes
         // that quiescence.
         void release(collector& c)
         {
             // decrement the collector.
             const unsigned int count =
                 c.m_count.fetch_sub(k_ref_unit, std::memory_order_release);
             // check for the completion of the quiescence process.
             if ((count & k_refs_mask) == (k_ref_unit | k_quiesce_flag))
             {
                 // odd reference count and drop-to-zero condition detected!
                 prv_quiesce_complete(c);
             }
         }
         // If `c` is being quiesced, releases it and returns a freshly
         // acquired collector; otherwise returns `c` unchanged.
         collector& sync(collector& c)
         {
             // check if the `c' is in the middle of a quiescence process.
             if (c.m_count.load(std::memory_order_relaxed) & k_quiesce_flag)
             {
                 // drop `c' and get the next collector.
                 release(c);
                 return acquire();
             }
             return c;
         }
         // Requests a quiescence; a no-op when one is already in flight.
         void collect()
         {
             prv_quiesce_begin();
         }
         // Defers deletion of `n` (ignored when null) onto `c`. The link is
         // written after the node is published; the list is only walked after
         // c has quiesced, which, as used in this file, cannot happen before
         // the caller's own release(c).
         void collect(collector& c, node* n)
         {
             if (!n) return;
             // link node into the defer list.
             node* const prev = c.m_defer.exchange(n, std::memory_order_relaxed);
             n->m_defer_next = prev;
         }
     };
}
// you're basic lock-free stack...
// well, minus ABA counter and DWCAS of course!  ;^)
class ct_stack
{
     std::atomic<ct_proxy::node*> m_head;
public:
     ct_stack()
     : m_head(nullptr)
     {
     }
public:
     void push(ct_proxy::node* n)
     {
         ct_proxy::node* head = m_head.load(std::memory_order_relaxed);
         do
         {
             n->m_next.store(head, std::memory_order_relaxed);
         }
         while (! m_head.compare_exchange_weak(
             head,
             n,
             std::memory_order_release));
     }
     ct_proxy::node* flush()
     {
         return m_head.exchange(nullptr, std::memory_order_acquire);
     }
     ct_proxy::node* get_head()
     {
         return m_head.load(std::memory_order_acquire);
     }
     ct_proxy::node* pop()
     {
         ct_proxy::node* head = m_head.load(std::memory_order_acquire);
         ct_proxy::node* xchg;
         do
         {
             if (! head) return nullptr;
             xchg = head->m_next.load(std::memory_order_relaxed);
         }
         while (! m_head.compare_exchange_weak(
             head,
             xchg,
             std::memory_order_acquire));
         return head;
     }
};
// Test Threads...
// State shared by every test thread; main() passes it to each thread by
// reference.
struct ct_shared
{
     // Collector through which the writers defer deletion of popped nodes.
     ct_proxy::proxy m_proxy;
     // Stack that the writers push/pop and the readers traverse.
     ct_stack m_stack;
};
// Push and pop nodes in the lock-free stack...
void
ct_thread_writer(
     ct_shared& shared
) {
     for (unsigned long i = 0; i < CT_ITERS; ++i)
     {
         // push
         {
             ct_proxy::node* n0 = new ct_proxy::node();
             shared.m_stack.push(n0);
         }
         std::this_thread::yield();
         // pop
         {
             ct_proxy::proxy::collector& c0 = shared.m_proxy.acquire();
             shared.m_proxy.collect(c0, shared.m_stack.pop());
             shared.m_proxy.release(c0);
         }
         if (! ((i + 1) % CT_COLLECT))
         {
             shared.m_proxy.collect();
         }
     }
}
// Iterate throught the lock-free stack...
void
ct_thread_reader(
     ct_shared& shared
) {
     for (unsigned long i = 0; i < CT_ITERS; ++i)
     {
         ct_proxy::proxy::collector& c0 = shared.m_proxy.acquire();
         ct_proxy::node* head = shared.m_stack.get_head();
         while (head)
         {
             if (! ((i + 1) % 256))
             {
                 std::this_thread::yield();
             }
             head = head->m_next.load(std::memory_order_relaxed);
         }
         shared.m_proxy.release(c0);
     }
}
// Get things going...
int
main()
{
     std::cout << "ctProxy Test...\n\n";
     {
         ct_shared shared;
         std::thread threads[CT_THREADS];
         // Create threads...
         std::cout << "launching threads..." << std::endl;
         for (unsigned long i = 0; i < CT_THREADS; ++i)
         {
             if (i < CT_WRITERS)
             {
                 threads[i] = std::thread(ct_thread_writer, std::ref(shared));
             }
             else
             {
                 threads[i] = std::thread(ct_thread_reader, std::ref(shared));
             }
         }
         std::cout << "processing...\n" << std::endl;
         // Join threads...
         for (unsigned long i = 0; i < CT_THREADS; ++i)
         {
             threads[i].join();
         }
     }
     std::cout << "\nctProxy Test Complete! :^D\n\n";
     // Sanity check...
     {
         if (g_debug_alloc.load() != g_debug_free.load())
         {
             std::cout << "\nUMMM... THIS IS NOT GOOD!!!! god damn it!\n\n";
         }
     }
     return 0;
}
__________________________________________
Any thoughts?

Date Sujet#  Auteur
17 Sep01:08 * Ported my proxy collector...6Chris M. Thomasson
17 Sep01:15 +- Re: Ported my proxy collector...1Chris M. Thomasson
17 Sep09:38 `* Re: Ported my proxy collector...4Muttley
17 Sep20:46  `* Re: Ported my proxy collector...3Chris M. Thomasson
18 Sep09:20   `* Re: Ported my proxy collector...2Muttley
18 Sep21:27    `- Re: Ported my proxy collector...1Chris M. Thomasson

Haut de la page

Les messages affichés proviennent d'usenet.

NewsPortal