From 05427d938b59adadd4219eeb741e3bf55628d159 Mon Sep 17 00:00:00 2001 From: Andrew Belt Date: Thu, 2 Apr 2020 02:33:52 -0400 Subject: [PATCH] Overhaul threading model for Engine using a shared mutex based design. --- include/engine/Engine.hpp | 35 ++++++++-- include/plugin.hpp | 4 +- src/app/ModuleWidget.cpp | 5 +- src/app/RackWidget.cpp | 9 ++- src/engine/Engine.cpp | 132 +++++++++++++++++++++++++------------- src/history.cpp | 6 +- src/plugin.cpp | 9 +-- 7 files changed, 133 insertions(+), 67 deletions(-) diff --git a/include/engine/Engine.hpp b/include/engine/Engine.hpp index c8242a86..47dc18ed 100644 --- a/include/engine/Engine.hpp +++ b/include/engine/Engine.hpp @@ -11,6 +11,13 @@ namespace rack { namespace engine { +/** Manages Modules and Cables and steps them in time. + +All methods are thread-safe and can safely be called from anywhere. + +The methods clear, addModule, removeModule, moduleToJson, moduleFromJson, addCable, removeCable, addParamHandle, removeParamHandle, toJson, and fromJson cannot be run simultaneously with any other Engine method. +Calling these methods inside any Engine method will result in a deadlock. +*/ struct Engine { struct Internal; Internal* internal; @@ -18,6 +25,7 @@ struct Engine { Engine(); ~Engine(); + /** Removes all modules and cables. */ void clear(); /** Advances the engine by `frames` frames. Only call this method from the primary module. @@ -26,6 +34,8 @@ struct Engine { void setPrimaryModule(Module* module); Module* getPrimaryModule(); + /** Returns the sample rate used by the engine for stepping each module. + */ float getSampleRate(); /** Returns the inverse of the current sample rate. */ @@ -37,11 +47,14 @@ struct Engine { /** Returns the number of audio samples since the Engine's first sample. */ int64_t getFrame(); - /** Returns the frame when step() was last called. */ + /** Returns the frame when step() was last called. + */ int64_t getStepFrame(); - /** Returns the timestamp in nanoseconds when step() was last called. */ + /** Returns the timestamp in nanoseconds when step() was last called. + */ int64_t getStepTime(); - /** Returns the total number of frames in the current step() call. */ + /** Returns the total number of frames in the current step() call. + */ int getStepFrames(); // Modules @@ -56,6 +69,10 @@ struct Engine { void resetModule(Module* module); void randomizeModule(Module* module); void bypassModule(Module* module, bool bypassed); + /** Serializes/deserializes with locking, ensuring that Module::process() is not called during toJson()/fromJson(). + */ + json_t* moduleToJson(Module* module); + void moduleFromJson(Module* module, json_t* rootJ); // Cables /** Adds a cable to the rack engine. @@ -70,17 +87,21 @@ struct Engine { // Params void setParam(Module* module, int paramId, float value); float getParam(Module* module, int paramId); - /** Requests the parameter to smoothly change toward `value`. */ + /** Requests the parameter to smoothly change toward `value`. + */ void setSmoothParam(Module* module, int paramId, float value); - /** Returns the target value before smoothing. */ + /** Returns the target value before smoothing. + */ float getSmoothParam(Module* module, int paramId); // ParamHandles void addParamHandle(ParamHandle* paramHandle); void removeParamHandle(ParamHandle* paramHandle); - /** Returns the unique ParamHandle for the given paramId */ + /** Returns the unique ParamHandle for the given paramId + */ ParamHandle* getParamHandle(int moduleId, int paramId); - /** Use getParamHandle(int, int) instead. */ + /** Use getParamHandle(int, int) instead. + */ DEPRECATED ParamHandle* getParamHandle(Module* module, int paramId); /** Sets the ParamHandle IDs and module pointer. If `overwrite` is true and another ParamHandle points to the same param, unsets that one and replaces it with the given handle. diff --git a/include/plugin.hpp b/include/plugin.hpp index eb2b81e0..b1efe072 100644 --- a/include/plugin.hpp +++ b/include/plugin.hpp @@ -35,10 +35,10 @@ void syncUpdates(); bool isSyncing(); Plugin* getPlugin(const std::string& pluginSlug); Model* getModel(const std::string& pluginSlug, const std::string& modelSlug); -/** Creates a Module from a JSON module object. +/** Creates a Model from a JSON module object. Throws an Exception if the model is not found. */ -engine::Module* moduleFromJson(json_t* moduleJ); +Model* modelFromJson(json_t* moduleJ); /** Checks that the slug contains only alphanumeric characters, "-", and "_" */ bool isSlugValid(const std::string& slug); /** Returns a string containing only the valid slug characters. */ diff --git a/src/app/ModuleWidget.cpp b/src/app/ModuleWidget.cpp index 33a7fa3e..d7c69692 100644 --- a/src/app/ModuleWidget.cpp +++ b/src/app/ModuleWidget.cpp @@ -572,12 +572,12 @@ PortWidget* ModuleWidget::getOutput(int portId) { } json_t* ModuleWidget::toJson() { - json_t* moduleJ = module->toJson(); + json_t* moduleJ = APP->engine->moduleToJson(module); return moduleJ; } void ModuleWidget::fromJson(json_t* rootJ) { - module->fromJson(rootJ); + APP->engine->moduleFromJson(module, rootJ); } void ModuleWidget::copyClipboard() { @@ -843,6 +843,7 @@ void ModuleWidget::cloneAction() { engine::Module* clonedModule = model->createModule(); // JSON serialization is the obvious way to do this json_t* moduleJ = toJson(); + // This doesn't need a lock (via Engine::moduleFromJson()) because the Module is not added to the Engine yet. clonedModule->fromJson(moduleJ); json_decref(moduleJ); // Reset ID so the Engine automatically assigns a new one diff --git a/src/app/RackWidget.cpp b/src/app/RackWidget.cpp index 44f6741c..367bfd32 100644 --- a/src/app/RackWidget.cpp +++ b/src/app/RackWidget.cpp @@ -23,8 +23,10 @@ namespace app { /** Creates a new Module and ModuleWidget */ ModuleWidget* moduleWidgetFromJson(json_t* moduleJ) { - engine::Module* module = plugin::moduleFromJson(moduleJ); + plugin::Model* model = plugin::modelFromJson(moduleJ); + engine::Module* module = model->createModule(); assert(module); + module->fromJson(moduleJ); // Create ModuleWidget ModuleWidget* moduleWidget = module->model->createModuleWidget(module); @@ -305,7 +307,10 @@ void RackWidget::pastePresetClipboardAction() { }); try { - engine::Module* module = plugin::moduleFromJson(moduleJ); + plugin::Model* model = plugin::modelFromJson(moduleJ); + engine::Module* module = model->createModule(); + assert(module); + module->fromJson(moduleJ); // Reset ID so the Engine automatically assigns a new one module->id = -1; APP->engine->addModule(module); diff --git a/src/engine/Engine.cpp b/src/engine/Engine.cpp index 5f4f6afa..396364eb 100644 --- a/src/engine/Engine.cpp +++ b/src/engine/Engine.cpp @@ -5,6 +5,7 @@ #include #include #include +#include #include #include @@ -29,6 +30,44 @@ static void initMXCSR() { } +/** Allows multiple "reader" threads to obtain a lock simultaneously, but only one "writer" thread. +This implementation is just a wrapper for pthreads. +This is available in C++14 as std::shared_mutex, but unfortunately we're using C++11. +*/ +struct SharedMutex { + pthread_rwlock_t rwlock; + SharedMutex() { + if (pthread_rwlock_init(&rwlock, NULL)) + throw Exception("pthread_rwlock_init failed"); + } + ~SharedMutex() { + pthread_rwlock_destroy(&rwlock); + } +}; + +struct SharedLock { + SharedMutex& m; + SharedLock(SharedMutex& m) : m(m) { + if (pthread_rwlock_rdlock(&m.rwlock)) + throw Exception("pthread_rwlock_rdlock failed"); + } + ~SharedLock() { + pthread_rwlock_unlock(&m.rwlock); + } +}; + +struct ExclusiveSharedLock { + SharedMutex& m; + ExclusiveSharedLock(SharedMutex& m) : m(m) { + if (pthread_rwlock_wrlock(&m.rwlock)) + throw Exception("pthread_rwlock_wrlock failed"); + } + ~ExclusiveSharedLock() { + pthread_rwlock_unlock(&m.rwlock); + } +}; + + struct Barrier { std::mutex mutex; std::condition_variable cv; @@ -164,7 +203,7 @@ struct Engine::Internal { int smoothParamId = 0; float smoothValue = 0.f; - std::recursive_mutex mutex; + SharedMutex mutex; int threadCount = 0; std::vector workers; @@ -408,7 +447,8 @@ Engine::~Engine() { void Engine::clear() { - std::lock_guard lock(internal->mutex); + // TODO This needs a lock that doesn't interfere with removeParamHandle, removeCable, and removeModule. + // Copy lists because we'll be removing while iterating std::set paramHandles = internal->paramHandles; for (ParamHandle* paramHandle : paramHandles) { @@ -432,7 +472,7 @@ void Engine::clear() { void Engine::step(int frames) { - std::lock_guard lock(internal->mutex); + SharedLock lock(internal->mutex); // Configure thread initMXCSR(); random::init(); @@ -474,25 +514,22 @@ void Engine::step(int frames) { void Engine::setPrimaryModule(Module* module) { - std::lock_guard lock(internal->mutex); + SharedLock lock(internal->mutex); internal->primaryModule = module; } Module* Engine::getPrimaryModule() { - // No lock, for performance return internal->primaryModule; } float Engine::getSampleRate() { - // No lock, for performance return internal->sampleRate; } float Engine::getSampleTime() { - // No lock, for performance return internal->sampleTime; } @@ -503,31 +540,27 @@ void Engine::yieldWorkers() { int64_t Engine::getFrame() { - // No lock, for performance return internal->frame; } int64_t Engine::getStepFrame() { - // No lock, for performance return internal->stepFrame; } int64_t Engine::getStepTime() { - // No lock, for performance return internal->stepTime; } int Engine::getStepFrames() { - // No lock, for performance return internal->stepFrames; } void Engine::addModule(Module* module) { - std::lock_guard lock(internal->mutex); + ExclusiveSharedLock lock(internal->mutex); assert(module); // Check that the module is not already added auto it = std::find(internal->modules.begin(), internal->modules.end(), module); @@ -557,12 +590,11 @@ void Engine::addModule(Module* module) { if (paramHandle->moduleId == module->id) paramHandle->module = module; } - // DEBUG("Added module %d to engine", module->id); } void Engine::removeModule(Module* module) { - std::lock_guard lock(internal->mutex); + ExclusiveSharedLock lock(internal->mutex); assert(module); // Check that the module actually exists auto it = std::find(internal->modules.begin(), internal->modules.end(), module); @@ -600,13 +632,12 @@ void Engine::removeModule(Module* module) { internal->primaryModule = NULL; // Remove module internal->modules.erase(it); - // DEBUG("Removed module %d to engine", module->id); } Module* Engine::getModule(int moduleId) { - std::lock_guard lock(internal->mutex); - // Find module + SharedLock lock(internal->mutex); + // Find module by id for (Module* module : internal->modules) { if (module->id == moduleId) return module; @@ -616,7 +647,7 @@ Module* Engine::getModule(int moduleId) { void Engine::resetModule(Module* module) { - std::lock_guard lock(internal->mutex); + SharedLock lock(internal->mutex); assert(module); Module::ResetEvent eReset; @@ -625,7 +656,7 @@ void Engine::resetModule(Module* module) { void Engine::randomizeModule(Module* module) { - std::lock_guard lock(internal->mutex); + SharedLock lock(internal->mutex); assert(module); Module::RandomizeEvent eRandomize; @@ -634,7 +665,7 @@ void Engine::randomizeModule(Module* module) { void Engine::bypassModule(Module* module, bool bypassed) { - std::lock_guard lock(internal->mutex); + SharedLock lock(internal->mutex); assert(module); if (module->bypassed() == bypassed) return; @@ -656,6 +687,18 @@ void Engine::bypassModule(Module* module, bool bypassed) { } +json_t* Engine::moduleToJson(Module* module) { + ExclusiveSharedLock lock(internal->mutex); + return module->toJson(); +} + + +void Engine::moduleFromJson(Module* module, json_t* rootJ) { + ExclusiveSharedLock lock(internal->mutex); + module->fromJson(rootJ); +} + + static void Port_setDisconnected(Port* that) { that->channels = 0; for (int c = 0; c < PORT_MAX_CHANNELS; c++) { @@ -704,7 +747,7 @@ static void Engine_updateConnected(Engine* that) { void Engine::addCable(Cable* cable) { - std::lock_guard lock(internal->mutex); + ExclusiveSharedLock lock(internal->mutex); assert(cable); // Check cable properties assert(cable->inputModule); @@ -754,12 +797,11 @@ void Engine::addCable(Cable* cable) { e.portId = cable->outputId; cable->outputModule->onPortChange(e); } - // DEBUG("Added cable %d to engine", cable->id); } void Engine::removeCable(Cable* cable) { - std::lock_guard lock(internal->mutex); + ExclusiveSharedLock lock(internal->mutex); assert(cable); // Check that the cable is already added auto it = std::find(internal->cables.begin(), internal->cables.end(), cable); @@ -790,13 +832,12 @@ void Engine::removeCable(Cable* cable) { e.portId = cable->outputId; cable->outputModule->onPortChange(e); } - // DEBUG("Removed cable %d to engine", cable->id); } Cable* Engine::getCable(int cableId) { - std::lock_guard lock(internal->mutex); - // Find Cable + SharedLock lock(internal->mutex); + // Find cable by id for (Cable* cable : internal->cables) { if (cable->id == cableId) return cable; @@ -806,7 +847,7 @@ Cable* Engine::getCable(int cableId) { void Engine::setParam(Module* module, int paramId, float value) { - // No lock, for performance + SharedLock lock(internal->mutex); // If param is being smoothed, cancel smoothing. if (internal->smoothModule == module && internal->smoothParamId == paramId) { internal->smoothModule = NULL; @@ -817,13 +858,13 @@ void Engine::setParam(Module* module, int paramId, float value) { float Engine::getParam(Module* module, int paramId) { - // No lock, for performance + SharedLock lock(internal->mutex); return module->params[paramId].value; } void Engine::setSmoothParam(Module* module, int paramId, float value) { - // No lock, for performance + SharedLock lock(internal->mutex); // If another param is being smoothed, jump value if (internal->smoothModule && !(internal->smoothModule == module && internal->smoothParamId == paramId)) { internal->smoothModule->params[internal->smoothParamId].value = internal->smoothValue; @@ -836,10 +877,10 @@ void Engine::setSmoothParam(Module* module, int paramId, float value) { float Engine::getSmoothParam(Module* module, int paramId) { - // No lock, for performance + SharedLock lock(internal->mutex); if (internal->smoothModule == module && internal->smoothParamId == paramId) return internal->smoothValue; - return getParam(module, paramId); + return module->params[paramId].value; } @@ -856,8 +897,7 @@ static void Engine_refreshParamHandleCache(Engine* that) { void Engine::addParamHandle(ParamHandle* paramHandle) { - std::lock_guard lock(internal->mutex); - + ExclusiveSharedLock lock(internal->mutex); // New ParamHandles must be blank. // This means we don't have to refresh the cache. assert(paramHandle->moduleId < 0); @@ -872,8 +912,7 @@ void Engine::addParamHandle(ParamHandle* paramHandle) { void Engine::removeParamHandle(ParamHandle* paramHandle) { - std::lock_guard lock(internal->mutex); - + ExclusiveSharedLock lock(internal->mutex); // Check that the ParamHandle is already added auto it = internal->paramHandles.find(paramHandle); assert(it != internal->paramHandles.end()); @@ -886,7 +925,7 @@ void Engine::removeParamHandle(ParamHandle* paramHandle) { ParamHandle* Engine::getParamHandle(int moduleId, int paramId) { - // No lock, for performance + SharedLock lock(internal->mutex); auto it = internal->paramHandleCache.find(std::make_tuple(moduleId, paramId)); if (it == internal->paramHandleCache.end()) return NULL; @@ -895,14 +934,12 @@ ParamHandle* Engine::getParamHandle(int moduleId, int paramId) { ParamHandle* Engine::getParamHandle(Module* module, int paramId) { - // No lock, for performance return getParamHandle(module->id, paramId); } void Engine::updateParamHandle(ParamHandle* paramHandle, int moduleId, int paramId, bool overwrite) { - std::lock_guard lock(internal->mutex); - + SharedLock lock(internal->mutex); // Check that it exists auto it = internal->paramHandles.find(paramHandle); assert(it != internal->paramHandles.end()); @@ -940,7 +977,7 @@ void Engine::updateParamHandle(ParamHandle* paramHandle, int moduleId, int param json_t* Engine::toJson() { - std::lock_guard lock(internal->mutex); + ExclusiveSharedLock lock(internal->mutex); json_t* rootJ = json_object(); // modules @@ -966,9 +1003,10 @@ json_t* Engine::toJson() { void Engine::fromJson(json_t* rootJ) { - // Don't lock here because Module::fromJson for example might deadlock, and we actually don't really need thread safety other than the addModule() and addCable() calls, which are already behind locks. - // std::lock_guard lock(internal->mutex); - + // We can't lock here because addModule() and addCable() are called inside. + // Also, AudioInterface::fromJson() might call Engine::step() due to RtAudio drivers. + // ExclusiveSharedLock lock(internal->mutex); + clear(); // modules json_t* modulesJ = json_object_get(rootJ, "modules"); if (!modulesJ) @@ -977,13 +1015,18 @@ void Engine::fromJson(json_t* rootJ) { json_t* moduleJ; json_array_foreach(modulesJ, moduleIndex, moduleJ) { try { - Module* module = plugin::moduleFromJson(moduleJ); + plugin::Model* model = plugin::modelFromJson(moduleJ); + Module* module = model->createModule(); + assert(module); + // This doesn't need a lock because the Module is not added to the Engine yet. + module->fromJson(moduleJ); // Before 1.0, the module ID was the index in the "modules" array if (APP->patch->isLegacy(2)) { module->id = moduleIndex; } + // This method locks addModule(module); } catch (Exception& e) { @@ -1005,6 +1048,7 @@ void Engine::fromJson(json_t* rootJ) { Cable* cable = new Cable; try { cable->fromJson(cableJ); + // This method locks addCable(cable); } catch (Exception& e) { diff --git a/src/history.cpp b/src/history.cpp index 17926888..e33d2fe8 100644 --- a/src/history.cpp +++ b/src/history.cpp @@ -48,7 +48,7 @@ void ModuleAdd::setModule(app::ModuleWidget* mw) { pos = mw->box.pos; // ModuleAdd doesn't *really* need the state to be serialized, although ModuleRemove certainly does. // However, creating a module may give it a nondeterministic initial state for whatever reason, so serialize anyway. - moduleJ = mw->module->toJson(); + moduleJ = APP->engine->moduleToJson(mw->module); } void ModuleAdd::undo() { @@ -104,13 +104,13 @@ ModuleChange::~ModuleChange() { void ModuleChange::undo() { engine::Module* module = APP->engine->getModule(moduleId); assert(module); - module->fromJson(oldModuleJ); + APP->engine->moduleFromJson(module, oldModuleJ); } void ModuleChange::redo() { engine::Module* module = APP->engine->getModule(moduleId); assert(module); - module->fromJson(newModuleJ); + APP->engine->moduleFromJson(module, newModuleJ); } diff --git a/src/plugin.cpp b/src/plugin.cpp index e73a8c4d..629912e8 100644 --- a/src/plugin.cpp +++ b/src/plugin.cpp @@ -500,7 +500,7 @@ Model* getModel(const std::string& pluginSlug, const std::string& modelSlug) { } -engine::Module* moduleFromJson(json_t* moduleJ) { +Model* modelFromJson(json_t* moduleJ) { // Get slugs json_t* pluginSlugJ = json_object_get(moduleJ, "plugin"); if (!pluginSlugJ) @@ -518,12 +518,7 @@ engine::Module* moduleFromJson(json_t* moduleJ) { Model* model = getModel(pluginSlug, modelSlug); if (!model) throw Exception(string::f("Could not find module \"%s\" of plugin \"%s\"", modelSlug.c_str(), pluginSlug.c_str())); - - // Create Module - engine::Module* module = model->createModule(); - assert(module); - module->fromJson(moduleJ); - return module; + return model; }