Browse Source

Fixed InterprocessConnection's use of the supplied timeout when reading from pipes

tags/2021-05-28
jules 7 years ago
parent
commit
da6ba0d783
3 changed files with 47 additions and 41 deletions
  1. +19
    -23
      modules/juce_core/native/juce_posix_NamedPipe.cpp
  2. +26
    -17
      modules/juce_events/interprocess/juce_InterprocessConnection.cpp
  3. +2
    -1
      modules/juce_events/interprocess/juce_InterprocessConnection.h

+ 19
- 23
modules/juce_core/native/juce_posix_NamedPipe.cpp View File

@@ -29,11 +29,7 @@ public:
Pimpl (const String& pipePath, bool createPipe)
: pipeInName (pipePath + "_in"),
pipeOutName (pipePath + "_out"),
pipeIn (-1), pipeOut (-1),
createdFifoIn (false),
createdFifoOut (false),
createdPipe (createPipe),
stopReadOperation (false)
createdPipe (createPipe)
{
signal (SIGPIPE, signalHandler);
juce_siginterrupt (SIGPIPE, 1);
@@ -53,7 +49,7 @@ public:
int read (char* destBuffer, int maxBytesToRead, int timeOutMilliseconds)
{
const uint32 timeoutEnd = getTimeoutEnd (timeOutMilliseconds);
auto timeoutEnd = getTimeoutEnd (timeOutMilliseconds);
if (pipeIn == -1)
{
@@ -67,12 +63,12 @@ public:
while (bytesRead < maxBytesToRead)
{
const int bytesThisTime = maxBytesToRead - bytesRead;
const int numRead = (int) ::read (pipeIn, destBuffer, (size_t) bytesThisTime);
auto bytesThisTime = maxBytesToRead - bytesRead;
auto numRead = (int) ::read (pipeIn, destBuffer, (size_t) bytesThisTime);
if (numRead <= 0)
{
if (errno != EWOULDBLOCK || stopReadOperation || hasExpired (timeoutEnd))
if (errno != EWOULDBLOCK || stopReadOperation.load() || hasExpired (timeoutEnd))
return -1;
const int maxWaitingTime = 30;
@@ -91,7 +87,7 @@ public:
int write (const char* sourceBuffer, int numBytesToWrite, int timeOutMilliseconds)
{
const uint32 timeoutEnd = getTimeoutEnd (timeOutMilliseconds);
auto timeoutEnd = getTimeoutEnd (timeOutMilliseconds);
if (pipeOut == -1)
{
@@ -105,8 +101,8 @@ public:
while (bytesWritten < numBytesToWrite && ! hasExpired (timeoutEnd))
{
const int bytesThisTime = numBytesToWrite - bytesWritten;
const int numWritten = (int) ::write (pipeOut, sourceBuffer, (size_t) bytesThisTime);
auto bytesThisTime = numBytesToWrite - bytesWritten;
auto numWritten = (int) ::write (pipeOut, sourceBuffer, (size_t) bytesThisTime);
if (numWritten <= 0)
return -1;
@@ -132,39 +128,39 @@ public:
}
const String pipeInName, pipeOutName;
int pipeIn, pipeOut;
bool createdFifoIn, createdFifoOut;
int pipeIn = -1, pipeOut = -1;
bool createdFifoIn = false, createdFifoOut = false;
const bool createdPipe;
bool stopReadOperation;
std::atomic<bool> stopReadOperation { false };
private:
static void signalHandler (int) {}
static uint32 getTimeoutEnd (const int timeOutMilliseconds)
static uint32 getTimeoutEnd (int timeOutMilliseconds)
{
return timeOutMilliseconds >= 0 ? Time::getMillisecondCounter() + (uint32) timeOutMilliseconds : 0;
}
static bool hasExpired (const uint32 timeoutEnd)
static bool hasExpired (uint32 timeoutEnd)
{
return timeoutEnd != 0 && Time::getMillisecondCounter() >= timeoutEnd;
}
int openPipe (const String& name, int flags, const uint32 timeoutEnd)
int openPipe (const String& name, int flags, uint32 timeoutEnd)
{
for (;;)
{
const int p = ::open (name.toUTF8(), flags);
auto p = ::open (name.toUTF8(), flags);
if (p != -1 || hasExpired (timeoutEnd) || stopReadOperation)
if (p != -1 || hasExpired (timeoutEnd) || stopReadOperation.load())
return p;
Thread::sleep (2);
}
}
static void waitForInput (const int handle, const int timeoutMsecs) noexcept
static void waitForInput (int handle, int timeoutMsecs) noexcept
{
struct timeval timeout;
timeout.tv_sec = timeoutMsecs / 1000;
@@ -195,13 +191,13 @@ void NamedPipe::close()
}
}
bool NamedPipe::openInternal (const String& pipeName, const bool createPipe, bool mustNotExist)
bool NamedPipe::openInternal (const String& pipeName, bool createPipe, bool mustNotExist)
{
#if JUCE_IOS
pimpl.reset (new Pimpl (File::getSpecialLocation (File::tempDirectory)
.getChildFile (File::createLegalFileName (pipeName)).getFullPathName(), createPipe));
#else
String file (pipeName);
auto file = pipeName;
if (! File::isAbsolutePath (file))
file = "/tmp/" + File::createLegalFileName (file);


+ 26
- 17
modules/juce_events/interprocess/juce_InterprocessConnection.cpp View File

@@ -50,8 +50,7 @@ InterprocessConnection::~InterprocessConnection()
//==============================================================================
bool InterprocessConnection::connectToSocket (const String& hostName,
const int portNumber,
const int timeOutMillisecs)
int portNumber, int timeOutMillisecs)
{
disconnect();
@@ -69,7 +68,7 @@ bool InterprocessConnection::connectToSocket (const String& hostName,
return false;
}
bool InterprocessConnection::connectToPipe (const String& pipeName, const int timeoutMs)
bool InterprocessConnection::connectToPipe (const String& pipeName, int timeoutMs)
{
disconnect();
@@ -86,7 +85,7 @@ bool InterprocessConnection::connectToPipe (const String& pipeName, const int ti
return false;
}
bool InterprocessConnection::createPipe (const String& pipeName, const int timeoutMs, bool mustNotExist)
bool InterprocessConnection::createPipe (const String& pipeName, int timeoutMs, bool mustNotExist)
{
disconnect();
@@ -269,16 +268,27 @@ void InterprocessConnection::deliverDataInt (const MemoryBlock& data)
}
//==============================================================================
bool InterprocessConnection::readNextMessageInt()
int InterprocessConnection::readData (void* data, int num)
{
if (socket != nullptr)
return socket->read (data, num, true);
if (pipe != nullptr)
return pipe->read (data, num, pipeReceiveMessageTimeout);
jassertfalse;
return -1;
}
bool InterprocessConnection::readNextMessage()
{
uint32 messageHeader[2];
const int bytes = socket != nullptr ? socket->read (messageHeader, sizeof (messageHeader), true)
: pipe ->read (messageHeader, sizeof (messageHeader), -1);
auto bytes = readData (messageHeader, sizeof (messageHeader));
if (bytes == sizeof (messageHeader)
&& ByteOrder::swapIfBigEndian (messageHeader[0]) == magicMessageHeader)
{
int bytesInMessage = (int) ByteOrder::swapIfBigEndian (messageHeader[1]);
auto bytesInMessage = (int) ByteOrder::swapIfBigEndian (messageHeader[1]);
if (bytesInMessage > 0)
{
@@ -290,11 +300,8 @@ bool InterprocessConnection::readNextMessageInt()
if (thread->threadShouldExit())
return false;
const int numThisTime = jmin (bytesInMessage, 65536);
void* const data = addBytesToPointer (messageData.getData(), bytesRead);
const int bytesIn = socket != nullptr ? socket->read (data, numThisTime, true)
: pipe ->read (data, numThisTime, -1);
auto numThisTime = jmin (bytesInMessage, 65536);
auto bytesIn = readData (addBytesToPointer (messageData.getData(), bytesRead), numThisTime);
if (bytesIn <= 0)
break;
@@ -306,17 +313,19 @@ bool InterprocessConnection::readNextMessageInt()
if (bytesRead >= 0)
deliverDataInt (messageData);
}
return true;
}
else if (bytes < 0)
if (bytes < 0)
{
if (socket != nullptr)
deletePipeAndSocket();
connectionLostInt();
return false;
}
return true;
return false;
}
void InterprocessConnection::runThread()
@@ -354,7 +363,7 @@ void InterprocessConnection::runThread()
break;
}
if (thread->threadShouldExit() || ! readNextMessageInt())
if (thread->threadShouldExit() || ! readNextMessage())
break;
}
}


+ 2
- 1
modules/juce_events/interprocess/juce_InterprocessConnection.h View File

@@ -193,7 +193,8 @@ private:
void connectionMadeInt();
void connectionLostInt();
void deliverDataInt (const MemoryBlock&);
bool readNextMessageInt();
bool readNextMessage();
int readData (void*, int);
struct ConnectionThread;
friend struct ConnectionThread;


Loading…
Cancel
Save