Index: mojo/edk/system/data_pipe_producer_dispatcher.cc |
diff --git a/mojo/edk/system/data_pipe_producer_dispatcher.cc b/mojo/edk/system/data_pipe_producer_dispatcher.cc |
index 71c61963e23f60d50dbd858b2473600c0085bf45..8c1993aa5d36dd474b904eabeb5f1169465e5a9c 100644 |
--- a/mojo/edk/system/data_pipe_producer_dispatcher.cc |
+++ b/mojo/edk/system/data_pipe_producer_dispatcher.cc |
@@ -79,7 +79,6 @@ |
node_controller_(node_controller), |
control_port_(control_port), |
pipe_id_(pipe_id), |
- watchers_(this), |
shared_ring_buffer_(shared_ring_buffer), |
available_capacity_(options_.capacity_num_bytes) { |
if (initialized) { |
@@ -98,6 +97,28 @@ |
return CloseNoLock(); |
} |
+MojoResult DataPipeProducerDispatcher::Watch( |
+ MojoHandleSignals signals, |
+ const Watcher::WatchCallback& callback, |
+ uintptr_t context) { |
+ base::AutoLock lock(lock_); |
+ |
+ if (is_closed_ || in_transit_) |
+ return MOJO_RESULT_INVALID_ARGUMENT; |
+ |
+ return awakable_list_.AddWatcher( |
+ signals, callback, context, GetHandleSignalsStateNoLock()); |
+} |
+ |
+MojoResult DataPipeProducerDispatcher::CancelWatch(uintptr_t context) { |
+ base::AutoLock lock(lock_); |
+ |
+ if (is_closed_ || in_transit_) |
+ return MOJO_RESULT_INVALID_ARGUMENT; |
+ |
+ return awakable_list_.RemoveWatcher(context); |
+} |
+ |
MojoResult DataPipeProducerDispatcher::WriteData(const void* elements, |
uint32_t* num_bytes, |
MojoWriteDataFlags flags) { |
@@ -158,7 +179,6 @@ |
HandleSignalsState new_state = GetHandleSignalsStateNoLock(); |
if (!new_state.equals(old_state)) |
awakable_list_.AwakeForStateChange(new_state); |
- watchers_.NotifyState(new_state); |
base::AutoUnlock unlock(lock_); |
NotifyWrite(num_bytes_to_write); |
@@ -173,11 +193,6 @@ |
base::AutoLock lock(lock_); |
if (!shared_ring_buffer_ || in_transit_) |
return MOJO_RESULT_INVALID_ARGUMENT; |
- |
- // These flags may not be used in two-phase mode. |
- if (flags & MOJO_WRITE_DATA_FLAG_ALL_OR_NONE) |
- return MOJO_RESULT_INVALID_ARGUMENT; |
- |
if (in_two_phase_write_) |
return MOJO_RESULT_BUSY; |
if (peer_closed_) |
@@ -236,7 +251,6 @@ |
HandleSignalsState new_state = GetHandleSignalsStateNoLock(); |
if (new_state.satisfies(MOJO_HANDLE_SIGNAL_WRITABLE)) |
awakable_list_.AwakeForStateChange(new_state); |
- watchers_.NotifyState(new_state); |
return rv; |
} |
@@ -244,24 +258,6 @@ |
HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsState() const { |
base::AutoLock lock(lock_); |
return GetHandleSignalsStateNoLock(); |
-} |
- |
-MojoResult DataPipeProducerDispatcher::AddWatcherRef( |
- const scoped_refptr<WatcherDispatcher>& watcher, |
- uintptr_t context) { |
- base::AutoLock lock(lock_); |
- if (is_closed_ || in_transit_) |
- return MOJO_RESULT_INVALID_ARGUMENT; |
- return watchers_.Add(watcher, context, GetHandleSignalsStateNoLock()); |
-} |
- |
-MojoResult DataPipeProducerDispatcher::RemoveWatcherRef( |
- WatcherDispatcher* watcher, |
- uintptr_t context) { |
- base::AutoLock lock(lock_); |
- if (is_closed_ || in_transit_) |
- return MOJO_RESULT_INVALID_ARGUMENT; |
- return watchers_.Remove(watcher, context); |
} |
MojoResult DataPipeProducerDispatcher::AddAwakable( |
@@ -360,10 +356,7 @@ |
DCHECK(in_transit_); |
in_transit_ = false; |
buffer_handle_for_transit_.reset(); |
- |
- HandleSignalsState state = GetHandleSignalsStateNoLock(); |
- awakable_list_.AwakeForStateChange(state); |
- watchers_.NotifyState(state); |
+ awakable_list_.AwakeForStateChange(GetHandleSignalsStateNoLock()); |
} |
// static |
@@ -447,7 +440,6 @@ |
shared_ring_buffer_ = nullptr; |
awakable_list_.CancelAll(); |
- watchers_.NotifyClosed(); |
if (!transferred_) { |
base::AutoUnlock unlock(lock_); |
node_controller_->ClosePort(control_port_); |
@@ -461,7 +453,8 @@ |
lock_.AssertAcquired(); |
HandleSignalsState rv; |
if (!peer_closed_) { |
- if (!in_two_phase_write_ && shared_ring_buffer_ && available_capacity_ > 0) |
+ if (!in_two_phase_write_ && shared_ring_buffer_ && |
+ available_capacity_ > 0) |
rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; |
rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; |
} else { |
@@ -548,9 +541,7 @@ |
if (peer_closed_ != was_peer_closed || |
available_capacity_ != previous_capacity) { |
- HandleSignalsState state = GetHandleSignalsStateNoLock(); |
- awakable_list_.AwakeForStateChange(state); |
- watchers_.NotifyState(state); |
+ awakable_list_.AwakeForStateChange(GetHandleSignalsStateNoLock()); |
} |
} |