Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(427)

Unified Diff: mojo/edk/system/data_pipe_consumer_dispatcher.cc

Issue 2725133002: Mojo: Armed Watchers (Closed)
Patch Set: . Created 3 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « mojo/edk/system/data_pipe_consumer_dispatcher.h ('k') | mojo/edk/system/data_pipe_producer_dispatcher.h » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: mojo/edk/system/data_pipe_consumer_dispatcher.cc
diff --git a/mojo/edk/system/data_pipe_consumer_dispatcher.cc b/mojo/edk/system/data_pipe_consumer_dispatcher.cc
index c908e3ae8bf7d4f9668d9fa3836d1e8b79f8f97b..474e99d4955fc3c0e6cdc98f045af90e9d44e988 100644
--- a/mojo/edk/system/data_pipe_consumer_dispatcher.cc
+++ b/mojo/edk/system/data_pipe_consumer_dispatcher.cc
@@ -80,6 +80,7 @@ DataPipeConsumerDispatcher::DataPipeConsumerDispatcher(
node_controller_(node_controller),
control_port_(control_port),
pipe_id_(pipe_id),
+ watchers_(this),
shared_ring_buffer_(shared_ring_buffer) {
if (initialized) {
base::AutoLock lock(lock_);
@@ -97,34 +98,10 @@ MojoResult DataPipeConsumerDispatcher::Close() {
return CloseNoLock();
}
-
-MojoResult DataPipeConsumerDispatcher::Watch(
- MojoHandleSignals signals,
- const Watcher::WatchCallback& callback,
- uintptr_t context) {
- base::AutoLock lock(lock_);
-
- if (is_closed_ || in_transit_)
- return MOJO_RESULT_INVALID_ARGUMENT;
-
- return awakable_list_.AddWatcher(
- signals, callback, context, GetHandleSignalsStateNoLock());
-}
-
-MojoResult DataPipeConsumerDispatcher::CancelWatch(uintptr_t context) {
- base::AutoLock lock(lock_);
-
- if (is_closed_ || in_transit_)
- return MOJO_RESULT_INVALID_ARGUMENT;
-
- return awakable_list_.RemoveWatcher(context);
-}
-
MojoResult DataPipeConsumerDispatcher::ReadData(void* elements,
uint32_t* num_bytes,
MojoReadDataFlags flags) {
base::AutoLock lock(lock_);
- new_data_available_ = false;
if (!shared_ring_buffer_ || in_transit_)
return MOJO_RESULT_INVALID_ARGUMENT;
@@ -132,6 +109,9 @@ MojoResult DataPipeConsumerDispatcher::ReadData(void* elements,
if (in_two_phase_read_)
return MOJO_RESULT_BUSY;
+ const bool had_new_data = new_data_available_;
+ new_data_available_ = false;
+
if ((flags & MOJO_READ_DATA_FLAG_QUERY)) {
if ((flags & MOJO_READ_DATA_FLAG_PEEK) ||
(flags & MOJO_READ_DATA_FLAG_DISCARD))
@@ -140,6 +120,9 @@ MojoResult DataPipeConsumerDispatcher::ReadData(void* elements,
DVLOG_IF(2, elements)
<< "Query mode: ignoring non-null |elements|";
*num_bytes = static_cast<uint32_t>(bytes_available_);
+
+ if (had_new_data)
+ watchers_.NotifyState(GetHandleSignalsStateNoLock());
return MOJO_RESULT_OK;
}
@@ -162,12 +145,16 @@ MojoResult DataPipeConsumerDispatcher::ReadData(void* elements,
all_or_none ? max_num_bytes_to_read : 0;
if (min_num_bytes_to_read > bytes_available_) {
+ if (had_new_data)
+ watchers_.NotifyState(GetHandleSignalsStateNoLock());
return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION
: MOJO_RESULT_OUT_OF_RANGE;
}
uint32_t bytes_to_read = std::min(max_num_bytes_to_read, bytes_available_);
if (bytes_to_read == 0) {
+ if (had_new_data)
+ watchers_.NotifyState(GetHandleSignalsStateNoLock());
return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION
: MOJO_RESULT_SHOULD_WAIT;
}
@@ -199,6 +186,10 @@ MojoResult DataPipeConsumerDispatcher::ReadData(void* elements,
NotifyRead(bytes_to_read);
}
+ // We may have just read the last available data and thus changed the signals
+ // state.
+ watchers_.NotifyState(GetHandleSignalsStateNoLock());
+
return MOJO_RESULT_OK;
}
@@ -206,7 +197,6 @@ MojoResult DataPipeConsumerDispatcher::BeginReadData(const void** buffer,
uint32_t* buffer_num_bytes,
MojoReadDataFlags flags) {
base::AutoLock lock(lock_);
- new_data_available_ = false;
if (!shared_ring_buffer_ || in_transit_)
return MOJO_RESULT_INVALID_ARGUMENT;
@@ -219,7 +209,12 @@ MojoResult DataPipeConsumerDispatcher::BeginReadData(const void** buffer,
(flags & MOJO_READ_DATA_FLAG_PEEK))
return MOJO_RESULT_INVALID_ARGUMENT;
+ const bool had_new_data = new_data_available_;
+ new_data_available_ = false;
+
if (bytes_available_ == 0) {
+ if (had_new_data)
+ watchers_.NotifyState(GetHandleSignalsStateNoLock());
return peer_closed_ ? MOJO_RESULT_FAILED_PRECONDITION
: MOJO_RESULT_SHOULD_WAIT;
}
@@ -237,6 +232,9 @@ MojoResult DataPipeConsumerDispatcher::BeginReadData(const void** buffer,
*buffer_num_bytes = bytes_to_read;
two_phase_max_bytes_read_ = bytes_to_read;
+ if (had_new_data)
+ watchers_.NotifyState(GetHandleSignalsStateNoLock());
+
return MOJO_RESULT_OK;
}
@@ -273,6 +271,7 @@ MojoResult DataPipeConsumerDispatcher::EndReadData(uint32_t num_bytes_read) {
HandleSignalsState new_state = GetHandleSignalsStateNoLock();
if (!new_state.equals(old_state))
awakable_list_.AwakeForStateChange(new_state);
+ watchers_.NotifyState(new_state);
return rv;
}
@@ -282,6 +281,24 @@ HandleSignalsState DataPipeConsumerDispatcher::GetHandleSignalsState() const {
return GetHandleSignalsStateNoLock();
}
+MojoResult DataPipeConsumerDispatcher::AddWatcherRef(
+ const scoped_refptr<WatcherDispatcher>& watcher,
+ uintptr_t context) {
+ base::AutoLock lock(lock_);
+ if (is_closed_ || in_transit_)
+ return MOJO_RESULT_INVALID_ARGUMENT;
+ return watchers_.Add(watcher, context, GetHandleSignalsStateNoLock());
+}
+
+MojoResult DataPipeConsumerDispatcher::RemoveWatcherRef(
+ WatcherDispatcher* watcher,
+ uintptr_t context) {
+ base::AutoLock lock(lock_);
+ if (is_closed_ || in_transit_)
+ return MOJO_RESULT_INVALID_ARGUMENT;
+ return watchers_.Remove(watcher, context);
+}
+
MojoResult DataPipeConsumerDispatcher::AddAwakable(
Awakable* awakable,
MojoHandleSignals signals,
@@ -464,6 +481,7 @@ MojoResult DataPipeConsumerDispatcher::CloseNoLock() {
shared_ring_buffer_ = nullptr;
awakable_list_.CancelAll();
+ watchers_.NotifyClosed();
if (!transferred_) {
base::AutoUnlock unlock(lock_);
node_controller_->ClosePort(control_port_);
@@ -581,7 +599,9 @@ void DataPipeConsumerDispatcher::UpdateSignalsStateNoLock() {
new_data_available_ = true;
if (peer_closed_ != was_peer_closed || has_new_data) {
- awakable_list_.AwakeForStateChange(GetHandleSignalsStateNoLock());
+ HandleSignalsState state = GetHandleSignalsStateNoLock();
+ awakable_list_.AwakeForStateChange(state);
+ watchers_.NotifyState(state);
}
}
« no previous file with comments | « mojo/edk/system/data_pipe_consumer_dispatcher.h ('k') | mojo/edk/system/data_pipe_producer_dispatcher.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698