| Index: mojo/edk/system/data_pipe_consumer_dispatcher.cc
|
| diff --git a/mojo/edk/system/data_pipe_consumer_dispatcher.cc b/mojo/edk/system/data_pipe_consumer_dispatcher.cc
|
| index 474e99d4955fc3c0e6cdc98f045af90e9d44e988..c908e3ae8bf7d4f9668d9fa3836d1e8b79f8f97b 100644
|
| --- a/mojo/edk/system/data_pipe_consumer_dispatcher.cc
|
| +++ b/mojo/edk/system/data_pipe_consumer_dispatcher.cc
|
| @@ -80,7 +80,6 @@
|
| node_controller_(node_controller),
|
| control_port_(control_port),
|
| pipe_id_(pipe_id),
|
| - watchers_(this),
|
| shared_ring_buffer_(shared_ring_buffer) {
|
| if (initialized) {
|
| base::AutoLock lock(lock_);
|
| @@ -98,19 +97,40 @@
|
| return CloseNoLock();
|
| }
|
|
|
| +
|
| +MojoResult DataPipeConsumerDispatcher::Watch(
|
| + MojoHandleSignals signals,
|
| + const Watcher::WatchCallback& callback,
|
| + uintptr_t context) {
|
| + base::AutoLock lock(lock_);
|
| +
|
| + if (is_closed_ || in_transit_)
|
| + return MOJO_RESULT_INVALID_ARGUMENT;
|
| +
|
| + return awakable_list_.AddWatcher(
|
| + signals, callback, context, GetHandleSignalsStateNoLock());
|
| +}
|
| +
|
| +MojoResult DataPipeConsumerDispatcher::CancelWatch(uintptr_t context) {
|
| + base::AutoLock lock(lock_);
|
| +
|
| + if (is_closed_ || in_transit_)
|
| + return MOJO_RESULT_INVALID_ARGUMENT;
|
| +
|
| + return awakable_list_.RemoveWatcher(context);
|
| +}
|
| +
|
| MojoResult DataPipeConsumerDispatcher::ReadData(void* elements,
|
| uint32_t* num_bytes,
|
| MojoReadDataFlags flags) {
|
| base::AutoLock lock(lock_);
|
| + new_data_available_ = false;
|
|
|
| if (!shared_ring_buffer_ || in_transit_)
|
| return MOJO_RESULT_INVALID_ARGUMENT;
|
|
|
| if (in_two_phase_read_)
|
| return MOJO_RESULT_BUSY;
|
| -
|
| - const bool had_new_data = new_data_available_;
|
| - new_data_available_ = false;
|
|
|
| if ((flags & MOJO_READ_DATA_FLAG_QUERY)) {
|
| if ((flags & MOJO_READ_DATA_FLAG_PEEK) ||
|
| @@ -120,9 +140,6 @@
|
| DVLOG_IF(2, elements)
|
| << "Query mode: ignoring non-null |elements|";
|
| *num_bytes = static_cast<uint32_t>(bytes_available_);
|
| -
|
| - if (had_new_data)
|
| - watchers_.NotifyState(GetHandleSignalsStateNoLock());
|
| return MOJO_RESULT_OK;
|
| }
|
|
|
| @@ -145,16 +162,12 @@
|
| all_or_none ? max_num_bytes_to_read : 0;
|
|
|
| if (min_num_bytes_to_read > bytes_available_) {
|
| - if (had_new_data)
|
| - watchers_.NotifyState(GetHandleSignalsStateNoLock());
|
| return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION
|
| : MOJO_RESULT_OUT_OF_RANGE;
|
| }
|
|
|
| uint32_t bytes_to_read = std::min(max_num_bytes_to_read, bytes_available_);
|
| if (bytes_to_read == 0) {
|
| - if (had_new_data)
|
| - watchers_.NotifyState(GetHandleSignalsStateNoLock());
|
| return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION
|
| : MOJO_RESULT_SHOULD_WAIT;
|
| }
|
| @@ -186,10 +199,6 @@
|
| NotifyRead(bytes_to_read);
|
| }
|
|
|
| - // We may have just read the last available data and thus changed the signals
|
| - // state.
|
| - watchers_.NotifyState(GetHandleSignalsStateNoLock());
|
| -
|
| return MOJO_RESULT_OK;
|
| }
|
|
|
| @@ -197,6 +206,7 @@
|
| uint32_t* buffer_num_bytes,
|
| MojoReadDataFlags flags) {
|
| base::AutoLock lock(lock_);
|
| + new_data_available_ = false;
|
| if (!shared_ring_buffer_ || in_transit_)
|
| return MOJO_RESULT_INVALID_ARGUMENT;
|
|
|
| @@ -209,12 +219,7 @@
|
| (flags & MOJO_READ_DATA_FLAG_PEEK))
|
| return MOJO_RESULT_INVALID_ARGUMENT;
|
|
|
| - const bool had_new_data = new_data_available_;
|
| - new_data_available_ = false;
|
| -
|
| if (bytes_available_ == 0) {
|
| - if (had_new_data)
|
| - watchers_.NotifyState(GetHandleSignalsStateNoLock());
|
| return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION
|
| : MOJO_RESULT_SHOULD_WAIT;
|
| }
|
| @@ -231,9 +236,6 @@
|
| *buffer = data + read_offset_;
|
| *buffer_num_bytes = bytes_to_read;
|
| two_phase_max_bytes_read_ = bytes_to_read;
|
| -
|
| - if (had_new_data)
|
| - watchers_.NotifyState(GetHandleSignalsStateNoLock());
|
|
|
| return MOJO_RESULT_OK;
|
| }
|
| @@ -271,7 +273,6 @@
|
| HandleSignalsState new_state = GetHandleSignalsStateNoLock();
|
| if (!new_state.equals(old_state))
|
| awakable_list_.AwakeForStateChange(new_state);
|
| - watchers_.NotifyState(new_state);
|
|
|
| return rv;
|
| }
|
| @@ -279,24 +280,6 @@
|
| HandleSignalsState DataPipeConsumerDispatcher::GetHandleSignalsState() const {
|
| base::AutoLock lock(lock_);
|
| return GetHandleSignalsStateNoLock();
|
| -}
|
| -
|
| -MojoResult DataPipeConsumerDispatcher::AddWatcherRef(
|
| - const scoped_refptr<WatcherDispatcher>& watcher,
|
| - uintptr_t context) {
|
| - base::AutoLock lock(lock_);
|
| - if (is_closed_ || in_transit_)
|
| - return MOJO_RESULT_INVALID_ARGUMENT;
|
| - return watchers_.Add(watcher, context, GetHandleSignalsStateNoLock());
|
| -}
|
| -
|
| -MojoResult DataPipeConsumerDispatcher::RemoveWatcherRef(
|
| - WatcherDispatcher* watcher,
|
| - uintptr_t context) {
|
| - base::AutoLock lock(lock_);
|
| - if (is_closed_ || in_transit_)
|
| - return MOJO_RESULT_INVALID_ARGUMENT;
|
| - return watchers_.Remove(watcher, context);
|
| }
|
|
|
| MojoResult DataPipeConsumerDispatcher::AddAwakable(
|
| @@ -481,7 +464,6 @@
|
| shared_ring_buffer_ = nullptr;
|
|
|
| awakable_list_.CancelAll();
|
| - watchers_.NotifyClosed();
|
| if (!transferred_) {
|
| base::AutoUnlock unlock(lock_);
|
| node_controller_->ClosePort(control_port_);
|
| @@ -599,9 +581,7 @@
|
| new_data_available_ = true;
|
|
|
| if (peer_closed_ != was_peer_closed || has_new_data) {
|
| - HandleSignalsState state = GetHandleSignalsStateNoLock();
|
| - awakable_list_.AwakeForStateChange(state);
|
| - watchers_.NotifyState(state);
|
| + awakable_list_.AwakeForStateChange(GetHandleSignalsStateNoLock());
|
| }
|
| }
|
|
|
|
|