Audio plugin host https://kx.studio/carla
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

353 lines
9.7KB

  1. /*
  2. ==============================================================================
  3. This file is part of the JUCE library.
  4. Copyright (c) 2013 - Raw Material Software Ltd.
  5. Permission is granted to use this software under the terms of either:
  6. a) the GPL v2 (or any later version)
  7. b) the Affero GPL v3
  8. Details of these licenses can be found at: www.gnu.org/licenses
  9. JUCE is distributed in the hope that it will be useful, but WITHOUT ANY
  10. WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
  11. A PARTICULAR PURPOSE. See the GNU General Public License for more details.
  12. ------------------------------------------------------------------------------
  13. To release a closed-source product which uses JUCE, commercial licenses are
  14. available: visit www.juce.com for more information.
  15. ==============================================================================
  16. */
  17. InterprocessConnection::InterprocessConnection (const bool callbacksOnMessageThread,
  18. const uint32 magicMessageHeaderNumber)
  19. : Thread ("Juce IPC connection"),
  20. callbackConnectionState (false),
  21. useMessageThread (callbacksOnMessageThread),
  22. magicMessageHeader (magicMessageHeaderNumber),
  23. pipeReceiveMessageTimeout (-1)
  24. {
  25. }
  26. InterprocessConnection::~InterprocessConnection()
  27. {
  28. callbackConnectionState = false;
  29. disconnect();
  30. masterReference.clear();
  31. }
  32. //==============================================================================
  33. bool InterprocessConnection::connectToSocket (const String& hostName,
  34. const int portNumber,
  35. const int timeOutMillisecs)
  36. {
  37. disconnect();
  38. const ScopedLock sl (pipeAndSocketLock);
  39. socket = new StreamingSocket();
  40. if (socket->connect (hostName, portNumber, timeOutMillisecs))
  41. {
  42. connectionMadeInt();
  43. startThread();
  44. return true;
  45. }
  46. else
  47. {
  48. socket = nullptr;
  49. return false;
  50. }
  51. }
  52. bool InterprocessConnection::connectToPipe (const String& pipeName, const int timeoutMs)
  53. {
  54. disconnect();
  55. ScopedPointer<NamedPipe> newPipe (new NamedPipe());
  56. if (newPipe->openExisting (pipeName))
  57. {
  58. const ScopedLock sl (pipeAndSocketLock);
  59. pipeReceiveMessageTimeout = timeoutMs;
  60. initialiseWithPipe (newPipe.release());
  61. return true;
  62. }
  63. return false;
  64. }
  65. bool InterprocessConnection::createPipe (const String& pipeName, const int timeoutMs)
  66. {
  67. disconnect();
  68. ScopedPointer<NamedPipe> newPipe (new NamedPipe());
  69. if (newPipe->createNewPipe (pipeName))
  70. {
  71. const ScopedLock sl (pipeAndSocketLock);
  72. pipeReceiveMessageTimeout = timeoutMs;
  73. initialiseWithPipe (newPipe.release());
  74. return true;
  75. }
  76. return false;
  77. }
  78. void InterprocessConnection::disconnect()
  79. {
  80. signalThreadShouldExit();
  81. {
  82. const ScopedLock sl (pipeAndSocketLock);
  83. if (socket != nullptr) socket->close();
  84. if (pipe != nullptr) pipe->close();
  85. }
  86. stopThread (4000);
  87. deletePipeAndSocket();
  88. connectionLostInt();
  89. }
  90. void InterprocessConnection::deletePipeAndSocket()
  91. {
  92. const ScopedLock sl (pipeAndSocketLock);
  93. socket = nullptr;
  94. pipe = nullptr;
  95. }
  96. bool InterprocessConnection::isConnected() const
  97. {
  98. const ScopedLock sl (pipeAndSocketLock);
  99. return ((socket != nullptr && socket->isConnected())
  100. || (pipe != nullptr && pipe->isOpen()))
  101. && isThreadRunning();
  102. }
  103. String InterprocessConnection::getConnectedHostName() const
  104. {
  105. if (pipe != nullptr)
  106. return "localhost";
  107. if (socket != nullptr)
  108. {
  109. if (! socket->isLocal())
  110. return socket->getHostName();
  111. return "localhost";
  112. }
  113. return String::empty;
  114. }
  115. //==============================================================================
  116. bool InterprocessConnection::sendMessage (const MemoryBlock& message)
  117. {
  118. uint32 messageHeader[2];
  119. messageHeader [0] = ByteOrder::swapIfBigEndian (magicMessageHeader);
  120. messageHeader [1] = ByteOrder::swapIfBigEndian ((uint32) message.getSize());
  121. MemoryBlock messageData (sizeof (messageHeader) + message.getSize());
  122. messageData.copyFrom (messageHeader, 0, sizeof (messageHeader));
  123. messageData.copyFrom (message.getData(), sizeof (messageHeader), message.getSize());
  124. int bytesWritten = 0;
  125. const ScopedLock sl (pipeAndSocketLock);
  126. if (socket != nullptr)
  127. bytesWritten = socket->write (messageData.getData(), (int) messageData.getSize());
  128. else if (pipe != nullptr)
  129. bytesWritten = pipe->write (messageData.getData(), (int) messageData.getSize(), pipeReceiveMessageTimeout);
  130. return bytesWritten == (int) messageData.getSize();
  131. }
  132. //==============================================================================
  133. void InterprocessConnection::initialiseWithSocket (StreamingSocket* newSocket)
  134. {
  135. jassert (socket == nullptr && pipe == nullptr);
  136. socket = newSocket;
  137. connectionMadeInt();
  138. startThread();
  139. }
  140. void InterprocessConnection::initialiseWithPipe (NamedPipe* newPipe)
  141. {
  142. jassert (socket == nullptr && pipe == nullptr);
  143. pipe = newPipe;
  144. connectionMadeInt();
  145. startThread();
  146. }
  147. //==============================================================================
  148. struct ConnectionStateMessage : public MessageManager::MessageBase
  149. {
  150. ConnectionStateMessage (InterprocessConnection* ipc, bool connected) noexcept
  151. : owner (ipc), connectionMade (connected)
  152. {}
  153. void messageCallback() override
  154. {
  155. if (InterprocessConnection* const ipc = owner)
  156. {
  157. if (connectionMade)
  158. ipc->connectionMade();
  159. else
  160. ipc->connectionLost();
  161. }
  162. }
  163. WeakReference<InterprocessConnection> owner;
  164. bool connectionMade;
  165. JUCE_DECLARE_NON_COPYABLE_WITH_LEAK_DETECTOR (ConnectionStateMessage)
  166. };
  167. void InterprocessConnection::connectionMadeInt()
  168. {
  169. if (! callbackConnectionState)
  170. {
  171. callbackConnectionState = true;
  172. if (useMessageThread)
  173. (new ConnectionStateMessage (this, true))->post();
  174. else
  175. connectionMade();
  176. }
  177. }
  178. void InterprocessConnection::connectionLostInt()
  179. {
  180. if (callbackConnectionState)
  181. {
  182. callbackConnectionState = false;
  183. if (useMessageThread)
  184. (new ConnectionStateMessage (this, false))->post();
  185. else
  186. connectionLost();
  187. }
  188. }
  189. struct DataDeliveryMessage : public Message
  190. {
  191. DataDeliveryMessage (InterprocessConnection* ipc, const MemoryBlock& d)
  192. : owner (ipc), data (d)
  193. {}
  194. void messageCallback() override
  195. {
  196. if (InterprocessConnection* const ipc = owner)
  197. ipc->messageReceived (data);
  198. }
  199. WeakReference<InterprocessConnection> owner;
  200. MemoryBlock data;
  201. };
  202. void InterprocessConnection::deliverDataInt (const MemoryBlock& data)
  203. {
  204. jassert (callbackConnectionState);
  205. if (useMessageThread)
  206. (new DataDeliveryMessage (this, data))->post();
  207. else
  208. messageReceived (data);
  209. }
  210. //==============================================================================
  211. bool InterprocessConnection::readNextMessageInt()
  212. {
  213. uint32 messageHeader[2];
  214. const int bytes = socket != nullptr ? socket->read (messageHeader, sizeof (messageHeader), true)
  215. : pipe ->read (messageHeader, sizeof (messageHeader), -1);
  216. if (bytes == sizeof (messageHeader)
  217. && ByteOrder::swapIfBigEndian (messageHeader[0]) == magicMessageHeader)
  218. {
  219. int bytesInMessage = (int) ByteOrder::swapIfBigEndian (messageHeader[1]);
  220. if (bytesInMessage > 0)
  221. {
  222. MemoryBlock messageData ((size_t) bytesInMessage, true);
  223. int bytesRead = 0;
  224. while (bytesInMessage > 0)
  225. {
  226. if (threadShouldExit())
  227. return false;
  228. const int numThisTime = jmin (bytesInMessage, 65536);
  229. void* const data = addBytesToPointer (messageData.getData(), bytesRead);
  230. const int bytesIn = socket != nullptr ? socket->read (data, numThisTime, true)
  231. : pipe ->read (data, numThisTime, -1);
  232. if (bytesIn <= 0)
  233. break;
  234. bytesRead += bytesIn;
  235. bytesInMessage -= bytesIn;
  236. }
  237. if (bytesRead >= 0)
  238. deliverDataInt (messageData);
  239. }
  240. }
  241. else if (bytes < 0)
  242. {
  243. if (socket != nullptr)
  244. deletePipeAndSocket();
  245. connectionLostInt();
  246. return false;
  247. }
  248. return true;
  249. }
  250. void InterprocessConnection::run()
  251. {
  252. while (! threadShouldExit())
  253. {
  254. if (socket != nullptr)
  255. {
  256. const int ready = socket->waitUntilReady (true, 0);
  257. if (ready < 0)
  258. {
  259. deletePipeAndSocket();
  260. connectionLostInt();
  261. break;
  262. }
  263. if (ready == 0)
  264. {
  265. wait (1);
  266. continue;
  267. }
  268. }
  269. else if (pipe != nullptr)
  270. {
  271. if (! pipe->isOpen())
  272. {
  273. deletePipeAndSocket();
  274. connectionLostInt();
  275. break;
  276. }
  277. }
  278. else
  279. {
  280. break;
  281. }
  282. if (threadShouldExit() || ! readNextMessageInt())
  283. break;
  284. }
  285. }