Index: mojo/edk/system/message_pipe_dispatcher.cc |
diff --git a/mojo/edk/system/message_pipe_dispatcher.cc b/mojo/edk/system/message_pipe_dispatcher.cc |
index c7b4a32da6bad0e0cbea48ddc8b3e60683a01966..2406fe9a66a5135ccdb2392af5f3c7038479eebc 100644 |
--- a/mojo/edk/system/message_pipe_dispatcher.cc |
+++ b/mojo/edk/system/message_pipe_dispatcher.cc |
@@ -90,11 +90,33 @@ Dispatcher::Type MessagePipeDispatcher::GetType() const { |
return Type::MESSAGE_PIPE; |
} |
-MojoResult MessagePipeDispatcher::Close() { |
+MojoResult MessagePipeDispatcher::Close(RequestContext* request_context) { |
base::AutoLock lock(signal_lock_); |
DVLOG(1) << "Closing message pipe " << pipe_id_ << " endpoint " << endpoint_ |
<< " [port=" << port_.name() << "]"; |
- return CloseNoLock(); |
+ return CloseNoLock(request_context); |
+} |
+ |
+MojoResult MessagePipeDispatcher::Watch(MojoHandleSignals signals, |
+ const WatchCallback& callback, |
+ uintptr_t context, |
+ RequestContext* request_context) { |
+ base::AutoLock lock(signal_lock_); |
+ |
+ if (port_closed_ || in_transit_) |
+ return MOJO_RESULT_INVALID_ARGUMENT; |
+ |
+ HandleSignalsState state = GetHandleSignalsStateNoLock(); |
+ return watchers_.Add(signals, callback, context, state, request_context); |
+} |
+ |
+MojoResult MessagePipeDispatcher::CancelWatch(uintptr_t context) { |
+ base::AutoLock lock(signal_lock_); |
+ |
+ if (port_closed_ || in_transit_) |
+ return MOJO_RESULT_INVALID_ARGUMENT; |
+ |
+ return watchers_.Remove(context); |
} |
MojoResult MessagePipeDispatcher::WriteMessage( |
@@ -102,7 +124,8 @@ MojoResult MessagePipeDispatcher::WriteMessage( |
uint32_t num_bytes, |
const DispatcherInTransit* dispatchers, |
uint32_t num_dispatchers, |
- MojoWriteMessageFlags flags) { |
+ MojoWriteMessageFlags flags, |
+ RequestContext* request_context) { |
{ |
base::AutoLock lock(signal_lock_); |
@@ -226,7 +249,7 @@ MojoResult MessagePipeDispatcher::WriteMessage( |
result = MOJO_RESULT_INVALID_ARGUMENT; |
} else if (rv == ports::ERROR_PORT_PEER_CLOSED) { |
base::AutoLock lock(signal_lock_); |
- awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock()); |
+ NotifyObserversForStateChangeNoLock(request_context); |
result = MOJO_RESULT_FAILED_PRECONDITION; |
} else { |
NOTREACHED(); |
@@ -256,7 +279,8 @@ MojoResult MessagePipeDispatcher::ReadMessage(void* bytes, |
uint32_t* num_bytes, |
MojoHandle* handles, |
uint32_t* num_handles, |
- MojoReadMessageFlags flags) { |
+ MojoReadMessageFlags flags, |
+ RequestContext* request_context) { |
{ |
base::AutoLock lock(signal_lock_); |
// We can't read from a port that's closed or in transit! |
@@ -336,7 +360,7 @@ MojoResult MessagePipeDispatcher::ReadMessage(void* bytes, |
// Peer is closed and there are no more messages to read. |
DCHECK_EQ(rv, ports::ERROR_PORT_PEER_CLOSED); |
base::AutoLock lock(signal_lock_); |
- awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock()); |
+ NotifyObserversForStateChangeNoLock(request_context); |
return MOJO_RESULT_FAILED_PRECONDITION; |
} |
@@ -485,7 +509,7 @@ bool MessagePipeDispatcher::EndSerialize(void* destination, |
return true; |
} |
-bool MessagePipeDispatcher::BeginTransit() { |
+bool MessagePipeDispatcher::BeginTransit(RequestContext* request_context) { |
base::AutoLock lock(signal_lock_); |
if (in_transit_ || port_closed_) |
return false; |
@@ -493,21 +517,22 @@ bool MessagePipeDispatcher::BeginTransit() { |
return in_transit_; |
} |
-void MessagePipeDispatcher::CompleteTransitAndClose() { |
+void MessagePipeDispatcher::CompleteTransitAndClose( |
+ RequestContext* request_context) { |
node_controller_->SetPortObserver(port_, nullptr); |
base::AutoLock lock(signal_lock_); |
in_transit_ = false; |
port_transferred_ = true; |
- CloseNoLock(); |
+ CloseNoLock(request_context); |
} |
-void MessagePipeDispatcher::CancelTransit() { |
+void MessagePipeDispatcher::CancelTransit(RequestContext* request_context) { |
base::AutoLock lock(signal_lock_); |
in_transit_ = false; |
// Something may have happened while we were waiting for potential transit. |
- awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock()); |
+ NotifyObserversForStateChangeNoLock(request_context); |
} |
// static |
@@ -536,12 +561,13 @@ MessagePipeDispatcher::~MessagePipeDispatcher() { |
DCHECK(port_closed_ && !in_transit_); |
} |
-MojoResult MessagePipeDispatcher::CloseNoLock() { |
+MojoResult MessagePipeDispatcher::CloseNoLock(RequestContext* request_context) { |
signal_lock_.AssertAcquired(); |
if (port_closed_ || in_transit_) |
return MOJO_RESULT_INVALID_ARGUMENT; |
port_closed_ = true; |
+ watchers_.CancelAll(request_context); |
awakables_.CancelAll(); |
if (!port_transferred_) { |
@@ -579,6 +605,10 @@ HandleSignalsState MessagePipeDispatcher::GetHandleSignalsStateNoLock() const { |
} |
void MessagePipeDispatcher::OnPortStatusChanged() { |
+ // Create a new RequestContext that will run its finalizers after |
+ // |signal_lock_| is released. |
+ RequestContext request_context; |
+ |
base::AutoLock lock(signal_lock_); |
// We stop observing our port as soon as it's transferred, but this can race |
@@ -608,7 +638,14 @@ void MessagePipeDispatcher::OnPortStatusChanged() { |
} |
#endif |
- awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock()); |
+ NotifyObserversForStateChangeNoLock(&request_context); |
+} |
+ |
+void MessagePipeDispatcher::NotifyObserversForStateChangeNoLock( |
+ RequestContext* request_context) { |
+ HandleSignalsState state = GetHandleSignalsStateNoLock(); |
+ awakables_.AwakeForStateChange(state); |
+ watchers_.NotifyOfStateChange(state, request_context); |
} |
} // namespace edk |