Re: portable proxy collector test...

Subject : Re: portable proxy collector test...
From : ggtgp (at) *nospam* yahoo.com (Brett)
Newsgroups : comp.arch
Date : 06 Dec 2024, 20:55:07
Organization : A noiseless patient Spider
Message-ID : <vivkqq$2hpdg$1@dont-email.me>
References : 1
User-Agent : NewsTap/5.5 (iPad)
Chris M. Thomasson <chris.m.thomasson.1@gmail.com> wrote:
I am wondering if anybody can try to compile and run this C++11 code of
mine for a portable word-based proxy collector, a sort of poor man's RCU,
on an ARM-based system? I don't have access to one. I am interested in
the resulting output.

https://godbolt.org

https://pastebin.com/raw/CYZ78gVj
(raw text link, no ads... :^)
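 
The reader-side pattern in the code below boils down to: pin the current
collector, walk the shared structure, unpin. A minimal sketch of that
pattern, reusing the names from the full program that follows (not a
standalone build):
 
// Sketch only: ct_shared, ct_proxy_collector, ct_node and ct_stack are
// defined in the full program below.
void reader_pass(ct_shared& shared)
{
    // acquire() pins the current collector; nodes deferred to it cannot
    // be deleted while this reference is held.
    ct_proxy_collector::collector& c = shared.m_proxy_gc.acquire();

    for (ct_node* n = shared.m_stack.get_head(); n != nullptr;
         n = n->m_next.load(std::memory_order_relaxed))
    {
        // read-only traversal of the lock-free stack...
    }

    // release() unpins; the last release of a quiescing collector is
    // what reclaims its deferred nodes.
    shared.m_proxy_gc.release(c);
}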
 
_______________________________________
 
// Chris M. Thomasson's Poor Man's RCU... Example 456...
 
 
#include <iostream>
#include <atomic>
#include <thread>
#include <cstdlib>
#include <cstdint>
#include <climits>
#include <functional>
 
 
// Masks
static constexpr std::uint32_t ct_ref_mask = 0xFFFFFFF0U;
static constexpr std::uint32_t ct_ref_complete = 0x30U;
static constexpr std::uint32_t ct_ref_inc = 0x20U;
static constexpr std::uint32_t ct_proxy_mask = 0xFU;
static constexpr std::uint32_t ct_proxy_quiescent = 0x10U;
 
 
// Iteration settings
static constexpr unsigned long ct_reader_iters_n = 2000000;
static constexpr unsigned long ct_writer_iters_n = 200000;
 
 
// Thread counts
static constexpr unsigned long ct_reader_threads_n = 53;
static constexpr unsigned long ct_writer_threads_n = 11;
 
 
// Some debug/sanity check things...
// Need to make this conditional in compilation with some macros...
static std::atomic<std::uint32_t> g_debug_node_allocations(0);
static std::atomic<std::uint32_t> g_debug_node_deallocations(0);
static std::atomic<std::uint32_t> g_debug_dtor_collect(0);
static std::atomic<std::uint32_t> g_debug_release_collect(0);
static std::atomic<std::uint32_t> g_debug_quiesce_begin(0);
static std::atomic<std::uint32_t> g_debug_quiesce_complete(0);
static std::atomic<std::uint32_t> g_debug_quiesce_complete_nodes(0);
 
// Need to align and pad data structures! To do...
 
struct ct_node
{
    std::atomic<ct_node*> m_next;
    ct_node* m_defer_next;
 
    ct_node() : m_next(nullptr), m_defer_next(nullptr)
    {
        g_debug_node_allocations.fetch_add(1, std::memory_order_relaxed);
    }
 
    ~ct_node()
    {
        g_debug_node_deallocations.fetch_add(1, std::memory_order_relaxed);
    }
};
 
 
// The proxy collector itself... :^)
template<std::size_t T_defer_limit>
class ct_proxy
{
    static std::uint32_t prv_destroy(ct_node* n)
    {
        std::uint32_t count = 0;
 
        while (n)
        {
            ct_node* next = n->m_defer_next;
            delete n;
            count++;
            n = next;
        }
 
        return count;
    }
 
 
public:
    class collector
    {
        friend class ct_proxy;
 
    private:
        std::atomic<ct_node*> m_defer;
        std::atomic<std::uint32_t> m_defer_count;
        std::atomic<std::uint32_t> m_count;
 
 
 
    public:
        collector()
            : m_defer(nullptr),
            m_defer_count(0),
            m_count(0)
        {
 
        }
 
 
        ~collector()
        {
            prv_destroy(m_defer.load(std::memory_order_relaxed));
        }
    };
 
 
private:
    std::atomic<std::uint32_t> m_current;
    std::atomic<bool> m_quiesce;
    ct_node* m_defer;
    collector m_collectors[2];
 
 
public:
    ct_proxy()
        : m_current(0),
        m_quiesce(false),
        m_defer(nullptr)
    {
 
    }
 
    ~ct_proxy()
    {
        prv_destroy(m_defer);
    }
 
 
private:
    void prv_quiesce_begin()
    {
        // Try to begin the quiescence process.
        if (! m_quiesce.exchange(true, std::memory_order_acquire))
        {
            g_debug_quiesce_begin.fetch_add(1, std::memory_order_relaxed);
 
            // advance the current collector and grab the old one.
            std::uint32_t old =
                m_current.load(std::memory_order_relaxed) & ct_proxy_mask;
            old = m_current.exchange((old + 1) & 1, std::memory_order_acq_rel);
            collector& c = m_collectors[old & ct_proxy_mask];
 
            // decode reference count.
            std::uint32_t refs = old & ct_ref_mask;
 
            // increment and generate an odd reference count.
            std::uint32_t old_refs = c.m_count.fetch_add(
                refs + ct_proxy_quiescent, std::memory_order_release);
 
            if (old_refs == 0 - refs)
            {
                g_debug_dtor_collect.fetch_add(1, std::memory_order_relaxed);
 
                // odd reference count and drop-to-zero condition detected!
                prv_quiesce_complete(c);
            }
        }
    }
 
 
    void prv_quiesce_complete(collector& c)
    {
        g_debug_quiesce_complete.fetch_add(1, std::memory_order_relaxed);
 
        // the collector `c' is now in a quiescent state! :^)
        std::atomic_thread_fence(std::memory_order_acquire);
 
        // maintain the back link and obtain "fresh" objects from
        // this collection.
        ct_node* n = m_defer;
        m_defer = c.m_defer.load(std::memory_order_relaxed);
        c.m_defer.store(0, std::memory_order_relaxed);
 
        // reset the reference count.
        c.m_count.store(0, std::memory_order_relaxed);
        c.m_defer_count.store(0, std::memory_order_relaxed);
 
        // release the quiesce lock.
        m_quiesce.store(false, std::memory_order_release);
 
        // destroy nodes.
        std::uint32_t count = prv_destroy(n);
 
        g_debug_quiesce_complete_nodes.fetch_add(count,
            std::memory_order_relaxed);
    }
 
 
public:
    collector& acquire()
    {
        // increment the master count _and_ obtain current collector.
        std::uint32_t current =
            m_current.fetch_add(ct_ref_inc, std::memory_order_acquire);
 
        // decode the collector index.
        return m_collectors[current & ct_proxy_mask];
    }
 
    void release(collector& c)
    {
        // decrement the collector.
        std::uint32_t count =
            c.m_count.fetch_sub(ct_ref_inc, std::memory_order_release);
 
        // check for the completion of the quiescence process.
        if ((count & ct_ref_mask) == ct_ref_complete)
        {
            // odd reference count and drop-to-zero condition detected!
            g_debug_release_collect.fetch_add(1, std::memory_order_relaxed);
 
            prv_quiesce_complete(c);
        }
    }
 
 
    collector& sync(collector& c)
    {
        // check if the `c' is in the middle of a quiescence process.
        if (c.m_count.load(std::memory_order_relaxed) & ct_proxy_quiescent)
        {
            // drop `c' and get the next collector.
            release(c);
 
            return acquire();
        }
 
        return c;
    }
 
 
    void collect()
    {
        prv_quiesce_begin();
    }
 
 
    void collect(collector& c, ct_node* n)
    {
        if (! n) return;
 
        // link node into the defer list.
        ct_node* prev = c.m_defer.exchange(n, std::memory_order_relaxed);
        n->m_defer_next = prev;
 
        // bump the defer count and begin quiescence process if over
        // the limit.
        std::uint32_t count =
            c.m_defer_count.fetch_add(1, std::memory_order_relaxed) + 1;
 
        if (count >= (T_defer_limit / 2))
        {
            prv_quiesce_begin();
        }
    }
};
 
 
 
typedef ct_proxy<10> ct_proxy_collector;
 
 
// your basic lock-free stack...
// well, minus ABA counter and DWCAS of course!  ;^)
class ct_stack
{
    std::atomic<ct_node*> m_head;
 
 
public:
    ct_stack() : m_head(nullptr)
    {
 
    }
 
 
public:
    void push(ct_node* n)
    {
        ct_node* head = m_head.load(std::memory_order_relaxed);
 
        do
        {
            n->m_next.store(head, std::memory_order_relaxed);
        }
 
        while (! m_head.compare_exchange_weak(
            head,
            n,
            std::memory_order_release));
    }
 
 
    ct_node* flush()
    {
        return m_head.exchange(nullptr, std::memory_order_acquire);
    }
 
 
    ct_node* get_head()
    {
        return m_head.load(std::memory_order_acquire);
    }
 
 
    ct_node* pop()
    {
        ct_node* head = m_head.load(std::memory_order_acquire);
        ct_node* xchg;
 
        do
        {
            if (! head) return nullptr;
 
            xchg = head->m_next.load(std::memory_order_relaxed);
        }
 
        while (!m_head.compare_exchange_weak(
            head,
            xchg,
            std::memory_order_acquire));
 
        return head;
    }
};
 
 
// The shared state
struct ct_shared
{
    ct_proxy<10> m_proxy_gc;
    ct_stack m_stack;
};
 
 
 
// Reader threads
// Iterates through the lock free stack
void ct_thread_reader(ct_shared& shared)
{
    // iterate the lockfree stack
    for (unsigned long i = 0; i < ct_reader_iters_n; ++i)
    {
        ct_proxy_collector::collector& c = shared.m_proxy_gc.acquire();
 
        ct_node* n = shared.m_stack.get_head();
 
        while (n)
        {
            // need to add in some processing...
            // std::this_thread::yield();
 
            n = n->m_next.load(std::memory_order_relaxed);
        }
 
        shared.m_proxy_gc.release(c);
    }
}
 
 
 
// Writer threads
// Mutates the lock free stack
void ct_thread_writer(ct_shared& shared)
{
    for (unsigned long wloop = 0; wloop < 42; ++wloop)
    {
        shared.m_proxy_gc.collect();
 
        for (unsigned long i = 0; i < ct_writer_iters_n; ++i)
        {
            shared.m_stack.push(new ct_node());
        }
 
        //std::this_thread::yield();
 
        ct_proxy_collector::collector& c = shared.m_proxy_gc.acquire();
 
        for (unsigned long i = 0; i < ct_writer_iters_n; ++i)
        {
            shared.m_proxy_gc.collect(c, shared.m_stack.pop());
        }
 
        shared.m_proxy_gc.release(c);
 
        for (unsigned long i = 0; i < ct_writer_iters_n / 2; ++i)
        {
            shared.m_proxy_gc.collect();
        }
 
        {
            ct_proxy_collector::collector& c = shared.m_proxy_gc.acquire();
 
            for (unsigned long i = 0; i < ct_writer_iters_n; ++i)
            {
                ct_node* n = shared.m_stack.pop();
                if (! n) break;
 
                shared.m_proxy_gc.collect(c, n);
            }
 
            shared.m_proxy_gc.release(c);
        }
 
        if ((wloop % 3) == 0)
        {
            shared.m_proxy_gc.collect();
        }
    }
}
 
 
 
int main()
{
    std::cout << "Chris M. Thomassons Proxy Collector Port ver
.0.0.2...\n";
    std::cout << "_______________________________________\n\n";
 
    {
        ct_shared shared;
 
        std::thread readers[ct_reader_threads_n];
        std::thread writers[ct_writer_threads_n];
 
        std::cout << "Booting threads...\n";
 
        for (unsigned long i = 0; i < ct_writer_threads_n; ++i)
        {
            writers[i] = std::thread(ct_thread_writer, std::ref(shared));
        }
 
        for (unsigned long i = 0; i < ct_reader_threads_n; ++i)
        {
            readers[i] = std::thread(ct_thread_reader, std::ref(shared));
        }
 
        std::cout << "Threads running...\n";
 
        for (unsigned long i = 0; i < ct_reader_threads_n; ++i)
        {
            readers[i].join();
        }
 
        for (unsigned long i = 0; i < ct_writer_threads_n; ++i)
        {
            writers[i].join();
        }
    }
 
    std::cout << "Threads completed!\n\n";
 
 
    // Sanity check!
    {
        std::uint32_t node_allocations =
            g_debug_node_allocations.load(std::memory_order_relaxed);
        std::uint32_t node_deallocations =
            g_debug_node_deallocations.load(std::memory_order_relaxed);
        std::uint32_t dtor_collect =
            g_debug_dtor_collect.load(std::memory_order_relaxed);
        std::uint32_t release_collect =
            g_debug_release_collect.load(std::memory_order_relaxed);
        std::uint32_t quiesce_complete =
            g_debug_quiesce_complete.load(std::memory_order_relaxed);
        std::uint32_t quiesce_begin =
            g_debug_quiesce_begin.load(std::memory_order_relaxed);
        std::uint32_t quiesce_complete_nodes =
            g_debug_quiesce_complete_nodes.load(std::memory_order_relaxed);
 
        std::cout << "node_allocations = " << node_allocations << "\n";
        std::cout << "node_deallocations = " << node_deallocations <<
"\n\n";
        std::cout << "dtor_collect = " << dtor_collect << "\n";
        std::cout << "release_collect = " << release_collect << "\n";
        std::cout << "quiesce_complete = " << quiesce_complete << "\n";
        std::cout << "quiesce_begin = " << quiesce_begin << "\n";
        std::cout << "quiesce_complete_nodes = " <<
quiesce_complete_nodes << "\n";
 
        if (node_allocations != node_deallocations)
        {
            std::cout << "OH SHIT! NODE LEAK!!! SHIT! = " <<
node_allocations - node_deallocations << "\n\n";
        }
 
    }
 
    std::cout << "\n\nTest Completed!\n\n";
 
    return 0;
}
 
_______________________________________
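 
A note on the counter encoding, as I read the masks at the top of the
program: m_current packs the index of the current collector into its low
four bits and the outstanding reference count above them; acquire() bumps
the word by ct_ref_inc (0x20); prv_quiesce_begin() swaps the index and
transfers the stripped-off references, plus the ct_proxy_quiescent marker
(0x10), onto the old collector's m_count; and release() reclaims the
deferred nodes when it observes ct_ref_complete (0x30), i.e. one remaining
reference plus the marker. A small self-contained sketch of that
arithmetic with the same constants (single-threaded, assertions only):
 
// Worked example of the packed-counter arithmetic used by the proxy
// collector above (my reading of it; not part of the original program).
#include <cassert>
#include <cstdint>

static constexpr std::uint32_t ct_ref_mask = 0xFFFFFFF0U;   // reference bits
static constexpr std::uint32_t ct_ref_complete = 0x30U;     // last ref + marker
static constexpr std::uint32_t ct_ref_inc = 0x20U;          // one reference
static constexpr std::uint32_t ct_proxy_mask = 0xFU;        // collector index
static constexpr std::uint32_t ct_proxy_quiescent = 0x10U;  // quiescence marker

int main()
{
    // acquire(): each reader adds ct_ref_inc to m_current and reads the
    // collector index from the low bits.
    std::uint32_t m_current = 0;                  // collector #0, no refs
    m_current += 3 * ct_ref_inc;                  // three acquire() calls
    assert((m_current & ct_proxy_mask) == 0);     // still collector #0
    assert((m_current & ct_ref_mask) / ct_ref_inc == 3);  // three refs pinned

    // prv_quiesce_begin(): exchange m_current for the other index, which
    // also strips the packed reference count; those references plus the
    // quiescent marker are added onto the old collector's m_count.
    std::uint32_t old = m_current;                // what the exchange returns
    m_current = ((old & ct_proxy_mask) + 1) & 1;  // now collector #1, no refs
    std::uint32_t m_count = 0;                    // old collector's counter
    m_count += (old & ct_ref_mask) + ct_proxy_quiescent;
    assert(m_count == 3 * ct_ref_inc + ct_proxy_quiescent);

    // release(): each reader subtracts ct_ref_inc from m_count. The
    // reader whose fetch_sub observes ct_ref_complete (one reference
    // plus the marker) is the last one out and runs prv_quiesce_complete.
    m_count -= ct_ref_inc;                        // first reader leaves
    m_count -= ct_ref_inc;                        // second reader leaves
    assert((m_count & ct_ref_mask) == ct_ref_complete);  // third one finishes

    return 0;
}
 
The release() check uses the pre-decrement value returned by fetch_sub,
which is why ct_ref_complete is 0x30 (one reference still counted) rather
than just the 0x10 marker.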
 



