| Index: mojo/edk/system/data_pipe_producer_dispatcher.cc
|
| diff --git a/mojo/edk/system/data_pipe_producer_dispatcher.cc b/mojo/edk/system/data_pipe_producer_dispatcher.cc
|
| index 71c61963e23f60d50dbd858b2473600c0085bf45..8c1993aa5d36dd474b904eabeb5f1169465e5a9c 100644
|
| --- a/mojo/edk/system/data_pipe_producer_dispatcher.cc
|
| +++ b/mojo/edk/system/data_pipe_producer_dispatcher.cc
|
| @@ -79,7 +79,6 @@
|
| node_controller_(node_controller),
|
| control_port_(control_port),
|
| pipe_id_(pipe_id),
|
| - watchers_(this),
|
| shared_ring_buffer_(shared_ring_buffer),
|
| available_capacity_(options_.capacity_num_bytes) {
|
| if (initialized) {
|
| @@ -98,6 +97,28 @@
|
| return CloseNoLock();
|
| }
|
|
|
| +MojoResult DataPipeProducerDispatcher::Watch(
|
| + MojoHandleSignals signals,
|
| + const Watcher::WatchCallback& callback,
|
| + uintptr_t context) {
|
| + base::AutoLock lock(lock_);
|
| +
|
| + if (is_closed_ || in_transit_)
|
| + return MOJO_RESULT_INVALID_ARGUMENT;
|
| +
|
| + return awakable_list_.AddWatcher(
|
| + signals, callback, context, GetHandleSignalsStateNoLock());
|
| +}
|
| +
|
| +MojoResult DataPipeProducerDispatcher::CancelWatch(uintptr_t context) {
|
| + base::AutoLock lock(lock_);
|
| +
|
| + if (is_closed_ || in_transit_)
|
| + return MOJO_RESULT_INVALID_ARGUMENT;
|
| +
|
| + return awakable_list_.RemoveWatcher(context);
|
| +}
|
| +
|
| MojoResult DataPipeProducerDispatcher::WriteData(const void* elements,
|
| uint32_t* num_bytes,
|
| MojoWriteDataFlags flags) {
|
| @@ -158,7 +179,6 @@
|
| HandleSignalsState new_state = GetHandleSignalsStateNoLock();
|
| if (!new_state.equals(old_state))
|
| awakable_list_.AwakeForStateChange(new_state);
|
| - watchers_.NotifyState(new_state);
|
|
|
| base::AutoUnlock unlock(lock_);
|
| NotifyWrite(num_bytes_to_write);
|
| @@ -173,11 +193,6 @@
|
| base::AutoLock lock(lock_);
|
| if (!shared_ring_buffer_ || in_transit_)
|
| return MOJO_RESULT_INVALID_ARGUMENT;
|
| -
|
| - // These flags may not be used in two-phase mode.
|
| - if (flags & MOJO_WRITE_DATA_FLAG_ALL_OR_NONE)
|
| - return MOJO_RESULT_INVALID_ARGUMENT;
|
| -
|
| if (in_two_phase_write_)
|
| return MOJO_RESULT_BUSY;
|
| if (peer_closed_)
|
| @@ -236,7 +251,6 @@
|
| HandleSignalsState new_state = GetHandleSignalsStateNoLock();
|
| if (new_state.satisfies(MOJO_HANDLE_SIGNAL_WRITABLE))
|
| awakable_list_.AwakeForStateChange(new_state);
|
| - watchers_.NotifyState(new_state);
|
|
|
| return rv;
|
| }
|
| @@ -244,24 +258,6 @@
|
| HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsState() const {
|
| base::AutoLock lock(lock_);
|
| return GetHandleSignalsStateNoLock();
|
| -}
|
| -
|
| -MojoResult DataPipeProducerDispatcher::AddWatcherRef(
|
| - const scoped_refptr<WatcherDispatcher>& watcher,
|
| - uintptr_t context) {
|
| - base::AutoLock lock(lock_);
|
| - if (is_closed_ || in_transit_)
|
| - return MOJO_RESULT_INVALID_ARGUMENT;
|
| - return watchers_.Add(watcher, context, GetHandleSignalsStateNoLock());
|
| -}
|
| -
|
| -MojoResult DataPipeProducerDispatcher::RemoveWatcherRef(
|
| - WatcherDispatcher* watcher,
|
| - uintptr_t context) {
|
| - base::AutoLock lock(lock_);
|
| - if (is_closed_ || in_transit_)
|
| - return MOJO_RESULT_INVALID_ARGUMENT;
|
| - return watchers_.Remove(watcher, context);
|
| }
|
|
|
| MojoResult DataPipeProducerDispatcher::AddAwakable(
|
| @@ -360,10 +356,7 @@
|
| DCHECK(in_transit_);
|
| in_transit_ = false;
|
| buffer_handle_for_transit_.reset();
|
| -
|
| - HandleSignalsState state = GetHandleSignalsStateNoLock();
|
| - awakable_list_.AwakeForStateChange(state);
|
| - watchers_.NotifyState(state);
|
| + awakable_list_.AwakeForStateChange(GetHandleSignalsStateNoLock());
|
| }
|
|
|
| // static
|
| @@ -447,7 +440,6 @@
|
| shared_ring_buffer_ = nullptr;
|
|
|
| awakable_list_.CancelAll();
|
| - watchers_.NotifyClosed();
|
| if (!transferred_) {
|
| base::AutoUnlock unlock(lock_);
|
| node_controller_->ClosePort(control_port_);
|
| @@ -461,7 +453,8 @@
|
| lock_.AssertAcquired();
|
| HandleSignalsState rv;
|
| if (!peer_closed_) {
|
| - if (!in_two_phase_write_ && shared_ring_buffer_ && available_capacity_ > 0)
|
| + if (!in_two_phase_write_ && shared_ring_buffer_ &&
|
| + available_capacity_ > 0)
|
| rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
|
| rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
|
| } else {
|
| @@ -548,9 +541,7 @@
|
|
|
| if (peer_closed_ != was_peer_closed ||
|
| available_capacity_ != previous_capacity) {
|
| - HandleSignalsState state = GetHandleSignalsStateNoLock();
|
| - awakable_list_.AwakeForStateChange(state);
|
| - watchers_.NotifyState(state);
|
| + awakable_list_.AwakeForStateChange(GetHandleSignalsStateNoLock());
|
| }
|
| }
|
|
|
|
|