Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(251)

Side by Side Diff: mojo/edk/system/message_pipe_dispatcher.cc

Issue 1785843002: [mojo] Implement pipe fusion API (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/edk/system/message_pipe_dispatcher.h" 5 #include "mojo/edk/system/message_pipe_dispatcher.h"
6 6
7 #include <limits> 7 #include <limits>
8 8
9 #include "base/macros.h" 9 #include "base/macros.h"
10 #include "base/memory/ref_counted.h" 10 #include "base/memory/ref_counted.h"
(...skipping 81 matching lines...) Expand 10 before | Expand all | Expand 10 after
92 pipe_id_(pipe_id), 92 pipe_id_(pipe_id),
93 endpoint_(endpoint) { 93 endpoint_(endpoint) {
94 DVLOG(2) << "Creating new MessagePipeDispatcher for port " << port.name() 94 DVLOG(2) << "Creating new MessagePipeDispatcher for port " << port.name()
95 << " [pipe_id=" << pipe_id << "; endpoint=" << endpoint << "]"; 95 << " [pipe_id=" << pipe_id << "; endpoint=" << endpoint << "]";
96 96
97 node_controller_->SetPortObserver( 97 node_controller_->SetPortObserver(
98 port_, 98 port_,
99 make_scoped_refptr(new PortObserverThunk(this))); 99 make_scoped_refptr(new PortObserverThunk(this)));
100 } 100 }
101 101
102 bool MessagePipeDispatcher::BeginFuse() {
103 base::AutoLock lock(signal_lock_);
104 if (port_closed_ || in_transit_ || is_fusing_)
105 return false;
106 is_fusing_ = true;
107 return true;
108 }
109
110 void MessagePipeDispatcher::CancelFuse() {
111 base::AutoLock lock(signal_lock_);
112 CHECK(is_fusing_);
113 is_fusing_ = false;
114 }
115
116 bool MessagePipeDispatcher::CompleteFuse(MessagePipeDispatcher* other) {
117 node_controller_->SetPortObserver(port_, nullptr);
118 node_controller_->SetPortObserver(other->port_, nullptr);
119
120 ports::PortRef port0;
121 {
122 base::AutoLock lock(signal_lock_);
123 CHECK(is_fusing_);
124 port0 = port_;
125 port_closed_ = true;
126 awakables_.CancelAll();
127 }
128
129 ports::PortRef port1;
130 {
131 base::AutoLock lock(other->signal_lock_);
132 CHECK(other->is_fusing_);
133 port1 = other->port_;
134 other->port_closed_ = true;
135 other->awakables_.CancelAll();
136 }
137
138 // Both ports are always closed by this call.
139 int rv = node_controller_->MergeLocalPorts(port0, port1);
140 return rv == ports::OK;
141 }
142
102 Dispatcher::Type MessagePipeDispatcher::GetType() const { 143 Dispatcher::Type MessagePipeDispatcher::GetType() const {
103 return Type::MESSAGE_PIPE; 144 return Type::MESSAGE_PIPE;
104 } 145 }
105 146
106 MojoResult MessagePipeDispatcher::Close() { 147 MojoResult MessagePipeDispatcher::Close() {
107 base::AutoLock lock(signal_lock_); 148 base::AutoLock lock(signal_lock_);
108 DVLOG(1) << "Closing message pipe " << pipe_id_ << " endpoint " << endpoint_ 149 DVLOG(1) << "Closing message pipe " << pipe_id_ << " endpoint " << endpoint_
109 << " [port=" << port_.name() << "]"; 150 << " [port=" << port_.name() << "]";
110 return CloseNoLock(); 151 return CloseNoLock();
111 } 152 }
112 153
113 MojoResult MessagePipeDispatcher::Watch(MojoHandleSignals signals, 154 MojoResult MessagePipeDispatcher::Watch(MojoHandleSignals signals,
114 const Watcher::WatchCallback& callback, 155 const Watcher::WatchCallback& callback,
115 uintptr_t context) { 156 uintptr_t context) {
116 base::AutoLock lock(signal_lock_); 157 base::AutoLock lock(signal_lock_);
117 158
118 if (port_closed_ || in_transit_) 159 if (port_closed_ || in_transit_ || is_fusing_)
119 return MOJO_RESULT_INVALID_ARGUMENT; 160 return MOJO_RESULT_INVALID_ARGUMENT;
120 161
121 return awakables_.AddWatcher( 162 return awakables_.AddWatcher(
122 signals, callback, context, GetHandleSignalsStateNoLock()); 163 signals, callback, context, GetHandleSignalsStateNoLock());
123 } 164 }
124 165
125 MojoResult MessagePipeDispatcher::CancelWatch(uintptr_t context) { 166 MojoResult MessagePipeDispatcher::CancelWatch(uintptr_t context) {
126 base::AutoLock lock(signal_lock_); 167 base::AutoLock lock(signal_lock_);
127 168
128 if (port_closed_ || in_transit_) 169 if (port_closed_ || in_transit_ || is_fusing_)
129 return MOJO_RESULT_INVALID_ARGUMENT; 170 return MOJO_RESULT_INVALID_ARGUMENT;
130 171
131 return awakables_.RemoveWatcher(context); 172 return awakables_.RemoveWatcher(context);
132 } 173 }
133 174
134 MojoResult MessagePipeDispatcher::WriteMessage( 175 MojoResult MessagePipeDispatcher::WriteMessage(
135 const void* bytes, 176 const void* bytes,
136 uint32_t num_bytes, 177 uint32_t num_bytes,
137 const DispatcherInTransit* dispatchers, 178 const DispatcherInTransit* dispatchers,
138 uint32_t num_dispatchers, 179 uint32_t num_dispatchers,
139 MojoWriteMessageFlags flags) { 180 MojoWriteMessageFlags flags) {
140 181
141 { 182 {
142 base::AutoLock lock(signal_lock_); 183 base::AutoLock lock(signal_lock_);
143 if (port_closed_ || in_transit_) 184 if (port_closed_ || in_transit_ || is_fusing_)
144 return MOJO_RESULT_INVALID_ARGUMENT; 185 return MOJO_RESULT_INVALID_ARGUMENT;
145 } 186 }
146 187
147 // A structure for retaining information about every Dispatcher we're about 188 // A structure for retaining information about every Dispatcher we're about
148 // to send. This information is collected by calling StartSerialize() on 189 // to send. This information is collected by calling StartSerialize() on
149 // each dispatcher in sequence. 190 // each dispatcher in sequence.
150 struct DispatcherInfo { 191 struct DispatcherInfo {
151 uint32_t num_bytes; 192 uint32_t num_bytes;
152 uint32_t num_ports; 193 uint32_t num_ports;
153 uint32_t num_handles; 194 uint32_t num_handles;
(...skipping 133 matching lines...) Expand 10 before | Expand all | Expand 10 after
287 } 328 }
288 329
289 MojoResult MessagePipeDispatcher::ReadMessage(void* bytes, 330 MojoResult MessagePipeDispatcher::ReadMessage(void* bytes,
290 uint32_t* num_bytes, 331 uint32_t* num_bytes,
291 MojoHandle* handles, 332 MojoHandle* handles,
292 uint32_t* num_handles, 333 uint32_t* num_handles,
293 MojoReadMessageFlags flags) { 334 MojoReadMessageFlags flags) {
294 { 335 {
295 base::AutoLock lock(signal_lock_); 336 base::AutoLock lock(signal_lock_);
296 // We can't read from a port that's closed or in transit! 337 // We can't read from a port that's closed or in transit!
297 if (port_closed_ || in_transit_) 338 if (port_closed_ || in_transit_ || is_fusing_)
298 return MOJO_RESULT_INVALID_ARGUMENT; 339 return MOJO_RESULT_INVALID_ARGUMENT;
299 } 340 }
300 341
301 bool no_space = false; 342 bool no_space = false;
302 bool may_discard = flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD; 343 bool may_discard = flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD;
303 344
304 // Ensure the provided buffers are large enough to hold the next message. 345 // Ensure the provided buffers are large enough to hold the next message.
305 // GetMessageIf provides an atomic way to test the next message without 346 // GetMessageIf provides an atomic way to test the next message without
306 // committing to removing it from the port's underlying message queue until 347 // committing to removing it from the port's underlying message queue until
307 // we are sure we can consume it. 348 // we are sure we can consume it.
(...skipping 134 matching lines...) Expand 10 before | Expand all | Expand 10 after
442 return GetHandleSignalsStateNoLock(); 483 return GetHandleSignalsStateNoLock();
443 } 484 }
444 485
445 MojoResult MessagePipeDispatcher::AddAwakable( 486 MojoResult MessagePipeDispatcher::AddAwakable(
446 Awakable* awakable, 487 Awakable* awakable,
447 MojoHandleSignals signals, 488 MojoHandleSignals signals,
448 uintptr_t context, 489 uintptr_t context,
449 HandleSignalsState* signals_state) { 490 HandleSignalsState* signals_state) {
450 base::AutoLock lock(signal_lock_); 491 base::AutoLock lock(signal_lock_);
451 492
452 if (port_closed_ || in_transit_) { 493 if (port_closed_ || in_transit_ || is_fusing_) {
453 if (signals_state) 494 if (signals_state)
454 *signals_state = HandleSignalsState(); 495 *signals_state = HandleSignalsState();
455 return MOJO_RESULT_INVALID_ARGUMENT; 496 return MOJO_RESULT_INVALID_ARGUMENT;
456 } 497 }
457 498
458 HandleSignalsState state = GetHandleSignalsStateNoLock(); 499 HandleSignalsState state = GetHandleSignalsStateNoLock();
459 500
460 DVLOG(2) << "Getting signal state for pipe " << pipe_id_ << " endpoint " 501 DVLOG(2) << "Getting signal state for pipe " << pipe_id_ << " endpoint "
461 << endpoint_ << " [awakable=" << awakable << "; port=" 502 << endpoint_ << " [awakable=" << awakable << "; port="
462 << port_.name() << "; signals=" << signals << "; satisfied=" 503 << port_.name() << "; signals=" << signals << "; satisfied="
(...skipping 17 matching lines...) Expand all
480 << endpoint_ << " [awakable=" << awakable << "; port=" 521 << endpoint_ << " [awakable=" << awakable << "; port="
481 << port_.name() << "; signals=" << signals << "]"; 522 << port_.name() << "; signals=" << signals << "]";
482 523
483 awakables_.Add(awakable, signals, context); 524 awakables_.Add(awakable, signals, context);
484 return MOJO_RESULT_OK; 525 return MOJO_RESULT_OK;
485 } 526 }
486 527
487 void MessagePipeDispatcher::RemoveAwakable(Awakable* awakable, 528 void MessagePipeDispatcher::RemoveAwakable(Awakable* awakable,
488 HandleSignalsState* signals_state) { 529 HandleSignalsState* signals_state) {
489 base::AutoLock lock(signal_lock_); 530 base::AutoLock lock(signal_lock_);
490 if (port_closed_ || in_transit_) { 531 if (port_closed_ || in_transit_ || is_fusing_) {
491 if (signals_state) 532 if (signals_state)
492 *signals_state = HandleSignalsState(); 533 *signals_state = HandleSignalsState();
493 } else if (signals_state) { 534 } else if (signals_state) {
494 *signals_state = GetHandleSignalsStateNoLock(); 535 *signals_state = GetHandleSignalsStateNoLock();
495 } 536 }
496 537
497 DVLOG(2) << "Removing awakable from pipe " << pipe_id_ << " endpoint " 538 DVLOG(2) << "Removing awakable from pipe " << pipe_id_ << " endpoint "
498 << endpoint_ << " [awakable=" << awakable << "; port=" 539 << endpoint_ << " [awakable=" << awakable << "; port="
499 << port_.name() << "]"; 540 << port_.name() << "]";
500 541
(...skipping 14 matching lines...) Expand all
515 SerializedState* state = static_cast<SerializedState*>(destination); 556 SerializedState* state = static_cast<SerializedState*>(destination);
516 state->pipe_id = pipe_id_; 557 state->pipe_id = pipe_id_;
517 state->endpoint = static_cast<int8_t>(endpoint_); 558 state->endpoint = static_cast<int8_t>(endpoint_);
518 memset(state->padding, 0, sizeof(state->padding)); 559 memset(state->padding, 0, sizeof(state->padding));
519 ports[0] = port_.name(); 560 ports[0] = port_.name();
520 return true; 561 return true;
521 } 562 }
522 563
523 bool MessagePipeDispatcher::BeginTransit() { 564 bool MessagePipeDispatcher::BeginTransit() {
524 base::AutoLock lock(signal_lock_); 565 base::AutoLock lock(signal_lock_);
525 if (in_transit_ || port_closed_) 566 if (in_transit_ || port_closed_ || is_fusing_)
526 return false; 567 return false;
527 in_transit_ = true; 568 in_transit_ = true;
528 return in_transit_; 569 return in_transit_;
529 } 570 }
530 571
531 void MessagePipeDispatcher::CompleteTransitAndClose() { 572 void MessagePipeDispatcher::CompleteTransitAndClose() {
532 node_controller_->SetPortObserver(port_, nullptr); 573 node_controller_->SetPortObserver(port_, nullptr);
533 574
534 base::AutoLock lock(signal_lock_); 575 base::AutoLock lock(signal_lock_);
535 in_transit_ = false; 576 in_transit_ = false;
(...skipping 30 matching lines...) Expand all
566 return new MessagePipeDispatcher(internal::g_core->GetNodeController(), port, 607 return new MessagePipeDispatcher(internal::g_core->GetNodeController(), port,
567 state->pipe_id, state->endpoint); 608 state->pipe_id, state->endpoint);
568 } 609 }
569 610
570 MessagePipeDispatcher::~MessagePipeDispatcher() { 611 MessagePipeDispatcher::~MessagePipeDispatcher() {
571 DCHECK(port_closed_ && !in_transit_); 612 DCHECK(port_closed_ && !in_transit_);
572 } 613 }
573 614
574 MojoResult MessagePipeDispatcher::CloseNoLock() { 615 MojoResult MessagePipeDispatcher::CloseNoLock() {
575 signal_lock_.AssertAcquired(); 616 signal_lock_.AssertAcquired();
576 if (port_closed_ || in_transit_) 617 if (port_closed_ || in_transit_ || is_fusing_)
577 return MOJO_RESULT_INVALID_ARGUMENT; 618 return MOJO_RESULT_INVALID_ARGUMENT;
578 619
579 port_closed_ = true; 620 port_closed_ = true;
580 awakables_.CancelAll(); 621 awakables_.CancelAll();
581 622
582 if (!port_transferred_) { 623 if (!port_transferred_) {
583 base::AutoUnlock unlock(signal_lock_); 624 base::AutoUnlock unlock(signal_lock_);
584 node_controller_->ClosePort(port_); 625 node_controller_->ClosePort(port_);
585 } 626 }
586 627
(...skipping 56 matching lines...) Expand 10 before | Expand all | Expand 10 after
643 DVLOG(1) << "Peer closure detected on message pipe " << pipe_id_ 684 DVLOG(1) << "Peer closure detected on message pipe " << pipe_id_
644 << " endpoint " << endpoint_ << " [port=" << port_.name() << "]"; 685 << " endpoint " << endpoint_ << " [port=" << port_.name() << "]";
645 } 686 }
646 #endif 687 #endif
647 688
648 awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock()); 689 awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock());
649 } 690 }
650 691
651 } // namespace edk 692 } // namespace edk
652 } // namespace mojo 693 } // namespace mojo
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698