Index: mojo/system/message_pipe.cc |
diff --git a/mojo/system/message_pipe.cc b/mojo/system/message_pipe.cc |
index bfb06466efedc59b6e1a20c7d12f02e0d9231841..857cdbcd2ba019221160e244eede76e8f3f2e8ea 100644 |
--- a/mojo/system/message_pipe.cc |
+++ b/mojo/system/message_pipe.cc |
@@ -6,22 +6,15 @@ |
#include "base/logging.h" |
#include "base/stl_util.h" |
+#include "mojo/system/channel.h" |
#include "mojo/system/local_message_pipe_endpoint.h" |
#include "mojo/system/message_in_transit.h" |
#include "mojo/system/message_pipe_endpoint.h" |
+#include "mojo/system/proxy_message_pipe_endpoint.h" |
namespace mojo { |
namespace system { |
-namespace { |
- |
-unsigned DestinationPortFromSourcePort(unsigned port) { |
- DCHECK(port == 0 || port == 1); |
- return port ^ 1; |
-} |
- |
-} // namespace |
- |
MessagePipe::MessagePipe(scoped_ptr<MessagePipeEndpoint> endpoint_0, |
scoped_ptr<MessagePipeEndpoint> endpoint_1) { |
endpoints_[0].reset(endpoint_0.release()); |
@@ -33,6 +26,12 @@ MessagePipe::MessagePipe() { |
endpoints_[1].reset(new LocalMessagePipeEndpoint()); |
} |
+// static |
+unsigned MessagePipe::GetPeerPort(unsigned port) { |
+ DCHECK(port == 0 || port == 1); |
+ return port ^ 1; |
+} |
+ |
void MessagePipe::CancelAllWaiters(unsigned port) { |
DCHECK(port == 0 || port == 1); |
@@ -44,38 +43,36 @@ void MessagePipe::CancelAllWaiters(unsigned port) { |
void MessagePipe::Close(unsigned port) { |
DCHECK(port == 0 || port == 1); |
- unsigned destination_port = DestinationPortFromSourcePort(port); |
+ unsigned destination_port = GetPeerPort(port); |
base::AutoLock locker(lock_); |
DCHECK(endpoints_[port].get()); |
endpoints_[port]->Close(); |
- if (endpoints_[destination_port].get()) |
- endpoints_[destination_port]->OnPeerClose(); |
+ bool should_destroy_destination = endpoints_[destination_port].get() ? |
+ !endpoints_[destination_port]->OnPeerClose() : false; |
endpoints_[port].reset(); |
+ if (should_destroy_destination) { |
+ endpoints_[destination_port]->Close(); |
+ endpoints_[destination_port].reset(); |
+ } |
} |
+// TODO(vtl): Support sending handles. |
// TODO(vtl): Handle flags. |
MojoResult MessagePipe::WriteMessage( |
unsigned port, |
const void* bytes, uint32_t num_bytes, |
- const MojoHandle* handles, uint32_t num_handles, |
+ const MojoHandle* /*handles*/, uint32_t /*num_handles*/, |
MojoWriteMessageFlags flags) { |
DCHECK(port == 0 || port == 1); |
- |
- unsigned destination_port = DestinationPortFromSourcePort(port); |
- |
- base::AutoLock locker(lock_); |
- DCHECK(endpoints_[port].get()); |
- |
- // The destination port need not be open, unlike the source port. |
- if (!endpoints_[destination_port].get()) |
- return MOJO_RESULT_FAILED_PRECONDITION; |
- |
- return endpoints_[destination_port]->EnqueueMessage(bytes, num_bytes, |
- handles, num_handles, |
- flags); |
+ return EnqueueMessage( |
+ GetPeerPort(port), |
+ MessageInTransit::Create( |
+ MessageInTransit::kTypeMessagePipeEndpoint, |
+ MessageInTransit::kSubtypeMessagePipeEndpointData, |
+ bytes, num_bytes)); |
} |
MojoResult MessagePipe::ReadMessage(unsigned port, |
@@ -113,6 +110,54 @@ void MessagePipe::RemoveWaiter(unsigned port, Waiter* waiter) { |
endpoints_[port]->RemoveWaiter(waiter); |
} |
+MojoResult MessagePipe::EnqueueMessage(unsigned port, |
+ MessageInTransit* message) { |
+ DCHECK(port == 0 || port == 1); |
+ DCHECK(message); |
+ |
+ if (message->type() == MessageInTransit::kTypeMessagePipe) |
+ return HandleControlMessage(port, message); |
+ |
+ DCHECK_EQ(message->type(), MessageInTransit::kTypeMessagePipeEndpoint); |
+ |
+ base::AutoLock locker(lock_); |
+ DCHECK(endpoints_[GetPeerPort(port)].get()); |
+ |
+ // The destination port need not be open, unlike the source port. |
+ if (!endpoints_[port].get()) { |
+ message->Destroy(); |
+ return MOJO_RESULT_FAILED_PRECONDITION; |
+ } |
+ |
+ return endpoints_[port]->EnqueueMessage(message); |
+} |
+ |
+void MessagePipe::Attach(unsigned port, |
+ scoped_refptr<Channel> channel, |
+ MessageInTransit::EndpointId local_id) { |
+ DCHECK(port == 0 || port == 1); |
+ DCHECK(channel.get()); |
+ DCHECK_NE(local_id, MessageInTransit::kInvalidEndpointId); |
+ |
+ base::AutoLock locker(lock_); |
+ DCHECK(endpoints_[port].get()); |
+ |
+ endpoints_[port]->Attach(channel, local_id); |
+} |
+ |
+void MessagePipe::Run(unsigned port, MessageInTransit::EndpointId remote_id) { |
+ DCHECK(port == 0 || port == 1); |
+ DCHECK_NE(remote_id, MessageInTransit::kInvalidEndpointId); |
+ |
+ base::AutoLock locker(lock_); |
+ DCHECK(endpoints_[port].get()); |
+ |
+ if (!endpoints_[port]->Run(remote_id)) { |
+ endpoints_[port]->Close(); |
+ endpoints_[port].reset(); |
+ } |
+} |
+ |
MessagePipe::~MessagePipe() { |
// Owned by the dispatchers. The owning dispatchers should only release us via |
// their |Close()| method, which should inform us of being closed via our |
@@ -121,5 +166,37 @@ MessagePipe::~MessagePipe() { |
DCHECK(!endpoints_[1].get()); |
} |
+MojoResult MessagePipe::HandleControlMessage(unsigned port, |
+ MessageInTransit* message) { |
+ DCHECK(port == 0 || port == 1); |
+ DCHECK(message); |
+ DCHECK_EQ(message->type(), MessageInTransit::kTypeMessagePipe); |
+ |
+ MojoResult rv = MOJO_RESULT_OK; |
+ switch (message->subtype()) { |
+ case MessageInTransit::kSubtypeMessagePipePeerClosed: { |
+ unsigned source_port = GetPeerPort(port); |
+ |
+ base::AutoLock locker(lock_); |
+ DCHECK(endpoints_[source_port].get()); |
+ |
+ endpoints_[source_port]->Close(); |
+ if (endpoints_[port].get()) |
+ endpoints_[port]->OnPeerClose(); |
+ |
+ endpoints_[source_port].reset(); |
+ break; |
+ } |
+ default: |
+ LOG(WARNING) << "Unrecognized MessagePipe control message subtype " |
+ << message->subtype(); |
+ rv = MOJO_RESULT_UNKNOWN; |
+ break; |
+ } |
+ |
+ message->Destroy(); |
+ return rv; |
+} |
+ |
} // namespace system |
} // namespace mojo |