Index: mojo/edk/system/data_pipe_consumer_dispatcher.cc |
diff --git a/mojo/edk/system/data_pipe_consumer_dispatcher.cc b/mojo/edk/system/data_pipe_consumer_dispatcher.cc |
index ec8d2d855e4bbb6f62940b91f5176b99e2ab1d46..63abf35d77fc66873ce16012345a280dbebe0cd8 100644 |
--- a/mojo/edk/system/data_pipe_consumer_dispatcher.cc |
+++ b/mojo/edk/system/data_pipe_consumer_dispatcher.cc |
@@ -97,8 +97,7 @@ MojoResult DataPipeConsumerDispatcher::Close() { |
return CloseNoLock(); |
} |
- |
-MojoResult DataPipeConsumerDispatcher::Watch( |
+MojoResult DataPipeConsumerDispatcher::RegisterWatcher( |
MojoHandleSignals signals, |
const Watcher::WatchCallback& callback, |
uintptr_t context) { |
@@ -107,17 +106,26 @@ MojoResult DataPipeConsumerDispatcher::Watch( |
if (is_closed_ || in_transit_) |
return MOJO_RESULT_INVALID_ARGUMENT; |
- return awakable_list_.AddWatcher( |
- signals, callback, context, GetHandleSignalsStateNoLock()); |
+ return watchers_.Add(signals, callback, context, |
+ GetHandleSignalsStateNoLock()); |
} |
-MojoResult DataPipeConsumerDispatcher::CancelWatch(uintptr_t context) { |
+MojoResult DataPipeConsumerDispatcher::ArmWatcher(uintptr_t context) { |
base::AutoLock lock(lock_); |
if (is_closed_ || in_transit_) |
return MOJO_RESULT_INVALID_ARGUMENT; |
- return awakable_list_.RemoveWatcher(context); |
+ return watchers_.Arm(context, GetHandleSignalsStateNoLock()); |
+} |
+ |
+MojoResult DataPipeConsumerDispatcher::UnregisterWatcher(uintptr_t context) { |
+ base::AutoLock lock(lock_); |
+ |
+ if (is_closed_ || in_transit_) |
+ return MOJO_RESULT_INVALID_ARGUMENT; |
+ |
+ return watchers_.Remove(context); |
} |
MojoResult DataPipeConsumerDispatcher::ReadData(void* elements, |
@@ -268,8 +276,10 @@ MojoResult DataPipeConsumerDispatcher::EndReadData(uint32_t num_bytes_read) { |
two_phase_max_bytes_read_ = 0; |
HandleSignalsState new_state = GetHandleSignalsStateNoLock(); |
- if (!new_state.equals(old_state)) |
+ if (!new_state.equals(old_state)) { |
awakable_list_.AwakeForStateChange(new_state); |
+ watchers_.NotifyState(new_state); |
+ } |
return rv; |
} |
@@ -460,6 +470,7 @@ MojoResult DataPipeConsumerDispatcher::CloseNoLock() { |
shared_ring_buffer_ = nullptr; |
awakable_list_.CancelAll(); |
+ watchers_.NotifyClosed(); |
if (!transferred_) { |
base::AutoUnlock unlock(lock_); |
node_controller_->ClosePort(control_port_); |
@@ -565,7 +576,9 @@ void DataPipeConsumerDispatcher::UpdateSignalsStateNoLock() { |
if (peer_closed_ != was_peer_closed || |
bytes_available_ != previous_bytes_available) { |
- awakable_list_.AwakeForStateChange(GetHandleSignalsStateNoLock()); |
+ HandleSignalsState state = GetHandleSignalsStateNoLock(); |
+ awakable_list_.AwakeForStateChange(state); |
+ watchers_.NotifyState(state); |
} |
} |