Audio plugin host https://kx.studio/carla
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

366 lines
10.0KB

  1. /*
  2. ==============================================================================
  3. This file is part of the JUCE library.
  4. Copyright (c) 2013 - Raw Material Software Ltd.
  5. Permission is granted to use this software under the terms of either:
  6. a) the GPL v2 (or any later version)
  7. b) the Affero GPL v3
  8. Details of these licenses can be found at: www.gnu.org/licenses
  9. JUCE is distributed in the hope that it will be useful, but WITHOUT ANY
  10. WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
  11. A PARTICULAR PURPOSE. See the GNU General Public License for more details.
  12. ------------------------------------------------------------------------------
  13. To release a closed-source product which uses JUCE, commercial licenses are
  14. available: visit www.juce.com for more information.
  15. ==============================================================================
  16. */
  17. InterprocessConnection::InterprocessConnection (const bool callbacksOnMessageThread,
  18. const uint32 magicMessageHeaderNumber)
  19. : Thread ("Juce IPC connection"),
  20. callbackConnectionState (false),
  21. useMessageThread (callbacksOnMessageThread),
  22. magicMessageHeader (magicMessageHeaderNumber),
  23. pipeReceiveMessageTimeout (-1)
  24. {
  25. }
  26. InterprocessConnection::~InterprocessConnection()
  27. {
  28. callbackConnectionState = false;
  29. disconnect();
  30. masterReference.clear();
  31. }
  32. //==============================================================================
  33. bool InterprocessConnection::connectToSocket (const String& hostName,
  34. const int portNumber,
  35. const int timeOutMillisecs)
  36. {
  37. disconnect();
  38. const ScopedLock sl (pipeAndSocketLock);
  39. socket = new StreamingSocket();
  40. if (socket->connect (hostName, portNumber, timeOutMillisecs))
  41. {
  42. connectionMadeInt();
  43. startThread();
  44. return true;
  45. }
  46. else
  47. {
  48. socket = nullptr;
  49. return false;
  50. }
  51. }
  52. bool InterprocessConnection::connectToPipe (const String& pipeName, const int timeoutMs)
  53. {
  54. disconnect();
  55. ScopedPointer <NamedPipe> newPipe (new NamedPipe());
  56. if (newPipe->openExisting (pipeName))
  57. {
  58. const ScopedLock sl (pipeAndSocketLock);
  59. pipeReceiveMessageTimeout = timeoutMs;
  60. initialiseWithPipe (newPipe.release());
  61. return true;
  62. }
  63. return false;
  64. }
  65. bool InterprocessConnection::createPipe (const String& pipeName, const int timeoutMs)
  66. {
  67. disconnect();
  68. ScopedPointer <NamedPipe> newPipe (new NamedPipe());
  69. if (newPipe->createNewPipe (pipeName))
  70. {
  71. const ScopedLock sl (pipeAndSocketLock);
  72. pipeReceiveMessageTimeout = timeoutMs;
  73. initialiseWithPipe (newPipe.release());
  74. return true;
  75. }
  76. return false;
  77. }
  78. void InterprocessConnection::disconnect()
  79. {
  80. if (socket != nullptr)
  81. socket->close();
  82. if (pipe != nullptr)
  83. pipe->close();
  84. stopThread (4000);
  85. {
  86. const ScopedLock sl (pipeAndSocketLock);
  87. socket = nullptr;
  88. pipe = nullptr;
  89. }
  90. connectionLostInt();
  91. }
  92. bool InterprocessConnection::isConnected() const
  93. {
  94. const ScopedLock sl (pipeAndSocketLock);
  95. return ((socket != nullptr && socket->isConnected())
  96. || (pipe != nullptr && pipe->isOpen()))
  97. && isThreadRunning();
  98. }
  99. String InterprocessConnection::getConnectedHostName() const
  100. {
  101. if (pipe != nullptr)
  102. return "localhost";
  103. if (socket != nullptr)
  104. {
  105. if (! socket->isLocal())
  106. return socket->getHostName();
  107. return "localhost";
  108. }
  109. return String::empty;
  110. }
  111. //==============================================================================
  112. bool InterprocessConnection::sendMessage (const MemoryBlock& message)
  113. {
  114. uint32 messageHeader[2];
  115. messageHeader [0] = ByteOrder::swapIfBigEndian (magicMessageHeader);
  116. messageHeader [1] = ByteOrder::swapIfBigEndian ((uint32) message.getSize());
  117. MemoryBlock messageData (sizeof (messageHeader) + message.getSize());
  118. messageData.copyFrom (messageHeader, 0, sizeof (messageHeader));
  119. messageData.copyFrom (message.getData(), sizeof (messageHeader), message.getSize());
  120. int bytesWritten = 0;
  121. const ScopedLock sl (pipeAndSocketLock);
  122. if (socket != nullptr)
  123. bytesWritten = socket->write (messageData.getData(), (int) messageData.getSize());
  124. else if (pipe != nullptr)
  125. bytesWritten = pipe->write (messageData.getData(), (int) messageData.getSize(), pipeReceiveMessageTimeout);
  126. return bytesWritten == (int) messageData.getSize();
  127. }
  128. //==============================================================================
  129. void InterprocessConnection::initialiseWithSocket (StreamingSocket* const socket_)
  130. {
  131. jassert (socket == nullptr);
  132. socket = socket_;
  133. connectionMadeInt();
  134. startThread();
  135. }
  136. void InterprocessConnection::initialiseWithPipe (NamedPipe* const pipe_)
  137. {
  138. jassert (pipe == nullptr);
  139. pipe = pipe_;
  140. connectionMadeInt();
  141. startThread();
  142. }
  143. //==============================================================================
  144. struct ConnectionStateMessage : public MessageManager::MessageBase
  145. {
  146. ConnectionStateMessage (InterprocessConnection* ipc, bool connected) noexcept
  147. : owner (ipc), connectionMade (connected)
  148. {}
  149. void messageCallback() override
  150. {
  151. if (InterprocessConnection* const ipc = owner)
  152. {
  153. if (connectionMade)
  154. ipc->connectionMade();
  155. else
  156. ipc->connectionLost();
  157. }
  158. }
  159. WeakReference<InterprocessConnection> owner;
  160. bool connectionMade;
  161. JUCE_DECLARE_NON_COPYABLE_WITH_LEAK_DETECTOR (ConnectionStateMessage)
  162. };
  163. void InterprocessConnection::connectionMadeInt()
  164. {
  165. if (! callbackConnectionState)
  166. {
  167. callbackConnectionState = true;
  168. if (useMessageThread)
  169. (new ConnectionStateMessage (this, true))->post();
  170. else
  171. connectionMade();
  172. }
  173. }
  174. void InterprocessConnection::connectionLostInt()
  175. {
  176. if (callbackConnectionState)
  177. {
  178. callbackConnectionState = false;
  179. if (useMessageThread)
  180. (new ConnectionStateMessage (this, false))->post();
  181. else
  182. connectionLost();
  183. }
  184. }
  185. struct DataDeliveryMessage : public Message
  186. {
  187. DataDeliveryMessage (InterprocessConnection* ipc, const MemoryBlock& d)
  188. : owner (ipc), data (d)
  189. {}
  190. void messageCallback() override
  191. {
  192. if (InterprocessConnection* const ipc = owner)
  193. ipc->messageReceived (data);
  194. }
  195. WeakReference<InterprocessConnection> owner;
  196. MemoryBlock data;
  197. };
  198. void InterprocessConnection::deliverDataInt (const MemoryBlock& data)
  199. {
  200. jassert (callbackConnectionState);
  201. if (useMessageThread)
  202. (new DataDeliveryMessage (this, data))->post();
  203. else
  204. messageReceived (data);
  205. }
  206. //==============================================================================
  207. bool InterprocessConnection::readNextMessageInt()
  208. {
  209. uint32 messageHeader[2];
  210. const int bytes = socket != nullptr ? socket->read (messageHeader, sizeof (messageHeader), true)
  211. : pipe ->read (messageHeader, sizeof (messageHeader), -1);
  212. if (bytes == sizeof (messageHeader)
  213. && ByteOrder::swapIfBigEndian (messageHeader[0]) == magicMessageHeader)
  214. {
  215. int bytesInMessage = (int) ByteOrder::swapIfBigEndian (messageHeader[1]);
  216. if (bytesInMessage > 0)
  217. {
  218. MemoryBlock messageData ((size_t) bytesInMessage, true);
  219. int bytesRead = 0;
  220. while (bytesInMessage > 0)
  221. {
  222. if (threadShouldExit())
  223. return false;
  224. const int numThisTime = jmin (bytesInMessage, 65536);
  225. void* const data = addBytesToPointer (messageData.getData(), bytesRead);
  226. const int bytesIn = socket != nullptr ? socket->read (data, numThisTime, true)
  227. : pipe ->read (data, numThisTime, -1);
  228. if (bytesIn <= 0)
  229. break;
  230. bytesRead += bytesIn;
  231. bytesInMessage -= bytesIn;
  232. }
  233. if (bytesRead >= 0)
  234. deliverDataInt (messageData);
  235. }
  236. }
  237. else if (bytes < 0)
  238. {
  239. if (socket != nullptr)
  240. {
  241. const ScopedLock sl (pipeAndSocketLock);
  242. socket = nullptr;
  243. }
  244. connectionLostInt();
  245. return false;
  246. }
  247. return true;
  248. }
  249. void InterprocessConnection::run()
  250. {
  251. while (! threadShouldExit())
  252. {
  253. if (socket != nullptr)
  254. {
  255. const int ready = socket->waitUntilReady (true, 0);
  256. if (ready < 0)
  257. {
  258. {
  259. const ScopedLock sl (pipeAndSocketLock);
  260. socket = nullptr;
  261. }
  262. connectionLostInt();
  263. break;
  264. }
  265. else if (ready > 0)
  266. {
  267. if (! readNextMessageInt())
  268. break;
  269. }
  270. else
  271. {
  272. Thread::sleep (1);
  273. }
  274. }
  275. else if (pipe != nullptr)
  276. {
  277. if (! pipe->isOpen())
  278. {
  279. {
  280. const ScopedLock sl (pipeAndSocketLock);
  281. pipe = nullptr;
  282. }
  283. connectionLostInt();
  284. break;
  285. }
  286. else
  287. {
  288. if (! readNextMessageInt())
  289. break;
  290. }
  291. }
  292. else
  293. {
  294. break;
  295. }
  296. }
  297. }