Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(643)

Unified Diff: mojo/edk/system/message_pipe_dispatcher.cc

Issue 1748503002: [mojo-edk] Add MojoWatch and MojoCancelWatch APIs (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: . Created 4 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: mojo/edk/system/message_pipe_dispatcher.cc
diff --git a/mojo/edk/system/message_pipe_dispatcher.cc b/mojo/edk/system/message_pipe_dispatcher.cc
index c7b4a32da6bad0e0cbea48ddc8b3e60683a01966..2406fe9a66a5135ccdb2392af5f3c7038479eebc 100644
--- a/mojo/edk/system/message_pipe_dispatcher.cc
+++ b/mojo/edk/system/message_pipe_dispatcher.cc
@@ -90,11 +90,33 @@ Dispatcher::Type MessagePipeDispatcher::GetType() const {
return Type::MESSAGE_PIPE;
}
-MojoResult MessagePipeDispatcher::Close() {
+MojoResult MessagePipeDispatcher::Close(RequestContext* request_context) {
base::AutoLock lock(signal_lock_);
DVLOG(1) << "Closing message pipe " << pipe_id_ << " endpoint " << endpoint_
<< " [port=" << port_.name() << "]";
- return CloseNoLock();
+ return CloseNoLock(request_context);
+}
+
+MojoResult MessagePipeDispatcher::Watch(MojoHandleSignals signals,
+ const WatchCallback& callback,
+ uintptr_t context,
+ RequestContext* request_context) {
+ base::AutoLock lock(signal_lock_);
+
+ if (port_closed_ || in_transit_)
+ return MOJO_RESULT_INVALID_ARGUMENT;
+
+ HandleSignalsState state = GetHandleSignalsStateNoLock();
+ return watchers_.Add(signals, callback, context, state, request_context);
+}
+
+MojoResult MessagePipeDispatcher::CancelWatch(uintptr_t context) {
+ base::AutoLock lock(signal_lock_);
+
+ if (port_closed_ || in_transit_)
+ return MOJO_RESULT_INVALID_ARGUMENT;
+
+ return watchers_.Remove(context);
}
MojoResult MessagePipeDispatcher::WriteMessage(
@@ -102,7 +124,8 @@ MojoResult MessagePipeDispatcher::WriteMessage(
uint32_t num_bytes,
const DispatcherInTransit* dispatchers,
uint32_t num_dispatchers,
- MojoWriteMessageFlags flags) {
+ MojoWriteMessageFlags flags,
+ RequestContext* request_context) {
{
base::AutoLock lock(signal_lock_);
@@ -226,7 +249,7 @@ MojoResult MessagePipeDispatcher::WriteMessage(
result = MOJO_RESULT_INVALID_ARGUMENT;
} else if (rv == ports::ERROR_PORT_PEER_CLOSED) {
base::AutoLock lock(signal_lock_);
- awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock());
+ NotifyObserversForStateChangeNoLock(request_context);
result = MOJO_RESULT_FAILED_PRECONDITION;
} else {
NOTREACHED();
@@ -256,7 +279,8 @@ MojoResult MessagePipeDispatcher::ReadMessage(void* bytes,
uint32_t* num_bytes,
MojoHandle* handles,
uint32_t* num_handles,
- MojoReadMessageFlags flags) {
+ MojoReadMessageFlags flags,
+ RequestContext* request_context) {
{
base::AutoLock lock(signal_lock_);
// We can't read from a port that's closed or in transit!
@@ -336,7 +360,7 @@ MojoResult MessagePipeDispatcher::ReadMessage(void* bytes,
// Peer is closed and there are no more messages to read.
DCHECK_EQ(rv, ports::ERROR_PORT_PEER_CLOSED);
base::AutoLock lock(signal_lock_);
- awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock());
+ NotifyObserversForStateChangeNoLock(request_context);
return MOJO_RESULT_FAILED_PRECONDITION;
}
@@ -485,7 +509,7 @@ bool MessagePipeDispatcher::EndSerialize(void* destination,
return true;
}
-bool MessagePipeDispatcher::BeginTransit() {
+bool MessagePipeDispatcher::BeginTransit(RequestContext* request_context) {
base::AutoLock lock(signal_lock_);
if (in_transit_ || port_closed_)
return false;
@@ -493,21 +517,22 @@ bool MessagePipeDispatcher::BeginTransit() {
return in_transit_;
}
-void MessagePipeDispatcher::CompleteTransitAndClose() {
+void MessagePipeDispatcher::CompleteTransitAndClose(
+ RequestContext* request_context) {
node_controller_->SetPortObserver(port_, nullptr);
base::AutoLock lock(signal_lock_);
in_transit_ = false;
port_transferred_ = true;
- CloseNoLock();
+ CloseNoLock(request_context);
}
-void MessagePipeDispatcher::CancelTransit() {
+void MessagePipeDispatcher::CancelTransit(RequestContext* request_context) {
base::AutoLock lock(signal_lock_);
in_transit_ = false;
// Something may have happened while we were waiting for potential transit.
- awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock());
+ NotifyObserversForStateChangeNoLock(request_context);
}
// static
@@ -536,12 +561,13 @@ MessagePipeDispatcher::~MessagePipeDispatcher() {
DCHECK(port_closed_ && !in_transit_);
}
-MojoResult MessagePipeDispatcher::CloseNoLock() {
+MojoResult MessagePipeDispatcher::CloseNoLock(RequestContext* request_context) {
signal_lock_.AssertAcquired();
if (port_closed_ || in_transit_)
return MOJO_RESULT_INVALID_ARGUMENT;
port_closed_ = true;
+ watchers_.CancelAll(request_context);
awakables_.CancelAll();
if (!port_transferred_) {
@@ -579,6 +605,10 @@ HandleSignalsState MessagePipeDispatcher::GetHandleSignalsStateNoLock() const {
}
void MessagePipeDispatcher::OnPortStatusChanged() {
+ // Create a new RequestContext that will run its finalizers after
+ // |signal_lock_| is released.
+ RequestContext request_context;
+
base::AutoLock lock(signal_lock_);
// We stop observing our port as soon as it's transferred, but this can race
@@ -608,7 +638,14 @@ void MessagePipeDispatcher::OnPortStatusChanged() {
}
#endif
- awakables_.AwakeForStateChange(GetHandleSignalsStateNoLock());
+ NotifyObserversForStateChangeNoLock(&request_context);
+}
+
+void MessagePipeDispatcher::NotifyObserversForStateChangeNoLock(
+ RequestContext* request_context) {
+ HandleSignalsState state = GetHandleSignalsStateNoLock();
+ awakables_.AwakeForStateChange(state);
+ watchers_.NotifyOfStateChange(state, request_context);
}
} // namespace edk

Powered by Google App Engine
This is Rietveld 408576698