Index: mojo/edk/system/data_pipe_consumer_dispatcher.cc |
diff --git a/mojo/edk/system/data_pipe_consumer_dispatcher.cc b/mojo/edk/system/data_pipe_consumer_dispatcher.cc |
index ec8d2d855e4bbb6f62940b91f5176b99e2ab1d46..ed324d6fee3efe2a6df69182cb14d82d59408cbf 100644 |
--- a/mojo/edk/system/data_pipe_consumer_dispatcher.cc |
+++ b/mojo/edk/system/data_pipe_consumer_dispatcher.cc |
@@ -80,6 +80,7 @@ DataPipeConsumerDispatcher::DataPipeConsumerDispatcher( |
node_controller_(node_controller), |
control_port_(control_port), |
pipe_id_(pipe_id), |
+ watchers_(this), |
shared_ring_buffer_(shared_ring_buffer) { |
if (initialized) { |
base::AutoLock lock(lock_); |
@@ -97,29 +98,6 @@ MojoResult DataPipeConsumerDispatcher::Close() { |
return CloseNoLock(); |
} |
- |
-MojoResult DataPipeConsumerDispatcher::Watch( |
- MojoHandleSignals signals, |
- const Watcher::WatchCallback& callback, |
- uintptr_t context) { |
- base::AutoLock lock(lock_); |
- |
- if (is_closed_ || in_transit_) |
- return MOJO_RESULT_INVALID_ARGUMENT; |
- |
- return awakable_list_.AddWatcher( |
- signals, callback, context, GetHandleSignalsStateNoLock()); |
-} |
- |
-MojoResult DataPipeConsumerDispatcher::CancelWatch(uintptr_t context) { |
- base::AutoLock lock(lock_); |
- |
- if (is_closed_ || in_transit_) |
- return MOJO_RESULT_INVALID_ARGUMENT; |
- |
- return awakable_list_.RemoveWatcher(context); |
-} |
- |
MojoResult DataPipeConsumerDispatcher::ReadData(void* elements, |
uint32_t* num_bytes, |
MojoReadDataFlags flags) { |
@@ -160,6 +138,10 @@ MojoResult DataPipeConsumerDispatcher::ReadData(void* elements, |
all_or_none ? max_num_bytes_to_read : 0; |
if (min_num_bytes_to_read > bytes_available_) { |
+ if (flags & MOJO_READ_DATA_FLAG_CLEAR_SIGNAL) { |
+ suppress_readable_signal_ = true; |
+ watchers_.NotifyState(GetHandleSignalsStateNoLock()); |
+ } |
return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION |
: MOJO_RESULT_OUT_OF_RANGE; |
} |
@@ -197,6 +179,10 @@ MojoResult DataPipeConsumerDispatcher::ReadData(void* elements, |
NotifyRead(bytes_to_read); |
} |
+ // We may have just read the last available data and thus changed the signals |
+ // state. |
+ watchers_.NotifyState(GetHandleSignalsStateNoLock()); |
+ |
return MOJO_RESULT_OK; |
} |
@@ -213,7 +199,8 @@ MojoResult DataPipeConsumerDispatcher::BeginReadData(const void** buffer, |
// These flags may not be used in two-phase mode. |
if ((flags & MOJO_READ_DATA_FLAG_DISCARD) || |
(flags & MOJO_READ_DATA_FLAG_QUERY) || |
- (flags & MOJO_READ_DATA_FLAG_PEEK)) |
+ (flags & MOJO_READ_DATA_FLAG_PEEK) || |
+ (flags & MOJO_READ_DATA_FLAG_CLEAR_SIGNAL)) |
return MOJO_RESULT_INVALID_ARGUMENT; |
if (bytes_available_ == 0) { |
@@ -270,6 +257,7 @@ MojoResult DataPipeConsumerDispatcher::EndReadData(uint32_t num_bytes_read) { |
HandleSignalsState new_state = GetHandleSignalsStateNoLock(); |
if (!new_state.equals(old_state)) |
awakable_list_.AwakeForStateChange(new_state); |
+ watchers_.NotifyState(new_state); |
return rv; |
} |
@@ -279,6 +267,24 @@ HandleSignalsState DataPipeConsumerDispatcher::GetHandleSignalsState() const { |
return GetHandleSignalsStateNoLock(); |
} |
+MojoResult DataPipeConsumerDispatcher::AddWatcherRef( |
+ const scoped_refptr<WatcherDispatcher>& watcher, |
+ uintptr_t context) { |
+ base::AutoLock lock(lock_); |
+ if (is_closed_ || in_transit_) |
+ return MOJO_RESULT_INVALID_ARGUMENT; |
+ return watchers_.Add(watcher, context, GetHandleSignalsStateNoLock()); |
+} |
+ |
+MojoResult DataPipeConsumerDispatcher::RemoveWatcherRef( |
+ WatcherDispatcher* watcher, |
+ uintptr_t context) { |
+ base::AutoLock lock(lock_); |
+ if (is_closed_ || in_transit_) |
+ return MOJO_RESULT_INVALID_ARGUMENT; |
+ return watchers_.Remove(watcher, context); |
+} |
+ |
MojoResult DataPipeConsumerDispatcher::AddAwakable( |
Awakable* awakable, |
MojoHandleSignals signals, |
@@ -460,6 +466,7 @@ MojoResult DataPipeConsumerDispatcher::CloseNoLock() { |
shared_ring_buffer_ = nullptr; |
awakable_list_.CancelAll(); |
+ watchers_.NotifyClosed(); |
if (!transferred_) { |
base::AutoUnlock unlock(lock_); |
node_controller_->ClosePort(control_port_); |
@@ -474,7 +481,7 @@ DataPipeConsumerDispatcher::GetHandleSignalsStateNoLock() const { |
HandleSignalsState rv; |
if (shared_ring_buffer_ && bytes_available_) { |
- if (!in_two_phase_read_) |
+ if (!in_two_phase_read_ && bytes_available_ && !suppress_readable_signal_) |
yzshen1
2017/03/11 00:44:58
nit: there is no need to check |bytes_available_|
Ken Rockot(use gerrit already)
2017/03/12 22:24:13
Done
|
rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_READABLE; |
rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE; |
} else if (!peer_closed_ && shared_ring_buffer_) { |
@@ -565,7 +572,10 @@ void DataPipeConsumerDispatcher::UpdateSignalsStateNoLock() { |
if (peer_closed_ != was_peer_closed || |
bytes_available_ != previous_bytes_available) { |
- awakable_list_.AwakeForStateChange(GetHandleSignalsStateNoLock()); |
+ suppress_readable_signal_ = false; |
+ HandleSignalsState state = GetHandleSignalsStateNoLock(); |
+ awakable_list_.AwakeForStateChange(state); |
+ watchers_.NotifyState(state); |
} |
} |