The JUCE cross-platform C++ framework, with DISTRHO/KXStudio specific changes
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

369 lines
10KB

  1. /*
  2. ==============================================================================
  3. This file is part of the JUCE library - "Jules' Utility Class Extensions"
  4. Copyright 2004-11 by Raw Material Software Ltd.
  5. ------------------------------------------------------------------------------
  6. JUCE can be redistributed and/or modified under the terms of the GNU General
  7. Public License (Version 2), as published by the Free Software Foundation.
  8. A copy of the license is included in the JUCE distribution, or can be found
  9. online at www.gnu.org/licenses.
  10. JUCE is distributed in the hope that it will be useful, but WITHOUT ANY
  11. WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
  12. A PARTICULAR PURPOSE. See the GNU General Public License for more details.
  13. ------------------------------------------------------------------------------
  14. To release a closed-source product which uses JUCE, commercial licenses are
  15. available: visit www.rawmaterialsoftware.com/juce for more information.
  16. ==============================================================================
  17. */
  18. InterprocessConnection::InterprocessConnection (const bool callbacksOnMessageThread,
  19. const uint32 magicMessageHeaderNumber)
  20. : Thread ("Juce IPC connection"),
  21. callbackConnectionState (false),
  22. useMessageThread (callbacksOnMessageThread),
  23. magicMessageHeader (magicMessageHeaderNumber),
  24. pipeReceiveMessageTimeout (-1)
  25. {
  26. }
  27. InterprocessConnection::~InterprocessConnection()
  28. {
  29. callbackConnectionState = false;
  30. disconnect();
  31. masterReference.clear();
  32. }
  33. //==============================================================================
  34. bool InterprocessConnection::connectToSocket (const String& hostName,
  35. const int portNumber,
  36. const int timeOutMillisecs)
  37. {
  38. disconnect();
  39. const ScopedLock sl (pipeAndSocketLock);
  40. socket = new StreamingSocket();
  41. if (socket->connect (hostName, portNumber, timeOutMillisecs))
  42. {
  43. connectionMadeInt();
  44. startThread();
  45. return true;
  46. }
  47. else
  48. {
  49. socket = nullptr;
  50. return false;
  51. }
  52. }
  53. bool InterprocessConnection::connectToPipe (const String& pipeName,
  54. const int pipeReceiveMessageTimeoutMs)
  55. {
  56. disconnect();
  57. ScopedPointer <NamedPipe> newPipe (new NamedPipe());
  58. if (newPipe->openExisting (pipeName))
  59. {
  60. const ScopedLock sl (pipeAndSocketLock);
  61. pipeReceiveMessageTimeout = pipeReceiveMessageTimeoutMs;
  62. initialiseWithPipe (newPipe.release());
  63. return true;
  64. }
  65. return false;
  66. }
  67. bool InterprocessConnection::createPipe (const String& pipeName,
  68. const int pipeReceiveMessageTimeoutMs)
  69. {
  70. disconnect();
  71. ScopedPointer <NamedPipe> newPipe (new NamedPipe());
  72. if (newPipe->createNewPipe (pipeName))
  73. {
  74. const ScopedLock sl (pipeAndSocketLock);
  75. pipeReceiveMessageTimeout = pipeReceiveMessageTimeoutMs;
  76. initialiseWithPipe (newPipe.release());
  77. return true;
  78. }
  79. return false;
  80. }
  81. void InterprocessConnection::disconnect()
  82. {
  83. if (socket != nullptr)
  84. socket->close();
  85. if (pipe != nullptr)
  86. pipe->close();
  87. stopThread (4000);
  88. {
  89. const ScopedLock sl (pipeAndSocketLock);
  90. socket = nullptr;
  91. pipe = nullptr;
  92. }
  93. connectionLostInt();
  94. }
  95. bool InterprocessConnection::isConnected() const
  96. {
  97. const ScopedLock sl (pipeAndSocketLock);
  98. return ((socket != nullptr && socket->isConnected())
  99. || (pipe != nullptr && pipe->isOpen()))
  100. && isThreadRunning();
  101. }
  102. String InterprocessConnection::getConnectedHostName() const
  103. {
  104. if (pipe != nullptr)
  105. {
  106. return "localhost";
  107. }
  108. else if (socket != nullptr)
  109. {
  110. if (! socket->isLocal())
  111. return socket->getHostName();
  112. return "localhost";
  113. }
  114. return String::empty;
  115. }
  116. //==============================================================================
  117. bool InterprocessConnection::sendMessage (const MemoryBlock& message)
  118. {
  119. uint32 messageHeader[2];
  120. messageHeader [0] = ByteOrder::swapIfBigEndian (magicMessageHeader);
  121. messageHeader [1] = ByteOrder::swapIfBigEndian ((uint32) message.getSize());
  122. MemoryBlock messageData (sizeof (messageHeader) + message.getSize());
  123. messageData.copyFrom (messageHeader, 0, sizeof (messageHeader));
  124. messageData.copyFrom (message.getData(), sizeof (messageHeader), message.getSize());
  125. int bytesWritten = 0;
  126. const ScopedLock sl (pipeAndSocketLock);
  127. if (socket != nullptr)
  128. bytesWritten = socket->write (messageData.getData(), (int) messageData.getSize());
  129. else if (pipe != nullptr)
  130. bytesWritten = pipe->write (messageData.getData(), (int) messageData.getSize());
  131. return bytesWritten == (int) messageData.getSize();
  132. }
  133. //==============================================================================
  134. void InterprocessConnection::initialiseWithSocket (StreamingSocket* const socket_)
  135. {
  136. jassert (socket == 0);
  137. socket = socket_;
  138. connectionMadeInt();
  139. startThread();
  140. }
  141. void InterprocessConnection::initialiseWithPipe (NamedPipe* const pipe_)
  142. {
  143. jassert (pipe == 0);
  144. pipe = pipe_;
  145. connectionMadeInt();
  146. startThread();
  147. }
  148. //==============================================================================
  149. struct ConnectionStateMessage : public MessageManager::MessageBase
  150. {
  151. ConnectionStateMessage (InterprocessConnection* owner_, bool connectionMade_) noexcept
  152. : owner (owner_), connectionMade (connectionMade_)
  153. {}
  154. void messageCallback()
  155. {
  156. InterprocessConnection* const ipc = owner;
  157. if (ipc != nullptr)
  158. {
  159. if (connectionMade)
  160. ipc->connectionMade();
  161. else
  162. ipc->connectionLost();
  163. }
  164. }
  165. WeakReference<InterprocessConnection> owner;
  166. bool connectionMade;
  167. };
  168. void InterprocessConnection::connectionMadeInt()
  169. {
  170. if (! callbackConnectionState)
  171. {
  172. callbackConnectionState = true;
  173. if (useMessageThread)
  174. (new ConnectionStateMessage (this, true))->post();
  175. else
  176. connectionMade();
  177. }
  178. }
  179. void InterprocessConnection::connectionLostInt()
  180. {
  181. if (callbackConnectionState)
  182. {
  183. callbackConnectionState = false;
  184. if (useMessageThread)
  185. (new ConnectionStateMessage (this, false))->post();
  186. else
  187. connectionLost();
  188. }
  189. }
  190. struct DataDeliveryMessage : public Message
  191. {
  192. DataDeliveryMessage (InterprocessConnection* owner_, const MemoryBlock& data_)
  193. : owner (owner_), data (data_)
  194. {}
  195. void messageCallback()
  196. {
  197. InterprocessConnection* const ipc = owner;
  198. if (ipc != nullptr)
  199. ipc->messageReceived (data);
  200. }
  201. WeakReference<InterprocessConnection> owner;
  202. MemoryBlock data;
  203. };
  204. void InterprocessConnection::deliverDataInt (const MemoryBlock& data)
  205. {
  206. jassert (callbackConnectionState);
  207. if (useMessageThread)
  208. (new DataDeliveryMessage (this, data))->post();
  209. else
  210. messageReceived (data);
  211. }
  212. //==============================================================================
  213. bool InterprocessConnection::readNextMessageInt()
  214. {
  215. const int maximumMessageSize = 1024 * 1024 * 10; // sanity check
  216. uint32 messageHeader[2];
  217. const int bytes = socket != nullptr ? socket->read (messageHeader, sizeof (messageHeader), true)
  218. : pipe ->read (messageHeader, sizeof (messageHeader), pipeReceiveMessageTimeout);
  219. if (bytes == sizeof (messageHeader)
  220. && ByteOrder::swapIfBigEndian (messageHeader[0]) == magicMessageHeader)
  221. {
  222. int bytesInMessage = (int) ByteOrder::swapIfBigEndian (messageHeader[1]);
  223. if (bytesInMessage > 0 && bytesInMessage < maximumMessageSize)
  224. {
  225. MemoryBlock messageData ((size_t) bytesInMessage, true);
  226. int bytesRead = 0;
  227. while (bytesInMessage > 0)
  228. {
  229. if (threadShouldExit())
  230. return false;
  231. const int numThisTime = jmin (bytesInMessage, 65536);
  232. const int bytesIn = socket != nullptr ? socket->read (static_cast <char*> (messageData.getData()) + bytesRead, numThisTime, true)
  233. : pipe ->read (static_cast <char*> (messageData.getData()) + bytesRead, numThisTime, pipeReceiveMessageTimeout);
  234. if (bytesIn <= 0)
  235. break;
  236. bytesRead += bytesIn;
  237. bytesInMessage -= bytesIn;
  238. }
  239. if (bytesRead >= 0)
  240. deliverDataInt (messageData);
  241. }
  242. }
  243. else if (bytes < 0)
  244. {
  245. {
  246. const ScopedLock sl (pipeAndSocketLock);
  247. socket = nullptr;
  248. }
  249. connectionLostInt();
  250. return false;
  251. }
  252. return true;
  253. }
  254. void InterprocessConnection::run()
  255. {
  256. while (! threadShouldExit())
  257. {
  258. if (socket != nullptr)
  259. {
  260. const int ready = socket->waitUntilReady (true, 0);
  261. if (ready < 0)
  262. {
  263. {
  264. const ScopedLock sl (pipeAndSocketLock);
  265. socket = nullptr;
  266. }
  267. connectionLostInt();
  268. break;
  269. }
  270. else if (ready > 0)
  271. {
  272. if (! readNextMessageInt())
  273. break;
  274. }
  275. else
  276. {
  277. Thread::sleep (2);
  278. }
  279. }
  280. else if (pipe != nullptr)
  281. {
  282. if (! pipe->isOpen())
  283. {
  284. {
  285. const ScopedLock sl (pipeAndSocketLock);
  286. pipe = nullptr;
  287. }
  288. connectionLostInt();
  289. break;
  290. }
  291. else
  292. {
  293. if (! readNextMessageInt())
  294. break;
  295. }
  296. }
  297. else
  298. {
  299. break;
  300. }
  301. }
  302. }