| Index: mojo/edk/system/data_pipe_consumer_dispatcher.cc
|
| diff --git a/mojo/edk/system/data_pipe_consumer_dispatcher.cc b/mojo/edk/system/data_pipe_consumer_dispatcher.cc
|
| index c908e3ae8bf7d4f9668d9fa3836d1e8b79f8f97b..474e99d4955fc3c0e6cdc98f045af90e9d44e988 100644
|
| --- a/mojo/edk/system/data_pipe_consumer_dispatcher.cc
|
| +++ b/mojo/edk/system/data_pipe_consumer_dispatcher.cc
|
| @@ -80,6 +80,7 @@ DataPipeConsumerDispatcher::DataPipeConsumerDispatcher(
|
| node_controller_(node_controller),
|
| control_port_(control_port),
|
| pipe_id_(pipe_id),
|
| + watchers_(this),
|
| shared_ring_buffer_(shared_ring_buffer) {
|
| if (initialized) {
|
| base::AutoLock lock(lock_);
|
| @@ -97,34 +98,10 @@ MojoResult DataPipeConsumerDispatcher::Close() {
|
| return CloseNoLock();
|
| }
|
|
|
| -
|
| -MojoResult DataPipeConsumerDispatcher::Watch(
|
| - MojoHandleSignals signals,
|
| - const Watcher::WatchCallback& callback,
|
| - uintptr_t context) {
|
| - base::AutoLock lock(lock_);
|
| -
|
| - if (is_closed_ || in_transit_)
|
| - return MOJO_RESULT_INVALID_ARGUMENT;
|
| -
|
| - return awakable_list_.AddWatcher(
|
| - signals, callback, context, GetHandleSignalsStateNoLock());
|
| -}
|
| -
|
| -MojoResult DataPipeConsumerDispatcher::CancelWatch(uintptr_t context) {
|
| - base::AutoLock lock(lock_);
|
| -
|
| - if (is_closed_ || in_transit_)
|
| - return MOJO_RESULT_INVALID_ARGUMENT;
|
| -
|
| - return awakable_list_.RemoveWatcher(context);
|
| -}
|
| -
|
| MojoResult DataPipeConsumerDispatcher::ReadData(void* elements,
|
| uint32_t* num_bytes,
|
| MojoReadDataFlags flags) {
|
| base::AutoLock lock(lock_);
|
| - new_data_available_ = false;
|
|
|
| if (!shared_ring_buffer_ || in_transit_)
|
| return MOJO_RESULT_INVALID_ARGUMENT;
|
| @@ -132,6 +109,9 @@ MojoResult DataPipeConsumerDispatcher::ReadData(void* elements,
|
| if (in_two_phase_read_)
|
| return MOJO_RESULT_BUSY;
|
|
|
| + const bool had_new_data = new_data_available_;
|
| + new_data_available_ = false;
|
| +
|
| if ((flags & MOJO_READ_DATA_FLAG_QUERY)) {
|
| if ((flags & MOJO_READ_DATA_FLAG_PEEK) ||
|
| (flags & MOJO_READ_DATA_FLAG_DISCARD))
|
| @@ -140,6 +120,9 @@ MojoResult DataPipeConsumerDispatcher::ReadData(void* elements,
|
| DVLOG_IF(2, elements)
|
| << "Query mode: ignoring non-null |elements|";
|
| *num_bytes = static_cast<uint32_t>(bytes_available_);
|
| +
|
| + if (had_new_data)
|
| + watchers_.NotifyState(GetHandleSignalsStateNoLock());
|
| return MOJO_RESULT_OK;
|
| }
|
|
|
| @@ -162,12 +145,16 @@ MojoResult DataPipeConsumerDispatcher::ReadData(void* elements,
|
| all_or_none ? max_num_bytes_to_read : 0;
|
|
|
| if (min_num_bytes_to_read > bytes_available_) {
|
| + if (had_new_data)
|
| + watchers_.NotifyState(GetHandleSignalsStateNoLock());
|
| return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION
|
| : MOJO_RESULT_OUT_OF_RANGE;
|
| }
|
|
|
| uint32_t bytes_to_read = std::min(max_num_bytes_to_read, bytes_available_);
|
| if (bytes_to_read == 0) {
|
| + if (had_new_data)
|
| + watchers_.NotifyState(GetHandleSignalsStateNoLock());
|
| return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION
|
| : MOJO_RESULT_SHOULD_WAIT;
|
| }
|
| @@ -199,6 +186,10 @@ MojoResult DataPipeConsumerDispatcher::ReadData(void* elements,
|
| NotifyRead(bytes_to_read);
|
| }
|
|
|
| + // We may have just read the last available data and thus changed the signals
|
| + // state.
|
| + watchers_.NotifyState(GetHandleSignalsStateNoLock());
|
| +
|
| return MOJO_RESULT_OK;
|
| }
|
|
|
| @@ -206,7 +197,6 @@ MojoResult DataPipeConsumerDispatcher::BeginReadData(const void** buffer,
|
| uint32_t* buffer_num_bytes,
|
| MojoReadDataFlags flags) {
|
| base::AutoLock lock(lock_);
|
| - new_data_available_ = false;
|
| if (!shared_ring_buffer_ || in_transit_)
|
| return MOJO_RESULT_INVALID_ARGUMENT;
|
|
|
| @@ -219,7 +209,12 @@ MojoResult DataPipeConsumerDispatcher::BeginReadData(const void** buffer,
|
| (flags & MOJO_READ_DATA_FLAG_PEEK))
|
| return MOJO_RESULT_INVALID_ARGUMENT;
|
|
|
| + const bool had_new_data = new_data_available_;
|
| + new_data_available_ = false;
|
| +
|
| if (bytes_available_ == 0) {
|
| + if (had_new_data)
|
| + watchers_.NotifyState(GetHandleSignalsStateNoLock());
|
| return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION
|
| : MOJO_RESULT_SHOULD_WAIT;
|
| }
|
| @@ -237,6 +232,9 @@ MojoResult DataPipeConsumerDispatcher::BeginReadData(const void** buffer,
|
| *buffer_num_bytes = bytes_to_read;
|
| two_phase_max_bytes_read_ = bytes_to_read;
|
|
|
| + if (had_new_data)
|
| + watchers_.NotifyState(GetHandleSignalsStateNoLock());
|
| +
|
| return MOJO_RESULT_OK;
|
| }
|
|
|
| @@ -273,6 +271,7 @@ MojoResult DataPipeConsumerDispatcher::EndReadData(uint32_t num_bytes_read) {
|
| HandleSignalsState new_state = GetHandleSignalsStateNoLock();
|
| if (!new_state.equals(old_state))
|
| awakable_list_.AwakeForStateChange(new_state);
|
| + watchers_.NotifyState(new_state);
|
|
|
| return rv;
|
| }
|
| @@ -282,6 +281,24 @@ HandleSignalsState DataPipeConsumerDispatcher::GetHandleSignalsState() const {
|
| return GetHandleSignalsStateNoLock();
|
| }
|
|
|
| +MojoResult DataPipeConsumerDispatcher::AddWatcherRef(
|
| + const scoped_refptr<WatcherDispatcher>& watcher,
|
| + uintptr_t context) {
|
| + base::AutoLock lock(lock_);
|
| + if (is_closed_ || in_transit_)
|
| + return MOJO_RESULT_INVALID_ARGUMENT;
|
| + return watchers_.Add(watcher, context, GetHandleSignalsStateNoLock());
|
| +}
|
| +
|
| +MojoResult DataPipeConsumerDispatcher::RemoveWatcherRef(
|
| + WatcherDispatcher* watcher,
|
| + uintptr_t context) {
|
| + base::AutoLock lock(lock_);
|
| + if (is_closed_ || in_transit_)
|
| + return MOJO_RESULT_INVALID_ARGUMENT;
|
| + return watchers_.Remove(watcher, context);
|
| +}
|
| +
|
| MojoResult DataPipeConsumerDispatcher::AddAwakable(
|
| Awakable* awakable,
|
| MojoHandleSignals signals,
|
| @@ -464,6 +481,7 @@ MojoResult DataPipeConsumerDispatcher::CloseNoLock() {
|
| shared_ring_buffer_ = nullptr;
|
|
|
| awakable_list_.CancelAll();
|
| + watchers_.NotifyClosed();
|
| if (!transferred_) {
|
| base::AutoUnlock unlock(lock_);
|
| node_controller_->ClosePort(control_port_);
|
| @@ -581,7 +599,9 @@ void DataPipeConsumerDispatcher::UpdateSignalsStateNoLock() {
|
| new_data_available_ = true;
|
|
|
| if (peer_closed_ != was_peer_closed || has_new_data) {
|
| - awakable_list_.AwakeForStateChange(GetHandleSignalsStateNoLock());
|
| + HandleSignalsState state = GetHandleSignalsStateNoLock();
|
| + awakable_list_.AwakeForStateChange(state);
|
| + watchers_.NotifyState(state);
|
| }
|
| }
|
|
|
|
|