The JUCE cross-platform C++ framework, with DISTRHO/KXStudio specific changes
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

293 lines
9.1KB

  1. /*
  2. ==============================================================================
  3. This file is part of the JUCE library.
  4. Copyright (c) 2017 - ROLI Ltd.
  5. JUCE is an open source library subject to commercial or open-source
  6. licensing.
  7. By using JUCE, you agree to the terms of both the JUCE 5 End-User License
  8. Agreement and JUCE 5 Privacy Policy (both updated and effective as of the
  9. 27th April 2017).
  10. End User License Agreement: www.juce.com/juce-5-licence
  11. Privacy Policy: www.juce.com/juce-5-privacy-policy
  12. Or: You may also use this code under the terms of the GPL v3 (see
  13. www.gnu.org/licenses).
  14. JUCE IS PROVIDED "AS IS" WITHOUT ANY WARRANTY, AND ALL WARRANTIES, WHETHER
  15. EXPRESSED OR IMPLIED, INCLUDING MERCHANTABILITY AND FITNESS FOR PURPOSE, ARE
  16. DISCLAIMED.
  17. ==============================================================================
  18. */
  19. namespace juce
  20. {
  21. ThreadedAnalyticsDestination::ThreadedAnalyticsDestination (const String& threadName)
  22. : dispatcher (threadName, *this)
  23. {}
  24. ThreadedAnalyticsDestination::~ThreadedAnalyticsDestination()
  25. {
  26. // If you hit this assertion then the analytics thread has not been shut down
  27. // before this class is destroyed. Call stopAnalyticsThread() in your destructor!
  28. jassert (! dispatcher.isThreadRunning());
  29. }
  30. void ThreadedAnalyticsDestination::setBatchPeriod (int newBatchPeriodMilliseconds)
  31. {
  32. dispatcher.batchPeriodMilliseconds = newBatchPeriodMilliseconds;
  33. }
  34. void ThreadedAnalyticsDestination::logEvent (const AnalyticsEvent& event)
  35. {
  36. dispatcher.addToQueue (event);
  37. }
  38. void ThreadedAnalyticsDestination::startAnalyticsThread (int initialBatchPeriodMilliseconds)
  39. {
  40. setBatchPeriod (initialBatchPeriodMilliseconds);
  41. dispatcher.startThread();
  42. }
  43. void ThreadedAnalyticsDestination::stopAnalyticsThread (int timeout)
  44. {
  45. dispatcher.signalThreadShouldExit();
  46. stopLoggingEvents();
  47. dispatcher.stopThread (timeout);
  48. if (dispatcher.eventQueue.size() > 0)
  49. saveUnloggedEvents (dispatcher.eventQueue);
  50. }
  51. ThreadedAnalyticsDestination::EventDispatcher::EventDispatcher (const String& dispatcherThreadName,
  52. ThreadedAnalyticsDestination& destination)
  53. : Thread (dispatcherThreadName),
  54. parent (destination)
  55. {}
  56. void ThreadedAnalyticsDestination::EventDispatcher::run()
  57. {
  58. // We may have inserted some events into the queue (on the message thread)
  59. // before this thread has started, so make sure the old events are at the
  60. // front of the queue.
  61. {
  62. std::deque<AnalyticsEvent> restoredEventQueue;
  63. parent.restoreUnloggedEvents (restoredEventQueue);
  64. const ScopedLock lock (queueAccess);
  65. for (auto rit = restoredEventQueue.rbegin(); rit != restoredEventQueue.rend(); ++rit)
  66. eventQueue.push_front (*rit);
  67. }
  68. const int maxBatchSize = parent.getMaximumBatchSize();
  69. while (! threadShouldExit())
  70. {
  71. {
  72. const auto numEventsInBatch = eventsToSend.size();
  73. const auto freeBatchCapacity = maxBatchSize - numEventsInBatch;
  74. if (freeBatchCapacity > 0)
  75. {
  76. const auto numNewEvents = (int) eventQueue.size() - numEventsInBatch;
  77. if (numNewEvents > 0)
  78. {
  79. const ScopedLock lock (queueAccess);
  80. const auto numEventsToAdd = jmin (numNewEvents, freeBatchCapacity);
  81. const auto newBatchSize = numEventsInBatch + numEventsToAdd;
  82. for (auto i = numEventsInBatch; i < newBatchSize; ++i)
  83. eventsToSend.add (eventQueue[(size_t) i]);
  84. }
  85. }
  86. }
  87. const auto submissionTime = Time::getMillisecondCounter();
  88. if (! eventsToSend.isEmpty())
  89. {
  90. if (parent.logBatchedEvents (eventsToSend))
  91. {
  92. const ScopedLock lock (queueAccess);
  93. for (auto i = 0; i < eventsToSend.size(); ++i)
  94. eventQueue.pop_front();
  95. eventsToSend.clearQuick();
  96. }
  97. }
  98. while (Time::getMillisecondCounter() - submissionTime < (uint32) batchPeriodMilliseconds.get())
  99. {
  100. if (threadShouldExit())
  101. return;
  102. Thread::sleep (100);
  103. }
  104. }
  105. }
  106. void ThreadedAnalyticsDestination::EventDispatcher::addToQueue (const AnalyticsEvent& event)
  107. {
  108. const ScopedLock lock (queueAccess);
  109. eventQueue.push_back (event);
  110. }
  111. //==============================================================================
  112. //==============================================================================
  113. #if JUCE_UNIT_TESTS
  114. namespace DestinationTestHelpers
  115. {
  116. //==============================================================================
  117. struct BasicDestination : public ThreadedAnalyticsDestination
  118. {
  119. BasicDestination (std::deque<AnalyticsEvent>& loggedEvents,
  120. std::deque<AnalyticsEvent>& unloggedEvents)
  121. : ThreadedAnalyticsDestination ("ThreadedAnalyticsDestinationTest"),
  122. loggedEventQueue (loggedEvents),
  123. unloggedEventStore (unloggedEvents)
  124. {
  125. startAnalyticsThread (20);
  126. }
  127. ~BasicDestination() override
  128. {
  129. stopAnalyticsThread (1000);
  130. }
  131. int getMaximumBatchSize() override
  132. {
  133. return 5;
  134. }
  135. void saveUnloggedEvents (const std::deque<AnalyticsEvent>& eventsToSave) override
  136. {
  137. unloggedEventStore = eventsToSave;
  138. }
  139. void restoreUnloggedEvents (std::deque<AnalyticsEvent>& restoredEventQueue) override
  140. {
  141. restoredEventQueue = unloggedEventStore;
  142. }
  143. bool logBatchedEvents (const Array<AnalyticsEvent>& events) override
  144. {
  145. jassert (events.size() <= getMaximumBatchSize());
  146. if (loggingIsEnabled)
  147. {
  148. const ScopedLock lock (eventQueueChanging);
  149. for (auto& event : events)
  150. loggedEventQueue.push_back (event);
  151. return true;
  152. }
  153. return false;
  154. }
  155. void stopLoggingEvents() override {}
  156. void setLoggingEnabled (bool shouldLogEvents)
  157. {
  158. loggingIsEnabled = shouldLogEvents;
  159. }
  160. std::deque<AnalyticsEvent>& loggedEventQueue;
  161. std::deque<AnalyticsEvent>& unloggedEventStore;
  162. bool loggingIsEnabled = true;
  163. CriticalSection eventQueueChanging;
  164. };
  165. }
  166. //==============================================================================
  167. struct ThreadedAnalyticsDestinationTests : public UnitTest
  168. {
  169. ThreadedAnalyticsDestinationTests()
  170. : UnitTest ("ThreadedAnalyticsDestination", UnitTestCategories::analytics)
  171. {}
  172. void compareEventQueues (const std::deque<AnalyticsDestination::AnalyticsEvent>& a,
  173. const std::deque<AnalyticsDestination::AnalyticsEvent>& b)
  174. {
  175. const auto numEntries = a.size();
  176. expectEquals ((int) b.size(), (int) numEntries);
  177. for (size_t i = 0; i < numEntries; ++i)
  178. {
  179. expectEquals (a[i].name, b[i].name);
  180. expect (a[i].timestamp == b[i].timestamp);
  181. }
  182. }
  183. void runTest() override
  184. {
  185. std::deque<AnalyticsDestination::AnalyticsEvent> testEvents;
  186. for (int i = 0; i < 7; ++i)
  187. testEvents.push_back ({ String (i), 0, Time::getMillisecondCounter(), {}, "TestUser", {} });
  188. std::deque<AnalyticsDestination::AnalyticsEvent> loggedEvents, unloggedEvents;
  189. beginTest ("New events");
  190. {
  191. DestinationTestHelpers::BasicDestination destination (loggedEvents, unloggedEvents);
  192. for (auto& event : testEvents)
  193. destination.logEvent (event);
  194. size_t waitTime = 0, numLoggedEvents = 0;
  195. while (numLoggedEvents < testEvents.size())
  196. {
  197. if (waitTime > 4000)
  198. {
  199. expect (waitTime < 4000);
  200. break;
  201. }
  202. Thread::sleep (40);
  203. waitTime += 40;
  204. const ScopedLock lock (destination.eventQueueChanging);
  205. numLoggedEvents = loggedEvents.size();
  206. }
  207. }
  208. compareEventQueues (loggedEvents, testEvents);
  209. expect (unloggedEvents.size() == 0);
  210. loggedEvents.clear();
  211. beginTest ("Unlogged events");
  212. {
  213. DestinationTestHelpers::BasicDestination destination (loggedEvents, unloggedEvents);
  214. destination.setLoggingEnabled (false);
  215. for (auto& event : testEvents)
  216. destination.logEvent (event);
  217. }
  218. compareEventQueues (unloggedEvents, testEvents);
  219. expect (loggedEvents.size() == 0);
  220. }
  221. };
  222. static ThreadedAnalyticsDestinationTests threadedAnalyticsDestinationTests;
  223. #endif
  224. } // namespace juce