@@ -21,70 +21,79 @@ namespace rack {
 namespace engine {
-/** Barrier based on mutexes.
-Not finished or tested, do not use.
+inline void cpuPause() {
+#if defined ARCH_X64
+	_mm_pause();
+#elif defined ARCH_ARM64
+	__yield();
+#endif
+}
+/** Multiple-phase barrier based on C++ mutexes, as a reference.
 */
 struct Barrier {
-	int count = 0;
-	uint8_t step = 0;
-	int threads = 0;
+	size_t threads = 0;
+	size_t count = 0;
+	size_t phase = 0;
 	std::mutex mutex;
 	std::condition_variable cv;
-	void setThreads(int threads) {
+	void setThreads(size_t threads) {
 		this->threads = threads;
 	}
 	void wait() {
 		std::unique_lock<std::mutex> lock(mutex);
-		uint8_t s = step;
+		size_t currentPhase = phase;
+		// Check if we're the last thread.
 		if (++count >= threads) {
-			// We're the last thread. Reset next phase.
+			// Advance phase and reset count
 			count = 0;
-			// Allow other threads to exit wait()
-			step++;
+			phase++;
+			// Notify all other threads
 			cv.notify_all();
 			return;
 		}
+		// Unlock and wait on phase to change
 		cv.wait(lock, [&] {
-			return step != s;
+			return phase != currentPhase;
 		});
 	}
 };
-/** 2-phase barrier based on spin-locking.
+/** Multiple-phase barrier based on spin-locking.
 */
 struct SpinBarrier {
-	std::atomic<int> count{0};
-	std::atomic<uint8_t> step{0};
-	int threads = 0;
+	size_t threads = 0;
+	std::atomic<size_t> count{0};
+	std::atomic<size_t> phase{0};
 	/** Must be called when no threads are calling wait().
 	*/
-	void setThreads(int threads) {
+	void setThreads(size_t threads) {
 		this->threads = threads;
 	}
 	void wait() {
-		uint8_t s = step;
-		if (count.fetch_add(1, std::memory_order_acquire) + 1 >= threads) {
-			// We're the last thread. Reset next phase.
-			count = 0;
-			// Allow other threads to exit wait()
-			step++;
+		size_t currentPhase = phase.load(std::memory_order_acquire);
+		if (count.fetch_add(1, std::memory_order_acq_rel) + 1 >= threads) {
+			// Reset count
+			count.store(0, std::memory_order_release);
+			// Advance phase, which notifies all other threads, which are all spinning
+			phase.fetch_add(1, std::memory_order_release);
 			return;
 		}
-		// Spin until the last thread begins waiting
+		// Spin until the phase is changed by the last thread
 		while (true) {
-			if (step.load(std::memory_order_relaxed) != s)
+			if (phase.load(std::memory_order_acquire) != currentPhase)
 				return;
-#if defined ARCH_X64
-			__builtin_ia32_pause();
-#endif
+			// std::this_thread::yield();
+			cpuPause();
 		}
 	}
 };
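
For reference, a minimal usage sketch of the SpinBarrier above. This is not part of the diff; the thread count, step count, and the worker body are illustrative placeholders, and the snippet assumes the struct and its standard-library dependencies are already in scope. Every worker calls wait() once per processing step; the last thread to arrive resets count and advances phase, and the release increment of phase paired with the acquire loads is what makes each step's writes visible to the threads that proceed to the next step.

#include <cstddef>
#include <cstdio>
#include <thread>
#include <vector>

SpinBarrier stepBarrier;  // illustrative name

void worker(int id, int steps) {
	for (int step = 0; step < steps; step++) {
		// ... this thread's share of the work for the current step ...
		// No thread starts step + 1 until all threads have finished step.
		stepBarrier.wait();
	}
	std::printf("worker %d finished\n", id);
}

int main() {
	const size_t threadCount = 4;
	// Must be called while no thread is inside wait().
	stepBarrier.setThreads(threadCount);
	std::vector<std::thread> pool;
	for (size_t i = 0; i < threadCount; i++)
		pool.emplace_back(worker, (int) i, 1000);
	for (auto& t : pool)
		t.join();
	return 0;
}

The same driver works for Barrier and HybridBarrier, since all three expose the same setThreads()/wait() interface.
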
@@ -95,51 +104,59 @@ yield() should be called if it is likely that all threads will block for a while
 Saves CPU power after yield is called.
 */
 struct HybridBarrier {
-	std::atomic<int> count{0};
-	std::atomic<uint8_t> step{0};
-	int threads = 0;
+	size_t threads = 0;
+	std::atomic<size_t> count{0};
+	std::atomic<size_t> phase{0};
 	std::atomic<bool> yielded{false};
 	std::mutex mutex;
 	std::condition_variable cv;
-	void setThreads(int threads) {
+	void setThreads(size_t threads) {
 		this->threads = threads;
 	}
 	void yield() {
-		yielded = true;
+		yielded.store(true, std::memory_order_release);
 	}
 	void wait() {
-		uint8_t s = step;
-		if (count.fetch_add(1, std::memory_order_acquire) + 1 >= threads) {
-			// We're the last thread. Reset next phase.
-			count = 0;
-			bool wasYielded = yielded;
-			yielded = false;
-			// Allow other threads to exit wait()
-			step++;
-			if (wasYielded) {
+		size_t currentPhase = phase.load(std::memory_order_acquire);
+		// Check if we're the last thread
+		if (count.fetch_add(1, std::memory_order_acq_rel) + 1 >= threads) {
+			// Reset count
+			count.store(0, std::memory_order_release);
+			// If yielded, advance phase and notify all other threads
+			if (yielded.load(std::memory_order_acquire)) {
 				std::unique_lock<std::mutex> lock(mutex);
+				yielded.store(false, std::memory_order_release);
+				phase.fetch_add(1, std::memory_order_release);
 				cv.notify_all();
+				return;
 			}
+			// Advance phase, which notifies all other threads, which are all spinning
+			phase.fetch_add(1, std::memory_order_release);
 			return;
 		}
-		// Spin until the last thread begins waiting
-		while (!yielded.load(std::memory_order_relaxed)) {
-			if (step.load(std::memory_order_relaxed) != s)
+		// Spin until the phase is changed by the last thread, or yield() is called
+		while (true) {
+			if (phase.load(std::memory_order_acquire) != currentPhase)
 				return;
-#if defined ARCH_X64
-			__builtin_ia32_pause();
-#endif
+			if (yielded.load(std::memory_order_acquire))
+				break;
+			// std::this_thread::yield();
+			cpuPause();
 		}
-		// Wait on mutex CV
+		// yield() was called, so use cv to wait on phase to be changed by the last thread
 		std::unique_lock<std::mutex> lock(mutex);
 		cv.wait(lock, [&] {
-			return step != s;
+			return phase.load(std::memory_order_acquire) != currentPhase;
 		});
 	}
 };
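
HybridBarrier behaves like SpinBarrier until some thread calls yield(); after that, waiters stop spinning and sleep on the condition variable until the last thread advances the phase under the mutex. A sketch of the intended pattern follows. It is not part of the diff: the coordinator/worker split, the sleep, and all identifiers are illustrative assumptions, and the structs above are assumed to be in scope. A coordinating thread that knows the next step is far away calls yield() first, so the workers park on the mutex/cv instead of burning CPU in cpuPause() loops.

#include <chrono>
#include <cstddef>
#include <thread>
#include <vector>

HybridBarrier stepBarrier;  // illustrative name

void worker(int steps) {
	for (int step = 0; step < steps; step++) {
		// ... produce this thread's share of the step ...
		stepBarrier.wait();
	}
}

int main() {
	const size_t workerCount = 4;
	const int steps = 100;
	// The workers plus the coordinating thread below all participate in the barrier.
	stepBarrier.setThreads(workerCount + 1);
	std::vector<std::thread> pool;
	for (size_t i = 0; i < workerCount; i++)
		pool.emplace_back(worker, steps);

	for (int step = 0; step < steps; step++) {
		// The workers will reach the barrier long before we do, so tell them to
		// sleep on the condition variable instead of spinning.
		stepBarrier.yield();
		// Stand-in for waiting on an external event, e.g. the next audio buffer.
		std::this_thread::sleep_for(std::chrono::milliseconds(1));
		// The last thread to arrive (usually this one) resets count and yielded,
		// advances the phase, and notifies the sleeping workers.
		stepBarrier.wait();
	}
	for (auto& t : pool)
		t.join();
	return 0;
}
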