The JUCE cross-platform C++ framework, with DISTRHO/KXStudio specific changes
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

363 lines
10KB

  1. /*
  2. ==============================================================================
  3. This file is part of the JUCE library - "Jules' Utility Class Extensions"
  4. Copyright 2004-11 by Raw Material Software Ltd.
  5. ------------------------------------------------------------------------------
  6. JUCE can be redistributed and/or modified under the terms of the GNU General
  7. Public License (Version 2), as published by the Free Software Foundation.
  8. A copy of the license is included in the JUCE distribution, or can be found
  9. online at www.gnu.org/licenses.
  10. JUCE is distributed in the hope that it will be useful, but WITHOUT ANY
  11. WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
  12. A PARTICULAR PURPOSE. See the GNU General Public License for more details.
  13. ------------------------------------------------------------------------------
  14. To release a closed-source product which uses JUCE, commercial licenses are
  15. available: visit www.rawmaterialsoftware.com/juce for more information.
  16. ==============================================================================
  17. */
  18. BEGIN_JUCE_NAMESPACE
  19. //==============================================================================
  20. InterprocessConnection::InterprocessConnection (const bool callbacksOnMessageThread,
  21. const uint32 magicMessageHeaderNumber)
  22. : Thread ("Juce IPC connection"),
  23. callbackConnectionState (false),
  24. useMessageThread (callbacksOnMessageThread),
  25. magicMessageHeader (magicMessageHeaderNumber),
  26. pipeReceiveMessageTimeout (-1)
  27. {
  28. }
  29. InterprocessConnection::~InterprocessConnection()
  30. {
  31. callbackConnectionState = false;
  32. disconnect();
  33. }
  34. //==============================================================================
  35. bool InterprocessConnection::connectToSocket (const String& hostName,
  36. const int portNumber,
  37. const int timeOutMillisecs)
  38. {
  39. disconnect();
  40. const ScopedLock sl (pipeAndSocketLock);
  41. socket = new StreamingSocket();
  42. if (socket->connect (hostName, portNumber, timeOutMillisecs))
  43. {
  44. connectionMadeInt();
  45. startThread();
  46. return true;
  47. }
  48. else
  49. {
  50. socket = nullptr;
  51. return false;
  52. }
  53. }
  54. bool InterprocessConnection::connectToPipe (const String& pipeName,
  55. const int pipeReceiveMessageTimeoutMs)
  56. {
  57. disconnect();
  58. ScopedPointer <NamedPipe> newPipe (new NamedPipe());
  59. if (newPipe->openExisting (pipeName))
  60. {
  61. const ScopedLock sl (pipeAndSocketLock);
  62. pipeReceiveMessageTimeout = pipeReceiveMessageTimeoutMs;
  63. initialiseWithPipe (newPipe.release());
  64. return true;
  65. }
  66. return false;
  67. }
  68. bool InterprocessConnection::createPipe (const String& pipeName,
  69. const int pipeReceiveMessageTimeoutMs)
  70. {
  71. disconnect();
  72. ScopedPointer <NamedPipe> newPipe (new NamedPipe());
  73. if (newPipe->createNewPipe (pipeName))
  74. {
  75. const ScopedLock sl (pipeAndSocketLock);
  76. pipeReceiveMessageTimeout = pipeReceiveMessageTimeoutMs;
  77. initialiseWithPipe (newPipe.release());
  78. return true;
  79. }
  80. return false;
  81. }
  82. void InterprocessConnection::disconnect()
  83. {
  84. if (socket != nullptr)
  85. socket->close();
  86. if (pipe != nullptr)
  87. {
  88. pipe->cancelPendingReads();
  89. pipe->close();
  90. }
  91. stopThread (4000);
  92. {
  93. const ScopedLock sl (pipeAndSocketLock);
  94. socket = nullptr;
  95. pipe = nullptr;
  96. }
  97. connectionLostInt();
  98. }
  99. bool InterprocessConnection::isConnected() const
  100. {
  101. const ScopedLock sl (pipeAndSocketLock);
  102. return ((socket != nullptr && socket->isConnected())
  103. || (pipe != nullptr && pipe->isOpen()))
  104. && isThreadRunning();
  105. }
  106. String InterprocessConnection::getConnectedHostName() const
  107. {
  108. if (pipe != nullptr)
  109. {
  110. return "localhost";
  111. }
  112. else if (socket != nullptr)
  113. {
  114. if (! socket->isLocal())
  115. return socket->getHostName();
  116. return "localhost";
  117. }
  118. return String::empty;
  119. }
  120. //==============================================================================
  121. bool InterprocessConnection::sendMessage (const MemoryBlock& message)
  122. {
  123. uint32 messageHeader[2];
  124. messageHeader [0] = ByteOrder::swapIfBigEndian (magicMessageHeader);
  125. messageHeader [1] = ByteOrder::swapIfBigEndian ((uint32) message.getSize());
  126. MemoryBlock messageData (sizeof (messageHeader) + message.getSize());
  127. messageData.copyFrom (messageHeader, 0, sizeof (messageHeader));
  128. messageData.copyFrom (message.getData(), sizeof (messageHeader), message.getSize());
  129. int bytesWritten = 0;
  130. const ScopedLock sl (pipeAndSocketLock);
  131. if (socket != nullptr)
  132. bytesWritten = socket->write (messageData.getData(), (int) messageData.getSize());
  133. else if (pipe != nullptr)
  134. bytesWritten = pipe->write (messageData.getData(), (int) messageData.getSize());
  135. return bytesWritten == (int) messageData.getSize();
  136. }
  137. //==============================================================================
  138. void InterprocessConnection::initialiseWithSocket (StreamingSocket* const socket_)
  139. {
  140. jassert (socket == 0);
  141. socket = socket_;
  142. connectionMadeInt();
  143. startThread();
  144. }
  145. void InterprocessConnection::initialiseWithPipe (NamedPipe* const pipe_)
  146. {
  147. jassert (pipe == 0);
  148. pipe = pipe_;
  149. connectionMadeInt();
  150. startThread();
  151. }
  // Tag placed in intParameter1 of every Message this file posts, and checked by
  // handleMessage() before a message is acted on. The literal does not fit in a
  // signed int, so the conversion is spelled out explicitly.
  const int messageMagicNumber = static_cast <int> (0xb734128b);
  153. void InterprocessConnection::handleMessage (const Message& message)
  154. {
  155. if (message.intParameter1 == messageMagicNumber)
  156. {
  157. switch (message.intParameter2)
  158. {
  159. case 0:
  160. {
  161. ScopedPointer <MemoryBlock> data (static_cast <MemoryBlock*> (message.pointerParameter));
  162. messageReceived (*data);
  163. break;
  164. }
  165. case 1:
  166. connectionMade();
  167. break;
  168. case 2:
  169. connectionLost();
  170. break;
  171. }
  172. }
  173. }
  174. void InterprocessConnection::connectionMadeInt()
  175. {
  176. if (! callbackConnectionState)
  177. {
  178. callbackConnectionState = true;
  179. if (useMessageThread)
  180. postMessage (new Message (messageMagicNumber, 1, 0, 0));
  181. else
  182. connectionMade();
  183. }
  184. }
  185. void InterprocessConnection::connectionLostInt()
  186. {
  187. if (callbackConnectionState)
  188. {
  189. callbackConnectionState = false;
  190. if (useMessageThread)
  191. postMessage (new Message (messageMagicNumber, 2, 0, 0));
  192. else
  193. connectionLost();
  194. }
  195. }
  196. void InterprocessConnection::deliverDataInt (const MemoryBlock& data)
  197. {
  198. jassert (callbackConnectionState);
  199. if (useMessageThread)
  200. postMessage (new Message (messageMagicNumber, 0, 0, new MemoryBlock (data)));
  201. else
  202. messageReceived (data);
  203. }
  204. //==============================================================================
  205. bool InterprocessConnection::readNextMessageInt()
  206. {
  207. const int maximumMessageSize = 1024 * 1024 * 10; // sanity check
  208. uint32 messageHeader[2];
  209. const int bytes = socket != nullptr ? socket->read (messageHeader, sizeof (messageHeader), true)
  210. : pipe ->read (messageHeader, sizeof (messageHeader), pipeReceiveMessageTimeout);
  211. if (bytes == sizeof (messageHeader)
  212. && ByteOrder::swapIfBigEndian (messageHeader[0]) == magicMessageHeader)
  213. {
  214. int bytesInMessage = (int) ByteOrder::swapIfBigEndian (messageHeader[1]);
  215. if (bytesInMessage > 0 && bytesInMessage < maximumMessageSize)
  216. {
  217. MemoryBlock messageData ((size_t) bytesInMessage, true);
  218. int bytesRead = 0;
  219. while (bytesInMessage > 0)
  220. {
  221. if (threadShouldExit())
  222. return false;
  223. const int numThisTime = jmin (bytesInMessage, 65536);
  224. const int bytesIn = socket != nullptr ? socket->read (static_cast <char*> (messageData.getData()) + bytesRead, numThisTime, true)
  225. : pipe ->read (static_cast <char*> (messageData.getData()) + bytesRead, numThisTime, pipeReceiveMessageTimeout);
  226. if (bytesIn <= 0)
  227. break;
  228. bytesRead += bytesIn;
  229. bytesInMessage -= bytesIn;
  230. }
  231. if (bytesRead >= 0)
  232. deliverDataInt (messageData);
  233. }
  234. }
  235. else if (bytes < 0)
  236. {
  237. {
  238. const ScopedLock sl (pipeAndSocketLock);
  239. socket = nullptr;
  240. }
  241. connectionLostInt();
  242. return false;
  243. }
  244. return true;
  245. }
  246. void InterprocessConnection::run()
  247. {
  248. while (! threadShouldExit())
  249. {
  250. if (socket != nullptr)
  251. {
  252. const int ready = socket->waitUntilReady (true, 0);
  253. if (ready < 0)
  254. {
  255. {
  256. const ScopedLock sl (pipeAndSocketLock);
  257. socket = nullptr;
  258. }
  259. connectionLostInt();
  260. break;
  261. }
  262. else if (ready > 0)
  263. {
  264. if (! readNextMessageInt())
  265. break;
  266. }
  267. else
  268. {
  269. Thread::sleep (2);
  270. }
  271. }
  272. else if (pipe != nullptr)
  273. {
  274. if (! pipe->isOpen())
  275. {
  276. {
  277. const ScopedLock sl (pipeAndSocketLock);
  278. pipe = nullptr;
  279. }
  280. connectionLostInt();
  281. break;
  282. }
  283. else
  284. {
  285. if (! readNextMessageInt())
  286. break;
  287. }
  288. }
  289. else
  290. {
  291. break;
  292. }
  293. }
  294. }
  295. END_JUCE_NAMESPACE