Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1331)

Unified Diff: mojo/edk/system/data_pipe_producer_dispatcher.cc

Issue 2725133002: Mojo: Armed Watchers (Closed)
Patch Set: . Created 3 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: mojo/edk/system/data_pipe_producer_dispatcher.cc
diff --git a/mojo/edk/system/data_pipe_producer_dispatcher.cc b/mojo/edk/system/data_pipe_producer_dispatcher.cc
index 8c1993aa5d36dd474b904eabeb5f1169465e5a9c..676196ed3dbe78e36b61684de3156f59c3ecddc9 100644
--- a/mojo/edk/system/data_pipe_producer_dispatcher.cc
+++ b/mojo/edk/system/data_pipe_producer_dispatcher.cc
@@ -79,6 +79,7 @@ DataPipeProducerDispatcher::DataPipeProducerDispatcher(
node_controller_(node_controller),
control_port_(control_port),
pipe_id_(pipe_id),
+ watchers_(this),
shared_ring_buffer_(shared_ring_buffer),
available_capacity_(options_.capacity_num_bytes) {
if (initialized) {
@@ -97,28 +98,6 @@ MojoResult DataPipeProducerDispatcher::Close() {
return CloseNoLock();
}
-MojoResult DataPipeProducerDispatcher::Watch(
- MojoHandleSignals signals,
- const Watcher::WatchCallback& callback,
- uintptr_t context) {
- base::AutoLock lock(lock_);
-
- if (is_closed_ || in_transit_)
- return MOJO_RESULT_INVALID_ARGUMENT;
-
- return awakable_list_.AddWatcher(
- signals, callback, context, GetHandleSignalsStateNoLock());
-}
-
-MojoResult DataPipeProducerDispatcher::CancelWatch(uintptr_t context) {
- base::AutoLock lock(lock_);
-
- if (is_closed_ || in_transit_)
- return MOJO_RESULT_INVALID_ARGUMENT;
-
- return awakable_list_.RemoveWatcher(context);
-}
-
MojoResult DataPipeProducerDispatcher::WriteData(const void* elements,
uint32_t* num_bytes,
MojoWriteDataFlags flags) {
@@ -139,8 +118,15 @@ MojoResult DataPipeProducerDispatcher::WriteData(const void* elements,
if ((flags & MOJO_WRITE_DATA_FLAG_ALL_OR_NONE) &&
(*num_bytes > available_capacity_)) {
+ if (flags & MOJO_WRITE_DATA_FLAG_CLEAR_SIGNAL) {
+ suppress_writable_signal_ = true;
+ watchers_.NotifyState(GetHandleSignalsStateNoLock());
+ }
// Don't return "should wait" since you can't wait for a specified amount of
// data.
+ //
+ // TODO(rockot): Reconsider this case now that we have support for minimum
+ // signaling capacity.
return MOJO_RESULT_OUT_OF_RANGE;
}
@@ -179,6 +165,7 @@ MojoResult DataPipeProducerDispatcher::WriteData(const void* elements,
HandleSignalsState new_state = GetHandleSignalsStateNoLock();
if (!new_state.equals(old_state))
awakable_list_.AwakeForStateChange(new_state);
+ watchers_.NotifyState(new_state);
base::AutoUnlock unlock(lock_);
NotifyWrite(num_bytes_to_write);
@@ -193,6 +180,12 @@ MojoResult DataPipeProducerDispatcher::BeginWriteData(
base::AutoLock lock(lock_);
if (!shared_ring_buffer_ || in_transit_)
return MOJO_RESULT_INVALID_ARGUMENT;
+
+ // These flags may not be used in two-phase mode.
+ if ((flags & MOJO_WRITE_DATA_FLAG_ALL_OR_NONE) ||
+ (flags & MOJO_WRITE_DATA_FLAG_CLEAR_SIGNAL))
+ return MOJO_RESULT_INVALID_ARGUMENT;
+
if (in_two_phase_write_)
return MOJO_RESULT_BUSY;
if (peer_closed_)
@@ -251,6 +244,7 @@ MojoResult DataPipeProducerDispatcher::EndWriteData(
HandleSignalsState new_state = GetHandleSignalsStateNoLock();
if (new_state.satisfies(MOJO_HANDLE_SIGNAL_WRITABLE))
awakable_list_.AwakeForStateChange(new_state);
+ watchers_.NotifyState(new_state);
return rv;
}
@@ -260,6 +254,24 @@ HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsState() const {
return GetHandleSignalsStateNoLock();
}
+MojoResult DataPipeProducerDispatcher::AddWatcherRef(
+ const scoped_refptr<WatcherDispatcher>& watcher,
+ uintptr_t context) {
+ base::AutoLock lock(lock_);
+ if (is_closed_ || in_transit_)
+ return MOJO_RESULT_INVALID_ARGUMENT;
+ return watchers_.Add(watcher, context, GetHandleSignalsStateNoLock());
+}
+
+MojoResult DataPipeProducerDispatcher::RemoveWatcherRef(
+ WatcherDispatcher* watcher,
+ uintptr_t context) {
+ base::AutoLock lock(lock_);
+ if (is_closed_ || in_transit_)
+ return MOJO_RESULT_INVALID_ARGUMENT;
+ return watchers_.Remove(watcher, context);
+}
+
MojoResult DataPipeProducerDispatcher::AddAwakable(
Awakable* awakable,
MojoHandleSignals signals,
@@ -356,7 +368,10 @@ void DataPipeProducerDispatcher::CancelTransit() {
DCHECK(in_transit_);
in_transit_ = false;
buffer_handle_for_transit_.reset();
- awakable_list_.AwakeForStateChange(GetHandleSignalsStateNoLock());
+
+ HandleSignalsState state = GetHandleSignalsStateNoLock();
+ awakable_list_.AwakeForStateChange(state);
+ watchers_.NotifyState(state);
}
// static
@@ -440,6 +455,7 @@ MojoResult DataPipeProducerDispatcher::CloseNoLock() {
shared_ring_buffer_ = nullptr;
awakable_list_.CancelAll();
+ watchers_.NotifyClosed();
if (!transferred_) {
base::AutoUnlock unlock(lock_);
node_controller_->ClosePort(control_port_);
@@ -454,7 +470,7 @@ HandleSignalsState DataPipeProducerDispatcher::GetHandleSignalsStateNoLock()
HandleSignalsState rv;
if (!peer_closed_) {
if (!in_two_phase_write_ && shared_ring_buffer_ &&
- available_capacity_ > 0)
+ available_capacity_ > 0 && !suppress_writable_signal_)
rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_WRITABLE;
} else {
@@ -541,7 +557,10 @@ void DataPipeProducerDispatcher::UpdateSignalsStateNoLock() {
if (peer_closed_ != was_peer_closed ||
available_capacity_ != previous_capacity) {
- awakable_list_.AwakeForStateChange(GetHandleSignalsStateNoLock());
+ suppress_writable_signal_ = false;
+ HandleSignalsState state = GetHandleSignalsStateNoLock();
+ awakable_list_.AwakeForStateChange(state);
+ watchers_.NotifyState(state);
}
}

Powered by Google App Engine
This is Rietveld 408576698