portable proxy collector test...

Liste des GroupesRevenir à c arch 
Sujet : portable proxy collector test...
De : chris.m.thomasson.1 (at) *nospam* gmail.com (Chris M. Thomasson)
Groupes : comp.arch
Date : 05. Dec 2024, 02:17:08
Autres entêtes
Organisation : A noiseless patient Spider
Message-ID : <viquuj$16v40$1@dont-email.me>
User-Agent : Mozilla Thunderbird
I am wondering if anybody can try to compile and run this C++11 code of mine for a portable word-based proxy collector, a sort of poor man's RCU, on an ARM-based system? I don't have access to one. I am interested in the resulting output.
https://pastebin.com/raw/CYZ78gVj
(raw text link, no ads... :^)
_______________________________________
// Chris M. Thomasson's Poor Man's RCU... Example 456...
#include <iostream>
#include <atomic>
#include <thread>
#include <cstdlib>
#include <cstdint>
#include <climits>
#include <functional>
// Masks
static constexpr std::uint32_t ct_ref_mask = 0xFFFFFFF0U;
static constexpr std::uint32_t ct_ref_complete = 0x30U;
static constexpr std::uint32_t ct_ref_inc = 0x20U;
static constexpr std::uint32_t ct_proxy_mask = 0xFU;
static constexpr std::uint32_t ct_proxy_quiescent = 0x10U;
// Iteration settings
static constexpr unsigned long ct_reader_iters_n = 2000000;
static constexpr unsigned long ct_writer_iters_n = 200000;
// Thread counts
static constexpr unsigned long ct_reader_threads_n = 53;
static constexpr unsigned long ct_writer_threads_n = 11;
// Some debug/sanity check things...
// Need to make this conditional in compilation with some macros...
static std::atomic<std::uint32_t> g_debug_node_allocations(0);
static std::atomic<std::uint32_t> g_debug_node_deallocations(0);
static std::atomic<std::uint32_t> g_debug_dtor_collect(0);
static std::atomic<std::uint32_t> g_debug_release_collect(0);
static std::atomic<std::uint32_t> g_debug_quiesce_begin(0);
static std::atomic<std::uint32_t> g_debug_quiesce_complete(0);
static std::atomic<std::uint32_t> g_debug_quiesce_complete_nodes(0);
// Need to align and pad data structures! To do...
struct ct_node
{
     std::atomic<ct_node*> m_next;
     ct_node* m_defer_next;
     ct_node() : m_next(nullptr), m_defer_next(nullptr)
     {
         g_debug_node_allocations.fetch_add(1, std::memory_order_relaxed);
     }
     ~ct_node()
     {
         g_debug_node_deallocations.fetch_add(1, std::memory_order_relaxed);
     }
};
// The proxy collector itself... :^)
template<std::size_t T_defer_limit>
class ct_proxy
{
     static std::uint32_t prv_destroy(ct_node* n)
     {
         std::uint32_t count = 0;
         while (n)
         {
             ct_node* next = n->m_defer_next;
             delete n;
             count++;
             n = next;
         }
         return count;
     }
public:
     class collector
     {
         friend class ct_proxy;
     private:
         std::atomic<ct_node*> m_defer;
         std::atomic<std::uint32_t> m_defer_count;
         std::atomic<std::uint32_t> m_count;
     public:
         collector()
             : m_defer(nullptr),
             m_defer_count(0),
             m_count(0)
         {
         }
         ~collector()
         {
             prv_destroy(m_defer.load(std::memory_order_relaxed));
         }
     };
private:
     std::atomic<std::uint32_t> m_current;
     std::atomic<bool> m_quiesce;
     ct_node* m_defer;
     collector m_collectors[2];
public:
     ct_proxy()
         : m_current(0),
         m_quiesce(false),
         m_defer(nullptr)
     {
     }
     ~ct_proxy()
     {
         prv_destroy(m_defer);
     }
private:
     void prv_quiesce_begin()
     {
         // Try to begin the quiescence process.
         if (! m_quiesce.exchange(true, std::memory_order_acquire))
         {
             g_debug_quiesce_begin.fetch_add(1, std::memory_order_relaxed);
             // advance the current collector and grab the old one.
             std::uint32_t old = m_current.load(std::memory_order_relaxed) & ct_proxy_mask;
             old = m_current.exchange((old + 1) & 1, std::memory_order_acq_rel);
             collector& c = m_collectors[old & ct_proxy_mask];
             // decode reference count.
             std::uint32_t refs = old & ct_ref_mask;
             // increment and generate an odd reference count.
             std::uint32_t old_refs = c.m_count.fetch_add(refs + ct_proxy_quiescent, std::memory_order_release);
             if (old_refs == 0 - refs)
             {
                 g_debug_dtor_collect.fetch_add(1, std::memory_order_relaxed);
                 // odd reference count and drop-to-zero condition detected!
                 prv_quiesce_complete(c);
             }
         }
     }
     void prv_quiesce_complete(collector& c)
     {
         g_debug_quiesce_complete.fetch_add(1, std::memory_order_relaxed);
         // the collector `c' is now in a quiescent state! :^)
         std::atomic_thread_fence(std::memory_order_acquire);
         // maintain the back link and obtain "fresh" objects from
         // this collection.
         ct_node* n = m_defer;
         m_defer = c.m_defer.load(std::memory_order_relaxed);
         c.m_defer.store(0, std::memory_order_relaxed);
         // reset the reference count.
         c.m_count.store(0, std::memory_order_relaxed);
         c.m_defer_count.store(0, std::memory_order_relaxed);
         // release the quiesce lock.
         m_quiesce.store(false, std::memory_order_release);
         // destroy nodes.
         std::uint32_t count = prv_destroy(n);
         g_debug_quiesce_complete_nodes.fetch_add(count, std::memory_order_relaxed);
     }
public:
     collector& acquire()
     {
         // increment the master count _and_ obtain current collector.
         std::uint32_t current =
             m_current.fetch_add(ct_ref_inc, std::memory_order_acquire);
         // decode the collector index.
         return m_collectors[current & ct_proxy_mask];
     }
     void release(collector& c)
     {
         // decrement the collector.
         std::uint32_t count =
             c.m_count.fetch_sub(ct_ref_inc, std::memory_order_release);
         // check for the completion of the quiescence process.
         if ((count & ct_ref_mask) == ct_ref_complete)
         {
             // odd reference count and drop-to-zero condition detected!
             g_debug_release_collect.fetch_add(1, std::memory_order_relaxed);
             prv_quiesce_complete(c);
         }
     }
     collector& sync(collector& c)
     {
         // check if the `c' is in the middle of a quiescence process.
         if (c.m_count.load(std::memory_order_relaxed) & ct_proxy_quiescent)
         {
             // drop `c' and get the next collector.
             release(c);
             return acquire();
         }
         return c;
     }
     void collect()
     {
         prv_quiesce_begin();
     }
     void collect(collector& c, ct_node* n)
     {
         if (! n) return;
         // link node into the defer list.
         ct_node* prev = c.m_defer.exchange(n, std::memory_order_relaxed);
         n->m_defer_next = prev;
         // bump the defer count and begin quiescence process if over
         // the limit.
         std::uint32_t count =
             c.m_defer_count.fetch_add(1, std::memory_order_relaxed) + 1;
         if (count >= (T_defer_limit / 2))
         {
             prv_quiesce_begin();
         }
     }
};
typedef ct_proxy<10> ct_proxy_collector;
// you're basic lock-free stack...
// well, minus ABA counter and DWCAS of course!  ;^)
class ct_stack
{
     std::atomic<ct_node*> m_head;
public:
     ct_stack() : m_head(nullptr)
     {
     }
public:
     void push(ct_node* n)
     {
         ct_node* head = m_head.load(std::memory_order_relaxed);
         do
         {
             n->m_next.store(head, std::memory_order_relaxed);
         }
         while (! m_head.compare_exchange_weak(
             head,
             n,
             std::memory_order_release));
     }
     ct_node* flush()
     {
         return m_head.exchange(nullptr, std::memory_order_acquire);
     }
     ct_node* get_head()
     {
         return m_head.load(std::memory_order_acquire);
     }
     ct_node* pop()
     {
         ct_node* head = m_head.load(std::memory_order_acquire);
         ct_node* xchg;
         do
         {
             if (! head) return nullptr;
             xchg = head->m_next.load(std::memory_order_relaxed);
         }
         while (!m_head.compare_exchange_weak(
             head,
             xchg,
             std::memory_order_acquire));
         return head;
     }
};
// The shared state
struct ct_shared
{
     ct_proxy<10> m_proxy_gc;
     ct_stack m_stack;
};
// Reader threads
// Iterates through the lock free stack
void ct_thread_reader(ct_shared& shared)
{
     // iterate the lockfree stack
     for (unsigned long i = 0; i < ct_reader_iters_n; ++i)
     {
         ct_proxy_collector::collector& c = shared.m_proxy_gc.acquire();
         ct_node* n = shared.m_stack.get_head();
         while (n)
         {
             // need to add in some processing...
             // std::this_thread::yield();
             n = n->m_next.load(std::memory_order_relaxed);
         }
         shared.m_proxy_gc.release(c);
     }
}
// Writer threads
// Mutates the lock free stack
void ct_thread_writer(ct_shared& shared)
{
     for (unsigned long wloop = 0; wloop < 42; ++wloop)
     {
         shared.m_proxy_gc.collect();
         for (unsigned long i = 0; i < ct_writer_iters_n; ++i)
         {
             shared.m_stack.push(new ct_node());
         }
         //std::this_thread::yield();
         ct_proxy_collector::collector& c = shared.m_proxy_gc.acquire();
         for (unsigned long i = 0; i < ct_writer_iters_n; ++i)
         {
             shared.m_proxy_gc.collect(c, shared.m_stack.pop());
         }
         shared.m_proxy_gc.release(c);
         for (unsigned long i = 0; i < ct_writer_iters_n / 2; ++i)
         {
             shared.m_proxy_gc.collect();
         }
         {
             ct_proxy_collector::collector& c = shared.m_proxy_gc.acquire();
             for (unsigned long i = 0; i < ct_writer_iters_n; ++i)
             {
                 ct_node* n = shared.m_stack.pop();
                 if (! n) break;
                 shared.m_proxy_gc.collect(c, n);
             }
             shared.m_proxy_gc.release(c);
         }
         if ((wloop % 3) == 0)
         {
             shared.m_proxy_gc.collect();
         }
     }
}
int main()
{
     std::cout << "Chris M. Thomassons Proxy Collector Port ver .0.0.2...\n";
     std::cout << "_______________________________________\n\n";
     {
         ct_shared shared;
         std::thread readers[ct_reader_threads_n];
         std::thread writers[ct_writer_threads_n];
         std::cout << "Booting threads...\n";
         for (unsigned long i = 0; i < ct_writer_threads_n; ++i)
         {
             writers[i] = std::thread(ct_thread_writer, std::ref(shared));
         }
         for (unsigned long i = 0; i < ct_reader_threads_n; ++i)
         {
             readers[i] = std::thread(ct_thread_reader, std::ref(shared));
         }
         std::cout << "Threads running...\n";
         for (unsigned long i = 0; i < ct_reader_threads_n; ++i)
         {
             readers[i].join();
         }
         for (unsigned long i = 0; i < ct_writer_threads_n; ++i)
         {
             writers[i].join();
         }
     }
     std::cout << "Threads completed!\n\n";
     // Sanity check!
     {
         std::uint32_t node_allocations = g_debug_node_allocations.load(std::memory_order_relaxed);
         std::uint32_t node_deallocations = g_debug_node_deallocations.load(std::memory_order_relaxed);
         std::uint32_t dtor_collect = g_debug_dtor_collect.load(std::memory_order_relaxed);
         std::uint32_t release_collect = g_debug_release_collect.load(std::memory_order_relaxed);
         std::uint32_t quiesce_complete = g_debug_quiesce_complete.load(std::memory_order_relaxed);
         std::uint32_t quiesce_begin = g_debug_quiesce_begin.load(std::memory_order_relaxed);
         std::uint32_t quiesce_complete_nodes = g_debug_quiesce_complete_nodes.load(std::memory_order_relaxed);
         std::cout << "node_allocations = " << node_allocations << "\n";
         std::cout << "node_deallocations = " << node_deallocations << "\n\n";
         std::cout << "dtor_collect = " << dtor_collect << "\n";
         std::cout << "release_collect = " << release_collect << "\n";
         std::cout << "quiesce_complete = " << quiesce_complete << "\n";
         std::cout << "quiesce_begin = " << quiesce_begin << "\n";
         std::cout << "quiesce_complete_nodes = " << quiesce_complete_nodes << "\n";
         if (node_allocations != node_deallocations)
         {
             std::cout << "OH SHIT! NODE LEAK!!! SHIT! = " << node_allocations - node_deallocations << "\n\n";
         }
     }
     std::cout << "\n\nTest Completed!\n\n";
     return 0;
}
_______________________________________

Date Sujet#  Auteur
5 Dec 24 * portable proxy collector test...16Chris M. Thomasson
6 Dec 24 `* Re: portable proxy collector test...15Brett
6 Dec 24  +- Re: portable proxy collector test...1Chris M. Thomasson
6 Dec 24  +- Re: portable proxy collector test...1Chris M. Thomasson
6 Dec 24  `* Re: portable proxy collector test...12Chris M. Thomasson
6 Dec 24   `* Re: portable proxy collector test...11jseigh
8 Dec 24    +- Re: portable proxy collector test...1Chris M. Thomasson
8 Dec 24    +* Re: portable proxy collector test...2Chris M. Thomasson
8 Dec 24    i`- Re: portable proxy collector test...1Chris M. Thomasson
9 Dec 24    `* Re: portable proxy collector test...7Chris M. Thomasson
9 Dec 24     +* Re: portable proxy collector test...2aph
9 Dec 24     i`- Re: portable proxy collector test...1Chris M. Thomasson
9 Dec 24     `* Re: portable proxy collector test...4jseigh
9 Dec 24      +* Re: portable proxy collector test...2jseigh
9 Dec 24      i`- Re: portable proxy collector test...1Chris M. Thomasson
9 Dec 24      `- Re: portable proxy collector test...1Chris M. Thomasson

Haut de la page

Les messages affichés proviennent d'usenet.

NewsPortal