| Index: mojo/edk/system/data_pipe_consumer_dispatcher.cc
|
| diff --git a/mojo/edk/system/data_pipe_consumer_dispatcher.cc b/mojo/edk/system/data_pipe_consumer_dispatcher.cc
|
| index ec8d2d855e4bbb6f62940b91f5176b99e2ab1d46..63abf35d77fc66873ce16012345a280dbebe0cd8 100644
|
| --- a/mojo/edk/system/data_pipe_consumer_dispatcher.cc
|
| +++ b/mojo/edk/system/data_pipe_consumer_dispatcher.cc
|
| @@ -97,8 +97,7 @@ MojoResult DataPipeConsumerDispatcher::Close() {
|
| return CloseNoLock();
|
| }
|
|
|
| -
|
| -MojoResult DataPipeConsumerDispatcher::Watch(
|
| +MojoResult DataPipeConsumerDispatcher::RegisterWatcher(
|
| MojoHandleSignals signals,
|
| const Watcher::WatchCallback& callback,
|
| uintptr_t context) {
|
| @@ -107,17 +106,26 @@ MojoResult DataPipeConsumerDispatcher::Watch(
|
| if (is_closed_ || in_transit_)
|
| return MOJO_RESULT_INVALID_ARGUMENT;
|
|
|
| - return awakable_list_.AddWatcher(
|
| - signals, callback, context, GetHandleSignalsStateNoLock());
|
| + return watchers_.Add(signals, callback, context,
|
| + GetHandleSignalsStateNoLock());
|
| }
|
|
|
| -MojoResult DataPipeConsumerDispatcher::CancelWatch(uintptr_t context) {
|
| +MojoResult DataPipeConsumerDispatcher::ArmWatcher(uintptr_t context) {
|
| base::AutoLock lock(lock_);
|
|
|
| if (is_closed_ || in_transit_)
|
| return MOJO_RESULT_INVALID_ARGUMENT;
|
|
|
| - return awakable_list_.RemoveWatcher(context);
|
| + return watchers_.Arm(context, GetHandleSignalsStateNoLock());
|
| +}
|
| +
|
| +MojoResult DataPipeConsumerDispatcher::UnregisterWatcher(uintptr_t context) {
|
| + base::AutoLock lock(lock_);
|
| +
|
| + if (is_closed_ || in_transit_)
|
| + return MOJO_RESULT_INVALID_ARGUMENT;
|
| +
|
| + return watchers_.Remove(context);
|
| }
|
|
|
| MojoResult DataPipeConsumerDispatcher::ReadData(void* elements,
|
| @@ -268,8 +276,10 @@ MojoResult DataPipeConsumerDispatcher::EndReadData(uint32_t num_bytes_read) {
|
| two_phase_max_bytes_read_ = 0;
|
|
|
| HandleSignalsState new_state = GetHandleSignalsStateNoLock();
|
| - if (!new_state.equals(old_state))
|
| + if (!new_state.equals(old_state)) {
|
| awakable_list_.AwakeForStateChange(new_state);
|
| + watchers_.NotifyState(new_state);
|
| + }
|
|
|
| return rv;
|
| }
|
| @@ -460,6 +470,7 @@ MojoResult DataPipeConsumerDispatcher::CloseNoLock() {
|
| shared_ring_buffer_ = nullptr;
|
|
|
| awakable_list_.CancelAll();
|
| + watchers_.NotifyClosed();
|
| if (!transferred_) {
|
| base::AutoUnlock unlock(lock_);
|
| node_controller_->ClosePort(control_port_);
|
| @@ -565,7 +576,9 @@ void DataPipeConsumerDispatcher::UpdateSignalsStateNoLock() {
|
|
|
| if (peer_closed_ != was_peer_closed ||
|
| bytes_available_ != previous_bytes_available) {
|
| - awakable_list_.AwakeForStateChange(GetHandleSignalsStateNoLock());
|
| + HandleSignalsState state = GetHandleSignalsStateNoLock();
|
| + awakable_list_.AwakeForStateChange(state);
|
| + watchers_.NotifyState(state);
|
| }
|
| }
|
|
|
|
|