Chromium Code Reviews| Index: mojo/edk/system/data_pipe_consumer_dispatcher.cc |
| diff --git a/mojo/edk/system/data_pipe_consumer_dispatcher.cc b/mojo/edk/system/data_pipe_consumer_dispatcher.cc |
| index ec8d2d855e4bbb6f62940b91f5176b99e2ab1d46..ed324d6fee3efe2a6df69182cb14d82d59408cbf 100644 |
| --- a/mojo/edk/system/data_pipe_consumer_dispatcher.cc |
| +++ b/mojo/edk/system/data_pipe_consumer_dispatcher.cc |
| @@ -80,6 +80,7 @@ DataPipeConsumerDispatcher::DataPipeConsumerDispatcher( |
| node_controller_(node_controller), |
| control_port_(control_port), |
| pipe_id_(pipe_id), |
| + watchers_(this), |
| shared_ring_buffer_(shared_ring_buffer) { |
| if (initialized) { |
| base::AutoLock lock(lock_); |
| @@ -97,29 +98,6 @@ MojoResult DataPipeConsumerDispatcher::Close() { |
| return CloseNoLock(); |
| } |
| - |
| -MojoResult DataPipeConsumerDispatcher::Watch( |
| - MojoHandleSignals signals, |
| - const Watcher::WatchCallback& callback, |
| - uintptr_t context) { |
| - base::AutoLock lock(lock_); |
| - |
| - if (is_closed_ || in_transit_) |
| - return MOJO_RESULT_INVALID_ARGUMENT; |
| - |
| - return awakable_list_.AddWatcher( |
| - signals, callback, context, GetHandleSignalsStateNoLock()); |
| -} |
| - |
| -MojoResult DataPipeConsumerDispatcher::CancelWatch(uintptr_t context) { |
| - base::AutoLock lock(lock_); |
| - |
| - if (is_closed_ || in_transit_) |
| - return MOJO_RESULT_INVALID_ARGUMENT; |
| - |
| - return awakable_list_.RemoveWatcher(context); |
| -} |
| - |
| MojoResult DataPipeConsumerDispatcher::ReadData(void* elements, |
| uint32_t* num_bytes, |
| MojoReadDataFlags flags) { |
| @@ -160,6 +138,10 @@ MojoResult DataPipeConsumerDispatcher::ReadData(void* elements, |
| all_or_none ? max_num_bytes_to_read : 0; |
| if (min_num_bytes_to_read > bytes_available_) { |
| + if (flags & MOJO_READ_DATA_FLAG_CLEAR_SIGNAL) { |
| + suppress_readable_signal_ = true; |
| + watchers_.NotifyState(GetHandleSignalsStateNoLock()); |
| + } |
| return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION |
| : MOJO_RESULT_OUT_OF_RANGE; |
| } |
| @@ -197,6 +179,10 @@ MojoResult DataPipeConsumerDispatcher::ReadData(void* elements, |
| NotifyRead(bytes_to_read); |
| } |
| + // We may have just read the last available data and thus changed the signals |
| + // state. |
| + watchers_.NotifyState(GetHandleSignalsStateNoLock()); |
| + |
| return MOJO_RESULT_OK; |
| } |
| @@ -213,7 +199,8 @@ MojoResult DataPipeConsumerDispatcher::BeginReadData(const void** buffer, |
| // These flags may not be used in two-phase mode. |
| if ((flags & MOJO_READ_DATA_FLAG_DISCARD) || |
| (flags & MOJO_READ_DATA_FLAG_QUERY) || |
| - (flags & MOJO_READ_DATA_FLAG_PEEK)) |
| + (flags & MOJO_READ_DATA_FLAG_PEEK) || |
| + (flags & MOJO_READ_DATA_FLAG_CLEAR_SIGNAL)) |
| return MOJO_RESULT_INVALID_ARGUMENT; |
| if (bytes_available_ == 0) { |
| @@ -270,6 +257,7 @@ MojoResult DataPipeConsumerDispatcher::EndReadData(uint32_t num_bytes_read) { |
| HandleSignalsState new_state = GetHandleSignalsStateNoLock(); |
| if (!new_state.equals(old_state)) |
| awakable_list_.AwakeForStateChange(new_state); |
| + watchers_.NotifyState(new_state); |
| return rv; |
| } |
| @@ -279,6 +267,24 @@ HandleSignalsState DataPipeConsumerDispatcher::GetHandleSignalsState() const { |
| return GetHandleSignalsStateNoLock(); |
| } |
| +MojoResult DataPipeConsumerDispatcher::AddWatcherRef( |
| + const scoped_refptr<WatcherDispatcher>& watcher, |
| + uintptr_t context) { |
| + base::AutoLock lock(lock_); |
| + if (is_closed_ || in_transit_) |
| + return MOJO_RESULT_INVALID_ARGUMENT; |
| + return watchers_.Add(watcher, context, GetHandleSignalsStateNoLock()); |
| +} |
| + |
| +MojoResult DataPipeConsumerDispatcher::RemoveWatcherRef( |
| + WatcherDispatcher* watcher, |
| + uintptr_t context) { |
| + base::AutoLock lock(lock_); |
| + if (is_closed_ || in_transit_) |
| + return MOJO_RESULT_INVALID_ARGUMENT; |
| + return watchers_.Remove(watcher, context); |
| +} |
| + |
| MojoResult DataPipeConsumerDispatcher::AddAwakable( |
| Awakable* awakable, |
| MojoHandleSignals signals, |
| @@ -460,6 +466,7 @@ MojoResult DataPipeConsumerDispatcher::CloseNoLock() { |
| shared_ring_buffer_ = nullptr; |
| awakable_list_.CancelAll(); |
| + watchers_.NotifyClosed(); |
| if (!transferred_) { |
| base::AutoUnlock unlock(lock_); |
| node_controller_->ClosePort(control_port_); |
| @@ -474,7 +481,7 @@ DataPipeConsumerDispatcher::GetHandleSignalsStateNoLock() const { |
| HandleSignalsState rv; |
| if (shared_ring_buffer_ && bytes_available_) { |
| - if (!in_two_phase_read_) |
| + if (!in_two_phase_read_ && bytes_available_ && !suppress_readable_signal_) |
|
yzshen1
2017/03/11 00:44:58
nit: there is no need to check |bytes_available_|
Ken Rockot(use gerrit already)
2017/03/12 22:24:13
Done
|
| rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_READABLE; |
| rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE; |
| } else if (!peer_closed_ && shared_ring_buffer_) { |
| @@ -565,7 +572,10 @@ void DataPipeConsumerDispatcher::UpdateSignalsStateNoLock() { |
| if (peer_closed_ != was_peer_closed || |
| bytes_available_ != previous_bytes_available) { |
| - awakable_list_.AwakeForStateChange(GetHandleSignalsStateNoLock()); |
| + suppress_readable_signal_ = false; |
| + HandleSignalsState state = GetHandleSignalsStateNoLock(); |
| + awakable_list_.AwakeForStateChange(state); |
| + watchers_.NotifyState(state); |
| } |
| } |