The JUCE cross-platform C++ framework, with DISTRHO/KXStudio specific changes
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

369 lines
10KB

  1. /*
  2. ==============================================================================
  3. This file is part of the JUCE library - "Jules' Utility Class Extensions"
  4. Copyright 2004-11 by Raw Material Software Ltd.
  5. ------------------------------------------------------------------------------
  6. JUCE can be redistributed and/or modified under the terms of the GNU General
  7. Public License (Version 2), as published by the Free Software Foundation.
  8. A copy of the license is included in the JUCE distribution, or can be found
  9. online at www.gnu.org/licenses.
  10. JUCE is distributed in the hope that it will be useful, but WITHOUT ANY
  11. WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
  12. A PARTICULAR PURPOSE. See the GNU General Public License for more details.
  13. ------------------------------------------------------------------------------
  14. To release a closed-source product which uses JUCE, commercial licenses are
  15. available: visit www.rawmaterialsoftware.com/juce for more information.
  16. ==============================================================================
  17. */
  18. InterprocessConnection::InterprocessConnection (const bool callbacksOnMessageThread,
  19. const uint32 magicMessageHeaderNumber)
  20. : Thread ("Juce IPC connection"),
  21. callbackConnectionState (false),
  22. useMessageThread (callbacksOnMessageThread),
  23. magicMessageHeader (magicMessageHeaderNumber),
  24. pipeReceiveMessageTimeout (-1)
  25. {
  26. }
  27. InterprocessConnection::~InterprocessConnection()
  28. {
  29. callbackConnectionState = false;
  30. disconnect();
  31. masterReference.clear();
  32. }
  33. //==============================================================================
  34. bool InterprocessConnection::connectToSocket (const String& hostName,
  35. const int portNumber,
  36. const int timeOutMillisecs)
  37. {
  38. disconnect();
  39. const ScopedLock sl (pipeAndSocketLock);
  40. socket = new StreamingSocket();
  41. if (socket->connect (hostName, portNumber, timeOutMillisecs))
  42. {
  43. connectionMadeInt();
  44. startThread();
  45. return true;
  46. }
  47. else
  48. {
  49. socket = nullptr;
  50. return false;
  51. }
  52. }
  53. bool InterprocessConnection::connectToPipe (const String& pipeName, const int timeoutMs)
  54. {
  55. disconnect();
  56. ScopedPointer <NamedPipe> newPipe (new NamedPipe());
  57. if (newPipe->openExisting (pipeName))
  58. {
  59. const ScopedLock sl (pipeAndSocketLock);
  60. pipeReceiveMessageTimeout = timeoutMs;
  61. initialiseWithPipe (newPipe.release());
  62. return true;
  63. }
  64. return false;
  65. }
  66. bool InterprocessConnection::createPipe (const String& pipeName, const int timeoutMs)
  67. {
  68. disconnect();
  69. ScopedPointer <NamedPipe> newPipe (new NamedPipe());
  70. if (newPipe->createNewPipe (pipeName))
  71. {
  72. const ScopedLock sl (pipeAndSocketLock);
  73. pipeReceiveMessageTimeout = timeoutMs;
  74. initialiseWithPipe (newPipe.release());
  75. return true;
  76. }
  77. return false;
  78. }
  79. void InterprocessConnection::disconnect()
  80. {
  81. if (socket != nullptr)
  82. socket->close();
  83. if (pipe != nullptr)
  84. pipe->close();
  85. stopThread (4000);
  86. {
  87. const ScopedLock sl (pipeAndSocketLock);
  88. socket = nullptr;
  89. pipe = nullptr;
  90. }
  91. connectionLostInt();
  92. }
  93. bool InterprocessConnection::isConnected() const
  94. {
  95. const ScopedLock sl (pipeAndSocketLock);
  96. return ((socket != nullptr && socket->isConnected())
  97. || (pipe != nullptr && pipe->isOpen()))
  98. && isThreadRunning();
  99. }
  100. String InterprocessConnection::getConnectedHostName() const
  101. {
  102. if (pipe != nullptr)
  103. {
  104. return "localhost";
  105. }
  106. else if (socket != nullptr)
  107. {
  108. if (! socket->isLocal())
  109. return socket->getHostName();
  110. return "localhost";
  111. }
  112. return String::empty;
  113. }
  114. //==============================================================================
  115. bool InterprocessConnection::sendMessage (const MemoryBlock& message)
  116. {
  117. uint32 messageHeader[2];
  118. messageHeader [0] = ByteOrder::swapIfBigEndian (magicMessageHeader);
  119. messageHeader [1] = ByteOrder::swapIfBigEndian ((uint32) message.getSize());
  120. MemoryBlock messageData (sizeof (messageHeader) + message.getSize());
  121. messageData.copyFrom (messageHeader, 0, sizeof (messageHeader));
  122. messageData.copyFrom (message.getData(), sizeof (messageHeader), message.getSize());
  123. int bytesWritten = 0;
  124. const ScopedLock sl (pipeAndSocketLock);
  125. if (socket != nullptr)
  126. bytesWritten = socket->write (messageData.getData(), (int) messageData.getSize());
  127. else if (pipe != nullptr)
  128. bytesWritten = pipe->write (messageData.getData(), (int) messageData.getSize(), pipeReceiveMessageTimeout);
  129. return bytesWritten == (int) messageData.getSize();
  130. }
  131. //==============================================================================
  132. void InterprocessConnection::initialiseWithSocket (StreamingSocket* const socket_)
  133. {
  134. jassert (socket == nullptr);
  135. socket = socket_;
  136. connectionMadeInt();
  137. startThread();
  138. }
  139. void InterprocessConnection::initialiseWithPipe (NamedPipe* const pipe_)
  140. {
  141. jassert (pipe == nullptr);
  142. pipe = pipe_;
  143. connectionMadeInt();
  144. startThread();
  145. }
  146. //==============================================================================
  147. struct ConnectionStateMessage : public MessageManager::MessageBase
  148. {
  149. ConnectionStateMessage (InterprocessConnection* owner_, bool connectionMade_) noexcept
  150. : owner (owner_), connectionMade (connectionMade_)
  151. {}
  152. void messageCallback()
  153. {
  154. InterprocessConnection* const ipc = owner;
  155. if (ipc != nullptr)
  156. {
  157. if (connectionMade)
  158. ipc->connectionMade();
  159. else
  160. ipc->connectionLost();
  161. }
  162. }
  163. WeakReference<InterprocessConnection> owner;
  164. bool connectionMade;
  165. JUCE_DECLARE_NON_COPYABLE_WITH_LEAK_DETECTOR (ConnectionStateMessage);
  166. };
  167. void InterprocessConnection::connectionMadeInt()
  168. {
  169. if (! callbackConnectionState)
  170. {
  171. callbackConnectionState = true;
  172. if (useMessageThread)
  173. (new ConnectionStateMessage (this, true))->post();
  174. else
  175. connectionMade();
  176. }
  177. }
  178. void InterprocessConnection::connectionLostInt()
  179. {
  180. if (callbackConnectionState)
  181. {
  182. callbackConnectionState = false;
  183. if (useMessageThread)
  184. (new ConnectionStateMessage (this, false))->post();
  185. else
  186. connectionLost();
  187. }
  188. }
  189. struct DataDeliveryMessage : public Message
  190. {
  191. DataDeliveryMessage (InterprocessConnection* owner_, const MemoryBlock& data_)
  192. : owner (owner_), data (data_)
  193. {}
  194. void messageCallback()
  195. {
  196. InterprocessConnection* const ipc = owner;
  197. if (ipc != nullptr)
  198. ipc->messageReceived (data);
  199. }
  200. WeakReference<InterprocessConnection> owner;
  201. MemoryBlock data;
  202. };
  203. void InterprocessConnection::deliverDataInt (const MemoryBlock& data)
  204. {
  205. jassert (callbackConnectionState);
  206. if (useMessageThread)
  207. (new DataDeliveryMessage (this, data))->post();
  208. else
  209. messageReceived (data);
  210. }
  211. //==============================================================================
  212. bool InterprocessConnection::readNextMessageInt()
  213. {
  214. uint32 messageHeader[2];
  215. const int bytes = socket != nullptr ? socket->read (messageHeader, sizeof (messageHeader), true)
  216. : pipe ->read (messageHeader, sizeof (messageHeader), -1);
  217. if (bytes == sizeof (messageHeader)
  218. && ByteOrder::swapIfBigEndian (messageHeader[0]) == magicMessageHeader)
  219. {
  220. int bytesInMessage = (int) ByteOrder::swapIfBigEndian (messageHeader[1]);
  221. if (bytesInMessage > 0)
  222. {
  223. MemoryBlock messageData ((size_t) bytesInMessage, true);
  224. int bytesRead = 0;
  225. while (bytesInMessage > 0)
  226. {
  227. if (threadShouldExit())
  228. return false;
  229. const int numThisTime = jmin (bytesInMessage, 65536);
  230. void* const data = addBytesToPointer (messageData.getData(), bytesRead);
  231. const int bytesIn = socket != nullptr ? socket->read (data, numThisTime, true)
  232. : pipe ->read (data, numThisTime, -1);
  233. if (bytesIn <= 0)
  234. break;
  235. bytesRead += bytesIn;
  236. bytesInMessage -= bytesIn;
  237. }
  238. if (bytesRead >= 0)
  239. deliverDataInt (messageData);
  240. }
  241. }
  242. else if (bytes < 0)
  243. {
  244. {
  245. const ScopedLock sl (pipeAndSocketLock);
  246. socket = nullptr;
  247. }
  248. connectionLostInt();
  249. return false;
  250. }
  251. return true;
  252. }
  253. void InterprocessConnection::run()
  254. {
  255. while (! threadShouldExit())
  256. {
  257. if (socket != nullptr)
  258. {
  259. const int ready = socket->waitUntilReady (true, 0);
  260. if (ready < 0)
  261. {
  262. {
  263. const ScopedLock sl (pipeAndSocketLock);
  264. socket = nullptr;
  265. }
  266. connectionLostInt();
  267. break;
  268. }
  269. else if (ready > 0)
  270. {
  271. if (! readNextMessageInt())
  272. break;
  273. }
  274. else
  275. {
  276. Thread::sleep (1);
  277. }
  278. }
  279. else if (pipe != nullptr)
  280. {
  281. if (! pipe->isOpen())
  282. {
  283. {
  284. const ScopedLock sl (pipeAndSocketLock);
  285. pipe = nullptr;
  286. }
  287. connectionLostInt();
  288. break;
  289. }
  290. else
  291. {
  292. if (! readNextMessageInt())
  293. break;
  294. }
  295. }
  296. else
  297. {
  298. break;
  299. }
  300. }
  301. }