Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(935)

Unified Diff: mojo/edk/system/routed_raw_channel.cc

Issue 1488853002: Add multiplexing of message pipes in the new EDK. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: tsepez review comments Created 5 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « mojo/edk/system/routed_raw_channel.h ('k') | mojo/edk/system/run_all_unittests.cc » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: mojo/edk/system/routed_raw_channel.cc
diff --git a/mojo/edk/system/routed_raw_channel.cc b/mojo/edk/system/routed_raw_channel.cc
new file mode 100644
index 0000000000000000000000000000000000000000..e2fe67e46e81802d433245d8ac1f10da5e28267a
--- /dev/null
+++ b/mojo/edk/system/routed_raw_channel.cc
@@ -0,0 +1,168 @@
+// Copyright 2015 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
#include "mojo/edk/system/routed_raw_channel.h"

#include <string.h>

#include "base/bind.h"
#include "base/logging.h"
#include "mojo/edk/embedder/embedder_internal.h"
#include "mojo/edk/system/message_pipe_dispatcher.h"
+
+namespace mojo {
+namespace edk {
+
namespace {
// Route id reserved for control traffic between the two RoutedRawChannel
// endpoints themselves; AddRoute CHECKs that no pipe ever registers it.
const uint64_t kInternalRoutingId = 0;

// These are messages sent over our internal routing id above, meant for the
// other side's RoutedRawChannel to dispatch.
enum InternalMessages {
  // Payload: one byte of this enum followed by the 64-bit id of the route
  // that was closed (see RemoveRoute / OnReadMessage).
  ROUTE_CLOSED = 0,
};
}  // namespace
+
+RoutedRawChannel::PendingMessage::PendingMessage() {
+}
+
+RoutedRawChannel::PendingMessage::~PendingMessage() {
+}
+
// Takes ownership of |handle|, wraps it in a RawChannel, and schedules the
// channel's initialization on the IO thread (RawChannel methods are invoked
// from there; see the DCHECKs in OnReadMessage/OnError).
// |destruct_callback| is run from our destructor so the owner can drop its
// pointer to us.
RoutedRawChannel::RoutedRawChannel(
    ScopedPlatformHandle handle,
    const base::Callback<void(RoutedRawChannel*)>& destruct_callback)
    : channel_(RawChannel::Create(handle.Pass())),
      destruct_callback_(destruct_callback) {
  // NOTE(review): base::Unretained(channel_) assumes |channel_| stays alive
  // until these IO-thread tasks run; OnError is the only place that clears
  // it, and that also runs on the IO thread — confirm ordering holds for all
  // shutdown paths.
  internal::g_io_thread_task_runner->PostTask(
      FROM_HERE,
      base::Bind(&RawChannel::Init, base::Unretained(channel_), this));
  internal::g_io_thread_task_runner->PostTask(
      FROM_HERE,
      base::Bind(&RawChannel::EnsureLazyInitialized,
                 base::Unretained(channel_)));
}
+
// Registers |pipe| as the dispatcher for |pipe_id| on this channel. Messages
// that arrived for |pipe_id| before registration were buffered in
// |pending_messages_| and are replayed to |pipe| here, in arrival order. If
// the peer already closed this route, |pipe| is told immediately.
void RoutedRawChannel::AddRoute(uint64_t pipe_id, MessagePipeDispatcher* pipe) {
  CHECK_NE(pipe_id, kInternalRoutingId) << kInternalRoutingId << " is reserved";
  base::AutoLock auto_lock(lock_);
  CHECK(routes_.find(pipe_id) == routes_.end());
  routes_[pipe_id] = pipe;

  // Flush buffered messages destined for this route, preserving order.
  // |i| is only advanced when the current element is kept; erase() shifts
  // the next candidate into slot |i|.
  for (size_t i = 0; i < pending_messages_.size();) {
    MessageInTransit::View view(pending_messages_[i]->message.size(),
                                &pending_messages_[i]->message[0]);
    if (view.route_id() == pipe_id) {
      pipe->OnReadMessage(view, pending_messages_[i]->handles.Pass());
      pending_messages_.erase(pending_messages_.begin() + i);
    } else {
      ++i;
    }
  }

  // The peer sent ROUTE_CLOSED for this route before we connected; surface
  // that to the freshly registered dispatcher.
  if (close_routes_.find(pipe_id) != close_routes_.end())
    pipe->OnError(ERROR_READ_SHUTDOWN);
}
+
// Unregisters |pipe| from |pipe_id|. Unless the peer initiated the close (in
// which case we just consume its ROUTE_CLOSED record), we notify the other
// side's RoutedRawChannel over the reserved internal route. If the underlying
// channel is already gone and this was the last route, schedules deletion of
// |this|.
void RoutedRawChannel::RemoveRoute(uint64_t pipe_id,
                                   MessagePipeDispatcher* pipe) {
  base::AutoLock auto_lock(lock_);
  CHECK(routes_.find(pipe_id) != routes_.end());
  CHECK_EQ(routes_[pipe_id], pipe);
  routes_.erase(pipe_id);

  // Only send a message to the other side to close the route if we hadn't
  // received a close route message. Otherwise they would keep going back and
  // forth.
  if (close_routes_.find(pipe_id) != close_routes_.end()) {
    close_routes_.erase(pipe_id);
  } else if (channel_) {
    // Default route id of 0 to reach the other side's RoutedRawChannel.
    // Wire format: one InternalMessages byte followed by the 64-bit route id
    // (parsed by the peer in OnReadMessage).
    char message_data[sizeof(char) + sizeof(uint64_t)];
    message_data[0] = ROUTE_CLOSED;
    memcpy(&message_data[1], &pipe_id, sizeof(uint64_t));
    scoped_ptr<MessageInTransit> message(new MessageInTransit(
        MessageInTransit::Type::MESSAGE, arraysize(message_data),
        message_data));
    message->set_route_id(kInternalRoutingId);
    channel_->WriteMessage(message.Pass());
  }

  // |channel_| is nulled out by OnError; once the last route is gone there
  // is nothing left for this object to do.
  if (!channel_ && routes_.empty()) {
    // PostTask to avoid reentrancy since the broker might be calling us.
    base::MessageLoop::current()->DeleteSoon(FROM_HERE, this);
  }
}
+
RoutedRawChannel::~RoutedRawChannel() {
  // Tell our owner we're going away so it can drop its pointer to us.
  destruct_callback_.Run(this);
}
+
+void RoutedRawChannel::OnReadMessage(
+ const MessageInTransit::View& message_view,
+ ScopedPlatformHandleVectorPtr platform_handles) {
+ DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread());
+ // Note: normally, when a message arrives here we should find a corresponding
+ // entry for the MessagePipeDispatcher with the given route_id. However it is
+ // possible that they just connected, and due to race conditions one side has
+ // connected and sent a message (and even closed) before the other side had a
+ // chance to register with this RoutedRawChannel. In that case, we must buffer
+ // all messages.
+ base::AutoLock auto_lock(lock_);
+ uint64_t route_id = message_view.route_id();
+ if (route_id == kInternalRoutingId) {
+ if (message_view.num_bytes() != sizeof(char) + sizeof(uint64_t)) {
+ NOTREACHED() << "Invalid internal message in RoutedRawChannel.";
+ return;
+ }
+ const char* bytes = static_cast<const char*>(message_view.bytes());
+ if (bytes[0] != ROUTE_CLOSED) {
+ NOTREACHED() << "Unknown internal message in RoutedRawChannel.";
+ return;
+ }
+ uint64_t closed_route = *reinterpret_cast<const uint64_t*>(&bytes[1]);
+ if (close_routes_.find(closed_route) != close_routes_.end()) {
+ NOTREACHED() << "Should only receive one ROUTE_CLOSED per route.";
+ return;
+ }
+ close_routes_.insert(closed_route);
+ if (routes_.find(closed_route) == routes_.end())
+ return; // This side hasn't connected yet.
+
+ routes_[closed_route]->OnError(ERROR_READ_SHUTDOWN);
+ return;
+ }
+
+ if (routes_.find(route_id) != routes_.end()) {
+ routes_[route_id]->OnReadMessage(message_view, platform_handles.Pass());
+ } else {
+ scoped_ptr<PendingMessage> msg(new PendingMessage);
+ msg->message.resize(message_view.total_size());
+ memcpy(&msg->message[0], message_view.main_buffer(),
+ message_view.total_size());
+ msg->handles = platform_handles.Pass();
+ pending_messages_.push_back(msg.Pass());
+ }
+}
+
// RawChannel delegate, called on the IO thread when the underlying channel
// fails. Shuts the channel down and either propagates the error to every
// registered route or, if none remain, destroys |this|.
void RoutedRawChannel::OnError(Error error) {
  DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread());
  bool destruct = false;
  {
    base::AutoLock auto_lock(lock_);

    channel_->Shutdown();
    // Clearing |channel_| also tells RemoveRoute not to write to it and to
    // take the self-deletion path once the last route is removed.
    channel_ = nullptr;
    if (routes_.empty()) {
      // No dispatchers left to notify, and RemoveRoute can never run again
      // for this object, so we must delete ourselves (below, outside the
      // lock).
      destruct = true;
    } else {
      // Each dispatcher will eventually call RemoveRoute; the last one
      // triggers DeleteSoon there.
      for (auto it = routes_.begin(); it != routes_.end(); ++it)
        it->second->OnError(error);
    }
  }

  // Deleted outside the lock: the destructor runs |destruct_callback_|,
  // which must not be invoked with |lock_| held.
  if (destruct)
    delete this;
}
+
+} // namespace edk
+} // namespace mojo
« no previous file with comments | « mojo/edk/system/routed_raw_channel.h ('k') | mojo/edk/system/run_all_unittests.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698