The JUCE cross-platform C++ framework, with DISTRHO/KXStudio specific changes
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

291 lines
9.0KB

  1. /*
  2. ==============================================================================
  3. This file is part of the JUCE library.
  4. Copyright (c) 2017 - ROLI Ltd.
  5. JUCE is an open source library subject to commercial or open-source
  6. licensing.
  7. By using JUCE, you agree to the terms of both the JUCE 5 End-User License
  8. Agreement and JUCE 5 Privacy Policy (both updated and effective as of the
  9. 27th April 2017).
  10. End User License Agreement: www.juce.com/juce-5-licence
  11. Privacy Policy: www.juce.com/juce-5-privacy-policy
  12. Or: You may also use this code under the terms of the GPL v3 (see
  13. www.gnu.org/licenses).
  14. JUCE IS PROVIDED "AS IS" WITHOUT ANY WARRANTY, AND ALL WARRANTIES, WHETHER
  15. EXPRESSED OR IMPLIED, INCLUDING MERCHANTABILITY AND FITNESS FOR PURPOSE, ARE
  16. DISCLAIMED.
  17. ==============================================================================
  18. */
  19. namespace juce
  20. {
  21. ThreadedAnalyticsDestination::ThreadedAnalyticsDestination (const String& threadName)
  22. : dispatcher (threadName, *this)
  23. {}
  24. ThreadedAnalyticsDestination::~ThreadedAnalyticsDestination()
  25. {
  26. // If you hit this assertion then the analytics thread has not been shut down
  27. // before this class is destroyed. Call stopAnalyticsThread() in your destructor!
  28. jassert (! dispatcher.isThreadRunning());
  29. }
  30. void ThreadedAnalyticsDestination::setBatchPeriod (int newBatchPeriodMilliseconds)
  31. {
  32. dispatcher.batchPeriodMilliseconds = newBatchPeriodMilliseconds;
  33. }
  34. void ThreadedAnalyticsDestination::logEvent (const AnalyticsEvent& event)
  35. {
  36. dispatcher.addToQueue (event);
  37. }
  38. void ThreadedAnalyticsDestination::startAnalyticsThread (int initialBatchPeriodMilliseconds)
  39. {
  40. setBatchPeriod (initialBatchPeriodMilliseconds);
  41. dispatcher.startThread();
  42. }
  43. void ThreadedAnalyticsDestination::stopAnalyticsThread (int timeout)
  44. {
  45. dispatcher.signalThreadShouldExit();
  46. stopLoggingEvents();
  47. dispatcher.stopThread (timeout);
  48. if (dispatcher.eventQueue.size() > 0)
  49. saveUnloggedEvents (dispatcher.eventQueue);
  50. }
  51. ThreadedAnalyticsDestination::EventDispatcher::EventDispatcher (const String& dispatcherThreadName,
  52. ThreadedAnalyticsDestination& destination)
  53. : Thread (dispatcherThreadName),
  54. parent (destination)
  55. {}
  56. void ThreadedAnalyticsDestination::EventDispatcher::run()
  57. {
  58. // We may have inserted some events into the queue (on the message thread)
  59. // before this thread has started, so make sure the old events are at the
  60. // front of the queue.
  61. {
  62. std::deque<AnalyticsEvent> restoredEventQueue;
  63. parent.restoreUnloggedEvents (restoredEventQueue);
  64. const ScopedLock lock (queueAccess);
  65. for (auto rit = restoredEventQueue.rbegin(); rit != restoredEventQueue.rend(); ++rit)
  66. eventQueue.push_front (*rit);
  67. }
  68. const int maxBatchSize = parent.getMaximumBatchSize();
  69. while (! threadShouldExit())
  70. {
  71. {
  72. const auto numEventsInBatch = eventsToSend.size();
  73. const auto freeBatchCapacity = maxBatchSize - numEventsInBatch;
  74. if (freeBatchCapacity > 0)
  75. {
  76. const auto numNewEvents = (int) eventQueue.size() - numEventsInBatch;
  77. if (numNewEvents > 0)
  78. {
  79. const ScopedLock lock (queueAccess);
  80. const auto numEventsToAdd = jmin (numNewEvents, freeBatchCapacity);
  81. const auto newBatchSize = numEventsInBatch + numEventsToAdd;
  82. for (auto i = numEventsInBatch; i < newBatchSize; ++i)
  83. eventsToSend.add (eventQueue[(size_t) i]);
  84. }
  85. }
  86. }
  87. const auto submissionTime = Time::getMillisecondCounter();
  88. if (! eventsToSend.isEmpty())
  89. {
  90. if (parent.logBatchedEvents (eventsToSend))
  91. {
  92. const ScopedLock lock (queueAccess);
  93. for (auto i = 0; i < eventsToSend.size(); ++i)
  94. eventQueue.pop_front();
  95. eventsToSend.clearQuick();
  96. }
  97. }
  98. while (Time::getMillisecondCounter() - submissionTime < (uint32) batchPeriodMilliseconds.get())
  99. {
  100. if (threadShouldExit())
  101. return;
  102. Thread::sleep (100);
  103. }
  104. }
  105. }
  106. void ThreadedAnalyticsDestination::EventDispatcher::addToQueue (const AnalyticsEvent& event)
  107. {
  108. const ScopedLock lock (queueAccess);
  109. eventQueue.push_back (event);
  110. }
  111. //==============================================================================
  112. #if JUCE_UNIT_TESTS
  113. namespace DestinationTestHelpers
  114. {
  115. //==============================================================================
  116. struct BasicDestination : public ThreadedAnalyticsDestination
  117. {
  118. BasicDestination (std::deque<AnalyticsEvent>& loggedEvents,
  119. std::deque<AnalyticsEvent>& unloggedEvents)
  120. : ThreadedAnalyticsDestination ("ThreadedAnalyticsDestinationTest"),
  121. loggedEventQueue (loggedEvents),
  122. unloggedEventStore (unloggedEvents)
  123. {
  124. startAnalyticsThread (20);
  125. }
  126. ~BasicDestination()
  127. {
  128. stopAnalyticsThread (1000);
  129. }
  130. int getMaximumBatchSize() override
  131. {
  132. return 5;
  133. }
  134. void saveUnloggedEvents (const std::deque<AnalyticsEvent>& eventsToSave) override
  135. {
  136. unloggedEventStore = eventsToSave;
  137. }
  138. void restoreUnloggedEvents (std::deque<AnalyticsEvent>& restoredEventQueue) override
  139. {
  140. restoredEventQueue = unloggedEventStore;
  141. }
  142. bool logBatchedEvents (const Array<AnalyticsEvent>& events) override
  143. {
  144. jassert (events.size() <= getMaximumBatchSize());
  145. if (loggingIsEnabled)
  146. {
  147. const ScopedLock lock (eventQueueChanging);
  148. for (auto& event : events)
  149. loggedEventQueue.push_back (event);
  150. return true;
  151. }
  152. return false;
  153. }
  154. void stopLoggingEvents() override {}
  155. void setLoggingEnabled (bool shouldLogEvents)
  156. {
  157. loggingIsEnabled = shouldLogEvents;
  158. }
  159. std::deque<AnalyticsEvent>& loggedEventQueue;
  160. std::deque<AnalyticsEvent>& unloggedEventStore;
  161. bool loggingIsEnabled = true;
  162. CriticalSection eventQueueChanging;
  163. };
  164. }
  165. //==============================================================================
  166. struct ThreadedAnalyticsDestinationTests : public UnitTest
  167. {
  168. ThreadedAnalyticsDestinationTests()
  169. : UnitTest ("ThreadedAnalyticsDestination")
  170. {}
  171. void compareEventQueues (const std::deque<AnalyticsDestination::AnalyticsEvent>& a,
  172. const std::deque<AnalyticsDestination::AnalyticsEvent>& b)
  173. {
  174. const auto numEntries = a.size();
  175. expectEquals ((int) b.size(), (int) numEntries);
  176. for (size_t i = 0; i < numEntries; ++i)
  177. {
  178. expectEquals (a[i].name, b[i].name);
  179. expect (a[i].timestamp == b[i].timestamp);
  180. }
  181. }
  182. void runTest() override
  183. {
  184. std::deque<AnalyticsDestination::AnalyticsEvent> testEvents;
  185. for (int i = 0; i < 7; ++i)
  186. testEvents.push_back ({ String (i), 0, Time::getMillisecondCounter(), {}, "TestUser", {} });
  187. std::deque<AnalyticsDestination::AnalyticsEvent> loggedEvents, unloggedEvents;
  188. beginTest ("New events");
  189. {
  190. DestinationTestHelpers::BasicDestination destination (loggedEvents, unloggedEvents);
  191. for (auto& event : testEvents)
  192. destination.logEvent (event);
  193. size_t waitTime = 0, numLoggedEvents = 0;
  194. while (numLoggedEvents < testEvents.size())
  195. {
  196. if (waitTime > 4000)
  197. {
  198. expect (waitTime < 4000);
  199. break;
  200. }
  201. Thread::sleep (40);
  202. waitTime += 40;
  203. const ScopedLock lock (destination.eventQueueChanging);
  204. numLoggedEvents = loggedEvents.size();
  205. }
  206. }
  207. compareEventQueues (loggedEvents, testEvents);
  208. expect (unloggedEvents.size() == 0);
  209. loggedEvents.clear();
  210. beginTest ("Unlogged events");
  211. {
  212. DestinationTestHelpers::BasicDestination destination (loggedEvents, unloggedEvents);
  213. destination.setLoggingEnabled (false);
  214. for (auto& event : testEvents)
  215. destination.logEvent (event);
  216. }
  217. compareEventQueues (unloggedEvents, testEvents);
  218. expect (loggedEvents.size() == 0);
  219. }
  220. };
  221. static ThreadedAnalyticsDestinationTests threadedAnalyticsDestinationTests;
  222. #endif
  223. } // namespace juce