The JUCE cross-platform C++ framework, with DISTRHO/KXStudio specific changes
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

367 lines
10KB

  1. /*
  2. ==============================================================================
  3. This file is part of the JUCE library - "Jules' Utility Class Extensions"
  4. Copyright 2004-11 by Raw Material Software Ltd.
  5. ------------------------------------------------------------------------------
  6. JUCE can be redistributed and/or modified under the terms of the GNU General
  7. Public License (Version 2), as published by the Free Software Foundation.
  8. A copy of the license is included in the JUCE distribution, or can be found
  9. online at www.gnu.org/licenses.
  10. JUCE is distributed in the hope that it will be useful, but WITHOUT ANY
  11. WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
  12. A PARTICULAR PURPOSE. See the GNU General Public License for more details.
  13. ------------------------------------------------------------------------------
  14. To release a closed-source product which uses JUCE, commercial licenses are
  15. available: visit www.rawmaterialsoftware.com/juce for more information.
  16. ==============================================================================
  17. */
  18. InterprocessConnection::InterprocessConnection (const bool callbacksOnMessageThread,
  19. const uint32 magicMessageHeaderNumber)
  20. : Thread ("Juce IPC connection"),
  21. callbackConnectionState (false),
  22. useMessageThread (callbacksOnMessageThread),
  23. magicMessageHeader (magicMessageHeaderNumber),
  24. pipeReceiveMessageTimeout (-1)
  25. {
  26. }
  27. InterprocessConnection::~InterprocessConnection()
  28. {
  29. callbackConnectionState = false;
  30. disconnect();
  31. masterReference.clear();
  32. }
  33. //==============================================================================
  34. bool InterprocessConnection::connectToSocket (const String& hostName,
  35. const int portNumber,
  36. const int timeOutMillisecs)
  37. {
  38. disconnect();
  39. const ScopedLock sl (pipeAndSocketLock);
  40. socket = new StreamingSocket();
  41. if (socket->connect (hostName, portNumber, timeOutMillisecs))
  42. {
  43. connectionMadeInt();
  44. startThread();
  45. return true;
  46. }
  47. else
  48. {
  49. socket = nullptr;
  50. return false;
  51. }
  52. }
  53. bool InterprocessConnection::connectToPipe (const String& pipeName, const int timeoutMs)
  54. {
  55. disconnect();
  56. ScopedPointer <NamedPipe> newPipe (new NamedPipe());
  57. if (newPipe->openExisting (pipeName))
  58. {
  59. const ScopedLock sl (pipeAndSocketLock);
  60. pipeReceiveMessageTimeout = timeoutMs;
  61. initialiseWithPipe (newPipe.release());
  62. return true;
  63. }
  64. return false;
  65. }
  66. bool InterprocessConnection::createPipe (const String& pipeName, const int timeoutMs)
  67. {
  68. disconnect();
  69. ScopedPointer <NamedPipe> newPipe (new NamedPipe());
  70. if (newPipe->createNewPipe (pipeName))
  71. {
  72. const ScopedLock sl (pipeAndSocketLock);
  73. pipeReceiveMessageTimeout = timeoutMs;
  74. initialiseWithPipe (newPipe.release());
  75. return true;
  76. }
  77. return false;
  78. }
  79. void InterprocessConnection::disconnect()
  80. {
  81. if (socket != nullptr)
  82. socket->close();
  83. if (pipe != nullptr)
  84. pipe->close();
  85. stopThread (4000);
  86. {
  87. const ScopedLock sl (pipeAndSocketLock);
  88. socket = nullptr;
  89. pipe = nullptr;
  90. }
  91. connectionLostInt();
  92. }
  93. bool InterprocessConnection::isConnected() const
  94. {
  95. const ScopedLock sl (pipeAndSocketLock);
  96. return ((socket != nullptr && socket->isConnected())
  97. || (pipe != nullptr && pipe->isOpen()))
  98. && isThreadRunning();
  99. }
  100. String InterprocessConnection::getConnectedHostName() const
  101. {
  102. if (pipe != nullptr)
  103. return "localhost";
  104. if (socket != nullptr)
  105. {
  106. if (! socket->isLocal())
  107. return socket->getHostName();
  108. return "localhost";
  109. }
  110. return String::empty;
  111. }
  112. //==============================================================================
  113. bool InterprocessConnection::sendMessage (const MemoryBlock& message)
  114. {
  115. uint32 messageHeader[2];
  116. messageHeader [0] = ByteOrder::swapIfBigEndian (magicMessageHeader);
  117. messageHeader [1] = ByteOrder::swapIfBigEndian ((uint32) message.getSize());
  118. MemoryBlock messageData (sizeof (messageHeader) + message.getSize());
  119. messageData.copyFrom (messageHeader, 0, sizeof (messageHeader));
  120. messageData.copyFrom (message.getData(), sizeof (messageHeader), message.getSize());
  121. int bytesWritten = 0;
  122. const ScopedLock sl (pipeAndSocketLock);
  123. if (socket != nullptr)
  124. bytesWritten = socket->write (messageData.getData(), (int) messageData.getSize());
  125. else if (pipe != nullptr)
  126. bytesWritten = pipe->write (messageData.getData(), (int) messageData.getSize(), pipeReceiveMessageTimeout);
  127. return bytesWritten == (int) messageData.getSize();
  128. }
  129. //==============================================================================
  130. void InterprocessConnection::initialiseWithSocket (StreamingSocket* const socket_)
  131. {
  132. jassert (socket == nullptr);
  133. socket = socket_;
  134. connectionMadeInt();
  135. startThread();
  136. }
  137. void InterprocessConnection::initialiseWithPipe (NamedPipe* const pipe_)
  138. {
  139. jassert (pipe == nullptr);
  140. pipe = pipe_;
  141. connectionMadeInt();
  142. startThread();
  143. }
  144. //==============================================================================
  145. struct ConnectionStateMessage : public MessageManager::MessageBase
  146. {
  147. ConnectionStateMessage (InterprocessConnection* owner_, bool connectionMade_) noexcept
  148. : owner (owner_), connectionMade (connectionMade_)
  149. {}
  150. void messageCallback()
  151. {
  152. InterprocessConnection* const ipc = owner;
  153. if (ipc != nullptr)
  154. {
  155. if (connectionMade)
  156. ipc->connectionMade();
  157. else
  158. ipc->connectionLost();
  159. }
  160. }
  161. WeakReference<InterprocessConnection> owner;
  162. bool connectionMade;
  163. JUCE_DECLARE_NON_COPYABLE_WITH_LEAK_DETECTOR (ConnectionStateMessage);
  164. };
  165. void InterprocessConnection::connectionMadeInt()
  166. {
  167. if (! callbackConnectionState)
  168. {
  169. callbackConnectionState = true;
  170. if (useMessageThread)
  171. (new ConnectionStateMessage (this, true))->post();
  172. else
  173. connectionMade();
  174. }
  175. }
  176. void InterprocessConnection::connectionLostInt()
  177. {
  178. if (callbackConnectionState)
  179. {
  180. callbackConnectionState = false;
  181. if (useMessageThread)
  182. (new ConnectionStateMessage (this, false))->post();
  183. else
  184. connectionLost();
  185. }
  186. }
  187. struct DataDeliveryMessage : public Message
  188. {
  189. DataDeliveryMessage (InterprocessConnection* ipc, const MemoryBlock& d)
  190. : owner (ipc), data (d)
  191. {}
  192. void messageCallback()
  193. {
  194. if (InterprocessConnection* const ipc = owner)
  195. ipc->messageReceived (data);
  196. }
  197. WeakReference<InterprocessConnection> owner;
  198. MemoryBlock data;
  199. };
  200. void InterprocessConnection::deliverDataInt (const MemoryBlock& data)
  201. {
  202. jassert (callbackConnectionState);
  203. if (useMessageThread)
  204. (new DataDeliveryMessage (this, data))->post();
  205. else
  206. messageReceived (data);
  207. }
  208. //==============================================================================
  209. bool InterprocessConnection::readNextMessageInt()
  210. {
  211. uint32 messageHeader[2];
  212. const int bytes = socket != nullptr ? socket->read (messageHeader, sizeof (messageHeader), true)
  213. : pipe ->read (messageHeader, sizeof (messageHeader), -1);
  214. if (bytes == sizeof (messageHeader)
  215. && ByteOrder::swapIfBigEndian (messageHeader[0]) == magicMessageHeader)
  216. {
  217. int bytesInMessage = (int) ByteOrder::swapIfBigEndian (messageHeader[1]);
  218. if (bytesInMessage > 0)
  219. {
  220. MemoryBlock messageData ((size_t) bytesInMessage, true);
  221. int bytesRead = 0;
  222. while (bytesInMessage > 0)
  223. {
  224. if (threadShouldExit())
  225. return false;
  226. const int numThisTime = jmin (bytesInMessage, 65536);
  227. void* const data = addBytesToPointer (messageData.getData(), bytesRead);
  228. const int bytesIn = socket != nullptr ? socket->read (data, numThisTime, true)
  229. : pipe ->read (data, numThisTime, -1);
  230. if (bytesIn <= 0)
  231. break;
  232. bytesRead += bytesIn;
  233. bytesInMessage -= bytesIn;
  234. }
  235. if (bytesRead >= 0)
  236. deliverDataInt (messageData);
  237. }
  238. }
  239. else if (bytes < 0)
  240. {
  241. {
  242. const ScopedLock sl (pipeAndSocketLock);
  243. socket = nullptr;
  244. }
  245. connectionLostInt();
  246. return false;
  247. }
  248. return true;
  249. }
  250. void InterprocessConnection::run()
  251. {
  252. while (! threadShouldExit())
  253. {
  254. if (socket != nullptr)
  255. {
  256. const int ready = socket->waitUntilReady (true, 0);
  257. if (ready < 0)
  258. {
  259. {
  260. const ScopedLock sl (pipeAndSocketLock);
  261. socket = nullptr;
  262. }
  263. connectionLostInt();
  264. break;
  265. }
  266. else if (ready > 0)
  267. {
  268. if (! readNextMessageInt())
  269. break;
  270. }
  271. else
  272. {
  273. Thread::sleep (1);
  274. }
  275. }
  276. else if (pipe != nullptr)
  277. {
  278. if (! pipe->isOpen())
  279. {
  280. {
  281. const ScopedLock sl (pipeAndSocketLock);
  282. pipe = nullptr;
  283. }
  284. connectionLostInt();
  285. break;
  286. }
  287. else
  288. {
  289. if (! readNextMessageInt())
  290. break;
  291. }
  292. }
  293. else
  294. {
  295. break;
  296. }
  297. }
  298. }