Audio plugin host https://kx.studio/carla
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

UdpMessenger.hpp 9.8KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330
  1. /* Copyright 2016, Ableton AG, Berlin. All rights reserved.
  2. *
  3. * This program is free software: you can redistribute it and/or modify
  4. * it under the terms of the GNU General Public License as published by
  5. * the Free Software Foundation, either version 2 of the License, or
  6. * (at your option) any later version.
  7. *
  8. * This program is distributed in the hope that it will be useful,
  9. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  10. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  11. * GNU General Public License for more details.
  12. *
  13. * You should have received a copy of the GNU General Public License
  14. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  15. *
  16. * If you would like to incorporate Link into a proprietary software application,
  17. * please contact <link-devs@ableton.com>.
  18. */
  19. #pragma once
  20. #include <ableton/discovery/IpV4Interface.hpp>
  21. #include <ableton/discovery/MessageTypes.hpp>
  22. #include <ableton/discovery/v1/Messages.hpp>
  23. #include <ableton/platforms/asio/AsioWrapper.hpp>
  24. #include <ableton/util/Injected.hpp>
  25. #include <ableton/util/SafeAsyncHandler.hpp>
  26. #include <algorithm>
  27. #include <memory>
  28. namespace ableton
  29. {
  30. namespace discovery
  31. {
  32. // An exception thrown when sending a udp message fails. Stores the
  33. // interface through which the sending failed.
  34. struct UdpSendException : std::runtime_error
  35. {
  36. UdpSendException(const std::runtime_error& e, asio::ip::address ifAddr)
  37. : std::runtime_error(e.what())
  38. , interfaceAddr(std::move(ifAddr))
  39. {
  40. }
  41. asio::ip::address interfaceAddr;
  42. };
  43. // Throws UdpSendException
  44. template <typename Interface, typename NodeId, typename Payload>
  45. void sendUdpMessage(Interface& iface,
  46. NodeId from,
  47. const uint8_t ttl,
  48. const v1::MessageType messageType,
  49. const Payload& payload,
  50. const asio::ip::udp::endpoint& to)
  51. {
  52. using namespace std;
  53. v1::MessageBuffer buffer;
  54. const auto messageBegin = begin(buffer);
  55. const auto messageEnd =
  56. v1::detail::encodeMessage(move(from), ttl, messageType, payload, messageBegin);
  57. const auto numBytes = static_cast<size_t>(distance(messageBegin, messageEnd));
  58. try
  59. {
  60. iface.send(buffer.data(), numBytes, to);
  61. }
  62. catch (const std::runtime_error& err)
  63. {
  64. throw UdpSendException{err, iface.endpoint().address()};
  65. }
  66. }
  67. // UdpMessenger uses a "shared_ptr pImpl" pattern to make it movable
  68. // and to support safe async handler callbacks when receiving messages
  69. // on the given interface.
  70. template <typename Interface, typename NodeStateT, typename IoContext>
  71. class UdpMessenger
  72. {
  73. public:
  74. using NodeState = NodeStateT;
  75. using NodeId = typename NodeState::IdType;
  76. using Timer = typename util::Injected<IoContext>::type::Timer;
  77. using TimerError = typename Timer::ErrorCode;
  78. using TimePoint = typename Timer::TimePoint;
  79. UdpMessenger(util::Injected<Interface> iface,
  80. NodeState state,
  81. util::Injected<IoContext> io,
  82. const uint8_t ttl,
  83. const uint8_t ttlRatio)
  84. : mpImpl(std::make_shared<Impl>(
  85. std::move(iface), std::move(state), std::move(io), ttl, ttlRatio))
  86. {
  87. // We need to always listen for incoming traffic in order to
  88. // respond to peer state broadcasts
  89. mpImpl->listen(MulticastTag{});
  90. mpImpl->listen(UnicastTag{});
  91. mpImpl->broadcastState();
  92. }
  93. UdpMessenger(const UdpMessenger&) = delete;
  94. UdpMessenger& operator=(const UdpMessenger&) = delete;
  95. UdpMessenger(UdpMessenger&& rhs)
  96. : mpImpl(std::move(rhs.mpImpl))
  97. {
  98. }
  99. ~UdpMessenger()
  100. {
  101. if (mpImpl != nullptr)
  102. {
  103. try
  104. {
  105. mpImpl->sendByeBye();
  106. }
  107. catch (const UdpSendException& err)
  108. {
  109. debug(mpImpl->mIo->log()) << "Failed to send bye bye message: " << err.what();
  110. }
  111. }
  112. }
  113. void updateState(NodeState state)
  114. {
  115. mpImpl->updateState(std::move(state));
  116. }
  117. // Broadcast the current state of the system to all peers. May throw
  118. // std::runtime_error if assembling a broadcast message fails or if
  119. // there is an error at the transport layer. Throws on failure.
  120. void broadcastState()
  121. {
  122. mpImpl->broadcastState();
  123. }
  124. // Asynchronous receive function for incoming messages from peers. Will
  125. // return immediately and the handler will be invoked when a message
  126. // is received. Handler must have operator() overloads for PeerState and
  127. // ByeBye messages.
  128. template <typename Handler>
  129. void receive(Handler handler)
  130. {
  131. mpImpl->setReceiveHandler(std::move(handler));
  132. }
  133. private:
  134. struct Impl : std::enable_shared_from_this<Impl>
  135. {
  136. Impl(util::Injected<Interface> iface,
  137. NodeState state,
  138. util::Injected<IoContext> io,
  139. const uint8_t ttl,
  140. const uint8_t ttlRatio)
  141. : mIo(std::move(io))
  142. , mInterface(std::move(iface))
  143. , mState(std::move(state))
  144. , mTimer(mIo->makeTimer())
  145. , mLastBroadcastTime{}
  146. , mTtl(ttl)
  147. , mTtlRatio(ttlRatio)
  148. , mPeerStateHandler([](PeerState<NodeState>) {})
  149. , mByeByeHandler([](ByeBye<NodeId>) {})
  150. {
  151. }
  152. template <typename Handler>
  153. void setReceiveHandler(Handler handler)
  154. {
  155. mPeerStateHandler = [handler](
  156. PeerState<NodeState> state) { handler(std::move(state)); };
  157. mByeByeHandler = [handler](ByeBye<NodeId> byeBye) { handler(std::move(byeBye)); };
  158. }
  159. void sendByeBye()
  160. {
  161. sendUdpMessage(
  162. *mInterface, mState.ident(), 0, v1::kByeBye, makePayload(), multicastEndpoint());
  163. }
  164. void updateState(NodeState state)
  165. {
  166. mState = std::move(state);
  167. }
  168. void broadcastState()
  169. {
  170. using namespace std::chrono;
  171. const auto minBroadcastPeriod = milliseconds{50};
  172. const auto nominalBroadcastPeriod = milliseconds(mTtl * 1000 / mTtlRatio);
  173. const auto timeSinceLastBroadcast =
  174. duration_cast<milliseconds>(mTimer.now() - mLastBroadcastTime);
  175. // The rate is limited to maxBroadcastRate to prevent flooding the network.
  176. const auto delay = minBroadcastPeriod - timeSinceLastBroadcast;
  177. // Schedule the next broadcast before we actually send the
  178. // message so that if sending throws an exception we are still
  179. // scheduled to try again. We want to keep trying at our
  180. // interval as long as this instance is alive.
  181. mTimer.expires_from_now(delay > milliseconds{0} ? delay : nominalBroadcastPeriod);
  182. mTimer.async_wait([this](const TimerError e) {
  183. if (!e)
  184. {
  185. broadcastState();
  186. }
  187. });
  188. // If we're not delaying, broadcast now
  189. if (delay < milliseconds{1})
  190. {
  191. debug(mIo->log()) << "Broadcasting state";
  192. sendPeerState(v1::kAlive, multicastEndpoint());
  193. }
  194. }
  195. void sendPeerState(
  196. const v1::MessageType messageType, const asio::ip::udp::endpoint& to)
  197. {
  198. sendUdpMessage(
  199. *mInterface, mState.ident(), mTtl, messageType, toPayload(mState), to);
  200. mLastBroadcastTime = mTimer.now();
  201. }
  202. void sendResponse(const asio::ip::udp::endpoint& to)
  203. {
  204. sendPeerState(v1::kResponse, to);
  205. }
  206. template <typename Tag>
  207. void listen(Tag tag)
  208. {
  209. mInterface->receive(util::makeAsyncSafe(this->shared_from_this()), tag);
  210. }
  211. template <typename Tag, typename It>
  212. void operator()(Tag tag,
  213. const asio::ip::udp::endpoint& from,
  214. const It messageBegin,
  215. const It messageEnd)
  216. {
  217. auto result = v1::parseMessageHeader<NodeId>(messageBegin, messageEnd);
  218. const auto& header = result.first;
  219. // Ignore messages from self and other groups
  220. if (header.ident != mState.ident() && header.groupId == 0)
  221. {
  222. debug(mIo->log()) << "Received message type "
  223. << static_cast<int>(header.messageType) << " from peer "
  224. << header.ident;
  225. switch (header.messageType)
  226. {
  227. case v1::kAlive:
  228. sendResponse(from);
  229. receivePeerState(std::move(result.first), result.second, messageEnd);
  230. break;
  231. case v1::kResponse:
  232. receivePeerState(std::move(result.first), result.second, messageEnd);
  233. break;
  234. case v1::kByeBye:
  235. receiveByeBye(std::move(result.first.ident));
  236. break;
  237. default:
  238. info(mIo->log()) << "Unknown message received of type: " << header.messageType;
  239. }
  240. }
  241. listen(tag);
  242. }
  243. template <typename It>
  244. void receivePeerState(
  245. v1::MessageHeader<NodeId> header, It payloadBegin, It payloadEnd)
  246. {
  247. try
  248. {
  249. auto state = NodeState::fromPayload(
  250. std::move(header.ident), std::move(payloadBegin), std::move(payloadEnd));
  251. // Handlers must only be called once
  252. auto handler = std::move(mPeerStateHandler);
  253. mPeerStateHandler = [](PeerState<NodeState>) {};
  254. handler(PeerState<NodeState>{std::move(state), header.ttl});
  255. }
  256. catch (const std::runtime_error& err)
  257. {
  258. info(mIo->log()) << "Ignoring peer state message: " << err.what();
  259. }
  260. }
  261. void receiveByeBye(NodeId nodeId)
  262. {
  263. // Handlers must only be called once
  264. auto byeByeHandler = std::move(mByeByeHandler);
  265. mByeByeHandler = [](ByeBye<NodeId>) {};
  266. byeByeHandler(ByeBye<NodeId>{std::move(nodeId)});
  267. }
  268. util::Injected<IoContext> mIo;
  269. util::Injected<Interface> mInterface;
  270. NodeState mState;
  271. Timer mTimer;
  272. TimePoint mLastBroadcastTime;
  273. uint8_t mTtl;
  274. uint8_t mTtlRatio;
  275. std::function<void(PeerState<NodeState>)> mPeerStateHandler;
  276. std::function<void(ByeBye<NodeId>)> mByeByeHandler;
  277. };
  278. std::shared_ptr<Impl> mpImpl;
  279. };
  280. // Factory function
  281. template <typename Interface, typename NodeState, typename IoContext>
  282. UdpMessenger<Interface, NodeState, IoContext> makeUdpMessenger(
  283. util::Injected<Interface> iface,
  284. NodeState state,
  285. util::Injected<IoContext> io,
  286. const uint8_t ttl,
  287. const uint8_t ttlRatio)
  288. {
  289. return UdpMessenger<Interface, NodeState, IoContext>{
  290. std::move(iface), std::move(state), std::move(io), ttl, ttlRatio};
  291. }
  292. } // namespace discovery
  293. } // namespace ableton