diff --git a/modules/juce_events/interprocess/juce_InterprocessConnection.cpp b/modules/juce_events/interprocess/juce_InterprocessConnection.cpp index a06167a01f..72f414a579 100644 --- a/modules/juce_events/interprocess/juce_InterprocessConnection.cpp +++ b/modules/juce_events/interprocess/juce_InterprocessConnection.cpp @@ -32,19 +32,64 @@ struct InterprocessConnection::ConnectionThread : public Thread JUCE_DECLARE_NON_COPYABLE_WITH_LEAK_DETECTOR (ConnectionThread) }; +class SafeActionImpl +{ +public: + explicit SafeActionImpl (InterprocessConnection& p) + : ref (p) {} + + template + void ifSafe (Fn&& fn) + { + const ScopedLock lock (mutex); + + if (safe) + fn (ref); + } + + void setSafe (bool s) + { + const ScopedLock lock (mutex); + safe = s; + } + + bool isSafe() + { + const ScopedLock lock (mutex); + return safe; + } + +private: + CriticalSection mutex; + InterprocessConnection& ref; + bool safe = false; +}; + +class InterprocessConnection::SafeAction : public SafeActionImpl +{ + using SafeActionImpl::SafeActionImpl; +}; + //============================================================================== InterprocessConnection::InterprocessConnection (bool callbacksOnMessageThread, uint32 magicMessageHeaderNumber) : useMessageThread (callbacksOnMessageThread), - magicMessageHeader (magicMessageHeaderNumber) + magicMessageHeader (magicMessageHeaderNumber), + safeAction (std::make_shared (*this)) { thread.reset (new ConnectionThread (*this)); } InterprocessConnection::~InterprocessConnection() { + // You *must* call `disconnect` in the destructor of your derived class to ensure + // that any pending messages are not delivered. If the messages were delivered after + // destroying the derived class, we'd end up calling the pure virtual implementations + // of `messageReceived`, `connectionMade` and `connectionLost` which is definitely + // not a good idea! + jassert (! safeAction->isSafe()); + callbackConnectionState = false; disconnect(); - masterReference.clear(); thread.reset(); } @@ -54,18 +99,15 @@ bool InterprocessConnection::connectToSocket (const String& hostName, { disconnect(); - const ScopedLock sl (pipeAndSocketLock); - socket.reset (new StreamingSocket()); + auto s = std::make_unique(); - if (socket->connect (hostName, portNumber, timeOutMillisecs)) + if (s->connect (hostName, portNumber, timeOutMillisecs)) { - threadIsRunning = true; - connectionMadeInt(); - thread->startThread(); + const ScopedLock sl (pipeAndSocketLock); + initialiseWithSocket (std::move (s)); return true; } - socket.reset(); return false; } @@ -73,13 +115,13 @@ bool InterprocessConnection::connectToPipe (const String& pipeName, int timeoutM { disconnect(); - std::unique_ptr newPipe (new NamedPipe()); + auto newPipe = std::make_unique(); if (newPipe->openExisting (pipeName)) { const ScopedLock sl (pipeAndSocketLock); pipeReceiveMessageTimeout = timeoutMs; - initialiseWithPipe (newPipe.release()); + initialiseWithPipe (std::move (newPipe)); return true; } @@ -90,13 +132,13 @@ bool InterprocessConnection::createPipe (const String& pipeName, int timeoutMs, { disconnect(); - std::unique_ptr newPipe (new NamedPipe()); + auto newPipe = std::make_unique(); if (newPipe->createNewPipe (pipeName, mustNotExist)) { const ScopedLock sl (pipeAndSocketLock); pipeReceiveMessageTimeout = timeoutMs; - initialiseWithPipe (newPipe.release()); + initialiseWithPipe (std::move (newPipe)); return true; } @@ -116,6 +158,8 @@ void InterprocessConnection::disconnect() thread->stopThread (4000); deletePipeAndSocket(); connectionLostInt(); + + safeAction->setSafe (false); } void InterprocessConnection::deletePipeAndSocket() @@ -176,45 +220,47 @@ int InterprocessConnection::writeData (void* data, int dataSize) } //============================================================================== -void InterprocessConnection::initialiseWithSocket (StreamingSocket* newSocket) +void InterprocessConnection::initialise() { - jassert (socket == nullptr && pipe == nullptr); - socket.reset (newSocket); - + safeAction->setSafe (true); threadIsRunning = true; connectionMadeInt(); thread->startThread(); } -void InterprocessConnection::initialiseWithPipe (NamedPipe* newPipe) +void InterprocessConnection::initialiseWithSocket (std::unique_ptr newSocket) { jassert (socket == nullptr && pipe == nullptr); - pipe.reset (newPipe); + socket = std::move (newSocket); + initialise(); +} - threadIsRunning = true; - connectionMadeInt(); - thread->startThread(); +void InterprocessConnection::initialiseWithPipe (std::unique_ptr newPipe) +{ + jassert (socket == nullptr && pipe == nullptr); + pipe = std::move (newPipe); + initialise(); } //============================================================================== struct ConnectionStateMessage : public MessageManager::MessageBase { - ConnectionStateMessage (InterprocessConnection* ipc, bool connected) noexcept - : owner (ipc), connectionMade (connected) + ConnectionStateMessage (std::shared_ptr ipc, bool connected) noexcept + : safeAction (ipc), connectionMade (connected) {} void messageCallback() override { - if (auto* ipc = owner.get()) + safeAction->ifSafe ([this] (InterprocessConnection& owner) { if (connectionMade) - ipc->connectionMade(); + owner.connectionMade(); else - ipc->connectionLost(); - } + owner.connectionLost(); + }); } - WeakReference owner; + std::shared_ptr safeAction; bool connectionMade; JUCE_DECLARE_NON_COPYABLE_WITH_LEAK_DETECTOR (ConnectionStateMessage) @@ -227,7 +273,7 @@ void InterprocessConnection::connectionMadeInt() callbackConnectionState = true; if (useMessageThread) - (new ConnectionStateMessage (this, true))->post(); + (new ConnectionStateMessage (safeAction, true))->post(); else connectionMade(); } @@ -240,7 +286,7 @@ void InterprocessConnection::connectionLostInt() callbackConnectionState = false; if (useMessageThread) - (new ConnectionStateMessage (this, false))->post(); + (new ConnectionStateMessage (safeAction, false))->post(); else connectionLost(); } @@ -248,17 +294,19 @@ void InterprocessConnection::connectionLostInt() struct DataDeliveryMessage : public Message { - DataDeliveryMessage (InterprocessConnection* ipc, const MemoryBlock& d) - : owner (ipc), data (d) + DataDeliveryMessage (std::shared_ptr ipc, const MemoryBlock& d) + : safeAction (ipc), data (d) {} void messageCallback() override { - if (auto* ipc = owner.get()) - ipc->messageReceived (data); + safeAction->ifSafe ([this] (InterprocessConnection& owner) + { + owner.messageReceived (data); + }); } - WeakReference owner; + std::shared_ptr safeAction; MemoryBlock data; }; @@ -267,7 +315,7 @@ void InterprocessConnection::deliverDataInt (const MemoryBlock& data) jassert (callbackConnectionState); if (useMessageThread) - (new DataDeliveryMessage (this, data))->post(); + (new DataDeliveryMessage (safeAction, data))->post(); else messageReceived (data); } diff --git a/modules/juce_events/interprocess/juce_InterprocessConnection.h b/modules/juce_events/interprocess/juce_InterprocessConnection.h index 464be59479..d9fd82b37b 100644 --- a/modules/juce_events/interprocess/juce_InterprocessConnection.h +++ b/modules/juce_events/interprocess/juce_InterprocessConnection.h @@ -43,6 +43,9 @@ class MemoryBlock; To act as a socket server and create connections for one or more client, see the InterprocessConnectionServer class. + IMPORTANT NOTE: Your derived Connection class *must* call `disconnect` in its destructor + in order to cancel any pending messages before the class is destroyed. + @see InterprocessConnectionServer, Socket, NamedPipe @tags{Events} @@ -117,7 +120,11 @@ public: */ bool createPipe (const String& pipeName, int pipeReceiveMessageTimeoutMs, bool mustNotExist = false); - /** Disconnects and closes any currently-open sockets or pipes. */ + /** Disconnects and closes any currently-open sockets or pipes. + + Derived classes *must* call this in their destructors in order to avoid undefined + behaviour. + */ void disconnect(); /** True if a socket or pipe is currently active. */ @@ -187,8 +194,9 @@ private: int pipeReceiveMessageTimeout = -1; friend class InterprocessConnectionServer; - void initialiseWithSocket (StreamingSocket*); - void initialiseWithPipe (NamedPipe*); + void initialise(); + void initialiseWithSocket (std::unique_ptr); + void initialiseWithPipe (std::unique_ptr); void deletePipeAndSocket(); void connectionMadeInt(); void connectionLostInt(); @@ -200,10 +208,12 @@ private: std::unique_ptr thread; std::atomic threadIsRunning { false }; + class SafeAction; + std::shared_ptr safeAction; + void runThread(); int writeData (void*, int); - JUCE_DECLARE_WEAK_REFERENCEABLE (InterprocessConnection) JUCE_DECLARE_NON_COPYABLE_WITH_LEAK_DETECTOR (InterprocessConnection) }; diff --git a/modules/juce_events/interprocess/juce_InterprocessConnectionServer.cpp b/modules/juce_events/interprocess/juce_InterprocessConnectionServer.cpp index 6905529ff4..cf79303ac2 100644 --- a/modules/juce_events/interprocess/juce_InterprocessConnectionServer.cpp +++ b/modules/juce_events/interprocess/juce_InterprocessConnectionServer.cpp @@ -73,7 +73,7 @@ void InterprocessConnectionServer::run() if (clientSocket != nullptr) if (auto* newConnection = createConnectionObject()) - newConnection->initialiseWithSocket (clientSocket.release()); + newConnection->initialiseWithSocket (std::move (clientSocket)); } }