Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(832)

Unified Diff: mojo/edk/system/ports/message_queue.cc

Issue 1585493002: [mojo] Ports EDK (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: mojo/edk/system/ports/message_queue.cc
diff --git a/mojo/edk/system/ports/message_queue.cc b/mojo/edk/system/ports/message_queue.cc
new file mode 100644
index 0000000000000000000000000000000000000000..08c534d130c91026b9aeaa2382fc6cd54d44efe0
--- /dev/null
+++ b/mojo/edk/system/ports/message_queue.cc
@@ -0,0 +1,87 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "mojo/edk/system/ports/message_queue.h"
+
+#include <algorithm>
+
+#include "base/logging.h"
+#include "mojo/edk/system/ports/event.h"
+
+namespace mojo {
+namespace edk {
+namespace ports {
+
+inline uint64_t GetSequenceNum(const ScopedMessage& message) {
+ return GetEventData<UserEventData>(*message)->sequence_num;
+}
+
+// Used by std::{push,pop}_heap functions
+inline bool operator<(const ScopedMessage& a, const ScopedMessage& b) {
+ return GetSequenceNum(a) > GetSequenceNum(b);
+}
+
+MessageQueue::MessageQueue() : MessageQueue(kInitialSequenceNum) {}
+
+MessageQueue::MessageQueue(uint64_t next_sequence_num)
+ : next_sequence_num_(next_sequence_num) {
+ // The message queue is blocked waiting for a message with sequence number
+ // equal to |next_sequence_num|.
+}
+
+MessageQueue::~MessageQueue() {
+#ifndef NDEBUG
+ size_t num_leaked_ports = 0;
+ for (const auto& message : heap_)
+ num_leaked_ports += message->num_ports();
+ DLOG_IF(WARNING, num_leaked_ports > 0)
+ << "Leaking " << num_leaked_ports << " ports in unreceived messages";
+#endif
+}
+
+bool MessageQueue::HasNextMessage() const {
+ return !heap_.empty() && GetSequenceNum(heap_[0]) == next_sequence_num_;
+}
+
+void MessageQueue::GetNextMessageIf(
+ std::function<bool(const Message&)> selector,
+ ScopedMessage* message) {
+ if (!HasNextMessage() || (selector && !selector(*heap_[0].get()))) {
+ message->reset();
+ return;
+ }
+
+ std::pop_heap(heap_.begin(), heap_.end());
+ *message = std::move(heap_.back());
+ heap_.pop_back();
+
+ next_sequence_num_++;
+}
+
+void MessageQueue::AcceptMessage(ScopedMessage message,
+ bool* has_next_message) {
+ DCHECK(GetEventHeader(*message)->type == EventType::kUser);
+
+ // TODO: Handle sequence number roll-over.
+
+ heap_.emplace_back(std::move(message));
+ std::push_heap(heap_.begin(), heap_.end());
+
+ if (!signalable_) {
+ *has_next_message = false;
+ } else {
+ *has_next_message = (GetSequenceNum(heap_[0]) == next_sequence_num_);
+ }
+}
+
+void MessageQueue::GetReferencedPorts(std::deque<PortName>* port_names) {
+ for (const auto& message : heap_) {
+ for (size_t i = 0; i < message->num_ports(); ++i)
+ port_names->push_back(message->ports()[i]);
+ }
+}
+
+} // namespace ports
+} // namespace edk
+} // namespace mojo

Powered by Google App Engine
This is Rietveld 408576698