Audio plugin host https://kx.studio/carla
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

PeerGateway.hpp 7.5KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251
  1. /* Copyright 2016, Ableton AG, Berlin. All rights reserved.
  2. *
  3. * This program is free software: you can redistribute it and/or modify
  4. * it under the terms of the GNU General Public License as published by
  5. * the Free Software Foundation, either version 2 of the License, or
  6. * (at your option) any later version.
  7. *
  8. * This program is distributed in the hope that it will be useful,
  9. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  10. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  11. * GNU General Public License for more details.
  12. *
  13. * You should have received a copy of the GNU General Public License
  14. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  15. *
  16. * If you would like to incorporate Link into a proprietary software application,
  17. * please contact <link-devs@ableton.com>.
  18. */
  19. #pragma once
  20. #include <ableton/discovery/UdpMessenger.hpp>
  21. #include <ableton/discovery/v1/Messages.hpp>
  22. #include <ableton/util/SafeAsyncHandler.hpp>
  23. #include <memory>
  24. namespace ableton
  25. {
  26. namespace discovery
  27. {
  28. template <typename Messenger, typename PeerObserver, typename IoContext>
  29. class PeerGateway
  30. {
  31. public:
  32. // The peer types are defined by the observer but must match with those
  33. // used by the Messenger
  34. using ObserverT = typename util::Injected<PeerObserver>::type;
  35. using NodeState = typename ObserverT::GatewayObserverNodeState;
  36. using NodeId = typename ObserverT::GatewayObserverNodeId;
  37. using Timer = typename util::Injected<IoContext>::type::Timer;
  38. using TimerError = typename Timer::ErrorCode;
  39. PeerGateway(util::Injected<Messenger> messenger,
  40. util::Injected<PeerObserver> observer,
  41. util::Injected<IoContext> io)
  42. : mpImpl(new Impl(std::move(messenger), std::move(observer), std::move(io)))
  43. {
  44. mpImpl->listen();
  45. }
  46. PeerGateway(const PeerGateway&) = delete;
  47. PeerGateway& operator=(const PeerGateway&) = delete;
  48. PeerGateway(PeerGateway&& rhs)
  49. : mpImpl(std::move(rhs.mpImpl))
  50. {
  51. }
  52. void updateState(NodeState state)
  53. {
  54. mpImpl->updateState(std::move(state));
  55. }
  56. private:
  57. using PeerTimeout = std::pair<std::chrono::system_clock::time_point, NodeId>;
  58. using PeerTimeouts = std::vector<PeerTimeout>;
  59. struct Impl : std::enable_shared_from_this<Impl>
  60. {
  61. Impl(util::Injected<Messenger> messenger,
  62. util::Injected<PeerObserver> observer,
  63. util::Injected<IoContext> io)
  64. : mMessenger(std::move(messenger))
  65. , mObserver(std::move(observer))
  66. , mIo(std::move(io))
  67. , mPruneTimer(mIo->makeTimer())
  68. {
  69. }
  70. void updateState(NodeState state)
  71. {
  72. mMessenger->updateState(std::move(state));
  73. try
  74. {
  75. mMessenger->broadcastState();
  76. }
  77. catch (const std::runtime_error& err)
  78. {
  79. info(mIo->log()) << "State broadcast failed on gateway: " << err.what();
  80. }
  81. }
  82. void listen()
  83. {
  84. mMessenger->receive(util::makeAsyncSafe(this->shared_from_this()));
  85. }
  86. // Operators for handling incoming messages
  87. void operator()(const PeerState<NodeState>& msg)
  88. {
  89. onPeerState(msg.peerState, msg.ttl);
  90. listen();
  91. }
  92. void operator()(const ByeBye<NodeId>& msg)
  93. {
  94. onByeBye(msg.peerId);
  95. listen();
  96. }
  97. void onPeerState(const NodeState& nodeState, const int ttl)
  98. {
  99. using namespace std;
  100. const auto peerId = nodeState.ident();
  101. const auto existing = findPeer(peerId);
  102. if (existing != end(mPeerTimeouts))
  103. {
  104. // If the peer is already present in our timeout list, remove it
  105. // as it will be re-inserted below.
  106. mPeerTimeouts.erase(existing);
  107. }
  108. auto newTo = make_pair(mPruneTimer.now() + std::chrono::seconds(ttl), peerId);
  109. mPeerTimeouts.insert(
  110. upper_bound(begin(mPeerTimeouts), end(mPeerTimeouts), newTo, TimeoutCompare{}),
  111. move(newTo));
  112. sawPeer(*mObserver, nodeState);
  113. scheduleNextPruning();
  114. }
  115. void onByeBye(const NodeId& peerId)
  116. {
  117. const auto it = findPeer(peerId);
  118. if (it != mPeerTimeouts.end())
  119. {
  120. peerLeft(*mObserver, it->second);
  121. mPeerTimeouts.erase(it);
  122. }
  123. }
  124. void pruneExpiredPeers()
  125. {
  126. using namespace std;
  127. const auto test = make_pair(mPruneTimer.now(), NodeId{});
  128. debug(mIo->log()) << "pruning peers @ " << test.first.time_since_epoch().count();
  129. const auto endExpired =
  130. lower_bound(begin(mPeerTimeouts), end(mPeerTimeouts), test, TimeoutCompare{});
  131. for_each(begin(mPeerTimeouts), endExpired, [this](const PeerTimeout& pto) {
  132. info(mIo->log()) << "pruning peer " << pto.second;
  133. peerTimedOut(*mObserver, pto.second);
  134. });
  135. mPeerTimeouts.erase(begin(mPeerTimeouts), endExpired);
  136. scheduleNextPruning();
  137. }
  138. void scheduleNextPruning()
  139. {
  140. // Find the next peer to expire and set the timer based on it
  141. if (!mPeerTimeouts.empty())
  142. {
  143. // Add a second of padding to the timer to avoid over-eager timeouts
  144. const auto t = mPeerTimeouts.front().first + std::chrono::seconds(1);
  145. debug(mIo->log()) << "scheduling next pruning for "
  146. << t.time_since_epoch().count() << " because of peer "
  147. << mPeerTimeouts.front().second;
  148. mPruneTimer.expires_at(t);
  149. mPruneTimer.async_wait([this](const TimerError e) {
  150. if (!e)
  151. {
  152. pruneExpiredPeers();
  153. }
  154. });
  155. }
  156. }
  157. struct TimeoutCompare
  158. {
  159. bool operator()(const PeerTimeout& lhs, const PeerTimeout& rhs) const
  160. {
  161. return lhs.first < rhs.first;
  162. }
  163. };
  164. typename PeerTimeouts::iterator findPeer(const NodeId& peerId)
  165. {
  166. return std::find_if(begin(mPeerTimeouts), end(mPeerTimeouts),
  167. [&peerId](const PeerTimeout& pto) { return pto.second == peerId; });
  168. }
  169. util::Injected<Messenger> mMessenger;
  170. util::Injected<PeerObserver> mObserver;
  171. util::Injected<IoContext> mIo;
  172. Timer mPruneTimer;
  173. PeerTimeouts mPeerTimeouts; // Invariant: sorted by time_point
  174. };
  175. std::shared_ptr<Impl> mpImpl;
  176. };
  177. template <typename Messenger, typename PeerObserver, typename IoContext>
  178. PeerGateway<Messenger, PeerObserver, IoContext> makePeerGateway(
  179. util::Injected<Messenger> messenger,
  180. util::Injected<PeerObserver> observer,
  181. util::Injected<IoContext> io)
  182. {
  183. return {std::move(messenger), std::move(observer), std::move(io)};
  184. }
  185. // IpV4 gateway types
  186. template <typename StateQuery, typename IoContext>
  187. using IpV4Messenger = UdpMessenger<
  188. IpV4Interface<typename util::Injected<IoContext>::type&, v1::kMaxMessageSize>,
  189. StateQuery,
  190. IoContext>;
  191. template <typename PeerObserver, typename StateQuery, typename IoContext>
  192. using IpV4Gateway =
  193. PeerGateway<IpV4Messenger<StateQuery, typename util::Injected<IoContext>::type&>,
  194. PeerObserver,
  195. IoContext>;
  196. // Factory function to bind a PeerGateway to an IpV4Interface with the given address.
  197. template <typename PeerObserver, typename NodeState, typename IoContext>
  198. IpV4Gateway<PeerObserver, NodeState, IoContext> makeIpV4Gateway(
  199. util::Injected<IoContext> io,
  200. const asio::ip::address_v4& addr,
  201. util::Injected<PeerObserver> observer,
  202. NodeState state)
  203. {
  204. using namespace std;
  205. using namespace util;
  206. const uint8_t ttl = 5;
  207. const uint8_t ttlRatio = 20;
  208. auto iface = makeIpV4Interface<v1::kMaxMessageSize>(injectRef(*io), addr);
  209. auto messenger =
  210. makeUdpMessenger(injectVal(move(iface)), move(state), injectRef(*io), ttl, ttlRatio);
  211. return {injectVal(move(messenger)), move(observer), move(io)};
  212. }
  213. } // namespace discovery
  214. } // namespace ableton