Chromium Code Reviews| Index: mojo/edk/system/routed_raw_channel.cc |
| diff --git a/mojo/edk/system/routed_raw_channel.cc b/mojo/edk/system/routed_raw_channel.cc |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..8e8e394649f57d9738d1c4bb3a4d766384d3bcd6 |
| --- /dev/null |
| +++ b/mojo/edk/system/routed_raw_channel.cc |
| @@ -0,0 +1,162 @@ |
| +// Copyright 2015 The Chromium Authors. All rights reserved. |
| +// Use of this source code is governed by a BSD-style license that can be |
| +// found in the LICENSE file. |
| + |
| +#include "mojo/edk/system/routed_raw_channel.h" |
| + |
| +#include "base/bind.h" |
| +#include "base/logging.h" |
| +#include "mojo/edk/embedder/embedder_internal.h" |
| +#include "mojo/edk/system/message_pipe_dispatcher.h" |
| + |
| +namespace mojo { |
| +namespace edk { |
| + |
| +namespace { |
| +const uint64_t kInternalRoutingId = 0; |
| + |
| +// These are messages sent over our internal routing id above, meant for the |
| +// other side's RoutedRawChannel to dispatch. |
| +enum InternalMessages { |
| + ROUTE_CLOSED = 0, |
| +}; |
| +} |
| + |
| +RoutedRawChannel::PendingMessage::PendingMessage() { |
| +} |
| + |
| +RoutedRawChannel::PendingMessage::~PendingMessage() { |
| +} |
| + |
| +RoutedRawChannel::RoutedRawChannel( |
| + ScopedPlatformHandle handle, |
| + const base::Callback<void(RoutedRawChannel*)>& destruct_callback) |
| + : channel_(RawChannel::Create(handle.Pass())), |
| + destruct_callback_(destruct_callback) { |
| + internal::g_io_thread_task_runner->PostTask( |
| + FROM_HERE, |
| + base::Bind(&RawChannel::Init, base::Unretained(channel_), this)); |
| + internal::g_io_thread_task_runner->PostTask( |
| + FROM_HERE, |
| + base::Bind(&RawChannel::EnsureLazyInitialized, |
| + base::Unretained(channel_))); |
| +} |
| + |
| +void RoutedRawChannel::AddRoute(uint64_t pipe_id, MessagePipeDispatcher* pipe) { |
| + CHECK_NE(pipe_id, kInternalRoutingId) << kInternalRoutingId << " is reserved"; |
| + base::AutoLock auto_lock(lock_); |
| + CHECK(routes_.find(pipe_id) == routes_.end()); |
| + routes_[pipe_id] = pipe; |
| + |
| + for (size_t i = 0; i < pending_messages_.size();) { |
| + MessageInTransit::View view(pending_messages_[i]->message.size(), |
| + &pending_messages_[i]->message[0]); |
| + if (view.route_id() == pipe_id) { |
| + pipe->OnReadMessage(view, pending_messages_[i]->handles.Pass()); |
| + pending_messages_.erase(pending_messages_.begin() + i); |
| + } else { |
| + ++i; |
| + } |
| + } |
| + |
| + if (close_routes_.find(pipe_id) != close_routes_.end()) |
| + pipe->OnError(ERROR_READ_SHUTDOWN); |
| +} |
| + |
| +void RoutedRawChannel::RemoveRoute(uint64_t pipe_id, |
| + MessagePipeDispatcher* pipe) { |
| + base::AutoLock auto_lock(lock_); |
| + CHECK(routes_.find(pipe_id) != routes_.end()); |
| + CHECK_EQ(routes_[pipe_id], pipe); |
| + routes_.erase(pipe_id); |
| + |
| + // Only send a message to the other side to close the route if we hadn't |
| + // received a close route message. Otherwise they would keep going back and |
| + // forth. |
| + if (close_routes_.find(pipe_id) != close_routes_.end()) { |
| + close_routes_.erase(pipe_id); |
| + } else if (channel_) { |
| + // Default route id of 0 to reach the other side's RoutedRawChannel. |
| + char message_data[sizeof(char) + sizeof(uint64_t)]; |
| + message_data[0] = ROUTE_CLOSED; |
| + memcpy(&message_data[1], &pipe_id, sizeof(uint64_t)); |
| + scoped_ptr<MessageInTransit> message(new MessageInTransit( |
| + MessageInTransit::Type::MESSAGE, arraysize(message_data), |
| + message_data)); |
| + message->set_route_id(kInternalRoutingId); |
| + channel_->WriteMessage(message.Pass()); |
| + } |
| + |
| + if (!channel_ && routes_.empty()) { |
| + // PostTask to avoid reentrancy since the broker might be calling us. |
| + base::MessageLoop::current()->DeleteSoon(FROM_HERE, this); |
| + } |
| +} |
| + |
| +RoutedRawChannel::~RoutedRawChannel() { |
| + destruct_callback_.Run(this); |
| +} |
| + |
| +void RoutedRawChannel::OnReadMessage( |
| + const MessageInTransit::View& message_view, |
| + ScopedPlatformHandleVectorPtr platform_handles) { |
| + DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread()); |
| + // Note: normally, when a message arrives here we should find a corresponding |
| + // entry for the MessagePipeDispatcher with the given route_id. However it is |
| + // possible that they just connected, and due to race conditions one side has |
| + // connected and sent a message (and even closed) before the other side had a |
| + // chance to register with this RoutedRawChannel. In that case, we must buffer |
| + // all messages. |
| + base::AutoLock auto_lock(lock_); |
| + uint64_t route_id = message_view.route_id(); |
| + if (route_id == kInternalRoutingId) { |
| + CHECK_EQ(message_view.num_bytes(), sizeof(char) + sizeof(uint64_t)); |
|
Tom Sepez
2015/12/04 17:59:43
Again, can a rouge child cause us to abort there?
jam
2015/12/05 00:09:34
yep, added if statement
|
| + const char* bytes = static_cast<const char*>(message_view.bytes()); |
| + CHECK_EQ(bytes[0], ROUTE_CLOSED); |
| + uint64_t closed_route = *reinterpret_cast<const uint64_t*>(&bytes[1]); |
| + if (close_routes_.find(closed_route) != close_routes_.end()) { |
| + NOTREACHED() << "Should only receive one ROUTE_CLOSED per route."; |
| + return; |
| + } |
| + close_routes_.insert(closed_route); |
| + if (routes_.find(closed_route) == routes_.end()) |
| + return; // This side hasn't connected yet. |
| + |
| + routes_[closed_route]->OnError(ERROR_READ_SHUTDOWN); |
| + return; |
| + } |
| + |
| + if (routes_.find(route_id) != routes_.end()) { |
| + routes_[route_id]->OnReadMessage(message_view, platform_handles.Pass()); |
| + } else { |
| + scoped_ptr<PendingMessage> msg(new PendingMessage); |
| + msg->message.resize(message_view.total_size()); |
| + memcpy(&msg->message[0], message_view.main_buffer(), |
| + message_view.total_size()); |
| + msg->handles = platform_handles.Pass(); |
| + pending_messages_.push_back(msg.Pass()); |
| + } |
| +} |
| + |
| +void RoutedRawChannel::OnError(Error error) { |
| + DCHECK(internal::g_io_thread_task_runner->RunsTasksOnCurrentThread()); |
| + bool destruct = false; |
| + { |
| + base::AutoLock auto_lock(lock_); |
| + |
| + channel_->Shutdown(); |
| + channel_ = nullptr; |
| + if (routes_.empty()) { |
| + destruct = true; |
| + } else { |
| + for (auto it = routes_.begin(); it != routes_.end(); ++it) |
| + it->second->OnError(error); |
| + } |
| + } |
| + |
| + if (destruct) |
| + delete this; |
| +} |
| + |
| +} // namespace edk |
| +} // namespace mojo |