OLD | NEW |
(Empty) | |
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. |
| 4 |
| 5 #include "mojo/edk/system/ports/message_queue.h" |
| 6 |
| 7 #include <algorithm> |
| 8 |
| 9 #include "base/logging.h" |
| 10 #include "mojo/edk/system/ports/event.h" |
| 11 |
| 12 namespace mojo { |
| 13 namespace edk { |
| 14 namespace ports { |
| 15 |
| 16 inline uint64_t GetSequenceNum(const ScopedMessage& message) { |
| 17 return GetEventData<UserEventData>(*message)->sequence_num; |
| 18 } |
| 19 |
| 20 // Used by std::{push,pop}_heap functions |
| 21 inline bool operator<(const ScopedMessage& a, const ScopedMessage& b) { |
| 22 return GetSequenceNum(a) > GetSequenceNum(b); |
| 23 } |
| 24 |
| 25 MessageQueue::MessageQueue() : MessageQueue(kInitialSequenceNum) {} |
| 26 |
| 27 MessageQueue::MessageQueue(uint64_t next_sequence_num) |
| 28 : next_sequence_num_(next_sequence_num) { |
| 29 // The message queue is blocked waiting for a message with sequence number |
| 30 // equal to |next_sequence_num|. |
| 31 } |
| 32 |
| 33 MessageQueue::~MessageQueue() { |
| 34 #ifndef NDEBUG |
| 35 size_t num_leaked_ports = 0; |
| 36 for (const auto& message : heap_) |
| 37 num_leaked_ports += message->num_ports(); |
| 38 DLOG_IF(WARNING, num_leaked_ports > 0) |
| 39 << "Leaking " << num_leaked_ports << " ports in unreceived messages"; |
| 40 #endif |
| 41 } |
| 42 |
| 43 bool MessageQueue::HasNextMessage() const { |
| 44 return !heap_.empty() && GetSequenceNum(heap_[0]) == next_sequence_num_; |
| 45 } |
| 46 |
| 47 void MessageQueue::GetNextMessageIf( |
| 48 std::function<bool(const Message&)> selector, |
| 49 ScopedMessage* message) { |
| 50 if (!HasNextMessage() || (selector && !selector(*heap_[0].get()))) { |
| 51 message->reset(); |
| 52 return; |
| 53 } |
| 54 |
| 55 std::pop_heap(heap_.begin(), heap_.end()); |
| 56 *message = std::move(heap_.back()); |
| 57 heap_.pop_back(); |
| 58 |
| 59 next_sequence_num_++; |
| 60 } |
| 61 |
| 62 void MessageQueue::AcceptMessage(ScopedMessage message, |
| 63 bool* has_next_message) { |
| 64 DCHECK(GetEventHeader(*message)->type == EventType::kUser); |
| 65 |
| 66 // TODO: Handle sequence number roll-over. |
| 67 |
| 68 heap_.emplace_back(std::move(message)); |
| 69 std::push_heap(heap_.begin(), heap_.end()); |
| 70 |
| 71 if (!signalable_) { |
| 72 *has_next_message = false; |
| 73 } else { |
| 74 *has_next_message = (GetSequenceNum(heap_[0]) == next_sequence_num_); |
| 75 } |
| 76 } |
| 77 |
| 78 void MessageQueue::GetReferencedPorts(std::deque<PortName>* port_names) { |
| 79 for (const auto& message : heap_) { |
| 80 for (size_t i = 0; i < message->num_ports(); ++i) |
| 81 port_names->push_back(message->ports()[i]); |
| 82 } |
| 83 } |
| 84 |
| 85 } // namespace ports |
| 86 } // namespace edk |
| 87 } // namespace mojo |
OLD | NEW |