Index: mojo/edk/system/message_pipe_dispatcher.cc |
diff --git a/mojo/edk/system/message_pipe_dispatcher.cc b/mojo/edk/system/message_pipe_dispatcher.cc |
index f27336b56f4d074a0e5e32ec2a592fcccff363a3..76f412306d042ce7d072b4bcf8c2eb7e4cb43de7 100644 |
--- a/mojo/edk/system/message_pipe_dispatcher.cc |
+++ b/mojo/edk/system/message_pipe_dispatcher.cc |
@@ -164,7 +164,8 @@ MessagePipeDispatcher::MessagePipeDispatcher(NodeController* node_controller, |
: node_controller_(node_controller), |
port_(port), |
pipe_id_(pipe_id), |
- endpoint_(endpoint) { |
+ endpoint_(endpoint), |
+ watchers_(this) { |
DVLOG(2) << "Creating new MessagePipeDispatcher for port " << port.name() |
<< " [pipe_id=" << pipe_id << "; endpoint=" << endpoint << "]"; |
@@ -183,6 +184,7 @@ bool MessagePipeDispatcher::Fuse(MessagePipeDispatcher* other) { |
port0 = port_; |
port_closed_.Set(true); |
awakables_.CancelAll(); |
+ watchers_.NotifyClosed(); |
} |
ports::PortRef port1; |
@@ -191,6 +193,7 @@ bool MessagePipeDispatcher::Fuse(MessagePipeDispatcher* other) { |
port1 = other->port_; |
other->port_closed_.Set(true); |
other->awakables_.CancelAll(); |
+ other->watchers_.NotifyClosed(); |
} |
// Both ports are always closed by this call. |
@@ -209,27 +212,6 @@ MojoResult MessagePipeDispatcher::Close() { |
return CloseNoLock(); |
} |
-MojoResult MessagePipeDispatcher::Watch(MojoHandleSignals signals, |
- const Watcher::WatchCallback& callback, |
- uintptr_t context) { |
- base::AutoLock lock(signal_lock_); |
- |
- if (port_closed_ || in_transit_) |
- return MOJO_RESULT_INVALID_ARGUMENT; |
- |
- return awakables_.AddWatcher( |
- signals, callback, context, GetHandleSignalsStateNoLock()); |
-} |
- |
-MojoResult MessagePipeDispatcher::CancelWatch(uintptr_t context) { |
- base::AutoLock lock(signal_lock_); |
- |
- if (port_closed_ || in_transit_) |
- return MOJO_RESULT_INVALID_ARGUMENT; |
- |
- return awakables_.RemoveWatcher(context); |
-} |
- |
MojoResult MessagePipeDispatcher::WriteMessage( |
std::unique_ptr<MessageForTransit> message, |
MojoWriteMessageFlags flags) { |
@@ -299,6 +281,12 @@ MojoResult MessagePipeDispatcher::ReadMessage( |
} |
if (no_space) { |
+ if (may_discard) { |
+ // May have been the last message on the pipe. Need to update signals just |
+ // in case. |
+ base::AutoLock lock(signal_lock_); |
+ watchers_.NotifyState(GetHandleSignalsStateNoLock()); |
+ } |
// |*num_handles| (and/or |*num_bytes| if |read_any_size| is false) wasn't |
// sufficient to hold this message's data. The message will still be in |
// queue unless MOJO_READ_MESSAGE_FLAG_MAY_DISCARD was set. |
@@ -319,6 +307,13 @@ MojoResult MessagePipeDispatcher::ReadMessage( |
// Alright! We have a message and the caller has provided sufficient storage |
// in which to receive it. |
+ { |
+ // We need to update anyone watching our signals in case that was the last |
+ // available message. |
+ base::AutoLock lock(signal_lock_); |
+ watchers_.NotifyState(GetHandleSignalsStateNoLock()); |
+ } |
+ |
std::unique_ptr<PortsMessage> msg( |
static_cast<PortsMessage*>(ports_message.release())); |
@@ -396,6 +391,23 @@ MessagePipeDispatcher::GetHandleSignalsState() const { |
return GetHandleSignalsStateNoLock(); |
} |
+MojoResult MessagePipeDispatcher::AddWatcherRef( |
+ const scoped_refptr<WatcherDispatcher>& watcher, |
+ uintptr_t context) { |
+ base::AutoLock lock(signal_lock_); |
+ if (port_closed_ || in_transit_) |
+ return MOJO_RESULT_INVALID_ARGUMENT; |
+ return watchers_.Add(watcher, context, GetHandleSignalsStateNoLock()); |
+} |
+ |
+MojoResult MessagePipeDispatcher::RemoveWatcherRef(WatcherDispatcher* watcher, |
+ uintptr_t context) { |
+ base::AutoLock lock(signal_lock_); |
+ if (port_closed_ || in_transit_) |
+ return MOJO_RESULT_INVALID_ARGUMENT; |
+ return watchers_.Remove(watcher, context); |
+} |
+ |
MojoResult MessagePipeDispatcher::AddAwakable( |
Awakable* awakable, |
MojoHandleSignals signals, |
@@ -496,7 +508,9 @@ void MessagePipeDispatcher::CancelTransit() { |
in_transit_.Set(false); |
// Something may have happened while we were waiting for potential transit. |
- awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock()); |
+ HandleSignalsState state = GetHandleSignalsStateNoLock(); |
+ awakables_.AwakeForStateChange(state); |
+ watchers_.NotifyState(state); |
} |
// static |
@@ -532,6 +546,7 @@ MojoResult MessagePipeDispatcher::CloseNoLock() { |
port_closed_.Set(true); |
awakables_.CancelAll(); |
+ watchers_.NotifyClosed(); |
if (!port_transferred_) { |
base::AutoUnlock unlock(signal_lock_); |
@@ -596,7 +611,9 @@ void MessagePipeDispatcher::OnPortStatusChanged() { |
} |
#endif |
- awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock()); |
+ HandleSignalsState state = GetHandleSignalsStateNoLock(); |
+ awakables_.AwakeForStateChange(state); |
+ watchers_.NotifyState(state); |
} |
} // namespace edk |