The JUCE cross-platform C++ framework, with DISTRHO/KXStudio specific changes
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

372 lines
10KB

  1. /*
  2. ==============================================================================
  3. This file is part of the JUCE library - "Jules' Utility Class Extensions"
  4. Copyright 2004-11 by Raw Material Software Ltd.
  5. ------------------------------------------------------------------------------
  6. JUCE can be redistributed and/or modified under the terms of the GNU General
  7. Public License (Version 2), as published by the Free Software Foundation.
  8. A copy of the license is included in the JUCE distribution, or can be found
  9. online at www.gnu.org/licenses.
  10. JUCE is distributed in the hope that it will be useful, but WITHOUT ANY
  11. WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
  12. A PARTICULAR PURPOSE. See the GNU General Public License for more details.
  13. ------------------------------------------------------------------------------
  14. To release a closed-source product which uses JUCE, commercial licenses are
  15. available: visit www.rawmaterialsoftware.com/juce for more information.
  16. ==============================================================================
  17. */
  18. InterprocessConnection::InterprocessConnection (const bool callbacksOnMessageThread,
  19. const uint32 magicMessageHeaderNumber)
  20. : Thread ("Juce IPC connection"),
  21. callbackConnectionState (false),
  22. useMessageThread (callbacksOnMessageThread),
  23. magicMessageHeader (magicMessageHeaderNumber),
  24. pipeReceiveMessageTimeout (-1)
  25. {
  26. }
  27. InterprocessConnection::~InterprocessConnection()
  28. {
  29. callbackConnectionState = false;
  30. disconnect();
  31. masterReference.clear();
  32. }
  33. //==============================================================================
  34. bool InterprocessConnection::connectToSocket (const String& hostName,
  35. const int portNumber,
  36. const int timeOutMillisecs)
  37. {
  38. disconnect();
  39. const ScopedLock sl (pipeAndSocketLock);
  40. socket = new StreamingSocket();
  41. if (socket->connect (hostName, portNumber, timeOutMillisecs))
  42. {
  43. connectionMadeInt();
  44. startThread();
  45. return true;
  46. }
  47. else
  48. {
  49. socket = nullptr;
  50. return false;
  51. }
  52. }
  53. bool InterprocessConnection::connectToPipe (const String& pipeName,
  54. const int pipeReceiveMessageTimeoutMs)
  55. {
  56. disconnect();
  57. ScopedPointer <NamedPipe> newPipe (new NamedPipe());
  58. if (newPipe->openExisting (pipeName))
  59. {
  60. const ScopedLock sl (pipeAndSocketLock);
  61. pipeReceiveMessageTimeout = pipeReceiveMessageTimeoutMs;
  62. initialiseWithPipe (newPipe.release());
  63. return true;
  64. }
  65. return false;
  66. }
  67. bool InterprocessConnection::createPipe (const String& pipeName,
  68. const int pipeReceiveMessageTimeoutMs)
  69. {
  70. disconnect();
  71. ScopedPointer <NamedPipe> newPipe (new NamedPipe());
  72. if (newPipe->createNewPipe (pipeName))
  73. {
  74. const ScopedLock sl (pipeAndSocketLock);
  75. pipeReceiveMessageTimeout = pipeReceiveMessageTimeoutMs;
  76. initialiseWithPipe (newPipe.release());
  77. return true;
  78. }
  79. return false;
  80. }
  81. void InterprocessConnection::disconnect()
  82. {
  83. if (socket != nullptr)
  84. socket->close();
  85. if (pipe != nullptr)
  86. {
  87. pipe->cancelPendingReads();
  88. pipe->close();
  89. }
  90. stopThread (4000);
  91. {
  92. const ScopedLock sl (pipeAndSocketLock);
  93. socket = nullptr;
  94. pipe = nullptr;
  95. }
  96. connectionLostInt();
  97. }
  98. bool InterprocessConnection::isConnected() const
  99. {
  100. const ScopedLock sl (pipeAndSocketLock);
  101. return ((socket != nullptr && socket->isConnected())
  102. || (pipe != nullptr && pipe->isOpen()))
  103. && isThreadRunning();
  104. }
  105. String InterprocessConnection::getConnectedHostName() const
  106. {
  107. if (pipe != nullptr)
  108. {
  109. return "localhost";
  110. }
  111. else if (socket != nullptr)
  112. {
  113. if (! socket->isLocal())
  114. return socket->getHostName();
  115. return "localhost";
  116. }
  117. return String::empty;
  118. }
  119. //==============================================================================
  120. bool InterprocessConnection::sendMessage (const MemoryBlock& message)
  121. {
  122. uint32 messageHeader[2];
  123. messageHeader [0] = ByteOrder::swapIfBigEndian (magicMessageHeader);
  124. messageHeader [1] = ByteOrder::swapIfBigEndian ((uint32) message.getSize());
  125. MemoryBlock messageData (sizeof (messageHeader) + message.getSize());
  126. messageData.copyFrom (messageHeader, 0, sizeof (messageHeader));
  127. messageData.copyFrom (message.getData(), sizeof (messageHeader), message.getSize());
  128. int bytesWritten = 0;
  129. const ScopedLock sl (pipeAndSocketLock);
  130. if (socket != nullptr)
  131. bytesWritten = socket->write (messageData.getData(), (int) messageData.getSize());
  132. else if (pipe != nullptr)
  133. bytesWritten = pipe->write (messageData.getData(), (int) messageData.getSize());
  134. return bytesWritten == (int) messageData.getSize();
  135. }
  136. //==============================================================================
  137. void InterprocessConnection::initialiseWithSocket (StreamingSocket* const socket_)
  138. {
  139. jassert (socket == 0);
  140. socket = socket_;
  141. connectionMadeInt();
  142. startThread();
  143. }
  144. void InterprocessConnection::initialiseWithPipe (NamedPipe* const pipe_)
  145. {
  146. jassert (pipe == 0);
  147. pipe = pipe_;
  148. connectionMadeInt();
  149. startThread();
  150. }
  151. //==============================================================================
  152. struct ConnectionStateMessage : public MessageManager::MessageBase
  153. {
  154. ConnectionStateMessage (InterprocessConnection* owner_, bool connectionMade_) noexcept
  155. : owner (owner_), connectionMade (connectionMade_)
  156. {}
  157. void messageCallback()
  158. {
  159. InterprocessConnection* const ipc = owner;
  160. if (ipc != nullptr)
  161. {
  162. if (connectionMade)
  163. ipc->connectionMade();
  164. else
  165. ipc->connectionLost();
  166. }
  167. }
  168. WeakReference<InterprocessConnection> owner;
  169. bool connectionMade;
  170. };
  171. void InterprocessConnection::connectionMadeInt()
  172. {
  173. if (! callbackConnectionState)
  174. {
  175. callbackConnectionState = true;
  176. if (useMessageThread)
  177. (new ConnectionStateMessage (this, true))->post();
  178. else
  179. connectionMade();
  180. }
  181. }
  182. void InterprocessConnection::connectionLostInt()
  183. {
  184. if (callbackConnectionState)
  185. {
  186. callbackConnectionState = false;
  187. if (useMessageThread)
  188. (new ConnectionStateMessage (this, false))->post();
  189. else
  190. connectionLost();
  191. }
  192. }
  193. struct DataDeliveryMessage : public Message
  194. {
  195. DataDeliveryMessage (InterprocessConnection* owner_, const MemoryBlock& data_)
  196. : owner (owner_), data (data_)
  197. {}
  198. void messageCallback()
  199. {
  200. InterprocessConnection* const ipc = owner;
  201. if (ipc != nullptr)
  202. ipc->messageReceived (data);
  203. }
  204. WeakReference<InterprocessConnection> owner;
  205. MemoryBlock data;
  206. };
  207. void InterprocessConnection::deliverDataInt (const MemoryBlock& data)
  208. {
  209. jassert (callbackConnectionState);
  210. if (useMessageThread)
  211. (new DataDeliveryMessage (this, data))->post();
  212. else
  213. messageReceived (data);
  214. }
  215. //==============================================================================
  216. bool InterprocessConnection::readNextMessageInt()
  217. {
  218. const int maximumMessageSize = 1024 * 1024 * 10; // sanity check
  219. uint32 messageHeader[2];
  220. const int bytes = socket != nullptr ? socket->read (messageHeader, sizeof (messageHeader), true)
  221. : pipe ->read (messageHeader, sizeof (messageHeader), pipeReceiveMessageTimeout);
  222. if (bytes == sizeof (messageHeader)
  223. && ByteOrder::swapIfBigEndian (messageHeader[0]) == magicMessageHeader)
  224. {
  225. int bytesInMessage = (int) ByteOrder::swapIfBigEndian (messageHeader[1]);
  226. if (bytesInMessage > 0 && bytesInMessage < maximumMessageSize)
  227. {
  228. MemoryBlock messageData ((size_t) bytesInMessage, true);
  229. int bytesRead = 0;
  230. while (bytesInMessage > 0)
  231. {
  232. if (threadShouldExit())
  233. return false;
  234. const int numThisTime = jmin (bytesInMessage, 65536);
  235. const int bytesIn = socket != nullptr ? socket->read (static_cast <char*> (messageData.getData()) + bytesRead, numThisTime, true)
  236. : pipe ->read (static_cast <char*> (messageData.getData()) + bytesRead, numThisTime, pipeReceiveMessageTimeout);
  237. if (bytesIn <= 0)
  238. break;
  239. bytesRead += bytesIn;
  240. bytesInMessage -= bytesIn;
  241. }
  242. if (bytesRead >= 0)
  243. deliverDataInt (messageData);
  244. }
  245. }
  246. else if (bytes < 0)
  247. {
  248. {
  249. const ScopedLock sl (pipeAndSocketLock);
  250. socket = nullptr;
  251. }
  252. connectionLostInt();
  253. return false;
  254. }
  255. return true;
  256. }
  257. void InterprocessConnection::run()
  258. {
  259. while (! threadShouldExit())
  260. {
  261. if (socket != nullptr)
  262. {
  263. const int ready = socket->waitUntilReady (true, 0);
  264. if (ready < 0)
  265. {
  266. {
  267. const ScopedLock sl (pipeAndSocketLock);
  268. socket = nullptr;
  269. }
  270. connectionLostInt();
  271. break;
  272. }
  273. else if (ready > 0)
  274. {
  275. if (! readNextMessageInt())
  276. break;
  277. }
  278. else
  279. {
  280. Thread::sleep (2);
  281. }
  282. }
  283. else if (pipe != nullptr)
  284. {
  285. if (! pipe->isOpen())
  286. {
  287. {
  288. const ScopedLock sl (pipeAndSocketLock);
  289. pipe = nullptr;
  290. }
  291. connectionLostInt();
  292. break;
  293. }
  294. else
  295. {
  296. if (! readNextMessageInt())
  297. break;
  298. }
  299. }
  300. else
  301. {
  302. break;
  303. }
  304. }
  305. }