Audio plugin host https://kx.studio/carla
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

PeerGateway.hpp 7.6KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253
  1. /* Copyright 2016, Ableton AG, Berlin. All rights reserved.
  2. *
  3. * This program is free software: you can redistribute it and/or modify
  4. * it under the terms of the GNU General Public License as published by
  5. * the Free Software Foundation, either version 2 of the License, or
  6. * (at your option) any later version.
  7. *
  8. * This program is distributed in the hope that it will be useful,
  9. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  10. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  11. * GNU General Public License for more details.
  12. *
  13. * You should have received a copy of the GNU General Public License
  14. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  15. *
  16. * If you would like to incorporate Link into a proprietary software application,
  17. * please contact <link-devs@ableton.com>.
  18. */
  19. #pragma once
  20. #include <ableton/discovery/UdpMessenger.hpp>
  21. #include <ableton/discovery/v1/Messages.hpp>
  22. #include <ableton/platforms/asio/AsioService.hpp>
  23. #include <ableton/util/SafeAsyncHandler.hpp>
  24. #include <memory>
  25. namespace ableton
  26. {
  27. namespace discovery
  28. {
  29. template <typename Messenger, typename PeerObserver, typename IoContext>
  30. class PeerGateway
  31. {
  32. public:
  33. // The peer types are defined by the observer but must match with those
  34. // used by the Messenger
  35. using ObserverT = typename util::Injected<PeerObserver>::type;
  36. using NodeState = typename ObserverT::GatewayObserverNodeState;
  37. using NodeId = typename ObserverT::GatewayObserverNodeId;
  38. using Timer = typename util::Injected<IoContext>::type::Timer;
  39. using TimerError = typename Timer::ErrorCode;
  40. PeerGateway(util::Injected<Messenger> messenger,
  41. util::Injected<PeerObserver> observer,
  42. util::Injected<IoContext> io)
  43. : mpImpl(new Impl(std::move(messenger), std::move(observer), std::move(io)))
  44. {
  45. mpImpl->listen();
  46. }
  47. PeerGateway(const PeerGateway&) = delete;
  48. PeerGateway& operator=(const PeerGateway&) = delete;
  49. PeerGateway(PeerGateway&& rhs)
  50. : mpImpl(std::move(rhs.mpImpl))
  51. {
  52. }
  53. void updateState(NodeState state)
  54. {
  55. mpImpl->updateState(std::move(state));
  56. }
  57. private:
  58. using PeerTimeout = std::pair<std::chrono::system_clock::time_point, NodeId>;
  59. using PeerTimeouts = std::vector<PeerTimeout>;
  60. struct Impl : std::enable_shared_from_this<Impl>
  61. {
  62. Impl(util::Injected<Messenger> messenger,
  63. util::Injected<PeerObserver> observer,
  64. util::Injected<IoContext> io)
  65. : mMessenger(std::move(messenger))
  66. , mObserver(std::move(observer))
  67. , mIo(std::move(io))
  68. , mPruneTimer(mIo->makeTimer())
  69. {
  70. }
  71. void updateState(NodeState state)
  72. {
  73. mMessenger->updateState(std::move(state));
  74. try
  75. {
  76. mMessenger->broadcastState();
  77. }
  78. catch (const std::runtime_error& err)
  79. {
  80. info(mIo->log()) << "State broadcast failed on gateway: " << err.what();
  81. }
  82. }
  83. void listen()
  84. {
  85. mMessenger->receive(util::makeAsyncSafe(this->shared_from_this()));
  86. }
  87. // Operators for handling incoming messages
  88. void operator()(const PeerState<NodeState>& msg)
  89. {
  90. onPeerState(msg.peerState, msg.ttl);
  91. listen();
  92. }
  93. void operator()(const ByeBye<NodeId>& msg)
  94. {
  95. onByeBye(msg.peerId);
  96. listen();
  97. }
  98. void onPeerState(const NodeState& nodeState, const int ttl)
  99. {
  100. using namespace std;
  101. const auto peerId = nodeState.ident();
  102. const auto existing = findPeer(peerId);
  103. if (existing != end(mPeerTimeouts))
  104. {
  105. // If the peer is already present in our timeout list, remove it
  106. // as it will be re-inserted below.
  107. mPeerTimeouts.erase(existing);
  108. }
  109. auto newTo = make_pair(mPruneTimer.now() + std::chrono::seconds(ttl), peerId);
  110. mPeerTimeouts.insert(
  111. upper_bound(begin(mPeerTimeouts), end(mPeerTimeouts), newTo, TimeoutCompare{}),
  112. move(newTo));
  113. sawPeer(*mObserver, nodeState);
  114. scheduleNextPruning();
  115. }
  116. void onByeBye(const NodeId& peerId)
  117. {
  118. const auto it = findPeer(peerId);
  119. if (it != mPeerTimeouts.end())
  120. {
  121. peerLeft(*mObserver, it->second);
  122. mPeerTimeouts.erase(it);
  123. }
  124. }
  125. void pruneExpiredPeers()
  126. {
  127. using namespace std;
  128. const auto test = make_pair(mPruneTimer.now(), NodeId{});
  129. debug(mIo->log()) << "pruning peers @ " << test.first.time_since_epoch().count();
  130. const auto endExpired =
  131. lower_bound(begin(mPeerTimeouts), end(mPeerTimeouts), test, TimeoutCompare{});
  132. for_each(begin(mPeerTimeouts), endExpired, [this](const PeerTimeout& pto) {
  133. info(mIo->log()) << "pruning peer " << pto.second;
  134. peerTimedOut(*mObserver, pto.second);
  135. });
  136. mPeerTimeouts.erase(begin(mPeerTimeouts), endExpired);
  137. scheduleNextPruning();
  138. }
  139. void scheduleNextPruning()
  140. {
  141. // Find the next peer to expire and set the timer based on it
  142. if (!mPeerTimeouts.empty())
  143. {
  144. // Add a second of padding to the timer to avoid over-eager timeouts
  145. const auto t = mPeerTimeouts.front().first + std::chrono::seconds(1);
  146. debug(mIo->log()) << "scheduling next pruning for "
  147. << t.time_since_epoch().count() << " because of peer "
  148. << mPeerTimeouts.front().second;
  149. mPruneTimer.expires_at(t);
  150. mPruneTimer.async_wait([this](const TimerError e) {
  151. if (!e)
  152. {
  153. pruneExpiredPeers();
  154. }
  155. });
  156. }
  157. }
  158. struct TimeoutCompare
  159. {
  160. bool operator()(const PeerTimeout& lhs, const PeerTimeout& rhs) const
  161. {
  162. return lhs.first < rhs.first;
  163. }
  164. };
  165. typename PeerTimeouts::iterator findPeer(const NodeId& peerId)
  166. {
  167. return std::find_if(begin(mPeerTimeouts), end(mPeerTimeouts),
  168. [&peerId](const PeerTimeout& pto) { return pto.second == peerId; });
  169. }
  170. util::Injected<Messenger> mMessenger;
  171. util::Injected<PeerObserver> mObserver;
  172. util::Injected<IoContext> mIo;
  173. Timer mPruneTimer;
  174. PeerTimeouts mPeerTimeouts; // Invariant: sorted by time_point
  175. };
  176. std::shared_ptr<Impl> mpImpl;
  177. };
  178. template <typename Messenger, typename PeerObserver, typename IoContext>
  179. PeerGateway<Messenger, PeerObserver, IoContext> makePeerGateway(
  180. util::Injected<Messenger> messenger,
  181. util::Injected<PeerObserver> observer,
  182. util::Injected<IoContext> io)
  183. {
  184. return {std::move(messenger), std::move(observer), std::move(io)};
  185. }
  186. // IpV4 gateway types
  187. template <typename StateQuery, typename IoContext>
  188. using IpV4Messenger =
  189. UdpMessenger<IpV4Interface<typename util::Injected<IoContext>::type&,
  190. v1::kMaxMessageSize>,
  191. StateQuery,
  192. IoContext>;
  193. template <typename PeerObserver, typename StateQuery, typename IoContext>
  194. using IpV4Gateway =
  195. PeerGateway<IpV4Messenger<StateQuery, typename util::Injected<IoContext>::type&>,
  196. PeerObserver,
  197. IoContext>;
  198. // Factory function to bind a PeerGateway to an IpV4Interface with the given address.
  199. template <typename PeerObserver, typename NodeState, typename IoContext>
  200. IpV4Gateway<PeerObserver, NodeState, IoContext> makeIpV4Gateway(
  201. util::Injected<IoContext> io,
  202. const asio::ip::address_v4& addr,
  203. util::Injected<PeerObserver> observer,
  204. NodeState state)
  205. {
  206. using namespace std;
  207. using namespace util;
  208. const uint8_t ttl = 5;
  209. const uint8_t ttlRatio = 20;
  210. auto iface = makeIpV4Interface<v1::kMaxMessageSize>(injectRef(*io), addr);
  211. auto messenger =
  212. makeUdpMessenger(injectVal(move(iface)), move(state), injectRef(*io), ttl, ttlRatio);
  213. return {injectVal(move(messenger)), move(observer), move(io)};
  214. }
  215. } // namespace discovery
  216. } // namespace ableton