From 2b98dfc6f9b05cb68790952e007073dfcc70ecbd Mon Sep 17 00:00:00 2001 From: Andrew Belt Date: Wed, 30 Jan 2019 16:41:25 -0500 Subject: [PATCH] Write custom threading model in Engine. Remove OpenMP dep. Fix bugs in string::basename/extension. --- Makefile | 6 - dep/Makefile | 10 -- include/engine/Engine.hpp | 12 +- src/Core/AudioInterface.cpp | 2 +- src/app/ModuleWidget.cpp | 4 +- src/app/Toolbar.cpp | 10 +- src/audio.cpp | 5 + src/engine/Engine.cpp | 281 ++++++++++++++++++++++++++++-------- src/settings.cpp | 4 +- src/string.cpp | 4 + 10 files changed, 240 insertions(+), 98 deletions(-) diff --git a/Makefile b/Makefile index d293180a..bd7faab2 100644 --- a/Makefile +++ b/Makefile @@ -17,8 +17,6 @@ SOURCES += $(wildcard dep/jpommier-pffft-*/pffft.c) $(wildcard dep/jpommier-pfff SOURCES += $(wildcard src/*.cpp src/*/*.cpp) ifdef ARCH_MAC - FLAGS += -Xpreprocessor -fopenmp - SOURCES += dep/osdialog/osdialog_mac.m LDFLAGS += -lpthread -ldl \ -framework Cocoa -framework OpenGL -framework IOKit -framework CoreVideo -framework CoreAudio -framework CoreMIDI \ -Ldep/lib dep/lib/libglfw3.a dep/lib/libGLEW.a dep/lib/libjansson.a dep/lib/libspeexdsp.a dep/lib/libzip.a dep/lib/libz.a dep/lib/librtaudio.a dep/lib/librtmidi.a dep/lib/libcrypto.a dep/lib/libssl.a dep/lib/libcurl.a dep/lib/libomp.a @@ -27,8 +25,6 @@ ifdef ARCH_MAC endif ifdef ARCH_WIN - FLAGS += -fopenmp - LDFLAGS += -fopenmp SOURCES += dep/osdialog/osdialog_win.c LDFLAGS += -static \ -Wl,--export-all-symbols,--out-implib,libRack.a -mwindows \ @@ -39,8 +35,6 @@ ifdef ARCH_WIN endif ifdef ARCH_LIN - FLAGS += -fopenmp - LDFLAGS += -fopenmp SOURCES += dep/osdialog/osdialog_gtk2.c CFLAGS += $(shell pkg-config --cflags gtk+-2.0) LDFLAGS += -rdynamic \ diff --git a/dep/Makefile b/dep/Makefile index 3d91f31a..ae43bf63 100755 --- a/dep/Makefile +++ b/dep/Makefile @@ -30,7 +30,6 @@ ifdef ARCH_MAC rtmidi = lib/librtmidi.a rtaudio = lib/librtaudio.a openssl = lib/libssl.a - libomp = lib/libomp.a endif ifdef ARCH_WIN @@ -179,15 +178,6 @@ $(pffft): $(UNZIP) 29e4f76ac53b.zip cp jpommier-pffft-29e4f76ac53b/*.h include/ -$(libomp): - $(WGET) "https://releases.llvm.org/7.0.1/openmp-7.0.1.src.tar.xz" - $(SHA256) openmp-7.0.1.src.tar.xz bf16b78a678da67d68405214ec7ee59d86a15f599855806192a75dcfca9b0d0c - $(UNTAR) openmp-7.0.1.src.tar.xz - cd openmp-7.0.1.src && mkdir -p build - cd openmp-7.0.1.src/build && $(CMAKE) .. -DLIBOMP_ENABLE_SHARED=OFF - $(MAKE) -C openmp-7.0.1.src/build - $(MAKE) -C openmp-7.0.1.src/build install - clean: git clean -fdx git submodule foreach git clean -fdx diff --git a/include/engine/Engine.hpp b/include/engine/Engine.hpp index 41eb8913..75ec43ed 100644 --- a/include/engine/Engine.hpp +++ b/include/engine/Engine.hpp @@ -10,14 +10,6 @@ namespace engine { struct Engine { - /** Plugins should not manipulate other modules or cables unless that is the entire purpose of the module. - Your plugin needs to have a clear purpose for manipulating other modules and cables and must be done with a good UX. - */ - std::vector modules; - std::vector cables; - bool paused = false; - int threadCount; - struct Internal; Internal *internal; @@ -27,6 +19,10 @@ struct Engine { void start(); /** Stops engine thread */ void stop(); + void setThreadCount(int threadCount); + int getThreadCount(); + void setPaused(bool paused); + bool isPaused(); /** Does not transfer pointer ownership */ void addModule(Module *module); void removeModule(Module *module); diff --git a/src/Core/AudioInterface.cpp b/src/Core/AudioInterface.cpp index 35de6831..74e2543f 100644 --- a/src/Core/AudioInterface.cpp +++ b/src/Core/AudioInterface.cpp @@ -1,11 +1,11 @@ #include "Core.hpp" #include "audio.hpp" +#include "app.hpp" #include #include #include #include #include -#include "app.hpp" static const int AUDIO_OUTPUTS = 8; diff --git a/src/app/ModuleWidget.cpp b/src/app/ModuleWidget.cpp index 38f240f0..4c47f9fe 100644 --- a/src/app/ModuleWidget.cpp +++ b/src/app/ModuleWidget.cpp @@ -149,11 +149,11 @@ void ModuleWidget::draw(const widget::DrawContext &ctx) { nvgBeginPath(ctx.vg); nvgRect(ctx.vg, 0, box.size.y - 20, - 65, 20); + 105, 20); nvgFillColor(ctx.vg, nvgRGBAf(0, 0, 0, 0.75)); nvgFill(ctx.vg); - std::string cpuText = string::f("%.2f μs", module->cpuTime * 1e6f); + std::string cpuText = string::f("%.2f μs %.1f%%", module->cpuTime * 1e6f, module->cpuTime * APP->engine->getSampleRate() * 100); bndLabel(ctx.vg, 2.0, box.size.y - 20.0, INFINITY, INFINITY, -1, cpuText.c_str()); float p = math::clamp(module->cpuTime / APP->engine->getSampleTime(), 0.f, 1.f); diff --git a/src/app/Toolbar.cpp b/src/app/Toolbar.cpp index b8ba20ba..90057958 100644 --- a/src/app/Toolbar.cpp +++ b/src/app/Toolbar.cpp @@ -260,10 +260,10 @@ struct LockModulesItem : ui::MenuItem { struct EnginePauseItem : ui::MenuItem { EnginePauseItem() { text = "Pause engine"; - rightText = CHECKMARK(APP->engine->paused); + rightText = CHECKMARK(APP->engine->isPaused()); } void onAction(const event::Action &e) override { - APP->engine->paused ^= true; + APP->engine->setPaused(!APP->engine->isPaused()); } }; @@ -277,7 +277,7 @@ struct SampleRateValueItem : ui::MenuItem { } void onAction(const event::Action &e) override { APP->engine->setSampleRate(sampleRate); - APP->engine->paused = false; + APP->engine->setPaused(false); } }; @@ -320,10 +320,10 @@ struct ThreadCountValueItem : ui::MenuItem { text += " (default)"; else if (threadCount == system::getPhysicalCoreCount() / 2) text += " (recommended)"; - rightText = CHECKMARK(APP->engine->threadCount == threadCount); + rightText = CHECKMARK(APP->engine->getThreadCount() == threadCount); } void onAction(const event::Action &e) override { - APP->engine->threadCount = threadCount; + APP->engine->setThreadCount(threadCount); } }; diff --git a/src/audio.cpp b/src/audio.cpp index 627b772a..901dcfa9 100644 --- a/src/audio.cpp +++ b/src/audio.cpp @@ -2,6 +2,7 @@ #include "string.hpp" #include "math.hpp" #include "bridge.hpp" +#include "system.hpp" namespace rack { @@ -207,6 +208,10 @@ void IO::setChannels(int numOutputs, int numInputs) { static int rtCallback(void *outputBuffer, void *inputBuffer, unsigned int nFrames, double streamTime, RtAudioStreamStatus status, void *userData) { IO *audioIO = (IO*) userData; assert(audioIO); + // Exploit the stream time to run code on startup of the audio thread + if (streamTime == 0.0) { + system::setThreadName("Audio"); + } audioIO->processStream((const float *) inputBuffer, (float *) outputBuffer, nFrames); return 0; } diff --git a/src/engine/Engine.cpp b/src/engine/Engine.cpp index cdff07ec..e5524310 100644 --- a/src/engine/Engine.cpp +++ b/src/engine/Engine.cpp @@ -8,6 +8,7 @@ #include #include #include +#include #include #include @@ -32,6 +33,7 @@ struct VIPMutex { } }; + struct VIPLock { VIPMutex &m; VIPLock(VIPMutex &m) : m(m) { @@ -47,7 +49,85 @@ struct VIPLock { }; +struct Barrier { + std::mutex mutex; + std::condition_variable cv; + int count = 0; + int total = 0; + + void wait() { + // Waiting on one thread is trivial. + if (total <= 1) + return; + std::unique_lock lock(mutex); + count++; + if (count >= total) { + count = 0; + cv.notify_all(); + } + else { + cv.wait(lock); + } + } +}; + + +struct SpinBarrier { + std::atomic count; + int total = 0; + + SpinBarrier() { + count = 0; + } + + void wait() { + count++; + if (count >= total) { + count = 0; + } + else { + while (count > 0) {} + } + } +}; + + +struct EngineWorker { + Engine *engine; + int id; + std::thread thread; + bool running = true; + + void start() { + thread = std::thread([&] { + system::setThreadName("Engine worker"); + run(); + }); + } + + void stop() { + running = false; + } + + void join() { + thread.join(); + } + + void run() { + while (running) { + step(); + } + } + + void step(); +}; + + struct Engine::Internal { + std::vector modules; + std::vector cables; + bool paused = false; + bool running = false; float sampleRate; float sampleTime; @@ -64,6 +144,11 @@ struct Engine::Internal { std::mutex mutex; std::thread thread; VIPMutex vipMutex; + + int threadCount = 1; + std::vector workers; + Barrier engineBarrier; + SpinBarrier workerBarrier; }; @@ -74,57 +159,55 @@ Engine::Engine() { internal->sampleRate = sampleRate; internal->sampleTime = 1 / sampleRate; internal->sampleRateRequested = sampleRate; - - threadCount = 1; } Engine::~Engine() { // Make sure there are no cables or modules in the rack on destruction. This suggests that a module failed to remove itself before the RackWidget was destroyed. - assert(cables.empty()); - assert(modules.empty()); + assert(internal->cables.empty()); + assert(internal->modules.empty()); delete internal; } -static void Engine_step(Engine *engine) { - // Sample rate - if (engine->internal->sampleRateRequested != engine->internal->sampleRate) { - engine->internal->sampleRate = engine->internal->sampleRateRequested; - engine->internal->sampleTime = 1 / engine->internal->sampleRate; - for (Module *module : engine->modules) { - module->onSampleRateChange(); - } +static void Engine_setWorkerCount(Engine *engine, int workerCount) { + Engine::Internal *internal = engine->internal; + + // Stop all workers + for (EngineWorker &worker : internal->workers) { + worker.stop(); } + internal->engineBarrier.wait(); - // Param smoothing - { - Module *smoothModule = engine->internal->smoothModule; - int smoothParamId = engine->internal->smoothParamId; - float smoothValue = engine->internal->smoothValue; - if (smoothModule) { - Param *param = &smoothModule->params[smoothParamId]; - float value = param->value; - // decay rate is 1 graphics frame - const float smoothLambda = 60.f; - float newValue = value + (smoothValue - value) * smoothLambda * engine->internal->sampleTime; - if (value == newValue || !(param->minValue <= newValue && newValue <= param->maxValue)) { - // Snap to actual smooth value if the value doesn't change enough (due to the granularity of floats), or if newValue is out of bounds - param->setValue(smoothValue); - engine->internal->smoothModule = NULL; - } - else { - param->value = newValue; - } + // Destroy all workers + for (EngineWorker &worker : internal->workers) { + worker.join(); + } + internal->workers.resize(0); + + // Set barrier counts + internal->engineBarrier.total = workerCount + 1; + internal->workerBarrier.total = workerCount + 1; + + if (workerCount >= 1) { + // Create workers + internal->workers.resize(workerCount); + for (int i = 0; i < workerCount; i++) { + EngineWorker &worker = internal->workers[i]; + worker.id = i + 1; + worker.engine = engine; + worker.start(); } } +} + +static void Engine_stepModules(Engine *engine, int id) { + Engine::Internal *internal = engine->internal; - const float cpuLambda = engine->internal->sampleTime / 2.f; + int threadCount = internal->threadCount; + int modulesLen = internal->modules.size(); - // Iterate modules - int modulesLen = engine->modules.size(); - #pragma omp parallel for num_threads(engine->threadCount) schedule(guided, 1) - for (int i = 0; i < modulesLen; i++) { - Module *module = engine->modules[i]; + for (int i = id; i < modulesLen; i += threadCount) { + Module *module = internal->modules[i]; if (!module->bypass) { // Step module if (settings::powerMeter) { @@ -134,8 +217,9 @@ static void Engine_step(Engine *engine) { auto stopTime = std::chrono::high_resolution_clock::now(); float cpuTime = std::chrono::duration(stopTime - startTime).count(); - // Smooth cpu time - module->cpuTime += (cpuTime - module->cpuTime) * cpuLambda; + // Smooth CPU time + const float cpuTau = 2.f /* seconds */; + module->cpuTime += (cpuTime - module->cpuTime) * internal->sampleTime / cpuTau; } else { module->step(); @@ -150,19 +234,57 @@ static void Engine_step(Engine *engine) { output.step(); } } +} + +static void Engine_step(Engine *engine) { + Engine::Internal *internal = engine->internal; -#if 0 - if (random::u32() % 1000 == 0 && settings::powerMeter) { - float cpuTotal = 0.f; - for (Module *module : engine->modules) { - cpuTotal += module->cpuTime; + // Sample rate + if (internal->sampleRateRequested != internal->sampleRate) { + internal->sampleRate = internal->sampleRateRequested; + internal->sampleTime = 1 / internal->sampleRate; + for (Module *module : internal->modules) { + module->onSampleRateChange(); + } + } + + // Param smoothing + { + Module *smoothModule = internal->smoothModule; + int smoothParamId = internal->smoothParamId; + float smoothValue = internal->smoothValue; + if (smoothModule) { + Param *param = &smoothModule->params[smoothParamId]; + float value = param->value; + // decay rate is 1 graphics frame + const float smoothLambda = 60.f; + float newValue = value + (smoothValue - value) * smoothLambda * internal->sampleTime; + if (value == newValue || !(param->minValue <= newValue && newValue <= param->maxValue)) { + // Snap to actual smooth value if the value doesn't change enough (due to the granularity of floats), or if newValue is out of bounds + param->setValue(smoothValue); + internal->smoothModule = NULL; + } + else { + param->value = newValue; + } } - DEBUG("%fus %f%% CPU", cpuTotal * 1e6, cpuTotal * engine->internal->sampleRate * 100); } -#endif + + // Lazily create/destroy workers + int workerCount = internal->threadCount - 1; + if ((int) internal->workers.size() != workerCount) { + Engine_setWorkerCount(engine, workerCount); + } + else { + internal->engineBarrier.wait(); + } + + // Step modules along with workers + Engine_stepModules(engine, 0); + internal->workerBarrier.wait(); // Step cables - for (Cable *cable : engine->cables) { + for (Cable *cable : engine->internal->cables) { cable->step(); } } @@ -186,7 +308,7 @@ static void Engine_run(Engine *engine) { while (engine->internal->running) { engine->internal->vipMutex.wait(); - if (!engine->paused) { + if (!engine->internal->paused) { std::lock_guard lock(engine->internal->mutex); // auto startTime = std::chrono::high_resolution_clock::now(); @@ -214,6 +336,8 @@ static void Engine_run(Engine *engine) { std::this_thread::sleep_for(std::chrono::duration(stepTime)); } } + + Engine_setWorkerCount(engine, 0); } void Engine::start() { @@ -226,13 +350,35 @@ void Engine::stop() { internal->thread.join(); } +void Engine::setThreadCount(int threadCount) { + assert(threadCount >= 1); + VIPLock vipLock(internal->vipMutex); + std::lock_guard lock(internal->mutex); + internal->threadCount = threadCount; +} + +int Engine::getThreadCount() { + // No lock + return internal->threadCount; +} + +void Engine::setPaused(bool paused) { + // No lock + internal->paused = paused; +} + +bool Engine::isPaused() { + // No lock + return internal->paused; +} + void Engine::addModule(Module *module) { assert(module); VIPLock vipLock(internal->vipMutex); std::lock_guard lock(internal->mutex); // Check that the module is not already added - auto it = std::find(modules.begin(), modules.end(), module); - assert(it == modules.end()); + auto it = std::find(internal->modules.begin(), internal->modules.end(), module); + assert(it == internal->modules.end()); // Set ID if (module->id == 0) { // Automatically assign ID @@ -242,12 +388,12 @@ void Engine::addModule(Module *module) { // Manual ID assert(module->id < internal->nextModuleId); // Check that the ID is not already taken - for (Module *m : modules) { + for (Module *m : internal->modules) { assert(module->id != m->id); } } // Add module - modules.push_back(module); + internal->modules.push_back(module); } void Engine::removeModule(Module *module) { @@ -259,15 +405,15 @@ void Engine::removeModule(Module *module) { internal->smoothModule = NULL; } // Check that all cables are disconnected - for (Cable *cable : cables) { + for (Cable *cable : internal->cables) { assert(cable->outputModule != module); assert(cable->inputModule != module); } // Check that the module actually exists - auto it = std::find(modules.begin(), modules.end(), module); - assert(it != modules.end()); + auto it = std::find(internal->modules.begin(), internal->modules.end(), module); + assert(it != internal->modules.end()); // Remove the module - modules.erase(it); + internal->modules.erase(it); // Remove id module->id = 0; } @@ -311,7 +457,7 @@ void Engine::bypassModule(Module *module, bool bypass) { static void Engine_updateConnected(Engine *engine) { // Set everything to unconnected - for (Module *module : engine->modules) { + for (Module *module : engine->internal->modules) { for (Input &input : module->inputs) { input.active = false; } @@ -320,7 +466,7 @@ static void Engine_updateConnected(Engine *engine) { } } // Set inputs/outputs to active - for (Cable *cable : engine->cables) { + for (Cable *cable : engine->internal->cables) { cable->outputModule->outputs[cable->outputId].active = true; cable->inputModule->inputs[cable->inputId].active = true; } @@ -334,7 +480,7 @@ void Engine::addCable(Cable *cable) { assert(cable->outputModule); assert(cable->inputModule); // Check that the cable is not already added, and that the input is not already used by another cable - for (Cable *cable2 : cables) { + for (Cable *cable2 : internal->cables) { assert(cable2 != cable); assert(!(cable2->inputModule == cable->inputModule && cable2->inputId == cable->inputId)); } @@ -347,12 +493,12 @@ void Engine::addCable(Cable *cable) { // Manual ID assert(cable->id < internal->nextCableId); // Check that the ID is not already taken - for (Cable *w : cables) { + for (Cable *w : internal->cables) { assert(cable->id != w->id); } } // Add the cable - cables.push_back(cable); + internal->cables.push_back(cable); Engine_updateConnected(this); } @@ -361,13 +507,13 @@ void Engine::removeCable(Cable *cable) { VIPLock vipLock(internal->vipMutex); std::lock_guard lock(internal->mutex); // Check that the cable is already added - auto it = std::find(cables.begin(), cables.end(), cable); - assert(it != cables.end()); + auto it = std::find(internal->cables.begin(), internal->cables.end(), cable); + assert(it != internal->cables.end()); // Set input to inactive Input &input = cable->inputModule->inputs[cable->inputId]; input.setChannels(0); // Remove the cable - cables.erase(it); + internal->cables.erase(it); Engine_updateConnected(this); // Remove ID cable->id = 0; @@ -415,5 +561,12 @@ float Engine::getSampleTime() { } +void EngineWorker::step() { + Engine_stepModules(engine, id); + engine->internal->workerBarrier.wait(); + engine->internal->engineBarrier.wait(); +} + + } // namespace engine } // namespace rack diff --git a/src/settings.cpp b/src/settings.cpp index e946483a..6a17d206 100644 --- a/src/settings.cpp +++ b/src/settings.cpp @@ -69,7 +69,7 @@ static json_t *settingsToJson() { json_object_set_new(rootJ, "powerMeter", json_boolean(powerMeter)); // threadCount - json_object_set_new(rootJ, "threadCount", json_integer(APP->engine->threadCount)); + json_object_set_new(rootJ, "threadCount", json_integer(APP->engine->getThreadCount())); // checkVersion json_object_set_new(rootJ, "checkVersion", json_boolean(checkVersion)); @@ -152,7 +152,7 @@ static void settingsFromJson(json_t *rootJ) { // threadCount json_t *threadCountJ = json_object_get(rootJ, "threadCount"); if (threadCountJ) - APP->engine->threadCount = json_integer_value(threadCountJ); + APP->engine->setThreadCount(json_integer_value(threadCountJ)); // checkVersion json_t *checkVersionJ = json_object_get(rootJ, "checkVersion"); diff --git a/src/string.cpp b/src/string.cpp index 6f4f8fa4..54ea6c08 100644 --- a/src/string.cpp +++ b/src/string.cpp @@ -65,11 +65,15 @@ std::string filename(const std::string &path) { std::string basename(const std::string &path) { size_t pos = path.rfind('.'); + if (pos == std::string::npos) + return path; return std::string(path, 0, pos); } std::string extension(const std::string &path) { size_t pos = path.rfind('.'); + if (pos == std::string::npos) + return ""; return std::string(path, pos); }