Browse Source

Overhaul threading model for Engine using a shared mutex based design.

tags/v2.0.0
Andrew Belt 4 years ago
parent
commit
05427d938b
7 changed files with 133 additions and 67 deletions
  1. +28
    -7
      include/engine/Engine.hpp
  2. +2
    -2
      include/plugin.hpp
  3. +3
    -2
      src/app/ModuleWidget.cpp
  4. +7
    -2
      src/app/RackWidget.cpp
  5. +88
    -44
      src/engine/Engine.cpp
  6. +3
    -3
      src/history.cpp
  7. +2
    -7
      src/plugin.cpp

+ 28
- 7
include/engine/Engine.hpp View File

@@ -11,6 +11,13 @@ namespace rack {
namespace engine {


/** Manages Modules and Cables and steps them in time.

All methods are thread-safe and can safely be called from anywhere.

The methods clear, addModule, removeModule, moduleToJson, moduleFromJson, addCable, removeCable, addParamHandle, removeParamHandle, toJson, and fromJson cannot be run simultaneously with any other Engine method.
Calling these methods inside any Engine method will result in a deadlock.
*/
struct Engine {
struct Internal;
Internal* internal;
@@ -18,6 +25,7 @@ struct Engine {
Engine();
~Engine();

/** Removes all modules and cables. */
void clear();
/** Advances the engine by `frames` frames.
Only call this method from the primary module.
@@ -26,6 +34,8 @@ struct Engine {
void setPrimaryModule(Module* module);
Module* getPrimaryModule();

/** Returns the sample rate used by the engine for stepping each module.
*/
float getSampleRate();
/** Returns the inverse of the current sample rate.
*/
@@ -37,11 +47,14 @@ struct Engine {
/** Returns the number of audio samples since the Engine's first sample.
*/
int64_t getFrame();
/** Returns the frame when step() was last called. */
/** Returns the frame when step() was last called.
*/
int64_t getStepFrame();
/** Returns the timestamp in nanoseconds when step() was last called. */
/** Returns the timestamp in nanoseconds when step() was last called.
*/
int64_t getStepTime();
/** Returns the total number of frames in the current step() call. */
/** Returns the total number of frames in the current step() call.
*/
int getStepFrames();

// Modules
@@ -56,6 +69,10 @@ struct Engine {
void resetModule(Module* module);
void randomizeModule(Module* module);
void bypassModule(Module* module, bool bypassed);
/** Serializes/deserializes with locking, ensuring that Module::process() is not called during toJson()/fromJson().
*/
json_t* moduleToJson(Module* module);
void moduleFromJson(Module* module, json_t* rootJ);

// Cables
/** Adds a cable to the rack engine.
@@ -70,17 +87,21 @@ struct Engine {
// Params
void setParam(Module* module, int paramId, float value);
float getParam(Module* module, int paramId);
/** Requests the parameter to smoothly change toward `value`. */
/** Requests the parameter to smoothly change toward `value`.
*/
void setSmoothParam(Module* module, int paramId, float value);
/** Returns the target value before smoothing. */
/** Returns the target value before smoothing.
*/
float getSmoothParam(Module* module, int paramId);

// ParamHandles
void addParamHandle(ParamHandle* paramHandle);
void removeParamHandle(ParamHandle* paramHandle);
/** Returns the unique ParamHandle for the given paramId */
/** Returns the unique ParamHandle for the given paramId
*/
ParamHandle* getParamHandle(int moduleId, int paramId);
/** Use getParamHandle(int, int) instead. */
/** Use getParamHandle(int, int) instead.
*/
DEPRECATED ParamHandle* getParamHandle(Module* module, int paramId);
/** Sets the ParamHandle IDs and module pointer.
If `overwrite` is true and another ParamHandle points to the same param, unsets that one and replaces it with the given handle.


+ 2
- 2
include/plugin.hpp View File

@@ -35,10 +35,10 @@ void syncUpdates();
bool isSyncing();
Plugin* getPlugin(const std::string& pluginSlug);
Model* getModel(const std::string& pluginSlug, const std::string& modelSlug);
/** Creates a Module from a JSON module object.
/** Creates a Model from a JSON module object.
Throws an Exception if the model is not found.
*/
engine::Module* moduleFromJson(json_t* moduleJ);
Model* modelFromJson(json_t* moduleJ);
/** Checks that the slug contains only alphanumeric characters, "-", and "_" */
bool isSlugValid(const std::string& slug);
/** Returns a string containing only the valid slug characters. */


+ 3
- 2
src/app/ModuleWidget.cpp View File

@@ -572,12 +572,12 @@ PortWidget* ModuleWidget::getOutput(int portId) {
}

json_t* ModuleWidget::toJson() {
json_t* moduleJ = module->toJson();
json_t* moduleJ = APP->engine->moduleToJson(module);
return moduleJ;
}

void ModuleWidget::fromJson(json_t* rootJ) {
module->fromJson(rootJ);
APP->engine->moduleFromJson(module, rootJ);
}

void ModuleWidget::copyClipboard() {
@@ -843,6 +843,7 @@ void ModuleWidget::cloneAction() {
engine::Module* clonedModule = model->createModule();
// JSON serialization is the obvious way to do this
json_t* moduleJ = toJson();
// This doesn't need a lock (via Engine::moduleFromJson()) because the Module is not added to the Engine yet.
clonedModule->fromJson(moduleJ);
json_decref(moduleJ);
// Reset ID so the Engine automatically assigns a new one


+ 7
- 2
src/app/RackWidget.cpp View File

@@ -23,8 +23,10 @@ namespace app {

/** Creates a new Module and ModuleWidget */
ModuleWidget* moduleWidgetFromJson(json_t* moduleJ) {
engine::Module* module = plugin::moduleFromJson(moduleJ);
plugin::Model* model = plugin::modelFromJson(moduleJ);
engine::Module* module = model->createModule();
assert(module);
module->fromJson(moduleJ);

// Create ModuleWidget
ModuleWidget* moduleWidget = module->model->createModuleWidget(module);
@@ -305,7 +307,10 @@ void RackWidget::pastePresetClipboardAction() {
});

try {
engine::Module* module = plugin::moduleFromJson(moduleJ);
plugin::Model* model = plugin::modelFromJson(moduleJ);
engine::Module* module = model->createModule();
assert(module);
module->fromJson(moduleJ);
// Reset ID so the Engine automatically assigns a new one
module->id = -1;
APP->engine->addModule(module);


+ 88
- 44
src/engine/Engine.cpp View File

@@ -5,6 +5,7 @@
#include <atomic>
#include <tuple>
#include <pmmintrin.h>
#include <pthread.h>

#include <engine/Engine.hpp>
#include <settings.hpp>
@@ -29,6 +30,44 @@ static void initMXCSR() {
}


/** Allows multiple "reader" threads to obtain a lock simultaneously, but only one "writer" thread.
This implementation is just a wrapper for pthreads.
This is available in C++14 as std::shared_mutex, but unfortunately we're using C++11.
*/
struct SharedMutex {
pthread_rwlock_t rwlock;
SharedMutex() {
if (pthread_rwlock_init(&rwlock, NULL))
throw Exception("pthread_rwlock_init failed");
}
~SharedMutex() {
pthread_rwlock_destroy(&rwlock);
}
};

struct SharedLock {
SharedMutex& m;
SharedLock(SharedMutex& m) : m(m) {
if (pthread_rwlock_rdlock(&m.rwlock))
throw Exception("pthread_rwlock_rdlock failed");
}
~SharedLock() {
pthread_rwlock_unlock(&m.rwlock);
}
};

struct ExclusiveSharedLock {
SharedMutex& m;
ExclusiveSharedLock(SharedMutex& m) : m(m) {
if (pthread_rwlock_wrlock(&m.rwlock))
throw Exception("pthread_rwlock_wrlock failed");
}
~ExclusiveSharedLock() {
pthread_rwlock_unlock(&m.rwlock);
}
};


struct Barrier {
std::mutex mutex;
std::condition_variable cv;
@@ -164,7 +203,7 @@ struct Engine::Internal {
int smoothParamId = 0;
float smoothValue = 0.f;

std::recursive_mutex mutex;
SharedMutex mutex;

int threadCount = 0;
std::vector<EngineWorker> workers;
@@ -408,7 +447,8 @@ Engine::~Engine() {


void Engine::clear() {
std::lock_guard<std::recursive_mutex> lock(internal->mutex);
// TODO This needs a lock that doesn't interfere with removeParamHandle, removeCable, and removeModule.

// Copy lists because we'll be removing while iterating
std::set<ParamHandle*> paramHandles = internal->paramHandles;
for (ParamHandle* paramHandle : paramHandles) {
@@ -432,7 +472,7 @@ void Engine::clear() {


void Engine::step(int frames) {
std::lock_guard<std::recursive_mutex> lock(internal->mutex);
SharedLock lock(internal->mutex);
// Configure thread
initMXCSR();
random::init();
@@ -474,25 +514,22 @@ void Engine::step(int frames) {


void Engine::setPrimaryModule(Module* module) {
std::lock_guard<std::recursive_mutex> lock(internal->mutex);
SharedLock lock(internal->mutex);
internal->primaryModule = module;
}


Module* Engine::getPrimaryModule() {
// No lock, for performance
return internal->primaryModule;
}


float Engine::getSampleRate() {
// No lock, for performance
return internal->sampleRate;
}


float Engine::getSampleTime() {
// No lock, for performance
return internal->sampleTime;
}

@@ -503,31 +540,27 @@ void Engine::yieldWorkers() {


int64_t Engine::getFrame() {
// No lock, for performance
return internal->frame;
}


int64_t Engine::getStepFrame() {
// No lock, for performance
return internal->stepFrame;
}


int64_t Engine::getStepTime() {
// No lock, for performance
return internal->stepTime;
}


int Engine::getStepFrames() {
// No lock, for performance
return internal->stepFrames;
}


void Engine::addModule(Module* module) {
std::lock_guard<std::recursive_mutex> lock(internal->mutex);
ExclusiveSharedLock lock(internal->mutex);
assert(module);
// Check that the module is not already added
auto it = std::find(internal->modules.begin(), internal->modules.end(), module);
@@ -557,12 +590,11 @@ void Engine::addModule(Module* module) {
if (paramHandle->moduleId == module->id)
paramHandle->module = module;
}
// DEBUG("Added module %d to engine", module->id);
}


void Engine::removeModule(Module* module) {
std::lock_guard<std::recursive_mutex> lock(internal->mutex);
ExclusiveSharedLock lock(internal->mutex);
assert(module);
// Check that the module actually exists
auto it = std::find(internal->modules.begin(), internal->modules.end(), module);
@@ -600,13 +632,12 @@ void Engine::removeModule(Module* module) {
internal->primaryModule = NULL;
// Remove module
internal->modules.erase(it);
// DEBUG("Removed module %d to engine", module->id);
}


Module* Engine::getModule(int moduleId) {
std::lock_guard<std::recursive_mutex> lock(internal->mutex);
// Find module
SharedLock lock(internal->mutex);
// Find module by id
for (Module* module : internal->modules) {
if (module->id == moduleId)
return module;
@@ -616,7 +647,7 @@ Module* Engine::getModule(int moduleId) {


void Engine::resetModule(Module* module) {
std::lock_guard<std::recursive_mutex> lock(internal->mutex);
SharedLock lock(internal->mutex);
assert(module);

Module::ResetEvent eReset;
@@ -625,7 +656,7 @@ void Engine::resetModule(Module* module) {


void Engine::randomizeModule(Module* module) {
std::lock_guard<std::recursive_mutex> lock(internal->mutex);
SharedLock lock(internal->mutex);
assert(module);

Module::RandomizeEvent eRandomize;
@@ -634,7 +665,7 @@ void Engine::randomizeModule(Module* module) {


void Engine::bypassModule(Module* module, bool bypassed) {
std::lock_guard<std::recursive_mutex> lock(internal->mutex);
SharedLock lock(internal->mutex);
assert(module);
if (module->bypassed() == bypassed)
return;
@@ -656,6 +687,18 @@ void Engine::bypassModule(Module* module, bool bypassed) {
}


json_t* Engine::moduleToJson(Module* module) {
ExclusiveSharedLock lock(internal->mutex);
return module->toJson();
}


void Engine::moduleFromJson(Module* module, json_t* rootJ) {
ExclusiveSharedLock lock(internal->mutex);
module->fromJson(rootJ);
}


static void Port_setDisconnected(Port* that) {
that->channels = 0;
for (int c = 0; c < PORT_MAX_CHANNELS; c++) {
@@ -704,7 +747,7 @@ static void Engine_updateConnected(Engine* that) {


void Engine::addCable(Cable* cable) {
std::lock_guard<std::recursive_mutex> lock(internal->mutex);
ExclusiveSharedLock lock(internal->mutex);
assert(cable);
// Check cable properties
assert(cable->inputModule);
@@ -754,12 +797,11 @@ void Engine::addCable(Cable* cable) {
e.portId = cable->outputId;
cable->outputModule->onPortChange(e);
}
// DEBUG("Added cable %d to engine", cable->id);
}


void Engine::removeCable(Cable* cable) {
std::lock_guard<std::recursive_mutex> lock(internal->mutex);
ExclusiveSharedLock lock(internal->mutex);
assert(cable);
// Check that the cable is already added
auto it = std::find(internal->cables.begin(), internal->cables.end(), cable);
@@ -790,13 +832,12 @@ void Engine::removeCable(Cable* cable) {
e.portId = cable->outputId;
cable->outputModule->onPortChange(e);
}
// DEBUG("Removed cable %d to engine", cable->id);
}


Cable* Engine::getCable(int cableId) {
std::lock_guard<std::recursive_mutex> lock(internal->mutex);
// Find Cable
SharedLock lock(internal->mutex);
// Find cable by id
for (Cable* cable : internal->cables) {
if (cable->id == cableId)
return cable;
@@ -806,7 +847,7 @@ Cable* Engine::getCable(int cableId) {


void Engine::setParam(Module* module, int paramId, float value) {
// No lock, for performance
SharedLock lock(internal->mutex);
// If param is being smoothed, cancel smoothing.
if (internal->smoothModule == module && internal->smoothParamId == paramId) {
internal->smoothModule = NULL;
@@ -817,13 +858,13 @@ void Engine::setParam(Module* module, int paramId, float value) {


float Engine::getParam(Module* module, int paramId) {
// No lock, for performance
SharedLock lock(internal->mutex);
return module->params[paramId].value;
}


void Engine::setSmoothParam(Module* module, int paramId, float value) {
// No lock, for performance
SharedLock lock(internal->mutex);
// If another param is being smoothed, jump value
if (internal->smoothModule && !(internal->smoothModule == module && internal->smoothParamId == paramId)) {
internal->smoothModule->params[internal->smoothParamId].value = internal->smoothValue;
@@ -836,10 +877,10 @@ void Engine::setSmoothParam(Module* module, int paramId, float value) {


float Engine::getSmoothParam(Module* module, int paramId) {
// No lock, for performance
SharedLock lock(internal->mutex);
if (internal->smoothModule == module && internal->smoothParamId == paramId)
return internal->smoothValue;
return getParam(module, paramId);
return module->params[paramId].value;
}


@@ -856,8 +897,7 @@ static void Engine_refreshParamHandleCache(Engine* that) {


void Engine::addParamHandle(ParamHandle* paramHandle) {
std::lock_guard<std::recursive_mutex> lock(internal->mutex);

ExclusiveSharedLock lock(internal->mutex);
// New ParamHandles must be blank.
// This means we don't have to refresh the cache.
assert(paramHandle->moduleId < 0);
@@ -872,8 +912,7 @@ void Engine::addParamHandle(ParamHandle* paramHandle) {


void Engine::removeParamHandle(ParamHandle* paramHandle) {
std::lock_guard<std::recursive_mutex> lock(internal->mutex);

ExclusiveSharedLock lock(internal->mutex);
// Check that the ParamHandle is already added
auto it = internal->paramHandles.find(paramHandle);
assert(it != internal->paramHandles.end());
@@ -886,7 +925,7 @@ void Engine::removeParamHandle(ParamHandle* paramHandle) {


ParamHandle* Engine::getParamHandle(int moduleId, int paramId) {
// No lock, for performance
SharedLock lock(internal->mutex);
auto it = internal->paramHandleCache.find(std::make_tuple(moduleId, paramId));
if (it == internal->paramHandleCache.end())
return NULL;
@@ -895,14 +934,12 @@ ParamHandle* Engine::getParamHandle(int moduleId, int paramId) {


ParamHandle* Engine::getParamHandle(Module* module, int paramId) {
// No lock, for performance
return getParamHandle(module->id, paramId);
}


void Engine::updateParamHandle(ParamHandle* paramHandle, int moduleId, int paramId, bool overwrite) {
std::lock_guard<std::recursive_mutex> lock(internal->mutex);

SharedLock lock(internal->mutex);
// Check that it exists
auto it = internal->paramHandles.find(paramHandle);
assert(it != internal->paramHandles.end());
@@ -940,7 +977,7 @@ void Engine::updateParamHandle(ParamHandle* paramHandle, int moduleId, int param


json_t* Engine::toJson() {
std::lock_guard<std::recursive_mutex> lock(internal->mutex);
ExclusiveSharedLock lock(internal->mutex);
json_t* rootJ = json_object();

// modules
@@ -966,9 +1003,10 @@ json_t* Engine::toJson() {


void Engine::fromJson(json_t* rootJ) {
// Don't lock here because Module::fromJson for example might deadlock, and we actually don't really need thread safety other than the addModule() and addCable() calls, which are already behind locks.
// std::lock_guard<std::recursive_mutex> lock(internal->mutex);

// We can't lock here because addModule() and addCable() are called inside.
// Also, AudioInterface::fromJson() might call Engine::step() due to RtAudio drivers.
// ExclusiveSharedLock lock(internal->mutex);
clear();
// modules
json_t* modulesJ = json_object_get(rootJ, "modules");
if (!modulesJ)
@@ -977,13 +1015,18 @@ void Engine::fromJson(json_t* rootJ) {
json_t* moduleJ;
json_array_foreach(modulesJ, moduleIndex, moduleJ) {
try {
Module* module = plugin::moduleFromJson(moduleJ);
plugin::Model* model = plugin::modelFromJson(moduleJ);
Module* module = model->createModule();
assert(module);
// This doesn't need a lock because the Module is not added to the Engine yet.
module->fromJson(moduleJ);

// Before 1.0, the module ID was the index in the "modules" array
if (APP->patch->isLegacy(2)) {
module->id = moduleIndex;
}

// This method locks
addModule(module);
}
catch (Exception& e) {
@@ -1005,6 +1048,7 @@ void Engine::fromJson(json_t* rootJ) {
Cable* cable = new Cable;
try {
cable->fromJson(cableJ);
// This method locks
addCable(cable);
}
catch (Exception& e) {


+ 3
- 3
src/history.cpp View File

@@ -48,7 +48,7 @@ void ModuleAdd::setModule(app::ModuleWidget* mw) {
pos = mw->box.pos;
// ModuleAdd doesn't *really* need the state to be serialized, although ModuleRemove certainly does.
// However, creating a module may give it a nondeterministic initial state for whatever reason, so serialize anyway.
moduleJ = mw->module->toJson();
moduleJ = APP->engine->moduleToJson(mw->module);
}

void ModuleAdd::undo() {
@@ -104,13 +104,13 @@ ModuleChange::~ModuleChange() {
void ModuleChange::undo() {
engine::Module* module = APP->engine->getModule(moduleId);
assert(module);
module->fromJson(oldModuleJ);
APP->engine->moduleFromJson(module, oldModuleJ);
}

void ModuleChange::redo() {
engine::Module* module = APP->engine->getModule(moduleId);
assert(module);
module->fromJson(newModuleJ);
APP->engine->moduleFromJson(module, newModuleJ);
}




+ 2
- 7
src/plugin.cpp View File

@@ -500,7 +500,7 @@ Model* getModel(const std::string& pluginSlug, const std::string& modelSlug) {
}


engine::Module* moduleFromJson(json_t* moduleJ) {
Model* modelFromJson(json_t* moduleJ) {
// Get slugs
json_t* pluginSlugJ = json_object_get(moduleJ, "plugin");
if (!pluginSlugJ)
@@ -518,12 +518,7 @@ engine::Module* moduleFromJson(json_t* moduleJ) {
Model* model = getModel(pluginSlug, modelSlug);
if (!model)
throw Exception(string::f("Could not find module \"%s\" of plugin \"%s\"", modelSlug.c_str(), pluginSlug.c_str()));

// Create Module
engine::Module* module = model->createModule();
assert(module);
module->fromJson(moduleJ);
return module;
return model;
}




Loading…
Cancel
Save