Browse Source

Improved scheduling when multiple threads are fighting for the MessageManagerLock

tags/2021-05-28
hogliux 8 years ago
parent
commit
b9b34393d1
9 changed files with 344 additions and 144 deletions
  1. +15
    -6
      modules/juce_core/threads/juce_ScopedLock.h
  2. +11
    -0
      modules/juce_core/threads/juce_Thread.cpp
  3. +22
    -0
      modules/juce_core/threads/juce_Thread.h
  4. +11
    -0
      modules/juce_core/threads/juce_ThreadPool.cpp
  5. +11
    -0
      modules/juce_core/threads/juce_ThreadPool.h
  6. +124
    -59
      modules/juce_events/messages/juce_MessageManager.cpp
  7. +119
    -39
      modules/juce_events/messages/juce_MessageManager.h
  8. +26
    -35
      modules/juce_opengl/opengl/juce_OpenGLContext.cpp
  9. +5
    -5
      modules/juce_osc/osc/juce_OSCReceiver.cpp

+ 15
- 6
modules/juce_core/threads/juce_ScopedLock.h View File

@@ -194,16 +194,22 @@ public:
//==============================================================================
/** Creates a GenericScopedTryLock.
As soon as it is created, this will attempt to acquire the lock, and when the
GenericScopedTryLock is deleted, the lock will be released (if the lock was
successfully acquired).
If acquireLockOnInitialisation is true then as soon as this ScopedTryLock
is created, it will attempt to acquire the lock with tryEnter.
You can retry acquiring the lock by calling retryLock.
When GenericScopedTryLock is deleted, the lock will be released (if the lock
was successfully acquired).
Make sure this object is created and deleted by the same thread,
otherwise there are no guarantees what will happen! Best just to use it
as a local stack object, rather than creating one with the new() operator.
@see retryLock, isLocked
*/
inline explicit GenericScopedTryLock (const LockType& lock) noexcept
: lock_ (lock), lockWasSuccessful (lock.tryEnter()) {}
inline explicit GenericScopedTryLock (const LockType& lock, bool acquireLockOnInitialisation = true) noexcept
: lock_ (lock), lockWasSuccessful (acquireLockOnInitialisation && lock.tryEnter()) {}
/** Destructor.
@@ -218,10 +224,13 @@ public:
/** Returns true if the mutex was successfully locked. */
bool isLocked() const noexcept { return lockWasSuccessful; }
/** Retry gaining the lock by calling tryEnter on the underlying lock. */
bool retryLock() const noexcept { lockWasSuccessful = lock_.tryEnter(); return lockWasSuccessful; }
private:
//==============================================================================
const LockType& lock_;
const bool lockWasSuccessful;
mutable bool lockWasSuccessful;
JUCE_DECLARE_NON_COPYABLE (GenericScopedTryLock)
};


+ 11
- 0
modules/juce_core/threads/juce_Thread.cpp View File

@@ -167,6 +167,7 @@ Thread* JUCE_CALLTYPE Thread::getCurrentThread()
void Thread::signalThreadShouldExit()
{
shouldExit = true;
listeners.call (&Listener::exitSignalSent);
}
bool Thread::currentThreadShouldExit()
@@ -229,6 +230,16 @@ bool Thread::stopThread (const int timeOutMilliseconds)
return true;
}
void Thread::addListener (Listener* listener)
{
listeners.add (listener);
}
void Thread::removeListener (Listener* listener)
{
listeners.remove (listener);
}
//==============================================================================
bool Thread::setPriority (int newPriority)
{


+ 22
- 0
modules/juce_core/threads/juce_Thread.h View File

@@ -172,6 +172,27 @@ public:
*/
bool waitForThreadToExit (int timeOutMilliseconds) const;
//==============================================================================
class Listener
{
public:
virtual ~Listener() {}
/** Called if Thread::signalThreadShouldExit was called.
@see Thread::threadShouldExit, Thread::addListener, Thread::removeListener
*/
virtual void exitSignalSent() = 0;
};
/** Add a listener to this thread which will receive a callback when
signalThreadShouldExit was called on this thread.
@see signalThreadShouldExit, removeListener
*/
void addListener (Listener*);
/** Removes a listener added with addListener. */
void removeListener (Listener*);
//==============================================================================
/** Special realtime audio thread priority
@@ -313,6 +334,7 @@ private:
uint32 affinityMask = 0;
bool deleteOnThreadEnd = false;
bool volatile shouldExit = false;
ListenerList<Listener, Array<Listener*, CriticalSection>> listeners;
#if JUCE_ANDROID
bool isAndroidRealtimeThread = false;


+ 11
- 0
modules/juce_core/threads/juce_ThreadPool.cpp View File

@@ -69,6 +69,17 @@ void ThreadPoolJob::setJobName (const String& newName)
void ThreadPoolJob::signalJobShouldExit()
{
shouldStop = true;
listeners.call (&Thread::Listener::exitSignalSent);
}
void ThreadPoolJob::addListener (Thread::Listener* listener)
{
listeners.add (listener);
}
void ThreadPoolJob::removeListener (Thread::Listener* listener)
{
listeners.remove (listener);
}
ThreadPoolJob* ThreadPoolJob::getCurrentThreadPoolJob()


+ 11
- 0
modules/juce_core/threads/juce_ThreadPool.h View File

@@ -113,6 +113,16 @@ public:
*/
void signalJobShouldExit();
/** Add a listener to this thread job which will receive a callback when
signalJobShouldExit was called on this thread job.
@see signalJobShouldExit, removeListener
*/
void addListener (Thread::Listener*);
/** Removes a listener added with addListener. */
void removeListener (Thread::Listener*);
//==============================================================================
/** If the calling thread is being invoked inside a runJob() method, this will
return the ThreadPoolJob that it belongs to.
@@ -126,6 +136,7 @@ private:
String jobName;
ThreadPool* pool = nullptr;
bool shouldStop = false, isActive = false, shouldBeDeleted = false;
ListenerList<Thread::Listener, Array<Thread::Listener*, CriticalSection>> listeners;
JUCE_DECLARE_NON_COPYABLE_WITH_LEAK_DETECTOR (ThreadPoolJob)
};


+ 124
- 59
modules/juce_events/messages/juce_MessageManager.cpp View File

@@ -228,7 +228,7 @@ void MessageManager::setCurrentThreadAsMessageThread()
bool MessageManager::currentThreadHasLockedMessageManager() const noexcept
{
const Thread::ThreadID thisThread = Thread::getCurrentThreadId();
return thisThread == messageThreadId || thisThread == threadWithLock;
return thisThread == messageThreadId || thisThread == threadWithLock.get();
}
//==============================================================================
@@ -242,121 +242,186 @@ bool MessageManager::currentThreadHasLockedMessageManager() const noexcept
accessed from another thread inside a MM lock, you're screwed. (this is exactly what happens
in Cocoa).
*/
class MessageManagerLock::BlockingMessage : public MessageManager::MessageBase
struct MessageManager::Lock::BlockingMessage : public MessageManager::MessageBase
{
public:
BlockingMessage() noexcept {}
BlockingMessage (const MessageManager::Lock* parent) noexcept
// need a const_cast here as VS2013 doesn't like a const pointer to be in an atomic
: owner (const_cast<MessageManager::Lock*> (parent)) {}
void messageCallback() override
{
lockedEvent.signal();
{
ScopedLock lock (ownerCriticalSection);
if (auto* o = owner.get())
o->messageCallback();
}
releaseEvent.wait();
}
WaitableEvent lockedEvent, releaseEvent;
CriticalSection ownerCriticalSection;
Atomic<MessageManager::Lock*> owner;
WaitableEvent releaseEvent;
JUCE_DECLARE_NON_COPYABLE (BlockingMessage)
};
//==============================================================================
MessageManagerLock::MessageManagerLock (Thread* const threadToCheck)
: blockingMessage(), checker (threadToCheck, nullptr),
locked (attemptLock (threadToCheck != nullptr ? &checker : nullptr))
{
}
MessageManagerLock::MessageManagerLock (ThreadPoolJob* const jobToCheckForExitSignal)
: blockingMessage(), checker (nullptr, jobToCheckForExitSignal),
locked (attemptLock (jobToCheckForExitSignal != nullptr ? &checker : nullptr))
{
}
MessageManager::Lock::Lock() {}
MessageManager::Lock::~Lock() { exit(); }
void MessageManager::Lock::enter() const noexcept { tryAcquire (true); }
bool MessageManager::Lock::tryEnter() const noexcept { return tryAcquire (false); }
MessageManagerLock::MessageManagerLock (BailOutChecker& bailOutChecker)
: blockingMessage(), checker (nullptr, nullptr),
locked (attemptLock (&bailOutChecker))
{
}
bool MessageManagerLock::attemptLock (BailOutChecker* bailOutChecker)
bool MessageManager::Lock::tryAcquire (bool lockIsMandatory) const noexcept
{
auto* mm = MessageManager::instance;
if (mm == nullptr)
{
jassertfalse;
return false;
}
if (! lockIsMandatory && (abortWait.get() != 0))
{
abortWait.set (0);
return false;
}
if (mm->currentThreadHasLockedMessageManager())
return true;
if (bailOutChecker == nullptr)
try
{
mm->lockingLock.enter();
blockingMessage = new BlockingMessage (this);
}
else
catch (...)
{
while (! mm->lockingLock.tryEnter())
{
if (bailOutChecker->shouldAbortAcquiringLock())
return false;
Thread::yield();
}
jassert (! lockIsMandatory);
return false;
}
blockingMessage = new BlockingMessage();
if (! blockingMessage->post())
{
// post of message failed while trying to get the lock
jassert (! lockIsMandatory);
blockingMessage = nullptr;
return false;
}
while (! blockingMessage->lockedEvent.wait (20))
do
{
if (bailOutChecker != nullptr && bailOutChecker->shouldAbortAcquiringLock())
while (abortWait.get() == 0)
lockedEvent.wait (-1);
abortWait.set (0);
if (lockGained.get() != 0)
{
blockingMessage->releaseEvent.signal();
blockingMessage = nullptr;
mm->lockingLock.exit();
return false;
mm->threadWithLock = Thread::getCurrentThreadId();
return true;
}
}
jassert (mm->threadWithLock == 0);
} while (lockIsMandatory);
mm->threadWithLock = Thread::getCurrentThreadId();
return true;
// we didn't get the lock
blockingMessage->releaseEvent.signal();
{
ScopedLock lock (blockingMessage->ownerCriticalSection);
lockGained.set (0);
blockingMessage->owner.set (nullptr);
}
blockingMessage = nullptr;
return false;
}
MessageManagerLock::~MessageManagerLock() noexcept
void MessageManager::Lock::exit() const noexcept
{
if (blockingMessage != nullptr)
if (lockGained.compareAndSetBool (false, true))
{
auto* mm = MessageManager::instance;
jassert (mm == nullptr || mm->currentThreadHasLockedMessageManager());
blockingMessage->releaseEvent.signal();
blockingMessage = nullptr;
lockGained.set (0);
if (mm != nullptr)
{
mm->threadWithLock = 0;
mm->lockingLock.exit();
if (blockingMessage != nullptr)
{
blockingMessage->releaseEvent.signal();
blockingMessage = nullptr;
}
}
}
void MessageManager::Lock::messageCallback() const
{
lockGained.set (1);
abort();
}
void MessageManager::Lock::abort() const noexcept
{
abortWait.set (1);
lockedEvent.signal();
}
//==============================================================================
MessageManagerLock::ThreadChecker::ThreadChecker (Thread* const threadToUse,
ThreadPoolJob* const threadJobToUse)
: threadToCheck (threadToUse), job (threadJobToUse)
MessageManagerLock::MessageManagerLock (Thread* threadToCheck)
: locked (attemptLock (threadToCheck, nullptr))
{}
MessageManagerLock::MessageManagerLock (ThreadPoolJob* jobToCheck)
: locked (attemptLock (nullptr, jobToCheck))
{}
bool MessageManagerLock::attemptLock (Thread* threadToCheck, ThreadPoolJob* jobToCheck)
{
jassert (threadToCheck == nullptr || jobToCheck == nullptr);
if (threadToCheck != nullptr)
threadToCheck->addListener (this);
if (jobToCheck != nullptr)
jobToCheck->addListener (this);
// tryEnter may have a spurious abort (return false) so keep checking the condition
while ((threadToCheck == nullptr || ! threadToCheck->threadShouldExit())
&& (jobToCheck == nullptr || ! jobToCheck->shouldExit()))
{
if (mmLock.tryEnter())
break;
}
if (threadToCheck != nullptr)
{
threadToCheck->removeListener (this);
if (threadToCheck->threadShouldExit())
return false;
}
if (jobToCheck != nullptr)
{
jobToCheck->removeListener (this);
if (jobToCheck->shouldExit())
return false;
}
return true;
}
bool MessageManagerLock::ThreadChecker::shouldAbortAcquiringLock()
MessageManagerLock::~MessageManagerLock() noexcept { mmLock.exit(); }
void MessageManagerLock::exitSignalSent()
{
return (threadToCheck != nullptr && threadToCheck->threadShouldExit())
|| (job != nullptr && job->shouldExit());
mmLock.abort();
}
//==============================================================================


+ 119
- 39
modules/juce_events/messages/juce_MessageManager.h View File

@@ -185,6 +185,120 @@ public:
JUCE_DECLARE_NON_COPYABLE (MessageBase)
};
//==============================================================================
/** A lock you can use to lock the message manager. You can use this class with
the RAII-based ScopedLock classes.
*/
class Lock
{
public:
/**
Creates a new critical section to exclusively access methods which can
only be called when the message manager is locked.
Unlike CrititcalSection, multiple instances of this lock class provide
exclusive access to a single resource - the MessageManager.
*/
Lock();
/** Destructor. */
~Lock();
/** Acquires the message manager lock.
If the caller thread already has exclusive access to the MessageManager, this method
will return immediately.
If another thread is currently using the MessageManager, this will wait until that
thread releases the lock to the MessageManager.
This call will only exit if the lock was accquired by this thread. Calling abort while
a thread is waiting for enter to finish, will have no effect.
@see exit, abort
*/
void enter() const noexcept;
/** Attempts to lock the meesage manager and exits if abort is called.
This method behaves identically to enter, except that it will abort waiting for
the lock if the abort method is called.
Unlike other JUCE critical sections, this method **will** block waiting for the lock.
To ensure predictable behaviour, you should re-check your abort condition if tryEnter
returns false.
This method can be used if you want to do some work while waiting for the
MessageManagerLock:
void doWorkWhileWaitingForMessageManagerLock()
{
MessageManager::Lock::ScopedTryLockType mmLock (messageManagerLock);
while (! mmLock.isLocked())
{
while (workQueue.size() > 0)
{
auto work = workQueue.pop();
doSomeWork (work);
}
// this will block until we either have the lock or there is work
mmLock.retryLock();
}
// we have the mmlock
// do some message manager stuff like resizing and painting components
}
// called from another thread
void addWorkToDo (Work work)
{
queue.push (work);
messageManagerLock.abort();
}
@returns false if waiting for a lock was aborted, true if the lock was accquired.
@see enter, abort, ScopedTryLock
*/
bool tryEnter() const noexcept;
/** Releases the message manager lock.
@see enter, ScopedLock
*/
void exit() const noexcept;
/** Unblocks a thread which is waiting in tryEnter
Call this method if you want to unblock a thread which is waiting for the
MessageManager lock in tryEnter.
This method does not have any effetc on a thread waiting for a lock in enter.
@see tryEnter
*/
void abort() const noexcept;
//==============================================================================
/** Provides the type of scoped lock to use with a CriticalSection. */
typedef GenericScopedLock<Lock> ScopedLockType;
/** Provides the type of scoped unlocker to use with a CriticalSection. */
typedef GenericScopedUnlock<Lock> ScopedUnlockType;
/** Provides the type of scoped try-locker to use with a CriticalSection. */
typedef GenericScopedTryLock<Lock> ScopedTryLockType;
private:
struct BlockingMessage;
friend class ReferenceCountedObjectPtr<BlockingMessage>;
bool tryAcquire (bool) const noexcept;
void messageCallback() const;
//==============================================================================
mutable ReferenceCountedObjectPtr<BlockingMessage> blockingMessage;
WaitableEvent lockedEvent;
mutable Atomic<int> abortWait, lockGained;
};
//==============================================================================
#ifndef DOXYGEN
// Internal methods - do not use!
@@ -206,8 +320,7 @@ private:
ScopedPointer<ActionBroadcaster> broadcaster;
bool quitMessagePosted = false, quitMessageReceived = false;
Thread::ThreadID messageThreadId;
Thread::ThreadID volatile threadWithLock = {};
CriticalSection lockingLock;
Atomic<Thread::ThreadID> threadWithLock;
static bool postMessageToSystemQueue (MessageBase*);
static void* exitModalLoopCallback (void*);
@@ -264,7 +377,7 @@ private:
@see MessageManager, MessageManager::currentThreadHasLockedMessageManager
*/
class JUCE_API MessageManagerLock
class JUCE_API MessageManagerLock : private Thread::Listener
{
public:
//==============================================================================
@@ -319,23 +432,6 @@ public:
*/
MessageManagerLock (ThreadPoolJob* jobToCheckForExitSignal);
//==============================================================================
struct BailOutChecker
{
virtual ~BailOutChecker() {}
/** Return true if acquiring the lock should be aborted. */
virtual bool shouldAbortAcquiringLock() = 0;
};
/** This is an abstraction of the other constructors. You can pass this constructor
a functor which is periodically checked if attempting the lock should be aborted.
See the MessageManagerLock (Thread*) constructor for details on how this works.
*/
MessageManagerLock (BailOutChecker&);
//==============================================================================
/** Releases the current thread's lock on the message manager.
@@ -351,29 +447,13 @@ public:
bool lockWasGained() const noexcept { return locked; }
private:
class BlockingMessage;
friend class ReferenceCountedObjectPtr<BlockingMessage>;
ReferenceCountedObjectPtr<BlockingMessage> blockingMessage;
struct ThreadChecker : BailOutChecker
{
ThreadChecker (Thread* const, ThreadPoolJob* const);
// Required to supress VS2013 compiler warnings
ThreadChecker& operator= (const ThreadChecker&) = delete;
bool shouldAbortAcquiringLock() override;
Thread* const threadToCheck;
ThreadPoolJob* const job;
};
//==============================================================================
ThreadChecker checker;
MessageManager::Lock mmLock;
bool locked;
//==============================================================================
bool attemptLock (BailOutChecker*);
bool attemptLock (Thread*, ThreadPoolJob*);
void exitSignalSent() override;
JUCE_DECLARE_NON_COPYABLE (MessageManagerLock)
};


+ 26
- 35
modules/juce_opengl/opengl/juce_OpenGLContext.cpp View File

@@ -112,7 +112,8 @@ public:
if (! renderThread->contains (this))
resume();
execute (new DoNothingWorker(), true, true);
while (workQueue.size() != 0)
Thread::sleep (20);
}
pause();
@@ -125,6 +126,9 @@ public:
//==============================================================================
void pause()
{
signalJobShouldExit();
messageManagerLock.abort();
if (renderThread != nullptr)
{
repaintEvent.signal();
@@ -206,20 +210,24 @@ public:
bool renderFrame()
{
ScopedPointer<MessageManagerLock> mmLock;
MessageManager::Lock::ScopedTryLockType mmLock (messageManagerLock, false);
const bool isUpdating = needsUpdate.compareAndSetBool (0, 1);
if (context.renderComponents && isUpdating)
{
MessageLockWorker worker (*this);
// This avoids hogging the message thread when doing intensive rendering.
if (lastMMLockReleaseTime + 1 >= Time::getMillisecondCounter())
Thread::sleep (2);
mmLock = new MessageManagerLock (worker); // need to acquire this before locking the context.
if (! mmLock->lockWasGained())
while (! shouldExit())
{
doWorkWhileWaitingForLock (false);
if (mmLock.retryLock ())
break;
}
if (shouldExit())
return false;
updateViewportSize (false);
@@ -253,7 +261,7 @@ public:
if (! hasInitialised)
return false;
mmLock = nullptr;
messageManagerLock.exit();
lastMMLockReleaseTime = Time::getMillisecondCounter();
}
@@ -418,12 +426,15 @@ public:
JobStatus runJob() override
{
{
MessageLockWorker worker (*this);
// Allow the message thread to finish setting-up the context before using it..
MessageManagerLock mml (worker);
if (! mml.lockWasGained())
return ThreadPoolJob::jobHasFinished;
MessageManager::Lock::ScopedTryLockType mmLock (messageManagerLock, false);
do
{
if (shouldExit())
return ThreadPoolJob::jobHasFinished;
} while (! mmLock.retryLock ());
}
initialiseOnThread();
@@ -512,20 +523,6 @@ public:
nativeContext->shutdownOnRenderThread();
}
//==============================================================================
struct MessageLockWorker : public MessageManagerLock::BailOutChecker
{
MessageLockWorker (CachedImage& cachedImageRequestingLock)
: owner (cachedImageRequestingLock)
{
}
bool shouldAbortAcquiringLock() override { return owner.doWorkWhileWaitingForLock (false); }
CachedImage& owner;
JUCE_DECLARE_NON_COPYABLE (MessageLockWorker)
};
//==============================================================================
struct BlockingWorker : public OpenGLContext::AsyncWorker
{
@@ -582,6 +579,7 @@ public:
OpenGLContext::AsyncWorker::Ptr worker = (blocker != nullptr ? blocker : static_cast<OpenGLContext::AsyncWorker::Ptr&&> (workerToUse));
workQueue.add (worker);
messageManagerLock.abort();
context.triggerRepaint();
if (blocker != nullptr)
@@ -599,14 +597,6 @@ public:
return dynamic_cast<CachedImage*> (c.getCachedComponentImage());
}
//==============================================================================
// used to push no work on to the gl thread to easily block
struct DoNothingWorker : public OpenGLContext::AsyncWorker
{
DoNothingWorker() {}
void operator() (OpenGLContext&) override {}
};
//==============================================================================
friend class NativeContext;
ScopedPointer<NativeContext> nativeContext;
@@ -637,6 +627,7 @@ public:
ScopedPointer<ThreadPool> renderThread;
ReferenceCountedArray<OpenGLContext::AsyncWorker, CriticalSection> workQueue;
MessageManager::Lock messageManagerLock;
#if JUCE_IOS
iOSBackgroundProcessCheck backgroundProcessCheck;


+ 5
- 5
modules/juce_osc/osc/juce_OSCReceiver.cpp View File

@@ -361,12 +361,12 @@ struct OSCReceiver::Pimpl : private Thread,
}
//==============================================================================
void addListener (Listener<MessageLoopCallback>* listenerToAdd)
void addListener (OSCReceiver::Listener<MessageLoopCallback>* listenerToAdd)
{
listeners.add (listenerToAdd);
}
void addListener (Listener<RealtimeCallback>* listenerToAdd)
void addListener (OSCReceiver::Listener<RealtimeCallback>* listenerToAdd)
{
realtimeListeners.add (listenerToAdd);
}
@@ -383,12 +383,12 @@ struct OSCReceiver::Pimpl : private Thread,
addListenerWithAddress (listenerToAdd, addressToMatch, realtimeListenersWithAddress);
}
void removeListener (Listener<MessageLoopCallback>* listenerToRemove)
void removeListener (OSCReceiver::Listener<MessageLoopCallback>* listenerToRemove)
{
listeners.remove (listenerToRemove);
}
void removeListener (Listener<RealtimeCallback>* listenerToRemove)
void removeListener (OSCReceiver::Listener<RealtimeCallback>* listenerToRemove)
{
realtimeListeners.remove (listenerToRemove);
}
@@ -585,7 +585,7 @@ bool OSCReceiver::disconnect()
return pimpl->disconnect();
}
void OSCReceiver::addListener (Listener<MessageLoopCallback>* listenerToAdd)
void OSCReceiver::addListener (OSCReceiver::Listener<MessageLoopCallback>* listenerToAdd)
{
pimpl->addListener (listenerToAdd);
}


Loading…
Cancel
Save