Sujet : Futex Stack Test...
De : chris.m.thomasson.1 (at) *nospam* gmail.com (Chris M. Thomasson)
Groupes : comp.lang.c++Date : 18. Feb 2025, 00:17:46
Autres entêtes
Organisation : A noiseless patient Spider
Message-ID : <vp0g2q$1bu96$1@dont-email.me>
User-Agent : Mozilla Thunderbird
This is a little C++20 test using a futex to allow one to wait on a lock-free stack. The main stack logic is in struct ct_stack. Well, can you get to compile and run? Thanks...
I still need to check my algorithm out in Relacy. I think it might be able to model futex.
My code:
______________________________________
#include <iostream>
#include <thread>
#include <atomic>
#include <algorithm>
#include <cassert>
#define CT_THREAD_N (42)
#define CT_WORK_N (10000000)
#define CT_WAIT ((ct_node*)0xDEADBEEF)
struct ct_node
{
ct_node* m_next = { nullptr };
ct_node()
{
//std::cout << "(" << this << ")::ct_node::ct_node()\n";
}
~ct_node()
{
//std::cout << "(" << this << ")::ct_node::~ct_node()\n";
}
};
struct ct_stack
{
std::atomic<ct_node*> m_head = { nullptr };
void
push(
ct_node* node
) {
ct_node* head = m_head.load(std::memory_order_relaxed);
do
{
if (head == CT_WAIT)
{
node->m_next = nullptr;
}
else
{
node->m_next = head;
}
} while (! m_head.compare_exchange_weak(
head,
node,
std::memory_order_release)
);
if (head == CT_WAIT)
{
// slow path...
m_head.notify_one();
}
}
ct_node*
flush_wait()
{
ct_node* head = nullptr;
for (;;)
{
head = m_head.exchange(nullptr, std::memory_order_acquire);
while (! head || head == CT_WAIT)
{
// slow path...
head = m_head.exchange(CT_WAIT, std::memory_order_acquire);
if (! head || head == CT_WAIT)
{
m_head.wait(CT_WAIT, std::memory_order_relaxed);
continue;
}
}
break;
}
assert(head && head != CT_WAIT);
return head;
}
};
struct ct_work : public ct_node
{
unsigned long m_state = { 0 };
};
struct ct_shared
{
ct_stack m_stack_in;
ct_stack m_stack_out;
ct_shared()
{
std::cout << "ct_shared::ct_shared()\n" << std::endl;
}
~ct_shared()
{
assert(! m_stack_in.m_head || m_stack_in.m_head == CT_WAIT);
assert(! m_stack_out.m_head || m_stack_out.m_head == CT_WAIT);
std::cout << "ct_shared::~ct_shared()\n" << std::endl;
}
};
void
ct_thread(
ct_shared& shared
) {
unsigned long state = 0;
while (! state)
{
ct_work* head = (ct_work*)shared.m_stack_in.flush_wait();
while (head)
{
ct_work* next = (ct_work*)head->m_next;
if (head->m_state == 666)
{
// Shutdown detected...
state = 1;
shared.m_stack_in.push(head);
}
else
{
shared.m_stack_out.push(head);
}
head = next;
}
}
//std::cout << "shutdown..." << std::endl;
}
int
main()
{
{
std::cout << "Futex Stack Test\n";
std::cout << "by: Chris M. Thomasson\n";
std::cout << "version: (0.0.1)\n";
std::cout << "_________________________________\n" << std::endl;
}
unsigned long g_ct_work_alloations = 0;
unsigned long g_ct_work_dealloations = 0;
{
std::cout << "CT_THREAD_N = " << CT_THREAD_N << std::endl;
std::cout << "CT_WORK_N = " << CT_WORK_N << std::endl;
std::cout << "CT_WAIT = " << CT_WAIT << std::endl;
}
{
ct_shared shared = { };
std::thread threads[CT_THREAD_N];
std::cout << "\nLaunching threads...\n" << std::endl;
for (unsigned long i = 0; i < CT_THREAD_N; ++i)
{
threads[i] = std::thread(ct_thread, std::ref(shared));
}
//std::this_thread::sleep_for(std::chrono::seconds(2));
std::cout << "\nGenerate work...\n" << std::endl;
// Generate work...
{
for (unsigned long i = 0; i < CT_WORK_N; ++i)
{
shared.m_stack_in.push(new ct_work);
++g_ct_work_alloations;
}
}
// Wait for work...
{
unsigned long wn = 0;
while (wn < CT_WORK_N)
{
ct_work* head = (ct_work*)shared.m_stack_out.flush_wait();
while (head)
{
ct_work* next = (ct_work*)head->m_next;
delete head;
++g_ct_work_dealloations;
++wn;
head = next;
}
}
}
std::cout << "\nCompleted all work!\n" << std::endl;
std::cout << "Sending shutdown state...\n" << std::endl;
// Send shutdown state...
{
for (unsigned long i = 0; i < CT_THREAD_N; ++i)
{
ct_work* w = new ct_work;
++g_ct_work_alloations;
w->m_state = 666;
shared.m_stack_in.push(w);
}
}
// Join threads...
{
for (unsigned long i = 0; i < CT_THREAD_N; ++i)
{
threads[i].join();
}
}
std::cout << "\nThreads joined...\n" << std::endl;
// Dump shutdown state...
std::cout << "Dump shutdown state...\n" << std::endl;
{
ct_work* head = (ct_work*)shared.m_stack_in.m_head.load(std::memory_order_relaxed);
shared.m_stack_in.m_head = nullptr;
while (head)
{
ct_work* next = (ct_work*)head->m_next;
delete head;
++g_ct_work_dealloations;
head = next;
}
}
std::cout << "\nShutdown complete!\n" << std::endl;
}
// Sanity check...
{
std::cout << "g_ct_work_alloations = " << g_ct_work_alloations << "\n";
std::cout << "g_ct_work_dealloations = " << g_ct_work_dealloations << std::endl;
if (g_ct_work_alloations != g_ct_work_dealloations)
{
std::cout << "Pardon my French, but shit!!!!!!!!!!!!!! NOT GOOD\n" << std::endl;
std::cout << "DAMN IT!!!! Grrrrr\n" << std::endl;
}
}
std::cout << "\nFin!\n" << std::endl;
return 0;
}
______________________________________
My output:
_______________________
Futex Stack Test
by: Chris M. Thomasson
version: (0.0.1)
_________________________________
CT_THREAD_N = 42
CT_WORK_N = 10000000
CT_WAIT = 00000000DEADBEEF
ct_shared::ct_shared()
Launching threads...
Generate work...
Completed all work!
Sending shutdown state...
Threads joined...
Dump shutdown state...
Shutdown complete!
ct_shared::~ct_shared()
g_ct_work_alloations = 10000042
g_ct_work_dealloations = 10000042
Fin!
_______________________
Well, any luck?