It's not hard to implement a lock-free stack with DW-CAS,
so no need for futexes. That's my dcas_atomic:
#pragma once
#include <cstdint>
#include <utility>
#include <atomic>
#include <type_traits>
#include <climits>
#include <bit>
#include <memory>
#if defined(_MSC_VER)
#pragma warning(push)
#pragma warning(disable: 26495)
#endif
#if defined(__llvm__)
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Watomic-alignment"
#endif
constexpr int
DCAS_ACQ_REL = (int)std::memory_order::acq_rel,
DCAS_ACQUIRE = (int)std::memory_order::acquire,
DCAS_RELAXED = (int)std::memory_order::relaxed,
DCAS_RELEASE = (int)std::memory_order::release,
DCAS_SEQ_CST = (int)std::memory_order::seq_cst;
template<int Type>
using dcas_type = std::integral_constant<int, Type>;
using dcas_acq_rel = dcas_type<DCAS_ACQ_REL>;
using dcas_acquire = dcas_type<DCAS_ACQUIRE>;
using dcas_relaxed = dcas_type<DCAS_RELAXED>;
using dcas_release = dcas_type<DCAS_RELEASE>;
using dcas_seq_cst = dcas_type<DCAS_SEQ_CST>;
struct dcas_atomic
{
static_assert(sizeof(size_t) == 8 || sizeof(size_t) == 4, "must be 64 or 32 bit architecture");
struct pair_t { size_t first, second; };
dcas_atomic() = default;
dcas_atomic( pair_t const &desired );
std::atomic<size_t> &first() noexcept;
std::atomic<size_t> &second() noexcept;
operator pair_t() noexcept;
template<int Type>
bool compare_exchange( pair_t &expected, pair_t const &desired, dcas_type<Type> = dcas_seq_cst() ) noexcept;
template<int Type>
pair_t load( dcas_type<Type> = dcas_seq_cst() ) const noexcept;
template<int Type>
void store( pair_t const &niu, dcas_type<Type> = dcas_seq_cst() ) noexcept;
private:
#if defined(__GNUC__) || defined(__clang__) && !defined(_MSC_VER)
#if SIZE_WIDTH == 64
using dword_t = unsigned __int128;
#elif SIZE_WIDTH == 32
using dword_t = unsigned long long;
#else
#error unknown architecture
#endif
#endif
union alignas(2 * sizeof(size_t)) U
{
U() {}
static_assert(sizeof(std::atomic<size_t>) == sizeof(size_t), "sizeof(atomic<size_t>) must be == sizeof(size_t)");
std::atomic<size_t> m_atomics[2];
#if defined(_MSC_VER)
#if defined(_M_X64) || defined(_M_ARM64) || defined(_M_ARM64EC)
__int64 volatile m_firstAndSecond[2];
#elif defined(_M_IX86)
__int64 volatile m_firstAndSecond;
#else
#error unknown architecture
#endif
#elif defined(__GNUC__) || defined(__clang__)
dword_t volatile m_firstAndSecond;
#endif
} u;
};
inline dcas_atomic::dcas_atomic( pair_t const &desired )
{
u.m_atomics[0].store( desired.first, std::memory_order_relaxed );
u.m_atomics[1].store( desired.second, std::memory_order_relaxed );
}
inline std::atomic<size_t> &dcas_atomic::first() noexcept
{
return u.m_atomics[0];
}
inline std::atomic<size_t> &dcas_atomic::second() noexcept
{
return u.m_atomics[1];
}
inline dcas_atomic::operator pair_t() noexcept
{
return pair_t( first().load( std::memory_order_relaxed ), second().load( std::memory_order_relaxed ) );
}
template<int Type>
inline bool dcas_atomic::compare_exchange( pair_t &expected, pair_t const &desired, dcas_type<Type> ) noexcept
{
using namespace std;
static_assert(Type == DCAS_ACQ_REL || Type == DCAS_ACQUIRE || Type == DCAS_RELAXED || Type == DCAS_RELEASE || Type == DCAS_SEQ_CST);
#if defined(_MSC_VER)
#if defined(_M_X64) || defined(_M_ARM64) || defined(_M_ARM64EC)
__int64 expectedA[2];
expectedA[0] = (__int64)expected.first;
expectedA[1] = (__int64)expected.second;
char fRet;
#if defined(_M_X64)
fRet = _InterlockedCompareExchange128( u.m_firstAndSecond, desired.second, desired.first, expectedA );
#else
if constexpr( Type == DCAS_ACQ_REL || Type == DCAS_SEQ_CST )
fRet = _InterlockedCompareExchange128( u.m_firstAndSecond, desired.second, desired.first, expectedA );
else if constexpr( Type == DCAS_ACQUIRE )
fRet = _InterlockedCompareExchange128_acq( u.m_firstAndSecond, desired.second, desired.first, expectedA );
else if constexpr( Type == DCAS_RELAXED )
fRet = _InterlockedCompareExchange128_nf( u.m_firstAndSecond, desired.second, desired.first, expectedA );
else
fRet = _InterlockedCompareExchange128_rel( u.m_firstAndSecond, desired.second, desired.first, expectedA );
#endif
if( fRet )
return true;
expected.first = expectedA[0];
expected.second = expectedA[1];
return false;
#elif defined(_M_IX86)
auto compose = []( pair_t const &p ) -> uint64_t { return p.first | (uint64_t)p.second << 32; };
uint64_t
cDesired = compose( desired ),
cExpected = compose( expected ),
cResult = _InterlockedCompareExchange64( &u.m_firstAndSecond, cDesired, cExpected );
if( cResult == cExpected ) [[likely]]
return true;
expected.first = (uint32_t)cResult;
expected.second = (uint32_t)(cResult >> 32);
return false;
#else
#error unspupported Windows-platform
#endif
#elif defined(__GNUC__) || defined(__clang__)
constexpr auto
pair_t::*FIRST = std::endian::native == std::endian::little ? &pair_t::first : &pair_t::second,
pair_t::*SECOND = std::endian::native == std::endian::little ? &pair_t::second : &pair_t::first;
auto compose = []( pair_t const &p ) -> dword_t { return (dword_t)(p.*FIRST) | (dword_t)(p.*SECOND) << SIZE_WIDTH; };
dword_t
dwExpected = compose( expected ),
dwDesired = compose( desired );
bool ret;
if constexpr( Type == DCAS_ACQ_REL )
ret = __atomic_compare_exchange_n( &u.m_firstAndSecond, &dwExpected, dwDesired, true, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED );
else if constexpr( Type == DCAS_ACQUIRE )
ret = __atomic_compare_exchange_n( &u.m_firstAndSecond, &dwExpected, dwDesired, true, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED );
else if constexpr( Type == DCAS_RELAXED )
ret = __atomic_compare_exchange_n( &u.m_firstAndSecond, &dwExpected, dwDesired, true, __ATOMIC_RELAXED, __ATOMIC_RELAXED );
else if constexpr( Type == DCAS_RELEASE )
ret = __atomic_compare_exchange_n( &u.m_firstAndSecond, &dwExpected, dwDesired, true, __ATOMIC_RELEASE, __ATOMIC_RELAXED );
else
ret = __atomic_compare_exchange_n( &u.m_firstAndSecond, &dwExpected, dwDesired, true, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED );
if( ret ) [[likely]]
return true;
expected.*FIRST = (size_t)dwExpected;
expected.*SECOND = (size_t)(dwExpected >> SIZE_WIDTH);
return false;
#else
#error unspupported platform
#endif
}
template<int Type>
inline typename dcas_atomic::pair_t dcas_atomic::load( dcas_type<Type> ) const noexcept
{
using namespace std;
static_assert(Type == DCAS_ACQ_REL || Type == DCAS_ACQUIRE || Type == DCAS_RELAXED || Type == DCAS_RELEASE || Type == DCAS_SEQ_CST);
pair_t cmp( 0, 0 );
const_cast<dcas_atomic *>(this)->compare_exchange( cmp, cmp, dcas_type<Type>() );
return cmp;
}
template<int Type>
inline void dcas_atomic::store( pair_t const &niu, dcas_type<Type> ) noexcept
{
using namespace std;
static_assert(Type == DCAS_ACQ_REL || Type == DCAS_ACQUIRE || Type == DCAS_RELAXED || Type == DCAS_RELEASE || Type == DCAS_SEQ_CST);
pair_t cmp( *this );
while( !this->compare_exchange( cmp, niu, dcas_type<Type>() ) );
}
#if defined(_MSC_VER)
#pragma warning(pop)
#endif
#if defined(__llvm__)
#pragma clang diagnostic pop
#endif
Am 18.02.2025 um 00:17 schrieb Chris M. Thomasson:
This is a little C++20 test using a futex to allow one to wait on a lock-free stack. The main stack logic is in struct ct_stack. Well, can you get to compile and run? Thanks...
I still need to check my algorithm out in Relacy. I think it might be able to model futex.
My code:
______________________________________
#include <iostream>
#include <thread>
#include <atomic>
#include <algorithm>
#include <cassert>
#define CT_THREAD_N (42)
#define CT_WORK_N (10000000)
#define CT_WAIT ((ct_node*)0xDEADBEEF)
struct ct_node
{
ct_node* m_next = { nullptr };
ct_node()
{
//std::cout << "(" << this << ")::ct_node::ct_node()\n";
}
~ct_node()
{
//std::cout << "(" << this << ")::ct_node::~ct_node()\n";
}
};
struct ct_stack
{
std::atomic<ct_node*> m_head = { nullptr };
void
push(
ct_node* node
) {
ct_node* head = m_head.load(std::memory_order_relaxed);
do
{
if (head == CT_WAIT)
{
node->m_next = nullptr;
}
else
{
node->m_next = head;
}
} while (! m_head.compare_exchange_weak(
head,
node,
std::memory_order_release)
);
if (head == CT_WAIT)
{
// slow path...
m_head.notify_one();
}
}
ct_node*
flush_wait()
{
ct_node* head = nullptr;
for (;;)
{
head = m_head.exchange(nullptr, std::memory_order_acquire);
while (! head || head == CT_WAIT)
{
// slow path...
head = m_head.exchange(CT_WAIT, std::memory_order_acquire);
if (! head || head == CT_WAIT)
{
m_head.wait(CT_WAIT, std::memory_order_relaxed);
continue;
}
}
break;
}
assert(head && head != CT_WAIT);
return head;
}
};
struct ct_work : public ct_node
{
unsigned long m_state = { 0 };
};
struct ct_shared
{
ct_stack m_stack_in;
ct_stack m_stack_out;
ct_shared()
{
std::cout << "ct_shared::ct_shared()\n" << std::endl;
}
~ct_shared()
{
assert(! m_stack_in.m_head || m_stack_in.m_head == CT_WAIT);
assert(! m_stack_out.m_head || m_stack_out.m_head == CT_WAIT);
std::cout << "ct_shared::~ct_shared()\n" << std::endl;
}
};
void
ct_thread(
ct_shared& shared
) {
unsigned long state = 0;
while (! state)
{
ct_work* head = (ct_work*)shared.m_stack_in.flush_wait();
while (head)
{
ct_work* next = (ct_work*)head->m_next;
if (head->m_state == 666)
{
// Shutdown detected...
state = 1;
shared.m_stack_in.push(head);
}
else
{
shared.m_stack_out.push(head);
}
head = next;
}
}
//std::cout << "shutdown..." << std::endl;
}
int
main()
{
{
std::cout << "Futex Stack Test\n";
std::cout << "by: Chris M. Thomasson\n";
std::cout << "version: (0.0.1)\n";
std::cout << "_________________________________\n" << std::endl;
}
unsigned long g_ct_work_alloations = 0;
unsigned long g_ct_work_dealloations = 0;
{
std::cout << "CT_THREAD_N = " << CT_THREAD_N << std::endl;
std::cout << "CT_WORK_N = " << CT_WORK_N << std::endl;
std::cout << "CT_WAIT = " << CT_WAIT << std::endl;
}
{
ct_shared shared = { };
std::thread threads[CT_THREAD_N];
std::cout << "\nLaunching threads...\n" << std::endl;
for (unsigned long i = 0; i < CT_THREAD_N; ++i)
{
threads[i] = std::thread(ct_thread, std::ref(shared));
}
//std::this_thread::sleep_for(std::chrono::seconds(2));
std::cout << "\nGenerate work...\n" << std::endl;
// Generate work...
{
for (unsigned long i = 0; i < CT_WORK_N; ++i)
{
shared.m_stack_in.push(new ct_work);
++g_ct_work_alloations;
}
}
// Wait for work...
{
unsigned long wn = 0;
while (wn < CT_WORK_N)
{
ct_work* head = (ct_work*)shared.m_stack_out.flush_wait();
while (head)
{
ct_work* next = (ct_work*)head->m_next;
delete head;
++g_ct_work_dealloations;
++wn;
head = next;
}
}
}
std::cout << "\nCompleted all work!\n" << std::endl;
std::cout << "Sending shutdown state...\n" << std::endl;
// Send shutdown state...
{
for (unsigned long i = 0; i < CT_THREAD_N; ++i)
{
ct_work* w = new ct_work;
++g_ct_work_alloations;
w->m_state = 666;
shared.m_stack_in.push(w);
}
}
// Join threads...
{
for (unsigned long i = 0; i < CT_THREAD_N; ++i)
{
threads[i].join();
}
}
std::cout << "\nThreads joined...\n" << std::endl;
// Dump shutdown state...
std::cout << "Dump shutdown state...\n" << std::endl;
{
ct_work* head = (ct_work*)shared.m_stack_in.m_head.load(std::memory_order_relaxed);
shared.m_stack_in.m_head = nullptr;
while (head)
{
ct_work* next = (ct_work*)head->m_next;
delete head;
++g_ct_work_dealloations;
head = next;
}
}
std::cout << "\nShutdown complete!\n" << std::endl;
}
// Sanity check...
{
std::cout << "g_ct_work_alloations = " << g_ct_work_alloations << "\n";
std::cout << "g_ct_work_dealloations = " << g_ct_work_dealloations << std::endl;
if (g_ct_work_alloations != g_ct_work_dealloations)
{
std::cout << "Pardon my French, but shit!!!!!!!!!!!!!! NOT GOOD\n" << std::endl;
std::cout << "DAMN IT!!!! Grrrrr\n" << std::endl;
}
}
std::cout << "\nFin!\n" << std::endl;
return 0;
}
______________________________________
My output:
_______________________
Futex Stack Test
by: Chris M. Thomasson
version: (0.0.1)
_________________________________
CT_THREAD_N = 42
CT_WORK_N = 10000000
CT_WAIT = 00000000DEADBEEF
ct_shared::ct_shared()
Launching threads...
Generate work...
Completed all work!
Sending shutdown state...
Threads joined...
Dump shutdown state...
Shutdown complete!
ct_shared::~ct_shared()
g_ct_work_alloations = 10000042
g_ct_work_dealloations = 10000042
Fin!
_______________________
Well, any luck?