@ -1,10 +1,7 @@
// SPDX-FileCopyrightText: Copyright (c) 2020 Erik Rigtorp <erik@rigtorp.se>
// SPDX-FileCopyrightText: Copyright (c) 2020 Erik Rigtorp <erik@rigtorp.se>
// SPDX-License-Identifier: MIT
// SPDX-License-Identifier: MIT
# pragma once
# pragma once
# ifdef _MSC_VER
# pragma warning(push)
# pragma warning(disable : 4324)
# endif
# include <atomic>
# include <atomic>
# include <bit>
# include <bit>
@ -12,105 +9,63 @@
# include <memory>
# include <memory>
# include <mutex>
# include <mutex>
# include <new>
# include <new>
# include <stdexcept>
# include <stop_token>
# include <stop_token>
# include <type_traits>
# include <type_traits>
# include <utility>
# include <utility>
namespace Common {
namespace Common {
namespace mpsc {
# if defined(__cpp_lib_hardware_interference_size)
# if defined(__cpp_lib_hardware_interference_size)
constexpr size_t hardware_interference_size = std : : hardware_destructive_interference_size ;
constexpr size_t hardware_interference_size = std : : hardware_destructive_interference_size ;
# else
# else
constexpr size_t hardware_interference_size = 64 ;
constexpr size_t hardware_interference_size = 64 ;
# endif
# endif
template < typename T >
# ifdef _MSC_VER
using AlignedAllocator = std : : allocator < T > ;
# pragma warning(push)
# pragma warning(disable : 4324)
# endif
template < typename T >
template < typename T , size_t capacity = 0x400 >
struct Slot {
class MPSCQueue {
~ Slot ( ) noexcept {
if ( turn . test ( ) ) {
destroy ( ) ;
}
}
template < typename . . . Args >
void construct ( Args & & . . . args ) noexcept {
static_assert ( std : : is_nothrow_constructible_v < T , Args & & . . . > ,
" T must be nothrow constructible with Args&&... " ) ;
std : : construct_at ( reinterpret_cast < T * > ( & storage ) , std : : forward < Args > ( args ) . . . ) ;
}
void destroy ( ) noexcept {
static_assert ( std : : is_nothrow_destructible_v < T > , " T must be nothrow destructible " ) ;
std : : destroy_at ( reinterpret_cast < T * > ( & storage ) ) ;
}
T & & move ( ) noexcept {
return reinterpret_cast < T & & > ( storage ) ;
}
// Align to avoid false sharing between adjacent slots
alignas ( hardware_interference_size ) std : : atomic_flag turn { } ;
struct aligned_store {
struct type {
alignas ( T ) unsigned char data [ sizeof ( T ) ] ;
} ;
} ;
typename aligned_store : : type storage ;
} ;
template < typename T , typename Allocator = AlignedAllocator < Slot < T > > >
class Queue {
public :
public :
explicit Queue ( const size_t capacity , const Allocator & allocator = Allocator ( ) )
explicit MPSCQueue ( ) : allocator { std : : allocator < Slot < T > > ( ) } {
: allocator_ ( allocator ) {
if ( capacity < 1 ) {
throw std : : invalid_argument ( " capacity < 1 " ) ;
}
// Ensure that the queue length is an integer power of 2
// This is so that idx(i) can be a simple i & mask_ insted of i % capacity
// https://github.com/rigtorp/MPMCQueue/pull/36
if ( ! std : : has_single_bit ( capacity ) ) {
throw std : : invalid_argument ( " capacity must be an integer power of 2 " ) ;
}
mask_ = capacity - 1 ;
// Allocate one extra slot to prevent false sharing on the last slot
// Allocate one extra slot to prevent false sharing on the last slot
slots _ = allocator_ . allocate ( mask_ + 2 ) ;
slots = allocator . allocate ( capacity + 1 ) ;
// Allocators are not required to honor alignment for over-aligned types
// Allocators are not required to honor alignment for over-aligned types
// (see http://eel.is/c++draft/allocator.requirements#10) so we verify
// (see http://eel.is/c++draft/allocator.requirements#10) so we verify
// alignment here
// alignment here
if ( reinterpret_cast < uintptr_t > ( slots _ ) % alignof ( Slot < T > ) ! = 0 ) {
if ( reinterpret_cast < uintptr_t > ( slots ) % alignof ( Slot < T > ) ! = 0 ) {
allocator _. deallocate ( slots_ , mask_ + 2 ) ;
allocator . deallocate ( slots , capacity + 1 ) ;
throw std : : bad_alloc ( ) ;
throw std : : bad_alloc ( ) ;
}
}
for ( size_t i = 0 ; i < mask_ + 1 ; + + i ) {
for ( size_t i = 0 ; i < capacity ; + + i ) {
std : : construct_at ( & slots _ [ i ] ) ;
std : : construct_at ( & slots [ i ] ) ;
}
}
static_assert ( std : : has_single_bit ( capacity ) , " capacity must be an integer power of 2 " ) ;
static_assert ( alignof ( Slot < T > ) = = hardware_interference_size ,
static_assert ( alignof ( Slot < T > ) = = hardware_interference_size ,
" Slot must be aligned to cache line boundary to prevent false sharing " ) ;
" Slot must be aligned to cache line boundary to prevent false sharing " ) ;
static_assert ( sizeof ( Slot < T > ) % hardware_interference_size = = 0 ,
static_assert ( sizeof ( Slot < T > ) % hardware_interference_size = = 0 ,
" Slot size must be a multiple of cache line size to prevent "
" Slot size must be a multiple of cache line size to prevent "
" false sharing between adjacent slots " ) ;
" false sharing between adjacent slots " ) ;
static_assert ( sizeof ( Queue) % hardware_interference_size = = 0 ,
static_assert ( sizeof ( MPSCQueue ) % hardware_interference_size = = 0 ,
" Queue size must be a multiple of cache line size to "
" Queue size must be a multiple of cache line size to "
" prevent false sharing between adjacent queues " ) ;
" prevent false sharing between adjacent queues " ) ;
}
}
~ Queue( ) noexcept {
~ MPSCQueue ( ) noexcept {
for ( size_t i = 0 ; i < mask_ + 1 ; + + i ) {
for ( size_t i = 0 ; i < capacity ; + + i ) {
s lots_[ i ] . ~ Slot ( ) ;
std : : destroy_at ( & slots [ i ] ) ;
}
}
allocator _. deallocate ( slots_ , mask_ + 2 ) ;
allocator . deallocate ( slots , capacity + 1 ) ;
}
}
// non-copyable and non-movable
// The queue must be both non-copyable and non-movable
Queue ( const Queue & ) = delete ;
MPSCQueue ( const MPSCQueue & ) = delete ;
Queue & operator = ( const Queue & ) = delete ;
MPSCQueue & operator = ( const MPSCQueue & ) = delete ;
MPSCQueue ( MPSCQueue & & ) = delete ;
MPSCQueue & operator = ( MPSCQueue & & ) = delete ;
void Push ( const T & v ) noexcept {
void Push ( const T & v ) noexcept {
static_assert ( std : : is_nothrow_copy_constructible_v < T > ,
static_assert ( std : : is_nothrow_copy_constructible_v < T > ,
@ -125,8 +80,8 @@ public:
void Pop ( T & v , std : : stop_token stop ) noexcept {
void Pop ( T & v , std : : stop_token stop ) noexcept {
auto const tail = tail_ . fetch_add ( 1 ) ;
auto const tail = tail_ . fetch_add ( 1 ) ;
auto & slot = slots _ [ idx ( tail ) ] ;
auto & slot = slots [ idx ( tail ) ] ;
if ( false = = slot . turn . test ( ) ) {
if ( ! slot . turn . test ( ) ) {
std : : unique_lock lock { cv_mutex } ;
std : : unique_lock lock { cv_mutex } ;
cv . wait ( lock , stop , [ & slot ] { return slot . turn . test ( ) ; } ) ;
cv . wait ( lock , stop , [ & slot ] { return slot . turn . test ( ) ; } ) ;
}
}
@ -137,12 +92,46 @@ public:
}
}
private :
private :
template < typename U = T >
struct Slot {
~ Slot ( ) noexcept {
if ( turn . test ( ) ) {
destroy ( ) ;
}
}
template < typename . . . Args >
void construct ( Args & & . . . args ) noexcept {
static_assert ( std : : is_nothrow_constructible_v < U , Args & & . . . > ,
" T must be nothrow constructible with Args&&... " ) ;
std : : construct_at ( reinterpret_cast < U * > ( & storage ) , std : : forward < Args > ( args ) . . . ) ;
}
void destroy ( ) noexcept {
static_assert ( std : : is_nothrow_destructible_v < U > , " T must be nothrow destructible " ) ;
std : : destroy_at ( reinterpret_cast < U * > ( & storage ) ) ;
}
U & & move ( ) noexcept {
return reinterpret_cast < U & & > ( storage ) ;
}
// Align to avoid false sharing between adjacent slots
alignas ( hardware_interference_size ) std : : atomic_flag turn { } ;
struct aligned_store {
struct type {
alignas ( U ) unsigned char data [ sizeof ( U ) ] ;
} ;
} ;
typename aligned_store : : type storage ;
} ;
template < typename . . . Args >
template < typename . . . Args >
void emplace ( Args & & . . . args ) noexcept {
void emplace ( Args & & . . . args ) noexcept {
static_assert ( std : : is_nothrow_constructible_v < T , Args & & . . . > ,
static_assert ( std : : is_nothrow_constructible_v < T , Args & & . . . > ,
" T must be nothrow constructible with Args&&... " ) ;
" T must be nothrow constructible with Args&&... " ) ;
auto const head = head_ . fetch_add ( 1 ) ;
auto const head = head_ . fetch_add ( 1 ) ;
auto & slot = slots_ [ idx ( head ) ] ;
auto & slot = slots [ idx ( head ) ] ;
slot . turn . wait ( true ) ;
slot . turn . wait ( true ) ;
slot . construct ( std : : forward < Args > ( args ) . . . ) ;
slot . construct ( std : : forward < Args > ( args ) . . . ) ;
slot . turn . test_and_set ( ) ;
slot . turn . test_and_set ( ) ;
@ -150,31 +139,29 @@ private:
}
}
constexpr size_t idx ( size_t i ) const noexcept {
constexpr size_t idx ( size_t i ) const noexcept {
return i & mask _ ;
return i & mask ;
}
}
std : : conditional_t < true , std : : condition_variable_any , std : : condition_variable > cv ;
static constexpr size_t mask = capacity - 1 ;
std : : mutex cv_mutex ;
size_t mask_ ;
Slot < T > * slots_ ;
[[no_unique_address]] Allocator allocator_ ;
// Align to avoid false sharing between head_ and tail_
// Align to avoid false sharing between head_ and tail_
alignas ( hardware_interference_size ) std : : atomic < size_t > head_ { 0 } ;
alignas ( hardware_interference_size ) std : : atomic < size_t > head_ { 0 } ;
alignas ( hardware_interference_size ) std : : atomic < size_t > tail_ { 0 } ;
alignas ( hardware_interference_size ) std : : atomic < size_t > tail_ { 0 } ;
std : : mutex cv_mutex ;
std : : condition_variable_any cv ;
Slot < T > * slots ;
[[no_unique_address]] std : : allocator < Slot < T > > allocator ;
static_assert ( std : : is_nothrow_copy_assignable_v < T > | | std : : is_nothrow_move_assignable_v < T > ,
static_assert ( std : : is_nothrow_copy_assignable_v < T > | | std : : is_nothrow_move_assignable_v < T > ,
" T must be nothrow copy or move assignable " ) ;
" T must be nothrow copy or move assignable " ) ;
static_assert ( std : : is_nothrow_destructible_v < T > , " T must be nothrow destructible " ) ;
static_assert ( std : : is_nothrow_destructible_v < T > , " T must be nothrow destructible " ) ;
} ;
} ;
} // namespace mpsc
template < typename T , typename Allocator = mpsc : : AlignedAllocator < mpsc : : Slot < T > > >
using MPSCQueue = mpsc : : Queue < T , Allocator > ;
} // namespace Common
# ifdef _MSC_VER
# ifdef _MSC_VER
# pragma warning(pop)
# pragma warning(pop)
# endif
# endif
} // namespace Common