Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(109)

Unified Diff: mojo/system/message_pipe.cc

Issue 60103005: Mojo: First stab at making MessagePipes work across OS pipes. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: rebased Created 7 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: mojo/system/message_pipe.cc
diff --git a/mojo/system/message_pipe.cc b/mojo/system/message_pipe.cc
index bfb06466efedc59b6e1a20c7d12f02e0d9231841..857cdbcd2ba019221160e244eede76e8f3f2e8ea 100644
--- a/mojo/system/message_pipe.cc
+++ b/mojo/system/message_pipe.cc
@@ -6,22 +6,15 @@
#include "base/logging.h"
#include "base/stl_util.h"
+#include "mojo/system/channel.h"
#include "mojo/system/local_message_pipe_endpoint.h"
#include "mojo/system/message_in_transit.h"
#include "mojo/system/message_pipe_endpoint.h"
+#include "mojo/system/proxy_message_pipe_endpoint.h"
namespace mojo {
namespace system {
-namespace {
-
-unsigned DestinationPortFromSourcePort(unsigned port) {
- DCHECK(port == 0 || port == 1);
- return port ^ 1;
-}
-
-} // namespace
-
MessagePipe::MessagePipe(scoped_ptr<MessagePipeEndpoint> endpoint_0,
scoped_ptr<MessagePipeEndpoint> endpoint_1) {
endpoints_[0].reset(endpoint_0.release());
@@ -33,6 +26,12 @@ MessagePipe::MessagePipe() {
endpoints_[1].reset(new LocalMessagePipeEndpoint());
}
+// static
+unsigned MessagePipe::GetPeerPort(unsigned port) {
+ DCHECK(port == 0 || port == 1);
+ return port ^ 1;
+}
+
void MessagePipe::CancelAllWaiters(unsigned port) {
DCHECK(port == 0 || port == 1);
@@ -44,38 +43,36 @@ void MessagePipe::CancelAllWaiters(unsigned port) {
void MessagePipe::Close(unsigned port) {
DCHECK(port == 0 || port == 1);
- unsigned destination_port = DestinationPortFromSourcePort(port);
+ unsigned destination_port = GetPeerPort(port);
base::AutoLock locker(lock_);
DCHECK(endpoints_[port].get());
endpoints_[port]->Close();
- if (endpoints_[destination_port].get())
- endpoints_[destination_port]->OnPeerClose();
+ bool should_destroy_destination = endpoints_[destination_port].get() ?
+ !endpoints_[destination_port]->OnPeerClose() : false;
endpoints_[port].reset();
+ if (should_destroy_destination) {
+ endpoints_[destination_port]->Close();
+ endpoints_[destination_port].reset();
+ }
}
+// TODO(vtl): Support sending handles.
// TODO(vtl): Handle flags.
MojoResult MessagePipe::WriteMessage(
unsigned port,
const void* bytes, uint32_t num_bytes,
- const MojoHandle* handles, uint32_t num_handles,
+ const MojoHandle* /*handles*/, uint32_t /*num_handles*/,
MojoWriteMessageFlags flags) {
DCHECK(port == 0 || port == 1);
-
- unsigned destination_port = DestinationPortFromSourcePort(port);
-
- base::AutoLock locker(lock_);
- DCHECK(endpoints_[port].get());
-
- // The destination port need not be open, unlike the source port.
- if (!endpoints_[destination_port].get())
- return MOJO_RESULT_FAILED_PRECONDITION;
-
- return endpoints_[destination_port]->EnqueueMessage(bytes, num_bytes,
- handles, num_handles,
- flags);
+ return EnqueueMessage(
+ GetPeerPort(port),
+ MessageInTransit::Create(
+ MessageInTransit::kTypeMessagePipeEndpoint,
+ MessageInTransit::kSubtypeMessagePipeEndpointData,
+ bytes, num_bytes));
}
MojoResult MessagePipe::ReadMessage(unsigned port,
@@ -113,6 +110,54 @@ void MessagePipe::RemoveWaiter(unsigned port, Waiter* waiter) {
endpoints_[port]->RemoveWaiter(waiter);
}
+MojoResult MessagePipe::EnqueueMessage(unsigned port,
+ MessageInTransit* message) {
+ DCHECK(port == 0 || port == 1);
+ DCHECK(message);
+
+ if (message->type() == MessageInTransit::kTypeMessagePipe)
+ return HandleControlMessage(port, message);
+
+ DCHECK_EQ(message->type(), MessageInTransit::kTypeMessagePipeEndpoint);
+
+ base::AutoLock locker(lock_);
+ DCHECK(endpoints_[GetPeerPort(port)].get());
+
+ // The destination port need not be open, unlike the source port.
+ if (!endpoints_[port].get()) {
+ message->Destroy();
+ return MOJO_RESULT_FAILED_PRECONDITION;
+ }
+
+ return endpoints_[port]->EnqueueMessage(message);
+}
+
+void MessagePipe::Attach(unsigned port,
+ scoped_refptr<Channel> channel,
+ MessageInTransit::EndpointId local_id) {
+ DCHECK(port == 0 || port == 1);
+ DCHECK(channel.get());
+ DCHECK_NE(local_id, MessageInTransit::kInvalidEndpointId);
+
+ base::AutoLock locker(lock_);
+ DCHECK(endpoints_[port].get());
+
+ endpoints_[port]->Attach(channel, local_id);
+}
+
+void MessagePipe::Run(unsigned port, MessageInTransit::EndpointId remote_id) {
+ DCHECK(port == 0 || port == 1);
+ DCHECK_NE(remote_id, MessageInTransit::kInvalidEndpointId);
+
+ base::AutoLock locker(lock_);
+ DCHECK(endpoints_[port].get());
+
+ if (!endpoints_[port]->Run(remote_id)) {
+ endpoints_[port]->Close();
+ endpoints_[port].reset();
+ }
+}
+
MessagePipe::~MessagePipe() {
// Owned by the dispatchers. The owning dispatchers should only release us via
// their |Close()| method, which should inform us of being closed via our
@@ -121,5 +166,37 @@ MessagePipe::~MessagePipe() {
DCHECK(!endpoints_[1].get());
}
+MojoResult MessagePipe::HandleControlMessage(unsigned port,
+ MessageInTransit* message) {
+ DCHECK(port == 0 || port == 1);
+ DCHECK(message);
+ DCHECK_EQ(message->type(), MessageInTransit::kTypeMessagePipe);
+
+ MojoResult rv = MOJO_RESULT_OK;
+ switch (message->subtype()) {
+ case MessageInTransit::kSubtypeMessagePipePeerClosed: {
+ unsigned source_port = GetPeerPort(port);
+
+ base::AutoLock locker(lock_);
+ DCHECK(endpoints_[source_port].get());
+
+ endpoints_[source_port]->Close();
+ if (endpoints_[port].get())
+ endpoints_[port]->OnPeerClose();
+
+ endpoints_[source_port].reset();
+ break;
+ }
+ default:
+ LOG(WARNING) << "Unrecognized MessagePipe control message subtype "
+ << message->subtype();
+ rv = MOJO_RESULT_UNKNOWN;
+ break;
+ }
+
+ message->Destroy();
+ return rv;
+}
+
} // namespace system
} // namespace mojo

Powered by Google App Engine
This is Rietveld 408576698