|  | /*
  ==============================================================================
   This file is part of the JUCE library - "Jules' Utility Class Extensions"
   Copyright 2004-11 by Raw Material Software Ltd.
  ------------------------------------------------------------------------------
   JUCE can be redistributed and/or modified under the terms of the GNU General
   Public License (Version 2), as published by the Free Software Foundation.
   A copy of the license is included in the JUCE distribution, or can be found
   online at www.gnu.org/licenses.
   JUCE is distributed in the hope that it will be useful, but WITHOUT ANY
   WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
   A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
  ------------------------------------------------------------------------------
   To release a closed-source product which uses JUCE, commercial licenses are
   available: visit www.rawmaterialsoftware.com/juce for more information.
  ==============================================================================
*/
BEGIN_JUCE_NAMESPACE
//==============================================================================
InterprocessConnection::InterprocessConnection (const bool callbacksOnMessageThread,
                                                const uint32 magicMessageHeaderNumber)
    : Thread ("Juce IPC connection"),
      callbackConnectionState (false),
      useMessageThread (callbacksOnMessageThread),
      magicMessageHeader (magicMessageHeaderNumber),
      pipeReceiveMessageTimeout (-1)
{
}
InterprocessConnection::~InterprocessConnection()
{
    callbackConnectionState = false;
    disconnect();
}
//==============================================================================
bool InterprocessConnection::connectToSocket (const String& hostName,
                                              const int portNumber,
                                              const int timeOutMillisecs)
{
    disconnect();
    const ScopedLock sl (pipeAndSocketLock);
    socket = new StreamingSocket();
    if (socket->connect (hostName, portNumber, timeOutMillisecs))
    {
        connectionMadeInt();
        startThread();
        return true;
    }
    else
    {
        socket = nullptr;
        return false;
    }
}
bool InterprocessConnection::connectToPipe (const String& pipeName,
                                            const int pipeReceiveMessageTimeoutMs)
{
    disconnect();
    ScopedPointer <NamedPipe> newPipe (new NamedPipe());
    if (newPipe->openExisting (pipeName))
    {
        const ScopedLock sl (pipeAndSocketLock);
        pipeReceiveMessageTimeout = pipeReceiveMessageTimeoutMs;
        initialiseWithPipe (newPipe.release());
        return true;
    }
    return false;
}
bool InterprocessConnection::createPipe (const String& pipeName,
                                         const int pipeReceiveMessageTimeoutMs)
{
    disconnect();
    ScopedPointer <NamedPipe> newPipe (new NamedPipe());
    if (newPipe->createNewPipe (pipeName))
    {
        const ScopedLock sl (pipeAndSocketLock);
        pipeReceiveMessageTimeout = pipeReceiveMessageTimeoutMs;
        initialiseWithPipe (newPipe.release());
        return true;
    }
    return false;
}
void InterprocessConnection::disconnect()
{
    if (socket != nullptr)
        socket->close();
    if (pipe != nullptr)
    {
        pipe->cancelPendingReads();
        pipe->close();
    }
    stopThread (4000);
    {
        const ScopedLock sl (pipeAndSocketLock);
        socket = nullptr;
        pipe = nullptr;
    }
    connectionLostInt();
}
bool InterprocessConnection::isConnected() const
{
    const ScopedLock sl (pipeAndSocketLock);
    return ((socket != nullptr && socket->isConnected())
              || (pipe != nullptr && pipe->isOpen()))
            && isThreadRunning();
}
String InterprocessConnection::getConnectedHostName() const
{
    if (pipe != nullptr)
    {
        return "localhost";
    }
    else if (socket != nullptr)
    {
        if (! socket->isLocal())
            return socket->getHostName();
        return "localhost";
    }
    return String::empty;
}
//==============================================================================
bool InterprocessConnection::sendMessage (const MemoryBlock& message)
{
    uint32 messageHeader[2];
    messageHeader [0] = ByteOrder::swapIfBigEndian (magicMessageHeader);
    messageHeader [1] = ByteOrder::swapIfBigEndian ((uint32) message.getSize());
    MemoryBlock messageData (sizeof (messageHeader) + message.getSize());
    messageData.copyFrom (messageHeader, 0, sizeof (messageHeader));
    messageData.copyFrom (message.getData(), sizeof (messageHeader), message.getSize());
    int bytesWritten = 0;
    const ScopedLock sl (pipeAndSocketLock);
    if (socket != nullptr)
        bytesWritten = socket->write (messageData.getData(), (int) messageData.getSize());
    else if (pipe != nullptr)
        bytesWritten = pipe->write (messageData.getData(), (int) messageData.getSize());
    return bytesWritten == (int) messageData.getSize();
}
//==============================================================================
void InterprocessConnection::initialiseWithSocket (StreamingSocket* const socket_)
{
    jassert (socket == 0);
    socket = socket_;
    connectionMadeInt();
    startThread();
}
void InterprocessConnection::initialiseWithPipe (NamedPipe* const pipe_)
{
    jassert (pipe == 0);
    pipe = pipe_;
    connectionMadeInt();
    startThread();
}
//==============================================================================
struct ConnectionStateMessage  : public Message
{
    ConnectionStateMessage (bool connectionMade_) noexcept
        : connectionMade (connectionMade_)
    {}
    bool connectionMade;
};
struct DataDeliveryMessage  : public Message
{
    DataDeliveryMessage (const MemoryBlock& data_)
        : data (data_)
    {}
    MemoryBlock data;
};
void InterprocessConnection::handleMessage (const Message& message)
{
    const ConnectionStateMessage* m = dynamic_cast <const ConnectionStateMessage*> (&message);
    if (m != nullptr)
    {
        if (m->connectionMade)
            connectionMade();
        else
            connectionLost();
    }
    else
    {
        const DataDeliveryMessage* d = dynamic_cast <const DataDeliveryMessage*> (&message);
        if (d != nullptr)
            messageReceived (d->data);
    }
}
void InterprocessConnection::connectionMadeInt()
{
    if (! callbackConnectionState)
    {
        callbackConnectionState = true;
        if (useMessageThread)
            postMessage (new ConnectionStateMessage (true));
        else
            connectionMade();
    }
}
void InterprocessConnection::connectionLostInt()
{
    if (callbackConnectionState)
    {
        callbackConnectionState = false;
        if (useMessageThread)
            postMessage (new ConnectionStateMessage (false));
        else
            connectionLost();
    }
}
void InterprocessConnection::deliverDataInt (const MemoryBlock& data)
{
    jassert (callbackConnectionState);
    if (useMessageThread)
        postMessage (new DataDeliveryMessage (data));
    else
        messageReceived (data);
}
//==============================================================================
bool InterprocessConnection::readNextMessageInt()
{
    const int maximumMessageSize = 1024 * 1024 * 10; // sanity check
    uint32 messageHeader[2];
    const int bytes = socket != nullptr ? socket->read (messageHeader, sizeof (messageHeader), true)
                                        : pipe  ->read (messageHeader, sizeof (messageHeader), pipeReceiveMessageTimeout);
    if (bytes == sizeof (messageHeader)
         && ByteOrder::swapIfBigEndian (messageHeader[0]) == magicMessageHeader)
    {
        int bytesInMessage = (int) ByteOrder::swapIfBigEndian (messageHeader[1]);
        if (bytesInMessage > 0 && bytesInMessage < maximumMessageSize)
        {
            MemoryBlock messageData ((size_t) bytesInMessage, true);
            int bytesRead = 0;
            while (bytesInMessage > 0)
            {
                if (threadShouldExit())
                    return false;
                const int numThisTime = jmin (bytesInMessage, 65536);
                const int bytesIn = socket != nullptr ? socket->read (static_cast <char*> (messageData.getData()) + bytesRead, numThisTime, true)
                                                      : pipe  ->read (static_cast <char*> (messageData.getData()) + bytesRead, numThisTime, pipeReceiveMessageTimeout);
                if (bytesIn <= 0)
                    break;
                bytesRead += bytesIn;
                bytesInMessage -= bytesIn;
            }
            if (bytesRead >= 0)
                deliverDataInt (messageData);
        }
    }
    else if (bytes < 0)
    {
        {
            const ScopedLock sl (pipeAndSocketLock);
            socket = nullptr;
        }
        connectionLostInt();
        return false;
    }
    return true;
}
void InterprocessConnection::run()
{
    while (! threadShouldExit())
    {
        if (socket != nullptr)
        {
            const int ready = socket->waitUntilReady (true, 0);
            if (ready < 0)
            {
                {
                    const ScopedLock sl (pipeAndSocketLock);
                    socket = nullptr;
                }
                connectionLostInt();
                break;
            }
            else if (ready > 0)
            {
                if (! readNextMessageInt())
                    break;
            }
            else
            {
                Thread::sleep (2);
            }
        }
        else if (pipe != nullptr)
        {
            if (! pipe->isOpen())
            {
                {
                    const ScopedLock sl (pipeAndSocketLock);
                    pipe = nullptr;
                }
                connectionLostInt();
                break;
            }
            else
            {
                if (! readNextMessageInt())
                    break;
            }
        }
        else
        {
            break;
        }
    }
}
END_JUCE_NAMESPACE
 |