|  | /*
  ==============================================================================
   This file is part of the JUCE library.
   Copyright (c) 2017 - ROLI Ltd.
   JUCE is an open source library subject to commercial or open-source
   licensing.
   By using JUCE, you agree to the terms of both the JUCE 5 End-User License
   Agreement and JUCE 5 Privacy Policy (both updated and effective as of the
   27th April 2017).
   End User License Agreement: www.juce.com/juce-5-licence
   Privacy Policy: www.juce.com/juce-5-privacy-policy
   Or: You may also use this code under the terms of the GPL v3 (see
   www.gnu.org/licenses).
   JUCE IS PROVIDED "AS IS" WITHOUT ANY WARRANTY, AND ALL WARRANTIES, WHETHER
   EXPRESSED OR IMPLIED, INCLUDING MERCHANTABILITY AND FITNESS FOR PURPOSE, ARE
   DISCLAIMED.
  ==============================================================================
*/
namespace juce
{
ThreadedAnalyticsDestination::ThreadedAnalyticsDestination (const String& threadName)
    : dispatcher (threadName, *this)
{}
ThreadedAnalyticsDestination::~ThreadedAnalyticsDestination()
{
    // If you hit this assertion then the analytics thread has not been shut down
    // before this class is destroyed. Call stopAnalyticsThread() in your destructor!
    jassert (! dispatcher.isThreadRunning());
}
void ThreadedAnalyticsDestination::setBatchPeriod (int newBatchPeriodMilliseconds)
{
    dispatcher.batchPeriodMilliseconds = newBatchPeriodMilliseconds;
}
void ThreadedAnalyticsDestination::logEvent (const AnalyticsEvent& event)
{
    dispatcher.addToQueue (event);
}
void ThreadedAnalyticsDestination::startAnalyticsThread (int initialBatchPeriodMilliseconds)
{
    setBatchPeriod (initialBatchPeriodMilliseconds);
    dispatcher.startThread();
}
void ThreadedAnalyticsDestination::stopAnalyticsThread (int timeout)
{
    dispatcher.signalThreadShouldExit();
    stopLoggingEvents();
    dispatcher.stopThread (timeout);
    if (dispatcher.eventQueue.size() > 0)
        saveUnloggedEvents (dispatcher.eventQueue);
}
ThreadedAnalyticsDestination::EventDispatcher::EventDispatcher (const String& threadName,
                                                                ThreadedAnalyticsDestination& destination)
    : Thread (threadName),
      parent (destination)
{}
void ThreadedAnalyticsDestination::EventDispatcher::run()
{
    // We may have inserted some events into the queue (on the message thread)
    // before this thread has started, so make sure the old events are at the
    // front of the queue.
    {
        std::deque<AnalyticsEvent> restoredEventQueue;
        parent.restoreUnloggedEvents (restoredEventQueue);
        const ScopedLock lock (queueAccess);
        for (auto rit = restoredEventQueue.rbegin(); rit != restoredEventQueue.rend(); ++rit)
            eventQueue.push_front (*rit);
    }
    const int maxBatchSize = parent.getMaximumBatchSize();
    while (! threadShouldExit())
    {
        {
            const auto numEventsInBatch = eventsToSend.size();
            const auto freeBatchCapacity = maxBatchSize - numEventsInBatch;
            if (freeBatchCapacity > 0)
            {
                const auto numNewEvents = (int) eventQueue.size() - numEventsInBatch;
                if (numNewEvents > 0)
                {
                    const ScopedLock lock (queueAccess);
                    const auto numEventsToAdd = jmin (numNewEvents, freeBatchCapacity);
                    const auto newBatchSize = numEventsInBatch + numEventsToAdd;
                    for (size_t i = numEventsInBatch; i < (size_t) newBatchSize; ++i)
                        eventsToSend.add (eventQueue[i]);
                }
            }
        }
        const auto submissionTime = Time::getMillisecondCounter();
        if (! eventsToSend.isEmpty())
        {
            if (parent.logBatchedEvents (eventsToSend))
            {
                const ScopedLock lock (queueAccess);
                for (auto i = 0; i < eventsToSend.size(); ++i)
                    eventQueue.pop_front();
                eventsToSend.clearQuick();
            }
        }
        while (Time::getMillisecondCounter() - submissionTime < (uint32) batchPeriodMilliseconds.get())
        {
            if (threadShouldExit())
                return;
            Thread::sleep (100);
        }
    }
}
void ThreadedAnalyticsDestination::EventDispatcher::addToQueue (const AnalyticsEvent& event)
{
    const ScopedLock lock (queueAccess);
    eventQueue.push_back (event);
}
//==============================================================================
#if JUCE_UNIT_TESTS
namespace DestinationTestHelpers
{
    //==============================================================================
    struct BasicDestination   : public ThreadedAnalyticsDestination
    {
        BasicDestination (std::deque<AnalyticsEvent>& loggedEvents,
                          std::deque<AnalyticsEvent>& unloggedEvents)
            : ThreadedAnalyticsDestination ("ThreadedAnalyticsDestinationTest"),
              loggedEventQueue (loggedEvents),
              unloggedEventStore (unloggedEvents)
        {
            startAnalyticsThread (20);
        }
        ~BasicDestination()
        {
            stopAnalyticsThread (1000);
        }
        int getMaximumBatchSize() override
        {
            return 5;
        }
        void saveUnloggedEvents (const std::deque<AnalyticsEvent>& eventsToSave) override
        {
            unloggedEventStore = eventsToSave;
        }
        void restoreUnloggedEvents (std::deque<AnalyticsEvent>& restoredEventQueue) override
        {
            restoredEventQueue = unloggedEventStore;
        }
        bool logBatchedEvents (const Array<AnalyticsEvent>& events) override
        {
            jassert (events.size() <= getMaximumBatchSize());
            if (loggingIsEnabled)
            {
                const ScopedLock lock (eventQueueChanging);
                for (auto& event : events)
                    loggedEventQueue.push_back (event);
                return true;
            }
            return false;
        }
        void stopLoggingEvents() override {}
        void setLoggingEnabled (bool shouldLogEvents)
        {
            loggingIsEnabled = shouldLogEvents;
        }
        std::deque<AnalyticsEvent>& loggedEventQueue;
        std::deque<AnalyticsEvent>& unloggedEventStore;
        bool loggingIsEnabled = true;
        CriticalSection eventQueueChanging;
    };
}
//==============================================================================
struct ThreadedAnalyticsDestinationTests   : public UnitTest
{
    ThreadedAnalyticsDestinationTests()
        : UnitTest ("ThreadedAnalyticsDestination")
    {}
    void compareEventQueues (const std::deque<AnalyticsDestination::AnalyticsEvent>& a,
                             const std::deque<AnalyticsDestination::AnalyticsEvent>& b)
    {
        const auto numEntries = a.size();
        expectEquals ((int) b.size(), (int) numEntries);
        for (size_t i = 0; i < numEntries; ++i)
        {
            expectEquals (a[i].name, b[i].name);
            expect (a[i].timestamp == b[i].timestamp);
        }
    }
    void runTest() override
    {
        std::deque<AnalyticsDestination::AnalyticsEvent> testEvents;
        for (int i = 0; i < 7; ++i)
            testEvents.push_back ({ String (i), 0, Time::getMillisecondCounter(), {}, "TestUser", {} });
        std::deque<AnalyticsDestination::AnalyticsEvent> loggedEvents, unloggedEvents;
        beginTest ("New events");
        {
            DestinationTestHelpers::BasicDestination destination (loggedEvents, unloggedEvents);
            for (auto& event : testEvents)
                destination.logEvent (event);
            size_t waitTime = 0, numLoggedEvents = 0;
            while (numLoggedEvents < testEvents.size())
            {
                if (waitTime > 4000)
                {
                    expect (waitTime < 4000);
                    break;
                }
                Thread::sleep (40);
                waitTime += 40;
                const ScopedLock lock (destination.eventQueueChanging);
                numLoggedEvents = loggedEvents.size();
            }
        }
        compareEventQueues (loggedEvents, testEvents);
        expect (unloggedEvents.size() == 0);
        loggedEvents.clear();
        beginTest ("Unlogged events");
        {
            DestinationTestHelpers::BasicDestination destination (loggedEvents, unloggedEvents);
            destination.setLoggingEnabled (false);
            for (auto& event : testEvents)
                destination.logEvent (event);
        }
        compareEventQueues (unloggedEvents, testEvents);
        expect (loggedEvents.size() == 0);
    }
};
static ThreadedAnalyticsDestinationTests threadedAnalyticsDestinationTests;
#endif
} // namespace juce
 |