| Index: mojo/edk/system/message_pipe_dispatcher.cc | 
| diff --git a/mojo/edk/system/message_pipe_dispatcher.cc b/mojo/edk/system/message_pipe_dispatcher.cc | 
| index f27336b56f4d074a0e5e32ec2a592fcccff363a3..a9ae683d9bad683e56e60217ef57d5e26fdb80bb 100644 | 
| --- a/mojo/edk/system/message_pipe_dispatcher.cc | 
| +++ b/mojo/edk/system/message_pipe_dispatcher.cc | 
| @@ -183,6 +183,7 @@ bool MessagePipeDispatcher::Fuse(MessagePipeDispatcher* other) { | 
| port0 = port_; | 
| port_closed_.Set(true); | 
| awakables_.CancelAll(); | 
| +    watchers_.NotifyClosed(); | 
| } | 
|  | 
| ports::PortRef port1; | 
| @@ -191,6 +192,7 @@ bool MessagePipeDispatcher::Fuse(MessagePipeDispatcher* other) { | 
| port1 = other->port_; | 
| other->port_closed_.Set(true); | 
| other->awakables_.CancelAll(); | 
| +    other->watchers_.NotifyClosed(); | 
| } | 
|  | 
| // Both ports are always closed by this call. | 
| @@ -209,25 +211,35 @@ MojoResult MessagePipeDispatcher::Close() { | 
| return CloseNoLock(); | 
| } | 
|  | 
| -MojoResult MessagePipeDispatcher::Watch(MojoHandleSignals signals, | 
| -                                        const Watcher::WatchCallback& callback, | 
| -                                        uintptr_t context) { | 
| +MojoResult MessagePipeDispatcher::RegisterWatcher( | 
| +    MojoHandleSignals signals, | 
| +    const Watcher::WatchCallback& callback, | 
| +    uintptr_t context) { | 
| +  base::AutoLock lock(signal_lock_); | 
| + | 
| +  if (port_closed_ || in_transit_) | 
| +    return MOJO_RESULT_INVALID_ARGUMENT; | 
| + | 
| +  return watchers_.Add(signals, callback, context, | 
| +                       GetHandleSignalsStateNoLock()); | 
| +} | 
| + | 
| +MojoResult MessagePipeDispatcher::ArmWatcher(uintptr_t context) { | 
| base::AutoLock lock(signal_lock_); | 
|  | 
| if (port_closed_ || in_transit_) | 
| return MOJO_RESULT_INVALID_ARGUMENT; | 
|  | 
| -  return awakables_.AddWatcher( | 
| -      signals, callback, context, GetHandleSignalsStateNoLock()); | 
| +  return watchers_.Arm(context, GetHandleSignalsStateNoLock()); | 
| } | 
|  | 
| -MojoResult MessagePipeDispatcher::CancelWatch(uintptr_t context) { | 
| +MojoResult MessagePipeDispatcher::UnregisterWatcher(uintptr_t context) { | 
| base::AutoLock lock(signal_lock_); | 
|  | 
| if (port_closed_ || in_transit_) | 
| return MOJO_RESULT_INVALID_ARGUMENT; | 
|  | 
| -  return awakables_.RemoveWatcher(context); | 
| +  return watchers_.Remove(context); | 
| } | 
|  | 
| MojoResult MessagePipeDispatcher::WriteMessage( | 
| @@ -286,6 +298,11 @@ MojoResult MessagePipeDispatcher::ReadMessage( | 
| &no_space, &invalid_message); | 
| int rv = node_controller_->node()->GetMessage(port_, &ports_message, &filter); | 
|  | 
| +  { | 
| +    base::AutoLock lock(signal_lock_); | 
| +    watchers_.NotifyState(GetHandleSignalsStateNoLock()); | 
| +  } | 
| + | 
| if (invalid_message) | 
| return MOJO_RESULT_UNKNOWN; | 
|  | 
| @@ -496,7 +513,9 @@ void MessagePipeDispatcher::CancelTransit() { | 
| in_transit_.Set(false); | 
|  | 
| // Something may have happened while we were waiting for potential transit. | 
| -  awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock()); | 
| +  HandleSignalsState state = GetHandleSignalsStateNoLock(); | 
| +  awakables_.AwakeForStateChange(state); | 
| +  watchers_.NotifyState(state); | 
| } | 
|  | 
| // static | 
| @@ -532,6 +551,7 @@ MojoResult MessagePipeDispatcher::CloseNoLock() { | 
|  | 
| port_closed_.Set(true); | 
| awakables_.CancelAll(); | 
| +  watchers_.NotifyClosed(); | 
|  | 
| if (!port_transferred_) { | 
| base::AutoUnlock unlock(signal_lock_); | 
| @@ -596,7 +616,9 @@ void MessagePipeDispatcher::OnPortStatusChanged() { | 
| } | 
| #endif | 
|  | 
| -  awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock()); | 
| +  HandleSignalsState state = GetHandleSignalsStateNoLock(); | 
| +  awakables_.AwakeForStateChange(state); | 
| +  watchers_.NotifyState(state); | 
| } | 
|  | 
| }  // namespace edk | 
|  |