Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(406)

Unified Diff: mojo/edk/system/data_pipe_consumer_dispatcher.cc

Issue 2725133002: Mojo: Armed Watchers (Closed)
Patch Set: . Created 3 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: mojo/edk/system/data_pipe_consumer_dispatcher.cc
diff --git a/mojo/edk/system/data_pipe_consumer_dispatcher.cc b/mojo/edk/system/data_pipe_consumer_dispatcher.cc
index ec8d2d855e4bbb6f62940b91f5176b99e2ab1d46..ed324d6fee3efe2a6df69182cb14d82d59408cbf 100644
--- a/mojo/edk/system/data_pipe_consumer_dispatcher.cc
+++ b/mojo/edk/system/data_pipe_consumer_dispatcher.cc
@@ -80,6 +80,7 @@ DataPipeConsumerDispatcher::DataPipeConsumerDispatcher(
node_controller_(node_controller),
control_port_(control_port),
pipe_id_(pipe_id),
+ watchers_(this),
shared_ring_buffer_(shared_ring_buffer) {
if (initialized) {
base::AutoLock lock(lock_);
@@ -97,29 +98,6 @@ MojoResult DataPipeConsumerDispatcher::Close() {
return CloseNoLock();
}
-
-MojoResult DataPipeConsumerDispatcher::Watch(
- MojoHandleSignals signals,
- const Watcher::WatchCallback& callback,
- uintptr_t context) {
- base::AutoLock lock(lock_);
-
- if (is_closed_ || in_transit_)
- return MOJO_RESULT_INVALID_ARGUMENT;
-
- return awakable_list_.AddWatcher(
- signals, callback, context, GetHandleSignalsStateNoLock());
-}
-
-MojoResult DataPipeConsumerDispatcher::CancelWatch(uintptr_t context) {
- base::AutoLock lock(lock_);
-
- if (is_closed_ || in_transit_)
- return MOJO_RESULT_INVALID_ARGUMENT;
-
- return awakable_list_.RemoveWatcher(context);
-}
-
MojoResult DataPipeConsumerDispatcher::ReadData(void* elements,
uint32_t* num_bytes,
MojoReadDataFlags flags) {
@@ -160,6 +138,10 @@ MojoResult DataPipeConsumerDispatcher::ReadData(void* elements,
all_or_none ? max_num_bytes_to_read : 0;
if (min_num_bytes_to_read > bytes_available_) {
+ if (flags & MOJO_READ_DATA_FLAG_CLEAR_SIGNAL) {
+ suppress_readable_signal_ = true;
+ watchers_.NotifyState(GetHandleSignalsStateNoLock());
+ }
return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION
: MOJO_RESULT_OUT_OF_RANGE;
}
@@ -197,6 +179,10 @@ MojoResult DataPipeConsumerDispatcher::ReadData(void* elements,
NotifyRead(bytes_to_read);
}
+ // We may have just read the last available data and thus changed the signals
+ // state.
+ watchers_.NotifyState(GetHandleSignalsStateNoLock());
+
return MOJO_RESULT_OK;
}
@@ -213,7 +199,8 @@ MojoResult DataPipeConsumerDispatcher::BeginReadData(const void** buffer,
// These flags may not be used in two-phase mode.
if ((flags & MOJO_READ_DATA_FLAG_DISCARD) ||
(flags & MOJO_READ_DATA_FLAG_QUERY) ||
- (flags & MOJO_READ_DATA_FLAG_PEEK))
+ (flags & MOJO_READ_DATA_FLAG_PEEK) ||
+ (flags & MOJO_READ_DATA_FLAG_CLEAR_SIGNAL))
return MOJO_RESULT_INVALID_ARGUMENT;
if (bytes_available_ == 0) {
@@ -270,6 +257,7 @@ MojoResult DataPipeConsumerDispatcher::EndReadData(uint32_t num_bytes_read) {
HandleSignalsState new_state = GetHandleSignalsStateNoLock();
if (!new_state.equals(old_state))
awakable_list_.AwakeForStateChange(new_state);
+ watchers_.NotifyState(new_state);
return rv;
}
@@ -279,6 +267,24 @@ HandleSignalsState DataPipeConsumerDispatcher::GetHandleSignalsState() const {
return GetHandleSignalsStateNoLock();
}
+MojoResult DataPipeConsumerDispatcher::AddWatcherRef(
+ const scoped_refptr<WatcherDispatcher>& watcher,
+ uintptr_t context) {
+ base::AutoLock lock(lock_);
+ if (is_closed_ || in_transit_)
+ return MOJO_RESULT_INVALID_ARGUMENT;
+ return watchers_.Add(watcher, context, GetHandleSignalsStateNoLock());
+}
+
+MojoResult DataPipeConsumerDispatcher::RemoveWatcherRef(
+ WatcherDispatcher* watcher,
+ uintptr_t context) {
+ base::AutoLock lock(lock_);
+ if (is_closed_ || in_transit_)
+ return MOJO_RESULT_INVALID_ARGUMENT;
+ return watchers_.Remove(watcher, context);
+}
+
MojoResult DataPipeConsumerDispatcher::AddAwakable(
Awakable* awakable,
MojoHandleSignals signals,
@@ -460,6 +466,7 @@ MojoResult DataPipeConsumerDispatcher::CloseNoLock() {
shared_ring_buffer_ = nullptr;
awakable_list_.CancelAll();
+ watchers_.NotifyClosed();
if (!transferred_) {
base::AutoUnlock unlock(lock_);
node_controller_->ClosePort(control_port_);
@@ -474,7 +481,7 @@ DataPipeConsumerDispatcher::GetHandleSignalsStateNoLock() const {
HandleSignalsState rv;
if (shared_ring_buffer_ && bytes_available_) {
- if (!in_two_phase_read_)
+ if (!in_two_phase_read_ && bytes_available_ && !suppress_readable_signal_)
yzshen1 2017/03/11 00:44:58 nit: there is no need to check |bytes_available_|
Ken Rockot(use gerrit already) 2017/03/12 22:24:13 Done
rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_READABLE;
rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_READABLE;
} else if (!peer_closed_ && shared_ring_buffer_) {
@@ -565,7 +572,10 @@ void DataPipeConsumerDispatcher::UpdateSignalsStateNoLock() {
if (peer_closed_ != was_peer_closed ||
bytes_available_ != previous_bytes_available) {
- awakable_list_.AwakeForStateChange(GetHandleSignalsStateNoLock());
+ suppress_readable_signal_ = false;
+ HandleSignalsState state = GetHandleSignalsStateNoLock();
+ awakable_list_.AwakeForStateChange(state);
+ watchers_.NotifyState(state);
}
}

Powered by Google App Engine
This is Rietveld 408576698