Browse Source

IPC: Fix race condition when destroying connections

It was possible to encounter data races when when requesting connection
callbacks on the message thread, but creating/destroying connection
objects on a background thread.

This change ensures that a message will not be processed if the
destination connection is destroyed before the message is delivered.
tags/2021-05-28
reuk 4 years ago
parent
commit
fb83c45a9d
3 changed files with 100 additions and 42 deletions
  1. +85
    -37
      modules/juce_events/interprocess/juce_InterprocessConnection.cpp
  2. +14
    -4
      modules/juce_events/interprocess/juce_InterprocessConnection.h
  3. +1
    -1
      modules/juce_events/interprocess/juce_InterprocessConnectionServer.cpp

+ 85
- 37
modules/juce_events/interprocess/juce_InterprocessConnection.cpp View File

@@ -32,19 +32,64 @@ struct InterprocessConnection::ConnectionThread : public Thread
JUCE_DECLARE_NON_COPYABLE_WITH_LEAK_DETECTOR (ConnectionThread)
};
class SafeActionImpl
{
public:
explicit SafeActionImpl (InterprocessConnection& p)
: ref (p) {}
template <typename Fn>
void ifSafe (Fn&& fn)
{
const ScopedLock lock (mutex);
if (safe)
fn (ref);
}
void setSafe (bool s)
{
const ScopedLock lock (mutex);
safe = s;
}
bool isSafe()
{
const ScopedLock lock (mutex);
return safe;
}
private:
CriticalSection mutex;
InterprocessConnection& ref;
bool safe = false;
};
class InterprocessConnection::SafeAction : public SafeActionImpl
{
using SafeActionImpl::SafeActionImpl;
};
//==============================================================================
InterprocessConnection::InterprocessConnection (bool callbacksOnMessageThread, uint32 magicMessageHeaderNumber)
: useMessageThread (callbacksOnMessageThread),
magicMessageHeader (magicMessageHeaderNumber)
magicMessageHeader (magicMessageHeaderNumber),
safeAction (std::make_shared<SafeAction> (*this))
{
thread.reset (new ConnectionThread (*this));
}
InterprocessConnection::~InterprocessConnection()
{
// You *must* call `disconnect` in the destructor of your derived class to ensure
// that any pending messages are not delivered. If the messages were delivered after
// destroying the derived class, we'd end up calling the pure virtual implementations
// of `messageReceived`, `connectionMade` and `connectionLost` which is definitely
// not a good idea!
jassert (! safeAction->isSafe());
callbackConnectionState = false;
disconnect();
masterReference.clear();
thread.reset();
}
@@ -54,18 +99,15 @@ bool InterprocessConnection::connectToSocket (const String& hostName,
{
disconnect();
const ScopedLock sl (pipeAndSocketLock);
socket.reset (new StreamingSocket());
auto s = std::make_unique<StreamingSocket>();
if (socket->connect (hostName, portNumber, timeOutMillisecs))
if (s->connect (hostName, portNumber, timeOutMillisecs))
{
threadIsRunning = true;
connectionMadeInt();
thread->startThread();
const ScopedLock sl (pipeAndSocketLock);
initialiseWithSocket (std::move (s));
return true;
}
socket.reset();
return false;
}
@@ -73,13 +115,13 @@ bool InterprocessConnection::connectToPipe (const String& pipeName, int timeoutM
{
disconnect();
std::unique_ptr<NamedPipe> newPipe (new NamedPipe());
auto newPipe = std::make_unique<NamedPipe>();
if (newPipe->openExisting (pipeName))
{
const ScopedLock sl (pipeAndSocketLock);
pipeReceiveMessageTimeout = timeoutMs;
initialiseWithPipe (newPipe.release());
initialiseWithPipe (std::move (newPipe));
return true;
}
@@ -90,13 +132,13 @@ bool InterprocessConnection::createPipe (const String& pipeName, int timeoutMs,
{
disconnect();
std::unique_ptr<NamedPipe> newPipe (new NamedPipe());
auto newPipe = std::make_unique<NamedPipe>();
if (newPipe->createNewPipe (pipeName, mustNotExist))
{
const ScopedLock sl (pipeAndSocketLock);
pipeReceiveMessageTimeout = timeoutMs;
initialiseWithPipe (newPipe.release());
initialiseWithPipe (std::move (newPipe));
return true;
}
@@ -116,6 +158,8 @@ void InterprocessConnection::disconnect()
thread->stopThread (4000);
deletePipeAndSocket();
connectionLostInt();
safeAction->setSafe (false);
}
void InterprocessConnection::deletePipeAndSocket()
@@ -176,45 +220,47 @@ int InterprocessConnection::writeData (void* data, int dataSize)
}
//==============================================================================
void InterprocessConnection::initialiseWithSocket (StreamingSocket* newSocket)
void InterprocessConnection::initialise()
{
jassert (socket == nullptr && pipe == nullptr);
socket.reset (newSocket);
safeAction->setSafe (true);
threadIsRunning = true;
connectionMadeInt();
thread->startThread();
}
void InterprocessConnection::initialiseWithPipe (NamedPipe* newPipe)
void InterprocessConnection::initialiseWithSocket (std::unique_ptr<StreamingSocket> newSocket)
{
jassert (socket == nullptr && pipe == nullptr);
pipe.reset (newPipe);
socket = std::move (newSocket);
initialise();
}
threadIsRunning = true;
connectionMadeInt();
thread->startThread();
void InterprocessConnection::initialiseWithPipe (std::unique_ptr<NamedPipe> newPipe)
{
jassert (socket == nullptr && pipe == nullptr);
pipe = std::move (newPipe);
initialise();
}
//==============================================================================
struct ConnectionStateMessage : public MessageManager::MessageBase
{
ConnectionStateMessage (InterprocessConnection* ipc, bool connected) noexcept
: owner (ipc), connectionMade (connected)
ConnectionStateMessage (std::shared_ptr<SafeActionImpl> ipc, bool connected) noexcept
: safeAction (ipc), connectionMade (connected)
{}
void messageCallback() override
{
if (auto* ipc = owner.get())
safeAction->ifSafe ([this] (InterprocessConnection& owner)
{
if (connectionMade)
ipc->connectionMade();
owner.connectionMade();
else
ipc->connectionLost();
}
owner.connectionLost();
});
}
WeakReference<InterprocessConnection> owner;
std::shared_ptr<SafeActionImpl> safeAction;
bool connectionMade;
JUCE_DECLARE_NON_COPYABLE_WITH_LEAK_DETECTOR (ConnectionStateMessage)
@@ -227,7 +273,7 @@ void InterprocessConnection::connectionMadeInt()
callbackConnectionState = true;
if (useMessageThread)
(new ConnectionStateMessage (this, true))->post();
(new ConnectionStateMessage (safeAction, true))->post();
else
connectionMade();
}
@@ -240,7 +286,7 @@ void InterprocessConnection::connectionLostInt()
callbackConnectionState = false;
if (useMessageThread)
(new ConnectionStateMessage (this, false))->post();
(new ConnectionStateMessage (safeAction, false))->post();
else
connectionLost();
}
@@ -248,17 +294,19 @@ void InterprocessConnection::connectionLostInt()
struct DataDeliveryMessage : public Message
{
DataDeliveryMessage (InterprocessConnection* ipc, const MemoryBlock& d)
: owner (ipc), data (d)
DataDeliveryMessage (std::shared_ptr<SafeActionImpl> ipc, const MemoryBlock& d)
: safeAction (ipc), data (d)
{}
void messageCallback() override
{
if (auto* ipc = owner.get())
ipc->messageReceived (data);
safeAction->ifSafe ([this] (InterprocessConnection& owner)
{
owner.messageReceived (data);
});
}
WeakReference<InterprocessConnection> owner;
std::shared_ptr<SafeActionImpl> safeAction;
MemoryBlock data;
};
@@ -267,7 +315,7 @@ void InterprocessConnection::deliverDataInt (const MemoryBlock& data)
jassert (callbackConnectionState);
if (useMessageThread)
(new DataDeliveryMessage (this, data))->post();
(new DataDeliveryMessage (safeAction, data))->post();
else
messageReceived (data);
}


+ 14
- 4
modules/juce_events/interprocess/juce_InterprocessConnection.h View File

@@ -43,6 +43,9 @@ class MemoryBlock;
To act as a socket server and create connections for one or more client, see the
InterprocessConnectionServer class.
IMPORTANT NOTE: Your derived Connection class *must* call `disconnect` in its destructor
in order to cancel any pending messages before the class is destroyed.
@see InterprocessConnectionServer, Socket, NamedPipe
@tags{Events}
@@ -117,7 +120,11 @@ public:
*/
bool createPipe (const String& pipeName, int pipeReceiveMessageTimeoutMs, bool mustNotExist = false);
/** Disconnects and closes any currently-open sockets or pipes. */
/** Disconnects and closes any currently-open sockets or pipes.
Derived classes *must* call this in their destructors in order to avoid undefined
behaviour.
*/
void disconnect();
/** True if a socket or pipe is currently active. */
@@ -187,8 +194,9 @@ private:
int pipeReceiveMessageTimeout = -1;
friend class InterprocessConnectionServer;
void initialiseWithSocket (StreamingSocket*);
void initialiseWithPipe (NamedPipe*);
void initialise();
void initialiseWithSocket (std::unique_ptr<StreamingSocket>);
void initialiseWithPipe (std::unique_ptr<NamedPipe>);
void deletePipeAndSocket();
void connectionMadeInt();
void connectionLostInt();
@@ -200,10 +208,12 @@ private:
std::unique_ptr<ConnectionThread> thread;
std::atomic<bool> threadIsRunning { false };
class SafeAction;
std::shared_ptr<SafeAction> safeAction;
void runThread();
int writeData (void*, int);
JUCE_DECLARE_WEAK_REFERENCEABLE (InterprocessConnection)
JUCE_DECLARE_NON_COPYABLE_WITH_LEAK_DETECTOR (InterprocessConnection)
};


+ 1
- 1
modules/juce_events/interprocess/juce_InterprocessConnectionServer.cpp View File

@@ -73,7 +73,7 @@ void InterprocessConnectionServer::run()
if (clientSocket != nullptr)
if (auto* newConnection = createConnectionObject())
newConnection->initialiseWithSocket (clientSocket.release());
newConnection->initialiseWithSocket (std::move (clientSocket));
}
}


Loading…
Cancel
Save