OLD | NEW |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "mojo/edk/system/message_pipe_dispatcher.h" | 5 #include "mojo/edk/system/message_pipe_dispatcher.h" |
6 | 6 |
7 #include <limits> | 7 #include <limits> |
8 | 8 |
9 #include "base/macros.h" | 9 #include "base/macros.h" |
10 #include "base/memory/ref_counted.h" | 10 #include "base/memory/ref_counted.h" |
(...skipping 72 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
83 | 83 |
84 node_controller_->SetPortObserver( | 84 node_controller_->SetPortObserver( |
85 port_, | 85 port_, |
86 make_scoped_refptr(new PortObserverThunk(this))); | 86 make_scoped_refptr(new PortObserverThunk(this))); |
87 } | 87 } |
88 | 88 |
89 Dispatcher::Type MessagePipeDispatcher::GetType() const { | 89 Dispatcher::Type MessagePipeDispatcher::GetType() const { |
90 return Type::MESSAGE_PIPE; | 90 return Type::MESSAGE_PIPE; |
91 } | 91 } |
92 | 92 |
93 MojoResult MessagePipeDispatcher::Close() { | 93 MojoResult MessagePipeDispatcher::Close(RequestContext* request_context) { |
94 base::AutoLock lock(signal_lock_); | 94 base::AutoLock lock(signal_lock_); |
95 DVLOG(1) << "Closing message pipe " << pipe_id_ << " endpoint " << endpoint_ | 95 DVLOG(1) << "Closing message pipe " << pipe_id_ << " endpoint " << endpoint_ |
96 << " [port=" << port_.name() << "]"; | 96 << " [port=" << port_.name() << "]"; |
97 return CloseNoLock(); | 97 return CloseNoLock(request_context); |
| 98 } |
| 99 |
| 100 MojoResult MessagePipeDispatcher::Watch(MojoHandleSignals signals, |
| 101 const WatchCallback& callback, |
| 102 uintptr_t context, |
| 103 RequestContext* request_context) { |
| 104 base::AutoLock lock(signal_lock_); |
| 105 |
| 106 if (port_closed_ || in_transit_) |
| 107 return MOJO_RESULT_INVALID_ARGUMENT; |
| 108 |
| 109 HandleSignalsState state = GetHandleSignalsStateNoLock(); |
| 110 return watchers_.Add(signals, callback, context, state, request_context); |
| 111 } |
| 112 |
| 113 MojoResult MessagePipeDispatcher::CancelWatch(uintptr_t context) { |
| 114 base::AutoLock lock(signal_lock_); |
| 115 |
| 116 if (port_closed_ || in_transit_) |
| 117 return MOJO_RESULT_INVALID_ARGUMENT; |
| 118 |
| 119 return watchers_.Remove(context); |
98 } | 120 } |
99 | 121 |
100 MojoResult MessagePipeDispatcher::WriteMessage( | 122 MojoResult MessagePipeDispatcher::WriteMessage( |
101 const void* bytes, | 123 const void* bytes, |
102 uint32_t num_bytes, | 124 uint32_t num_bytes, |
103 const DispatcherInTransit* dispatchers, | 125 const DispatcherInTransit* dispatchers, |
104 uint32_t num_dispatchers, | 126 uint32_t num_dispatchers, |
105 MojoWriteMessageFlags flags) { | 127 MojoWriteMessageFlags flags, |
| 128 RequestContext* request_context) { |
106 | 129 |
107 { | 130 { |
108 base::AutoLock lock(signal_lock_); | 131 base::AutoLock lock(signal_lock_); |
109 if (port_closed_ || in_transit_) | 132 if (port_closed_ || in_transit_) |
110 return MOJO_RESULT_INVALID_ARGUMENT; | 133 return MOJO_RESULT_INVALID_ARGUMENT; |
111 } | 134 } |
112 | 135 |
113 // A structure for retaining information about every Dispatcher we're about | 136 // A structure for retaining information about every Dispatcher we're about |
114 // to send. This information is collected by calling StartSerialize() on | 137 // to send. This information is collected by calling StartSerialize() on |
115 // each dispatcher in sequence. | 138 // each dispatcher in sequence. |
(...skipping 103 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
219 << " [port=" << port_.name() << "; rv=" << rv | 242 << " [port=" << port_.name() << "; rv=" << rv |
220 << "; num_bytes=" << num_bytes << "]"; | 243 << "; num_bytes=" << num_bytes << "]"; |
221 | 244 |
222 if (rv != ports::OK) { | 245 if (rv != ports::OK) { |
223 if (rv == ports::ERROR_PORT_UNKNOWN || | 246 if (rv == ports::ERROR_PORT_UNKNOWN || |
224 rv == ports::ERROR_PORT_STATE_UNEXPECTED || | 247 rv == ports::ERROR_PORT_STATE_UNEXPECTED || |
225 rv == ports::ERROR_PORT_CANNOT_SEND_PEER) { | 248 rv == ports::ERROR_PORT_CANNOT_SEND_PEER) { |
226 result = MOJO_RESULT_INVALID_ARGUMENT; | 249 result = MOJO_RESULT_INVALID_ARGUMENT; |
227 } else if (rv == ports::ERROR_PORT_PEER_CLOSED) { | 250 } else if (rv == ports::ERROR_PORT_PEER_CLOSED) { |
228 base::AutoLock lock(signal_lock_); | 251 base::AutoLock lock(signal_lock_); |
229 awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock()); | 252 NotifyObserversForStateChangeNoLock(request_context); |
230 result = MOJO_RESULT_FAILED_PRECONDITION; | 253 result = MOJO_RESULT_FAILED_PRECONDITION; |
231 } else { | 254 } else { |
232 NOTREACHED(); | 255 NOTREACHED(); |
233 result = MOJO_RESULT_UNKNOWN; | 256 result = MOJO_RESULT_UNKNOWN; |
234 } | 257 } |
235 cancel_transit = true; | 258 cancel_transit = true; |
236 } else { | 259 } else { |
237 DCHECK(!message); | 260 DCHECK(!message); |
238 } | 261 } |
239 } | 262 } |
240 | 263 |
241 if (cancel_transit) { | 264 if (cancel_transit) { |
242 // We ended up not sending the message. Release all the platform handles. | 265 // We ended up not sending the message. Release all the platform handles. |
243 // Their dipatchers retain ownership when transit is canceled, so these are | 266 // Their dipatchers retain ownership when transit is canceled, so these are |
244 // not actually leaking. | 267 // not actually leaking. |
245 DCHECK(message); | 268 DCHECK(message); |
246 Channel::MessagePtr m = message->TakeChannelMessage(); | 269 Channel::MessagePtr m = message->TakeChannelMessage(); |
247 ScopedPlatformHandleVectorPtr handles = m->TakeHandles(); | 270 ScopedPlatformHandleVectorPtr handles = m->TakeHandles(); |
248 if (handles) | 271 if (handles) |
249 handles->clear(); | 272 handles->clear(); |
250 } | 273 } |
251 | 274 |
252 return result; | 275 return result; |
253 } | 276 } |
254 | 277 |
255 MojoResult MessagePipeDispatcher::ReadMessage(void* bytes, | 278 MojoResult MessagePipeDispatcher::ReadMessage(void* bytes, |
256 uint32_t* num_bytes, | 279 uint32_t* num_bytes, |
257 MojoHandle* handles, | 280 MojoHandle* handles, |
258 uint32_t* num_handles, | 281 uint32_t* num_handles, |
259 MojoReadMessageFlags flags) { | 282 MojoReadMessageFlags flags, |
| 283 RequestContext* request_context) { |
260 { | 284 { |
261 base::AutoLock lock(signal_lock_); | 285 base::AutoLock lock(signal_lock_); |
262 // We can't read from a port that's closed or in transit! | 286 // We can't read from a port that's closed or in transit! |
263 if (port_closed_ || in_transit_) | 287 if (port_closed_ || in_transit_) |
264 return MOJO_RESULT_INVALID_ARGUMENT; | 288 return MOJO_RESULT_INVALID_ARGUMENT; |
265 } | 289 } |
266 | 290 |
267 bool no_space = false; | 291 bool no_space = false; |
268 bool may_discard = flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD; | 292 bool may_discard = flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD; |
269 | 293 |
(...skipping 59 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
329 | 353 |
330 if (!ports_message) { | 354 if (!ports_message) { |
331 // No message was available in queue. | 355 // No message was available in queue. |
332 | 356 |
333 if (rv == ports::OK) | 357 if (rv == ports::OK) |
334 return MOJO_RESULT_SHOULD_WAIT; | 358 return MOJO_RESULT_SHOULD_WAIT; |
335 | 359 |
336 // Peer is closed and there are no more messages to read. | 360 // Peer is closed and there are no more messages to read. |
337 DCHECK_EQ(rv, ports::ERROR_PORT_PEER_CLOSED); | 361 DCHECK_EQ(rv, ports::ERROR_PORT_PEER_CLOSED); |
338 base::AutoLock lock(signal_lock_); | 362 base::AutoLock lock(signal_lock_); |
339 awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock()); | 363 NotifyObserversForStateChangeNoLock(request_context); |
340 return MOJO_RESULT_FAILED_PRECONDITION; | 364 return MOJO_RESULT_FAILED_PRECONDITION; |
341 } | 365 } |
342 | 366 |
343 // Alright! We have a message and the caller has provided sufficient storage | 367 // Alright! We have a message and the caller has provided sufficient storage |
344 // in which to receive it. | 368 // in which to receive it. |
345 | 369 |
346 scoped_ptr<PortsMessage> message( | 370 scoped_ptr<PortsMessage> message( |
347 static_cast<PortsMessage*>(ports_message.release())); | 371 static_cast<PortsMessage*>(ports_message.release())); |
348 | 372 |
349 const MessageHeader* header = | 373 const MessageHeader* header = |
(...skipping 128 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
478 bool MessagePipeDispatcher::EndSerialize(void* destination, | 502 bool MessagePipeDispatcher::EndSerialize(void* destination, |
479 ports::PortName* ports, | 503 ports::PortName* ports, |
480 PlatformHandle* handles) { | 504 PlatformHandle* handles) { |
481 SerializedState* state = static_cast<SerializedState*>(destination); | 505 SerializedState* state = static_cast<SerializedState*>(destination); |
482 state->pipe_id = pipe_id_; | 506 state->pipe_id = pipe_id_; |
483 state->endpoint = static_cast<int8_t>(endpoint_); | 507 state->endpoint = static_cast<int8_t>(endpoint_); |
484 ports[0] = port_.name(); | 508 ports[0] = port_.name(); |
485 return true; | 509 return true; |
486 } | 510 } |
487 | 511 |
488 bool MessagePipeDispatcher::BeginTransit() { | 512 bool MessagePipeDispatcher::BeginTransit(RequestContext* request_context) { |
489 base::AutoLock lock(signal_lock_); | 513 base::AutoLock lock(signal_lock_); |
490 if (in_transit_ || port_closed_) | 514 if (in_transit_ || port_closed_) |
491 return false; | 515 return false; |
492 in_transit_ = true; | 516 in_transit_ = true; |
493 return in_transit_; | 517 return in_transit_; |
494 } | 518 } |
495 | 519 |
496 void MessagePipeDispatcher::CompleteTransitAndClose() { | 520 void MessagePipeDispatcher::CompleteTransitAndClose( |
| 521 RequestContext* request_context) { |
497 node_controller_->SetPortObserver(port_, nullptr); | 522 node_controller_->SetPortObserver(port_, nullptr); |
498 | 523 |
499 base::AutoLock lock(signal_lock_); | 524 base::AutoLock lock(signal_lock_); |
500 in_transit_ = false; | 525 in_transit_ = false; |
501 port_transferred_ = true; | 526 port_transferred_ = true; |
502 CloseNoLock(); | 527 CloseNoLock(request_context); |
503 } | 528 } |
504 | 529 |
505 void MessagePipeDispatcher::CancelTransit() { | 530 void MessagePipeDispatcher::CancelTransit(RequestContext* request_context) { |
506 base::AutoLock lock(signal_lock_); | 531 base::AutoLock lock(signal_lock_); |
507 in_transit_ = false; | 532 in_transit_ = false; |
508 | 533 |
509 // Something may have happened while we were waiting for potential transit. | 534 // Something may have happened while we were waiting for potential transit. |
510 awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock()); | 535 NotifyObserversForStateChangeNoLock(request_context); |
511 } | 536 } |
512 | 537 |
513 // static | 538 // static |
514 scoped_refptr<Dispatcher> MessagePipeDispatcher::Deserialize( | 539 scoped_refptr<Dispatcher> MessagePipeDispatcher::Deserialize( |
515 const void* data, | 540 const void* data, |
516 size_t num_bytes, | 541 size_t num_bytes, |
517 const ports::PortName* ports, | 542 const ports::PortName* ports, |
518 size_t num_ports, | 543 size_t num_ports, |
519 PlatformHandle* handles, | 544 PlatformHandle* handles, |
520 size_t num_handles) { | 545 size_t num_handles) { |
521 if (num_ports != 1 || num_handles || num_bytes != sizeof(SerializedState)) | 546 if (num_ports != 1 || num_handles || num_bytes != sizeof(SerializedState)) |
522 return nullptr; | 547 return nullptr; |
523 | 548 |
524 const SerializedState* state = static_cast<const SerializedState*>(data); | 549 const SerializedState* state = static_cast<const SerializedState*>(data); |
525 | 550 |
526 ports::PortRef port; | 551 ports::PortRef port; |
527 CHECK_EQ( | 552 CHECK_EQ( |
528 ports::OK, | 553 ports::OK, |
529 internal::g_core->GetNodeController()->node()->GetPort(ports[0], &port)); | 554 internal::g_core->GetNodeController()->node()->GetPort(ports[0], &port)); |
530 | 555 |
531 return new MessagePipeDispatcher(internal::g_core->GetNodeController(), port, | 556 return new MessagePipeDispatcher(internal::g_core->GetNodeController(), port, |
532 state->pipe_id, state->endpoint); | 557 state->pipe_id, state->endpoint); |
533 } | 558 } |
534 | 559 |
535 MessagePipeDispatcher::~MessagePipeDispatcher() { | 560 MessagePipeDispatcher::~MessagePipeDispatcher() { |
536 DCHECK(port_closed_ && !in_transit_); | 561 DCHECK(port_closed_ && !in_transit_); |
537 } | 562 } |
538 | 563 |
539 MojoResult MessagePipeDispatcher::CloseNoLock() { | 564 MojoResult MessagePipeDispatcher::CloseNoLock(RequestContext* request_context) { |
540 signal_lock_.AssertAcquired(); | 565 signal_lock_.AssertAcquired(); |
541 if (port_closed_ || in_transit_) | 566 if (port_closed_ || in_transit_) |
542 return MOJO_RESULT_INVALID_ARGUMENT; | 567 return MOJO_RESULT_INVALID_ARGUMENT; |
543 | 568 |
544 port_closed_ = true; | 569 port_closed_ = true; |
| 570 watchers_.CancelAll(request_context); |
545 awakables_.CancelAll(); | 571 awakables_.CancelAll(); |
546 | 572 |
547 if (!port_transferred_) { | 573 if (!port_transferred_) { |
548 base::AutoUnlock unlock(signal_lock_); | 574 base::AutoUnlock unlock(signal_lock_); |
549 node_controller_->ClosePort(port_); | 575 node_controller_->ClosePort(port_); |
550 } | 576 } |
551 | 577 |
552 return MOJO_RESULT_OK; | 578 return MOJO_RESULT_OK; |
553 } | 579 } |
554 | 580 |
(...skipping 17 matching lines...) Expand all Loading... |
572 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; | 598 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; |
573 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE; | 599 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE; |
574 } else { | 600 } else { |
575 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; | 601 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; |
576 } | 602 } |
577 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; | 603 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; |
578 return rv; | 604 return rv; |
579 } | 605 } |
580 | 606 |
581 void MessagePipeDispatcher::OnPortStatusChanged() { | 607 void MessagePipeDispatcher::OnPortStatusChanged() { |
| 608 // Create a new RequestContext that will run its finalizers after |
| 609 // |signal_lock_| is released. |
| 610 RequestContext request_context; |
| 611 |
582 base::AutoLock lock(signal_lock_); | 612 base::AutoLock lock(signal_lock_); |
583 | 613 |
584 // We stop observing our port as soon as it's transferred, but this can race | 614 // We stop observing our port as soon as it's transferred, but this can race |
585 // with events which are raised right before that happens. This is fine to | 615 // with events which are raised right before that happens. This is fine to |
586 // ignore. | 616 // ignore. |
587 if (port_transferred_) | 617 if (port_transferred_) |
588 return; | 618 return; |
589 | 619 |
590 #if !defined(NDEBUG) | 620 #if !defined(NDEBUG) |
591 ports::PortStatus port_status; | 621 ports::PortStatus port_status; |
592 node_controller_->node()->GetStatus(port_, &port_status); | 622 node_controller_->node()->GetStatus(port_, &port_status); |
593 if (port_status.has_messages) { | 623 if (port_status.has_messages) { |
594 ports::ScopedMessage unused; | 624 ports::ScopedMessage unused; |
595 size_t message_size = 0; | 625 size_t message_size = 0; |
596 node_controller_->node()->GetMessageIf( | 626 node_controller_->node()->GetMessageIf( |
597 port_, [&message_size](const ports::Message& message) { | 627 port_, [&message_size](const ports::Message& message) { |
598 message_size = message.num_payload_bytes(); | 628 message_size = message.num_payload_bytes(); |
599 return false; | 629 return false; |
600 }, &unused); | 630 }, &unused); |
601 DVLOG(1) << "New message detected on message pipe " << pipe_id_ | 631 DVLOG(1) << "New message detected on message pipe " << pipe_id_ |
602 << " endpoint " << endpoint_ << " [port=" << port_.name() | 632 << " endpoint " << endpoint_ << " [port=" << port_.name() |
603 << "; size=" << message_size << "]"; | 633 << "; size=" << message_size << "]"; |
604 } | 634 } |
605 if (port_status.peer_closed) { | 635 if (port_status.peer_closed) { |
606 DVLOG(1) << "Peer closure detected on message pipe " << pipe_id_ | 636 DVLOG(1) << "Peer closure detected on message pipe " << pipe_id_ |
607 << " endpoint " << endpoint_ << " [port=" << port_.name() << "]"; | 637 << " endpoint " << endpoint_ << " [port=" << port_.name() << "]"; |
608 } | 638 } |
609 #endif | 639 #endif |
610 | 640 |
611 awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock()); | 641 NotifyObserversForStateChangeNoLock(&request_context); |
| 642 } |
| 643 |
| 644 void MessagePipeDispatcher::NotifyObserversForStateChangeNoLock( |
| 645 RequestContext* request_context) { |
| 646 HandleSignalsState state = GetHandleSignalsStateNoLock(); |
| 647 awakables_.AwakeForStateChange(state); |
| 648 watchers_.NotifyOfStateChange(state, request_context); |
612 } | 649 } |
613 | 650 |
614 } // namespace edk | 651 } // namespace edk |
615 } // namespace mojo | 652 } // namespace mojo |
OLD | NEW |