| OLD | NEW |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "mojo/edk/system/message_pipe_dispatcher.h" | 5 #include "mojo/edk/system/message_pipe_dispatcher.h" |
| 6 | 6 |
| 7 #include <limits> | 7 #include <limits> |
| 8 | 8 |
| 9 #include "base/macros.h" | 9 #include "base/macros.h" |
| 10 #include "base/memory/ref_counted.h" | 10 #include "base/memory/ref_counted.h" |
| (...skipping 81 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 92 pipe_id_(pipe_id), | 92 pipe_id_(pipe_id), |
| 93 endpoint_(endpoint) { | 93 endpoint_(endpoint) { |
| 94 DVLOG(2) << "Creating new MessagePipeDispatcher for port " << port.name() | 94 DVLOG(2) << "Creating new MessagePipeDispatcher for port " << port.name() |
| 95 << " [pipe_id=" << pipe_id << "; endpoint=" << endpoint << "]"; | 95 << " [pipe_id=" << pipe_id << "; endpoint=" << endpoint << "]"; |
| 96 | 96 |
| 97 node_controller_->SetPortObserver( | 97 node_controller_->SetPortObserver( |
| 98 port_, | 98 port_, |
| 99 make_scoped_refptr(new PortObserverThunk(this))); | 99 make_scoped_refptr(new PortObserverThunk(this))); |
| 100 } | 100 } |
| 101 | 101 |
| 102 bool MessagePipeDispatcher::BeginFuse() { |
| 103 base::AutoLock lock(signal_lock_); |
| 104 if (port_closed_ || in_transit_ || is_fusing_) |
| 105 return false; |
| 106 is_fusing_ = true; |
| 107 return true; |
| 108 } |
| 109 |
| 110 void MessagePipeDispatcher::CancelFuse() { |
| 111 base::AutoLock lock(signal_lock_); |
| 112 CHECK(is_fusing_); |
| 113 is_fusing_ = false; |
| 114 } |
| 115 |
| 116 bool MessagePipeDispatcher::CompleteFuse(MessagePipeDispatcher* other) { |
| 117 node_controller_->SetPortObserver(port_, nullptr); |
| 118 node_controller_->SetPortObserver(other->port_, nullptr); |
| 119 |
| 120 ports::PortRef port0; |
| 121 { |
| 122 base::AutoLock lock(signal_lock_); |
| 123 CHECK(is_fusing_); |
| 124 port0 = port_; |
| 125 port_closed_ = true; |
| 126 awakables_.CancelAll(); |
| 127 } |
| 128 |
| 129 ports::PortRef port1; |
| 130 { |
| 131 base::AutoLock lock(other->signal_lock_); |
| 132 CHECK(other->is_fusing_); |
| 133 port1 = other->port_; |
| 134 other->port_closed_ = true; |
| 135 other->awakables_.CancelAll(); |
| 136 } |
| 137 |
| 138 // Both ports are always closed by this call. |
| 139 int rv = node_controller_->MergeLocalPorts(port0, port1); |
| 140 return rv == ports::OK; |
| 141 } |
| 142 |
| 102 Dispatcher::Type MessagePipeDispatcher::GetType() const { | 143 Dispatcher::Type MessagePipeDispatcher::GetType() const { |
| 103 return Type::MESSAGE_PIPE; | 144 return Type::MESSAGE_PIPE; |
| 104 } | 145 } |
| 105 | 146 |
| 106 MojoResult MessagePipeDispatcher::Close() { | 147 MojoResult MessagePipeDispatcher::Close() { |
| 107 base::AutoLock lock(signal_lock_); | 148 base::AutoLock lock(signal_lock_); |
| 108 DVLOG(1) << "Closing message pipe " << pipe_id_ << " endpoint " << endpoint_ | 149 DVLOG(1) << "Closing message pipe " << pipe_id_ << " endpoint " << endpoint_ |
| 109 << " [port=" << port_.name() << "]"; | 150 << " [port=" << port_.name() << "]"; |
| 110 return CloseNoLock(); | 151 return CloseNoLock(); |
| 111 } | 152 } |
| 112 | 153 |
| 113 MojoResult MessagePipeDispatcher::Watch(MojoHandleSignals signals, | 154 MojoResult MessagePipeDispatcher::Watch(MojoHandleSignals signals, |
| 114 const Watcher::WatchCallback& callback, | 155 const Watcher::WatchCallback& callback, |
| 115 uintptr_t context) { | 156 uintptr_t context) { |
| 116 base::AutoLock lock(signal_lock_); | 157 base::AutoLock lock(signal_lock_); |
| 117 | 158 |
| 118 if (port_closed_ || in_transit_) | 159 if (port_closed_ || in_transit_ || is_fusing_) |
| 119 return MOJO_RESULT_INVALID_ARGUMENT; | 160 return MOJO_RESULT_INVALID_ARGUMENT; |
| 120 | 161 |
| 121 return awakables_.AddWatcher( | 162 return awakables_.AddWatcher( |
| 122 signals, callback, context, GetHandleSignalsStateNoLock()); | 163 signals, callback, context, GetHandleSignalsStateNoLock()); |
| 123 } | 164 } |
| 124 | 165 |
| 125 MojoResult MessagePipeDispatcher::CancelWatch(uintptr_t context) { | 166 MojoResult MessagePipeDispatcher::CancelWatch(uintptr_t context) { |
| 126 base::AutoLock lock(signal_lock_); | 167 base::AutoLock lock(signal_lock_); |
| 127 | 168 |
| 128 if (port_closed_ || in_transit_) | 169 if (port_closed_ || in_transit_ || is_fusing_) |
| 129 return MOJO_RESULT_INVALID_ARGUMENT; | 170 return MOJO_RESULT_INVALID_ARGUMENT; |
| 130 | 171 |
| 131 return awakables_.RemoveWatcher(context); | 172 return awakables_.RemoveWatcher(context); |
| 132 } | 173 } |
| 133 | 174 |
| 134 MojoResult MessagePipeDispatcher::WriteMessage( | 175 MojoResult MessagePipeDispatcher::WriteMessage( |
| 135 const void* bytes, | 176 const void* bytes, |
| 136 uint32_t num_bytes, | 177 uint32_t num_bytes, |
| 137 const DispatcherInTransit* dispatchers, | 178 const DispatcherInTransit* dispatchers, |
| 138 uint32_t num_dispatchers, | 179 uint32_t num_dispatchers, |
| 139 MojoWriteMessageFlags flags) { | 180 MojoWriteMessageFlags flags) { |
| 140 | 181 |
| 141 { | 182 { |
| 142 base::AutoLock lock(signal_lock_); | 183 base::AutoLock lock(signal_lock_); |
| 143 if (port_closed_ || in_transit_) | 184 if (port_closed_ || in_transit_ || is_fusing_) |
| 144 return MOJO_RESULT_INVALID_ARGUMENT; | 185 return MOJO_RESULT_INVALID_ARGUMENT; |
| 145 } | 186 } |
| 146 | 187 |
| 147 // A structure for retaining information about every Dispatcher we're about | 188 // A structure for retaining information about every Dispatcher we're about |
| 148 // to send. This information is collected by calling StartSerialize() on | 189 // to send. This information is collected by calling StartSerialize() on |
| 149 // each dispatcher in sequence. | 190 // each dispatcher in sequence. |
| 150 struct DispatcherInfo { | 191 struct DispatcherInfo { |
| 151 uint32_t num_bytes; | 192 uint32_t num_bytes; |
| 152 uint32_t num_ports; | 193 uint32_t num_ports; |
| 153 uint32_t num_handles; | 194 uint32_t num_handles; |
| (...skipping 133 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 287 } | 328 } |
| 288 | 329 |
| 289 MojoResult MessagePipeDispatcher::ReadMessage(void* bytes, | 330 MojoResult MessagePipeDispatcher::ReadMessage(void* bytes, |
| 290 uint32_t* num_bytes, | 331 uint32_t* num_bytes, |
| 291 MojoHandle* handles, | 332 MojoHandle* handles, |
| 292 uint32_t* num_handles, | 333 uint32_t* num_handles, |
| 293 MojoReadMessageFlags flags) { | 334 MojoReadMessageFlags flags) { |
| 294 { | 335 { |
| 295 base::AutoLock lock(signal_lock_); | 336 base::AutoLock lock(signal_lock_); |
| 296 // We can't read from a port that's closed or in transit! | 337 // We can't read from a port that's closed or in transit! |
| 297 if (port_closed_ || in_transit_) | 338 if (port_closed_ || in_transit_ || is_fusing_) |
| 298 return MOJO_RESULT_INVALID_ARGUMENT; | 339 return MOJO_RESULT_INVALID_ARGUMENT; |
| 299 } | 340 } |
| 300 | 341 |
| 301 bool no_space = false; | 342 bool no_space = false; |
| 302 bool may_discard = flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD; | 343 bool may_discard = flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD; |
| 303 | 344 |
| 304 // Ensure the provided buffers are large enough to hold the next message. | 345 // Ensure the provided buffers are large enough to hold the next message. |
| 305 // GetMessageIf provides an atomic way to test the next message without | 346 // GetMessageIf provides an atomic way to test the next message without |
| 306 // committing to removing it from the port's underlying message queue until | 347 // committing to removing it from the port's underlying message queue until |
| 307 // we are sure we can consume it. | 348 // we are sure we can consume it. |
| (...skipping 134 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 442 return GetHandleSignalsStateNoLock(); | 483 return GetHandleSignalsStateNoLock(); |
| 443 } | 484 } |
| 444 | 485 |
| 445 MojoResult MessagePipeDispatcher::AddAwakable( | 486 MojoResult MessagePipeDispatcher::AddAwakable( |
| 446 Awakable* awakable, | 487 Awakable* awakable, |
| 447 MojoHandleSignals signals, | 488 MojoHandleSignals signals, |
| 448 uintptr_t context, | 489 uintptr_t context, |
| 449 HandleSignalsState* signals_state) { | 490 HandleSignalsState* signals_state) { |
| 450 base::AutoLock lock(signal_lock_); | 491 base::AutoLock lock(signal_lock_); |
| 451 | 492 |
| 452 if (port_closed_ || in_transit_) { | 493 if (port_closed_ || in_transit_ || is_fusing_) { |
| 453 if (signals_state) | 494 if (signals_state) |
| 454 *signals_state = HandleSignalsState(); | 495 *signals_state = HandleSignalsState(); |
| 455 return MOJO_RESULT_INVALID_ARGUMENT; | 496 return MOJO_RESULT_INVALID_ARGUMENT; |
| 456 } | 497 } |
| 457 | 498 |
| 458 HandleSignalsState state = GetHandleSignalsStateNoLock(); | 499 HandleSignalsState state = GetHandleSignalsStateNoLock(); |
| 459 | 500 |
| 460 DVLOG(2) << "Getting signal state for pipe " << pipe_id_ << " endpoint " | 501 DVLOG(2) << "Getting signal state for pipe " << pipe_id_ << " endpoint " |
| 461 << endpoint_ << " [awakable=" << awakable << "; port=" | 502 << endpoint_ << " [awakable=" << awakable << "; port=" |
| 462 << port_.name() << "; signals=" << signals << "; satisfied=" | 503 << port_.name() << "; signals=" << signals << "; satisfied=" |
| (...skipping 17 matching lines...) Expand all Loading... |
| 480 << endpoint_ << " [awakable=" << awakable << "; port=" | 521 << endpoint_ << " [awakable=" << awakable << "; port=" |
| 481 << port_.name() << "; signals=" << signals << "]"; | 522 << port_.name() << "; signals=" << signals << "]"; |
| 482 | 523 |
| 483 awakables_.Add(awakable, signals, context); | 524 awakables_.Add(awakable, signals, context); |
| 484 return MOJO_RESULT_OK; | 525 return MOJO_RESULT_OK; |
| 485 } | 526 } |
| 486 | 527 |
| 487 void MessagePipeDispatcher::RemoveAwakable(Awakable* awakable, | 528 void MessagePipeDispatcher::RemoveAwakable(Awakable* awakable, |
| 488 HandleSignalsState* signals_state) { | 529 HandleSignalsState* signals_state) { |
| 489 base::AutoLock lock(signal_lock_); | 530 base::AutoLock lock(signal_lock_); |
| 490 if (port_closed_ || in_transit_) { | 531 if (port_closed_ || in_transit_ || is_fusing_) { |
| 491 if (signals_state) | 532 if (signals_state) |
| 492 *signals_state = HandleSignalsState(); | 533 *signals_state = HandleSignalsState(); |
| 493 } else if (signals_state) { | 534 } else if (signals_state) { |
| 494 *signals_state = GetHandleSignalsStateNoLock(); | 535 *signals_state = GetHandleSignalsStateNoLock(); |
| 495 } | 536 } |
| 496 | 537 |
| 497 DVLOG(2) << "Removing awakable from pipe " << pipe_id_ << " endpoint " | 538 DVLOG(2) << "Removing awakable from pipe " << pipe_id_ << " endpoint " |
| 498 << endpoint_ << " [awakable=" << awakable << "; port=" | 539 << endpoint_ << " [awakable=" << awakable << "; port=" |
| 499 << port_.name() << "]"; | 540 << port_.name() << "]"; |
| 500 | 541 |
| (...skipping 14 matching lines...) Expand all Loading... |
| 515 SerializedState* state = static_cast<SerializedState*>(destination); | 556 SerializedState* state = static_cast<SerializedState*>(destination); |
| 516 state->pipe_id = pipe_id_; | 557 state->pipe_id = pipe_id_; |
| 517 state->endpoint = static_cast<int8_t>(endpoint_); | 558 state->endpoint = static_cast<int8_t>(endpoint_); |
| 518 memset(state->padding, 0, sizeof(state->padding)); | 559 memset(state->padding, 0, sizeof(state->padding)); |
| 519 ports[0] = port_.name(); | 560 ports[0] = port_.name(); |
| 520 return true; | 561 return true; |
| 521 } | 562 } |
| 522 | 563 |
| 523 bool MessagePipeDispatcher::BeginTransit() { | 564 bool MessagePipeDispatcher::BeginTransit() { |
| 524 base::AutoLock lock(signal_lock_); | 565 base::AutoLock lock(signal_lock_); |
| 525 if (in_transit_ || port_closed_) | 566 if (in_transit_ || port_closed_ || is_fusing_) |
| 526 return false; | 567 return false; |
| 527 in_transit_ = true; | 568 in_transit_ = true; |
| 528 return in_transit_; | 569 return in_transit_; |
| 529 } | 570 } |
| 530 | 571 |
| 531 void MessagePipeDispatcher::CompleteTransitAndClose() { | 572 void MessagePipeDispatcher::CompleteTransitAndClose() { |
| 532 node_controller_->SetPortObserver(port_, nullptr); | 573 node_controller_->SetPortObserver(port_, nullptr); |
| 533 | 574 |
| 534 base::AutoLock lock(signal_lock_); | 575 base::AutoLock lock(signal_lock_); |
| 535 in_transit_ = false; | 576 in_transit_ = false; |
| (...skipping 30 matching lines...) Expand all Loading... |
| 566 return new MessagePipeDispatcher(internal::g_core->GetNodeController(), port, | 607 return new MessagePipeDispatcher(internal::g_core->GetNodeController(), port, |
| 567 state->pipe_id, state->endpoint); | 608 state->pipe_id, state->endpoint); |
| 568 } | 609 } |
| 569 | 610 |
| 570 MessagePipeDispatcher::~MessagePipeDispatcher() { | 611 MessagePipeDispatcher::~MessagePipeDispatcher() { |
| 571 DCHECK(port_closed_ && !in_transit_); | 612 DCHECK(port_closed_ && !in_transit_); |
| 572 } | 613 } |
| 573 | 614 |
| 574 MojoResult MessagePipeDispatcher::CloseNoLock() { | 615 MojoResult MessagePipeDispatcher::CloseNoLock() { |
| 575 signal_lock_.AssertAcquired(); | 616 signal_lock_.AssertAcquired(); |
| 576 if (port_closed_ || in_transit_) | 617 if (port_closed_ || in_transit_ || is_fusing_) |
| 577 return MOJO_RESULT_INVALID_ARGUMENT; | 618 return MOJO_RESULT_INVALID_ARGUMENT; |
| 578 | 619 |
| 579 port_closed_ = true; | 620 port_closed_ = true; |
| 580 awakables_.CancelAll(); | 621 awakables_.CancelAll(); |
| 581 | 622 |
| 582 if (!port_transferred_) { | 623 if (!port_transferred_) { |
| 583 base::AutoUnlock unlock(signal_lock_); | 624 base::AutoUnlock unlock(signal_lock_); |
| 584 node_controller_->ClosePort(port_); | 625 node_controller_->ClosePort(port_); |
| 585 } | 626 } |
| 586 | 627 |
| (...skipping 56 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 643 DVLOG(1) << "Peer closure detected on message pipe " << pipe_id_ | 684 DVLOG(1) << "Peer closure detected on message pipe " << pipe_id_ |
| 644 << " endpoint " << endpoint_ << " [port=" << port_.name() << "]"; | 685 << " endpoint " << endpoint_ << " [port=" << port_.name() << "]"; |
| 645 } | 686 } |
| 646 #endif | 687 #endif |
| 647 | 688 |
| 648 awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock()); | 689 awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock()); |
| 649 } | 690 } |
| 650 | 691 |
| 651 } // namespace edk | 692 } // namespace edk |
| 652 } // namespace mojo | 693 } // namespace mojo |
| OLD | NEW |