| @@ -139,136 +139,63 @@ void ThreadedAnalyticsDestination::EventDispatcher::addToQueue (const AnalyticsE | |||
| namespace DestinationTestHelpers | |||
| { | |||
| //============================================================================== | |||
| struct TestDestination : public ThreadedAnalyticsDestination | |||
| struct BasicDestination : public ThreadedAnalyticsDestination | |||
| { | |||
| TestDestination (std::deque<AnalyticsEvent>& loggedEvents, | |||
| std::deque<AnalyticsEvent>& unloggedEvents) | |||
| BasicDestination (std::deque<AnalyticsEvent>& loggedEvents, | |||
| std::deque<AnalyticsEvent>& unloggedEvents) | |||
| : ThreadedAnalyticsDestination ("ThreadedAnalyticsDestinationTest"), | |||
| loggedEventQueue (loggedEvents), | |||
| unloggedEventStore (unloggedEvents) | |||
| {} | |||
| virtual ~TestDestination() {} | |||
| int getMaximumBatchSize() override | |||
| { | |||
| return 5; | |||
| } | |||
| void saveUnloggedEvents (const std::deque<AnalyticsEvent>& eventsToSave) override | |||
| { | |||
| unloggedEventStore = eventsToSave; | |||
| } | |||
| void restoreUnloggedEvents (std::deque<AnalyticsEvent>& restoredEventQueue) override | |||
| { | |||
| restoredEventQueue = unloggedEventStore; | |||
| } | |||
| std::deque<AnalyticsEvent>& loggedEventQueue; | |||
| std::deque<AnalyticsEvent>& unloggedEventStore; | |||
| }; | |||
| //============================================================================== | |||
| struct BasicDestination : public TestDestination | |||
| { | |||
| BasicDestination (std::deque<AnalyticsEvent>& loggedEvents, | |||
| std::deque<AnalyticsEvent>& unloggedEvents) | |||
| : TestDestination (loggedEvents, unloggedEvents) | |||
| { | |||
| startAnalyticsThread (20); | |||
| } | |||
| virtual ~BasicDestination() | |||
| ~BasicDestination() | |||
| { | |||
| stopAnalyticsThread (1000); | |||
| } | |||
| bool logBatchedEvents (const Array<AnalyticsEvent>& events) override | |||
| int getMaximumBatchSize() override | |||
| { | |||
| jassert (events.size() <= getMaximumBatchSize()); | |||
| for (auto& event : events) | |||
| loggedEventQueue.push_back (event); | |||
| return true; | |||
| return 5; | |||
| } | |||
| void stopLoggingEvents() override {} | |||
| }; | |||
| //============================================================================== | |||
| struct SlowWebDestination : public TestDestination | |||
| { | |||
| SlowWebDestination (std::deque<AnalyticsEvent>& loggedEvents, | |||
| std::deque<AnalyticsEvent>& unloggedEvents) | |||
| : TestDestination (loggedEvents, unloggedEvents) | |||
| void saveUnloggedEvents (const std::deque<AnalyticsEvent>& eventsToSave) override | |||
| { | |||
| startAnalyticsThread (initialPeriod); | |||
| unloggedEventStore = eventsToSave; | |||
| } | |||
| virtual ~SlowWebDestination() | |||
| void restoreUnloggedEvents (std::deque<AnalyticsEvent>& restoredEventQueue) override | |||
| { | |||
| stopAnalyticsThread (1000); | |||
| restoredEventQueue = unloggedEventStore; | |||
| } | |||
| bool logBatchedEvents (const Array<AnalyticsEvent>& events) override | |||
| { | |||
| threadHasStarted.signal(); | |||
| jassert (events.size() <= getMaximumBatchSize()); | |||
| if (loggingIsEnabled) | |||
| { | |||
| const ScopedLock lock (webStreamCreation); | |||
| if (shouldExit) | |||
| return false; | |||
| const ScopedLock lock (eventQueueChanging); | |||
| // An attempt to connect to an unroutable IP address will hang | |||
| // indefinitely, which simulates a very slow server | |||
| webStream = new WebInputStream (URL ("http://1.192.0.0"), true); | |||
| for (auto& event : events) | |||
| loggedEventQueue.push_back (event); | |||
| } | |||
| String data; | |||
| for (auto& event : events) | |||
| data << event.name; | |||
| webStream->withExtraHeaders (data); | |||
| const auto success = webStream->connect (nullptr); | |||
| // Exponential backoff on failure | |||
| if (success) | |||
| period = initialPeriod; | |||
| else | |||
| period *= 2; | |||
| setBatchPeriod (period); | |||
| return success; | |||
| return true; | |||
| } | |||
| void stopLoggingEvents() override | |||
| { | |||
| const ScopedLock lock (webStreamCreation); | |||
| shouldExit = true; | |||
| void stopLoggingEvents() override {} | |||
| if (webStream != nullptr) | |||
| webStream->cancel(); | |||
| void setLoggingEnabled (bool shouldLogEvents) | |||
| { | |||
| loggingIsEnabled = shouldLogEvents; | |||
| } | |||
| const int initialPeriod = 100; | |||
| int period = initialPeriod; | |||
| CriticalSection webStreamCreation; | |||
| bool shouldExit = false; | |||
| ScopedPointer<WebInputStream> webStream; | |||
| WaitableEvent threadHasStarted; | |||
| std::deque<AnalyticsEvent>& loggedEventQueue; | |||
| std::deque<AnalyticsEvent>& unloggedEventStore; | |||
| bool loggingIsEnabled = true; | |||
| CriticalSection eventQueueChanging; | |||
| }; | |||
| } | |||
| @@ -301,47 +228,48 @@ struct ThreadedAnalyticsDestinationTests : public UnitTest | |||
| std::deque<AnalyticsDestination::AnalyticsEvent> loggedEvents, unloggedEvents; | |||
| beginTest ("Basic"); | |||
| beginTest ("New events"); | |||
| { | |||
| { | |||
| DestinationTestHelpers::BasicDestination destination (loggedEvents, unloggedEvents); | |||
| DestinationTestHelpers::BasicDestination destination (loggedEvents, unloggedEvents); | |||
| for (auto& event : testEvents) | |||
| destination.logEvent (event); | |||
| for (auto& event : testEvents) | |||
| destination.logEvent (event); | |||
| Thread::sleep (400); | |||
| } | |||
| size_t waitTime = 0, numLoggedEvents = 0; | |||
| compareEventQueues (loggedEvents, testEvents); | |||
| expect (unloggedEvents.size() == 0); | |||
| loggedEvents.clear(); | |||
| } | |||
| beginTest ("Web"); | |||
| { | |||
| while (numLoggedEvents < testEvents.size()) | |||
| { | |||
| DestinationTestHelpers::SlowWebDestination destination (loggedEvents, unloggedEvents); | |||
| if (waitTime > 4000) | |||
| { | |||
| expect (waitTime < 4000); | |||
| break; | |||
| } | |||
| Thread::sleep (40); | |||
| waitTime += 40; | |||
| for (auto& event : testEvents) | |||
| destination.logEvent (event); | |||
| const ScopedLock lock (destination.eventQueueChanging); | |||
| numLoggedEvents = loggedEvents.size(); | |||
| } | |||
| } | |||
| expect (loggedEvents.size() == 0); | |||
| compareEventQueues (unloggedEvents, testEvents); | |||
| compareEventQueues (loggedEvents, testEvents); | |||
| expect (unloggedEvents.size() == 0); | |||
| { | |||
| DestinationTestHelpers::SlowWebDestination destination (loggedEvents, unloggedEvents); | |||
| loggedEvents.clear(); | |||
| destination.threadHasStarted.wait(); | |||
| unloggedEvents.clear(); | |||
| } | |||
| expect (loggedEvents.size() == 0); | |||
| compareEventQueues (unloggedEvents, testEvents); | |||
| beginTest ("Unlogged events"); | |||
| { | |||
| DestinationTestHelpers::BasicDestination destination (loggedEvents, unloggedEvents); | |||
| destination.setLoggingEnabled (false); | |||
| unloggedEvents.clear(); | |||
| for (auto& event : testEvents) | |||
| destination.logEvent (event); | |||
| } | |||
| compareEventQueues (unloggedEvents, testEvents); | |||
| expect (loggedEvents.size() == 0); | |||
| } | |||
| }; | |||