Index: mojo/edk/system/data_pipe_consumer_dispatcher.cc |
diff --git a/mojo/edk/system/data_pipe_consumer_dispatcher.cc b/mojo/edk/system/data_pipe_consumer_dispatcher.cc |
index c908e3ae8bf7d4f9668d9fa3836d1e8b79f8f97b..474e99d4955fc3c0e6cdc98f045af90e9d44e988 100644 |
--- a/mojo/edk/system/data_pipe_consumer_dispatcher.cc |
+++ b/mojo/edk/system/data_pipe_consumer_dispatcher.cc |
@@ -80,6 +80,7 @@ DataPipeConsumerDispatcher::DataPipeConsumerDispatcher( |
node_controller_(node_controller), |
control_port_(control_port), |
pipe_id_(pipe_id), |
+ watchers_(this), |
shared_ring_buffer_(shared_ring_buffer) { |
if (initialized) { |
base::AutoLock lock(lock_); |
@@ -97,34 +98,10 @@ MojoResult DataPipeConsumerDispatcher::Close() { |
return CloseNoLock(); |
} |
- |
-MojoResult DataPipeConsumerDispatcher::Watch( |
- MojoHandleSignals signals, |
- const Watcher::WatchCallback& callback, |
- uintptr_t context) { |
- base::AutoLock lock(lock_); |
- |
- if (is_closed_ || in_transit_) |
- return MOJO_RESULT_INVALID_ARGUMENT; |
- |
- return awakable_list_.AddWatcher( |
- signals, callback, context, GetHandleSignalsStateNoLock()); |
-} |
- |
-MojoResult DataPipeConsumerDispatcher::CancelWatch(uintptr_t context) { |
- base::AutoLock lock(lock_); |
- |
- if (is_closed_ || in_transit_) |
- return MOJO_RESULT_INVALID_ARGUMENT; |
- |
- return awakable_list_.RemoveWatcher(context); |
-} |
- |
MojoResult DataPipeConsumerDispatcher::ReadData(void* elements, |
uint32_t* num_bytes, |
MojoReadDataFlags flags) { |
base::AutoLock lock(lock_); |
- new_data_available_ = false; |
if (!shared_ring_buffer_ || in_transit_) |
return MOJO_RESULT_INVALID_ARGUMENT; |
@@ -132,6 +109,9 @@ MojoResult DataPipeConsumerDispatcher::ReadData(void* elements, |
if (in_two_phase_read_) |
return MOJO_RESULT_BUSY; |
+ const bool had_new_data = new_data_available_; |
+ new_data_available_ = false; |
+ |
if ((flags & MOJO_READ_DATA_FLAG_QUERY)) { |
if ((flags & MOJO_READ_DATA_FLAG_PEEK) || |
(flags & MOJO_READ_DATA_FLAG_DISCARD)) |
@@ -140,6 +120,9 @@ MojoResult DataPipeConsumerDispatcher::ReadData(void* elements, |
DVLOG_IF(2, elements) |
<< "Query mode: ignoring non-null |elements|"; |
*num_bytes = static_cast<uint32_t>(bytes_available_); |
+ |
+ if (had_new_data) |
+ watchers_.NotifyState(GetHandleSignalsStateNoLock()); |
return MOJO_RESULT_OK; |
} |
@@ -162,12 +145,16 @@ MojoResult DataPipeConsumerDispatcher::ReadData(void* elements, |
all_or_none ? max_num_bytes_to_read : 0; |
if (min_num_bytes_to_read > bytes_available_) { |
+ if (had_new_data) |
+ watchers_.NotifyState(GetHandleSignalsStateNoLock()); |
return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION |
: MOJO_RESULT_OUT_OF_RANGE; |
} |
uint32_t bytes_to_read = std::min(max_num_bytes_to_read, bytes_available_); |
if (bytes_to_read == 0) { |
+ if (had_new_data) |
+ watchers_.NotifyState(GetHandleSignalsStateNoLock()); |
return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION |
: MOJO_RESULT_SHOULD_WAIT; |
} |
@@ -199,6 +186,10 @@ MojoResult DataPipeConsumerDispatcher::ReadData(void* elements, |
NotifyRead(bytes_to_read); |
} |
+ // We may have just read the last available data and thus changed the signals |
+ // state. |
+ watchers_.NotifyState(GetHandleSignalsStateNoLock()); |
+ |
return MOJO_RESULT_OK; |
} |
@@ -206,7 +197,6 @@ MojoResult DataPipeConsumerDispatcher::BeginReadData(const void** buffer, |
uint32_t* buffer_num_bytes, |
MojoReadDataFlags flags) { |
base::AutoLock lock(lock_); |
- new_data_available_ = false; |
if (!shared_ring_buffer_ || in_transit_) |
return MOJO_RESULT_INVALID_ARGUMENT; |
@@ -219,7 +209,12 @@ MojoResult DataPipeConsumerDispatcher::BeginReadData(const void** buffer, |
(flags & MOJO_READ_DATA_FLAG_PEEK)) |
return MOJO_RESULT_INVALID_ARGUMENT; |
+ const bool had_new_data = new_data_available_; |
+ new_data_available_ = false; |
+ |
if (bytes_available_ == 0) { |
+ if (had_new_data) |
+ watchers_.NotifyState(GetHandleSignalsStateNoLock()); |
return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION |
: MOJO_RESULT_SHOULD_WAIT; |
} |
@@ -237,6 +232,9 @@ MojoResult DataPipeConsumerDispatcher::BeginReadData(const void** buffer, |
*buffer_num_bytes = bytes_to_read; |
two_phase_max_bytes_read_ = bytes_to_read; |
+ if (had_new_data) |
+ watchers_.NotifyState(GetHandleSignalsStateNoLock()); |
+ |
return MOJO_RESULT_OK; |
} |
@@ -273,6 +271,7 @@ MojoResult DataPipeConsumerDispatcher::EndReadData(uint32_t num_bytes_read) { |
HandleSignalsState new_state = GetHandleSignalsStateNoLock(); |
if (!new_state.equals(old_state)) |
awakable_list_.AwakeForStateChange(new_state); |
+ watchers_.NotifyState(new_state); |
return rv; |
} |
@@ -282,6 +281,24 @@ HandleSignalsState DataPipeConsumerDispatcher::GetHandleSignalsState() const { |
return GetHandleSignalsStateNoLock(); |
} |
+MojoResult DataPipeConsumerDispatcher::AddWatcherRef( |
+ const scoped_refptr<WatcherDispatcher>& watcher, |
+ uintptr_t context) { |
+ base::AutoLock lock(lock_); |
+ if (is_closed_ || in_transit_) |
+ return MOJO_RESULT_INVALID_ARGUMENT; |
+ return watchers_.Add(watcher, context, GetHandleSignalsStateNoLock()); |
+} |
+ |
+MojoResult DataPipeConsumerDispatcher::RemoveWatcherRef( |
+ WatcherDispatcher* watcher, |
+ uintptr_t context) { |
+ base::AutoLock lock(lock_); |
+ if (is_closed_ || in_transit_) |
+ return MOJO_RESULT_INVALID_ARGUMENT; |
+ return watchers_.Remove(watcher, context); |
+} |
+ |
MojoResult DataPipeConsumerDispatcher::AddAwakable( |
Awakable* awakable, |
MojoHandleSignals signals, |
@@ -464,6 +481,7 @@ MojoResult DataPipeConsumerDispatcher::CloseNoLock() { |
shared_ring_buffer_ = nullptr; |
awakable_list_.CancelAll(); |
+ watchers_.NotifyClosed(); |
if (!transferred_) { |
base::AutoUnlock unlock(lock_); |
node_controller_->ClosePort(control_port_); |
@@ -581,7 +599,9 @@ void DataPipeConsumerDispatcher::UpdateSignalsStateNoLock() { |
new_data_available_ = true; |
if (peer_closed_ != was_peer_closed || has_new_data) { |
- awakable_list_.AwakeForStateChange(GetHandleSignalsStateNoLock()); |
+ HandleSignalsState state = GetHandleSignalsStateNoLock(); |
+ awakable_list_.AwakeForStateChange(state); |
+ watchers_.NotifyState(state); |
} |
} |