The JUCE cross-platform C++ framework, with DISTRHO/KXStudio specific changes
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

370 lines
10KB

  1. /*
  2. ==============================================================================
  3. This file is part of the JUCE library - "Jules' Utility Class Extensions"
  4. Copyright 2004-10 by Raw Material Software Ltd.
  5. ------------------------------------------------------------------------------
  6. JUCE can be redistributed and/or modified under the terms of the GNU General
  7. Public License (Version 2), as published by the Free Software Foundation.
  8. A copy of the license is included in the JUCE distribution, or can be found
  9. online at www.gnu.org/licenses.
  10. JUCE is distributed in the hope that it will be useful, but WITHOUT ANY
  11. WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
  12. A PARTICULAR PURPOSE. See the GNU General Public License for more details.
  13. ------------------------------------------------------------------------------
  14. To release a closed-source product which uses JUCE, commercial licenses are
  15. available: visit www.rawmaterialsoftware.com/juce for more information.
  16. ==============================================================================
  17. */
  18. #include "../core/juce_StandardHeader.h"
  19. BEGIN_JUCE_NAMESPACE
  20. #include "juce_InterprocessConnection.h"
  21. #include "../threads/juce_ScopedLock.h"
  22. #include "../memory/juce_ScopedPointer.h"
  23. //==============================================================================
  24. InterprocessConnection::InterprocessConnection (const bool callbacksOnMessageThread,
  25. const uint32 magicMessageHeaderNumber)
  26. : Thread ("Juce IPC connection"),
  27. callbackConnectionState (false),
  28. useMessageThread (callbacksOnMessageThread),
  29. magicMessageHeader (magicMessageHeaderNumber),
  30. pipeReceiveMessageTimeout (-1)
  31. {
  32. }
  33. InterprocessConnection::~InterprocessConnection()
  34. {
  35. callbackConnectionState = false;
  36. disconnect();
  37. }
  38. //==============================================================================
  39. bool InterprocessConnection::connectToSocket (const String& hostName,
  40. const int portNumber,
  41. const int timeOutMillisecs)
  42. {
  43. disconnect();
  44. const ScopedLock sl (pipeAndSocketLock);
  45. socket = new StreamingSocket();
  46. if (socket->connect (hostName, portNumber, timeOutMillisecs))
  47. {
  48. connectionMadeInt();
  49. startThread();
  50. return true;
  51. }
  52. else
  53. {
  54. socket = 0;
  55. return false;
  56. }
  57. }
  58. bool InterprocessConnection::connectToPipe (const String& pipeName,
  59. const int pipeReceiveMessageTimeoutMs)
  60. {
  61. disconnect();
  62. ScopedPointer <NamedPipe> newPipe (new NamedPipe());
  63. if (newPipe->openExisting (pipeName))
  64. {
  65. const ScopedLock sl (pipeAndSocketLock);
  66. pipeReceiveMessageTimeout = pipeReceiveMessageTimeoutMs;
  67. initialiseWithPipe (newPipe.release());
  68. return true;
  69. }
  70. return false;
  71. }
  72. bool InterprocessConnection::createPipe (const String& pipeName,
  73. const int pipeReceiveMessageTimeoutMs)
  74. {
  75. disconnect();
  76. ScopedPointer <NamedPipe> newPipe (new NamedPipe());
  77. if (newPipe->createNewPipe (pipeName))
  78. {
  79. const ScopedLock sl (pipeAndSocketLock);
  80. pipeReceiveMessageTimeout = pipeReceiveMessageTimeoutMs;
  81. initialiseWithPipe (newPipe.release());
  82. return true;
  83. }
  84. return false;
  85. }
  86. void InterprocessConnection::disconnect()
  87. {
  88. if (socket != 0)
  89. socket->close();
  90. if (pipe != 0)
  91. {
  92. pipe->cancelPendingReads();
  93. pipe->close();
  94. }
  95. stopThread (4000);
  96. {
  97. const ScopedLock sl (pipeAndSocketLock);
  98. socket = 0;
  99. pipe = 0;
  100. }
  101. connectionLostInt();
  102. }
  103. bool InterprocessConnection::isConnected() const
  104. {
  105. const ScopedLock sl (pipeAndSocketLock);
  106. return ((socket != 0 && socket->isConnected())
  107. || (pipe != 0 && pipe->isOpen()))
  108. && isThreadRunning();
  109. }
  110. const String InterprocessConnection::getConnectedHostName() const
  111. {
  112. if (pipe != 0)
  113. {
  114. return "localhost";
  115. }
  116. else if (socket != 0)
  117. {
  118. if (! socket->isLocal())
  119. return socket->getHostName();
  120. return "localhost";
  121. }
  122. return String::empty;
  123. }
  124. //==============================================================================
  125. bool InterprocessConnection::sendMessage (const MemoryBlock& message)
  126. {
  127. uint32 messageHeader[2];
  128. messageHeader [0] = ByteOrder::swapIfBigEndian (magicMessageHeader);
  129. messageHeader [1] = ByteOrder::swapIfBigEndian ((uint32) message.getSize());
  130. MemoryBlock messageData (sizeof (messageHeader) + message.getSize());
  131. messageData.copyFrom (messageHeader, 0, sizeof (messageHeader));
  132. messageData.copyFrom (message.getData(), sizeof (messageHeader), message.getSize());
  133. int bytesWritten = 0;
  134. const ScopedLock sl (pipeAndSocketLock);
  135. if (socket != 0)
  136. bytesWritten = socket->write (messageData.getData(), (int) messageData.getSize());
  137. else if (pipe != 0)
  138. bytesWritten = pipe->write (messageData.getData(), (int) messageData.getSize());
  139. return bytesWritten == (int) messageData.getSize();
  140. }
  141. //==============================================================================
  142. void InterprocessConnection::initialiseWithSocket (StreamingSocket* const socket_)
  143. {
  144. jassert (socket == 0);
  145. socket = socket_;
  146. connectionMadeInt();
  147. startThread();
  148. }
  149. void InterprocessConnection::initialiseWithPipe (NamedPipe* const pipe_)
  150. {
  151. jassert (pipe == 0);
  152. pipe = pipe_;
  153. connectionMadeInt();
  154. startThread();
  155. }
  156. const int messageMagicNumber = 0xb734128b;
  157. void InterprocessConnection::handleMessage (const Message& message)
  158. {
  159. if (message.intParameter1 == messageMagicNumber)
  160. {
  161. switch (message.intParameter2)
  162. {
  163. case 0:
  164. {
  165. ScopedPointer <MemoryBlock> data (static_cast <MemoryBlock*> (message.pointerParameter));
  166. messageReceived (*data);
  167. break;
  168. }
  169. case 1:
  170. connectionMade();
  171. break;
  172. case 2:
  173. connectionLost();
  174. break;
  175. }
  176. }
  177. }
  178. void InterprocessConnection::connectionMadeInt()
  179. {
  180. if (! callbackConnectionState)
  181. {
  182. callbackConnectionState = true;
  183. if (useMessageThread)
  184. postMessage (new Message (messageMagicNumber, 1, 0, 0));
  185. else
  186. connectionMade();
  187. }
  188. }
  189. void InterprocessConnection::connectionLostInt()
  190. {
  191. if (callbackConnectionState)
  192. {
  193. callbackConnectionState = false;
  194. if (useMessageThread)
  195. postMessage (new Message (messageMagicNumber, 2, 0, 0));
  196. else
  197. connectionLost();
  198. }
  199. }
  200. void InterprocessConnection::deliverDataInt (const MemoryBlock& data)
  201. {
  202. jassert (callbackConnectionState);
  203. if (useMessageThread)
  204. postMessage (new Message (messageMagicNumber, 0, 0, new MemoryBlock (data)));
  205. else
  206. messageReceived (data);
  207. }
  208. //==============================================================================
  209. bool InterprocessConnection::readNextMessageInt()
  210. {
  211. const int maximumMessageSize = 1024 * 1024 * 10; // sanity check
  212. uint32 messageHeader[2];
  213. const int bytes = (socket != 0) ? socket->read (messageHeader, sizeof (messageHeader), true)
  214. : pipe->read (messageHeader, sizeof (messageHeader), pipeReceiveMessageTimeout);
  215. if (bytes == sizeof (messageHeader)
  216. && ByteOrder::swapIfBigEndian (messageHeader[0]) == magicMessageHeader)
  217. {
  218. int bytesInMessage = (int) ByteOrder::swapIfBigEndian (messageHeader[1]);
  219. if (bytesInMessage > 0 && bytesInMessage < maximumMessageSize)
  220. {
  221. MemoryBlock messageData (bytesInMessage, true);
  222. int bytesRead = 0;
  223. while (bytesInMessage > 0)
  224. {
  225. if (threadShouldExit())
  226. return false;
  227. const int numThisTime = jmin (bytesInMessage, 65536);
  228. const int bytesIn = (socket != 0) ? socket->read (static_cast <char*> (messageData.getData()) + bytesRead, numThisTime, true)
  229. : pipe->read (static_cast <char*> (messageData.getData()) + bytesRead, numThisTime, pipeReceiveMessageTimeout);
  230. if (bytesIn <= 0)
  231. break;
  232. bytesRead += bytesIn;
  233. bytesInMessage -= bytesIn;
  234. }
  235. if (bytesRead >= 0)
  236. deliverDataInt (messageData);
  237. }
  238. }
  239. else if (bytes < 0)
  240. {
  241. {
  242. const ScopedLock sl (pipeAndSocketLock);
  243. socket = 0;
  244. }
  245. connectionLostInt();
  246. return false;
  247. }
  248. return true;
  249. }
  250. void InterprocessConnection::run()
  251. {
  252. while (! threadShouldExit())
  253. {
  254. if (socket != 0)
  255. {
  256. const int ready = socket->waitUntilReady (true, 0);
  257. if (ready < 0)
  258. {
  259. {
  260. const ScopedLock sl (pipeAndSocketLock);
  261. socket = 0;
  262. }
  263. connectionLostInt();
  264. break;
  265. }
  266. else if (ready > 0)
  267. {
  268. if (! readNextMessageInt())
  269. break;
  270. }
  271. else
  272. {
  273. Thread::sleep (2);
  274. }
  275. }
  276. else if (pipe != 0)
  277. {
  278. if (! pipe->isOpen())
  279. {
  280. {
  281. const ScopedLock sl (pipeAndSocketLock);
  282. pipe = 0;
  283. }
  284. connectionLostInt();
  285. break;
  286. }
  287. else
  288. {
  289. if (! readNextMessageInt())
  290. break;
  291. }
  292. }
  293. else
  294. {
  295. break;
  296. }
  297. }
  298. }
  299. END_JUCE_NAMESPACE