| OLD | NEW |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "mojo/edk/system/message_pipe_dispatcher.h" | 5 #include "mojo/edk/system/message_pipe_dispatcher.h" |
| 6 | 6 |
| 7 #include <limits> | 7 #include <limits> |
| 8 | 8 |
| 9 #include "base/macros.h" | 9 #include "base/macros.h" |
| 10 #include "base/memory/ref_counted.h" | 10 #include "base/memory/ref_counted.h" |
| (...skipping 72 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 83 | 83 |
| 84 node_controller_->SetPortObserver( | 84 node_controller_->SetPortObserver( |
| 85 port_, | 85 port_, |
| 86 make_scoped_refptr(new PortObserverThunk(this))); | 86 make_scoped_refptr(new PortObserverThunk(this))); |
| 87 } | 87 } |
| 88 | 88 |
| 89 Dispatcher::Type MessagePipeDispatcher::GetType() const { | 89 Dispatcher::Type MessagePipeDispatcher::GetType() const { |
| 90 return Type::MESSAGE_PIPE; | 90 return Type::MESSAGE_PIPE; |
| 91 } | 91 } |
| 92 | 92 |
| 93 MojoResult MessagePipeDispatcher::Close() { | 93 MojoResult MessagePipeDispatcher::Close(RequestContext* request_context) { |
| 94 base::AutoLock lock(signal_lock_); | 94 base::AutoLock lock(signal_lock_); |
| 95 DVLOG(1) << "Closing message pipe " << pipe_id_ << " endpoint " << endpoint_ | 95 DVLOG(1) << "Closing message pipe " << pipe_id_ << " endpoint " << endpoint_ |
| 96 << " [port=" << port_.name() << "]"; | 96 << " [port=" << port_.name() << "]"; |
| 97 return CloseNoLock(); | 97 return CloseNoLock(request_context); |
| 98 } |
| 99 |
| 100 MojoResult MessagePipeDispatcher::Watch(MojoHandleSignals signals, |
| 101 const WatchCallback& callback, |
| 102 uintptr_t context, |
| 103 RequestContext* request_context) { |
| 104 base::AutoLock lock(signal_lock_); |
| 105 |
| 106 if (port_closed_ || in_transit_) |
| 107 return MOJO_RESULT_INVALID_ARGUMENT; |
| 108 |
| 109 HandleSignalsState state = GetHandleSignalsStateNoLock(); |
| 110 return watchers_.Add(signals, callback, context, state, request_context); |
| 111 } |
| 112 |
| 113 MojoResult MessagePipeDispatcher::CancelWatch(uintptr_t context) { |
| 114 base::AutoLock lock(signal_lock_); |
| 115 |
| 116 if (port_closed_ || in_transit_) |
| 117 return MOJO_RESULT_INVALID_ARGUMENT; |
| 118 |
| 119 return watchers_.Remove(context); |
| 98 } | 120 } |
| 99 | 121 |
| 100 MojoResult MessagePipeDispatcher::WriteMessage( | 122 MojoResult MessagePipeDispatcher::WriteMessage( |
| 101 const void* bytes, | 123 const void* bytes, |
| 102 uint32_t num_bytes, | 124 uint32_t num_bytes, |
| 103 const DispatcherInTransit* dispatchers, | 125 const DispatcherInTransit* dispatchers, |
| 104 uint32_t num_dispatchers, | 126 uint32_t num_dispatchers, |
| 105 MojoWriteMessageFlags flags) { | 127 MojoWriteMessageFlags flags, |
| 128 RequestContext* request_context) { |
| 106 | 129 |
| 107 { | 130 { |
| 108 base::AutoLock lock(signal_lock_); | 131 base::AutoLock lock(signal_lock_); |
| 109 if (port_closed_ || in_transit_) | 132 if (port_closed_ || in_transit_) |
| 110 return MOJO_RESULT_INVALID_ARGUMENT; | 133 return MOJO_RESULT_INVALID_ARGUMENT; |
| 111 } | 134 } |
| 112 | 135 |
| 113 // A structure for retaining information about every Dispatcher we're about | 136 // A structure for retaining information about every Dispatcher we're about |
| 114 // to send. This information is collected by calling StartSerialize() on | 137 // to send. This information is collected by calling StartSerialize() on |
| 115 // each dispatcher in sequence. | 138 // each dispatcher in sequence. |
| (...skipping 103 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 219 << " [port=" << port_.name() << "; rv=" << rv | 242 << " [port=" << port_.name() << "; rv=" << rv |
| 220 << "; num_bytes=" << num_bytes << "]"; | 243 << "; num_bytes=" << num_bytes << "]"; |
| 221 | 244 |
| 222 if (rv != ports::OK) { | 245 if (rv != ports::OK) { |
| 223 if (rv == ports::ERROR_PORT_UNKNOWN || | 246 if (rv == ports::ERROR_PORT_UNKNOWN || |
| 224 rv == ports::ERROR_PORT_STATE_UNEXPECTED || | 247 rv == ports::ERROR_PORT_STATE_UNEXPECTED || |
| 225 rv == ports::ERROR_PORT_CANNOT_SEND_PEER) { | 248 rv == ports::ERROR_PORT_CANNOT_SEND_PEER) { |
| 226 result = MOJO_RESULT_INVALID_ARGUMENT; | 249 result = MOJO_RESULT_INVALID_ARGUMENT; |
| 227 } else if (rv == ports::ERROR_PORT_PEER_CLOSED) { | 250 } else if (rv == ports::ERROR_PORT_PEER_CLOSED) { |
| 228 base::AutoLock lock(signal_lock_); | 251 base::AutoLock lock(signal_lock_); |
| 229 awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock()); | 252 NotifyObserversForStateChangeNoLock(request_context); |
| 230 result = MOJO_RESULT_FAILED_PRECONDITION; | 253 result = MOJO_RESULT_FAILED_PRECONDITION; |
| 231 } else { | 254 } else { |
| 232 NOTREACHED(); | 255 NOTREACHED(); |
| 233 result = MOJO_RESULT_UNKNOWN; | 256 result = MOJO_RESULT_UNKNOWN; |
| 234 } | 257 } |
| 235 cancel_transit = true; | 258 cancel_transit = true; |
| 236 } else { | 259 } else { |
| 237 DCHECK(!message); | 260 DCHECK(!message); |
| 238 } | 261 } |
| 239 } | 262 } |
| 240 | 263 |
| 241 if (cancel_transit) { | 264 if (cancel_transit) { |
| 242 // We ended up not sending the message. Release all the platform handles. | 265 // We ended up not sending the message. Release all the platform handles. |
| 243 // Their dipatchers retain ownership when transit is canceled, so these are | 266 // Their dipatchers retain ownership when transit is canceled, so these are |
| 244 // not actually leaking. | 267 // not actually leaking. |
| 245 DCHECK(message); | 268 DCHECK(message); |
| 246 Channel::MessagePtr m = message->TakeChannelMessage(); | 269 Channel::MessagePtr m = message->TakeChannelMessage(); |
| 247 ScopedPlatformHandleVectorPtr handles = m->TakeHandles(); | 270 ScopedPlatformHandleVectorPtr handles = m->TakeHandles(); |
| 248 if (handles) | 271 if (handles) |
| 249 handles->clear(); | 272 handles->clear(); |
| 250 } | 273 } |
| 251 | 274 |
| 252 return result; | 275 return result; |
| 253 } | 276 } |
| 254 | 277 |
| 255 MojoResult MessagePipeDispatcher::ReadMessage(void* bytes, | 278 MojoResult MessagePipeDispatcher::ReadMessage(void* bytes, |
| 256 uint32_t* num_bytes, | 279 uint32_t* num_bytes, |
| 257 MojoHandle* handles, | 280 MojoHandle* handles, |
| 258 uint32_t* num_handles, | 281 uint32_t* num_handles, |
| 259 MojoReadMessageFlags flags) { | 282 MojoReadMessageFlags flags, |
| 283 RequestContext* request_context) { |
| 260 { | 284 { |
| 261 base::AutoLock lock(signal_lock_); | 285 base::AutoLock lock(signal_lock_); |
| 262 // We can't read from a port that's closed or in transit! | 286 // We can't read from a port that's closed or in transit! |
| 263 if (port_closed_ || in_transit_) | 287 if (port_closed_ || in_transit_) |
| 264 return MOJO_RESULT_INVALID_ARGUMENT; | 288 return MOJO_RESULT_INVALID_ARGUMENT; |
| 265 } | 289 } |
| 266 | 290 |
| 267 bool no_space = false; | 291 bool no_space = false; |
| 268 bool may_discard = flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD; | 292 bool may_discard = flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD; |
| 269 | 293 |
| (...skipping 59 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 329 | 353 |
| 330 if (!ports_message) { | 354 if (!ports_message) { |
| 331 // No message was available in queue. | 355 // No message was available in queue. |
| 332 | 356 |
| 333 if (rv == ports::OK) | 357 if (rv == ports::OK) |
| 334 return MOJO_RESULT_SHOULD_WAIT; | 358 return MOJO_RESULT_SHOULD_WAIT; |
| 335 | 359 |
| 336 // Peer is closed and there are no more messages to read. | 360 // Peer is closed and there are no more messages to read. |
| 337 DCHECK_EQ(rv, ports::ERROR_PORT_PEER_CLOSED); | 361 DCHECK_EQ(rv, ports::ERROR_PORT_PEER_CLOSED); |
| 338 base::AutoLock lock(signal_lock_); | 362 base::AutoLock lock(signal_lock_); |
| 339 awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock()); | 363 NotifyObserversForStateChangeNoLock(request_context); |
| 340 return MOJO_RESULT_FAILED_PRECONDITION; | 364 return MOJO_RESULT_FAILED_PRECONDITION; |
| 341 } | 365 } |
| 342 | 366 |
| 343 // Alright! We have a message and the caller has provided sufficient storage | 367 // Alright! We have a message and the caller has provided sufficient storage |
| 344 // in which to receive it. | 368 // in which to receive it. |
| 345 | 369 |
| 346 scoped_ptr<PortsMessage> message( | 370 scoped_ptr<PortsMessage> message( |
| 347 static_cast<PortsMessage*>(ports_message.release())); | 371 static_cast<PortsMessage*>(ports_message.release())); |
| 348 | 372 |
| 349 const MessageHeader* header = | 373 const MessageHeader* header = |
| (...skipping 128 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 478 bool MessagePipeDispatcher::EndSerialize(void* destination, | 502 bool MessagePipeDispatcher::EndSerialize(void* destination, |
| 479 ports::PortName* ports, | 503 ports::PortName* ports, |
| 480 PlatformHandle* handles) { | 504 PlatformHandle* handles) { |
| 481 SerializedState* state = static_cast<SerializedState*>(destination); | 505 SerializedState* state = static_cast<SerializedState*>(destination); |
| 482 state->pipe_id = pipe_id_; | 506 state->pipe_id = pipe_id_; |
| 483 state->endpoint = static_cast<int8_t>(endpoint_); | 507 state->endpoint = static_cast<int8_t>(endpoint_); |
| 484 ports[0] = port_.name(); | 508 ports[0] = port_.name(); |
| 485 return true; | 509 return true; |
| 486 } | 510 } |
| 487 | 511 |
| 488 bool MessagePipeDispatcher::BeginTransit() { | 512 bool MessagePipeDispatcher::BeginTransit(RequestContext* request_context) { |
| 489 base::AutoLock lock(signal_lock_); | 513 base::AutoLock lock(signal_lock_); |
| 490 if (in_transit_ || port_closed_) | 514 if (in_transit_ || port_closed_) |
| 491 return false; | 515 return false; |
| 492 in_transit_ = true; | 516 in_transit_ = true; |
| 493 return in_transit_; | 517 return in_transit_; |
| 494 } | 518 } |
| 495 | 519 |
| 496 void MessagePipeDispatcher::CompleteTransitAndClose() { | 520 void MessagePipeDispatcher::CompleteTransitAndClose( |
| 521 RequestContext* request_context) { |
| 497 node_controller_->SetPortObserver(port_, nullptr); | 522 node_controller_->SetPortObserver(port_, nullptr); |
| 498 | 523 |
| 499 base::AutoLock lock(signal_lock_); | 524 base::AutoLock lock(signal_lock_); |
| 500 in_transit_ = false; | 525 in_transit_ = false; |
| 501 port_transferred_ = true; | 526 port_transferred_ = true; |
| 502 CloseNoLock(); | 527 CloseNoLock(request_context); |
| 503 } | 528 } |
| 504 | 529 |
| 505 void MessagePipeDispatcher::CancelTransit() { | 530 void MessagePipeDispatcher::CancelTransit(RequestContext* request_context) { |
| 506 base::AutoLock lock(signal_lock_); | 531 base::AutoLock lock(signal_lock_); |
| 507 in_transit_ = false; | 532 in_transit_ = false; |
| 508 | 533 |
| 509 // Something may have happened while we were waiting for potential transit. | 534 // Something may have happened while we were waiting for potential transit. |
| 510 awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock()); | 535 NotifyObserversForStateChangeNoLock(request_context); |
| 511 } | 536 } |
| 512 | 537 |
| 513 // static | 538 // static |
| 514 scoped_refptr<Dispatcher> MessagePipeDispatcher::Deserialize( | 539 scoped_refptr<Dispatcher> MessagePipeDispatcher::Deserialize( |
| 515 const void* data, | 540 const void* data, |
| 516 size_t num_bytes, | 541 size_t num_bytes, |
| 517 const ports::PortName* ports, | 542 const ports::PortName* ports, |
| 518 size_t num_ports, | 543 size_t num_ports, |
| 519 PlatformHandle* handles, | 544 PlatformHandle* handles, |
| 520 size_t num_handles) { | 545 size_t num_handles) { |
| 521 if (num_ports != 1 || num_handles || num_bytes != sizeof(SerializedState)) | 546 if (num_ports != 1 || num_handles || num_bytes != sizeof(SerializedState)) |
| 522 return nullptr; | 547 return nullptr; |
| 523 | 548 |
| 524 const SerializedState* state = static_cast<const SerializedState*>(data); | 549 const SerializedState* state = static_cast<const SerializedState*>(data); |
| 525 | 550 |
| 526 ports::PortRef port; | 551 ports::PortRef port; |
| 527 CHECK_EQ( | 552 CHECK_EQ( |
| 528 ports::OK, | 553 ports::OK, |
| 529 internal::g_core->GetNodeController()->node()->GetPort(ports[0], &port)); | 554 internal::g_core->GetNodeController()->node()->GetPort(ports[0], &port)); |
| 530 | 555 |
| 531 return new MessagePipeDispatcher(internal::g_core->GetNodeController(), port, | 556 return new MessagePipeDispatcher(internal::g_core->GetNodeController(), port, |
| 532 state->pipe_id, state->endpoint); | 557 state->pipe_id, state->endpoint); |
| 533 } | 558 } |
| 534 | 559 |
| 535 MessagePipeDispatcher::~MessagePipeDispatcher() { | 560 MessagePipeDispatcher::~MessagePipeDispatcher() { |
| 536 DCHECK(port_closed_ && !in_transit_); | 561 DCHECK(port_closed_ && !in_transit_); |
| 537 } | 562 } |
| 538 | 563 |
| 539 MojoResult MessagePipeDispatcher::CloseNoLock() { | 564 MojoResult MessagePipeDispatcher::CloseNoLock(RequestContext* request_context) { |
| 540 signal_lock_.AssertAcquired(); | 565 signal_lock_.AssertAcquired(); |
| 541 if (port_closed_ || in_transit_) | 566 if (port_closed_ || in_transit_) |
| 542 return MOJO_RESULT_INVALID_ARGUMENT; | 567 return MOJO_RESULT_INVALID_ARGUMENT; |
| 543 | 568 |
| 544 port_closed_ = true; | 569 port_closed_ = true; |
| 570 watchers_.CancelAll(request_context); |
| 545 awakables_.CancelAll(); | 571 awakables_.CancelAll(); |
| 546 | 572 |
| 547 if (!port_transferred_) { | 573 if (!port_transferred_) { |
| 548 base::AutoUnlock unlock(signal_lock_); | 574 base::AutoUnlock unlock(signal_lock_); |
| 549 node_controller_->ClosePort(port_); | 575 node_controller_->ClosePort(port_); |
| 550 } | 576 } |
| 551 | 577 |
| 552 return MOJO_RESULT_OK; | 578 return MOJO_RESULT_OK; |
| 553 } | 579 } |
| 554 | 580 |
| (...skipping 17 matching lines...) Expand all Loading... |
| 572 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; | 598 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; |
| 573 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE; | 599 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE; |
| 574 } else { | 600 } else { |
| 575 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; | 601 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; |
| 576 } | 602 } |
| 577 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; | 603 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; |
| 578 return rv; | 604 return rv; |
| 579 } | 605 } |
| 580 | 606 |
| 581 void MessagePipeDispatcher::OnPortStatusChanged() { | 607 void MessagePipeDispatcher::OnPortStatusChanged() { |
| 608 // Create a new RequestContext that will run its finalizers after |
| 609 // |signal_lock_| is released. |
| 610 RequestContext request_context; |
| 611 |
| 582 base::AutoLock lock(signal_lock_); | 612 base::AutoLock lock(signal_lock_); |
| 583 | 613 |
| 584 // We stop observing our port as soon as it's transferred, but this can race | 614 // We stop observing our port as soon as it's transferred, but this can race |
| 585 // with events which are raised right before that happens. This is fine to | 615 // with events which are raised right before that happens. This is fine to |
| 586 // ignore. | 616 // ignore. |
| 587 if (port_transferred_) | 617 if (port_transferred_) |
| 588 return; | 618 return; |
| 589 | 619 |
| 590 #if !defined(NDEBUG) | 620 #if !defined(NDEBUG) |
| 591 ports::PortStatus port_status; | 621 ports::PortStatus port_status; |
| 592 node_controller_->node()->GetStatus(port_, &port_status); | 622 node_controller_->node()->GetStatus(port_, &port_status); |
| 593 if (port_status.has_messages) { | 623 if (port_status.has_messages) { |
| 594 ports::ScopedMessage unused; | 624 ports::ScopedMessage unused; |
| 595 size_t message_size = 0; | 625 size_t message_size = 0; |
| 596 node_controller_->node()->GetMessageIf( | 626 node_controller_->node()->GetMessageIf( |
| 597 port_, [&message_size](const ports::Message& message) { | 627 port_, [&message_size](const ports::Message& message) { |
| 598 message_size = message.num_payload_bytes(); | 628 message_size = message.num_payload_bytes(); |
| 599 return false; | 629 return false; |
| 600 }, &unused); | 630 }, &unused); |
| 601 DVLOG(1) << "New message detected on message pipe " << pipe_id_ | 631 DVLOG(1) << "New message detected on message pipe " << pipe_id_ |
| 602 << " endpoint " << endpoint_ << " [port=" << port_.name() | 632 << " endpoint " << endpoint_ << " [port=" << port_.name() |
| 603 << "; size=" << message_size << "]"; | 633 << "; size=" << message_size << "]"; |
| 604 } | 634 } |
| 605 if (port_status.peer_closed) { | 635 if (port_status.peer_closed) { |
| 606 DVLOG(1) << "Peer closure detected on message pipe " << pipe_id_ | 636 DVLOG(1) << "Peer closure detected on message pipe " << pipe_id_ |
| 607 << " endpoint " << endpoint_ << " [port=" << port_.name() << "]"; | 637 << " endpoint " << endpoint_ << " [port=" << port_.name() << "]"; |
| 608 } | 638 } |
| 609 #endif | 639 #endif |
| 610 | 640 |
| 611 awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock()); | 641 NotifyObserversForStateChangeNoLock(&request_context); |
| 642 } |
| 643 |
| 644 void MessagePipeDispatcher::NotifyObserversForStateChangeNoLock( |
| 645 RequestContext* request_context) { |
| 646 HandleSignalsState state = GetHandleSignalsStateNoLock(); |
| 647 awakables_.AwakeForStateChange(state); |
| 648 watchers_.NotifyOfStateChange(state, request_context); |
| 612 } | 649 } |
| 613 | 650 |
| 614 } // namespace edk | 651 } // namespace edk |
| 615 } // namespace mojo | 652 } // namespace mojo |
| OLD | NEW |