| Index: mojo/edk/system/message_pipe_dispatcher.cc
|
| diff --git a/mojo/edk/system/message_pipe_dispatcher.cc b/mojo/edk/system/message_pipe_dispatcher.cc
|
| index c7b4a32da6bad0e0cbea48ddc8b3e60683a01966..2406fe9a66a5135ccdb2392af5f3c7038479eebc 100644
|
| --- a/mojo/edk/system/message_pipe_dispatcher.cc
|
| +++ b/mojo/edk/system/message_pipe_dispatcher.cc
|
| @@ -90,11 +90,33 @@ Dispatcher::Type MessagePipeDispatcher::GetType() const {
|
| return Type::MESSAGE_PIPE;
|
| }
|
|
|
| -MojoResult MessagePipeDispatcher::Close() {
|
| +MojoResult MessagePipeDispatcher::Close(RequestContext* request_context) {
|
| base::AutoLock lock(signal_lock_);
|
| DVLOG(1) << "Closing message pipe " << pipe_id_ << " endpoint " << endpoint_
|
| << " [port=" << port_.name() << "]";
|
| - return CloseNoLock();
|
| + return CloseNoLock(request_context);
|
| +}
|
| +
|
| +MojoResult MessagePipeDispatcher::Watch(MojoHandleSignals signals,
|
| + const WatchCallback& callback,
|
| + uintptr_t context,
|
| + RequestContext* request_context) {
|
| + base::AutoLock lock(signal_lock_);
|
| +
|
| + if (port_closed_ || in_transit_)
|
| + return MOJO_RESULT_INVALID_ARGUMENT;
|
| +
|
| + HandleSignalsState state = GetHandleSignalsStateNoLock();
|
| + return watchers_.Add(signals, callback, context, state, request_context);
|
| +}
|
| +
|
| +MojoResult MessagePipeDispatcher::CancelWatch(uintptr_t context) {
|
| + base::AutoLock lock(signal_lock_);
|
| +
|
| + if (port_closed_ || in_transit_)
|
| + return MOJO_RESULT_INVALID_ARGUMENT;
|
| +
|
| + return watchers_.Remove(context);
|
| }
|
|
|
| MojoResult MessagePipeDispatcher::WriteMessage(
|
| @@ -102,7 +124,8 @@ MojoResult MessagePipeDispatcher::WriteMessage(
|
| uint32_t num_bytes,
|
| const DispatcherInTransit* dispatchers,
|
| uint32_t num_dispatchers,
|
| - MojoWriteMessageFlags flags) {
|
| + MojoWriteMessageFlags flags,
|
| + RequestContext* request_context) {
|
|
|
| {
|
| base::AutoLock lock(signal_lock_);
|
| @@ -226,7 +249,7 @@ MojoResult MessagePipeDispatcher::WriteMessage(
|
| result = MOJO_RESULT_INVALID_ARGUMENT;
|
| } else if (rv == ports::ERROR_PORT_PEER_CLOSED) {
|
| base::AutoLock lock(signal_lock_);
|
| - awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock());
|
| + NotifyObserversForStateChangeNoLock(request_context);
|
| result = MOJO_RESULT_FAILED_PRECONDITION;
|
| } else {
|
| NOTREACHED();
|
| @@ -256,7 +279,8 @@ MojoResult MessagePipeDispatcher::ReadMessage(void* bytes,
|
| uint32_t* num_bytes,
|
| MojoHandle* handles,
|
| uint32_t* num_handles,
|
| - MojoReadMessageFlags flags) {
|
| + MojoReadMessageFlags flags,
|
| + RequestContext* request_context) {
|
| {
|
| base::AutoLock lock(signal_lock_);
|
| // We can't read from a port that's closed or in transit!
|
| @@ -336,7 +360,7 @@ MojoResult MessagePipeDispatcher::ReadMessage(void* bytes,
|
| // Peer is closed and there are no more messages to read.
|
| DCHECK_EQ(rv, ports::ERROR_PORT_PEER_CLOSED);
|
| base::AutoLock lock(signal_lock_);
|
| - awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock());
|
| + NotifyObserversForStateChangeNoLock(request_context);
|
| return MOJO_RESULT_FAILED_PRECONDITION;
|
| }
|
|
|
| @@ -485,7 +509,7 @@ bool MessagePipeDispatcher::EndSerialize(void* destination,
|
| return true;
|
| }
|
|
|
| -bool MessagePipeDispatcher::BeginTransit() {
|
| +bool MessagePipeDispatcher::BeginTransit(RequestContext* request_context) {
|
| base::AutoLock lock(signal_lock_);
|
| if (in_transit_ || port_closed_)
|
| return false;
|
| @@ -493,21 +517,22 @@ bool MessagePipeDispatcher::BeginTransit() {
|
| return in_transit_;
|
| }
|
|
|
| -void MessagePipeDispatcher::CompleteTransitAndClose() {
|
| +void MessagePipeDispatcher::CompleteTransitAndClose(
|
| + RequestContext* request_context) {
|
| node_controller_->SetPortObserver(port_, nullptr);
|
|
|
| base::AutoLock lock(signal_lock_);
|
| in_transit_ = false;
|
| port_transferred_ = true;
|
| - CloseNoLock();
|
| + CloseNoLock(request_context);
|
| }
|
|
|
| -void MessagePipeDispatcher::CancelTransit() {
|
| +void MessagePipeDispatcher::CancelTransit(RequestContext* request_context) {
|
| base::AutoLock lock(signal_lock_);
|
| in_transit_ = false;
|
|
|
| // Something may have happened while we were waiting for potential transit.
|
| - awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock());
|
| + NotifyObserversForStateChangeNoLock(request_context);
|
| }
|
|
|
| // static
|
| @@ -536,12 +561,13 @@ MessagePipeDispatcher::~MessagePipeDispatcher() {
|
| DCHECK(port_closed_ && !in_transit_);
|
| }
|
|
|
| -MojoResult MessagePipeDispatcher::CloseNoLock() {
|
| +MojoResult MessagePipeDispatcher::CloseNoLock(RequestContext* request_context) {
|
| signal_lock_.AssertAcquired();
|
| if (port_closed_ || in_transit_)
|
| return MOJO_RESULT_INVALID_ARGUMENT;
|
|
|
| port_closed_ = true;
|
| + watchers_.CancelAll(request_context);
|
| awakables_.CancelAll();
|
|
|
| if (!port_transferred_) {
|
| @@ -579,6 +605,10 @@ HandleSignalsState MessagePipeDispatcher::GetHandleSignalsStateNoLock() const {
|
| }
|
|
|
| void MessagePipeDispatcher::OnPortStatusChanged() {
|
| + // Create a new RequestContext that will run its finalizers after
|
| + // |signal_lock_| is released.
|
| + RequestContext request_context;
|
| +
|
| base::AutoLock lock(signal_lock_);
|
|
|
| // We stop observing our port as soon as it's transferred, but this can race
|
| @@ -608,7 +638,14 @@ void MessagePipeDispatcher::OnPortStatusChanged() {
|
| }
|
| #endif
|
|
|
| - awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock());
|
| + NotifyObserversForStateChangeNoLock(&request_context);
|
| +}
|
| +
|
| +void MessagePipeDispatcher::NotifyObserversForStateChangeNoLock(
|
| + RequestContext* request_context) {
|
| + HandleSignalsState state = GetHandleSignalsStateNoLock();
|
| + awakables_.AwakeForStateChange(state);
|
| + watchers_.NotifyOfStateChange(state, request_context);
|
| }
|
|
|
| } // namespace edk
|
|
|