Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(217)

Side by Side Diff: mojo/edk/system/message_pipe_dispatcher.cc

Issue 1748503002: [mojo-edk] Add MojoWatch and MojoCancelWatch APIs (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: . Created 4 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/edk/system/message_pipe_dispatcher.h" 5 #include "mojo/edk/system/message_pipe_dispatcher.h"
6 6
7 #include <limits> 7 #include <limits>
8 8
9 #include "base/macros.h" 9 #include "base/macros.h"
10 #include "base/memory/ref_counted.h" 10 #include "base/memory/ref_counted.h"
(...skipping 72 matching lines...) Expand 10 before | Expand all | Expand 10 after
83 83
84 node_controller_->SetPortObserver( 84 node_controller_->SetPortObserver(
85 port_, 85 port_,
86 make_scoped_refptr(new PortObserverThunk(this))); 86 make_scoped_refptr(new PortObserverThunk(this)));
87 } 87 }
88 88
89 Dispatcher::Type MessagePipeDispatcher::GetType() const { 89 Dispatcher::Type MessagePipeDispatcher::GetType() const {
90 return Type::MESSAGE_PIPE; 90 return Type::MESSAGE_PIPE;
91 } 91 }
92 92
93 MojoResult MessagePipeDispatcher::Close() { 93 MojoResult MessagePipeDispatcher::Close(RequestContext* request_context) {
94 base::AutoLock lock(signal_lock_); 94 base::AutoLock lock(signal_lock_);
95 DVLOG(1) << "Closing message pipe " << pipe_id_ << " endpoint " << endpoint_ 95 DVLOG(1) << "Closing message pipe " << pipe_id_ << " endpoint " << endpoint_
96 << " [port=" << port_.name() << "]"; 96 << " [port=" << port_.name() << "]";
97 return CloseNoLock(); 97 return CloseNoLock(request_context);
98 }
99
100 MojoResult MessagePipeDispatcher::Watch(MojoHandleSignals signals,
101 const WatchCallback& callback,
102 uintptr_t context,
103 RequestContext* request_context) {
104 base::AutoLock lock(signal_lock_);
105
106 if (port_closed_ || in_transit_)
107 return MOJO_RESULT_INVALID_ARGUMENT;
108
109 HandleSignalsState state = GetHandleSignalsStateNoLock();
110 return watchers_.Add(signals, callback, context, state, request_context);
111 }
112
113 MojoResult MessagePipeDispatcher::CancelWatch(uintptr_t context) {
114 base::AutoLock lock(signal_lock_);
115
116 if (port_closed_ || in_transit_)
117 return MOJO_RESULT_INVALID_ARGUMENT;
118
119 return watchers_.Remove(context);
98 } 120 }
99 121
100 MojoResult MessagePipeDispatcher::WriteMessage( 122 MojoResult MessagePipeDispatcher::WriteMessage(
101 const void* bytes, 123 const void* bytes,
102 uint32_t num_bytes, 124 uint32_t num_bytes,
103 const DispatcherInTransit* dispatchers, 125 const DispatcherInTransit* dispatchers,
104 uint32_t num_dispatchers, 126 uint32_t num_dispatchers,
105 MojoWriteMessageFlags flags) { 127 MojoWriteMessageFlags flags,
128 RequestContext* request_context) {
106 129
107 { 130 {
108 base::AutoLock lock(signal_lock_); 131 base::AutoLock lock(signal_lock_);
109 if (port_closed_ || in_transit_) 132 if (port_closed_ || in_transit_)
110 return MOJO_RESULT_INVALID_ARGUMENT; 133 return MOJO_RESULT_INVALID_ARGUMENT;
111 } 134 }
112 135
113 // A structure for retaining information about every Dispatcher we're about 136 // A structure for retaining information about every Dispatcher we're about
114 // to send. This information is collected by calling StartSerialize() on 137 // to send. This information is collected by calling StartSerialize() on
115 // each dispatcher in sequence. 138 // each dispatcher in sequence.
(...skipping 103 matching lines...) Expand 10 before | Expand all | Expand 10 after
219 << " [port=" << port_.name() << "; rv=" << rv 242 << " [port=" << port_.name() << "; rv=" << rv
220 << "; num_bytes=" << num_bytes << "]"; 243 << "; num_bytes=" << num_bytes << "]";
221 244
222 if (rv != ports::OK) { 245 if (rv != ports::OK) {
223 if (rv == ports::ERROR_PORT_UNKNOWN || 246 if (rv == ports::ERROR_PORT_UNKNOWN ||
224 rv == ports::ERROR_PORT_STATE_UNEXPECTED || 247 rv == ports::ERROR_PORT_STATE_UNEXPECTED ||
225 rv == ports::ERROR_PORT_CANNOT_SEND_PEER) { 248 rv == ports::ERROR_PORT_CANNOT_SEND_PEER) {
226 result = MOJO_RESULT_INVALID_ARGUMENT; 249 result = MOJO_RESULT_INVALID_ARGUMENT;
227 } else if (rv == ports::ERROR_PORT_PEER_CLOSED) { 250 } else if (rv == ports::ERROR_PORT_PEER_CLOSED) {
228 base::AutoLock lock(signal_lock_); 251 base::AutoLock lock(signal_lock_);
229 awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock()); 252 NotifyObserversForStateChangeNoLock(request_context);
230 result = MOJO_RESULT_FAILED_PRECONDITION; 253 result = MOJO_RESULT_FAILED_PRECONDITION;
231 } else { 254 } else {
232 NOTREACHED(); 255 NOTREACHED();
233 result = MOJO_RESULT_UNKNOWN; 256 result = MOJO_RESULT_UNKNOWN;
234 } 257 }
235 cancel_transit = true; 258 cancel_transit = true;
236 } else { 259 } else {
237 DCHECK(!message); 260 DCHECK(!message);
238 } 261 }
239 } 262 }
240 263
241 if (cancel_transit) { 264 if (cancel_transit) {
242 // We ended up not sending the message. Release all the platform handles. 265 // We ended up not sending the message. Release all the platform handles.
243 // Their dipatchers retain ownership when transit is canceled, so these are 266 // Their dipatchers retain ownership when transit is canceled, so these are
244 // not actually leaking. 267 // not actually leaking.
245 DCHECK(message); 268 DCHECK(message);
246 Channel::MessagePtr m = message->TakeChannelMessage(); 269 Channel::MessagePtr m = message->TakeChannelMessage();
247 ScopedPlatformHandleVectorPtr handles = m->TakeHandles(); 270 ScopedPlatformHandleVectorPtr handles = m->TakeHandles();
248 if (handles) 271 if (handles)
249 handles->clear(); 272 handles->clear();
250 } 273 }
251 274
252 return result; 275 return result;
253 } 276 }
254 277
255 MojoResult MessagePipeDispatcher::ReadMessage(void* bytes, 278 MojoResult MessagePipeDispatcher::ReadMessage(void* bytes,
256 uint32_t* num_bytes, 279 uint32_t* num_bytes,
257 MojoHandle* handles, 280 MojoHandle* handles,
258 uint32_t* num_handles, 281 uint32_t* num_handles,
259 MojoReadMessageFlags flags) { 282 MojoReadMessageFlags flags,
283 RequestContext* request_context) {
260 { 284 {
261 base::AutoLock lock(signal_lock_); 285 base::AutoLock lock(signal_lock_);
262 // We can't read from a port that's closed or in transit! 286 // We can't read from a port that's closed or in transit!
263 if (port_closed_ || in_transit_) 287 if (port_closed_ || in_transit_)
264 return MOJO_RESULT_INVALID_ARGUMENT; 288 return MOJO_RESULT_INVALID_ARGUMENT;
265 } 289 }
266 290
267 bool no_space = false; 291 bool no_space = false;
268 bool may_discard = flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD; 292 bool may_discard = flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD;
269 293
(...skipping 59 matching lines...) Expand 10 before | Expand all | Expand 10 after
329 353
330 if (!ports_message) { 354 if (!ports_message) {
331 // No message was available in queue. 355 // No message was available in queue.
332 356
333 if (rv == ports::OK) 357 if (rv == ports::OK)
334 return MOJO_RESULT_SHOULD_WAIT; 358 return MOJO_RESULT_SHOULD_WAIT;
335 359
336 // Peer is closed and there are no more messages to read. 360 // Peer is closed and there are no more messages to read.
337 DCHECK_EQ(rv, ports::ERROR_PORT_PEER_CLOSED); 361 DCHECK_EQ(rv, ports::ERROR_PORT_PEER_CLOSED);
338 base::AutoLock lock(signal_lock_); 362 base::AutoLock lock(signal_lock_);
339 awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock()); 363 NotifyObserversForStateChangeNoLock(request_context);
340 return MOJO_RESULT_FAILED_PRECONDITION; 364 return MOJO_RESULT_FAILED_PRECONDITION;
341 } 365 }
342 366
343 // Alright! We have a message and the caller has provided sufficient storage 367 // Alright! We have a message and the caller has provided sufficient storage
344 // in which to receive it. 368 // in which to receive it.
345 369
346 scoped_ptr<PortsMessage> message( 370 scoped_ptr<PortsMessage> message(
347 static_cast<PortsMessage*>(ports_message.release())); 371 static_cast<PortsMessage*>(ports_message.release()));
348 372
349 const MessageHeader* header = 373 const MessageHeader* header =
(...skipping 128 matching lines...) Expand 10 before | Expand all | Expand 10 after
478 bool MessagePipeDispatcher::EndSerialize(void* destination, 502 bool MessagePipeDispatcher::EndSerialize(void* destination,
479 ports::PortName* ports, 503 ports::PortName* ports,
480 PlatformHandle* handles) { 504 PlatformHandle* handles) {
481 SerializedState* state = static_cast<SerializedState*>(destination); 505 SerializedState* state = static_cast<SerializedState*>(destination);
482 state->pipe_id = pipe_id_; 506 state->pipe_id = pipe_id_;
483 state->endpoint = static_cast<int8_t>(endpoint_); 507 state->endpoint = static_cast<int8_t>(endpoint_);
484 ports[0] = port_.name(); 508 ports[0] = port_.name();
485 return true; 509 return true;
486 } 510 }
487 511
488 bool MessagePipeDispatcher::BeginTransit() { 512 bool MessagePipeDispatcher::BeginTransit(RequestContext* request_context) {
489 base::AutoLock lock(signal_lock_); 513 base::AutoLock lock(signal_lock_);
490 if (in_transit_ || port_closed_) 514 if (in_transit_ || port_closed_)
491 return false; 515 return false;
492 in_transit_ = true; 516 in_transit_ = true;
493 return in_transit_; 517 return in_transit_;
494 } 518 }
495 519
496 void MessagePipeDispatcher::CompleteTransitAndClose() { 520 void MessagePipeDispatcher::CompleteTransitAndClose(
521 RequestContext* request_context) {
497 node_controller_->SetPortObserver(port_, nullptr); 522 node_controller_->SetPortObserver(port_, nullptr);
498 523
499 base::AutoLock lock(signal_lock_); 524 base::AutoLock lock(signal_lock_);
500 in_transit_ = false; 525 in_transit_ = false;
501 port_transferred_ = true; 526 port_transferred_ = true;
502 CloseNoLock(); 527 CloseNoLock(request_context);
503 } 528 }
504 529
505 void MessagePipeDispatcher::CancelTransit() { 530 void MessagePipeDispatcher::CancelTransit(RequestContext* request_context) {
506 base::AutoLock lock(signal_lock_); 531 base::AutoLock lock(signal_lock_);
507 in_transit_ = false; 532 in_transit_ = false;
508 533
509 // Something may have happened while we were waiting for potential transit. 534 // Something may have happened while we were waiting for potential transit.
510 awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock()); 535 NotifyObserversForStateChangeNoLock(request_context);
511 } 536 }
512 537
513 // static 538 // static
514 scoped_refptr<Dispatcher> MessagePipeDispatcher::Deserialize( 539 scoped_refptr<Dispatcher> MessagePipeDispatcher::Deserialize(
515 const void* data, 540 const void* data,
516 size_t num_bytes, 541 size_t num_bytes,
517 const ports::PortName* ports, 542 const ports::PortName* ports,
518 size_t num_ports, 543 size_t num_ports,
519 PlatformHandle* handles, 544 PlatformHandle* handles,
520 size_t num_handles) { 545 size_t num_handles) {
521 if (num_ports != 1 || num_handles || num_bytes != sizeof(SerializedState)) 546 if (num_ports != 1 || num_handles || num_bytes != sizeof(SerializedState))
522 return nullptr; 547 return nullptr;
523 548
524 const SerializedState* state = static_cast<const SerializedState*>(data); 549 const SerializedState* state = static_cast<const SerializedState*>(data);
525 550
526 ports::PortRef port; 551 ports::PortRef port;
527 CHECK_EQ( 552 CHECK_EQ(
528 ports::OK, 553 ports::OK,
529 internal::g_core->GetNodeController()->node()->GetPort(ports[0], &port)); 554 internal::g_core->GetNodeController()->node()->GetPort(ports[0], &port));
530 555
531 return new MessagePipeDispatcher(internal::g_core->GetNodeController(), port, 556 return new MessagePipeDispatcher(internal::g_core->GetNodeController(), port,
532 state->pipe_id, state->endpoint); 557 state->pipe_id, state->endpoint);
533 } 558 }
534 559
535 MessagePipeDispatcher::~MessagePipeDispatcher() { 560 MessagePipeDispatcher::~MessagePipeDispatcher() {
536 DCHECK(port_closed_ && !in_transit_); 561 DCHECK(port_closed_ && !in_transit_);
537 } 562 }
538 563
539 MojoResult MessagePipeDispatcher::CloseNoLock() { 564 MojoResult MessagePipeDispatcher::CloseNoLock(RequestContext* request_context) {
540 signal_lock_.AssertAcquired(); 565 signal_lock_.AssertAcquired();
541 if (port_closed_ || in_transit_) 566 if (port_closed_ || in_transit_)
542 return MOJO_RESULT_INVALID_ARGUMENT; 567 return MOJO_RESULT_INVALID_ARGUMENT;
543 568
544 port_closed_ = true; 569 port_closed_ = true;
570 watchers_.CancelAll(request_context);
545 awakables_.CancelAll(); 571 awakables_.CancelAll();
546 572
547 if (!port_transferred_) { 573 if (!port_transferred_) {
548 base::AutoUnlock unlock(signal_lock_); 574 base::AutoUnlock unlock(signal_lock_);
549 node_controller_->ClosePort(port_); 575 node_controller_->ClosePort(port_);
550 } 576 }
551 577
552 return MOJO_RESULT_OK; 578 return MOJO_RESULT_OK;
553 } 579 }
554 580
(...skipping 17 matching lines...) Expand all
572 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; 598 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
573 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE; 599 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE;
574 } else { 600 } else {
575 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; 601 rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
576 } 602 }
577 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED; 603 rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
578 return rv; 604 return rv;
579 } 605 }
580 606
581 void MessagePipeDispatcher::OnPortStatusChanged() { 607 void MessagePipeDispatcher::OnPortStatusChanged() {
608 // Create a new RequestContext that will run its finalizers after
609 // |signal_lock_| is released.
610 RequestContext request_context;
611
582 base::AutoLock lock(signal_lock_); 612 base::AutoLock lock(signal_lock_);
583 613
584 // We stop observing our port as soon as it's transferred, but this can race 614 // We stop observing our port as soon as it's transferred, but this can race
585 // with events which are raised right before that happens. This is fine to 615 // with events which are raised right before that happens. This is fine to
586 // ignore. 616 // ignore.
587 if (port_transferred_) 617 if (port_transferred_)
588 return; 618 return;
589 619
590 #if !defined(NDEBUG) 620 #if !defined(NDEBUG)
591 ports::PortStatus port_status; 621 ports::PortStatus port_status;
592 node_controller_->node()->GetStatus(port_, &port_status); 622 node_controller_->node()->GetStatus(port_, &port_status);
593 if (port_status.has_messages) { 623 if (port_status.has_messages) {
594 ports::ScopedMessage unused; 624 ports::ScopedMessage unused;
595 size_t message_size = 0; 625 size_t message_size = 0;
596 node_controller_->node()->GetMessageIf( 626 node_controller_->node()->GetMessageIf(
597 port_, [&message_size](const ports::Message& message) { 627 port_, [&message_size](const ports::Message& message) {
598 message_size = message.num_payload_bytes(); 628 message_size = message.num_payload_bytes();
599 return false; 629 return false;
600 }, &unused); 630 }, &unused);
601 DVLOG(1) << "New message detected on message pipe " << pipe_id_ 631 DVLOG(1) << "New message detected on message pipe " << pipe_id_
602 << " endpoint " << endpoint_ << " [port=" << port_.name() 632 << " endpoint " << endpoint_ << " [port=" << port_.name()
603 << "; size=" << message_size << "]"; 633 << "; size=" << message_size << "]";
604 } 634 }
605 if (port_status.peer_closed) { 635 if (port_status.peer_closed) {
606 DVLOG(1) << "Peer closure detected on message pipe " << pipe_id_ 636 DVLOG(1) << "Peer closure detected on message pipe " << pipe_id_
607 << " endpoint " << endpoint_ << " [port=" << port_.name() << "]"; 637 << " endpoint " << endpoint_ << " [port=" << port_.name() << "]";
608 } 638 }
609 #endif 639 #endif
610 640
611 awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock()); 641 NotifyObserversForStateChangeNoLock(&request_context);
642 }
643
644 void MessagePipeDispatcher::NotifyObserversForStateChangeNoLock(
645 RequestContext* request_context) {
646 HandleSignalsState state = GetHandleSignalsStateNoLock();
647 awakables_.AwakeForStateChange(state);
648 watchers_.NotifyOfStateChange(state, request_context);
612 } 649 }
613 650
614 } // namespace edk 651 } // namespace edk
615 } // namespace mojo 652 } // namespace mojo
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698