Index: third_party/mojo/src/mojo/edk/system/slave_connection_manager.cc |
diff --git a/third_party/mojo/src/mojo/edk/system/slave_connection_manager.cc b/third_party/mojo/src/mojo/edk/system/slave_connection_manager.cc |
new file mode 100644 |
index 0000000000000000000000000000000000000000..24278236af9f55ea39c6b0c63e4cc14a464f90bf |
--- /dev/null |
+++ b/third_party/mojo/src/mojo/edk/system/slave_connection_manager.cc |
@@ -0,0 +1,316 @@ |
+// Copyright 2015 The Chromium Authors. All rights reserved. |
+// Use of this source code is governed by a BSD-style license that can be |
+// found in the LICENSE file. |
+ |
+#include "mojo/edk/system/slave_connection_manager.h" |
+ |
+#include "base/bind.h" |
+#include "base/bind_helpers.h" |
+#include "base/location.h" |
+#include "base/logging.h" |
+#include "base/message_loop/message_loop.h" |
+#include "mojo/edk/system/message_in_transit.h" |
+ |
+namespace mojo { |
+namespace system { |
+ |
+// SlaveConnectionManager ------------------------------------------------------ |
+ |
+SlaveConnectionManager::SlaveConnectionManager() |
+ : creation_thread_task_runner_(base::MessageLoop::current()->task_runner()), |
+ slave_process_delegate_(), |
+ private_thread_("SlaveConnectionManagerPrivateThread"), |
+ awaiting_ack_type_(NOT_AWAITING_ACK), |
+ ack_result_(), |
+ ack_peer_process_identifier_(), |
+ ack_platform_handle_(), |
+ event_(false, false) { // Auto-reset, not initially signalled. |
+ DCHECK(creation_thread_task_runner_); |
+ AssertOnCreationThread(); // Just make sure this assertion works correctly. |
+} |
+ |
+SlaveConnectionManager::~SlaveConnectionManager() { |
+ AssertOnCreationThread(); |
+ DCHECK(!slave_process_delegate_); |
+ DCHECK(!private_thread_.message_loop()); |
+ DCHECK_EQ(awaiting_ack_type_, NOT_AWAITING_ACK); |
+ DCHECK(!ack_result_); |
+ DCHECK(!ack_peer_process_identifier_); |
+ DCHECK(!ack_platform_handle_); |
+} |
+ |
+void SlaveConnectionManager::Init( |
+ embedder::SlaveProcessDelegate* slave_process_delegate, |
+ embedder::ScopedPlatformHandle platform_handle) { |
+ AssertOnCreationThread(); |
+ DCHECK(slave_process_delegate); |
+ DCHECK(platform_handle.is_valid()); |
+ DCHECK(!slave_process_delegate_); |
+ DCHECK(!private_thread_.message_loop()); |
+ |
+ slave_process_delegate_ = slave_process_delegate; |
+ CHECK(private_thread_.StartWithOptions( |
+ base::Thread::Options(base::MessageLoop::TYPE_IO, 0))); |
+ private_thread_.message_loop()->PostTask( |
+ FROM_HERE, |
+ base::Bind(&SlaveConnectionManager::InitOnPrivateThread, |
+ base::Unretained(this), base::Passed(&platform_handle))); |
+ event_.Wait(); |
+} |
+ |
+void SlaveConnectionManager::Shutdown() { |
+ AssertOnCreationThread(); |
+ DCHECK(slave_process_delegate_); |
+ DCHECK(private_thread_.message_loop()); |
+ |
+ // The |Stop()| will actually finish all posted tasks. |
+ private_thread_.message_loop()->PostTask( |
+ FROM_HERE, base::Bind(&SlaveConnectionManager::ShutdownOnPrivateThread, |
+ base::Unretained(this))); |
+ private_thread_.Stop(); |
+ slave_process_delegate_ = nullptr; |
+} |
+ |
+bool SlaveConnectionManager::AllowConnect( |
+ const ConnectionIdentifier& connection_id) { |
+ AssertNotOnPrivateThread(); |
+ |
+ base::AutoLock locker(lock_); |
+ bool result = false; |
+ private_thread_.message_loop()->PostTask( |
+ FROM_HERE, |
+ base::Bind(&SlaveConnectionManager::AllowConnectOnPrivateThread, |
+ base::Unretained(this), connection_id, &result)); |
+ event_.Wait(); |
+ return result; |
+} |
+ |
+bool SlaveConnectionManager::CancelConnect( |
+ const ConnectionIdentifier& connection_id) { |
+ AssertNotOnPrivateThread(); |
+ |
+ base::AutoLock locker(lock_); |
+ bool result = false; |
+ private_thread_.message_loop()->PostTask( |
+ FROM_HERE, |
+ base::Bind(&SlaveConnectionManager::CancelConnectOnPrivateThread, |
+ base::Unretained(this), connection_id, &result)); |
+ event_.Wait(); |
+ return result; |
+} |
+ |
+bool SlaveConnectionManager::Connect( |
+ const ConnectionIdentifier& connection_id, |
+ ProcessIdentifier* peer_process_identifier, |
+ embedder::ScopedPlatformHandle* platform_handle) { |
+ AssertNotOnPrivateThread(); |
+ |
+ base::AutoLock locker(lock_); |
+ bool result = false; |
+ private_thread_.message_loop()->PostTask( |
+ FROM_HERE, base::Bind(&SlaveConnectionManager::ConnectOnPrivateThread, |
+ base::Unretained(this), connection_id, &result, |
+ peer_process_identifier, platform_handle)); |
+ event_.Wait(); |
+ return result; |
+} |
+ |
+void SlaveConnectionManager::InitOnPrivateThread( |
+ embedder::ScopedPlatformHandle platform_handle) { |
+ AssertOnPrivateThread(); |
+ |
+ raw_channel_ = RawChannel::Create(platform_handle.Pass()); |
+ raw_channel_->Init(this); |
+ event_.Signal(); |
+} |
+ |
+void SlaveConnectionManager::ShutdownOnPrivateThread() { |
+ AssertOnPrivateThread(); |
+ |
+ CHECK_EQ(awaiting_ack_type_, NOT_AWAITING_ACK); |
+ if (raw_channel_) { |
+ raw_channel_->Shutdown(); |
+ raw_channel_.reset(); |
+ } |
+} |
+ |
+void SlaveConnectionManager::AllowConnectOnPrivateThread( |
+ const ConnectionIdentifier& connection_id, |
+ bool* result) { |
+ DCHECK(result); |
+ AssertOnPrivateThread(); |
+ // This should only posted (from another thread, to |private_thread_|) with |
+ // the lock held (until this thread triggers |event_|). |
+ DCHECK(!lock_.Try()); |
+ DCHECK_EQ(awaiting_ack_type_, NOT_AWAITING_ACK); |
+ |
+ DVLOG(1) << "Sending AllowConnect: connection ID " << connection_id; |
+ if (!raw_channel_->WriteMessage(make_scoped_ptr(new MessageInTransit( |
+ MessageInTransit::kTypeConnectionManager, |
+ MessageInTransit::kSubtypeConnectionManagerAllowConnect, |
+ sizeof(connection_id), &connection_id)))) { |
+ // Don't tear things down; possibly we'll still read some messages. |
+ *result = false; |
+ event_.Signal(); |
+ return; |
+ } |
+ awaiting_ack_type_ = AWAITING_ACCEPT_CONNECT_ACK; |
+ ack_result_ = result; |
+} |
+ |
+void SlaveConnectionManager::CancelConnectOnPrivateThread( |
+ const ConnectionIdentifier& connection_id, |
+ bool* result) { |
+ DCHECK(result); |
+ AssertOnPrivateThread(); |
+ // This should only posted (from another thread, to |private_thread_|) with |
+ // the lock held (until this thread triggers |event_|). |
+ DCHECK(!lock_.Try()); |
+ DCHECK_EQ(awaiting_ack_type_, NOT_AWAITING_ACK); |
+ |
+ DVLOG(1) << "Sending CancelConnect: connection ID " << connection_id; |
+ if (!raw_channel_->WriteMessage(make_scoped_ptr(new MessageInTransit( |
+ MessageInTransit::kTypeConnectionManager, |
+ MessageInTransit::kSubtypeConnectionManagerCancelConnect, |
+ sizeof(connection_id), &connection_id)))) { |
+ // Don't tear things down; possibly we'll still read some messages. |
+ *result = false; |
+ event_.Signal(); |
+ return; |
+ } |
+ awaiting_ack_type_ = AWAITING_CANCEL_CONNECT_ACK; |
+ ack_result_ = result; |
+} |
+ |
+void SlaveConnectionManager::ConnectOnPrivateThread( |
+ const ConnectionIdentifier& connection_id, |
+ bool* result, |
+ ProcessIdentifier* peer_process_identifier, |
+ embedder::ScopedPlatformHandle* platform_handle) { |
+ DCHECK(result); |
+ DCHECK(platform_handle); |
+ DCHECK(!platform_handle->is_valid()); // Not technically wrong, but unlikely. |
+ AssertOnPrivateThread(); |
+ // This should only posted (from another thread, to |private_thread_|) with |
+ // the lock held (until this thread triggers |event_|). |
+ DCHECK(!lock_.Try()); |
+ DCHECK_EQ(awaiting_ack_type_, NOT_AWAITING_ACK); |
+ |
+ DVLOG(1) << "Sending Connect: connection ID " << connection_id; |
+ if (!raw_channel_->WriteMessage(make_scoped_ptr(new MessageInTransit( |
+ MessageInTransit::kTypeConnectionManager, |
+ MessageInTransit::kSubtypeConnectionManagerConnect, |
+ sizeof(connection_id), &connection_id)))) { |
+ // Don't tear things down; possibly we'll still read some messages. |
+ *result = false; |
+ platform_handle->reset(); |
+ event_.Signal(); |
+ return; |
+ } |
+ awaiting_ack_type_ = AWAITING_CONNECT_ACK; |
+ ack_result_ = result; |
+ ack_peer_process_identifier_ = peer_process_identifier; |
+ ack_platform_handle_ = platform_handle; |
+} |
+ |
+void SlaveConnectionManager::OnReadMessage( |
+ const MessageInTransit::View& message_view, |
+ embedder::ScopedPlatformHandleVectorPtr platform_handles) { |
+ AssertOnPrivateThread(); |
+ |
+ // Set |*ack_result_| to false by default. |
+ *ack_result_ = false; |
+ |
+ // Note: Since we should be able to trust the master, simply crash (i.e., |
+ // |CHECK()|-fail) if it sends us something invalid. |
+ |
+ // Unsolicited message. |
+ CHECK_NE(awaiting_ack_type_, NOT_AWAITING_ACK); |
+ // Bad message type. |
+ CHECK_EQ(message_view.type(), MessageInTransit::kTypeConnectionManagerAck); |
+ |
+ size_t num_bytes = message_view.num_bytes(); |
+ size_t num_platform_handles = platform_handles ? platform_handles->size() : 0; |
+ |
+ if (message_view.subtype() == |
+ MessageInTransit::kSubtypeConnectionManagerAckFailure) { |
+ // Failure acks never have any contents. |
+ CHECK_EQ(num_bytes, 0u); |
+ CHECK_EQ(num_platform_handles, 0u); |
+ // Leave |*ack_result_| false. |
+ } else if (message_view.subtype() == |
+ MessageInTransit::kSubtypeConnectionManagerAckSuccess) { |
+ if (awaiting_ack_type_ == AWAITING_ACCEPT_CONNECT_ACK || |
+ awaiting_ack_type_ == AWAITING_CANCEL_CONNECT_ACK) { |
+ // Success acks for "accept/cancel connect" have no contents. |
+ CHECK_EQ(num_bytes, 0u); |
+ CHECK_EQ(num_platform_handles, 0u); |
+ *ack_result_ = true; |
+ DCHECK(!ack_peer_process_identifier_); |
+ DCHECK(!ack_platform_handle_); |
+ } else { |
+ DCHECK_EQ(awaiting_ack_type_, AWAITING_CONNECT_ACK); |
+ // Success acks for "connect" always have a |ProcessIdentifier| as data, |
+ // and *maybe* one platform handle. |
+ CHECK_EQ(num_bytes, sizeof(ProcessIdentifier)); |
+ CHECK_LE(num_platform_handles, 1u); |
+ *ack_result_ = true; |
+ *ack_peer_process_identifier_ = |
+ *reinterpret_cast<const ProcessIdentifier*>(message_view.bytes()); |
+ if (num_platform_handles > 0) { |
+ ack_platform_handle_->reset(platform_handles->at(0)); |
+ platform_handles->at(0) = embedder::PlatformHandle(); |
+ } else { |
+ ack_platform_handle_->reset(); |
+ } |
+ } |
+ } else { |
+ // Bad message subtype. |
+ CHECK(false); |
+ } |
+ |
+ awaiting_ack_type_ = NOT_AWAITING_ACK; |
+ ack_result_ = nullptr; |
+ ack_peer_process_identifier_ = nullptr; |
+ ack_platform_handle_ = nullptr; |
+ event_.Signal(); |
+} |
+ |
+void SlaveConnectionManager::OnError(Error error) { |
+ AssertOnPrivateThread(); |
+ |
+ // Ignore write errors, since we may still have some messages to read. |
+ if (error == RawChannel::Delegate::ERROR_WRITE) |
+ return; |
+ |
+ raw_channel_->Shutdown(); |
+ raw_channel_.reset(); |
+ |
+ DCHECK(slave_process_delegate_); |
+ creation_thread_task_runner_->PostTask( |
+ FROM_HERE, base::Bind(&embedder::SlaveProcessDelegate::OnMasterDisconnect, |
+ base::Unretained(slave_process_delegate_))); |
+} |
+ |
+void SlaveConnectionManager::AssertOnCreationThread() const { |
+ DCHECK(base::MessageLoop::current()); |
+ DCHECK_EQ(base::MessageLoop::current()->task_runner(), |
+ creation_thread_task_runner_); |
+} |
+ |
+void SlaveConnectionManager::AssertNotOnPrivateThread() const { |
+ // This should only be called after |Init()| and before |Shutdown()|. (If not, |
+ // the subsequent |DCHECK_NE()| is invalid, since the current thread may not |
+ // have a message loop.) |
+ DCHECK(private_thread_.message_loop()); |
+ DCHECK_NE(base::MessageLoop::current(), private_thread_.message_loop()); |
+} |
+ |
+void SlaveConnectionManager::AssertOnPrivateThread() const { |
+ // This should only be called after |Init()| and before |Shutdown()|. |
+ DCHECK(private_thread_.message_loop()); |
+ DCHECK_EQ(base::MessageLoop::current(), private_thread_.message_loop()); |
+} |
+ |
+} // namespace system |
+} // namespace mojo |