| Index: third_party/mojo/src/mojo/edk/system/slave_connection_manager.cc
|
| diff --git a/third_party/mojo/src/mojo/edk/system/slave_connection_manager.cc b/third_party/mojo/src/mojo/edk/system/slave_connection_manager.cc
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..24278236af9f55ea39c6b0c63e4cc14a464f90bf
|
| --- /dev/null
|
| +++ b/third_party/mojo/src/mojo/edk/system/slave_connection_manager.cc
|
| @@ -0,0 +1,316 @@
|
| +// Copyright 2015 The Chromium Authors. All rights reserved.
|
| +// Use of this source code is governed by a BSD-style license that can be
|
| +// found in the LICENSE file.
|
| +
|
| +#include "mojo/edk/system/slave_connection_manager.h"
|
| +
|
| +#include "base/bind.h"
|
| +#include "base/bind_helpers.h"
|
| +#include "base/location.h"
|
| +#include "base/logging.h"
|
| +#include "base/message_loop/message_loop.h"
|
| +#include "mojo/edk/system/message_in_transit.h"
|
| +
|
| +namespace mojo {
|
| +namespace system {
|
| +
|
| +// SlaveConnectionManager ------------------------------------------------------
|
| +
|
| +SlaveConnectionManager::SlaveConnectionManager()
|
| + : creation_thread_task_runner_(base::MessageLoop::current()->task_runner()),
|
| + slave_process_delegate_(),
|
| + private_thread_("SlaveConnectionManagerPrivateThread"),
|
| + awaiting_ack_type_(NOT_AWAITING_ACK),
|
| + ack_result_(),
|
| + ack_peer_process_identifier_(),
|
| + ack_platform_handle_(),
|
| + event_(false, false) { // Auto-reset, not initially signalled.
|
| + DCHECK(creation_thread_task_runner_);
|
| + AssertOnCreationThread(); // Just make sure this assertion works correctly.
|
| +}
|
| +
|
| +SlaveConnectionManager::~SlaveConnectionManager() {
|
| + AssertOnCreationThread();
|
| + DCHECK(!slave_process_delegate_);
|
| + DCHECK(!private_thread_.message_loop());
|
| + DCHECK_EQ(awaiting_ack_type_, NOT_AWAITING_ACK);
|
| + DCHECK(!ack_result_);
|
| + DCHECK(!ack_peer_process_identifier_);
|
| + DCHECK(!ack_platform_handle_);
|
| +}
|
| +
|
| +void SlaveConnectionManager::Init(
|
| + embedder::SlaveProcessDelegate* slave_process_delegate,
|
| + embedder::ScopedPlatformHandle platform_handle) {
|
| + AssertOnCreationThread();
|
| + DCHECK(slave_process_delegate);
|
| + DCHECK(platform_handle.is_valid());
|
| + DCHECK(!slave_process_delegate_);
|
| + DCHECK(!private_thread_.message_loop());
|
| +
|
| + slave_process_delegate_ = slave_process_delegate;
|
| + CHECK(private_thread_.StartWithOptions(
|
| + base::Thread::Options(base::MessageLoop::TYPE_IO, 0)));
|
| + private_thread_.message_loop()->PostTask(
|
| + FROM_HERE,
|
| + base::Bind(&SlaveConnectionManager::InitOnPrivateThread,
|
| + base::Unretained(this), base::Passed(&platform_handle)));
|
| + event_.Wait();
|
| +}
|
| +
|
| +void SlaveConnectionManager::Shutdown() {
|
| + AssertOnCreationThread();
|
| + DCHECK(slave_process_delegate_);
|
| + DCHECK(private_thread_.message_loop());
|
| +
|
| + // The |Stop()| will actually finish all posted tasks.
|
| + private_thread_.message_loop()->PostTask(
|
| + FROM_HERE, base::Bind(&SlaveConnectionManager::ShutdownOnPrivateThread,
|
| + base::Unretained(this)));
|
| + private_thread_.Stop();
|
| + slave_process_delegate_ = nullptr;
|
| +}
|
| +
|
| +bool SlaveConnectionManager::AllowConnect(
|
| + const ConnectionIdentifier& connection_id) {
|
| + AssertNotOnPrivateThread();
|
| +
|
| + base::AutoLock locker(lock_);
|
| + bool result = false;
|
| + private_thread_.message_loop()->PostTask(
|
| + FROM_HERE,
|
| + base::Bind(&SlaveConnectionManager::AllowConnectOnPrivateThread,
|
| + base::Unretained(this), connection_id, &result));
|
| + event_.Wait();
|
| + return result;
|
| +}
|
| +
|
| +bool SlaveConnectionManager::CancelConnect(
|
| + const ConnectionIdentifier& connection_id) {
|
| + AssertNotOnPrivateThread();
|
| +
|
| + base::AutoLock locker(lock_);
|
| + bool result = false;
|
| + private_thread_.message_loop()->PostTask(
|
| + FROM_HERE,
|
| + base::Bind(&SlaveConnectionManager::CancelConnectOnPrivateThread,
|
| + base::Unretained(this), connection_id, &result));
|
| + event_.Wait();
|
| + return result;
|
| +}
|
| +
|
| +bool SlaveConnectionManager::Connect(
|
| + const ConnectionIdentifier& connection_id,
|
| + ProcessIdentifier* peer_process_identifier,
|
| + embedder::ScopedPlatformHandle* platform_handle) {
|
| + AssertNotOnPrivateThread();
|
| +
|
| + base::AutoLock locker(lock_);
|
| + bool result = false;
|
| + private_thread_.message_loop()->PostTask(
|
| + FROM_HERE, base::Bind(&SlaveConnectionManager::ConnectOnPrivateThread,
|
| + base::Unretained(this), connection_id, &result,
|
| + peer_process_identifier, platform_handle));
|
| + event_.Wait();
|
| + return result;
|
| +}
|
| +
|
| +void SlaveConnectionManager::InitOnPrivateThread(
|
| + embedder::ScopedPlatformHandle platform_handle) {
|
| + AssertOnPrivateThread();
|
| +
|
| + raw_channel_ = RawChannel::Create(platform_handle.Pass());
|
| + raw_channel_->Init(this);
|
| + event_.Signal();
|
| +}
|
| +
|
| +void SlaveConnectionManager::ShutdownOnPrivateThread() {
|
| + AssertOnPrivateThread();
|
| +
|
| + CHECK_EQ(awaiting_ack_type_, NOT_AWAITING_ACK);
|
| + if (raw_channel_) {
|
| + raw_channel_->Shutdown();
|
| + raw_channel_.reset();
|
| + }
|
| +}
|
| +
|
| +void SlaveConnectionManager::AllowConnectOnPrivateThread(
|
| + const ConnectionIdentifier& connection_id,
|
| + bool* result) {
|
| + DCHECK(result);
|
| + AssertOnPrivateThread();
|
| + // This should only posted (from another thread, to |private_thread_|) with
|
| + // the lock held (until this thread triggers |event_|).
|
| + DCHECK(!lock_.Try());
|
| + DCHECK_EQ(awaiting_ack_type_, NOT_AWAITING_ACK);
|
| +
|
| + DVLOG(1) << "Sending AllowConnect: connection ID " << connection_id;
|
| + if (!raw_channel_->WriteMessage(make_scoped_ptr(new MessageInTransit(
|
| + MessageInTransit::kTypeConnectionManager,
|
| + MessageInTransit::kSubtypeConnectionManagerAllowConnect,
|
| + sizeof(connection_id), &connection_id)))) {
|
| + // Don't tear things down; possibly we'll still read some messages.
|
| + *result = false;
|
| + event_.Signal();
|
| + return;
|
| + }
|
| + awaiting_ack_type_ = AWAITING_ACCEPT_CONNECT_ACK;
|
| + ack_result_ = result;
|
| +}
|
| +
|
| +void SlaveConnectionManager::CancelConnectOnPrivateThread(
|
| + const ConnectionIdentifier& connection_id,
|
| + bool* result) {
|
| + DCHECK(result);
|
| + AssertOnPrivateThread();
|
| + // This should only posted (from another thread, to |private_thread_|) with
|
| + // the lock held (until this thread triggers |event_|).
|
| + DCHECK(!lock_.Try());
|
| + DCHECK_EQ(awaiting_ack_type_, NOT_AWAITING_ACK);
|
| +
|
| + DVLOG(1) << "Sending CancelConnect: connection ID " << connection_id;
|
| + if (!raw_channel_->WriteMessage(make_scoped_ptr(new MessageInTransit(
|
| + MessageInTransit::kTypeConnectionManager,
|
| + MessageInTransit::kSubtypeConnectionManagerCancelConnect,
|
| + sizeof(connection_id), &connection_id)))) {
|
| + // Don't tear things down; possibly we'll still read some messages.
|
| + *result = false;
|
| + event_.Signal();
|
| + return;
|
| + }
|
| + awaiting_ack_type_ = AWAITING_CANCEL_CONNECT_ACK;
|
| + ack_result_ = result;
|
| +}
|
| +
|
| +void SlaveConnectionManager::ConnectOnPrivateThread(
|
| + const ConnectionIdentifier& connection_id,
|
| + bool* result,
|
| + ProcessIdentifier* peer_process_identifier,
|
| + embedder::ScopedPlatformHandle* platform_handle) {
|
| + DCHECK(result);
|
| + DCHECK(platform_handle);
|
| + DCHECK(!platform_handle->is_valid()); // Not technically wrong, but unlikely.
|
| + AssertOnPrivateThread();
|
| + // This should only posted (from another thread, to |private_thread_|) with
|
| + // the lock held (until this thread triggers |event_|).
|
| + DCHECK(!lock_.Try());
|
| + DCHECK_EQ(awaiting_ack_type_, NOT_AWAITING_ACK);
|
| +
|
| + DVLOG(1) << "Sending Connect: connection ID " << connection_id;
|
| + if (!raw_channel_->WriteMessage(make_scoped_ptr(new MessageInTransit(
|
| + MessageInTransit::kTypeConnectionManager,
|
| + MessageInTransit::kSubtypeConnectionManagerConnect,
|
| + sizeof(connection_id), &connection_id)))) {
|
| + // Don't tear things down; possibly we'll still read some messages.
|
| + *result = false;
|
| + platform_handle->reset();
|
| + event_.Signal();
|
| + return;
|
| + }
|
| + awaiting_ack_type_ = AWAITING_CONNECT_ACK;
|
| + ack_result_ = result;
|
| + ack_peer_process_identifier_ = peer_process_identifier;
|
| + ack_platform_handle_ = platform_handle;
|
| +}
|
| +
|
| +void SlaveConnectionManager::OnReadMessage(
|
| + const MessageInTransit::View& message_view,
|
| + embedder::ScopedPlatformHandleVectorPtr platform_handles) {
|
| + AssertOnPrivateThread();
|
| +
|
| + // Set |*ack_result_| to false by default.
|
| + *ack_result_ = false;
|
| +
|
| + // Note: Since we should be able to trust the master, simply crash (i.e.,
|
| + // |CHECK()|-fail) if it sends us something invalid.
|
| +
|
| + // Unsolicited message.
|
| + CHECK_NE(awaiting_ack_type_, NOT_AWAITING_ACK);
|
| + // Bad message type.
|
| + CHECK_EQ(message_view.type(), MessageInTransit::kTypeConnectionManagerAck);
|
| +
|
| + size_t num_bytes = message_view.num_bytes();
|
| + size_t num_platform_handles = platform_handles ? platform_handles->size() : 0;
|
| +
|
| + if (message_view.subtype() ==
|
| + MessageInTransit::kSubtypeConnectionManagerAckFailure) {
|
| + // Failure acks never have any contents.
|
| + CHECK_EQ(num_bytes, 0u);
|
| + CHECK_EQ(num_platform_handles, 0u);
|
| + // Leave |*ack_result_| false.
|
| + } else if (message_view.subtype() ==
|
| + MessageInTransit::kSubtypeConnectionManagerAckSuccess) {
|
| + if (awaiting_ack_type_ == AWAITING_ACCEPT_CONNECT_ACK ||
|
| + awaiting_ack_type_ == AWAITING_CANCEL_CONNECT_ACK) {
|
| + // Success acks for "accept/cancel connect" have no contents.
|
| + CHECK_EQ(num_bytes, 0u);
|
| + CHECK_EQ(num_platform_handles, 0u);
|
| + *ack_result_ = true;
|
| + DCHECK(!ack_peer_process_identifier_);
|
| + DCHECK(!ack_platform_handle_);
|
| + } else {
|
| + DCHECK_EQ(awaiting_ack_type_, AWAITING_CONNECT_ACK);
|
| + // Success acks for "connect" always have a |ProcessIdentifier| as data,
|
| + // and *maybe* one platform handle.
|
| + CHECK_EQ(num_bytes, sizeof(ProcessIdentifier));
|
| + CHECK_LE(num_platform_handles, 1u);
|
| + *ack_result_ = true;
|
| + *ack_peer_process_identifier_ =
|
| + *reinterpret_cast<const ProcessIdentifier*>(message_view.bytes());
|
| + if (num_platform_handles > 0) {
|
| + ack_platform_handle_->reset(platform_handles->at(0));
|
| + platform_handles->at(0) = embedder::PlatformHandle();
|
| + } else {
|
| + ack_platform_handle_->reset();
|
| + }
|
| + }
|
| + } else {
|
| + // Bad message subtype.
|
| + CHECK(false);
|
| + }
|
| +
|
| + awaiting_ack_type_ = NOT_AWAITING_ACK;
|
| + ack_result_ = nullptr;
|
| + ack_peer_process_identifier_ = nullptr;
|
| + ack_platform_handle_ = nullptr;
|
| + event_.Signal();
|
| +}
|
| +
|
| +void SlaveConnectionManager::OnError(Error error) {
|
| + AssertOnPrivateThread();
|
| +
|
| + // Ignore write errors, since we may still have some messages to read.
|
| + if (error == RawChannel::Delegate::ERROR_WRITE)
|
| + return;
|
| +
|
| + raw_channel_->Shutdown();
|
| + raw_channel_.reset();
|
| +
|
| + DCHECK(slave_process_delegate_);
|
| + creation_thread_task_runner_->PostTask(
|
| + FROM_HERE, base::Bind(&embedder::SlaveProcessDelegate::OnMasterDisconnect,
|
| + base::Unretained(slave_process_delegate_)));
|
| +}
|
| +
|
| +void SlaveConnectionManager::AssertOnCreationThread() const {
|
| + DCHECK(base::MessageLoop::current());
|
| + DCHECK_EQ(base::MessageLoop::current()->task_runner(),
|
| + creation_thread_task_runner_);
|
| +}
|
| +
|
| +void SlaveConnectionManager::AssertNotOnPrivateThread() const {
|
| + // This should only be called after |Init()| and before |Shutdown()|. (If not,
|
| + // the subsequent |DCHECK_NE()| is invalid, since the current thread may not
|
| + // have a message loop.)
|
| + DCHECK(private_thread_.message_loop());
|
| + DCHECK_NE(base::MessageLoop::current(), private_thread_.message_loop());
|
| +}
|
| +
|
| +void SlaveConnectionManager::AssertOnPrivateThread() const {
|
| + // This should only be called after |Init()| and before |Shutdown()|.
|
| + DCHECK(private_thread_.message_loop());
|
| + DCHECK_EQ(base::MessageLoop::current(), private_thread_.message_loop());
|
| +}
|
| +
|
| +} // namespace system
|
| +} // namespace mojo
|
|
|