Index: mojo/edk/system/ports/message_queue.cc |
diff --git a/mojo/edk/system/ports/message_queue.cc b/mojo/edk/system/ports/message_queue.cc |
new file mode 100644 |
index 0000000000000000000000000000000000000000..08c534d130c91026b9aeaa2382fc6cd54d44efe0 |
--- /dev/null |
+++ b/mojo/edk/system/ports/message_queue.cc |
@@ -0,0 +1,87 @@ |
+// Copyright 2016 The Chromium Authors. All rights reserved. |
+// Use of this source code is governed by a BSD-style license that can be |
+// found in the LICENSE file. |
+ |
+#include "mojo/edk/system/ports/message_queue.h" |
+ |
+#include <algorithm> |
+ |
+#include "base/logging.h" |
+#include "mojo/edk/system/ports/event.h" |
+ |
+namespace mojo { |
+namespace edk { |
+namespace ports { |
+ |
+inline uint64_t GetSequenceNum(const ScopedMessage& message) { |
+ return GetEventData<UserEventData>(*message)->sequence_num; |
+} |
+ |
+// Used by std::{push,pop}_heap functions |
+inline bool operator<(const ScopedMessage& a, const ScopedMessage& b) { |
+ return GetSequenceNum(a) > GetSequenceNum(b); |
+} |
+ |
+MessageQueue::MessageQueue() : MessageQueue(kInitialSequenceNum) {} |
+ |
+MessageQueue::MessageQueue(uint64_t next_sequence_num) |
+ : next_sequence_num_(next_sequence_num) { |
+ // The message queue is blocked waiting for a message with sequence number |
+ // equal to |next_sequence_num|. |
+} |
+ |
+MessageQueue::~MessageQueue() { |
+#ifndef NDEBUG |
+ size_t num_leaked_ports = 0; |
+ for (const auto& message : heap_) |
+ num_leaked_ports += message->num_ports(); |
+ DLOG_IF(WARNING, num_leaked_ports > 0) |
+ << "Leaking " << num_leaked_ports << " ports in unreceived messages"; |
+#endif |
+} |
+ |
+bool MessageQueue::HasNextMessage() const { |
+ return !heap_.empty() && GetSequenceNum(heap_[0]) == next_sequence_num_; |
+} |
+ |
+void MessageQueue::GetNextMessageIf( |
+ std::function<bool(const Message&)> selector, |
+ ScopedMessage* message) { |
+ if (!HasNextMessage() || (selector && !selector(*heap_[0].get()))) { |
+ message->reset(); |
+ return; |
+ } |
+ |
+ std::pop_heap(heap_.begin(), heap_.end()); |
+ *message = std::move(heap_.back()); |
+ heap_.pop_back(); |
+ |
+ next_sequence_num_++; |
+} |
+ |
+void MessageQueue::AcceptMessage(ScopedMessage message, |
+ bool* has_next_message) { |
+ DCHECK(GetEventHeader(*message)->type == EventType::kUser); |
+ |
+ // TODO: Handle sequence number roll-over. |
+ |
+ heap_.emplace_back(std::move(message)); |
+ std::push_heap(heap_.begin(), heap_.end()); |
+ |
+ if (!signalable_) { |
+ *has_next_message = false; |
+ } else { |
+ *has_next_message = (GetSequenceNum(heap_[0]) == next_sequence_num_); |
+ } |
+} |
+ |
+void MessageQueue::GetReferencedPorts(std::deque<PortName>* port_names) { |
+ for (const auto& message : heap_) { |
+ for (size_t i = 0; i < message->num_ports(); ++i) |
+ port_names->push_back(message->ports()[i]); |
+ } |
+} |
+ |
+} // namespace ports |
+} // namespace edk |
+} // namespace mojo |