Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(578)

Unified Diff: mojo/edk/system/message_pipe_dispatcher.cc

Issue 2725133002: Mojo: Armed Watchers (Closed)
Patch Set: . Created 3 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: mojo/edk/system/message_pipe_dispatcher.cc
diff --git a/mojo/edk/system/message_pipe_dispatcher.cc b/mojo/edk/system/message_pipe_dispatcher.cc
index f27336b56f4d074a0e5e32ec2a592fcccff363a3..76f412306d042ce7d072b4bcf8c2eb7e4cb43de7 100644
--- a/mojo/edk/system/message_pipe_dispatcher.cc
+++ b/mojo/edk/system/message_pipe_dispatcher.cc
@@ -164,7 +164,8 @@ MessagePipeDispatcher::MessagePipeDispatcher(NodeController* node_controller,
: node_controller_(node_controller),
port_(port),
pipe_id_(pipe_id),
- endpoint_(endpoint) {
+ endpoint_(endpoint),
+ watchers_(this) {
DVLOG(2) << "Creating new MessagePipeDispatcher for port " << port.name()
<< " [pipe_id=" << pipe_id << "; endpoint=" << endpoint << "]";
@@ -183,6 +184,7 @@ bool MessagePipeDispatcher::Fuse(MessagePipeDispatcher* other) {
port0 = port_;
port_closed_.Set(true);
awakables_.CancelAll();
+ watchers_.NotifyClosed();
}
ports::PortRef port1;
@@ -191,6 +193,7 @@ bool MessagePipeDispatcher::Fuse(MessagePipeDispatcher* other) {
port1 = other->port_;
other->port_closed_.Set(true);
other->awakables_.CancelAll();
+ other->watchers_.NotifyClosed();
}
// Both ports are always closed by this call.
@@ -209,27 +212,6 @@ MojoResult MessagePipeDispatcher::Close() {
return CloseNoLock();
}
-MojoResult MessagePipeDispatcher::Watch(MojoHandleSignals signals,
- const Watcher::WatchCallback& callback,
- uintptr_t context) {
- base::AutoLock lock(signal_lock_);
-
- if (port_closed_ || in_transit_)
- return MOJO_RESULT_INVALID_ARGUMENT;
-
- return awakables_.AddWatcher(
- signals, callback, context, GetHandleSignalsStateNoLock());
-}
-
-MojoResult MessagePipeDispatcher::CancelWatch(uintptr_t context) {
- base::AutoLock lock(signal_lock_);
-
- if (port_closed_ || in_transit_)
- return MOJO_RESULT_INVALID_ARGUMENT;
-
- return awakables_.RemoveWatcher(context);
-}
-
MojoResult MessagePipeDispatcher::WriteMessage(
std::unique_ptr<MessageForTransit> message,
MojoWriteMessageFlags flags) {
@@ -299,6 +281,12 @@ MojoResult MessagePipeDispatcher::ReadMessage(
}
if (no_space) {
+ if (may_discard) {
+ // May have been the last message on the pipe. Need to update signals just
+ // in case.
+ base::AutoLock lock(signal_lock_);
+ watchers_.NotifyState(GetHandleSignalsStateNoLock());
+ }
// |*num_handles| (and/or |*num_bytes| if |read_any_size| is false) wasn't
// sufficient to hold this message's data. The message will still be in
// queue unless MOJO_READ_MESSAGE_FLAG_MAY_DISCARD was set.
@@ -319,6 +307,13 @@ MojoResult MessagePipeDispatcher::ReadMessage(
// Alright! We have a message and the caller has provided sufficient storage
// in which to receive it.
+ {
+ // We need to update anyone watching our signals in case that was the last
+ // available message.
+ base::AutoLock lock(signal_lock_);
+ watchers_.NotifyState(GetHandleSignalsStateNoLock());
+ }
+
std::unique_ptr<PortsMessage> msg(
static_cast<PortsMessage*>(ports_message.release()));
@@ -396,6 +391,23 @@ MessagePipeDispatcher::GetHandleSignalsState() const {
return GetHandleSignalsStateNoLock();
}
+MojoResult MessagePipeDispatcher::AddWatcherRef(
+ const scoped_refptr<WatcherDispatcher>& watcher,
+ uintptr_t context) {
+ base::AutoLock lock(signal_lock_);
+ if (port_closed_ || in_transit_)
+ return MOJO_RESULT_INVALID_ARGUMENT;
+ return watchers_.Add(watcher, context, GetHandleSignalsStateNoLock());
+}
+
+MojoResult MessagePipeDispatcher::RemoveWatcherRef(WatcherDispatcher* watcher,
+ uintptr_t context) {
+ base::AutoLock lock(signal_lock_);
+ if (port_closed_ || in_transit_)
+ return MOJO_RESULT_INVALID_ARGUMENT;
+ return watchers_.Remove(watcher, context);
+}
+
MojoResult MessagePipeDispatcher::AddAwakable(
Awakable* awakable,
MojoHandleSignals signals,
@@ -496,7 +508,9 @@ void MessagePipeDispatcher::CancelTransit() {
in_transit_.Set(false);
// Something may have happened while we were waiting for potential transit.
- awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock());
+ HandleSignalsState state = GetHandleSignalsStateNoLock();
+ awakables_.AwakeForStateChange(state);
+ watchers_.NotifyState(state);
}
// static
@@ -532,6 +546,7 @@ MojoResult MessagePipeDispatcher::CloseNoLock() {
port_closed_.Set(true);
awakables_.CancelAll();
+ watchers_.NotifyClosed();
if (!port_transferred_) {
base::AutoUnlock unlock(signal_lock_);
@@ -596,7 +611,9 @@ void MessagePipeDispatcher::OnPortStatusChanged() {
}
#endif
- awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock());
+ HandleSignalsState state = GetHandleSignalsStateNoLock();
+ awakables_.AwakeForStateChange(state);
+ watchers_.NotifyState(state);
}
} // namespace edk

Powered by Google App Engine
This is Rietveld 408576698