The JUCE cross-platform C++ framework, with DISTRHO/KXStudio specific changes
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

287 lines
8.8KB

  1. /*
  2. ==============================================================================
  3. This file is part of the JUCE library.
  4. Copyright (c) 2017 - ROLI Ltd.
  5. JUCE is an open source library subject to commercial or open-source
  6. licensing.
  7. By using JUCE, you agree to the terms of both the JUCE 5 End-User License
  8. Agreement and JUCE 5 Privacy Policy (both updated and effective as of the
  9. 27th April 2017).
  10. End User License Agreement: www.juce.com/juce-5-licence
  11. Privacy Policy: www.juce.com/juce-5-privacy-policy
  12. Or: You may also use this code under the terms of the GPL v3 (see
  13. www.gnu.org/licenses).
  14. JUCE IS PROVIDED "AS IS" WITHOUT ANY WARRANTY, AND ALL WARRANTIES, WHETHER
  15. EXPRESSED OR IMPLIED, INCLUDING MERCHANTABILITY AND FITNESS FOR PURPOSE, ARE
  16. DISCLAIMED.
  17. ==============================================================================
  18. */
  19. namespace juce
  20. {
  21. ThreadedAnalyticsDestination::ThreadedAnalyticsDestination (const String& threadName)
  22. : dispatcher (threadName, *this)
  23. {}
  24. ThreadedAnalyticsDestination::~ThreadedAnalyticsDestination()
  25. {
  26. // If you hit this assertion then the analytics thread has not been shut down
  27. // before this class is destroyed. Call stopAnalyticsThread() in your destructor!
  28. jassert (! dispatcher.isThreadRunning());
  29. }
  30. void ThreadedAnalyticsDestination::setBatchPeriod (int newBatchPeriodMilliseconds)
  31. {
  32. dispatcher.batchPeriodMilliseconds = newBatchPeriodMilliseconds;
  33. }
  34. void ThreadedAnalyticsDestination::logEvent (const AnalyticsEvent& event)
  35. {
  36. dispatcher.addToQueue (event);
  37. }
  38. void ThreadedAnalyticsDestination::startAnalyticsThread (int initialBatchPeriodMilliseconds)
  39. {
  40. setBatchPeriod (initialBatchPeriodMilliseconds);
  41. dispatcher.startThread();
  42. }
  43. void ThreadedAnalyticsDestination::stopAnalyticsThread (int timeout)
  44. {
  45. dispatcher.signalThreadShouldExit();
  46. stopLoggingEvents();
  47. dispatcher.stopThread (timeout);
  48. if (dispatcher.eventQueue.size() > 0)
  49. saveUnloggedEvents (dispatcher.eventQueue);
  50. }
  51. ThreadedAnalyticsDestination::EventDispatcher::EventDispatcher (const String& threadName,
  52. ThreadedAnalyticsDestination& destination)
  53. : Thread (threadName),
  54. parent (destination)
  55. {}
  56. void ThreadedAnalyticsDestination::EventDispatcher::run()
  57. {
  58. // We may have inserted some events into the queue (on the message thread)
  59. // before this thread has started, so make sure the old events are at the
  60. // front of the queue.
  61. {
  62. std::deque<AnalyticsEvent> restoredEventQueue;
  63. parent.restoreUnloggedEvents (restoredEventQueue);
  64. const ScopedLock lock (queueAccess);
  65. for (auto rit = restoredEventQueue.rbegin(); rit != restoredEventQueue.rend(); ++rit)
  66. eventQueue.push_front (*rit);
  67. }
  68. const int maxBatchSize = parent.getMaximumBatchSize();
  69. while (! threadShouldExit())
  70. {
  71. auto eventsToSendCapacity = maxBatchSize - eventsToSend.size();
  72. if (eventsToSendCapacity > 0)
  73. {
  74. const ScopedLock lock (queueAccess);
  75. const auto numEventsInQueue = (int) eventQueue.size();
  76. if (numEventsInQueue > 0)
  77. {
  78. const auto numEventsToAdd = jmin (eventsToSendCapacity, numEventsInQueue);
  79. for (size_t i = 0; i < (size_t) numEventsToAdd; ++i)
  80. eventsToSend.add (eventQueue[i]);
  81. }
  82. }
  83. const auto submissionTime = Time::getMillisecondCounter();
  84. if (! eventsToSend.isEmpty())
  85. {
  86. if (parent.logBatchedEvents (eventsToSend))
  87. {
  88. const ScopedLock lock (queueAccess);
  89. for (auto i = 0; i < eventsToSend.size(); ++i)
  90. eventQueue.pop_front();
  91. eventsToSend.clearQuick();
  92. }
  93. }
  94. while (Time::getMillisecondCounter() - submissionTime < (uint32) batchPeriodMilliseconds.get())
  95. {
  96. if (threadShouldExit())
  97. return;
  98. Thread::sleep (100);
  99. }
  100. }
  101. }
  102. void ThreadedAnalyticsDestination::EventDispatcher::addToQueue (const AnalyticsEvent& event)
  103. {
  104. const ScopedLock lock (queueAccess);
  105. eventQueue.push_back (event);
  106. }
  107. //==============================================================================
  108. #if JUCE_UNIT_TESTS
  109. namespace DestinationTestHelpers
  110. {
  111. //==============================================================================
  112. struct BasicDestination : public ThreadedAnalyticsDestination
  113. {
  114. BasicDestination (std::deque<AnalyticsEvent>& loggedEvents,
  115. std::deque<AnalyticsEvent>& unloggedEvents)
  116. : ThreadedAnalyticsDestination ("ThreadedAnalyticsDestinationTest"),
  117. loggedEventQueue (loggedEvents),
  118. unloggedEventStore (unloggedEvents)
  119. {
  120. startAnalyticsThread (20);
  121. }
  122. ~BasicDestination()
  123. {
  124. stopAnalyticsThread (1000);
  125. }
  126. int getMaximumBatchSize() override
  127. {
  128. return 5;
  129. }
  130. void saveUnloggedEvents (const std::deque<AnalyticsEvent>& eventsToSave) override
  131. {
  132. unloggedEventStore = eventsToSave;
  133. }
  134. void restoreUnloggedEvents (std::deque<AnalyticsEvent>& restoredEventQueue) override
  135. {
  136. restoredEventQueue = unloggedEventStore;
  137. }
  138. bool logBatchedEvents (const Array<AnalyticsEvent>& events) override
  139. {
  140. jassert (events.size() <= getMaximumBatchSize());
  141. if (loggingIsEnabled)
  142. {
  143. const ScopedLock lock (eventQueueChanging);
  144. for (auto& event : events)
  145. loggedEventQueue.push_back (event);
  146. return true;
  147. }
  148. return false;
  149. }
  150. void stopLoggingEvents() override {}
  151. void setLoggingEnabled (bool shouldLogEvents)
  152. {
  153. loggingIsEnabled = shouldLogEvents;
  154. }
  155. std::deque<AnalyticsEvent>& loggedEventQueue;
  156. std::deque<AnalyticsEvent>& unloggedEventStore;
  157. bool loggingIsEnabled = true;
  158. CriticalSection eventQueueChanging;
  159. };
  160. }
  161. //==============================================================================
  162. struct ThreadedAnalyticsDestinationTests : public UnitTest
  163. {
  164. ThreadedAnalyticsDestinationTests()
  165. : UnitTest ("ThreadedAnalyticsDestination")
  166. {}
  167. void compareEventQueues (const std::deque<AnalyticsDestination::AnalyticsEvent>& a,
  168. const std::deque<AnalyticsDestination::AnalyticsEvent>& b)
  169. {
  170. const auto numEntries = a.size();
  171. expectEquals (b.size(), numEntries);
  172. for (size_t i = 0; i < numEntries; ++i)
  173. {
  174. expectEquals (a[i].name, b[i].name);
  175. expect (a[i].timestamp == b[i].timestamp);
  176. }
  177. }
  178. void runTest() override
  179. {
  180. std::deque<AnalyticsDestination::AnalyticsEvent> testEvents;
  181. for (int i = 0; i < 7; ++i)
  182. testEvents.push_back ({ String (i), 0, Time::getMillisecondCounter(), {}, "TestUser", {} });
  183. std::deque<AnalyticsDestination::AnalyticsEvent> loggedEvents, unloggedEvents;
  184. beginTest ("New events");
  185. {
  186. DestinationTestHelpers::BasicDestination destination (loggedEvents, unloggedEvents);
  187. for (auto& event : testEvents)
  188. destination.logEvent (event);
  189. size_t waitTime = 0, numLoggedEvents = 0;
  190. while (numLoggedEvents < testEvents.size())
  191. {
  192. if (waitTime > 4000)
  193. {
  194. expect (waitTime < 4000);
  195. break;
  196. }
  197. Thread::sleep (40);
  198. waitTime += 40;
  199. const ScopedLock lock (destination.eventQueueChanging);
  200. numLoggedEvents = loggedEvents.size();
  201. }
  202. }
  203. compareEventQueues (loggedEvents, testEvents);
  204. expect (unloggedEvents.size() == 0);
  205. loggedEvents.clear();
  206. beginTest ("Unlogged events");
  207. {
  208. DestinationTestHelpers::BasicDestination destination (loggedEvents, unloggedEvents);
  209. destination.setLoggingEnabled (false);
  210. for (auto& event : testEvents)
  211. destination.logEvent (event);
  212. }
  213. compareEventQueues (unloggedEvents, testEvents);
  214. expect (loggedEvents.size() == 0);
  215. }
  216. };
  217. static ThreadedAnalyticsDestinationTests threadedAnalyticsDestinationTests;
  218. #endif
  219. } // namespace juce