| Index: mojo/edk/system/data_pipe_producer_dispatcher.cc
|
| diff --git a/mojo/edk/system/data_pipe_producer_dispatcher.cc b/mojo/edk/system/data_pipe_producer_dispatcher.cc
|
| index 8c1993aa5d36dd474b904eabeb5f1169465e5a9c..71c61963e23f60d50dbd858b2473600c0085bf45 100644
|
| --- a/mojo/edk/system/data_pipe_producer_dispatcher.cc
|
| +++ b/mojo/edk/system/data_pipe_producer_dispatcher.cc
|
| @@ -79,6 +79,7 @@ DataPipeProducerDispatcher::DataPipeProducerDispatcher(
|
| node_controller_(node_controller),
|
| control_port_(control_port),
|
| pipe_id_(pipe_id),
|
| + watchers_(this),
|
| shared_ring_buffer_(shared_ring_buffer),
|
| available_capacity_(options_.capacity_num_bytes) {
|
| if (initialized) {
|
| @@ -97,28 +98,6 @@ MojoResult DataPipeProducerDispatcher::Close() {
|
| return CloseNoLock();
|
| }
|
|
|
| -MojoResult DataPipeProducerDispatcher::Watch(
|
| - MojoHandleSignals signals,
|
| - const Watcher::WatchCallback& callback,
|
| - uintptr_t context) {
|
| - base::AutoLock lock(lock_);
|
| -
|
| - if (is_closed_ || in_transit_)
|
| - return MOJO_RESULT_INVALID_ARGUMENT;
|
| -
|
| - return awakable_list_.AddWatcher(
|
| - signals, callback, context, GetHandleSignalsStateNoLock());
|
| -}
|
| -
|
| -MojoResult DataPipeProducerDispatcher::CancelWatch(uintptr_t context) {
|
| - base::AutoLock lock(lock_);
|
| -
|
| - if (is_closed_ || in_transit_)
|
| - return MOJO_RESULT_INVALID_ARGUMENT;
|
| -
|
| - return awakable_list_.RemoveWatcher(context);
|
| -}
|
| -
|
| MojoResult DataPipeProducerDispatcher::WriteData(const void* elements,
|
| uint32_t* num_bytes,
|
| MojoWriteDataFlags flags) {
|
| @@ -179,6 +158,7 @@ MojoResult DataPipeProducerDispatcher::WriteData(const void* elements,
|
| HandleSignalsState new_state = GetHandleSignalsStateNoLock();
|
| if (!new_state.equals(old_state))
|
| awakable_list_.AwakeForStateChange(new_state);
|
| + watchers_.NotifyState(new_state);
|
|
|
| base::AutoUnlock unlock(lock_);
|
| NotifyWrite(num_bytes_to_write);
|
| @@ -193,6 +173,11 @@ MojoResult DataPipeProducerDispatcher::BeginWriteData(
|
| base::AutoLock lock(lock_);
|
| if (!shared_ring_buffer_ || in_transit_)
|
| return MOJO_RESULT_INVALID_ARGUMENT;
|
| +
|
| + // These flags may not be used in two-phase mode.
|
| + if (flags & MOJO_WRITE_DATA_FLAG_ALL_OR_NONE)
|
| + return MOJO_RESULT_INVALID_ARGUMENT;
|
| +
|
| if (in_two_phase_write_)
|
| return MOJO_RESULT_BUSY;
|
| if (peer_closed_)
|
| @@ -251,6 +236,7 @@ MojoResult DataPipeProducerDispatcher::EndWriteData(
|
| HandleSignalsState new_state = GetHandleSignalsStateNoLock();
|
| if (new_state.satisfies(MOJO_HANDLE_SIGNAL_WRITABLE))
|
| awakable_list_.AwakeForStateChange(new_state);
|
| + watchers_.NotifyState(new_state);
|
|
|
| return rv;
|
| }
|
| @@ -260,6 +246,24 @@ HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsState() const {
|
| return GetHandleSignalsStateNoLock();
|
| }
|
|
|
| +MojoResult DataPipeProducerDispatcher::AddWatcherRef(
|
| + const scoped_refptr<WatcherDispatcher>& watcher,
|
| + uintptr_t context) {
|
| + base::AutoLock lock(lock_);
|
| + if (is_closed_ || in_transit_)
|
| + return MOJO_RESULT_INVALID_ARGUMENT;
|
| + return watchers_.Add(watcher, context, GetHandleSignalsStateNoLock());
|
| +}
|
| +
|
| +MojoResult DataPipeProducerDispatcher::RemoveWatcherRef(
|
| + WatcherDispatcher* watcher,
|
| + uintptr_t context) {
|
| + base::AutoLock lock(lock_);
|
| + if (is_closed_ || in_transit_)
|
| + return MOJO_RESULT_INVALID_ARGUMENT;
|
| + return watchers_.Remove(watcher, context);
|
| +}
|
| +
|
| MojoResult DataPipeProducerDispatcher::AddAwakable(
|
| Awakable* awakable,
|
| MojoHandleSignals signals,
|
| @@ -356,7 +360,10 @@ void DataPipeProducerDispatcher::CancelTransit() {
|
| DCHECK(in_transit_);
|
| in_transit_ = false;
|
| buffer_handle_for_transit_.reset();
|
| - awakable_list_.AwakeForStateChange(GetHandleSignalsStateNoLock());
|
| +
|
| + HandleSignalsState state = GetHandleSignalsStateNoLock();
|
| + awakable_list_.AwakeForStateChange(state);
|
| + watchers_.NotifyState(state);
|
| }
|
|
|
| // static
|
| @@ -440,6 +447,7 @@ MojoResult DataPipeProducerDispatcher::CloseNoLock() {
|
| shared_ring_buffer_ = nullptr;
|
|
|
| awakable_list_.CancelAll();
|
| + watchers_.NotifyClosed();
|
| if (!transferred_) {
|
| base::AutoUnlock unlock(lock_);
|
| node_controller_->ClosePort(control_port_);
|
| @@ -453,8 +461,7 @@ HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsStateNoLock()
|
| lock_.AssertAcquired();
|
| HandleSignalsState rv;
|
| if (!peer_closed_) {
|
| - if (!in_two_phase_write_ && shared_ring_buffer_ &&
|
| - available_capacity_ > 0)
|
| + if (!in_two_phase_write_ && shared_ring_buffer_ && available_capacity_ > 0)
|
| rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
|
| rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
|
| } else {
|
| @@ -541,7 +548,9 @@ void DataPipeProducerDispatcher::UpdateSignalsStateNoLock() {
|
|
|
| if (peer_closed_ != was_peer_closed ||
|
| available_capacity_ != previous_capacity) {
|
| - awakable_list_.AwakeForStateChange(GetHandleSignalsStateNoLock());
|
| + HandleSignalsState state = GetHandleSignalsStateNoLock();
|
| + awakable_list_.AwakeForStateChange(state);
|
| + watchers_.NotifyState(state);
|
| }
|
| }
|
|
|
|
|