diff --git a/README.md b/README.md index 533e4ae..ad5b0ad 100644 --- a/README.md +++ b/README.md @@ -22,9 +22,9 @@ auto val = c.get(); ```cpp // MPSC -fastchan::MPSC c; +fastchan::MPSC c; // OR -fastchan::MPSC c; +fastchan::MPSC c; c.put(0); c.put(1); diff --git a/bench/fastchan_bench.cpp b/bench/fastchan_bench.cpp index 4d43148..b235158 100644 --- a/bench/fastchan_bench.cpp +++ b/bench/fastchan_bench.cpp @@ -69,9 +69,9 @@ BENCHMARK_TEMPLATE(BoostSPSC_Get, 65'536); BENCHMARK_TEMPLATE(BoostSPSC_Get, 262'144); BENCHMARK_TEMPLATE(BoostSPSC_Get, 1'048'576); -template +template static void MPSC_BlockingBoth_Put(benchmark::State& state) { - fastchan::MPSC c; + fastchan::MPSC c; std::atomic_bool shouldRunWriter = true; std::atomic_bool shouldRunReader = true; std::atomic stoppedWriters = 0; @@ -114,99 +114,99 @@ static void MPSC_BlockingBoth_Put(benchmark::State& state) { reader.join(); } -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 16, 1, fastchan::WaitYield); -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 64, 1, fastchan::WaitYield); -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 256, 1, fastchan::WaitYield); -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 1024, 1, fastchan::WaitYield); -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 4096, 1, fastchan::WaitYield); -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 16'384, 1, fastchan::WaitYield); -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 65'536, 1, fastchan::WaitYield); -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 262'144, 1, fastchan::WaitYield); -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 1'048'576, 1, fastchan::WaitYield); - -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 16, 2, fastchan::WaitYield); -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 64, 2, fastchan::WaitYield); -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 256, 2, fastchan::WaitYield); -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 1024, 2, fastchan::WaitYield); -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 4096, 2, fastchan::WaitYield); -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 16'384, 2, fastchan::WaitYield); -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 65'536, 2, fastchan::WaitYield); -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 262'144, 2, fastchan::WaitYield); -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 1'048'576, 2, fastchan::WaitYield); - -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 16, 5, fastchan::WaitYield); -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 64, 5, fastchan::WaitYield); -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 256, 5, fastchan::WaitYield); -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 1024, 5, fastchan::WaitYield); -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 4096, 5, fastchan::WaitYield); -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 16'384, 5, fastchan::WaitYield); -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 65'536, 5, fastchan::WaitYield); -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 262'144, 5, fastchan::WaitYield); -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 1'048'576, 5, fastchan::WaitYield); - -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 16, 1, fastchan::WaitPause); -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 64, 1, fastchan::WaitPause); -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 256, 1, fastchan::WaitPause); -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 1024, 1, fastchan::WaitPause); -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 4096, 1, fastchan::WaitPause); -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 16'384, 1, fastchan::WaitPause); -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 65'536, 1, fastchan::WaitPause); -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 262'144, 1, fastchan::WaitPause); -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 1'048'576, 1, fastchan::WaitPause); - -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 16, 2, fastchan::WaitPause); -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 64, 2, fastchan::WaitPause); -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 256, 2, fastchan::WaitPause); -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 1024, 2, fastchan::WaitPause); -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 4096, 2, fastchan::WaitPause); -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 16'384, 2, fastchan::WaitPause); -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 65'536, 2, fastchan::WaitPause); -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 262'144, 2, fastchan::WaitPause); -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 1'048'576, 2, fastchan::WaitPause); - -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 16, 5, fastchan::WaitPause); -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 64, 5, fastchan::WaitPause); -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 256, 5, fastchan::WaitPause); -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 1024, 5, fastchan::WaitPause); -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 4096, 5, fastchan::WaitPause); -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 16'384, 5, fastchan::WaitPause); -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 65'536, 5, fastchan::WaitPause); -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 262'144, 5, fastchan::WaitPause); -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 1'048'576, 5, fastchan::WaitPause); - -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 16, 1, fastchan::WaitCondition); -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 64, 1, fastchan::WaitCondition); -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 256, 1, fastchan::WaitCondition); -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 1024, 1, fastchan::WaitCondition); -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 4096, 1, fastchan::WaitCondition); -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 16'384, 1, fastchan::WaitCondition); -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 65'536, 1, fastchan::WaitCondition); -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 262'144, 1, fastchan::WaitCondition); -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 1'048'576, 1, fastchan::WaitCondition); - -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 16, 2, fastchan::WaitCondition); -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 64, 2, fastchan::WaitCondition); -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 256, 2, fastchan::WaitCondition); -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 1024, 2, fastchan::WaitCondition); -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 4096, 2, fastchan::WaitCondition); -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 16'384, 2, fastchan::WaitCondition); -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 65'536, 2, fastchan::WaitCondition); -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 262'144, 2, fastchan::WaitCondition); -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 1'048'576, 2, fastchan::WaitCondition); - -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 16, 5, fastchan::WaitCondition); -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 64, 5, fastchan::WaitCondition); -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 256, 5, fastchan::WaitCondition); -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 1024, 5, fastchan::WaitCondition); -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 4096, 5, fastchan::WaitCondition); -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 16'384, 5, fastchan::WaitCondition); -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 65'536, 5, fastchan::WaitCondition); -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 262'144, 5, fastchan::WaitCondition); -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 1'048'576, 5, fastchan::WaitCondition); - -template +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 16, 1, fastchan::YieldWaitStrategy); +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 64, 1, fastchan::YieldWaitStrategy); +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 256, 1, fastchan::YieldWaitStrategy); +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 1024, 1, fastchan::YieldWaitStrategy); +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 4096, 1, fastchan::YieldWaitStrategy); +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 16'384, 1, fastchan::YieldWaitStrategy); +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 65'536, 1, fastchan::YieldWaitStrategy); +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 262'144, 1, fastchan::YieldWaitStrategy); +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 1'048'576, 1, fastchan::YieldWaitStrategy); + +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 16, 2, fastchan::YieldWaitStrategy); +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 64, 2, fastchan::YieldWaitStrategy); +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 256, 2, fastchan::YieldWaitStrategy); +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 1024, 2, fastchan::YieldWaitStrategy); +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 4096, 2, fastchan::YieldWaitStrategy); +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 16'384, 2, fastchan::YieldWaitStrategy); +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 65'536, 2, fastchan::YieldWaitStrategy); +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 262'144, 2, fastchan::YieldWaitStrategy); +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 1'048'576, 2, fastchan::YieldWaitStrategy); + +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 16, 5, fastchan::YieldWaitStrategy); +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 64, 5, fastchan::YieldWaitStrategy); +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 256, 5, fastchan::YieldWaitStrategy); +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 1024, 5, fastchan::YieldWaitStrategy); +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 4096, 5, fastchan::YieldWaitStrategy); +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 16'384, 5, fastchan::YieldWaitStrategy); +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 65'536, 5, fastchan::YieldWaitStrategy); +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 262'144, 5, fastchan::YieldWaitStrategy); +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 1'048'576, 5, fastchan::YieldWaitStrategy); + +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 16, 1, fastchan::PauseWaitStrategy); +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 64, 1, fastchan::PauseWaitStrategy); +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 256, 1, fastchan::PauseWaitStrategy); +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 1024, 1, fastchan::PauseWaitStrategy); +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 4096, 1, fastchan::PauseWaitStrategy); +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 16'384, 1, fastchan::PauseWaitStrategy); +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 65'536, 1, fastchan::PauseWaitStrategy); +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 262'144, 1, fastchan::PauseWaitStrategy); +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 1'048'576, 1, fastchan::PauseWaitStrategy); + +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 16, 2, fastchan::PauseWaitStrategy); +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 64, 2, fastchan::PauseWaitStrategy); +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 256, 2, fastchan::PauseWaitStrategy); +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 1024, 2, fastchan::PauseWaitStrategy); +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 4096, 2, fastchan::PauseWaitStrategy); +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 16'384, 2, fastchan::PauseWaitStrategy); +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 65'536, 2, fastchan::PauseWaitStrategy); +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 262'144, 2, fastchan::PauseWaitStrategy); +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 1'048'576, 2, fastchan::PauseWaitStrategy); + +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 16, 5, fastchan::PauseWaitStrategy); +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 64, 5, fastchan::PauseWaitStrategy); +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 256, 5, fastchan::PauseWaitStrategy); +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 1024, 5, fastchan::PauseWaitStrategy); +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 4096, 5, fastchan::PauseWaitStrategy); +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 16'384, 5, fastchan::PauseWaitStrategy); +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 65'536, 5, fastchan::PauseWaitStrategy); +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 262'144, 5, fastchan::PauseWaitStrategy); +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 1'048'576, 5, fastchan::PauseWaitStrategy); + +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 16, 1, fastchan::CVWaitStrategy); +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 64, 1, fastchan::CVWaitStrategy); +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 256, 1, fastchan::CVWaitStrategy); +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 1024, 1, fastchan::CVWaitStrategy); +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 4096, 1, fastchan::CVWaitStrategy); +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 16'384, 1, fastchan::CVWaitStrategy); +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 65'536, 1, fastchan::CVWaitStrategy); +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 262'144, 1, fastchan::CVWaitStrategy); +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 1'048'576, 1, fastchan::CVWaitStrategy); + +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 16, 2, fastchan::CVWaitStrategy); +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 64, 2, fastchan::CVWaitStrategy); +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 256, 2, fastchan::CVWaitStrategy); +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 1024, 2, fastchan::CVWaitStrategy); +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 4096, 2, fastchan::CVWaitStrategy); +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 16'384, 2, fastchan::CVWaitStrategy); +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 65'536, 2, fastchan::CVWaitStrategy); +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 262'144, 2, fastchan::CVWaitStrategy); +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 1'048'576, 2, fastchan::CVWaitStrategy); + +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 16, 5, fastchan::CVWaitStrategy); +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 64, 5, fastchan::CVWaitStrategy); +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 256, 5, fastchan::CVWaitStrategy); +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 1024, 5, fastchan::CVWaitStrategy); +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 4096, 5, fastchan::CVWaitStrategy); +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 16'384, 5, fastchan::CVWaitStrategy); +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 65'536, 5, fastchan::CVWaitStrategy); +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 262'144, 5, fastchan::CVWaitStrategy); +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Put, 1'048'576, 5, fastchan::CVWaitStrategy); + +template static void MPSC_BlockingBoth_Get(benchmark::State& state) { - fastchan::MPSC c; + fastchan::MPSC c; std::atomic_bool shouldRunWriter = true; std::atomic_bool shouldRunReader = true; std::atomic stoppedWriters = 0; @@ -248,39 +248,39 @@ static void MPSC_BlockingBoth_Get(benchmark::State& state) { reader.join(); } -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Get, 16, 1, fastchan::WaitYield); -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Get, 64, 1, fastchan::WaitYield); -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Get, 256, 1, fastchan::WaitYield); -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Get, 1024, 1, fastchan::WaitYield); -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Get, 4096, 1, fastchan::WaitYield); -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Get, 16'384, 1, fastchan::WaitYield); -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Get, 65'536, 1, fastchan::WaitYield); -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Get, 262'144, 1, fastchan::WaitYield); -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Get, 1'048'576, 1, fastchan::WaitYield); - -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Get, 16, 2, fastchan::WaitYield); -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Get, 64, 2, fastchan::WaitYield); -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Get, 256, 2, fastchan::WaitYield); -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Get, 1024, 2, fastchan::WaitYield); -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Get, 4096, 2, fastchan::WaitYield); -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Get, 16'384, 2, fastchan::WaitYield); -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Get, 65'536, 2, fastchan::WaitYield); -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Get, 262'144, 2, fastchan::WaitYield); -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Get, 1'048'576, 2, fastchan::WaitYield); - -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Get, 16, 5, fastchan::WaitYield); -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Get, 64, 5, fastchan::WaitYield); -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Get, 256, 5, fastchan::WaitYield); -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Get, 1024, 5, fastchan::WaitYield); -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Get, 4096, 5, fastchan::WaitYield); -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Get, 16'384, 5, fastchan::WaitYield); -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Get, 65'536, 5, fastchan::WaitYield); -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Get, 262'144, 5, fastchan::WaitYield); -BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Get, 1'048'576, 5, fastchan::WaitYield); +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Get, 16, 1, fastchan::YieldWaitStrategy); +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Get, 64, 1, fastchan::YieldWaitStrategy); +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Get, 256, 1, fastchan::YieldWaitStrategy); +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Get, 1024, 1, fastchan::YieldWaitStrategy); +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Get, 4096, 1, fastchan::YieldWaitStrategy); +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Get, 16'384, 1, fastchan::YieldWaitStrategy); +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Get, 65'536, 1, fastchan::YieldWaitStrategy); +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Get, 262'144, 1, fastchan::YieldWaitStrategy); +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Get, 1'048'576, 1, fastchan::YieldWaitStrategy); + +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Get, 16, 2, fastchan::YieldWaitStrategy); +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Get, 64, 2, fastchan::YieldWaitStrategy); +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Get, 256, 2, fastchan::YieldWaitStrategy); +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Get, 1024, 2, fastchan::YieldWaitStrategy); +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Get, 4096, 2, fastchan::YieldWaitStrategy); +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Get, 16'384, 2, fastchan::YieldWaitStrategy); +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Get, 65'536, 2, fastchan::YieldWaitStrategy); +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Get, 262'144, 2, fastchan::YieldWaitStrategy); +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Get, 1'048'576, 2, fastchan::YieldWaitStrategy); + +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Get, 16, 5, fastchan::YieldWaitStrategy); +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Get, 64, 5, fastchan::YieldWaitStrategy); +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Get, 256, 5, fastchan::YieldWaitStrategy); +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Get, 1024, 5, fastchan::YieldWaitStrategy); +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Get, 4096, 5, fastchan::YieldWaitStrategy); +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Get, 16'384, 5, fastchan::YieldWaitStrategy); +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Get, 65'536, 5, fastchan::YieldWaitStrategy); +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Get, 262'144, 5, fastchan::YieldWaitStrategy); +BENCHMARK_TEMPLATE(MPSC_BlockingBoth_Get, 1'048'576, 5, fastchan::YieldWaitStrategy); template static void MPSC_NonBlockingGet_Put(benchmark::State& state) { - fastchan::MPSC c; + fastchan::MPSC c; std::atomic_bool shouldRunWriter = true; std::atomic_bool shouldRunReader = true; std::atomic stoppedWriters = 0; @@ -352,7 +352,7 @@ BENCHMARK_TEMPLATE(MPSC_NonBlockingGet_Put, 1'048'576, 5); template static void MPSC_NonBlockingGet_Get(benchmark::State& state) { - fastchan::MPSC c; + fastchan::MPSC c; std::atomic_bool shouldRunWriter = true; std::atomic_bool shouldRunReader = true; std::atomic stoppedWriters = 0; @@ -426,7 +426,7 @@ BENCHMARK_TEMPLATE(MPSC_NonBlockingGet_Get, 1'048'576, 5); template static void MPSC_NonBlockingBoth_Put(benchmark::State& state) { - fastchan::MPSC c; + fastchan::MPSC c; std::atomic_bool shouldRun = true; std::thread reader([&]() { while (shouldRun) { @@ -520,7 +520,7 @@ BENCHMARK_TEMPLATE(MPSC_NonBlockingBoth_Put, 1'048'576, 5, 100); template static void MPSC_NonBlockingBoth_Get(benchmark::State& state) { - fastchan::MPSC c; + fastchan::MPSC c; std::atomic_bool shouldRun = true; std::array producers; for (auto i = 0; i < num_producers; ++i) { diff --git a/include/common.hpp b/include/common.hpp index c7e449e..44e1be4 100644 --- a/include/common.hpp +++ b/include/common.hpp @@ -19,20 +19,6 @@ inline void cpu_pause() { #endif } -enum BlockingType { - BlockingPutBlockingGet, - BlockingPutNonBlockingGet, - NonBlockingPutBlockingGet, - NonBlockingPutNonBlockingGet, -}; - -enum WaitType { - WaitPause, - WaitYield, - WaitCondition, - WaitNoOp, -}; - constexpr size_t roundUpNextPowerOfTwo(size_t v) { v--; for (size_t i = 1; i < sizeof(v) * CHAR_BIT; i *= 2) { diff --git a/include/mpsc.hpp b/include/mpsc.hpp index a49a6d4..3a70fd9 100644 --- a/include/mpsc.hpp +++ b/include/mpsc.hpp @@ -6,115 +6,105 @@ #include #include "common.hpp" +#include "wait_strategy.hpp" namespace fastchan { -template +template class MPSC { public: - using put_t = typename std::conditional<(blocking_type == BlockingPutBlockingGet || blocking_type == BlockingPutNonBlockingGet), void, bool>::type; - using get_t = typename std::conditional<(blocking_type == BlockingPutBlockingGet || blocking_type == NonBlockingPutBlockingGet), T, std::optional>::type; + using put_t = typename std::conditional::value, void, bool>::type; + using get_t = typename std::conditional::value, T, std::optional>::type; MPSC() = default; put_t put(const T &value) noexcept { auto write_index = next_free_index_.load(std::memory_order_acquire); do { - while (write_index > (reader_index_.load(std::memory_order_relaxed) + index_mask_)) { - if constexpr (blocking_type == BlockingPutBlockingGet || blocking_type == BlockingPutNonBlockingGet) { - if constexpr (wait_type == WaitYield) { - std::this_thread::yield(); - } else if constexpr (wait_type == WaitCondition) { - std::unique_lock lock(put_mutex_); - put_cv_.wait(lock, [this, write_index] { return write_index <= (reader_index_.load(std::memory_order_relaxed) + index_mask_); }); - } else if constexpr (wait_type == WaitPause) { - cpu_pause(); - } - } else { + while (write_index > (consumer_.reader_index_.load(std::memory_order_relaxed) + common_.index_mask_)) { + if constexpr (std::is_same::value) { return false; + } else { + common_.put_wait_.wait( + [this, write_index] { return write_index <= (consumer_.reader_index_.load(std::memory_order_relaxed) + common_.index_mask_); }); } } } while (!next_free_index_.compare_exchange_strong(write_index, write_index + 1, std::memory_order_acq_rel, std::memory_order_acquire)); - contents_[write_index & index_mask_] = value; + contents_[write_index & common_.index_mask_] = value; // commit in the correct order to avoid problems while (last_committed_index_.load(std::memory_order_relaxed) != write_index) { - if constexpr (wait_type == WaitYield) { - std::this_thread::yield(); - } else if constexpr (wait_type == WaitPause) { - cpu_pause(); - } + // we don't return at this point even in case of ReturnImmediatelyStrategy as we've already taken the token + common_.put_wait_.wait([this, write_index] { return last_committed_index_.load(std::memory_order_relaxed) == write_index; }); } last_committed_index_.store(++write_index, std::memory_order_release); - if constexpr (wait_type == WaitCondition) { - std::lock_guard lock(get_mutex_); - get_cv_.notify_one(); - } + common_.get_wait_.notify(); + common_.put_wait_.notify(); - if constexpr (blocking_type != BlockingPutBlockingGet && blocking_type != BlockingPutNonBlockingGet) { + if constexpr (std::is_same::value) { return true; } } get_t get() noexcept { - while (reader_index_2_ >= last_committed_index_.load(std::memory_order_relaxed)) { - if constexpr (blocking_type == BlockingPutBlockingGet || blocking_type == NonBlockingPutBlockingGet) { - if constexpr (wait_type == WaitYield) { - std::this_thread::yield(); - } else if constexpr (wait_type == WaitCondition) { - std::unique_lock lock(get_mutex_); - get_cv_.wait(lock, [this] { return reader_index_2_ < last_committed_index_.load(std::memory_order_relaxed); }); - } else if constexpr (wait_type == WaitPause) { - cpu_pause(); - } - } else { + while (consumer_.reader_index_2_ >= last_committed_index_.load(std::memory_order_relaxed)) { + if constexpr (std::is_same::value) { return std::nullopt; + } else { + common_.get_wait_.wait([this] { return consumer_.reader_index_2_ < last_committed_index_.load(std::memory_order_relaxed); }); } } - auto contents = contents_[reader_index_2_ & index_mask_]; - reader_index_.store(++reader_index_2_, std::memory_order_release); + auto contents = contents_[consumer_.reader_index_2_ & common_.index_mask_]; + consumer_.reader_index_.store(++consumer_.reader_index_2_, std::memory_order_release); + + common_.put_wait_.notify(); - if constexpr (wait_type == WaitCondition) { - std::lock_guard lock(put_mutex_); - put_cv_.notify_one(); - } return contents; } void empty() noexcept { - reader_index_2_ = 0; + consumer_.reader_index_2_ = 0; next_free_index_.store(0, std::memory_order_release); last_committed_index_.store(0, std::memory_order_release); - reader_index_.store(0, std::memory_order_release); + consumer_.reader_index_.store(0, std::memory_order_release); } - std::size_t size() const noexcept { return last_committed_index_.load(std::memory_order_acquire) - reader_index_.load(std::memory_order_acquire); } + std::size_t size() const noexcept { + return last_committed_index_.load(std::memory_order_acquire) - consumer_.reader_index_.load(std::memory_order_acquire); + } - bool isEmpty() const noexcept { return reader_index_.load(std::memory_order_acquire) >= last_committed_index_.load(std::memory_order_acquire); } + bool isEmpty() const noexcept { return consumer_.reader_index_.load(std::memory_order_acquire) >= last_committed_index_.load(std::memory_order_acquire); } bool isFull() const noexcept { // this isFull is about whether there's all writer slots to the buffer are taken rather than whether those // changes have actually been committed - return next_free_index_.load(std::memory_order_acquire) > (reader_index_.load(std::memory_order_acquire) + index_mask_); + return next_free_index_.load(std::memory_order_acquire) > (consumer_.reader_index_.load(std::memory_order_acquire) + common_.index_mask_); } private: - const std::size_t index_mask_ = roundUpNextPowerOfTwo(min_size) - 1; - alignas(64) std::size_t reader_index_2_{0}; - alignas(64) std::atomic reader_index_{0}; + std::array contents_; + alignas(64) std::atomic next_free_index_{0}; alignas(64) std::atomic last_committed_index_{0}; - alignas(64) std::condition_variable put_cv_; - alignas(64) std::condition_variable get_cv_; - alignas(64) std::mutex put_mutex_; - alignas(64) std::mutex get_mutex_; + struct alignas(64) Common { + GetWaitStrategy get_wait_{}; + PutWaitStrategy put_wait_{}; + const std::size_t index_mask_ = roundUpNextPowerOfTwo(min_size) - 1; + }; + + struct alignas(64) Consumer { + std::size_t next_free_index_cache_{0}; + std::size_t reader_index_2_{0}; + std::atomic reader_index_{0}; + }; - alignas(64) std::array contents_; + Common common_; + Consumer consumer_; }; } // namespace fastchan diff --git a/include/wait_strategy.hpp b/include/wait_strategy.hpp index 396c477..366005b 100644 --- a/include/wait_strategy.hpp +++ b/include/wait_strategy.hpp @@ -1,8 +1,13 @@ +#include #include +#include #include #include "common.hpp" +#ifndef FASTCHANWAIT_HPP +#define FASTCHANWAIT_HPP + namespace fastchan { // WaitStrategyInterface is the interface for actual implementation of a wait strategy handler @@ -51,10 +56,10 @@ class CVWaitStrategy : public WaitStrategyInterface { template void wait(Predicate p) { std::unique_lock lock(mutex_); - cv_.wait(lock, p); + cv_.wait_for(lock, std::chrono::nanoseconds(100), p); } - void notify() { cv_.notify_one(); } + void notify() { cv_.notify_all(); } private: std::condition_variable cv_; @@ -62,3 +67,5 @@ class CVWaitStrategy : public WaitStrategyInterface { }; } // namespace fastchan + +#endif diff --git a/test/fastchan_mpsc_test.cpp b/test/fastchan_mpsc_test.cpp index da7de0d..03198dc 100644 --- a/test/fastchan_mpsc_test.cpp +++ b/test/fastchan_mpsc_test.cpp @@ -11,16 +11,16 @@ using namespace std::chrono_literals; const auto IterationsMultiplier = 100; -template +template void testMPSCSingleThreaded() { constexpr std::size_t chan_size = (iterations / 2) + 1; - fastchan::MPSC chan; + fastchan::MPSC chan; assert(chan.size() == 0); assert(chan.isEmpty() == true); // Test filling up with a single thread for (int i = 0; i < iterations; ++i) { - if constexpr (blockingType == fastchan::NonBlockingPutNonBlockingGet || blockingType == fastchan::NonBlockingPutBlockingGet) { + if constexpr (std::is_same::value) { auto result = false; do { result = chan.put(i); @@ -49,7 +49,7 @@ void testMPSCSingleThreaded() { // Test put and get with a single thread for (int i = 0; i < iterations; ++i) { - if constexpr (blockingType == fastchan::NonBlockingPutNonBlockingGet || blockingType == fastchan::NonBlockingPutBlockingGet) { + if constexpr (std::is_same::value) { auto result = false; do { result = chan.put(i); @@ -62,7 +62,7 @@ void testMPSCSingleThreaded() { } for (int i = 0; i < iterations; ++i) { - if constexpr (blockingType == fastchan::NonBlockingPutNonBlockingGet || blockingType == fastchan::BlockingPutNonBlockingGet) { + if constexpr (std::is_same::value) { auto&& val = chan.get(); while (!val) { val = chan.get(); @@ -82,16 +82,16 @@ void testMPSCSingleThreaded() { assert(chan.isEmpty()); } -template +template void testMPSCMultiThreadedSingleProducer() { constexpr std::size_t chan_size = (iterations / 2) + 1; - fastchan::MPSC chan; + fastchan::MPSC chan; auto total_iterations = IterationsMultiplier * iterations; // Test put and get with multiple threads std::thread producer([&] { for (int i = 1; i <= total_iterations; ++i) { - if constexpr (blockingType == fastchan::NonBlockingPutNonBlockingGet || blockingType == fastchan::NonBlockingPutBlockingGet) { + if constexpr (std::is_same::value) { auto result = false; do { result = chan.put(i); @@ -104,7 +104,7 @@ void testMPSCMultiThreadedSingleProducer() { std::thread consumer([&] { for (int i = 1; i <= total_iterations;) { - if constexpr (blockingType == fastchan::NonBlockingPutNonBlockingGet || blockingType == fastchan::BlockingPutNonBlockingGet) { + if constexpr (std::is_same::value) { auto&& val = chan.get(); while (!val) { val = chan.get(); @@ -126,10 +126,10 @@ void testMPSCMultiThreadedSingleProducer() { assert(chan.size() == 0); } -template +template void testMPSCMultiThreadedMultiProducer() { constexpr std::size_t chan_size = (iterations / 2) + 1; - fastchan::MPSC chan; + fastchan::MPSC chan; size_t total_iterations = IterationsMultiplier * iterations; size_t total = num_threads * (total_iterations * (total_iterations + 1) / 2); @@ -140,7 +140,7 @@ void testMPSCMultiThreadedMultiProducer() { // Test put and get with multiple threads producers[i] = std::thread([&] { for (int i = 1; i <= total_iterations; ++i) { - if constexpr (blockingType == fastchan::NonBlockingPutNonBlockingGet || blockingType == fastchan::NonBlockingPutBlockingGet) { + if constexpr (std::is_same::value) { auto result = false; do { result = chan.put(i); @@ -154,7 +154,7 @@ void testMPSCMultiThreadedMultiProducer() { std::thread consumer([&] { for (int i = 1; i <= total_iterations * num_threads;) { - if constexpr (blockingType == fastchan::NonBlockingPutNonBlockingGet || blockingType == fastchan::BlockingPutNonBlockingGet) { + if constexpr (std::is_same::value) { auto&& val = chan.get(); while (!val) { val = chan.get(); @@ -179,41 +179,42 @@ void testMPSCMultiThreadedMultiProducer() { assert(chan.size() == 0); } -template +template void testMPSC() { - testMPSCSingleThreaded(); - testMPSCMultiThreadedSingleProducer(); + testMPSCSingleThreaded<4, put_wait_type, get_wait_type>(); + testMPSCMultiThreadedSingleProducer<4, put_wait_type, get_wait_type>(); if (std::thread::hardware_concurrency() > 5) { - testMPSCMultiThreadedMultiProducer(); - testMPSCMultiThreadedMultiProducer(); + testMPSCMultiThreadedMultiProducer<4, 3, put_wait_type, get_wait_type>(); + testMPSCMultiThreadedMultiProducer<4, 5, put_wait_type, get_wait_type>(); } else { - testMPSCMultiThreadedMultiProducer(); + testMPSCMultiThreadedMultiProducer<4, 2, put_wait_type, get_wait_type>(); } - testMPSCSingleThreaded(); - testMPSCMultiThreadedSingleProducer(); + testMPSCSingleThreaded<4096, put_wait_type, get_wait_type>(); + testMPSCMultiThreadedSingleProducer<4096, put_wait_type, get_wait_type>(); if (std::thread::hardware_concurrency() > 5) { - testMPSCMultiThreadedMultiProducer(); - testMPSCMultiThreadedMultiProducer(); + testMPSCMultiThreadedMultiProducer<4096, 3, put_wait_type, get_wait_type>(); + testMPSCMultiThreadedMultiProducer<4096, 5, put_wait_type, get_wait_type>(); } else { - testMPSCMultiThreadedMultiProducer(); + testMPSCMultiThreadedMultiProducer<4096, 2, put_wait_type, get_wait_type>(); } } int main() { - testMPSC(); - testMPSC(); - testMPSC(); - testMPSC(); - - testMPSC(); - testMPSC(); - testMPSC(); - testMPSC(); - - testMPSC(); - testMPSC(); - testMPSC(); - testMPSC(); + testMPSC(); + testMPSC(); + testMPSC(); + testMPSC(); + + testMPSC(); + testMPSC(); + testMPSC(); + testMPSC(); + + testMPSC(); + testMPSC(); + testMPSC(); + testMPSC(); + return 0; }