Index: mojo/edk/system/data_pipe_producer_dispatcher.cc |
diff --git a/mojo/edk/system/data_pipe_producer_dispatcher.cc b/mojo/edk/system/data_pipe_producer_dispatcher.cc |
index 8c1993aa5d36dd474b904eabeb5f1169465e5a9c..71c61963e23f60d50dbd858b2473600c0085bf45 100644 |
--- a/mojo/edk/system/data_pipe_producer_dispatcher.cc |
+++ b/mojo/edk/system/data_pipe_producer_dispatcher.cc |
@@ -79,6 +79,7 @@ DataPipeProducerDispatcher::DataPipeProducerDispatcher( |
node_controller_(node_controller), |
control_port_(control_port), |
pipe_id_(pipe_id), |
+ watchers_(this), |
shared_ring_buffer_(shared_ring_buffer), |
available_capacity_(options_.capacity_num_bytes) { |
if (initialized) { |
@@ -97,28 +98,6 @@ MojoResult DataPipeProducerDispatcher::Close() { |
return CloseNoLock(); |
} |
-MojoResult DataPipeProducerDispatcher::Watch( |
- MojoHandleSignals signals, |
- const Watcher::WatchCallback& callback, |
- uintptr_t context) { |
- base::AutoLock lock(lock_); |
- |
- if (is_closed_ || in_transit_) |
- return MOJO_RESULT_INVALID_ARGUMENT; |
- |
- return awakable_list_.AddWatcher( |
- signals, callback, context, GetHandleSignalsStateNoLock()); |
-} |
- |
-MojoResult DataPipeProducerDispatcher::CancelWatch(uintptr_t context) { |
- base::AutoLock lock(lock_); |
- |
- if (is_closed_ || in_transit_) |
- return MOJO_RESULT_INVALID_ARGUMENT; |
- |
- return awakable_list_.RemoveWatcher(context); |
-} |
- |
MojoResult DataPipeProducerDispatcher::WriteData(const void* elements, |
uint32_t* num_bytes, |
MojoWriteDataFlags flags) { |
@@ -179,6 +158,7 @@ MojoResult DataPipeProducerDispatcher::WriteData(const void* elements, |
HandleSignalsState new_state = GetHandleSignalsStateNoLock(); |
if (!new_state.equals(old_state)) |
awakable_list_.AwakeForStateChange(new_state); |
+ watchers_.NotifyState(new_state); |
base::AutoUnlock unlock(lock_); |
NotifyWrite(num_bytes_to_write); |
@@ -193,6 +173,11 @@ MojoResult DataPipeProducerDispatcher::BeginWriteData( |
base::AutoLock lock(lock_); |
if (!shared_ring_buffer_ || in_transit_) |
return MOJO_RESULT_INVALID_ARGUMENT; |
+ |
+ // These flags may not be used in two-phase mode. |
+ if (flags & MOJO_WRITE_DATA_FLAG_ALL_OR_NONE) |
+ return MOJO_RESULT_INVALID_ARGUMENT; |
+ |
if (in_two_phase_write_) |
return MOJO_RESULT_BUSY; |
if (peer_closed_) |
@@ -251,6 +236,7 @@ MojoResult DataPipeProducerDispatcher::EndWriteData( |
HandleSignalsState new_state = GetHandleSignalsStateNoLock(); |
if (new_state.satisfies(MOJO_HANDLE_SIGNAL_WRITABLE)) |
awakable_list_.AwakeForStateChange(new_state); |
+ watchers_.NotifyState(new_state); |
return rv; |
} |
@@ -260,6 +246,24 @@ HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsState() const { |
return GetHandleSignalsStateNoLock(); |
} |
+MojoResult DataPipeProducerDispatcher::AddWatcherRef( |
+ const scoped_refptr<WatcherDispatcher>& watcher, |
+ uintptr_t context) { |
+ base::AutoLock lock(lock_); |
+ if (is_closed_ || in_transit_) |
+ return MOJO_RESULT_INVALID_ARGUMENT; |
+ return watchers_.Add(watcher, context, GetHandleSignalsStateNoLock()); |
+} |
+ |
+MojoResult DataPipeProducerDispatcher::RemoveWatcherRef( |
+ WatcherDispatcher* watcher, |
+ uintptr_t context) { |
+ base::AutoLock lock(lock_); |
+ if (is_closed_ || in_transit_) |
+ return MOJO_RESULT_INVALID_ARGUMENT; |
+ return watchers_.Remove(watcher, context); |
+} |
+ |
MojoResult DataPipeProducerDispatcher::AddAwakable( |
Awakable* awakable, |
MojoHandleSignals signals, |
@@ -356,7 +360,10 @@ void DataPipeProducerDispatcher::CancelTransit() { |
DCHECK(in_transit_); |
in_transit_ = false; |
buffer_handle_for_transit_.reset(); |
- awakable_list_.AwakeForStateChange(GetHandleSignalsStateNoLock()); |
+ |
+ HandleSignalsState state = GetHandleSignalsStateNoLock(); |
+ awakable_list_.AwakeForStateChange(state); |
+ watchers_.NotifyState(state); |
} |
// static |
@@ -440,6 +447,7 @@ MojoResult DataPipeProducerDispatcher::CloseNoLock() { |
shared_ring_buffer_ = nullptr; |
awakable_list_.CancelAll(); |
+ watchers_.NotifyClosed(); |
if (!transferred_) { |
base::AutoUnlock unlock(lock_); |
node_controller_->ClosePort(control_port_); |
@@ -453,8 +461,7 @@ HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsStateNoLock() |
lock_.AssertAcquired(); |
HandleSignalsState rv; |
if (!peer_closed_) { |
- if (!in_two_phase_write_ && shared_ring_buffer_ && |
- available_capacity_ > 0) |
+ if (!in_two_phase_write_ && shared_ring_buffer_ && available_capacity_ > 0) |
rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; |
rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE; |
} else { |
@@ -541,7 +548,9 @@ void DataPipeProducerDispatcher::UpdateSignalsStateNoLock() { |
if (peer_closed_ != was_peer_closed || |
available_capacity_ != previous_capacity) { |
- awakable_list_.AwakeForStateChange(GetHandleSignalsStateNoLock()); |
+ HandleSignalsState state = GetHandleSignalsStateNoLock(); |
+ awakable_list_.AwakeForStateChange(state); |
+ watchers_.NotifyState(state); |
} |
} |