Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(220)

Side by Side Diff: mojo/edk/system/ports/message_queue.cc

Issue 1585493002: [mojo] Ports EDK (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/edk/system/ports/message_queue.h"
6
7 #include <algorithm>
8
9 #include "base/logging.h"
10 #include "mojo/edk/system/ports/event.h"
11
12 namespace mojo {
13 namespace edk {
14 namespace ports {
15
16 inline uint64_t GetSequenceNum(const ScopedMessage& message) {
17 return GetEventData<UserEventData>(*message)->sequence_num;
18 }
19
20 // Used by std::{push,pop}_heap functions
21 inline bool operator<(const ScopedMessage& a, const ScopedMessage& b) {
22 return GetSequenceNum(a) > GetSequenceNum(b);
23 }
24
25 MessageQueue::MessageQueue() : MessageQueue(kInitialSequenceNum) {}
26
27 MessageQueue::MessageQueue(uint64_t next_sequence_num)
28 : next_sequence_num_(next_sequence_num) {
29 // The message queue is blocked waiting for a message with sequence number
30 // equal to |next_sequence_num|.
31 }
32
33 MessageQueue::~MessageQueue() {
34 #ifndef NDEBUG
35 size_t num_leaked_ports = 0;
36 for (const auto& message : heap_)
37 num_leaked_ports += message->num_ports();
38 DLOG_IF(WARNING, num_leaked_ports > 0)
39 << "Leaking " << num_leaked_ports << " ports in unreceived messages";
40 #endif
41 }
42
43 bool MessageQueue::HasNextMessage() const {
44 return !heap_.empty() && GetSequenceNum(heap_[0]) == next_sequence_num_;
45 }
46
47 void MessageQueue::GetNextMessageIf(
48 std::function<bool(const Message&)> selector,
49 ScopedMessage* message) {
50 if (!HasNextMessage() || (selector && !selector(*heap_[0].get()))) {
51 message->reset();
52 return;
53 }
54
55 std::pop_heap(heap_.begin(), heap_.end());
56 *message = std::move(heap_.back());
57 heap_.pop_back();
58
59 next_sequence_num_++;
60 }
61
62 void MessageQueue::AcceptMessage(ScopedMessage message,
63 bool* has_next_message) {
64 DCHECK(GetEventHeader(*message)->type == EventType::kUser);
65
66 // TODO: Handle sequence number roll-over.
67
68 heap_.emplace_back(std::move(message));
69 std::push_heap(heap_.begin(), heap_.end());
70
71 if (!signalable_) {
72 *has_next_message = false;
73 } else {
74 *has_next_message = (GetSequenceNum(heap_[0]) == next_sequence_num_);
75 }
76 }
77
78 void MessageQueue::GetReferencedPorts(std::deque<PortName>* port_names) {
79 for (const auto& message : heap_) {
80 for (size_t i = 0; i < message->num_ports(); ++i)
81 port_names->push_back(message->ports()[i]);
82 }
83 }
84
85 } // namespace ports
86 } // namespace edk
87 } // namespace mojo
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698