| Index: mojo/edk/system/data_pipe_producer_dispatcher.cc
 | 
| diff --git a/mojo/edk/system/data_pipe_producer_dispatcher.cc b/mojo/edk/system/data_pipe_producer_dispatcher.cc
 | 
| index 8c1993aa5d36dd474b904eabeb5f1169465e5a9c..71c61963e23f60d50dbd858b2473600c0085bf45 100644
 | 
| --- a/mojo/edk/system/data_pipe_producer_dispatcher.cc
 | 
| +++ b/mojo/edk/system/data_pipe_producer_dispatcher.cc
 | 
| @@ -79,6 +79,7 @@ DataPipeProducerDispatcher::DataPipeProducerDispatcher(
 | 
|        node_controller_(node_controller),
 | 
|        control_port_(control_port),
 | 
|        pipe_id_(pipe_id),
 | 
| +      watchers_(this),
 | 
|        shared_ring_buffer_(shared_ring_buffer),
 | 
|        available_capacity_(options_.capacity_num_bytes) {
 | 
|    if (initialized) {
 | 
| @@ -97,28 +98,6 @@ MojoResult DataPipeProducerDispatcher::Close() {
 | 
|    return CloseNoLock();
 | 
|  }
 | 
|  
 | 
| -MojoResult DataPipeProducerDispatcher::Watch(
 | 
| -    MojoHandleSignals signals,
 | 
| -    const Watcher::WatchCallback& callback,
 | 
| -    uintptr_t context) {
 | 
| -  base::AutoLock lock(lock_);
 | 
| -
 | 
| -  if (is_closed_ || in_transit_)
 | 
| -    return MOJO_RESULT_INVALID_ARGUMENT;
 | 
| -
 | 
| -  return awakable_list_.AddWatcher(
 | 
| -      signals, callback, context, GetHandleSignalsStateNoLock());
 | 
| -}
 | 
| -
 | 
| -MojoResult DataPipeProducerDispatcher::CancelWatch(uintptr_t context) {
 | 
| -  base::AutoLock lock(lock_);
 | 
| -
 | 
| -  if (is_closed_ || in_transit_)
 | 
| -    return MOJO_RESULT_INVALID_ARGUMENT;
 | 
| -
 | 
| -  return awakable_list_.RemoveWatcher(context);
 | 
| -}
 | 
| -
 | 
|  MojoResult DataPipeProducerDispatcher::WriteData(const void* elements,
 | 
|                                                   uint32_t* num_bytes,
 | 
|                                                   MojoWriteDataFlags flags) {
 | 
| @@ -179,6 +158,7 @@ MojoResult DataPipeProducerDispatcher::WriteData(const void* elements,
 | 
|    HandleSignalsState new_state = GetHandleSignalsStateNoLock();
 | 
|    if (!new_state.equals(old_state))
 | 
|      awakable_list_.AwakeForStateChange(new_state);
 | 
| +  watchers_.NotifyState(new_state);
 | 
|  
 | 
|    base::AutoUnlock unlock(lock_);
 | 
|    NotifyWrite(num_bytes_to_write);
 | 
| @@ -193,6 +173,11 @@ MojoResult DataPipeProducerDispatcher::BeginWriteData(
 | 
|    base::AutoLock lock(lock_);
 | 
|    if (!shared_ring_buffer_ || in_transit_)
 | 
|      return MOJO_RESULT_INVALID_ARGUMENT;
 | 
| +
 | 
| +  // These flags may not be used in two-phase mode.
 | 
| +  if (flags & MOJO_WRITE_DATA_FLAG_ALL_OR_NONE)
 | 
| +    return MOJO_RESULT_INVALID_ARGUMENT;
 | 
| +
 | 
|    if (in_two_phase_write_)
 | 
|      return MOJO_RESULT_BUSY;
 | 
|    if (peer_closed_)
 | 
| @@ -251,6 +236,7 @@ MojoResult DataPipeProducerDispatcher::EndWriteData(
 | 
|    HandleSignalsState new_state = GetHandleSignalsStateNoLock();
 | 
|    if (new_state.satisfies(MOJO_HANDLE_SIGNAL_WRITABLE))
 | 
|      awakable_list_.AwakeForStateChange(new_state);
 | 
| +  watchers_.NotifyState(new_state);
 | 
|  
 | 
|    return rv;
 | 
|  }
 | 
| @@ -260,6 +246,24 @@ HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsState() const {
 | 
|    return GetHandleSignalsStateNoLock();
 | 
|  }
 | 
|  
 | 
| +MojoResult DataPipeProducerDispatcher::AddWatcherRef(
 | 
| +    const scoped_refptr<WatcherDispatcher>& watcher,
 | 
| +    uintptr_t context) {
 | 
| +  base::AutoLock lock(lock_);
 | 
| +  if (is_closed_ || in_transit_)
 | 
| +    return MOJO_RESULT_INVALID_ARGUMENT;
 | 
| +  return watchers_.Add(watcher, context, GetHandleSignalsStateNoLock());
 | 
| +}
 | 
| +
 | 
| +MojoResult DataPipeProducerDispatcher::RemoveWatcherRef(
 | 
| +    WatcherDispatcher* watcher,
 | 
| +    uintptr_t context) {
 | 
| +  base::AutoLock lock(lock_);
 | 
| +  if (is_closed_ || in_transit_)
 | 
| +    return MOJO_RESULT_INVALID_ARGUMENT;
 | 
| +  return watchers_.Remove(watcher, context);
 | 
| +}
 | 
| +
 | 
|  MojoResult DataPipeProducerDispatcher::AddAwakable(
 | 
|      Awakable* awakable,
 | 
|      MojoHandleSignals signals,
 | 
| @@ -356,7 +360,10 @@ void DataPipeProducerDispatcher::CancelTransit() {
 | 
|    DCHECK(in_transit_);
 | 
|    in_transit_ = false;
 | 
|    buffer_handle_for_transit_.reset();
 | 
| -  awakable_list_.AwakeForStateChange(GetHandleSignalsStateNoLock());
 | 
| +
 | 
| +  HandleSignalsState state = GetHandleSignalsStateNoLock();
 | 
| +  awakable_list_.AwakeForStateChange(state);
 | 
| +  watchers_.NotifyState(state);
 | 
|  }
 | 
|  
 | 
|  // static
 | 
| @@ -440,6 +447,7 @@ MojoResult DataPipeProducerDispatcher::CloseNoLock() {
 | 
|    shared_ring_buffer_ = nullptr;
 | 
|  
 | 
|    awakable_list_.CancelAll();
 | 
| +  watchers_.NotifyClosed();
 | 
|    if (!transferred_) {
 | 
|      base::AutoUnlock unlock(lock_);
 | 
|      node_controller_->ClosePort(control_port_);
 | 
| @@ -453,8 +461,7 @@ HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsStateNoLock()
 | 
|    lock_.AssertAcquired();
 | 
|    HandleSignalsState rv;
 | 
|    if (!peer_closed_) {
 | 
| -    if (!in_two_phase_write_ && shared_ring_buffer_ &&
 | 
| -        available_capacity_ > 0)
 | 
| +    if (!in_two_phase_write_ && shared_ring_buffer_ && available_capacity_ > 0)
 | 
|        rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
 | 
|      rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
 | 
|    } else {
 | 
| @@ -541,7 +548,9 @@ void DataPipeProducerDispatcher::UpdateSignalsStateNoLock() {
 | 
|  
 | 
|    if (peer_closed_ != was_peer_closed ||
 | 
|        available_capacity_ != previous_capacity) {
 | 
| -    awakable_list_.AwakeForStateChange(GetHandleSignalsStateNoLock());
 | 
| +    HandleSignalsState state = GetHandleSignalsStateNoLock();
 | 
| +    awakable_list_.AwakeForStateChange(state);
 | 
| +    watchers_.NotifyState(state);
 | 
|    }
 | 
|  }
 | 
|  
 | 
| 
 |