| Index: mojo/edk/system/data_pipe_producer_dispatcher.cc
|
| diff --git a/mojo/edk/system/data_pipe_producer_dispatcher.cc b/mojo/edk/system/data_pipe_producer_dispatcher.cc
|
| index 8c1993aa5d36dd474b904eabeb5f1169465e5a9c..64ea3d5c3a78c04e4b56b520b63bc248f77d843d 100644
|
| --- a/mojo/edk/system/data_pipe_producer_dispatcher.cc
|
| +++ b/mojo/edk/system/data_pipe_producer_dispatcher.cc
|
| @@ -97,7 +97,7 @@ MojoResult DataPipeProducerDispatcher::Close() {
|
| return CloseNoLock();
|
| }
|
|
|
| -MojoResult DataPipeProducerDispatcher::Watch(
|
| +MojoResult DataPipeProducerDispatcher::RegisterWatcher(
|
| MojoHandleSignals signals,
|
| const Watcher::WatchCallback& callback,
|
| uintptr_t context) {
|
| @@ -106,17 +106,26 @@ MojoResult DataPipeProducerDispatcher::Watch(
|
| if (is_closed_ || in_transit_)
|
| return MOJO_RESULT_INVALID_ARGUMENT;
|
|
|
| - return awakable_list_.AddWatcher(
|
| - signals, callback, context, GetHandleSignalsStateNoLock());
|
| + return watchers_.Add(signals, callback, context,
|
| + GetHandleSignalsStateNoLock());
|
| }
|
|
|
| -MojoResult DataPipeProducerDispatcher::CancelWatch(uintptr_t context) {
|
| +MojoResult DataPipeProducerDispatcher::ArmWatcher(uintptr_t context) {
|
| base::AutoLock lock(lock_);
|
|
|
| if (is_closed_ || in_transit_)
|
| return MOJO_RESULT_INVALID_ARGUMENT;
|
|
|
| - return awakable_list_.RemoveWatcher(context);
|
| + return watchers_.Arm(context, GetHandleSignalsStateNoLock());
|
| +}
|
| +
|
| +MojoResult DataPipeProducerDispatcher::UnregisterWatcher(uintptr_t context) {
|
| + base::AutoLock lock(lock_);
|
| +
|
| + if (is_closed_ || in_transit_)
|
| + return MOJO_RESULT_INVALID_ARGUMENT;
|
| +
|
| + return watchers_.Remove(context);
|
| }
|
|
|
| MojoResult DataPipeProducerDispatcher::WriteData(const void* elements,
|
| @@ -177,8 +186,10 @@ MojoResult DataPipeProducerDispatcher::WriteData(const void* elements,
|
| options_.capacity_num_bytes;
|
|
|
| HandleSignalsState new_state = GetHandleSignalsStateNoLock();
|
| - if (!new_state.equals(old_state))
|
| + if (!new_state.equals(old_state)) {
|
| awakable_list_.AwakeForStateChange(new_state);
|
| + watchers_.NotifyState(new_state);
|
| + }
|
|
|
| base::AutoUnlock unlock(lock_);
|
| NotifyWrite(num_bytes_to_write);
|
| @@ -249,8 +260,10 @@ MojoResult DataPipeProducerDispatcher::EndWriteData(
|
| // If we're now writable, we *became* writable (since we weren't writable
|
| // during the two-phase write), so awake producer awakables.
|
| HandleSignalsState new_state = GetHandleSignalsStateNoLock();
|
| - if (new_state.satisfies(MOJO_HANDLE_SIGNAL_WRITABLE))
|
| + if (new_state.satisfies(MOJO_HANDLE_SIGNAL_WRITABLE)) {
|
| awakable_list_.AwakeForStateChange(new_state);
|
| + watchers_.NotifyState(new_state);
|
| + }
|
|
|
| return rv;
|
| }
|
| @@ -356,7 +369,10 @@ void DataPipeProducerDispatcher::CancelTransit() {
|
| DCHECK(in_transit_);
|
| in_transit_ = false;
|
| buffer_handle_for_transit_.reset();
|
| - awakable_list_.AwakeForStateChange(GetHandleSignalsStateNoLock());
|
| +
|
| + HandleSignalsState state = GetHandleSignalsStateNoLock();
|
| + awakable_list_.AwakeForStateChange(state);
|
| + watchers_.NotifyState(state);
|
| }
|
|
|
| // static
|
| @@ -440,6 +456,7 @@ MojoResult DataPipeProducerDispatcher::CloseNoLock() {
|
| shared_ring_buffer_ = nullptr;
|
|
|
| awakable_list_.CancelAll();
|
| + watchers_.NotifyClosed();
|
| if (!transferred_) {
|
| base::AutoUnlock unlock(lock_);
|
| node_controller_->ClosePort(control_port_);
|
| @@ -541,7 +558,9 @@ void DataPipeProducerDispatcher::UpdateSignalsStateNoLock() {
|
|
|
| if (peer_closed_ != was_peer_closed ||
|
| available_capacity_ != previous_capacity) {
|
| - awakable_list_.AwakeForStateChange(GetHandleSignalsStateNoLock());
|
| + HandleSignalsState state = GetHandleSignalsStateNoLock();
|
| + awakable_list_.AwakeForStateChange(state);
|
| + watchers_.NotifyState(state);
|
| }
|
| }
|
|
|
|
|