OLD | NEW |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "mojo/edk/system/message_pipe_dispatcher.h" | 5 #include "mojo/edk/system/message_pipe_dispatcher.h" |
6 | 6 |
7 #include <limits> | 7 #include <limits> |
8 #include <memory> | 8 #include <memory> |
9 | 9 |
10 #include "base/logging.h" | 10 #include "base/logging.h" |
11 #include "base/macros.h" | 11 #include "base/macros.h" |
12 #include "base/memory/ref_counted.h" | 12 #include "base/memory/ref_counted.h" |
13 #include "mojo/edk/embedder/embedder_internal.h" | 13 #include "mojo/edk/embedder/embedder_internal.h" |
14 #include "mojo/edk/system/core.h" | 14 #include "mojo/edk/system/core.h" |
15 #include "mojo/edk/system/message_for_transit.h" | 15 #include "mojo/edk/system/message_for_transit.h" |
16 #include "mojo/edk/system/node_controller.h" | 16 #include "mojo/edk/system/node_controller.h" |
| 17 #include "mojo/edk/system/ports/message_filter.h" |
17 #include "mojo/edk/system/ports_message.h" | 18 #include "mojo/edk/system/ports_message.h" |
18 #include "mojo/edk/system/request_context.h" | 19 #include "mojo/edk/system/request_context.h" |
19 | 20 |
20 namespace mojo { | 21 namespace mojo { |
21 namespace edk { | 22 namespace edk { |
22 | 23 |
23 namespace { | 24 namespace { |
24 | 25 |
25 using DispatcherHeader = MessageForTransit::DispatcherHeader; | 26 using DispatcherHeader = MessageForTransit::DispatcherHeader; |
26 using MessageHeader = MessageForTransit::MessageHeader; | 27 using MessageHeader = MessageForTransit::MessageHeader; |
(...skipping 25 matching lines...) Expand all Loading... |
52 ~PortObserverThunk() override {} | 53 ~PortObserverThunk() override {} |
53 | 54 |
54 // NodeController::PortObserver: | 55 // NodeController::PortObserver: |
55 void OnPortStatusChanged() override { dispatcher_->OnPortStatusChanged(); } | 56 void OnPortStatusChanged() override { dispatcher_->OnPortStatusChanged(); } |
56 | 57 |
57 scoped_refptr<MessagePipeDispatcher> dispatcher_; | 58 scoped_refptr<MessagePipeDispatcher> dispatcher_; |
58 | 59 |
59 DISALLOW_COPY_AND_ASSIGN(PortObserverThunk); | 60 DISALLOW_COPY_AND_ASSIGN(PortObserverThunk); |
60 }; | 61 }; |
61 | 62 |
| 63 // A MessageFilter used by ReadMessage to determine whether a message should |
| 64 // actually be consumed yet. |
| 65 class ReadMessageFilter : public ports::MessageFilter { |
| 66 public: |
| 67 // Creates a new ReadMessageFilter which captures and potentially modifies |
| 68 // various (unowned) local state within MessagePipeDispatcher::ReadMessage. |
| 69 ReadMessageFilter(bool read_any_size, |
| 70 bool may_discard, |
| 71 uint32_t* num_bytes, |
| 72 uint32_t* num_handles, |
| 73 bool* no_space, |
| 74 bool* invalid_message) |
| 75 : read_any_size_(read_any_size), |
| 76 may_discard_(may_discard), |
| 77 num_bytes_(num_bytes), |
| 78 num_handles_(num_handles), |
| 79 no_space_(no_space), |
| 80 invalid_message_(invalid_message) {} |
| 81 |
| 82 ~ReadMessageFilter() override {} |
| 83 |
| 84 // ports::MessageFilter: |
| 85 bool Match(const ports::Message& m) override { |
| 86 const PortsMessage& message = static_cast<const PortsMessage&>(m); |
| 87 if (message.num_payload_bytes() < sizeof(MessageHeader)) { |
| 88 *invalid_message_ = true; |
| 89 return true; |
| 90 } |
| 91 |
| 92 const MessageHeader* header = |
| 93 static_cast<const MessageHeader*>(message.payload_bytes()); |
| 94 if (header->header_size > message.num_payload_bytes()) { |
| 95 *invalid_message_ = true; |
| 96 return true; |
| 97 } |
| 98 |
| 99 uint32_t bytes_to_read = 0; |
| 100 uint32_t bytes_available = |
| 101 static_cast<uint32_t>(message.num_payload_bytes()) - |
| 102 header->header_size; |
| 103 if (num_bytes_) { |
| 104 bytes_to_read = std::min(*num_bytes_, bytes_available); |
| 105 *num_bytes_ = bytes_available; |
| 106 } |
| 107 |
| 108 uint32_t handles_to_read = 0; |
| 109 uint32_t handles_available = header->num_dispatchers; |
| 110 if (num_handles_) { |
| 111 handles_to_read = std::min(*num_handles_, handles_available); |
| 112 *num_handles_ = handles_available; |
| 113 } |
| 114 |
| 115 if (handles_to_read < handles_available || |
| 116 (!read_any_size_ && bytes_to_read < bytes_available)) { |
| 117 *no_space_ = true; |
| 118 return may_discard_; |
| 119 } |
| 120 |
| 121 return true; |
| 122 } |
| 123 |
| 124 private: |
| 125 const bool read_any_size_; |
| 126 const bool may_discard_; |
| 127 uint32_t* const num_bytes_; |
| 128 uint32_t* const num_handles_; |
| 129 bool* const no_space_; |
| 130 bool* const invalid_message_; |
| 131 |
| 132 DISALLOW_COPY_AND_ASSIGN(ReadMessageFilter); |
| 133 }; |
| 134 |
| 135 #if DCHECK_IS_ON() |
| 136 |
| 137 // A MessageFilter which never matches a message. Used to peek at the size of |
| 138 // the next available message on a port, for debug logging only. |
| 139 class PeekSizeMessageFilter : public ports::MessageFilter { |
| 140 public: |
| 141 PeekSizeMessageFilter() {} |
| 142 ~PeekSizeMessageFilter() override {} |
| 143 |
| 144 // ports::MessageFilter: |
| 145 bool Match(const ports::Message& message) override { |
| 146 message_size_ = message.num_payload_bytes(); |
| 147 return false; |
| 148 } |
| 149 |
| 150 size_t message_size() const { return message_size_; } |
| 151 |
| 152 private: |
| 153 size_t message_size_ = 0; |
| 154 |
| 155 DISALLOW_COPY_AND_ASSIGN(PeekSizeMessageFilter); |
| 156 }; |
| 157 |
| 158 #endif // DCHECK_IS_ON() |
| 159 |
62 MessagePipeDispatcher::MessagePipeDispatcher(NodeController* node_controller, | 160 MessagePipeDispatcher::MessagePipeDispatcher(NodeController* node_controller, |
63 const ports::PortRef& port, | 161 const ports::PortRef& port, |
64 uint64_t pipe_id, | 162 uint64_t pipe_id, |
65 int endpoint) | 163 int endpoint) |
66 : node_controller_(node_controller), | 164 : node_controller_(node_controller), |
67 port_(port), | 165 port_(port), |
68 pipe_id_(pipe_id), | 166 pipe_id_(pipe_id), |
69 endpoint_(endpoint) { | 167 endpoint_(endpoint) { |
70 DVLOG(2) << "Creating new MessagePipeDispatcher for port " << port.name() | 168 DVLOG(2) << "Creating new MessagePipeDispatcher for port " << port.name() |
71 << " [pipe_id=" << pipe_id << "; endpoint=" << endpoint << "]"; | 169 << " [pipe_id=" << pipe_id << "; endpoint=" << endpoint << "]"; |
(...skipping 107 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
179 bool invalid_message = false; | 277 bool invalid_message = false; |
180 | 278 |
181 // Grab a message if the provided handles buffer is large enough. If the input | 279 // Grab a message if the provided handles buffer is large enough. If the input |
182 // |num_bytes| is provided and |read_any_size| is false, we also ensure | 280 // |num_bytes| is provided and |read_any_size| is false, we also ensure |
183 // that it specifies a size at least as large as the next available payload. | 281 // that it specifies a size at least as large as the next available payload. |
184 // | 282 // |
185 // If |read_any_size| is true, the input value of |*num_bytes| is ignored. | 283 // If |read_any_size| is true, the input value of |*num_bytes| is ignored. |
186 // This flag exists to support both new and old API behavior. | 284 // This flag exists to support both new and old API behavior. |
187 | 285 |
188 ports::ScopedMessage ports_message; | 286 ports::ScopedMessage ports_message; |
189 int rv = node_controller_->node()->GetMessageIf( | 287 ReadMessageFilter filter(read_any_size, may_discard, num_bytes, num_handles, |
190 port_, | 288 &no_space, &invalid_message); |
191 [read_any_size, num_bytes, num_handles, &no_space, &may_discard, | 289 int rv = node_controller_->node()->GetMessage(port_, &ports_message, &filter); |
192 &invalid_message]( | |
193 const ports::Message& next_message) { | |
194 const PortsMessage& message = | |
195 static_cast<const PortsMessage&>(next_message); | |
196 if (message.num_payload_bytes() < sizeof(MessageHeader)) { | |
197 invalid_message = true; | |
198 return true; | |
199 } | |
200 | |
201 const MessageHeader* header = | |
202 static_cast<const MessageHeader*>(message.payload_bytes()); | |
203 if (header->header_size > message.num_payload_bytes()) { | |
204 invalid_message = true; | |
205 return true; | |
206 } | |
207 | |
208 uint32_t bytes_to_read = 0; | |
209 uint32_t bytes_available = | |
210 static_cast<uint32_t>(message.num_payload_bytes()) - | |
211 header->header_size; | |
212 if (num_bytes) { | |
213 bytes_to_read = std::min(*num_bytes, bytes_available); | |
214 *num_bytes = bytes_available; | |
215 } | |
216 | |
217 uint32_t handles_to_read = 0; | |
218 uint32_t handles_available = header->num_dispatchers; | |
219 if (num_handles) { | |
220 handles_to_read = std::min(*num_handles, handles_available); | |
221 *num_handles = handles_available; | |
222 } | |
223 | |
224 if (handles_to_read < handles_available || | |
225 (!read_any_size && bytes_to_read < bytes_available)) { | |
226 no_space = true; | |
227 return may_discard; | |
228 } | |
229 | |
230 return true; | |
231 }, | |
232 &ports_message); | |
233 | 290 |
234 if (invalid_message) | 291 if (invalid_message) |
235 return MOJO_RESULT_UNKNOWN; | 292 return MOJO_RESULT_UNKNOWN; |
236 | 293 |
237 if (rv != ports::OK && rv != ports::ERROR_PORT_PEER_CLOSED) { | 294 if (rv != ports::OK && rv != ports::ERROR_PORT_PEER_CLOSED) { |
238 if (rv == ports::ERROR_PORT_UNKNOWN || | 295 if (rv == ports::ERROR_PORT_UNKNOWN || |
239 rv == ports::ERROR_PORT_STATE_UNEXPECTED) | 296 rv == ports::ERROR_PORT_STATE_UNEXPECTED) |
240 return MOJO_RESULT_INVALID_ARGUMENT; | 297 return MOJO_RESULT_INVALID_ARGUMENT; |
241 | 298 |
242 NOTREACHED(); | 299 NOTREACHED(); |
(...skipping 280 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
523 // with events which are raised right before that happens. This is fine to | 580 // with events which are raised right before that happens. This is fine to |
524 // ignore. | 581 // ignore. |
525 if (port_transferred_) | 582 if (port_transferred_) |
526 return; | 583 return; |
527 | 584 |
528 #if DCHECK_IS_ON() | 585 #if DCHECK_IS_ON() |
529 ports::PortStatus port_status; | 586 ports::PortStatus port_status; |
530 if (node_controller_->node()->GetStatus(port_, &port_status) == ports::OK) { | 587 if (node_controller_->node()->GetStatus(port_, &port_status) == ports::OK) { |
531 if (port_status.has_messages) { | 588 if (port_status.has_messages) { |
532 ports::ScopedMessage unused; | 589 ports::ScopedMessage unused; |
533 size_t message_size = 0; | 590 PeekSizeMessageFilter filter; |
534 node_controller_->node()->GetMessageIf( | 591 node_controller_->node()->GetMessage(port_, &unused, &filter); |
535 port_, [&message_size](const ports::Message& message) { | |
536 message_size = message.num_payload_bytes(); | |
537 return false; | |
538 }, &unused); | |
539 DVLOG(4) << "New message detected on message pipe " << pipe_id_ | 592 DVLOG(4) << "New message detected on message pipe " << pipe_id_ |
540 << " endpoint " << endpoint_ << " [port=" << port_.name() | 593 << " endpoint " << endpoint_ << " [port=" << port_.name() |
541 << "; size=" << message_size << "]"; | 594 << "; size=" << filter.message_size() << "]"; |
542 } | 595 } |
543 if (port_status.peer_closed) { | 596 if (port_status.peer_closed) { |
544 DVLOG(2) << "Peer closure detected on message pipe " << pipe_id_ | 597 DVLOG(2) << "Peer closure detected on message pipe " << pipe_id_ |
545 << " endpoint " << endpoint_ << " [port=" << port_.name() << "]"; | 598 << " endpoint " << endpoint_ << " [port=" << port_.name() << "]"; |
546 } | 599 } |
547 } | 600 } |
548 #endif | 601 #endif |
549 | 602 |
550 awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock()); | 603 awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock()); |
551 } | 604 } |
552 | 605 |
553 } // namespace edk | 606 } // namespace edk |
554 } // namespace mojo | 607 } // namespace mojo |
OLD | NEW |