The JUCE cross-platform C++ framework, with DISTRHO/KXStudio specific changes
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

369 lines
10KB

  1. /*
  2. ==============================================================================
  3. This file is part of the JUCE library - "Jules' Utility Class Extensions"
  4. Copyright 2004-11 by Raw Material Software Ltd.
  5. ------------------------------------------------------------------------------
  6. JUCE can be redistributed and/or modified under the terms of the GNU General
  7. Public License (Version 2), as published by the Free Software Foundation.
  8. A copy of the license is included in the JUCE distribution, or can be found
  9. online at www.gnu.org/licenses.
  10. JUCE is distributed in the hope that it will be useful, but WITHOUT ANY
  11. WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
  12. A PARTICULAR PURPOSE. See the GNU General Public License for more details.
  13. ------------------------------------------------------------------------------
  14. To release a closed-source product which uses JUCE, commercial licenses are
  15. available: visit www.rawmaterialsoftware.com/juce for more information.
  16. ==============================================================================
  17. */
  18. #include "../core/juce_StandardHeader.h"
  19. BEGIN_JUCE_NAMESPACE
  20. #include "juce_InterprocessConnection.h"
  21. #include "../memory/juce_ScopedPointer.h"
  22. //==============================================================================
  23. InterprocessConnection::InterprocessConnection (const bool callbacksOnMessageThread,
  24. const uint32 magicMessageHeaderNumber)
  25. : Thread ("Juce IPC connection"),
  26. callbackConnectionState (false),
  27. useMessageThread (callbacksOnMessageThread),
  28. magicMessageHeader (magicMessageHeaderNumber),
  29. pipeReceiveMessageTimeout (-1)
  30. {
  31. }
  32. InterprocessConnection::~InterprocessConnection()
  33. {
  34. callbackConnectionState = false;
  35. disconnect();
  36. }
  37. //==============================================================================
  38. bool InterprocessConnection::connectToSocket (const String& hostName,
  39. const int portNumber,
  40. const int timeOutMillisecs)
  41. {
  42. disconnect();
  43. const ScopedLock sl (pipeAndSocketLock);
  44. socket = new StreamingSocket();
  45. if (socket->connect (hostName, portNumber, timeOutMillisecs))
  46. {
  47. connectionMadeInt();
  48. startThread();
  49. return true;
  50. }
  51. else
  52. {
  53. socket = 0;
  54. return false;
  55. }
  56. }
  57. bool InterprocessConnection::connectToPipe (const String& pipeName,
  58. const int pipeReceiveMessageTimeoutMs)
  59. {
  60. disconnect();
  61. ScopedPointer <NamedPipe> newPipe (new NamedPipe());
  62. if (newPipe->openExisting (pipeName))
  63. {
  64. const ScopedLock sl (pipeAndSocketLock);
  65. pipeReceiveMessageTimeout = pipeReceiveMessageTimeoutMs;
  66. initialiseWithPipe (newPipe.release());
  67. return true;
  68. }
  69. return false;
  70. }
  71. bool InterprocessConnection::createPipe (const String& pipeName,
  72. const int pipeReceiveMessageTimeoutMs)
  73. {
  74. disconnect();
  75. ScopedPointer <NamedPipe> newPipe (new NamedPipe());
  76. if (newPipe->createNewPipe (pipeName))
  77. {
  78. const ScopedLock sl (pipeAndSocketLock);
  79. pipeReceiveMessageTimeout = pipeReceiveMessageTimeoutMs;
  80. initialiseWithPipe (newPipe.release());
  81. return true;
  82. }
  83. return false;
  84. }
  85. void InterprocessConnection::disconnect()
  86. {
  87. if (socket != 0)
  88. socket->close();
  89. if (pipe != 0)
  90. {
  91. pipe->cancelPendingReads();
  92. pipe->close();
  93. }
  94. stopThread (4000);
  95. {
  96. const ScopedLock sl (pipeAndSocketLock);
  97. socket = 0;
  98. pipe = 0;
  99. }
  100. connectionLostInt();
  101. }
  102. bool InterprocessConnection::isConnected() const
  103. {
  104. const ScopedLock sl (pipeAndSocketLock);
  105. return ((socket != 0 && socket->isConnected())
  106. || (pipe != 0 && pipe->isOpen()))
  107. && isThreadRunning();
  108. }
  109. const String InterprocessConnection::getConnectedHostName() const
  110. {
  111. if (pipe != 0)
  112. {
  113. return "localhost";
  114. }
  115. else if (socket != 0)
  116. {
  117. if (! socket->isLocal())
  118. return socket->getHostName();
  119. return "localhost";
  120. }
  121. return String::empty;
  122. }
  123. //==============================================================================
  124. bool InterprocessConnection::sendMessage (const MemoryBlock& message)
  125. {
  126. uint32 messageHeader[2];
  127. messageHeader [0] = ByteOrder::swapIfBigEndian (magicMessageHeader);
  128. messageHeader [1] = ByteOrder::swapIfBigEndian ((uint32) message.getSize());
  129. MemoryBlock messageData (sizeof (messageHeader) + message.getSize());
  130. messageData.copyFrom (messageHeader, 0, sizeof (messageHeader));
  131. messageData.copyFrom (message.getData(), sizeof (messageHeader), message.getSize());
  132. int bytesWritten = 0;
  133. const ScopedLock sl (pipeAndSocketLock);
  134. if (socket != 0)
  135. bytesWritten = socket->write (messageData.getData(), (int) messageData.getSize());
  136. else if (pipe != 0)
  137. bytesWritten = pipe->write (messageData.getData(), (int) messageData.getSize());
  138. return bytesWritten == (int) messageData.getSize();
  139. }
  140. //==============================================================================
  141. void InterprocessConnection::initialiseWithSocket (StreamingSocket* const socket_)
  142. {
  143. jassert (socket == 0);
  144. socket = socket_;
  145. connectionMadeInt();
  146. startThread();
  147. }
  148. void InterprocessConnection::initialiseWithPipe (NamedPipe* const pipe_)
  149. {
  150. jassert (pipe == 0);
  151. pipe = pipe_;
  152. connectionMadeInt();
  153. startThread();
  154. }
  155. const int messageMagicNumber = 0xb734128b;
  156. void InterprocessConnection::handleMessage (const Message& message)
  157. {
  158. if (message.intParameter1 == messageMagicNumber)
  159. {
  160. switch (message.intParameter2)
  161. {
  162. case 0:
  163. {
  164. ScopedPointer <MemoryBlock> data (static_cast <MemoryBlock*> (message.pointerParameter));
  165. messageReceived (*data);
  166. break;
  167. }
  168. case 1:
  169. connectionMade();
  170. break;
  171. case 2:
  172. connectionLost();
  173. break;
  174. }
  175. }
  176. }
  177. void InterprocessConnection::connectionMadeInt()
  178. {
  179. if (! callbackConnectionState)
  180. {
  181. callbackConnectionState = true;
  182. if (useMessageThread)
  183. postMessage (new Message (messageMagicNumber, 1, 0, 0));
  184. else
  185. connectionMade();
  186. }
  187. }
  188. void InterprocessConnection::connectionLostInt()
  189. {
  190. if (callbackConnectionState)
  191. {
  192. callbackConnectionState = false;
  193. if (useMessageThread)
  194. postMessage (new Message (messageMagicNumber, 2, 0, 0));
  195. else
  196. connectionLost();
  197. }
  198. }
  199. void InterprocessConnection::deliverDataInt (const MemoryBlock& data)
  200. {
  201. jassert (callbackConnectionState);
  202. if (useMessageThread)
  203. postMessage (new Message (messageMagicNumber, 0, 0, new MemoryBlock (data)));
  204. else
  205. messageReceived (data);
  206. }
  207. //==============================================================================
  208. bool InterprocessConnection::readNextMessageInt()
  209. {
  210. const int maximumMessageSize = 1024 * 1024 * 10; // sanity check
  211. uint32 messageHeader[2];
  212. const int bytes = (socket != 0) ? socket->read (messageHeader, sizeof (messageHeader), true)
  213. : pipe->read (messageHeader, sizeof (messageHeader), pipeReceiveMessageTimeout);
  214. if (bytes == sizeof (messageHeader)
  215. && ByteOrder::swapIfBigEndian (messageHeader[0]) == magicMessageHeader)
  216. {
  217. int bytesInMessage = (int) ByteOrder::swapIfBigEndian (messageHeader[1]);
  218. if (bytesInMessage > 0 && bytesInMessage < maximumMessageSize)
  219. {
  220. MemoryBlock messageData (bytesInMessage, true);
  221. int bytesRead = 0;
  222. while (bytesInMessage > 0)
  223. {
  224. if (threadShouldExit())
  225. return false;
  226. const int numThisTime = jmin (bytesInMessage, 65536);
  227. const int bytesIn = (socket != 0) ? socket->read (static_cast <char*> (messageData.getData()) + bytesRead, numThisTime, true)
  228. : pipe->read (static_cast <char*> (messageData.getData()) + bytesRead, numThisTime, pipeReceiveMessageTimeout);
  229. if (bytesIn <= 0)
  230. break;
  231. bytesRead += bytesIn;
  232. bytesInMessage -= bytesIn;
  233. }
  234. if (bytesRead >= 0)
  235. deliverDataInt (messageData);
  236. }
  237. }
  238. else if (bytes < 0)
  239. {
  240. {
  241. const ScopedLock sl (pipeAndSocketLock);
  242. socket = 0;
  243. }
  244. connectionLostInt();
  245. return false;
  246. }
  247. return true;
  248. }
  249. void InterprocessConnection::run()
  250. {
  251. while (! threadShouldExit())
  252. {
  253. if (socket != 0)
  254. {
  255. const int ready = socket->waitUntilReady (true, 0);
  256. if (ready < 0)
  257. {
  258. {
  259. const ScopedLock sl (pipeAndSocketLock);
  260. socket = 0;
  261. }
  262. connectionLostInt();
  263. break;
  264. }
  265. else if (ready > 0)
  266. {
  267. if (! readNextMessageInt())
  268. break;
  269. }
  270. else
  271. {
  272. Thread::sleep (2);
  273. }
  274. }
  275. else if (pipe != 0)
  276. {
  277. if (! pipe->isOpen())
  278. {
  279. {
  280. const ScopedLock sl (pipeAndSocketLock);
  281. pipe = 0;
  282. }
  283. connectionLostInt();
  284. break;
  285. }
  286. else
  287. {
  288. if (! readNextMessageInt())
  289. break;
  290. }
  291. }
  292. else
  293. {
  294. break;
  295. }
  296. }
  297. }
  298. END_JUCE_NAMESPACE