| Index: mojo/edk/system/message_pipe_dispatcher.cc | 
| diff --git a/mojo/edk/system/message_pipe_dispatcher.cc b/mojo/edk/system/message_pipe_dispatcher.cc | 
| index f27336b56f4d074a0e5e32ec2a592fcccff363a3..76f412306d042ce7d072b4bcf8c2eb7e4cb43de7 100644 | 
| --- a/mojo/edk/system/message_pipe_dispatcher.cc | 
| +++ b/mojo/edk/system/message_pipe_dispatcher.cc | 
| @@ -164,7 +164,8 @@ MessagePipeDispatcher::MessagePipeDispatcher(NodeController* node_controller, | 
| : node_controller_(node_controller), | 
| port_(port), | 
| pipe_id_(pipe_id), | 
| -      endpoint_(endpoint) { | 
| +      endpoint_(endpoint), | 
| +      watchers_(this) { | 
| DVLOG(2) << "Creating new MessagePipeDispatcher for port " << port.name() | 
| << " [pipe_id=" << pipe_id << "; endpoint=" << endpoint << "]"; | 
|  | 
| @@ -183,6 +184,7 @@ bool MessagePipeDispatcher::Fuse(MessagePipeDispatcher* other) { | 
| port0 = port_; | 
| port_closed_.Set(true); | 
| awakables_.CancelAll(); | 
| +    watchers_.NotifyClosed(); | 
| } | 
|  | 
| ports::PortRef port1; | 
| @@ -191,6 +193,7 @@ bool MessagePipeDispatcher::Fuse(MessagePipeDispatcher* other) { | 
| port1 = other->port_; | 
| other->port_closed_.Set(true); | 
| other->awakables_.CancelAll(); | 
| +    other->watchers_.NotifyClosed(); | 
| } | 
|  | 
| // Both ports are always closed by this call. | 
| @@ -209,27 +212,6 @@ MojoResult MessagePipeDispatcher::Close() { | 
| return CloseNoLock(); | 
| } | 
|  | 
| -MojoResult MessagePipeDispatcher::Watch(MojoHandleSignals signals, | 
| -                                        const Watcher::WatchCallback& callback, | 
| -                                        uintptr_t context) { | 
| -  base::AutoLock lock(signal_lock_); | 
| - | 
| -  if (port_closed_ || in_transit_) | 
| -    return MOJO_RESULT_INVALID_ARGUMENT; | 
| - | 
| -  return awakables_.AddWatcher( | 
| -      signals, callback, context, GetHandleSignalsStateNoLock()); | 
| -} | 
| - | 
| -MojoResult MessagePipeDispatcher::CancelWatch(uintptr_t context) { | 
| -  base::AutoLock lock(signal_lock_); | 
| - | 
| -  if (port_closed_ || in_transit_) | 
| -    return MOJO_RESULT_INVALID_ARGUMENT; | 
| - | 
| -  return awakables_.RemoveWatcher(context); | 
| -} | 
| - | 
| MojoResult MessagePipeDispatcher::WriteMessage( | 
| std::unique_ptr<MessageForTransit> message, | 
| MojoWriteMessageFlags flags) { | 
| @@ -299,6 +281,12 @@ MojoResult MessagePipeDispatcher::ReadMessage( | 
| } | 
|  | 
| if (no_space) { | 
| +    if (may_discard) { | 
| +      // May have been the last message on the pipe. Need to update signals just | 
| +      // in case. | 
| +      base::AutoLock lock(signal_lock_); | 
| +      watchers_.NotifyState(GetHandleSignalsStateNoLock()); | 
| +    } | 
| // |*num_handles| (and/or |*num_bytes| if |read_any_size| is false) wasn't | 
| // sufficient to hold this message's data. The message will still be in | 
| // queue unless MOJO_READ_MESSAGE_FLAG_MAY_DISCARD was set. | 
| @@ -319,6 +307,13 @@ MojoResult MessagePipeDispatcher::ReadMessage( | 
| // Alright! We have a message and the caller has provided sufficient storage | 
| // in which to receive it. | 
|  | 
| +  { | 
| +    // We need to update anyone watching our signals in case that was the last | 
| +    // available message. | 
| +    base::AutoLock lock(signal_lock_); | 
| +    watchers_.NotifyState(GetHandleSignalsStateNoLock()); | 
| +  } | 
| + | 
| std::unique_ptr<PortsMessage> msg( | 
| static_cast<PortsMessage*>(ports_message.release())); | 
|  | 
| @@ -396,6 +391,23 @@ MessagePipeDispatcher::GetHandleSignalsState() const { | 
| return GetHandleSignalsStateNoLock(); | 
| } | 
|  | 
| +MojoResult MessagePipeDispatcher::AddWatcherRef( | 
| +    const scoped_refptr<WatcherDispatcher>& watcher, | 
| +    uintptr_t context) { | 
| +  base::AutoLock lock(signal_lock_); | 
| +  if (port_closed_ || in_transit_) | 
| +    return MOJO_RESULT_INVALID_ARGUMENT; | 
| +  return watchers_.Add(watcher, context, GetHandleSignalsStateNoLock()); | 
| +} | 
| + | 
| +MojoResult MessagePipeDispatcher::RemoveWatcherRef(WatcherDispatcher* watcher, | 
| +                                                   uintptr_t context) { | 
| +  base::AutoLock lock(signal_lock_); | 
| +  if (port_closed_ || in_transit_) | 
| +    return MOJO_RESULT_INVALID_ARGUMENT; | 
| +  return watchers_.Remove(watcher, context); | 
| +} | 
| + | 
| MojoResult MessagePipeDispatcher::AddAwakable( | 
| Awakable* awakable, | 
| MojoHandleSignals signals, | 
| @@ -496,7 +508,9 @@ void MessagePipeDispatcher::CancelTransit() { | 
| in_transit_.Set(false); | 
|  | 
| // Something may have happened while we were waiting for potential transit. | 
| -  awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock()); | 
| +  HandleSignalsState state = GetHandleSignalsStateNoLock(); | 
| +  awakables_.AwakeForStateChange(state); | 
| +  watchers_.NotifyState(state); | 
| } | 
|  | 
| // static | 
| @@ -532,6 +546,7 @@ MojoResult MessagePipeDispatcher::CloseNoLock() { | 
|  | 
| port_closed_.Set(true); | 
| awakables_.CancelAll(); | 
| +  watchers_.NotifyClosed(); | 
|  | 
| if (!port_transferred_) { | 
| base::AutoUnlock unlock(signal_lock_); | 
| @@ -596,7 +611,9 @@ void MessagePipeDispatcher::OnPortStatusChanged() { | 
| } | 
| #endif | 
|  | 
| -  awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock()); | 
| +  HandleSignalsState state = GetHandleSignalsStateNoLock(); | 
| +  awakables_.AwakeForStateChange(state); | 
| +  watchers_.NotifyState(state); | 
| } | 
|  | 
| }  // namespace edk | 
|  |