| Index: mojo/system/message_pipe.cc
|
| diff --git a/mojo/system/message_pipe.cc b/mojo/system/message_pipe.cc
|
| index bfb06466efedc59b6e1a20c7d12f02e0d9231841..857cdbcd2ba019221160e244eede76e8f3f2e8ea 100644
|
| --- a/mojo/system/message_pipe.cc
|
| +++ b/mojo/system/message_pipe.cc
|
| @@ -6,22 +6,15 @@
|
|
|
| #include "base/logging.h"
|
| #include "base/stl_util.h"
|
| +#include "mojo/system/channel.h"
|
| #include "mojo/system/local_message_pipe_endpoint.h"
|
| #include "mojo/system/message_in_transit.h"
|
| #include "mojo/system/message_pipe_endpoint.h"
|
| +#include "mojo/system/proxy_message_pipe_endpoint.h"
|
|
|
| namespace mojo {
|
| namespace system {
|
|
|
| -namespace {
|
| -
|
| -unsigned DestinationPortFromSourcePort(unsigned port) {
|
| - DCHECK(port == 0 || port == 1);
|
| - return port ^ 1;
|
| -}
|
| -
|
| -} // namespace
|
| -
|
| MessagePipe::MessagePipe(scoped_ptr<MessagePipeEndpoint> endpoint_0,
|
| scoped_ptr<MessagePipeEndpoint> endpoint_1) {
|
| endpoints_[0].reset(endpoint_0.release());
|
| @@ -33,6 +26,12 @@ MessagePipe::MessagePipe() {
|
| endpoints_[1].reset(new LocalMessagePipeEndpoint());
|
| }
|
|
|
| +// static
|
| +unsigned MessagePipe::GetPeerPort(unsigned port) {
|
| + DCHECK(port == 0 || port == 1);
|
| + return port ^ 1;
|
| +}
|
| +
|
| void MessagePipe::CancelAllWaiters(unsigned port) {
|
| DCHECK(port == 0 || port == 1);
|
|
|
| @@ -44,38 +43,36 @@ void MessagePipe::CancelAllWaiters(unsigned port) {
|
| void MessagePipe::Close(unsigned port) {
|
| DCHECK(port == 0 || port == 1);
|
|
|
| - unsigned destination_port = DestinationPortFromSourcePort(port);
|
| + unsigned destination_port = GetPeerPort(port);
|
|
|
| base::AutoLock locker(lock_);
|
| DCHECK(endpoints_[port].get());
|
|
|
| endpoints_[port]->Close();
|
| - if (endpoints_[destination_port].get())
|
| - endpoints_[destination_port]->OnPeerClose();
|
| + bool should_destroy_destination = endpoints_[destination_port].get() ?
|
| + !endpoints_[destination_port]->OnPeerClose() : false;
|
|
|
| endpoints_[port].reset();
|
| + if (should_destroy_destination) {
|
| + endpoints_[destination_port]->Close();
|
| + endpoints_[destination_port].reset();
|
| + }
|
| }
|
|
|
| +// TODO(vtl): Support sending handles.
|
| // TODO(vtl): Handle flags.
|
| MojoResult MessagePipe::WriteMessage(
|
| unsigned port,
|
| const void* bytes, uint32_t num_bytes,
|
| - const MojoHandle* handles, uint32_t num_handles,
|
| + const MojoHandle* /*handles*/, uint32_t /*num_handles*/,
|
| MojoWriteMessageFlags flags) {
|
| DCHECK(port == 0 || port == 1);
|
| -
|
| - unsigned destination_port = DestinationPortFromSourcePort(port);
|
| -
|
| - base::AutoLock locker(lock_);
|
| - DCHECK(endpoints_[port].get());
|
| -
|
| - // The destination port need not be open, unlike the source port.
|
| - if (!endpoints_[destination_port].get())
|
| - return MOJO_RESULT_FAILED_PRECONDITION;
|
| -
|
| - return endpoints_[destination_port]->EnqueueMessage(bytes, num_bytes,
|
| - handles, num_handles,
|
| - flags);
|
| + return EnqueueMessage(
|
| + GetPeerPort(port),
|
| + MessageInTransit::Create(
|
| + MessageInTransit::kTypeMessagePipeEndpoint,
|
| + MessageInTransit::kSubtypeMessagePipeEndpointData,
|
| + bytes, num_bytes));
|
| }
|
|
|
| MojoResult MessagePipe::ReadMessage(unsigned port,
|
| @@ -113,6 +110,54 @@ void MessagePipe::RemoveWaiter(unsigned port, Waiter* waiter) {
|
| endpoints_[port]->RemoveWaiter(waiter);
|
| }
|
|
|
| +MojoResult MessagePipe::EnqueueMessage(unsigned port,
|
| + MessageInTransit* message) {
|
| + DCHECK(port == 0 || port == 1);
|
| + DCHECK(message);
|
| +
|
| + if (message->type() == MessageInTransit::kTypeMessagePipe)
|
| + return HandleControlMessage(port, message);
|
| +
|
| + DCHECK_EQ(message->type(), MessageInTransit::kTypeMessagePipeEndpoint);
|
| +
|
| + base::AutoLock locker(lock_);
|
| + DCHECK(endpoints_[GetPeerPort(port)].get());
|
| +
|
| + // The destination port need not be open, unlike the source port.
|
| + if (!endpoints_[port].get()) {
|
| + message->Destroy();
|
| + return MOJO_RESULT_FAILED_PRECONDITION;
|
| + }
|
| +
|
| + return endpoints_[port]->EnqueueMessage(message);
|
| +}
|
| +
|
| +void MessagePipe::Attach(unsigned port,
|
| + scoped_refptr<Channel> channel,
|
| + MessageInTransit::EndpointId local_id) {
|
| + DCHECK(port == 0 || port == 1);
|
| + DCHECK(channel.get());
|
| + DCHECK_NE(local_id, MessageInTransit::kInvalidEndpointId);
|
| +
|
| + base::AutoLock locker(lock_);
|
| + DCHECK(endpoints_[port].get());
|
| +
|
| + endpoints_[port]->Attach(channel, local_id);
|
| +}
|
| +
|
| +void MessagePipe::Run(unsigned port, MessageInTransit::EndpointId remote_id) {
|
| + DCHECK(port == 0 || port == 1);
|
| + DCHECK_NE(remote_id, MessageInTransit::kInvalidEndpointId);
|
| +
|
| + base::AutoLock locker(lock_);
|
| + DCHECK(endpoints_[port].get());
|
| +
|
| + if (!endpoints_[port]->Run(remote_id)) {
|
| + endpoints_[port]->Close();
|
| + endpoints_[port].reset();
|
| + }
|
| +}
|
| +
|
| MessagePipe::~MessagePipe() {
|
| // Owned by the dispatchers. The owning dispatchers should only release us via
|
| // their |Close()| method, which should inform us of being closed via our
|
| @@ -121,5 +166,37 @@ MessagePipe::~MessagePipe() {
|
| DCHECK(!endpoints_[1].get());
|
| }
|
|
|
| +MojoResult MessagePipe::HandleControlMessage(unsigned port,
|
| + MessageInTransit* message) {
|
| + DCHECK(port == 0 || port == 1);
|
| + DCHECK(message);
|
| + DCHECK_EQ(message->type(), MessageInTransit::kTypeMessagePipe);
|
| +
|
| + MojoResult rv = MOJO_RESULT_OK;
|
| + switch (message->subtype()) {
|
| + case MessageInTransit::kSubtypeMessagePipePeerClosed: {
|
| + unsigned source_port = GetPeerPort(port);
|
| +
|
| + base::AutoLock locker(lock_);
|
| + DCHECK(endpoints_[source_port].get());
|
| +
|
| + endpoints_[source_port]->Close();
|
| + if (endpoints_[port].get())
|
| + endpoints_[port]->OnPeerClose();
|
| +
|
| + endpoints_[source_port].reset();
|
| + break;
|
| + }
|
| + default:
|
| + LOG(WARNING) << "Unrecognized MessagePipe control message subtype "
|
| + << message->subtype();
|
| + rv = MOJO_RESULT_UNKNOWN;
|
| + break;
|
| + }
|
| +
|
| + message->Destroy();
|
| + return rv;
|
| +}
|
| +
|
| } // namespace system
|
| } // namespace mojo
|
|
|