OLD | NEW |
---|---|
1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "mojo/edk/system/message_pipe_dispatcher.h" | 5 #include "mojo/edk/system/message_pipe_dispatcher.h" |
6 | 6 |
7 #include <limits> | 7 #include <limits> |
8 #include <memory> | 8 #include <memory> |
9 | 9 |
10 #include "base/logging.h" | 10 #include "base/logging.h" |
11 #include "base/macros.h" | 11 #include "base/macros.h" |
12 #include "base/memory/ref_counted.h" | 12 #include "base/memory/ref_counted.h" |
13 #include "mojo/edk/embedder/embedder_internal.h" | 13 #include "mojo/edk/embedder/embedder_internal.h" |
14 #include "mojo/edk/system/core.h" | 14 #include "mojo/edk/system/core.h" |
15 #include "mojo/edk/system/message_for_transit.h" | 15 #include "mojo/edk/system/message_for_transit.h" |
16 #include "mojo/edk/system/node_controller.h" | 16 #include "mojo/edk/system/node_controller.h" |
17 #include "mojo/edk/system/ports/message_filter.h" | |
17 #include "mojo/edk/system/ports_message.h" | 18 #include "mojo/edk/system/ports_message.h" |
18 #include "mojo/edk/system/request_context.h" | 19 #include "mojo/edk/system/request_context.h" |
19 | 20 |
20 namespace mojo { | 21 namespace mojo { |
21 namespace edk { | 22 namespace edk { |
22 | 23 |
23 namespace { | 24 namespace { |
24 | 25 |
25 using DispatcherHeader = MessageForTransit::DispatcherHeader; | 26 using DispatcherHeader = MessageForTransit::DispatcherHeader; |
26 using MessageHeader = MessageForTransit::MessageHeader; | 27 using MessageHeader = MessageForTransit::MessageHeader; |
(...skipping 25 matching lines...) Expand all Loading... | |
52 ~PortObserverThunk() override {} | 53 ~PortObserverThunk() override {} |
53 | 54 |
54 // NodeController::PortObserver: | 55 // NodeController::PortObserver: |
55 void OnPortStatusChanged() override { dispatcher_->OnPortStatusChanged(); } | 56 void OnPortStatusChanged() override { dispatcher_->OnPortStatusChanged(); } |
56 | 57 |
57 scoped_refptr<MessagePipeDispatcher> dispatcher_; | 58 scoped_refptr<MessagePipeDispatcher> dispatcher_; |
58 | 59 |
59 DISALLOW_COPY_AND_ASSIGN(PortObserverThunk); | 60 DISALLOW_COPY_AND_ASSIGN(PortObserverThunk); |
60 }; | 61 }; |
61 | 62 |
63 // A MessageFilter used by ReadMessage to determine whether a message should | |
64 // actually be consumed yet. | |
65 class ReadMessageFilter : public ports::MessageFilter { | |
66 public: | |
67 // Creates a new ReadMessageFilter which captures and potentially modifies | |
68 // various (unowned) local state within MessagePipeDispatcher::ReadMessage. | |
69 ReadMessageFilter(bool read_any_size, | |
70 bool may_discard, | |
71 uint32_t* num_bytes, | |
72 uint32_t* num_handles, | |
73 bool* no_space, | |
74 bool* invalid_message) | |
75 : read_any_size_(read_any_size), | |
76 may_discard_(may_discard), | |
77 num_bytes_(num_bytes), | |
78 num_handles_(num_handles), | |
79 no_space_(no_space), | |
80 invalid_message_(invalid_message) {} | |
81 | |
82 ~ReadMessageFilter() override {} | |
83 | |
84 // ports::MessageFilter: | |
85 bool Match(const ports::Message& m) override { | |
86 const PortsMessage& message = static_cast<const PortsMessage&>(m); | |
87 if (message.num_payload_bytes() < sizeof(MessageHeader)) { | |
88 *invalid_message_ = true; | |
89 return true; | |
90 } | |
91 | |
92 const MessageHeader* header = | |
93 static_cast<const MessageHeader*>(message.payload_bytes()); | |
94 if (header->header_size > message.num_payload_bytes()) { | |
95 *invalid_message_ = true; | |
96 return true; | |
97 } | |
98 | |
99 uint32_t bytes_to_read = 0; | |
100 uint32_t bytes_available = | |
101 static_cast<uint32_t>(message.num_payload_bytes()) - | |
102 header->header_size; | |
103 if (num_bytes_) { | |
104 bytes_to_read = std::min(*num_bytes_, bytes_available); | |
105 *num_bytes_ = bytes_available; | |
106 } | |
107 | |
108 uint32_t handles_to_read = 0; | |
109 uint32_t handles_available = header->num_dispatchers; | |
110 if (num_handles_) { | |
111 handles_to_read = std::min(*num_handles_, handles_available); | |
112 *num_handles_ = handles_available; | |
113 } | |
114 | |
115 if (handles_to_read < handles_available || | |
116 (!read_any_size_ && bytes_to_read < bytes_available)) { | |
117 *no_space_ = true; | |
118 return may_discard_; | |
119 } | |
120 | |
121 return true; | |
122 } | |
123 | |
124 private: | |
125 const bool read_any_size_; | |
126 const bool may_discard_; | |
127 uint32_t* const num_bytes_; | |
128 uint32_t* const num_handles_; | |
129 bool* const no_space_; | |
130 bool* invalid_message_; | |
yzshen1
2016/11/03 00:11:27
nit: now that the other pointers are made const, m
Ken Rockot(use gerrit already)
2016/11/03 01:25:07
done
| |
131 | |
132 DISALLOW_COPY_AND_ASSIGN(ReadMessageFilter); | |
133 }; | |
134 | |
135 #if DCHECK_IS_ON() | |
136 | |
137 // A MessageFilter which never matches a message. Used to peek at the size of | |
138 // the next available message on a port, for debug logging only. | |
139 class PeekSizeMessageFilter : public ports::MessageFilter { | |
140 public: | |
141 PeekSizeMessageFilter(size_t* size_storage) : size_storage_(size_storage) {} | |
142 ~PeekSizeMessageFilter() override {} | |
143 | |
144 // ports::MessageFilter: | |
145 bool Match(const ports::Message& message) override { | |
146 *size_storage_ = message.num_payload_bytes(); | |
147 return false; | |
148 } | |
149 | |
150 private: | |
151 size_t* const size_storage_; | |
152 | |
153 DISALLOW_COPY_AND_ASSIGN(PeekSizeMessageFilter); | |
154 }; | |
155 | |
156 #endif // DCHECK_IS_ON() | |
157 | |
62 MessagePipeDispatcher::MessagePipeDispatcher(NodeController* node_controller, | 158 MessagePipeDispatcher::MessagePipeDispatcher(NodeController* node_controller, |
63 const ports::PortRef& port, | 159 const ports::PortRef& port, |
64 uint64_t pipe_id, | 160 uint64_t pipe_id, |
65 int endpoint) | 161 int endpoint) |
66 : node_controller_(node_controller), | 162 : node_controller_(node_controller), |
67 port_(port), | 163 port_(port), |
68 pipe_id_(pipe_id), | 164 pipe_id_(pipe_id), |
69 endpoint_(endpoint) { | 165 endpoint_(endpoint) { |
70 DVLOG(2) << "Creating new MessagePipeDispatcher for port " << port.name() | 166 DVLOG(2) << "Creating new MessagePipeDispatcher for port " << port.name() |
71 << " [pipe_id=" << pipe_id << "; endpoint=" << endpoint << "]"; | 167 << " [pipe_id=" << pipe_id << "; endpoint=" << endpoint << "]"; |
(...skipping 107 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
179 bool invalid_message = false; | 275 bool invalid_message = false; |
180 | 276 |
181 // Grab a message if the provided handles buffer is large enough. If the input | 277 // Grab a message if the provided handles buffer is large enough. If the input |
182 // |num_bytes| is provided and |read_any_size| is false, we also ensure | 278 // |num_bytes| is provided and |read_any_size| is false, we also ensure |
183 // that it specifies a size at least as large as the next available payload. | 279 // that it specifies a size at least as large as the next available payload. |
184 // | 280 // |
185 // If |read_any_size| is true, the input value of |*num_bytes| is ignored. | 281 // If |read_any_size| is true, the input value of |*num_bytes| is ignored. |
186 // This flag exists to support both new and old API behavior. | 282 // This flag exists to support both new and old API behavior. |
187 | 283 |
188 ports::ScopedMessage ports_message; | 284 ports::ScopedMessage ports_message; |
189 int rv = node_controller_->node()->GetMessageIf( | 285 ReadMessageFilter filter(read_any_size, may_discard, num_bytes, num_handles, |
190 port_, | 286 &no_space, &invalid_message); |
191 [read_any_size, num_bytes, num_handles, &no_space, &may_discard, | 287 int rv = node_controller_->node()->GetMessage(port_, &ports_message, &filter); |
192 &invalid_message]( | |
193 const ports::Message& next_message) { | |
194 const PortsMessage& message = | |
195 static_cast<const PortsMessage&>(next_message); | |
196 if (message.num_payload_bytes() < sizeof(MessageHeader)) { | |
197 invalid_message = true; | |
198 return true; | |
199 } | |
200 | |
201 const MessageHeader* header = | |
202 static_cast<const MessageHeader*>(message.payload_bytes()); | |
203 if (header->header_size > message.num_payload_bytes()) { | |
204 invalid_message = true; | |
205 return true; | |
206 } | |
207 | |
208 uint32_t bytes_to_read = 0; | |
209 uint32_t bytes_available = | |
210 static_cast<uint32_t>(message.num_payload_bytes()) - | |
211 header->header_size; | |
212 if (num_bytes) { | |
213 bytes_to_read = std::min(*num_bytes, bytes_available); | |
214 *num_bytes = bytes_available; | |
215 } | |
216 | |
217 uint32_t handles_to_read = 0; | |
218 uint32_t handles_available = header->num_dispatchers; | |
219 if (num_handles) { | |
220 handles_to_read = std::min(*num_handles, handles_available); | |
221 *num_handles = handles_available; | |
222 } | |
223 | |
224 if (handles_to_read < handles_available || | |
225 (!read_any_size && bytes_to_read < bytes_available)) { | |
226 no_space = true; | |
227 return may_discard; | |
228 } | |
229 | |
230 return true; | |
231 }, | |
232 &ports_message); | |
233 | 288 |
234 if (invalid_message) | 289 if (invalid_message) |
235 return MOJO_RESULT_UNKNOWN; | 290 return MOJO_RESULT_UNKNOWN; |
236 | 291 |
237 if (rv != ports::OK && rv != ports::ERROR_PORT_PEER_CLOSED) { | 292 if (rv != ports::OK && rv != ports::ERROR_PORT_PEER_CLOSED) { |
238 if (rv == ports::ERROR_PORT_UNKNOWN || | 293 if (rv == ports::ERROR_PORT_UNKNOWN || |
239 rv == ports::ERROR_PORT_STATE_UNEXPECTED) | 294 rv == ports::ERROR_PORT_STATE_UNEXPECTED) |
240 return MOJO_RESULT_INVALID_ARGUMENT; | 295 return MOJO_RESULT_INVALID_ARGUMENT; |
241 | 296 |
242 NOTREACHED(); | 297 NOTREACHED(); |
(...skipping 281 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
524 // ignore. | 579 // ignore. |
525 if (port_transferred_) | 580 if (port_transferred_) |
526 return; | 581 return; |
527 | 582 |
528 #if DCHECK_IS_ON() | 583 #if DCHECK_IS_ON() |
529 ports::PortStatus port_status; | 584 ports::PortStatus port_status; |
530 if (node_controller_->node()->GetStatus(port_, &port_status) == ports::OK) { | 585 if (node_controller_->node()->GetStatus(port_, &port_status) == ports::OK) { |
531 if (port_status.has_messages) { | 586 if (port_status.has_messages) { |
532 ports::ScopedMessage unused; | 587 ports::ScopedMessage unused; |
533 size_t message_size = 0; | 588 size_t message_size = 0; |
534 node_controller_->node()->GetMessageIf( | 589 PeekSizeMessageFilter filter(&message_size); |
yzshen1
2016/11/03 00:11:27
Does it make sense to make message_size a member o
Ken Rockot(use gerrit already)
2016/11/03 01:25:07
done
| |
535 port_, [&message_size](const ports::Message& message) { | 590 node_controller_->node()->GetMessage(port_, &unused, &filter); |
536 message_size = message.num_payload_bytes(); | |
537 return false; | |
538 }, &unused); | |
539 DVLOG(4) << "New message detected on message pipe " << pipe_id_ | 591 DVLOG(4) << "New message detected on message pipe " << pipe_id_ |
540 << " endpoint " << endpoint_ << " [port=" << port_.name() | 592 << " endpoint " << endpoint_ << " [port=" << port_.name() |
541 << "; size=" << message_size << "]"; | 593 << "; size=" << message_size << "]"; |
542 } | 594 } |
543 if (port_status.peer_closed) { | 595 if (port_status.peer_closed) { |
544 DVLOG(2) << "Peer closure detected on message pipe " << pipe_id_ | 596 DVLOG(2) << "Peer closure detected on message pipe " << pipe_id_ |
545 << " endpoint " << endpoint_ << " [port=" << port_.name() << "]"; | 597 << " endpoint " << endpoint_ << " [port=" << port_.name() << "]"; |
546 } | 598 } |
547 } | 599 } |
548 #endif | 600 #endif |
549 | 601 |
550 awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock()); | 602 awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock()); |
551 } | 603 } |
552 | 604 |
553 } // namespace edk | 605 } // namespace edk |
554 } // namespace mojo | 606 } // namespace mojo |
OLD | NEW |