Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(359)

Unified Diff: mojo/edk/system/message_pipe_dispatcher.cc

Issue 1785843002: [mojo] Implement pipe fusion API (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: mojo/edk/system/message_pipe_dispatcher.cc
diff --git a/mojo/edk/system/message_pipe_dispatcher.cc b/mojo/edk/system/message_pipe_dispatcher.cc
index c630b8b928176d6861289479cdbc032f8181e43d..29a6c69575e9ad911398b0c579f98833950db08c 100644
--- a/mojo/edk/system/message_pipe_dispatcher.cc
+++ b/mojo/edk/system/message_pipe_dispatcher.cc
@@ -99,6 +99,47 @@ MessagePipeDispatcher::MessagePipeDispatcher(NodeController* node_controller,
make_scoped_refptr(new PortObserverThunk(this)));
}
+bool MessagePipeDispatcher::BeginFuse() {
+ base::AutoLock lock(signal_lock_);
+ if (port_closed_ || in_transit_ || is_fusing_)
+ return false;
+ is_fusing_ = true;
+ return true;
+}
+
+void MessagePipeDispatcher::CancelFuse() {
+ base::AutoLock lock(signal_lock_);
+ CHECK(is_fusing_);
+ is_fusing_ = false;
+}
+
+bool MessagePipeDispatcher::CompleteFuse(MessagePipeDispatcher* other) {
+ node_controller_->SetPortObserver(port_, nullptr);
+ node_controller_->SetPortObserver(other->port_, nullptr);
+
+ ports::PortRef port0;
+ {
+ base::AutoLock lock(signal_lock_);
+ CHECK(is_fusing_);
+ port0 = port_;
+ port_closed_ = true;
+ awakables_.CancelAll();
+ }
+
+ ports::PortRef port1;
+ {
+ base::AutoLock lock(other->signal_lock_);
+ CHECK(other->is_fusing_);
+ port1 = other->port_;
+ other->port_closed_ = true;
+ other->awakables_.CancelAll();
+ }
+
+ // Both ports are always closed by this call.
+ int rv = node_controller_->MergeLocalPorts(port0, port1);
+ return rv == ports::OK;
+}
+
Dispatcher::Type MessagePipeDispatcher::GetType() const {
return Type::MESSAGE_PIPE;
}
@@ -115,7 +156,7 @@ MojoResult MessagePipeDispatcher::Watch(MojoHandleSignals signals,
uintptr_t context) {
base::AutoLock lock(signal_lock_);
- if (port_closed_ || in_transit_)
+ if (port_closed_ || in_transit_ || is_fusing_)
return MOJO_RESULT_INVALID_ARGUMENT;
return awakables_.AddWatcher(
@@ -125,7 +166,7 @@ MojoResult MessagePipeDispatcher::Watch(MojoHandleSignals signals,
MojoResult MessagePipeDispatcher::CancelWatch(uintptr_t context) {
base::AutoLock lock(signal_lock_);
- if (port_closed_ || in_transit_)
+ if (port_closed_ || in_transit_ || is_fusing_)
return MOJO_RESULT_INVALID_ARGUMENT;
return awakables_.RemoveWatcher(context);
@@ -140,7 +181,7 @@ MojoResult MessagePipeDispatcher::WriteMessage(
{
base::AutoLock lock(signal_lock_);
- if (port_closed_ || in_transit_)
+ if (port_closed_ || in_transit_ || is_fusing_)
return MOJO_RESULT_INVALID_ARGUMENT;
}
@@ -294,7 +335,7 @@ MojoResult MessagePipeDispatcher::ReadMessage(void* bytes,
{
base::AutoLock lock(signal_lock_);
// We can't read from a port that's closed or in transit!
- if (port_closed_ || in_transit_)
+ if (port_closed_ || in_transit_ || is_fusing_)
return MOJO_RESULT_INVALID_ARGUMENT;
}
@@ -449,7 +490,7 @@ MojoResult MessagePipeDispatcher::AddAwakable(
HandleSignalsState* signals_state) {
base::AutoLock lock(signal_lock_);
- if (port_closed_ || in_transit_) {
+ if (port_closed_ || in_transit_ || is_fusing_) {
if (signals_state)
*signals_state = HandleSignalsState();
return MOJO_RESULT_INVALID_ARGUMENT;
@@ -487,7 +528,7 @@ MojoResult MessagePipeDispatcher::AddAwakable(
void MessagePipeDispatcher::RemoveAwakable(Awakable* awakable,
HandleSignalsState* signals_state) {
base::AutoLock lock(signal_lock_);
- if (port_closed_ || in_transit_) {
+ if (port_closed_ || in_transit_ || is_fusing_) {
if (signals_state)
*signals_state = HandleSignalsState();
} else if (signals_state) {
@@ -522,7 +563,7 @@ bool MessagePipeDispatcher::EndSerialize(void* destination,
bool MessagePipeDispatcher::BeginTransit() {
base::AutoLock lock(signal_lock_);
- if (in_transit_ || port_closed_)
+ if (in_transit_ || port_closed_ || is_fusing_)
return false;
in_transit_ = true;
return in_transit_;
@@ -573,7 +614,7 @@ MessagePipeDispatcher::~MessagePipeDispatcher() {
MojoResult MessagePipeDispatcher::CloseNoLock() {
signal_lock_.AssertAcquired();
- if (port_closed_ || in_transit_)
+ if (port_closed_ || in_transit_ || is_fusing_)
return MOJO_RESULT_INVALID_ARGUMENT;
port_closed_ = true;

Powered by Google App Engine
This is Rietveld 408576698