Index: mojo/edk/system/data_pipe_producer_dispatcher.cc |
diff --git a/mojo/edk/system/data_pipe_producer_dispatcher.cc b/mojo/edk/system/data_pipe_producer_dispatcher.cc |
index 8c1993aa5d36dd474b904eabeb5f1169465e5a9c..64ea3d5c3a78c04e4b56b520b63bc248f77d843d 100644 |
--- a/mojo/edk/system/data_pipe_producer_dispatcher.cc |
+++ b/mojo/edk/system/data_pipe_producer_dispatcher.cc |
@@ -97,7 +97,7 @@ MojoResult DataPipeProducerDispatcher::Close() { |
return CloseNoLock(); |
} |
-MojoResult DataPipeProducerDispatcher::Watch( |
+MojoResult DataPipeProducerDispatcher::RegisterWatcher( |
MojoHandleSignals signals, |
const Watcher::WatchCallback& callback, |
uintptr_t context) { |
@@ -106,17 +106,26 @@ MojoResult DataPipeProducerDispatcher::Watch( |
if (is_closed_ || in_transit_) |
return MOJO_RESULT_INVALID_ARGUMENT; |
- return awakable_list_.AddWatcher( |
- signals, callback, context, GetHandleSignalsStateNoLock()); |
+ return watchers_.Add(signals, callback, context, |
+ GetHandleSignalsStateNoLock()); |
} |
-MojoResult DataPipeProducerDispatcher::CancelWatch(uintptr_t context) { |
+MojoResult DataPipeProducerDispatcher::ArmWatcher(uintptr_t context) { |
base::AutoLock lock(lock_); |
if (is_closed_ || in_transit_) |
return MOJO_RESULT_INVALID_ARGUMENT; |
- return awakable_list_.RemoveWatcher(context); |
+ return watchers_.Arm(context, GetHandleSignalsStateNoLock()); |
+} |
+ |
+MojoResult DataPipeProducerDispatcher::UnregisterWatcher(uintptr_t context) { |
+ base::AutoLock lock(lock_); |
+ |
+ if (is_closed_ || in_transit_) |
+ return MOJO_RESULT_INVALID_ARGUMENT; |
+ |
+ return watchers_.Remove(context); |
} |
MojoResult DataPipeProducerDispatcher::WriteData(const void* elements, |
@@ -177,8 +186,10 @@ MojoResult DataPipeProducerDispatcher::WriteData(const void* elements, |
options_.capacity_num_bytes; |
HandleSignalsState new_state = GetHandleSignalsStateNoLock(); |
- if (!new_state.equals(old_state)) |
+ if (!new_state.equals(old_state)) { |
awakable_list_.AwakeForStateChange(new_state); |
+ watchers_.NotifyState(new_state); |
+ } |
base::AutoUnlock unlock(lock_); |
NotifyWrite(num_bytes_to_write); |
@@ -249,8 +260,10 @@ MojoResult DataPipeProducerDispatcher::EndWriteData( |
// If we're now writable, we *became* writable (since we weren't writable |
// during the two-phase write), so awake producer awakables. |
HandleSignalsState new_state = GetHandleSignalsStateNoLock(); |
- if (new_state.satisfies(MOJO_HANDLE_SIGNAL_WRITABLE)) |
+ if (new_state.satisfies(MOJO_HANDLE_SIGNAL_WRITABLE)) { |
awakable_list_.AwakeForStateChange(new_state); |
+ watchers_.NotifyState(new_state); |
+ } |
return rv; |
} |
@@ -356,7 +369,10 @@ void DataPipeProducerDispatcher::CancelTransit() { |
DCHECK(in_transit_); |
in_transit_ = false; |
buffer_handle_for_transit_.reset(); |
- awakable_list_.AwakeForStateChange(GetHandleSignalsStateNoLock()); |
+ |
+ HandleSignalsState state = GetHandleSignalsStateNoLock(); |
+ awakable_list_.AwakeForStateChange(state); |
+ watchers_.NotifyState(state); |
} |
// static |
@@ -440,6 +456,7 @@ MojoResult DataPipeProducerDispatcher::CloseNoLock() { |
shared_ring_buffer_ = nullptr; |
awakable_list_.CancelAll(); |
+ watchers_.NotifyClosed(); |
if (!transferred_) { |
base::AutoUnlock unlock(lock_); |
node_controller_->ClosePort(control_port_); |
@@ -541,7 +558,9 @@ void DataPipeProducerDispatcher::UpdateSignalsStateNoLock() { |
if (peer_closed_ != was_peer_closed || |
available_capacity_ != previous_capacity) { |
- awakable_list_.AwakeForStateChange(GetHandleSignalsStateNoLock()); |
+ HandleSignalsState state = GetHandleSignalsStateNoLock(); |
+ awakable_list_.AwakeForStateChange(state); |
+ watchers_.NotifyState(state); |
} |
} |