The JUCE cross-platform C++ framework, with DISTRHO/KXStudio specific changes
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

376 lines
10KB

  1. /*
  2. ==============================================================================
  3. This file is part of the JUCE library - "Jules' Utility Class Extensions"
  4. Copyright 2004-11 by Raw Material Software Ltd.
  5. ------------------------------------------------------------------------------
  6. JUCE can be redistributed and/or modified under the terms of the GNU General
  7. Public License (Version 2), as published by the Free Software Foundation.
  8. A copy of the license is included in the JUCE distribution, or can be found
  9. online at www.gnu.org/licenses.
  10. JUCE is distributed in the hope that it will be useful, but WITHOUT ANY
  11. WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
  12. A PARTICULAR PURPOSE. See the GNU General Public License for more details.
  13. ------------------------------------------------------------------------------
  14. To release a closed-source product which uses JUCE, commercial licenses are
  15. available: visit www.rawmaterialsoftware.com/juce for more information.
  16. ==============================================================================
  17. */
  18. BEGIN_JUCE_NAMESPACE
  19. //==============================================================================
  20. InterprocessConnection::InterprocessConnection (const bool callbacksOnMessageThread,
  21. const uint32 magicMessageHeaderNumber)
  22. : Thread ("Juce IPC connection"),
  23. callbackConnectionState (false),
  24. useMessageThread (callbacksOnMessageThread),
  25. magicMessageHeader (magicMessageHeaderNumber),
  26. pipeReceiveMessageTimeout (-1)
  27. {
  28. }
  29. InterprocessConnection::~InterprocessConnection()
  30. {
  31. callbackConnectionState = false;
  32. disconnect();
  33. }
  34. //==============================================================================
  35. bool InterprocessConnection::connectToSocket (const String& hostName,
  36. const int portNumber,
  37. const int timeOutMillisecs)
  38. {
  39. disconnect();
  40. const ScopedLock sl (pipeAndSocketLock);
  41. socket = new StreamingSocket();
  42. if (socket->connect (hostName, portNumber, timeOutMillisecs))
  43. {
  44. connectionMadeInt();
  45. startThread();
  46. return true;
  47. }
  48. else
  49. {
  50. socket = nullptr;
  51. return false;
  52. }
  53. }
  54. bool InterprocessConnection::connectToPipe (const String& pipeName,
  55. const int pipeReceiveMessageTimeoutMs)
  56. {
  57. disconnect();
  58. ScopedPointer <NamedPipe> newPipe (new NamedPipe());
  59. if (newPipe->openExisting (pipeName))
  60. {
  61. const ScopedLock sl (pipeAndSocketLock);
  62. pipeReceiveMessageTimeout = pipeReceiveMessageTimeoutMs;
  63. initialiseWithPipe (newPipe.release());
  64. return true;
  65. }
  66. return false;
  67. }
  68. bool InterprocessConnection::createPipe (const String& pipeName,
  69. const int pipeReceiveMessageTimeoutMs)
  70. {
  71. disconnect();
  72. ScopedPointer <NamedPipe> newPipe (new NamedPipe());
  73. if (newPipe->createNewPipe (pipeName))
  74. {
  75. const ScopedLock sl (pipeAndSocketLock);
  76. pipeReceiveMessageTimeout = pipeReceiveMessageTimeoutMs;
  77. initialiseWithPipe (newPipe.release());
  78. return true;
  79. }
  80. return false;
  81. }
  82. void InterprocessConnection::disconnect()
  83. {
  84. if (socket != nullptr)
  85. socket->close();
  86. if (pipe != nullptr)
  87. {
  88. pipe->cancelPendingReads();
  89. pipe->close();
  90. }
  91. stopThread (4000);
  92. {
  93. const ScopedLock sl (pipeAndSocketLock);
  94. socket = nullptr;
  95. pipe = nullptr;
  96. }
  97. connectionLostInt();
  98. }
  99. bool InterprocessConnection::isConnected() const
  100. {
  101. const ScopedLock sl (pipeAndSocketLock);
  102. return ((socket != nullptr && socket->isConnected())
  103. || (pipe != nullptr && pipe->isOpen()))
  104. && isThreadRunning();
  105. }
  106. String InterprocessConnection::getConnectedHostName() const
  107. {
  108. if (pipe != nullptr)
  109. {
  110. return "localhost";
  111. }
  112. else if (socket != nullptr)
  113. {
  114. if (! socket->isLocal())
  115. return socket->getHostName();
  116. return "localhost";
  117. }
  118. return String::empty;
  119. }
  120. //==============================================================================
  121. bool InterprocessConnection::sendMessage (const MemoryBlock& message)
  122. {
  123. uint32 messageHeader[2];
  124. messageHeader [0] = ByteOrder::swapIfBigEndian (magicMessageHeader);
  125. messageHeader [1] = ByteOrder::swapIfBigEndian ((uint32) message.getSize());
  126. MemoryBlock messageData (sizeof (messageHeader) + message.getSize());
  127. messageData.copyFrom (messageHeader, 0, sizeof (messageHeader));
  128. messageData.copyFrom (message.getData(), sizeof (messageHeader), message.getSize());
  129. int bytesWritten = 0;
  130. const ScopedLock sl (pipeAndSocketLock);
  131. if (socket != nullptr)
  132. bytesWritten = socket->write (messageData.getData(), (int) messageData.getSize());
  133. else if (pipe != nullptr)
  134. bytesWritten = pipe->write (messageData.getData(), (int) messageData.getSize());
  135. return bytesWritten == (int) messageData.getSize();
  136. }
  137. //==============================================================================
  138. void InterprocessConnection::initialiseWithSocket (StreamingSocket* const socket_)
  139. {
  140. jassert (socket == 0);
  141. socket = socket_;
  142. connectionMadeInt();
  143. startThread();
  144. }
  145. void InterprocessConnection::initialiseWithPipe (NamedPipe* const pipe_)
  146. {
  147. jassert (pipe == 0);
  148. pipe = pipe_;
  149. connectionMadeInt();
  150. startThread();
  151. }
  152. //==============================================================================
  153. struct ConnectionStateMessage : public Message
  154. {
  155. ConnectionStateMessage (bool connectionMade_) noexcept
  156. : connectionMade (connectionMade_)
  157. {}
  158. bool connectionMade;
  159. };
  160. struct DataDeliveryMessage : public Message
  161. {
  162. DataDeliveryMessage (const MemoryBlock& data_)
  163. : data (data_)
  164. {}
  165. MemoryBlock data;
  166. };
  167. void InterprocessConnection::handleMessage (const Message& message)
  168. {
  169. const ConnectionStateMessage* m = dynamic_cast <const ConnectionStateMessage*> (&message);
  170. if (m != nullptr)
  171. {
  172. if (m->connectionMade)
  173. connectionMade();
  174. else
  175. connectionLost();
  176. }
  177. else
  178. {
  179. const DataDeliveryMessage* d = dynamic_cast <const DataDeliveryMessage*> (&message);
  180. if (d != nullptr)
  181. messageReceived (d->data);
  182. }
  183. }
  184. void InterprocessConnection::connectionMadeInt()
  185. {
  186. if (! callbackConnectionState)
  187. {
  188. callbackConnectionState = true;
  189. if (useMessageThread)
  190. postMessage (new ConnectionStateMessage (true));
  191. else
  192. connectionMade();
  193. }
  194. }
  195. void InterprocessConnection::connectionLostInt()
  196. {
  197. if (callbackConnectionState)
  198. {
  199. callbackConnectionState = false;
  200. if (useMessageThread)
  201. postMessage (new ConnectionStateMessage (false));
  202. else
  203. connectionLost();
  204. }
  205. }
  206. void InterprocessConnection::deliverDataInt (const MemoryBlock& data)
  207. {
  208. jassert (callbackConnectionState);
  209. if (useMessageThread)
  210. postMessage (new DataDeliveryMessage (data));
  211. else
  212. messageReceived (data);
  213. }
  214. //==============================================================================
  215. bool InterprocessConnection::readNextMessageInt()
  216. {
  217. const int maximumMessageSize = 1024 * 1024 * 10; // sanity check
  218. uint32 messageHeader[2];
  219. const int bytes = socket != nullptr ? socket->read (messageHeader, sizeof (messageHeader), true)
  220. : pipe ->read (messageHeader, sizeof (messageHeader), pipeReceiveMessageTimeout);
  221. if (bytes == sizeof (messageHeader)
  222. && ByteOrder::swapIfBigEndian (messageHeader[0]) == magicMessageHeader)
  223. {
  224. int bytesInMessage = (int) ByteOrder::swapIfBigEndian (messageHeader[1]);
  225. if (bytesInMessage > 0 && bytesInMessage < maximumMessageSize)
  226. {
  227. MemoryBlock messageData ((size_t) bytesInMessage, true);
  228. int bytesRead = 0;
  229. while (bytesInMessage > 0)
  230. {
  231. if (threadShouldExit())
  232. return false;
  233. const int numThisTime = jmin (bytesInMessage, 65536);
  234. const int bytesIn = socket != nullptr ? socket->read (static_cast <char*> (messageData.getData()) + bytesRead, numThisTime, true)
  235. : pipe ->read (static_cast <char*> (messageData.getData()) + bytesRead, numThisTime, pipeReceiveMessageTimeout);
  236. if (bytesIn <= 0)
  237. break;
  238. bytesRead += bytesIn;
  239. bytesInMessage -= bytesIn;
  240. }
  241. if (bytesRead >= 0)
  242. deliverDataInt (messageData);
  243. }
  244. }
  245. else if (bytes < 0)
  246. {
  247. {
  248. const ScopedLock sl (pipeAndSocketLock);
  249. socket = nullptr;
  250. }
  251. connectionLostInt();
  252. return false;
  253. }
  254. return true;
  255. }
  256. void InterprocessConnection::run()
  257. {
  258. while (! threadShouldExit())
  259. {
  260. if (socket != nullptr)
  261. {
  262. const int ready = socket->waitUntilReady (true, 0);
  263. if (ready < 0)
  264. {
  265. {
  266. const ScopedLock sl (pipeAndSocketLock);
  267. socket = nullptr;
  268. }
  269. connectionLostInt();
  270. break;
  271. }
  272. else if (ready > 0)
  273. {
  274. if (! readNextMessageInt())
  275. break;
  276. }
  277. else
  278. {
  279. Thread::sleep (2);
  280. }
  281. }
  282. else if (pipe != nullptr)
  283. {
  284. if (! pipe->isOpen())
  285. {
  286. {
  287. const ScopedLock sl (pipeAndSocketLock);
  288. pipe = nullptr;
  289. }
  290. connectionLostInt();
  291. break;
  292. }
  293. else
  294. {
  295. if (! readNextMessageInt())
  296. break;
  297. }
  298. }
  299. else
  300. {
  301. break;
  302. }
  303. }
  304. }
  305. END_JUCE_NAMESPACE