OLD | NEW |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "mojo/edk/system/message_pipe_dispatcher.h" | 5 #include "mojo/edk/system/message_pipe_dispatcher.h" |
6 | 6 |
7 #include <limits> | 7 #include <limits> |
8 #include <memory> | 8 #include <memory> |
9 | 9 |
10 #include "base/logging.h" | 10 #include "base/logging.h" |
(...skipping 146 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
157 | 157 |
158 #endif // DCHECK_IS_ON() | 158 #endif // DCHECK_IS_ON() |
159 | 159 |
160 MessagePipeDispatcher::MessagePipeDispatcher(NodeController* node_controller, | 160 MessagePipeDispatcher::MessagePipeDispatcher(NodeController* node_controller, |
161 const ports::PortRef& port, | 161 const ports::PortRef& port, |
162 uint64_t pipe_id, | 162 uint64_t pipe_id, |
163 int endpoint) | 163 int endpoint) |
164 : node_controller_(node_controller), | 164 : node_controller_(node_controller), |
165 port_(port), | 165 port_(port), |
166 pipe_id_(pipe_id), | 166 pipe_id_(pipe_id), |
167 endpoint_(endpoint) { | 167 endpoint_(endpoint), |
| 168 watchers_(this) { |
168 DVLOG(2) << "Creating new MessagePipeDispatcher for port " << port.name() | 169 DVLOG(2) << "Creating new MessagePipeDispatcher for port " << port.name() |
169 << " [pipe_id=" << pipe_id << "; endpoint=" << endpoint << "]"; | 170 << " [pipe_id=" << pipe_id << "; endpoint=" << endpoint << "]"; |
170 | 171 |
171 node_controller_->SetPortObserver( | 172 node_controller_->SetPortObserver( |
172 port_, | 173 port_, |
173 make_scoped_refptr(new PortObserverThunk(this))); | 174 make_scoped_refptr(new PortObserverThunk(this))); |
174 } | 175 } |
175 | 176 |
176 bool MessagePipeDispatcher::Fuse(MessagePipeDispatcher* other) { | 177 bool MessagePipeDispatcher::Fuse(MessagePipeDispatcher* other) { |
177 node_controller_->SetPortObserver(port_, nullptr); | 178 node_controller_->SetPortObserver(port_, nullptr); |
178 node_controller_->SetPortObserver(other->port_, nullptr); | 179 node_controller_->SetPortObserver(other->port_, nullptr); |
179 | 180 |
180 ports::PortRef port0; | 181 ports::PortRef port0; |
181 { | 182 { |
182 base::AutoLock lock(signal_lock_); | 183 base::AutoLock lock(signal_lock_); |
183 port0 = port_; | 184 port0 = port_; |
184 port_closed_.Set(true); | 185 port_closed_.Set(true); |
185 awakables_.CancelAll(); | 186 awakables_.CancelAll(); |
| 187 watchers_.NotifyClosed(); |
186 } | 188 } |
187 | 189 |
188 ports::PortRef port1; | 190 ports::PortRef port1; |
189 { | 191 { |
190 base::AutoLock lock(other->signal_lock_); | 192 base::AutoLock lock(other->signal_lock_); |
191 port1 = other->port_; | 193 port1 = other->port_; |
192 other->port_closed_.Set(true); | 194 other->port_closed_.Set(true); |
193 other->awakables_.CancelAll(); | 195 other->awakables_.CancelAll(); |
| 196 other->watchers_.NotifyClosed(); |
194 } | 197 } |
195 | 198 |
196 // Both ports are always closed by this call. | 199 // Both ports are always closed by this call. |
197 int rv = node_controller_->MergeLocalPorts(port0, port1); | 200 int rv = node_controller_->MergeLocalPorts(port0, port1); |
198 return rv == ports::OK; | 201 return rv == ports::OK; |
199 } | 202 } |
200 | 203 |
201 Dispatcher::Type MessagePipeDispatcher::GetType() const { | 204 Dispatcher::Type MessagePipeDispatcher::GetType() const { |
202 return Type::MESSAGE_PIPE; | 205 return Type::MESSAGE_PIPE; |
203 } | 206 } |
204 | 207 |
205 MojoResult MessagePipeDispatcher::Close() { | 208 MojoResult MessagePipeDispatcher::Close() { |
206 base::AutoLock lock(signal_lock_); | 209 base::AutoLock lock(signal_lock_); |
207 DVLOG(2) << "Closing message pipe " << pipe_id_ << " endpoint " << endpoint_ | 210 DVLOG(2) << "Closing message pipe " << pipe_id_ << " endpoint " << endpoint_ |
208 << " [port=" << port_.name() << "]"; | 211 << " [port=" << port_.name() << "]"; |
209 return CloseNoLock(); | 212 return CloseNoLock(); |
210 } | 213 } |
211 | 214 |
212 MojoResult MessagePipeDispatcher::Watch(MojoHandleSignals signals, | |
213 const Watcher::WatchCallback& callback, | |
214 uintptr_t context) { | |
215 base::AutoLock lock(signal_lock_); | |
216 | |
217 if (port_closed_ || in_transit_) | |
218 return MOJO_RESULT_INVALID_ARGUMENT; | |
219 | |
220 return awakables_.AddWatcher( | |
221 signals, callback, context, GetHandleSignalsStateNoLock()); | |
222 } | |
223 | |
224 MojoResult MessagePipeDispatcher::CancelWatch(uintptr_t context) { | |
225 base::AutoLock lock(signal_lock_); | |
226 | |
227 if (port_closed_ || in_transit_) | |
228 return MOJO_RESULT_INVALID_ARGUMENT; | |
229 | |
230 return awakables_.RemoveWatcher(context); | |
231 } | |
232 | |
233 MojoResult MessagePipeDispatcher::WriteMessage( | 215 MojoResult MessagePipeDispatcher::WriteMessage( |
234 std::unique_ptr<MessageForTransit> message, | 216 std::unique_ptr<MessageForTransit> message, |
235 MojoWriteMessageFlags flags) { | 217 MojoWriteMessageFlags flags) { |
236 if (port_closed_ || in_transit_) | 218 if (port_closed_ || in_transit_) |
237 return MOJO_RESULT_INVALID_ARGUMENT; | 219 return MOJO_RESULT_INVALID_ARGUMENT; |
238 | 220 |
239 size_t num_bytes = message->num_bytes(); | 221 size_t num_bytes = message->num_bytes(); |
240 int rv = node_controller_->SendMessage(port_, message->TakePortsMessage()); | 222 int rv = node_controller_->SendMessage(port_, message->TakePortsMessage()); |
241 | 223 |
242 DVLOG(4) << "Sent message on pipe " << pipe_id_ << " endpoint " << endpoint_ | 224 DVLOG(4) << "Sent message on pipe " << pipe_id_ << " endpoint " << endpoint_ |
(...skipping 49 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
292 if (rv != ports::OK && rv != ports::ERROR_PORT_PEER_CLOSED) { | 274 if (rv != ports::OK && rv != ports::ERROR_PORT_PEER_CLOSED) { |
293 if (rv == ports::ERROR_PORT_UNKNOWN || | 275 if (rv == ports::ERROR_PORT_UNKNOWN || |
294 rv == ports::ERROR_PORT_STATE_UNEXPECTED) | 276 rv == ports::ERROR_PORT_STATE_UNEXPECTED) |
295 return MOJO_RESULT_INVALID_ARGUMENT; | 277 return MOJO_RESULT_INVALID_ARGUMENT; |
296 | 278 |
297 NOTREACHED(); | 279 NOTREACHED(); |
298 return MOJO_RESULT_UNKNOWN; // TODO: Add a better error code here? | 280 return MOJO_RESULT_UNKNOWN; // TODO: Add a better error code here? |
299 } | 281 } |
300 | 282 |
301 if (no_space) { | 283 if (no_space) { |
| 284 if (may_discard) { |
| 285 // May have been the last message on the pipe. Need to update signals just |
| 286 // in case. |
| 287 base::AutoLock lock(signal_lock_); |
| 288 watchers_.NotifyState(GetHandleSignalsStateNoLock()); |
| 289 } |
302 // |*num_handles| (and/or |*num_bytes| if |read_any_size| is false) wasn't | 290 // |*num_handles| (and/or |*num_bytes| if |read_any_size| is false) wasn't |
303 // sufficient to hold this message's data. The message will still be in | 291 // sufficient to hold this message's data. The message will still be in |
304 // queue unless MOJO_READ_MESSAGE_FLAG_MAY_DISCARD was set. | 292 // queue unless MOJO_READ_MESSAGE_FLAG_MAY_DISCARD was set. |
305 return MOJO_RESULT_RESOURCE_EXHAUSTED; | 293 return MOJO_RESULT_RESOURCE_EXHAUSTED; |
306 } | 294 } |
307 | 295 |
308 if (!ports_message) { | 296 if (!ports_message) { |
309 // No message was available in queue. | 297 // No message was available in queue. |
310 | 298 |
311 if (rv == ports::OK) | 299 if (rv == ports::OK) |
312 return MOJO_RESULT_SHOULD_WAIT; | 300 return MOJO_RESULT_SHOULD_WAIT; |
313 | 301 |
314 // Peer is closed and there are no more messages to read. | 302 // Peer is closed and there are no more messages to read. |
315 DCHECK_EQ(rv, ports::ERROR_PORT_PEER_CLOSED); | 303 DCHECK_EQ(rv, ports::ERROR_PORT_PEER_CLOSED); |
316 return MOJO_RESULT_FAILED_PRECONDITION; | 304 return MOJO_RESULT_FAILED_PRECONDITION; |
317 } | 305 } |
318 | 306 |
319 // Alright! We have a message and the caller has provided sufficient storage | 307 // Alright! We have a message and the caller has provided sufficient storage |
320 // in which to receive it. | 308 // in which to receive it. |
321 | 309 |
| 310 { |
| 311 // We need to update anyone watching our signals in case that was the last |
| 312 // available message. |
| 313 base::AutoLock lock(signal_lock_); |
| 314 watchers_.NotifyState(GetHandleSignalsStateNoLock()); |
| 315 } |
| 316 |
322 std::unique_ptr<PortsMessage> msg( | 317 std::unique_ptr<PortsMessage> msg( |
323 static_cast<PortsMessage*>(ports_message.release())); | 318 static_cast<PortsMessage*>(ports_message.release())); |
324 | 319 |
325 const MessageHeader* header = | 320 const MessageHeader* header = |
326 static_cast<const MessageHeader*>(msg->payload_bytes()); | 321 static_cast<const MessageHeader*>(msg->payload_bytes()); |
327 const DispatcherHeader* dispatcher_headers = | 322 const DispatcherHeader* dispatcher_headers = |
328 reinterpret_cast<const DispatcherHeader*>(header + 1); | 323 reinterpret_cast<const DispatcherHeader*>(header + 1); |
329 | 324 |
330 if (header->num_dispatchers > std::numeric_limits<uint16_t>::max()) | 325 if (header->num_dispatchers > std::numeric_limits<uint16_t>::max()) |
331 return MOJO_RESULT_UNKNOWN; | 326 return MOJO_RESULT_UNKNOWN; |
(...skipping 57 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
389 *message = MessageForTransit::WrapPortsMessage(std::move(msg)); | 384 *message = MessageForTransit::WrapPortsMessage(std::move(msg)); |
390 return MOJO_RESULT_OK; | 385 return MOJO_RESULT_OK; |
391 } | 386 } |
392 | 387 |
393 HandleSignalsState | 388 HandleSignalsState |
394 MessagePipeDispatcher::GetHandleSignalsState() const { | 389 MessagePipeDispatcher::GetHandleSignalsState() const { |
395 base::AutoLock lock(signal_lock_); | 390 base::AutoLock lock(signal_lock_); |
396 return GetHandleSignalsStateNoLock(); | 391 return GetHandleSignalsStateNoLock(); |
397 } | 392 } |
398 | 393 |
| 394 MojoResult MessagePipeDispatcher::AddWatcherRef( |
| 395 const scoped_refptr<WatcherDispatcher>& watcher, |
| 396 uintptr_t context) { |
| 397 base::AutoLock lock(signal_lock_); |
| 398 if (port_closed_ || in_transit_) |
| 399 return MOJO_RESULT_INVALID_ARGUMENT; |
| 400 return watchers_.Add(watcher, context, GetHandleSignalsStateNoLock()); |
| 401 } |
| 402 |
| 403 MojoResult MessagePipeDispatcher::RemoveWatcherRef(WatcherDispatcher* watcher, |
| 404 uintptr_t context) { |
| 405 base::AutoLock lock(signal_lock_); |
| 406 if (port_closed_ || in_transit_) |
| 407 return MOJO_RESULT_INVALID_ARGUMENT; |
| 408 return watchers_.Remove(watcher, context); |
| 409 } |
| 410 |
399 MojoResult MessagePipeDispatcher::AddAwakable( | 411 MojoResult MessagePipeDispatcher::AddAwakable( |
400 Awakable* awakable, | 412 Awakable* awakable, |
401 MojoHandleSignals signals, | 413 MojoHandleSignals signals, |
402 uintptr_t context, | 414 uintptr_t context, |
403 HandleSignalsState* signals_state) { | 415 HandleSignalsState* signals_state) { |
404 base::AutoLock lock(signal_lock_); | 416 base::AutoLock lock(signal_lock_); |
405 | 417 |
406 if (port_closed_ || in_transit_) { | 418 if (port_closed_ || in_transit_) { |
407 if (signals_state) | 419 if (signals_state) |
408 *signals_state = HandleSignalsState(); | 420 *signals_state = HandleSignalsState(); |
(...skipping 80 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
489 port_transferred_ = true; | 501 port_transferred_ = true; |
490 in_transit_.Set(false); | 502 in_transit_.Set(false); |
491 CloseNoLock(); | 503 CloseNoLock(); |
492 } | 504 } |
493 | 505 |
494 void MessagePipeDispatcher::CancelTransit() { | 506 void MessagePipeDispatcher::CancelTransit() { |
495 base::AutoLock lock(signal_lock_); | 507 base::AutoLock lock(signal_lock_); |
496 in_transit_.Set(false); | 508 in_transit_.Set(false); |
497 | 509 |
498 // Something may have happened while we were waiting for potential transit. | 510 // Something may have happened while we were waiting for potential transit. |
499 awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock()); | 511 HandleSignalsState state = GetHandleSignalsStateNoLock(); |
| 512 awakables_.AwakeForStateChange(state); |
| 513 watchers_.NotifyState(state); |
500 } | 514 } |
501 | 515 |
502 // static | 516 // static |
503 scoped_refptr<Dispatcher> MessagePipeDispatcher::Deserialize( | 517 scoped_refptr<Dispatcher> MessagePipeDispatcher::Deserialize( |
504 const void* data, | 518 const void* data, |
505 size_t num_bytes, | 519 size_t num_bytes, |
506 const ports::PortName* ports, | 520 const ports::PortName* ports, |
507 size_t num_ports, | 521 size_t num_ports, |
508 PlatformHandle* handles, | 522 PlatformHandle* handles, |
509 size_t num_handles) { | 523 size_t num_handles) { |
(...skipping 15 matching lines...) Expand all Loading... |
525 DCHECK(port_closed_ && !in_transit_); | 539 DCHECK(port_closed_ && !in_transit_); |
526 } | 540 } |
527 | 541 |
528 MojoResult MessagePipeDispatcher::CloseNoLock() { | 542 MojoResult MessagePipeDispatcher::CloseNoLock() { |
529 signal_lock_.AssertAcquired(); | 543 signal_lock_.AssertAcquired(); |
530 if (port_closed_ || in_transit_) | 544 if (port_closed_ || in_transit_) |
531 return MOJO_RESULT_INVALID_ARGUMENT; | 545 return MOJO_RESULT_INVALID_ARGUMENT; |
532 | 546 |
533 port_closed_.Set(true); | 547 port_closed_.Set(true); |
534 awakables_.CancelAll(); | 548 awakables_.CancelAll(); |
| 549 watchers_.NotifyClosed(); |
535 | 550 |
536 if (!port_transferred_) { | 551 if (!port_transferred_) { |
537 base::AutoUnlock unlock(signal_lock_); | 552 base::AutoUnlock unlock(signal_lock_); |
538 node_controller_->ClosePort(port_); | 553 node_controller_->ClosePort(port_); |
539 } | 554 } |
540 | 555 |
541 return MOJO_RESULT_OK; | 556 return MOJO_RESULT_OK; |
542 } | 557 } |
543 | 558 |
544 HandleSignalsState MessagePipeDispatcher::GetHandleSignalsStateNoLock() const { | 559 HandleSignalsState MessagePipeDispatcher::GetHandleSignalsStateNoLock() const { |
(...skipping 44 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
589 << " endpoint " << endpoint_ << " [port=" << port_.name() | 604 << " endpoint " << endpoint_ << " [port=" << port_.name() |
590 << "; size=" << filter.message_size() << "]"; | 605 << "; size=" << filter.message_size() << "]"; |
591 } | 606 } |
592 if (port_status.peer_closed) { | 607 if (port_status.peer_closed) { |
593 DVLOG(2) << "Peer closure detected on message pipe " << pipe_id_ | 608 DVLOG(2) << "Peer closure detected on message pipe " << pipe_id_ |
594 << " endpoint " << endpoint_ << " [port=" << port_.name() << "]"; | 609 << " endpoint " << endpoint_ << " [port=" << port_.name() << "]"; |
595 } | 610 } |
596 } | 611 } |
597 #endif | 612 #endif |
598 | 613 |
599 awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock()); | 614 HandleSignalsState state = GetHandleSignalsStateNoLock(); |
| 615 awakables_.AwakeForStateChange(state); |
| 616 watchers_.NotifyState(state); |
600 } | 617 } |
601 | 618 |
602 } // namespace edk | 619 } // namespace edk |
603 } // namespace mojo | 620 } // namespace mojo |
OLD | NEW |