Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1029)

Unified Diff: mojo/edk/system/message_pipe.cc

Issue 814543006: Move //mojo/{public, edk} underneath //third_party (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Rebase Created 5 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « mojo/edk/system/message_pipe.h ('k') | mojo/edk/system/message_pipe_dispatcher.h » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: mojo/edk/system/message_pipe.cc
diff --git a/mojo/edk/system/message_pipe.cc b/mojo/edk/system/message_pipe.cc
deleted file mode 100644
index 554401e875bd2ab8103ae07931406abe6b823928..0000000000000000000000000000000000000000
--- a/mojo/edk/system/message_pipe.cc
+++ /dev/null
@@ -1,384 +0,0 @@
-// Copyright 2013 The Chromium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-#include "mojo/edk/system/message_pipe.h"
-
-#include "base/logging.h"
-#include "mojo/edk/system/channel.h"
-#include "mojo/edk/system/channel_endpoint.h"
-#include "mojo/edk/system/channel_endpoint_id.h"
-#include "mojo/edk/system/incoming_endpoint.h"
-#include "mojo/edk/system/local_message_pipe_endpoint.h"
-#include "mojo/edk/system/message_in_transit.h"
-#include "mojo/edk/system/message_pipe_dispatcher.h"
-#include "mojo/edk/system/message_pipe_endpoint.h"
-#include "mojo/edk/system/proxy_message_pipe_endpoint.h"
-
-namespace mojo {
-namespace system {
-
-// static
-MessagePipe* MessagePipe::CreateLocalLocal() {
- MessagePipe* message_pipe = new MessagePipe();
- message_pipe->endpoints_[0].reset(new LocalMessagePipeEndpoint());
- message_pipe->endpoints_[1].reset(new LocalMessagePipeEndpoint());
- return message_pipe;
-}
-
-// static
-MessagePipe* MessagePipe::CreateLocalProxy(
- scoped_refptr<ChannelEndpoint>* channel_endpoint) {
- DCHECK(!*channel_endpoint); // Not technically wrong, but unlikely.
- MessagePipe* message_pipe = new MessagePipe();
- message_pipe->endpoints_[0].reset(new LocalMessagePipeEndpoint());
- *channel_endpoint = new ChannelEndpoint(message_pipe, 1);
- message_pipe->endpoints_[1].reset(
- new ProxyMessagePipeEndpoint(channel_endpoint->get()));
- return message_pipe;
-}
-
-// static
-MessagePipe* MessagePipe::CreateLocalProxyFromExisting(
- MessageInTransitQueue* message_queue,
- ChannelEndpoint* channel_endpoint) {
- DCHECK(message_queue);
- MessagePipe* message_pipe = new MessagePipe();
- message_pipe->endpoints_[0].reset(
- new LocalMessagePipeEndpoint(message_queue));
- if (channel_endpoint) {
- bool attached_to_channel = channel_endpoint->ReplaceClient(message_pipe, 1);
- message_pipe->endpoints_[1].reset(
- new ProxyMessagePipeEndpoint(channel_endpoint));
- if (!attached_to_channel)
- message_pipe->OnDetachFromChannel(1);
- } else {
- // This means that the proxy side was already closed; we only need to inform
- // the local side of this.
- // TODO(vtl): This is safe to do without locking (but perhaps slightly
- // dubious), since no other thread has access to |message_pipe| yet.
- message_pipe->endpoints_[0]->OnPeerClose();
- }
- return message_pipe;
-}
-
-// static
-MessagePipe* MessagePipe::CreateProxyLocal(
- scoped_refptr<ChannelEndpoint>* channel_endpoint) {
- DCHECK(!*channel_endpoint); // Not technically wrong, but unlikely.
- MessagePipe* message_pipe = new MessagePipe();
- *channel_endpoint = new ChannelEndpoint(message_pipe, 0);
- message_pipe->endpoints_[0].reset(
- new ProxyMessagePipeEndpoint(channel_endpoint->get()));
- message_pipe->endpoints_[1].reset(new LocalMessagePipeEndpoint());
- return message_pipe;
-}
-
-// static
-unsigned MessagePipe::GetPeerPort(unsigned port) {
- DCHECK(port == 0 || port == 1);
- return port ^ 1;
-}
-
-// static
-bool MessagePipe::Deserialize(Channel* channel,
- const void* source,
- size_t size,
- scoped_refptr<MessagePipe>* message_pipe,
- unsigned* port) {
- DCHECK(!*message_pipe); // Not technically wrong, but unlikely.
-
- if (size != channel->GetSerializedEndpointSize()) {
- LOG(ERROR) << "Invalid serialized message pipe";
- return false;
- }
-
- scoped_refptr<IncomingEndpoint> incoming_endpoint =
- channel->DeserializeEndpoint(source);
- if (!incoming_endpoint)
- return false;
-
- *message_pipe = incoming_endpoint->ConvertToMessagePipe();
- DCHECK(*message_pipe);
- *port = 0;
- return true;
-}
-
-MessagePipeEndpoint::Type MessagePipe::GetType(unsigned port) {
- DCHECK(port == 0 || port == 1);
- base::AutoLock locker(lock_);
- DCHECK(endpoints_[port]);
-
- return endpoints_[port]->GetType();
-}
-
-void MessagePipe::CancelAllAwakables(unsigned port) {
- DCHECK(port == 0 || port == 1);
-
- base::AutoLock locker(lock_);
- DCHECK(endpoints_[port]);
- endpoints_[port]->CancelAllAwakables();
-}
-
-void MessagePipe::Close(unsigned port) {
- DCHECK(port == 0 || port == 1);
-
- unsigned peer_port = GetPeerPort(port);
-
- base::AutoLock locker(lock_);
- // The endpoint's |OnPeerClose()| may have been called first and returned
- // false, which would have resulted in its destruction.
- if (!endpoints_[port])
- return;
-
- endpoints_[port]->Close();
- if (endpoints_[peer_port]) {
- if (!endpoints_[peer_port]->OnPeerClose())
- endpoints_[peer_port].reset();
- }
- endpoints_[port].reset();
-}
-
-// TODO(vtl): Handle flags.
-MojoResult MessagePipe::WriteMessage(
- unsigned port,
- UserPointer<const void> bytes,
- uint32_t num_bytes,
- std::vector<DispatcherTransport>* transports,
- MojoWriteMessageFlags flags) {
- DCHECK(port == 0 || port == 1);
-
- base::AutoLock locker(lock_);
- return EnqueueMessageNoLock(
- GetPeerPort(port),
- make_scoped_ptr(new MessageInTransit(
- MessageInTransit::kTypeEndpoint,
- MessageInTransit::kSubtypeEndpointData, num_bytes, bytes)),
- transports);
-}
-
-MojoResult MessagePipe::ReadMessage(unsigned port,
- UserPointer<void> bytes,
- UserPointer<uint32_t> num_bytes,
- DispatcherVector* dispatchers,
- uint32_t* num_dispatchers,
- MojoReadMessageFlags flags) {
- DCHECK(port == 0 || port == 1);
-
- base::AutoLock locker(lock_);
- DCHECK(endpoints_[port]);
-
- return endpoints_[port]->ReadMessage(bytes, num_bytes, dispatchers,
- num_dispatchers, flags);
-}
-
-HandleSignalsState MessagePipe::GetHandleSignalsState(unsigned port) const {
- DCHECK(port == 0 || port == 1);
-
- base::AutoLock locker(const_cast<base::Lock&>(lock_));
- DCHECK(endpoints_[port]);
-
- return endpoints_[port]->GetHandleSignalsState();
-}
-
-MojoResult MessagePipe::AddAwakable(unsigned port,
- Awakable* awakable,
- MojoHandleSignals signals,
- uint32_t context,
- HandleSignalsState* signals_state) {
- DCHECK(port == 0 || port == 1);
-
- base::AutoLock locker(lock_);
- DCHECK(endpoints_[port]);
-
- return endpoints_[port]->AddAwakable(awakable, signals, context,
- signals_state);
-}
-
-void MessagePipe::RemoveAwakable(unsigned port,
- Awakable* awakable,
- HandleSignalsState* signals_state) {
- DCHECK(port == 0 || port == 1);
-
- base::AutoLock locker(lock_);
- DCHECK(endpoints_[port]);
-
- endpoints_[port]->RemoveAwakable(awakable, signals_state);
-}
-
-void MessagePipe::StartSerialize(unsigned /*port*/,
- Channel* channel,
- size_t* max_size,
- size_t* max_platform_handles) {
- *max_size = channel->GetSerializedEndpointSize();
- *max_platform_handles = 0;
-}
-
-bool MessagePipe::EndSerialize(
- unsigned port,
- Channel* channel,
- void* destination,
- size_t* actual_size,
- embedder::PlatformHandleVector* /*platform_handles*/) {
- DCHECK(port == 0 || port == 1);
-
- base::AutoLock locker(lock_);
- DCHECK(endpoints_[port]);
-
- // The port being serialized must be local.
- DCHECK_EQ(endpoints_[port]->GetType(), MessagePipeEndpoint::kTypeLocal);
-
- unsigned peer_port = GetPeerPort(port);
- MessageInTransitQueue* message_queue =
- static_cast<LocalMessagePipeEndpoint*>(endpoints_[port].get())
- ->message_queue();
- // The replacement for |endpoints_[port]|, if any.
- MessagePipeEndpoint* replacement_endpoint = nullptr;
-
- // The three cases below correspond to the ones described above
- // |Channel::SerializeEndpoint...()| (in channel.h).
- if (!endpoints_[peer_port]) {
- // Case 1: (known-)closed peer port. There's no reason for us to continue to
- // exist afterwards.
- channel->SerializeEndpointWithClosedPeer(destination, message_queue);
- } else if (endpoints_[peer_port]->GetType() ==
- MessagePipeEndpoint::kTypeLocal) {
- // Case 2: local peer port. We replace |port|'s |LocalMessagePipeEndpoint|
- // with a |ProxyMessagePipeEndpoint| hooked up to the |ChannelEndpoint| that
- // the |Channel| returns to us.
- scoped_refptr<ChannelEndpoint> channel_endpoint =
- channel->SerializeEndpointWithLocalPeer(destination, message_queue,
- this, port);
- replacement_endpoint = new ProxyMessagePipeEndpoint(channel_endpoint.get());
- } else {
- // Case 3: remote peer port. We get the |peer_port|'s |ChannelEndpoint| and
- // pass it to the |Channel|. There's no reason for us to continue to exist
- // afterwards.
- DCHECK_EQ(endpoints_[peer_port]->GetType(),
- MessagePipeEndpoint::kTypeProxy);
- ProxyMessagePipeEndpoint* peer_endpoint =
- static_cast<ProxyMessagePipeEndpoint*>(endpoints_[peer_port].get());
- scoped_refptr<ChannelEndpoint> peer_channel_endpoint =
- peer_endpoint->ReleaseChannelEndpoint();
- channel->SerializeEndpointWithRemotePeer(destination, message_queue,
- peer_channel_endpoint);
- // No need to call |Close()| after |ReleaseChannelEndpoint()|.
- endpoints_[peer_port].reset();
- }
-
- endpoints_[port]->Close();
- endpoints_[port].reset(replacement_endpoint);
-
- *actual_size = channel->GetSerializedEndpointSize();
- return true;
-}
-
-bool MessagePipe::OnReadMessage(unsigned port, MessageInTransit* message) {
- base::AutoLock locker(lock_);
-
- if (!endpoints_[port]) {
- // This will happen only on the rare occasion that the call to
- // |OnReadMessage()| is racing with us calling
- // |ChannelEndpoint::ReplaceClient()|, in which case we reject the message,
- // and the |ChannelEndpoint| can retry (calling the new client's
- // |OnReadMessage()|).
- return false;
- }
-
- // This is called when the |ChannelEndpoint| for the
- // |ProxyMessagePipeEndpoint| |port| receives a message (from the |Channel|).
- // We need to pass this message on to its peer port (typically a
- // |LocalMessagePipeEndpoint|).
- MojoResult result = EnqueueMessageNoLock(GetPeerPort(port),
- make_scoped_ptr(message), nullptr);
- DLOG_IF(WARNING, result != MOJO_RESULT_OK)
- << "EnqueueMessageNoLock() failed (result = " << result << ")";
- return true;
-}
-
-void MessagePipe::OnDetachFromChannel(unsigned port) {
- Close(port);
-}
-
-MessagePipe::MessagePipe() {
-}
-
-MessagePipe::~MessagePipe() {
- // Owned by the dispatchers. The owning dispatchers should only release us via
- // their |Close()| method, which should inform us of being closed via our
- // |Close()|. Thus these should already be null.
- DCHECK(!endpoints_[0]);
- DCHECK(!endpoints_[1]);
-}
-
-MojoResult MessagePipe::EnqueueMessageNoLock(
- unsigned port,
- scoped_ptr<MessageInTransit> message,
- std::vector<DispatcherTransport>* transports) {
- DCHECK(port == 0 || port == 1);
- DCHECK(message);
-
- DCHECK_EQ(message->type(), MessageInTransit::kTypeEndpoint);
- DCHECK(endpoints_[GetPeerPort(port)]);
-
- // The destination port need not be open, unlike the source port.
- if (!endpoints_[port])
- return MOJO_RESULT_FAILED_PRECONDITION;
-
- if (transports) {
- MojoResult result = AttachTransportsNoLock(port, message.get(), transports);
- if (result != MOJO_RESULT_OK)
- return result;
- }
-
- // The endpoint's |EnqueueMessage()| may not report failure.
- endpoints_[port]->EnqueueMessage(message.Pass());
- return MOJO_RESULT_OK;
-}
-
-MojoResult MessagePipe::AttachTransportsNoLock(
- unsigned port,
- MessageInTransit* message,
- std::vector<DispatcherTransport>* transports) {
- DCHECK(!message->has_dispatchers());
-
- // You're not allowed to send either handle to a message pipe over the message
- // pipe, so check for this. (The case of trying to write a handle to itself is
- // taken care of by |Core|. That case kind of makes sense, but leads to
- // complications if, e.g., both sides try to do the same thing with their
- // respective handles simultaneously. The other case, of trying to write the
- // peer handle to a handle, doesn't make sense -- since no handle will be
- // available to read the message from.)
- for (size_t i = 0; i < transports->size(); i++) {
- if (!(*transports)[i].is_valid())
- continue;
- if ((*transports)[i].GetType() == Dispatcher::kTypeMessagePipe) {
- MessagePipeDispatcherTransport mp_transport((*transports)[i]);
- if (mp_transport.GetMessagePipe() == this) {
- // The other case should have been disallowed by |Core|. (Note: |port|
- // is the peer port of the handle given to |WriteMessage()|.)
- DCHECK_EQ(mp_transport.GetPort(), port);
- return MOJO_RESULT_INVALID_ARGUMENT;
- }
- }
- }
-
- // Clone the dispatchers and attach them to the message. (This must be done as
- // a separate loop, since we want to leave the dispatchers alone on failure.)
- scoped_ptr<DispatcherVector> dispatchers(new DispatcherVector());
- dispatchers->reserve(transports->size());
- for (size_t i = 0; i < transports->size(); i++) {
- if ((*transports)[i].is_valid()) {
- dispatchers->push_back(
- (*transports)[i].CreateEquivalentDispatcherAndClose());
- } else {
- LOG(WARNING) << "Enqueueing null dispatcher";
- dispatchers->push_back(nullptr);
- }
- }
- message->SetDispatchers(dispatchers.Pass());
- return MOJO_RESULT_OK;
-}
-
-} // namespace system
-} // namespace mojo
« no previous file with comments | « mojo/edk/system/message_pipe.h ('k') | mojo/edk/system/message_pipe_dispatcher.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698