Chromium Code Reviews

Unified Diff: mojo/edk/system/message_pipe.cc

Issue 782693004: Update mojo sdk to rev f6c8ec07c01deebc13178d516225fd12695c3dc2 (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: hack mojo_system_impl gypi for android :| Created 6 years ago
Index: mojo/edk/system/message_pipe.cc
diff --git a/mojo/edk/system/message_pipe.cc b/mojo/edk/system/message_pipe.cc
index ee5036fba2320e299d1e46b29c5016bc82a806ac..3e243b648419e5c25a7e74ecf69398e4db5aac82 100644
--- a/mojo/edk/system/message_pipe.cc
+++ b/mojo/edk/system/message_pipe.cc
@@ -8,6 +8,7 @@
#include "mojo/edk/system/channel.h"
#include "mojo/edk/system/channel_endpoint.h"
#include "mojo/edk/system/channel_endpoint_id.h"
+#include "mojo/edk/system/endpoint_relayer.h"
#include "mojo/edk/system/local_message_pipe_endpoint.h"
#include "mojo/edk/system/message_in_transit.h"
#include "mojo/edk/system/message_pipe_dispatcher.h"
@@ -41,7 +42,7 @@ MessagePipe* MessagePipe::CreateLocalLocal() {
// static
MessagePipe* MessagePipe::CreateLocalProxy(
scoped_refptr<ChannelEndpoint>* channel_endpoint) {
- DCHECK(!channel_endpoint->get()); // Not technically wrong, but unlikely.
+ DCHECK(!*channel_endpoint); // Not technically wrong, but unlikely.
MessagePipe* message_pipe = new MessagePipe();
message_pipe->endpoints_[0].reset(new LocalMessagePipeEndpoint());
*channel_endpoint = new ChannelEndpoint(message_pipe, 1);
@@ -53,7 +54,7 @@ MessagePipe* MessagePipe::CreateLocalProxy(
// static
MessagePipe* MessagePipe::CreateProxyLocal(
scoped_refptr<ChannelEndpoint>* channel_endpoint) {
- DCHECK(!channel_endpoint->get()); // Not technically wrong, but unlikely.
+ DCHECK(!*channel_endpoint); // Not technically wrong, but unlikely.
MessagePipe* message_pipe = new MessagePipe();
*channel_endpoint = new ChannelEndpoint(message_pipe, 0);
message_pipe->endpoints_[0].reset(
@@ -74,7 +75,7 @@ bool MessagePipe::Deserialize(Channel* channel,
size_t size,
scoped_refptr<MessagePipe>* message_pipe,
unsigned* port) {
- DCHECK(!message_pipe->get()); // Not technically wrong, but unlikely.
+ DCHECK(!*message_pipe); // Not technically wrong, but unlikely.
if (size != sizeof(SerializedMessagePipe)) {
LOG(ERROR) << "Invalid serialized message pipe";
@@ -84,7 +85,7 @@ bool MessagePipe::Deserialize(Channel* channel,
const SerializedMessagePipe* s =
static_cast<const SerializedMessagePipe*>(source);
*message_pipe = channel->PassIncomingMessagePipe(s->receiver_endpoint_id);
- if (!message_pipe->get()) {
+ if (!*message_pipe) {
LOG(ERROR) << "Failed to deserialize message pipe (ID = "
<< s->receiver_endpoint_id << ")";
return false;
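
The repeated DCHECK change above drops the explicit ->get() call: scoped_refptr, like most smart pointers, can be tested for null directly, so dereferencing the out-parameter and negating it says the same thing more compactly. A minimal, self-contained sketch of the idiom, using std::shared_ptr as a stand-in for scoped_refptr and invented names:

#include <cassert>
#include <memory>

struct Endpoint {};

// Out-parameter that callers are expected to pass in empty, mirroring the
// scoped_refptr<ChannelEndpoint>* parameters above (illustrative only).
void CreateEndpoint(std::shared_ptr<Endpoint>* endpoint) {
  // Same meaning as assert(!endpoint->get()): the smart pointer's boolean
  // conversion already answers "is this null?".
  assert(!*endpoint);
  *endpoint = std::make_shared<Endpoint>();
}

int main() {
  std::shared_ptr<Endpoint> endpoint;
  CreateEndpoint(&endpoint);
  assert(endpoint);
  return 0;
}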
@@ -104,18 +105,18 @@ MessagePipeEndpoint::Type MessagePipe::GetType(unsigned port) {
return endpoints_[port]->GetType();
}
-void MessagePipe::CancelAllWaiters(unsigned port) {
+void MessagePipe::CancelAllAwakables(unsigned port) {
DCHECK(port == 0 || port == 1);
base::AutoLock locker(lock_);
DCHECK(endpoints_[port]);
- endpoints_[port]->CancelAllWaiters();
+ endpoints_[port]->CancelAllAwakables();
}
void MessagePipe::Close(unsigned port) {
DCHECK(port == 0 || port == 1);
- unsigned destination_port = GetPeerPort(port);
+ unsigned peer_port = GetPeerPort(port);
base::AutoLock locker(lock_);
// The endpoint's |OnPeerClose()| may have been called first and returned
@@ -124,9 +125,9 @@ void MessagePipe::Close(unsigned port) {
return;
endpoints_[port]->Close();
- if (endpoints_[destination_port]) {
- if (!endpoints_[destination_port]->OnPeerClose())
- endpoints_[destination_port].reset();
+ if (endpoints_[peer_port]) {
+ if (!endpoints_[peer_port]->OnPeerClose())
+ endpoints_[peer_port].reset();
}
endpoints_[port].reset();
}
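
Renaming destination_port to peer_port matches what GetPeerPort() actually returns: the other end of the same two-port pipe. A much-simplified sketch of the close-time handshake performed above, with invented stand-in types rather than the real endpoint classes:

#include <cassert>
#include <memory>

unsigned GetPeerPort(unsigned port) {
  assert(port == 0 || port == 1);
  return port ^ 1;  // Ports 0 and 1 are each other's peers.
}

struct Endpoint {
  // Returns false if, once its peer is gone, the endpoint has no further
  // reason to exist and can be destroyed immediately (illustrative contract).
  bool OnPeerClose() { return false; }
  void Close() {}
};

struct Pipe {
  std::unique_ptr<Endpoint> endpoints[2];

  void Close(unsigned port) {
    unsigned peer_port = GetPeerPort(port);
    if (!endpoints[port])
      return;  // Peer-close handling may already have destroyed this end.
    endpoints[port]->Close();
    if (endpoints[peer_port] && !endpoints[peer_port]->OnPeerClose())
      endpoints[peer_port].reset();
    endpoints[port].reset();
  }
};

int main() {
  Pipe pipe;
  pipe.endpoints[0] = std::make_unique<Endpoint>();
  pipe.endpoints[1] = std::make_unique<Endpoint>();
  pipe.Close(0);  // Port 1 learns its peer is gone via OnPeerClose().
  return 0;
}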
@@ -139,7 +140,9 @@ MojoResult MessagePipe::WriteMessage(
std::vector<DispatcherTransport>* transports,
MojoWriteMessageFlags flags) {
DCHECK(port == 0 || port == 1);
- return EnqueueMessage(
+
+ base::AutoLock locker(lock_);
+ return EnqueueMessageNoLock(
GetPeerPort(port),
make_scoped_ptr(new MessageInTransit(
MessageInTransit::kTypeEndpoint,
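
WriteMessage() now acquires lock_ itself and calls the renamed EnqueueMessageNoLock() (the final hunk removes the AutoLock from the enqueue path). The NoLock suffix follows the usual Chromium convention: the public entry point takes the lock, the private helper assumes its caller already holds it. A small self-contained sketch of that split using standard-library types only:

#include <mutex>
#include <queue>
#include <string>

class PipeSketch {
 public:
  // Public entry point: acquires the lock, then delegates.
  void WriteMessage(std::string message) {
    std::lock_guard<std::mutex> locker(lock_);
    EnqueueMessageNoLock(std::move(message));
  }

 private:
  // "NoLock" suffix: |lock_| must already be held by the caller. This lets
  // other already-locked paths (such as OnReadMessage() below) reuse the
  // same enqueue logic without recursive locking.
  void EnqueueMessageNoLock(std::string message) {
    queue_.push(std::move(message));
  }

  std::mutex lock_;
  std::queue<std::string> queue_;
};

int main() {
  PipeSketch pipe;
  pipe.WriteMessage("hello");
  return 0;
}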
@@ -171,28 +174,29 @@ HandleSignalsState MessagePipe::GetHandleSignalsState(unsigned port) const {
return endpoints_[port]->GetHandleSignalsState();
}
-MojoResult MessagePipe::AddWaiter(unsigned port,
- Waiter* waiter,
- MojoHandleSignals signals,
- uint32_t context,
- HandleSignalsState* signals_state) {
+MojoResult MessagePipe::AddAwakable(unsigned port,
+ Awakable* awakable,
+ MojoHandleSignals signals,
+ uint32_t context,
+ HandleSignalsState* signals_state) {
DCHECK(port == 0 || port == 1);
base::AutoLock locker(lock_);
DCHECK(endpoints_[port]);
- return endpoints_[port]->AddWaiter(waiter, signals, context, signals_state);
+ return endpoints_[port]->AddAwakable(awakable, signals, context,
+ signals_state);
}
-void MessagePipe::RemoveWaiter(unsigned port,
- Waiter* waiter,
- HandleSignalsState* signals_state) {
+void MessagePipe::RemoveAwakable(unsigned port,
+ Awakable* awakable,
+ HandleSignalsState* signals_state) {
DCHECK(port == 0 || port == 1);
base::AutoLock locker(lock_);
DCHECK(endpoints_[port]);
- endpoints_[port]->RemoveWaiter(waiter, signals_state);
+ endpoints_[port]->RemoveAwakable(awakable, signals_state);
}
void MessagePipe::StartSerialize(unsigned /*port*/,
@@ -243,16 +247,17 @@ bool MessagePipe::EndSerialize(
// We also pass its |ChannelEndpoint| to the channel, which then decides
// what to do. We have no reason to continue to exist.
//
- // TODO(vtl): Factor some of this out to |ChannelEndpoint|.
+ // TODO(vtl): Factor some of this out to |ChannelEndpoint| (or |Channel|).
- if (!endpoints_[GetPeerPort(port)]) {
+ unsigned peer_port = GetPeerPort(port);
+ if (!endpoints_[peer_port]) {
// Case 1.
channel_endpoint = new ChannelEndpoint(
nullptr, 0, static_cast<LocalMessagePipeEndpoint*>(
endpoints_[port].get())->message_queue());
endpoints_[port]->Close();
endpoints_[port].reset();
- } else if (endpoints_[GetPeerPort(port)]->GetType() ==
+ } else if (endpoints_[peer_port]->GetType() ==
MessagePipeEndpoint::kTypeLocal) {
// Case 2.
channel_endpoint = new ChannelEndpoint(
@@ -263,15 +268,44 @@ bool MessagePipe::EndSerialize(
new ProxyMessagePipeEndpoint(channel_endpoint.get()));
} else {
// Case 3.
- // TODO(vtl): Temporarily the same as case 2.
DLOG(WARNING) << "Direct message pipe passing across multiple channels "
"not yet implemented; will proxy";
+
+ // Create an |EndpointRelayer| to replace ourselves (rather than having a
+ // |MessagePipe| object that exists solely to relay messages between two
+ // |ChannelEndpoint|s, owned by the |Channel| through them).
+ //
+ // This reduces overhead somewhat, and more importantly restores some
+ // invariants, e.g., that |MessagePipe|s are owned by dispatchers.
+ //
+ // TODO(vtl): If we get the |Channel| to own/track the relayer directly,
+ // then possibly we could make |ChannelEndpoint|'s |client_| pointer a raw
+ // pointer (and not have the |Channel| owning the relayer via its
+ // |ChannelEndpoint|s).
+ //
+ // TODO(vtl): This is not obviously the right place for (all of) this
+ // logic, nor is it obviously factored correctly.
+
+ DCHECK_EQ(endpoints_[peer_port]->GetType(),
+ MessagePipeEndpoint::kTypeProxy);
+ ProxyMessagePipeEndpoint* peer_endpoint =
+ static_cast<ProxyMessagePipeEndpoint*>(endpoints_[peer_port].get());
+ scoped_refptr<ChannelEndpoint> peer_channel_endpoint =
+ peer_endpoint->ReleaseChannelEndpoint();
+
+ scoped_refptr<EndpointRelayer> relayer(new EndpointRelayer());
+ // We'll assign our peer port's endpoint to the relayer's port 1, and this
+ // port's endpoint to the relayer's port 0.
channel_endpoint = new ChannelEndpoint(
- this, port, static_cast<LocalMessagePipeEndpoint*>(
- endpoints_[port].get())->message_queue());
+ relayer.get(), 0, static_cast<LocalMessagePipeEndpoint*>(
+ endpoints_[port].get())->message_queue());
+ relayer->Init(channel_endpoint.get(), peer_channel_endpoint.get());
+ peer_channel_endpoint->ReplaceClient(relayer.get(), 1);
+
endpoints_[port]->Close();
- endpoints_[port].reset(
- new ProxyMessagePipeEndpoint(channel_endpoint.get()));
+ endpoints_[port].reset();
+ // No need to call |Close()| after |ReleaseChannelEndpoint()|.
+ endpoints_[peer_port].reset();
}
}
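
The new case 3 takes the MessagePipe out of the middle entirely: both ChannelEndpoints end up with the EndpointRelayer as their client, and the relayer's only job is to push whatever arrives on one side out the other. A heavily simplified sketch of that relaying shape, with hypothetical types standing in for the real ChannelEndpoint/EndpointRelayer interfaces:

#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-ins for the real classes; only the shape of the
// relationship is meant to match the patch above.
class EndpointClient {
 public:
  virtual ~EndpointClient() = default;
  // |port| says which of the client's two endpoints the message arrived on.
  virtual bool OnReadMessage(unsigned port, std::string message) = 0;
};

class Endpoint {
 public:
  void ReplaceClient(EndpointClient* client, unsigned port) {
    client_ = client;
    port_ = port;
  }
  void EnqueueMessage(std::string message) {
    outgoing_.push_back(std::move(message));
  }

 private:
  EndpointClient* client_ = nullptr;
  unsigned port_ = 0;
  std::vector<std::string> outgoing_;
};

class Relayer : public EndpointClient {
 public:
  void Init(Endpoint* e0, Endpoint* e1) {
    endpoints_[0] = e0;
    endpoints_[1] = e1;
  }
  // Anything read on one port is written straight out the peer port; no
  // message pipe object is needed in between.
  bool OnReadMessage(unsigned port, std::string message) override {
    endpoints_[port ^ 1]->EnqueueMessage(std::move(message));
    return true;
  }

 private:
  Endpoint* endpoints_[2] = {nullptr, nullptr};
};

int main() {
  Endpoint a, b;
  Relayer relayer;
  relayer.Init(&a, &b);
  a.ReplaceClient(&relayer, 0);
  b.ReplaceClient(&relayer, 1);
  relayer.OnReadMessage(0, "message from a's channel");  // Forwarded to b.
  return 0;
}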
@@ -287,16 +321,27 @@ bool MessagePipe::EndSerialize(
return true;
}
-void MessagePipe::OnReadMessage(unsigned port,
- scoped_ptr<MessageInTransit> message) {
+bool MessagePipe::OnReadMessage(unsigned port, MessageInTransit* message) {
+ base::AutoLock locker(lock_);
+
+ if (!endpoints_[port]) {
+ // This will happen only on the rare occasion that the call to
+ // |OnReadMessage()| is racing with us calling
+ // |ChannelEndpoint::ReplaceClient()|, in which case we reject the message,
+ // and the |ChannelEndpoint| can retry (calling the new client's
+ // |OnReadMessage()|).
+ return false;
+ }
+
// This is called when the |ChannelEndpoint| for the
// |ProxyMessagePipeEndpoint| |port| receives a message (from the |Channel|).
// We need to pass this message on to its peer port (typically a
// |LocalMessagePipeEndpoint|).
- MojoResult result =
- EnqueueMessage(GetPeerPort(port), message.Pass(), nullptr);
+ MojoResult result = EnqueueMessageNoLock(GetPeerPort(port),
+ make_scoped_ptr(message), nullptr);
DLOG_IF(WARNING, result != MOJO_RESULT_OK)
- << "EnqueueMessage() failed (result = " << result << ")";
+ << "EnqueueMessageNoLock() failed (result = " << result << ")";
+ return true;
}
void MessagePipe::OnDetachFromChannel(unsigned port) {
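
OnReadMessage() now reports whether the message was accepted; a false return tells the ChannelEndpoint that its client was concurrently swapped out via ReplaceClient() and that delivery should be retried against whichever client is installed now. A simplified sketch of the delivering side of that contract (hypothetical names, standard-library locking):

#include <mutex>
#include <string>

// Hypothetical stand-ins; only the retry-on-rejection contract is modeled.
class Client {
 public:
  virtual ~Client() = default;
  // Returns false to reject a message, which only happens while the client is
  // in the middle of being replaced.
  virtual bool OnReadMessage(unsigned port, std::string message) = 0;
};

class EndpointSketch {
 public:
  void ReplaceClient(Client* client, unsigned port) {
    std::lock_guard<std::mutex> locker(lock_);
    client_ = client;
    port_ = port;
  }

  void DeliverMessage(const std::string& message) {
    for (;;) {
      Client* client;
      unsigned port;
      {
        std::lock_guard<std::mutex> locker(lock_);
        client = client_;
        port = port_;
      }
      if (!client || client->OnReadMessage(port, message))
        return;  // Delivered, or there is no client to deliver to.
      // Rejected: the client was being replaced; loop and retry with the
      // replacement (a rejection implies ReplaceClient() is in flight).
    }
  }

 private:
  std::mutex lock_;
  Client* client_ = nullptr;
  unsigned port_ = 0;
};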
@@ -314,7 +359,7 @@ MessagePipe::~MessagePipe() {
DCHECK(!endpoints_[1]);
}
-MojoResult MessagePipe::EnqueueMessage(
+MojoResult MessagePipe::EnqueueMessageNoLock(
unsigned port,
scoped_ptr<MessageInTransit> message,
std::vector<DispatcherTransport>* transports) {
@@ -322,8 +367,6 @@ MojoResult MessagePipe::EnqueueMessage(
DCHECK(message);
DCHECK_EQ(message->type(), MessageInTransit::kTypeEndpoint);
-
- base::AutoLock locker(lock_);
DCHECK(endpoints_[GetPeerPort(port)]);
// The destination port need not be open, unlike the source port.
