From 41f3b6c48516855128017ce1427b04d7179f59a1 Mon Sep 17 00:00:00 2001 From: jules Date: Sun, 9 Dec 2012 12:52:41 +0000 Subject: [PATCH] Re-implemented NamedPipe, to make the win32 version better match the posix one. --- .../juce_core/native/juce_posix_NamedPipe.cpp | 114 +++---- modules/juce_core/native/juce_win32_Files.cpp | 306 +++++++++--------- modules/juce_core/network/juce_NamedPipe.cpp | 20 ++ modules/juce_core/network/juce_NamedPipe.h | 6 +- .../juce_InterprocessConnection.cpp | 4 +- 5 files changed, 207 insertions(+), 243 deletions(-) diff --git a/modules/juce_core/native/juce_posix_NamedPipe.cpp b/modules/juce_core/native/juce_posix_NamedPipe.cpp index 959a54fcd7..efcad1c278 100644 --- a/modules/juce_core/native/juce_posix_NamedPipe.cpp +++ b/modules/juce_core/native/juce_posix_NamedPipe.cpp @@ -31,7 +31,7 @@ public: pipeOutName (pipePath + "_out"), pipeIn (-1), pipeOut (-1), createdPipe (createPipe), - blocked (false), stopReadOperation (false) + stopReadOperation (false) { signal (SIGPIPE, signalHandler); siginterrupt (SIGPIPE, 1); @@ -49,36 +49,8 @@ public: } } - static int openPipe (const String& name, int flags, const uint32 timeoutEnd) - { - for (;;) - { - const int p = ::open (name.toUTF8(), flags); - - if (p != -1 || hasExpired (timeoutEnd)) - return p; - - Thread::sleep (2); - } - } - - static void waitForInput (const int handle, const int timeoutMsecs) noexcept - { - struct timeval timeout; - timeout.tv_sec = timeoutMsecs / 1000; - timeout.tv_usec = (timeoutMsecs % 1000) * 1000; - - fd_set rset; - FD_ZERO (&rset); - FD_SET (handle, &rset); - - select (handle + 1, &rset, nullptr, 0, &timeout); - } - int read (char* destBuffer, int maxBytesToRead, int timeOutMilliseconds) { - int bytesRead = -1; - blocked = true; const uint32 timeoutEnd = getTimeoutEnd (timeOutMilliseconds); if (pipeIn == -1) @@ -86,13 +58,10 @@ public: pipeIn = openPipe (createdPipe ? pipeInName : pipeOutName, O_RDWR | O_NONBLOCK, timeoutEnd); if (pipeIn == -1) - { - blocked = false; return -1; - } } - bytesRead = 0; + int bytesRead = 0; while (bytesRead < maxBytesToRead) { @@ -102,10 +71,7 @@ public: if (numRead <= 0) { if (errno != EWOULDBLOCK || stopReadOperation || hasExpired (timeoutEnd)) - { - bytesRead = -1; - break; - } + return -1; const int maxWaitingTime = 30; waitForInput (pipeIn, timeoutEnd == 0 ? maxWaitingTime @@ -118,13 +84,11 @@ public: destBuffer += numRead; } - blocked = false; return bytesRead; } int write (const char* sourceBuffer, int numBytesToWrite, int timeOutMilliseconds) { - int bytesWritten = -1; const uint32 timeoutEnd = getTimeoutEnd (timeOutMilliseconds); if (pipeOut == -1) @@ -135,7 +99,7 @@ public: return -1; } - bytesWritten = 0; + int bytesWritten = 0; while (bytesWritten < numBytesToWrite && ! hasExpired (timeoutEnd)) { @@ -143,10 +107,7 @@ public: const int numWritten = (int) ::write (pipeOut, sourceBuffer, (size_t) bytesThisTime); if (numWritten <= 0) - { - bytesWritten = -1; - break; - } + return -1; bytesWritten += numWritten; sourceBuffer += numWritten; @@ -165,7 +126,7 @@ public: int pipeIn, pipeOut; const bool createdPipe; - bool volatile blocked, stopReadOperation; + bool stopReadOperation; private: static void signalHandler (int) {} @@ -180,47 +141,51 @@ private: return timeoutEnd != 0 && Time::getMillisecondCounter() >= timeoutEnd; } - JUCE_DECLARE_NON_COPYABLE_WITH_LEAK_DETECTOR (Pimpl) -}; + static int openPipe (const String& name, int flags, const uint32 timeoutEnd) + { + for (;;) + { + const int p = ::open (name.toUTF8(), flags); -NamedPipe::NamedPipe() -{ -} + if (p != -1 || hasExpired (timeoutEnd)) + return p; -NamedPipe::~NamedPipe() -{ - close(); -} + Thread::sleep (2); + } + } -void NamedPipe::cancelPendingReads() -{ - while (pimpl != nullptr && pimpl->blocked) + static void waitForInput (const int handle, const int timeoutMsecs) noexcept { - pimpl->stopReadOperation = true; + struct timeval timeout; + timeout.tv_sec = timeoutMsecs / 1000; + timeout.tv_usec = (timeoutMsecs % 1000) * 1000; - char buffer[1] = { 0 }; - int bytesWritten = (int) ::write (pimpl->pipeIn, buffer, 1); - (void) bytesWritten; + fd_set rset; + FD_ZERO (&rset); + FD_SET (handle, &rset); - int timeout = 2000; - while (pimpl->blocked && --timeout >= 0) - Thread::sleep (2); + select (handle + 1, &rset, nullptr, 0, &timeout); } - if (pimpl != nullptr) - pimpl->stopReadOperation = false; -} + JUCE_DECLARE_NON_COPYABLE_WITH_LEAK_DETECTOR (Pimpl) +}; void NamedPipe::close() { - cancelPendingReads(); - ScopedPointer deleter (pimpl); // (clears the pimpl member variable before deleting it) + if (pimpl != nullptr) + { + pimpl->stopReadOperation = true; + + char buffer[1] = { 0 }; + ::write (pimpl->pipeIn, buffer, 1); + + ScopedWriteLock sl (lock); + pimpl = nullptr; + } } bool NamedPipe::openInternal (const String& pipeName, const bool createPipe) { - close(); - #if JUCE_IOS pimpl = new Pimpl (File::getSpecialLocation (File::tempDirectory) .getChildFile (File::createLegalFileName (pipeName)).getFullPathName(), createPipe); @@ -239,15 +204,12 @@ bool NamedPipe::openInternal (const String& pipeName, const bool createPipe) int NamedPipe::read (void* destBuffer, int maxBytesToRead, int timeOutMilliseconds) { + ScopedReadLock sl (lock); return pimpl != nullptr ? pimpl->read (static_cast (destBuffer), maxBytesToRead, timeOutMilliseconds) : -1; } int NamedPipe::write (const void* sourceBuffer, int numBytesToWrite, int timeOutMilliseconds) { + ScopedReadLock sl (lock); return pimpl != nullptr ? pimpl->write (static_cast (sourceBuffer), numBytesToWrite, timeOutMilliseconds) : -1; } - -bool NamedPipe::isOpen() const -{ - return pimpl != nullptr; -} diff --git a/modules/juce_core/native/juce_win32_Files.cpp b/modules/juce_core/native/juce_win32_Files.cpp index 588a8bdd2c..d0f86dc085 100644 --- a/modules/juce_core/native/juce_win32_Files.cpp +++ b/modules/juce_core/native/juce_win32_Files.cpp @@ -731,18 +731,16 @@ void File::revealToUser() const class NamedPipe::Pimpl { public: - Pimpl (const String& file, const bool createPipe) - : pipeH (INVALID_HANDLE_VALUE), + Pimpl (const String& pipeName, const bool createPipe) + : filename ("\\\\.\\pipe\\" + File::createLegalFileName (pipeName)), + pipeH (INVALID_HANDLE_VALUE), cancelEvent (CreateEvent (0, FALSE, FALSE, 0)), - connected (false), - ownsPipe (createPipe) + connected (false), ownsPipe (createPipe), shouldStop (false) { - pipeH = createPipe ? CreateNamedPipe (file.toWideCharPointer(), - PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED, 0, - PIPE_UNLIMITED_INSTANCES, 4096, 4096, 0, 0) - : CreateFile (file.toWideCharPointer(), - GENERIC_READ | GENERIC_WRITE, 0, 0, - OPEN_EXISTING, FILE_FLAG_OVERLAPPED, 0); + if (createPipe) + pipeH = CreateNamedPipe (filename.toWideCharPointer(), + PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED, 0, + PIPE_UNLIMITED_INSTANCES, 4096, 4096, 0, 0); } ~Pimpl() @@ -758,32 +756,47 @@ public: bool connect (const int timeOutMs) { if (! ownsPipe) - return true; - - if (! connected) { - OVERLAPPED over = { 0 }; - over.hEvent = CreateEvent (0, TRUE, FALSE, 0); + if (pipeH != INVALID_HANDLE_VALUE) + return true; - if (ConnectNamedPipe (pipeH, &over) == 0) - { - const DWORD err = GetLastError(); + const Time timeOutEnd (Time::getCurrentTime() + RelativeTime::milliseconds (timeOutMs)); - if (err == ERROR_IO_PENDING || err == ERROR_PIPE_LISTENING) + for (;;) + { { - HANDLE handles[] = { over.hEvent, cancelEvent }; + const ScopedLock sl (createFileLock); - if (WaitForMultipleObjects (2, handles, FALSE, - timeOutMs >= 0 ? timeOutMs : INFINITE) == WAIT_OBJECT_0) - connected = true; + if (pipeH == INVALID_HANDLE_VALUE) + pipeH = CreateFile (filename.toWideCharPointer(), + GENERIC_READ | GENERIC_WRITE, 0, 0, + OPEN_EXISTING, FILE_FLAG_OVERLAPPED, 0); } - else if (err == ERROR_PIPE_CONNECTED) + + if (pipeH != INVALID_HANDLE_VALUE) + return true; + + if (shouldStop || (timeOutMs >= 0 && Time::getCurrentTime() > timeOutEnd)) + return false; + + Thread::sleep (1); + } + } + + if (! connected) + { + OverlappedEvent over; + + if (ConnectNamedPipe (pipeH, &over.over) == 0) + { + switch (GetLastError()) { - connected = true; + case ERROR_PIPE_CONNECTED: connected = true; break; + case ERROR_IO_PENDING: + case ERROR_PIPE_LISTENING: connected = waitForIO (over, timeOutMs); break; + default: break; } } - - CloseHandle (over.hEvent); } return connected; @@ -791,179 +804,150 @@ public: void disconnectPipe() { - if (connected) + if (ownsPipe && connected) { DisconnectNamedPipe (pipeH); connected = false; } } - HANDLE pipeH, cancelEvent; - bool connected, ownsPipe; -}; - -NamedPipe::NamedPipe() -{ -} + int read (void* destBuffer, const int maxBytesToRead, const int timeOutMilliseconds) + { + while (connect (timeOutMilliseconds)) + { + if (maxBytesToRead <= 0) + return 0; -NamedPipe::~NamedPipe() -{ - close(); -} + OverlappedEvent over; + unsigned long numRead; -bool NamedPipe::isOpen() const -{ - return pimpl != nullptr; -} + if (ReadFile (pipeH, destBuffer, (DWORD) maxBytesToRead, &numRead, &over.over)) + return (int) numRead; -void NamedPipe::cancelPendingReads() -{ - if (pimpl != nullptr) - SetEvent (pimpl->cancelEvent); -} + const DWORD lastError = GetLastError(); -void NamedPipe::close() -{ - cancelPendingReads(); + if (lastError == ERROR_IO_PENDING) + { + if (! waitForIO (over, timeOutMilliseconds)) + return -1; - const ScopedLock sl (lock); - ScopedPointer deleter (pimpl); // (clears the pimpl member variable before deleting it) -} + if (GetOverlappedResult (pipeH, &over.over, &numRead, FALSE)) + return (int) numRead; + } -bool NamedPipe::openInternal (const String& pipeName, const bool createPipe) -{ - close(); + if (ownsPipe && (GetLastError() == ERROR_BROKEN_PIPE || GetLastError() == ERROR_PIPE_NOT_CONNECTED)) + disconnectPipe(); + else + break; + } - pimpl = new Pimpl ("\\\\.\\pipe\\" + File::createLegalFileName (pipeName), createPipe); + return -1; + } - if (pimpl->pipeH != INVALID_HANDLE_VALUE) - return true; + int write (const void* sourceBuffer, int numBytesToWrite, int timeOutMilliseconds) + { + if (connect (timeOutMilliseconds)) + { + if (numBytesToWrite <= 0) + return 0; - pimpl = nullptr; - return false; -} + OverlappedEvent over; + unsigned long numWritten; -int NamedPipe::read (void* destBuffer, const int maxBytesToRead, const int timeOutMilliseconds) -{ - const ScopedLock sl (lock); - int bytesRead = -1; - bool waitAgain = true; + if (WriteFile (pipeH, sourceBuffer, (DWORD) numBytesToWrite, &numWritten, &over.over)) + return (int) numWritten; - while (waitAgain && pimpl != nullptr) - { - waitAgain = false; + if (GetLastError() == ERROR_IO_PENDING) + { + if (! waitForIO (over, timeOutMilliseconds)) + return -1; - if (! pimpl->connect (timeOutMilliseconds)) - break; + if (GetOverlappedResult (pipeH, &over.over, &numWritten, FALSE)) + return (int) numWritten; - if (maxBytesToRead <= 0) - return 0; + if (GetLastError() == ERROR_BROKEN_PIPE && ownsPipe) + disconnectPipe(); + } + } - OVERLAPPED over = { 0 }; - over.hEvent = CreateEvent (0, TRUE, FALSE, 0); + return -1; + } - unsigned long numRead; + const String filename; + HANDLE pipeH, cancelEvent; + bool connected, ownsPipe, shouldStop; + CriticalSection createFileLock; - if (ReadFile (pimpl->pipeH, destBuffer, (DWORD) maxBytesToRead, &numRead, &over)) +private: + struct OverlappedEvent + { + OverlappedEvent() { - bytesRead = (int) numRead; + zerostruct (over); + over.hEvent = CreateEvent (0, TRUE, FALSE, 0); } - else + + ~OverlappedEvent() { - const DWORD lastError = GetLastError(); + CloseHandle (over.hEvent); + } - if (lastError == ERROR_IO_PENDING) - { - HANDLE handles[] = { over.hEvent, pimpl->cancelEvent }; - DWORD waitResult = WaitForMultipleObjects (2, handles, FALSE, - timeOutMilliseconds >= 0 ? timeOutMilliseconds - : INFINITE); - if (waitResult != WAIT_OBJECT_0) - { - // if the operation timed out, let's cancel it... - CancelIo (pimpl->pipeH); - WaitForSingleObject (over.hEvent, INFINITE); // makes sure cancel is complete - } + OVERLAPPED over; + }; - if (GetOverlappedResult (pimpl->pipeH, &over, &numRead, FALSE)) - { - bytesRead = (int) numRead; - } - else if ((GetLastError() == ERROR_BROKEN_PIPE || GetLastError() == ERROR_PIPE_NOT_CONNECTED) && pimpl->ownsPipe) - { - pimpl->disconnectPipe(); - waitAgain = true; - } - } - else if (pimpl->ownsPipe) - { - waitAgain = true; + bool waitForIO (OverlappedEvent& over, int timeOutMilliseconds) + { + if (shouldStop) + return false; - if (lastError == ERROR_BROKEN_PIPE || lastError == ERROR_PIPE_NOT_CONNECTED) - pimpl->disconnectPipe(); - else - Sleep (5); - } - } + HANDLE handles[] = { over.over.hEvent, cancelEvent }; + DWORD waitResult = WaitForMultipleObjects (2, handles, FALSE, + timeOutMilliseconds >= 0 ? timeOutMilliseconds + : INFINITE); + + if (waitResult == WAIT_OBJECT_0) + return true; - CloseHandle (over.hEvent); + CancelIo (pipeH); + return false; } - return bytesRead; -} + JUCE_DECLARE_NON_COPYABLE_WITH_LEAK_DETECTOR (Pimpl); +}; -int NamedPipe::write (const void* sourceBuffer, int numBytesToWrite, int timeOutMilliseconds) +void NamedPipe::close() { - int bytesWritten = -1; - if (pimpl != nullptr) { - if (! pimpl->connect (timeOutMilliseconds)) - { - pimpl = nullptr; - } - else - { - if (numBytesToWrite <= 0) - return 0; - - OVERLAPPED over = { 0 }; - over.hEvent = CreateEvent (0, TRUE, FALSE, 0); - - unsigned long numWritten; + pimpl->shouldStop = true; + SetEvent (pimpl->cancelEvent); - if (WriteFile (pimpl->pipeH, sourceBuffer, (DWORD) numBytesToWrite, &numWritten, &over)) - { - bytesWritten = (int) numWritten; - } - else if (GetLastError() == ERROR_IO_PENDING) - { - HANDLE handles[] = { over.hEvent, pimpl->cancelEvent }; - DWORD waitResult; + ScopedWriteLock sl (lock); + pimpl = nullptr; + } +} - waitResult = WaitForMultipleObjects (2, handles, FALSE, - timeOutMilliseconds >= 0 ? timeOutMilliseconds - : INFINITE); +bool NamedPipe::openInternal (const String& pipeName, const bool createPipe) +{ + pimpl = new Pimpl (pipeName, createPipe); - if (waitResult != WAIT_OBJECT_0) - { - CancelIo (pimpl->pipeH); - WaitForSingleObject (over.hEvent, INFINITE); - } + if (createPipe && pimpl->pipeH == INVALID_HANDLE_VALUE) + { + pimpl = nullptr; + return false; + } - if (GetOverlappedResult (pimpl->pipeH, &over, &numWritten, FALSE)) - { - bytesWritten = (int) numWritten; - } - else if (GetLastError() == ERROR_BROKEN_PIPE && pimpl->ownsPipe) - { - pimpl->disconnectPipe(); - } - } + return true; +} - CloseHandle (over.hEvent); - } - } +int NamedPipe::read (void* destBuffer, int maxBytesToRead, int timeOutMilliseconds) +{ + ScopedReadLock sl (lock); + return pimpl != nullptr ? pimpl->read (destBuffer, maxBytesToRead, timeOutMilliseconds) : -1; +} - return bytesWritten; +int NamedPipe::write (const void* sourceBuffer, int numBytesToWrite, int timeOutMilliseconds) +{ + ScopedReadLock sl (lock); + return pimpl != nullptr ? pimpl->write (sourceBuffer, numBytesToWrite, timeOutMilliseconds) : -1; } diff --git a/modules/juce_core/network/juce_NamedPipe.cpp b/modules/juce_core/network/juce_NamedPipe.cpp index da13493f95..902a2df26e 100644 --- a/modules/juce_core/network/juce_NamedPipe.cpp +++ b/modules/juce_core/network/juce_NamedPipe.cpp @@ -23,14 +23,34 @@ ============================================================================== */ +NamedPipe::NamedPipe() +{ +} + +NamedPipe::~NamedPipe() +{ + close(); +} + bool NamedPipe::openExisting (const String& pipeName) { + close(); + + ScopedWriteLock sl (lock); currentPipeName = pipeName; return openInternal (pipeName, false); } +bool NamedPipe::isOpen() const +{ + return pimpl != nullptr; +} + bool NamedPipe::createNewPipe (const String& pipeName) { + close(); + + ScopedWriteLock sl (lock); currentPipeName = pipeName; return openInternal (pipeName, true); } diff --git a/modules/juce_core/network/juce_NamedPipe.h b/modules/juce_core/network/juce_NamedPipe.h index 1651e55ae8..ebb850bda5 100644 --- a/modules/juce_core/network/juce_NamedPipe.h +++ b/modules/juce_core/network/juce_NamedPipe.h @@ -26,6 +26,7 @@ #ifndef __JUCE_NAMEDPIPE_JUCEHEADER__ #define __JUCE_NAMEDPIPE_JUCEHEADER__ +#include "../threads/juce_ReadWriteLock.h" //============================================================================== /** @@ -85,15 +86,12 @@ public: */ int write (const void* sourceBuffer, int numBytesToWrite, int timeOutMilliseconds); - /** If any threads are currently blocked on a read operation, this tells them to abort. */ - void cancelPendingReads(); - private: //============================================================================== class Pimpl; ScopedPointer pimpl; String currentPipeName; - CriticalSection lock; + ReadWriteLock lock; bool openInternal (const String& pipeName, const bool createPipe); diff --git a/modules/juce_events/interprocess/juce_InterprocessConnection.cpp b/modules/juce_events/interprocess/juce_InterprocessConnection.cpp index 3d34fa3c96..9055599bad 100644 --- a/modules/juce_events/interprocess/juce_InterprocessConnection.cpp +++ b/modules/juce_events/interprocess/juce_InterprocessConnection.cpp @@ -191,8 +191,7 @@ struct ConnectionStateMessage : public MessageManager::MessageBase void messageCallback() { - InterprocessConnection* const ipc = owner; - if (ipc != nullptr) + if (InterprocessConnection* const ipc = owner) { if (connectionMade) ipc->connectionMade(); @@ -300,6 +299,7 @@ bool InterprocessConnection::readNextMessageInt() } else if (bytes < 0) { + if (socket != nullptr) { const ScopedLock sl (pipeAndSocketLock); socket = nullptr;