Index: mojo/edk/system/data_pipe_consumer_dispatcher.cc |
diff --git a/mojo/edk/system/data_pipe_consumer_dispatcher.cc b/mojo/edk/system/data_pipe_consumer_dispatcher.cc |
index 474e99d4955fc3c0e6cdc98f045af90e9d44e988..c908e3ae8bf7d4f9668d9fa3836d1e8b79f8f97b 100644 |
--- a/mojo/edk/system/data_pipe_consumer_dispatcher.cc |
+++ b/mojo/edk/system/data_pipe_consumer_dispatcher.cc |
@@ -80,7 +80,6 @@ |
node_controller_(node_controller), |
control_port_(control_port), |
pipe_id_(pipe_id), |
- watchers_(this), |
shared_ring_buffer_(shared_ring_buffer) { |
if (initialized) { |
base::AutoLock lock(lock_); |
@@ -98,19 +97,40 @@ |
return CloseNoLock(); |
} |
+ |
+MojoResult DataPipeConsumerDispatcher::Watch( |
+ MojoHandleSignals signals, |
+ const Watcher::WatchCallback& callback, |
+ uintptr_t context) { |
+ base::AutoLock lock(lock_); |
+ |
+ if (is_closed_ || in_transit_) |
+ return MOJO_RESULT_INVALID_ARGUMENT; |
+ |
+ return awakable_list_.AddWatcher( |
+ signals, callback, context, GetHandleSignalsStateNoLock()); |
+} |
+ |
+MojoResult DataPipeConsumerDispatcher::CancelWatch(uintptr_t context) { |
+ base::AutoLock lock(lock_); |
+ |
+ if (is_closed_ || in_transit_) |
+ return MOJO_RESULT_INVALID_ARGUMENT; |
+ |
+ return awakable_list_.RemoveWatcher(context); |
+} |
+ |
MojoResult DataPipeConsumerDispatcher::ReadData(void* elements, |
uint32_t* num_bytes, |
MojoReadDataFlags flags) { |
base::AutoLock lock(lock_); |
+ new_data_available_ = false; |
if (!shared_ring_buffer_ || in_transit_) |
return MOJO_RESULT_INVALID_ARGUMENT; |
if (in_two_phase_read_) |
return MOJO_RESULT_BUSY; |
- |
- const bool had_new_data = new_data_available_; |
- new_data_available_ = false; |
if ((flags & MOJO_READ_DATA_FLAG_QUERY)) { |
if ((flags & MOJO_READ_DATA_FLAG_PEEK) || |
@@ -120,9 +140,6 @@ |
DVLOG_IF(2, elements) |
<< "Query mode: ignoring non-null |elements|"; |
*num_bytes = static_cast<uint32_t>(bytes_available_); |
- |
- if (had_new_data) |
- watchers_.NotifyState(GetHandleSignalsStateNoLock()); |
return MOJO_RESULT_OK; |
} |
@@ -145,16 +162,12 @@ |
all_or_none ? max_num_bytes_to_read : 0; |
if (min_num_bytes_to_read > bytes_available_) { |
- if (had_new_data) |
- watchers_.NotifyState(GetHandleSignalsStateNoLock()); |
return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION |
: MOJO_RESULT_OUT_OF_RANGE; |
} |
uint32_t bytes_to_read = std::min(max_num_bytes_to_read, bytes_available_); |
if (bytes_to_read == 0) { |
- if (had_new_data) |
- watchers_.NotifyState(GetHandleSignalsStateNoLock()); |
return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION |
: MOJO_RESULT_SHOULD_WAIT; |
} |
@@ -186,10 +199,6 @@ |
NotifyRead(bytes_to_read); |
} |
- // We may have just read the last available data and thus changed the signals |
- // state. |
- watchers_.NotifyState(GetHandleSignalsStateNoLock()); |
- |
return MOJO_RESULT_OK; |
} |
@@ -197,6 +206,7 @@ |
uint32_t* buffer_num_bytes, |
MojoReadDataFlags flags) { |
base::AutoLock lock(lock_); |
+ new_data_available_ = false; |
if (!shared_ring_buffer_ || in_transit_) |
return MOJO_RESULT_INVALID_ARGUMENT; |
@@ -209,12 +219,7 @@ |
(flags & MOJO_READ_DATA_FLAG_PEEK)) |
return MOJO_RESULT_INVALID_ARGUMENT; |
- const bool had_new_data = new_data_available_; |
- new_data_available_ = false; |
- |
if (bytes_available_ == 0) { |
- if (had_new_data) |
- watchers_.NotifyState(GetHandleSignalsStateNoLock()); |
return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION |
: MOJO_RESULT_SHOULD_WAIT; |
} |
@@ -231,9 +236,6 @@ |
*buffer = data + read_offset_; |
*buffer_num_bytes = bytes_to_read; |
two_phase_max_bytes_read_ = bytes_to_read; |
- |
- if (had_new_data) |
- watchers_.NotifyState(GetHandleSignalsStateNoLock()); |
return MOJO_RESULT_OK; |
} |
@@ -271,7 +273,6 @@ |
HandleSignalsState new_state = GetHandleSignalsStateNoLock(); |
if (!new_state.equals(old_state)) |
awakable_list_.AwakeForStateChange(new_state); |
- watchers_.NotifyState(new_state); |
return rv; |
} |
@@ -279,24 +280,6 @@ |
HandleSignalsState DataPipeConsumerDispatcher::GetHandleSignalsState() const { |
base::AutoLock lock(lock_); |
return GetHandleSignalsStateNoLock(); |
-} |
- |
-MojoResult DataPipeConsumerDispatcher::AddWatcherRef( |
- const scoped_refptr<WatcherDispatcher>& watcher, |
- uintptr_t context) { |
- base::AutoLock lock(lock_); |
- if (is_closed_ || in_transit_) |
- return MOJO_RESULT_INVALID_ARGUMENT; |
- return watchers_.Add(watcher, context, GetHandleSignalsStateNoLock()); |
-} |
- |
-MojoResult DataPipeConsumerDispatcher::RemoveWatcherRef( |
- WatcherDispatcher* watcher, |
- uintptr_t context) { |
- base::AutoLock lock(lock_); |
- if (is_closed_ || in_transit_) |
- return MOJO_RESULT_INVALID_ARGUMENT; |
- return watchers_.Remove(watcher, context); |
} |
MojoResult DataPipeConsumerDispatcher::AddAwakable( |
@@ -481,7 +464,6 @@ |
shared_ring_buffer_ = nullptr; |
awakable_list_.CancelAll(); |
- watchers_.NotifyClosed(); |
if (!transferred_) { |
base::AutoUnlock unlock(lock_); |
node_controller_->ClosePort(control_port_); |
@@ -599,9 +581,7 @@ |
new_data_available_ = true; |
if (peer_closed_ != was_peer_closed || has_new_data) { |
- HandleSignalsState state = GetHandleSignalsStateNoLock(); |
- awakable_list_.AwakeForStateChange(state); |
- watchers_.NotifyState(state); |
+ awakable_list_.AwakeForStateChange(GetHandleSignalsStateNoLock()); |
} |
} |