| @@ -1,7 +1,8 @@ | |||||
| #include <map> | #include <map> | ||||
| #include <utility> | #include <utility> | ||||
| #include <deque> | |||||
| #include <queue> | |||||
| #include <mutex> | #include <mutex> | ||||
| #include <tuple> | |||||
| #include <midi.hpp> | #include <midi.hpp> | ||||
| #include <string.hpp> | #include <string.hpp> | ||||
| @@ -286,11 +287,26 @@ std::vector<int> Input::getChannels() { | |||||
| // InputQueue | // InputQueue | ||||
| //////////////////// | //////////////////// | ||||
| struct SeqMessage { | |||||
| Message message; | |||||
| uint64_t seq; | |||||
| /** Returns whether `this` is LATER in time than `other`. | |||||
| If frame is same, compares `seq`. | |||||
| */ | |||||
| bool operator<(const SeqMessage& other) const { | |||||
| return std::make_tuple(message.getFrame(), seq) > std::make_tuple(other.message.getFrame(), other.seq); | |||||
| } | |||||
| }; | |||||
| static const size_t InputQueue_maxSize = 8192; | static const size_t InputQueue_maxSize = 8192; | ||||
| struct InputQueue::Internal { | struct InputQueue::Internal { | ||||
| std::deque<Message> queue; | |||||
| std::priority_queue<SeqMessage> queue; | |||||
| std::mutex mutex; | std::mutex mutex; | ||||
| /** Index to preserve ordering for priority_queue since it's unstable. | |||||
| */ | |||||
| uint64_t nextSeq = 0; | |||||
| }; | }; | ||||
| InputQueue::InputQueue() { | InputQueue::InputQueue() { | ||||
| @@ -306,16 +322,9 @@ void InputQueue::onMessage(const Message& message) { | |||||
| // Reject MIDI message if queue is full | // Reject MIDI message if queue is full | ||||
| if (internal->queue.size() >= InputQueue_maxSize) | if (internal->queue.size() >= InputQueue_maxSize) | ||||
| return; | return; | ||||
| // Message timestamp must be monotonically increasing, otherwise clear the queue. | |||||
| if (!internal->queue.empty()) { | |||||
| const Message& lastMessage = internal->queue.back(); | |||||
| if (message.getFrame() < lastMessage.getFrame()) { | |||||
| WARN("MIDI message at frame %lld added to InputQueue after later message at frame %lld", (long long) message.getFrame(), (long long) lastMessage.getFrame()); | |||||
| internal->queue.clear(); | |||||
| } | |||||
| } | |||||
| // Push to queue | // Push to queue | ||||
| internal->queue.push_back(message); | |||||
| internal->queue.push({message, internal->nextSeq}); | |||||
| internal->nextSeq++; | |||||
| } | } | ||||
| bool InputQueue::tryPop(Message* messageOut, int64_t maxFrame) { | bool InputQueue::tryPop(Message* messageOut, int64_t maxFrame) { | ||||
| @@ -328,10 +337,10 @@ bool InputQueue::tryPop(Message* messageOut, int64_t maxFrame) { | |||||
| if (internal->queue.empty()) | if (internal->queue.empty()) | ||||
| return false; | return false; | ||||
| const Message& message = internal->queue.front(); | |||||
| if (message.getFrame() <= maxFrame) { | |||||
| *messageOut = message; | |||||
| internal->queue.pop_front(); | |||||
| const SeqMessage& s = internal->queue.top(); | |||||
| if (s.message.getFrame() <= maxFrame) { | |||||
| *messageOut = s.message; | |||||
| internal->queue.pop(); | |||||
| return true; | return true; | ||||
| } | } | ||||