Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1750)

Unified Diff: third_party/mojo/src/mojo/edk/system/master_connection_manager.cc

Issue 883843002: Update mojo sdk to rev 126532ce21c5c3c55a1e1693731411cb60169efd (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Response to review Created 5 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: third_party/mojo/src/mojo/edk/system/master_connection_manager.cc
diff --git a/third_party/mojo/src/mojo/edk/system/master_connection_manager.cc b/third_party/mojo/src/mojo/edk/system/master_connection_manager.cc
new file mode 100644
index 0000000000000000000000000000000000000000..b78dfb618fd83f24f854836d4aa0801c5f86765a
--- /dev/null
+++ b/third_party/mojo/src/mojo/edk/system/master_connection_manager.cc
@@ -0,0 +1,572 @@
+// Copyright 2015 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "mojo/edk/system/master_connection_manager.h"
+
+#include "base/bind.h"
+#include "base/bind_helpers.h"
+#include "base/location.h"
+#include "base/logging.h"
+#include "base/message_loop/message_loop.h"
+#include "base/synchronization/waitable_event.h"
+#include "mojo/edk/embedder/master_process_delegate.h"
+#include "mojo/edk/embedder/platform_channel_pair.h"
+#include "mojo/edk/embedder/platform_handle_vector.h"
+#include "mojo/edk/system/message_in_transit.h"
+#include "mojo/edk/system/raw_channel.h"
+#include "mojo/edk/system/transport_data.h"
+
+namespace mojo {
+namespace system {
+
+const ProcessIdentifier kFirstProcessIdentifier = 1;
+const ProcessIdentifier kMasterProcessIdentifier =
+ static_cast<ProcessIdentifier>(-1);
+
+// MasterConnectionManager::Helper ---------------------------------------------
+
+// |MasterConnectionManager::Helper| is not thread-safe, and must only be used
+// on its |owner_|'s private thread.
+class MasterConnectionManager::Helper : public RawChannel::Delegate {
+ public:
+ Helper(MasterConnectionManager* owner,
+ ProcessIdentifier process_identifier,
+ scoped_ptr<embedder::SlaveInfo> slave_info,
+ embedder::ScopedPlatformHandle platform_handle);
+ ~Helper() override;
+
+ void Init();
+ scoped_ptr<embedder::SlaveInfo> Shutdown();
+
+ private:
+ // |RawChannel::Delegate| methods:
+ void OnReadMessage(
+ const MessageInTransit::View& message_view,
+ embedder::ScopedPlatformHandleVectorPtr platform_handles) override;
+ void OnError(Error error) override;
+
+ // Handles an error that's fatal to this object. Note that this probably
+ // results in |Shutdown()| being called (in the nested context) and then this
+ // object being destroyed.
+ void FatalError();
+
+ MasterConnectionManager* const owner_;
+ const ProcessIdentifier process_identifier_;
+ scoped_ptr<embedder::SlaveInfo> slave_info_;
+ scoped_ptr<RawChannel> raw_channel_;
+
+ DISALLOW_COPY_AND_ASSIGN(Helper);
+};
+
+MasterConnectionManager::Helper::Helper(
+ MasterConnectionManager* owner,
+ ProcessIdentifier process_identifier,
+ scoped_ptr<embedder::SlaveInfo> slave_info,
+ embedder::ScopedPlatformHandle platform_handle)
+ : owner_(owner),
+ process_identifier_(process_identifier),
+ slave_info_(slave_info.Pass()),
+ raw_channel_(RawChannel::Create(platform_handle.Pass())) {
+}
+
+MasterConnectionManager::Helper::~Helper() {
+ DCHECK(!slave_info_);
+}
+
+void MasterConnectionManager::Helper::Init() {
+ raw_channel_->Init(this);
+}
+
+scoped_ptr<embedder::SlaveInfo> MasterConnectionManager::Helper::Shutdown() {
+ raw_channel_->Shutdown();
+ raw_channel_.reset();
+ return slave_info_.Pass();
+}
+
+void MasterConnectionManager::Helper::OnReadMessage(
+ const MessageInTransit::View& message_view,
+ embedder::ScopedPlatformHandleVectorPtr platform_handles) {
+ if (message_view.type() != MessageInTransit::kTypeConnectionManager) {
+ LOG(ERROR) << "Invalid message type " << message_view.type();
+ FatalError(); // WARNING: This destroys us.
+ return;
+ }
+
+ // Currently, all the messages simply have a |ConnectionIdentifier| as data.
+ if (message_view.num_bytes() != sizeof(ConnectionIdentifier)) {
+ LOG(ERROR) << "Invalid message size " << message_view.num_bytes();
+ FatalError(); // WARNING: This destroys us.
+ return;
+ }
+
+ // And none of them should have any platform handles attached.
+ if (message_view.transport_data_buffer()) {
+ LOG(ERROR) << "Invalid message with transport data";
+ FatalError(); // WARNING: This destroys us.
+ return;
+ }
+
+ const ConnectionIdentifier* connection_id =
+ reinterpret_cast<const ConnectionIdentifier*>(message_view.bytes());
+ bool result;
+ ProcessIdentifier peer_process_identifier = kInvalidProcessIdentifier;
+ embedder::ScopedPlatformHandle platform_handle;
+ uint32_t num_bytes = 0;
+ const void* bytes = nullptr;
+ switch (message_view.subtype()) {
+ case MessageInTransit::kSubtypeConnectionManagerAllowConnect:
+ result = owner_->AllowConnectImpl(process_identifier_, *connection_id);
+ break;
+ case MessageInTransit::kSubtypeConnectionManagerCancelConnect:
+ result = owner_->CancelConnectImpl(process_identifier_, *connection_id);
+ break;
+ case MessageInTransit::kSubtypeConnectionManagerConnect:
+ result = owner_->ConnectImpl(process_identifier_, *connection_id,
+ &peer_process_identifier, &platform_handle);
+ // Success acks for "connect" have the peer process identifier as data
+ // (and maybe also a platform handle).
+ if (result) {
+ num_bytes = static_cast<uint32_t>(sizeof(peer_process_identifier));
+ bytes = &peer_process_identifier;
+ }
+ break;
+ default:
+ LOG(ERROR) << "Invalid message subtype " << message_view.subtype();
+ FatalError(); // WARNING: This destroys us.
+ return;
+ }
+
+ scoped_ptr<MessageInTransit> response(new MessageInTransit(
+ MessageInTransit::kTypeConnectionManagerAck,
+ result ? MessageInTransit::kSubtypeConnectionManagerAckSuccess
+ : MessageInTransit::kSubtypeConnectionManagerAckFailure,
+ num_bytes, bytes));
+
+ if (platform_handle.is_valid()) {
+ // Only success acks for "connect" *may* have a platform handle attached.
+ DCHECK(result);
+ DCHECK_EQ(message_view.subtype(),
+ MessageInTransit::kSubtypeConnectionManagerConnect);
+
+ embedder::ScopedPlatformHandleVectorPtr platform_handles(
+ new embedder::PlatformHandleVector());
+ platform_handles->push_back(platform_handle.release());
+ response->SetTransportData(
+ make_scoped_ptr(new TransportData(platform_handles.Pass())));
+ }
+
+ if (!raw_channel_->WriteMessage(response.Pass())) {
+ LOG(ERROR) << "WriteMessage failed";
+ FatalError(); // WARNING: This destroys us.
+ return;
+ }
+}
+
+void MasterConnectionManager::Helper::OnError(Error /*error*/) {
+ // Every error (read or write) is fatal (for that particular connection). Read
+ // errors are fatal since no more commands will be received from that
+ // connection. Write errors are fatal since it is no longer possible to send
+ // responses.
+ FatalError(); // WARNING: This destroys us.
+}
+
+void MasterConnectionManager::Helper::FatalError() {
+ owner_->OnError(process_identifier_); // WARNING: This destroys us.
+}
+
+// MasterConnectionManager::PendingConnectionInfo ------------------------------
+
+struct MasterConnectionManager::PendingConnectionInfo {
+ // States:
+ // - This is created upon a first "allow connect" (with |first| set
+ // immediately). We then wait for a second "allow connect".
+ // - After the second "allow connect" (and |second| is set), we wait for
+ // "connects" from both |first| and |second|.
+ // - We may then receive "connect" from either |first| or |second|, at which
+ // which point it remains to wait for "connect" from the other.
+ // I.e., the valid state transitions are:
+ // AWAITING_SECOND_ALLOW_CONNECT -> AWAITING_CONNECTS_FROM_BOTH
+ // -> {AWAITING_CONNECT_FROM_FIRST,AWAITING_CONNECT_FROM_SECOND}
+ enum State {
+ AWAITING_SECOND_ALLOW_CONNECT,
+ AWAITING_CONNECTS_FROM_BOTH,
+ AWAITING_CONNECT_FROM_FIRST,
+ AWAITING_CONNECT_FROM_SECOND
+ };
+
+ explicit PendingConnectionInfo(ProcessIdentifier first)
+ : state(AWAITING_SECOND_ALLOW_CONNECT),
+ first(first),
+ second(kInvalidProcessIdentifier) {
+ DCHECK_NE(first, kInvalidProcessIdentifier);
+ }
+ ~PendingConnectionInfo() {}
+
+ State state;
+
+ ProcessIdentifier first;
+ ProcessIdentifier second;
+
+ // Valid in AWAITING_CONNECT_FROM_{FIRST, SECOND} states.
+ embedder::ScopedPlatformHandle remaining_handle;
+};
+
+// MasterConnectionManager -----------------------------------------------------
+
+MasterConnectionManager::MasterConnectionManager()
+ : creation_thread_task_runner_(base::MessageLoop::current()->task_runner()),
+ master_process_delegate_(),
+ private_thread_("MasterConnectionManagerPrivateThread"),
+ next_process_identifier_(kFirstProcessIdentifier) {
+ DCHECK(creation_thread_task_runner_);
+ AssertOnCreationThread(); // Just make sure this assertion works correctly.
+}
+
+MasterConnectionManager::~MasterConnectionManager() {
+ AssertOnCreationThread();
+ DCHECK(!master_process_delegate_);
+ DCHECK(!private_thread_.message_loop());
+ DCHECK(helpers_.empty());
+ DCHECK(pending_connections_.empty());
+}
+
+void MasterConnectionManager::Init(
+ embedder::MasterProcessDelegate* master_process_delegate) {
+ AssertOnCreationThread();
+ DCHECK(master_process_delegate);
+ DCHECK(!master_process_delegate_);
+ DCHECK(!private_thread_.message_loop());
+
+ master_process_delegate_ = master_process_delegate;
+ CHECK(private_thread_.StartWithOptions(
+ base::Thread::Options(base::MessageLoop::TYPE_IO, 0)));
+}
+
+void MasterConnectionManager::Shutdown() {
+ AssertOnCreationThread();
+ DCHECK(master_process_delegate_);
+ DCHECK(private_thread_.message_loop());
+
+ // The |Stop()| will actually finish all posted tasks.
+ private_thread_.message_loop()->PostTask(
+ FROM_HERE, base::Bind(&MasterConnectionManager::ShutdownOnPrivateThread,
+ base::Unretained(this)));
+ private_thread_.Stop();
+ DCHECK(helpers_.empty());
+ DCHECK(pending_connections_.empty());
+ master_process_delegate_ = nullptr;
+}
+
+void MasterConnectionManager::AddSlave(
+ scoped_ptr<embedder::SlaveInfo> slave_info,
+ embedder::ScopedPlatformHandle platform_handle) {
+ // We don't really care if |slave_info| is non-null or not.
+ DCHECK(platform_handle.is_valid());
+ AssertNotOnPrivateThread();
+
+ // We have to wait for the task to be executed, in case someone calls
+ // |AddSlave()| followed immediately by |Shutdown()|.
+ base::WaitableEvent event(false, false);
+ private_thread_.message_loop()->PostTask(
+ FROM_HERE,
+ base::Bind(&MasterConnectionManager::AddSlaveOnPrivateThread,
+ base::Unretained(this), base::Passed(&slave_info),
+ base::Passed(&platform_handle), base::Unretained(&event)));
+ event.Wait();
+}
+
+bool MasterConnectionManager::AllowConnect(
+ const ConnectionIdentifier& connection_id) {
+ AssertNotOnPrivateThread();
+ return AllowConnectImpl(kMasterProcessIdentifier, connection_id);
+}
+
+bool MasterConnectionManager::CancelConnect(
+ const ConnectionIdentifier& connection_id) {
+ AssertNotOnPrivateThread();
+ return CancelConnectImpl(kMasterProcessIdentifier, connection_id);
+}
+
+bool MasterConnectionManager::Connect(
+ const ConnectionIdentifier& connection_id,
+ ProcessIdentifier* peer_process_identifier,
+ embedder::ScopedPlatformHandle* platform_handle) {
+ AssertNotOnPrivateThread();
+ return ConnectImpl(kMasterProcessIdentifier, connection_id,
+ peer_process_identifier, platform_handle);
+}
+
+bool MasterConnectionManager::AllowConnectImpl(
+ ProcessIdentifier process_identifier,
+ const ConnectionIdentifier& connection_id) {
+ DCHECK_NE(process_identifier, kInvalidProcessIdentifier);
+
+ base::AutoLock locker(lock_);
+
+ auto it = pending_connections_.find(connection_id);
+ if (it == pending_connections_.end()) {
+ pending_connections_[connection_id] =
+ new PendingConnectionInfo(process_identifier);
+ // TODO(vtl): Track process identifier -> pending connections also (so these
+ // can be removed efficiently if that process disconnects).
+ DVLOG(1) << "New pending connection ID " << connection_id
+ << ": AllowConnect() from first process identifier "
+ << process_identifier;
+ return true;
+ }
+
+ PendingConnectionInfo* info = it->second;
+ if (info->state == PendingConnectionInfo::AWAITING_SECOND_ALLOW_CONNECT) {
+ info->state = PendingConnectionInfo::AWAITING_CONNECTS_FROM_BOTH;
+ info->second = process_identifier;
+ DVLOG(1) << "Pending connection ID " << connection_id
+ << ": AllowConnect() from second process identifier "
+ << process_identifier;
+ return true;
+ }
+
+ // Someone's behaving badly, but we don't know who (it might not be the
+ // caller).
+ LOG(ERROR) << "AllowConnect() from process " << process_identifier
+ << " for connection ID " << connection_id << " already in state "
+ << info->state;
+ pending_connections_.erase(it);
+ delete info;
+ return false;
+}
+
+bool MasterConnectionManager::CancelConnectImpl(
+ ProcessIdentifier process_identifier,
+ const ConnectionIdentifier& connection_id) {
+ DCHECK_NE(process_identifier, kInvalidProcessIdentifier);
+
+ base::AutoLock locker(lock_);
+
+ auto it = pending_connections_.find(connection_id);
+ if (it == pending_connections_.end()) {
+ // Not necessarily the caller's fault, and not necessarily an error.
+ DVLOG(1) << "CancelConnect() from process " << process_identifier
+ << " for connection ID " << connection_id
+ << " which is not (or no longer) pending";
+ return true;
+ }
+
+ PendingConnectionInfo* info = it->second;
+ if (process_identifier != info->first && process_identifier != info->second) {
+ LOG(ERROR) << "CancelConnect() from process " << process_identifier
+ << " for connection ID " << connection_id
+ << " which is neither connectee";
+ return false;
+ }
+
+ // Just erase it. The other side may also try to cancel, in which case it'll
+ // "fail" in the first if statement above (we assume that connection IDs never
+ // collide, so there's no need to carefully track both sides).
+ pending_connections_.erase(it);
+ delete info;
+ return true;
+}
+
+bool MasterConnectionManager::ConnectImpl(
+ ProcessIdentifier process_identifier,
+ const ConnectionIdentifier& connection_id,
+ ProcessIdentifier* peer_process_identifier,
+ embedder::ScopedPlatformHandle* platform_handle) {
+ DCHECK_NE(process_identifier, kInvalidProcessIdentifier);
+ DCHECK(peer_process_identifier);
+ DCHECK(platform_handle);
+ DCHECK(!platform_handle->is_valid()); // Not technically wrong, but unlikely.
+
+ base::AutoLock locker(lock_);
+
+ auto it = pending_connections_.find(connection_id);
+ if (it == pending_connections_.end()) {
+ // Not necessarily the caller's fault.
+ LOG(ERROR) << "Connect() from process " << process_identifier
+ << " for connection ID " << connection_id
+ << " which is not pending";
+ return false;
+ }
+
+ PendingConnectionInfo* info = it->second;
+ if (info->state == PendingConnectionInfo::AWAITING_CONNECTS_FROM_BOTH) {
+ DCHECK(!info->remaining_handle.is_valid());
+
+ if (process_identifier == info->first) {
+ info->state = PendingConnectionInfo::AWAITING_CONNECT_FROM_SECOND;
+ *peer_process_identifier = info->second;
+ } else if (process_identifier == info->second) {
+ info->state = PendingConnectionInfo::AWAITING_CONNECT_FROM_FIRST;
+ *peer_process_identifier = info->first;
+ } else {
+ LOG(ERROR) << "Connect() from process " << process_identifier
+ << " for connection ID " << connection_id
+ << " which is neither connectee";
+ return false;
+ }
+
+ if (info->first == info->second) {
+ platform_handle->reset();
+ DCHECK(!info->remaining_handle.is_valid());
+ } else {
+ embedder::PlatformChannelPair platform_channel_pair;
+ *platform_handle = platform_channel_pair.PassServerHandle();
+ DCHECK(platform_handle->is_valid());
+ info->remaining_handle = platform_channel_pair.PassClientHandle();
+ DCHECK(info->remaining_handle.is_valid());
+ }
+ DVLOG(1) << "Connection ID " << connection_id
+ << ": first Connect() from process identifier "
+ << process_identifier;
+ return true;
+ }
+
+ ProcessIdentifier remaining_connectee;
+ ProcessIdentifier peer;
+ if (info->state == PendingConnectionInfo::AWAITING_CONNECT_FROM_FIRST) {
+ remaining_connectee = info->first;
+ peer = info->second;
+ } else if (info->state ==
+ PendingConnectionInfo::AWAITING_CONNECT_FROM_SECOND) {
+ remaining_connectee = info->second;
+ peer = info->first;
+ } else {
+ // Someone's behaving badly, but we don't know who (it might not be the
+ // caller).
+ LOG(ERROR) << "Connect() from process " << process_identifier
+ << " for connection ID " << connection_id << " in state "
+ << info->state;
+ pending_connections_.erase(it);
+ delete info;
+ return false;
+ }
+
+ if (process_identifier != remaining_connectee) {
+ LOG(ERROR) << "Connect() from process " << process_identifier
+ << " for connection ID " << connection_id
+ << " which is not the remaining connectee";
+ pending_connections_.erase(it);
+ delete info;
+ return false;
+ }
+
+ *peer_process_identifier = peer;
+ *platform_handle = info->remaining_handle.Pass();
+ DCHECK((info->first == info->second) ^ platform_handle->is_valid());
+ pending_connections_.erase(it);
+ delete info;
+ DVLOG(1) << "Connection ID " << connection_id
+ << ": second Connect() from process identifier "
+ << process_identifier;
+ return true;
+}
+
+void MasterConnectionManager::ShutdownOnPrivateThread() {
+ AssertOnPrivateThread();
+
+ if (!pending_connections_.empty()) {
+ DVLOG(1) << "Shutting down with connections pending";
+ for (auto& p : pending_connections_)
+ delete p.second;
+ pending_connections_.clear();
+ }
+
+ if (!helpers_.empty()) {
+ DVLOG(1) << "Shutting down with slaves still connected";
+ for (auto& p : helpers_) {
+ scoped_ptr<embedder::SlaveInfo> slave_info = p.second->Shutdown();
+ delete p.second;
+ CallOnSlaveDisconnect(slave_info.Pass());
+ }
+ helpers_.clear();
+ }
+}
+
+void MasterConnectionManager::AddSlaveOnPrivateThread(
+ scoped_ptr<embedder::SlaveInfo> slave_info,
+ embedder::ScopedPlatformHandle platform_handle,
+ base::WaitableEvent* event) {
+ DCHECK(platform_handle.is_valid());
+ DCHECK(event);
+ AssertOnPrivateThread();
+
+ CHECK_NE(next_process_identifier_, kMasterProcessIdentifier);
+ ProcessIdentifier process_identifier = next_process_identifier_;
+ next_process_identifier_++;
+
+ scoped_ptr<Helper> helper(new Helper(
+ this, process_identifier, slave_info.Pass(), platform_handle.Pass()));
+ helper->Init();
+
+ DCHECK(helpers_.find(process_identifier) == helpers_.end());
+ helpers_[process_identifier] = helper.release();
+
+ DVLOG(1) << "Added process identifier " << process_identifier;
+ event->Signal();
+}
+
+void MasterConnectionManager::OnError(ProcessIdentifier process_identifier) {
+ DCHECK_NE(process_identifier, kInvalidProcessIdentifier);
+ AssertOnPrivateThread();
+
+ auto it = helpers_.find(process_identifier);
+ DCHECK(it != helpers_.end());
+ Helper* helper = it->second;
+ scoped_ptr<embedder::SlaveInfo> slave_info = helper->Shutdown();
+ helpers_.erase(it);
+ delete helper;
+
+ {
+ base::AutoLock locker(lock_);
+
+ // TODO(vtl): This isn't very efficient.
+ for (auto it = pending_connections_.begin();
+ it != pending_connections_.end();) {
+ if (it->second->first == process_identifier ||
+ it->second->second == process_identifier) {
+ auto it_to_erase = it;
+ ++it;
+ delete it_to_erase->second;
+ pending_connections_.erase(it_to_erase);
+ } else {
+ ++it;
+ }
+ }
+ }
+
+ CallOnSlaveDisconnect(slave_info.Pass());
+}
+
+void MasterConnectionManager::CallOnSlaveDisconnect(
+ scoped_ptr<embedder::SlaveInfo> slave_info) {
+ AssertOnPrivateThread();
+ DCHECK(master_process_delegate_);
+ creation_thread_task_runner_->PostTask(
+ FROM_HERE, base::Bind(&embedder::MasterProcessDelegate::OnSlaveDisconnect,
+ base::Unretained(master_process_delegate_),
+ base::Passed(&slave_info)));
+}
+
+void MasterConnectionManager::AssertOnCreationThread() const {
+ DCHECK(base::MessageLoop::current());
+ DCHECK_EQ(base::MessageLoop::current()->task_runner(),
+ creation_thread_task_runner_);
+}
+
+void MasterConnectionManager::AssertNotOnPrivateThread() const {
+ // This should only be called after |Init()| and before |Shutdown()|. (If not,
+ // the subsequent |DCHECK_NE()| is invalid, since the current thread may not
+ // have a message loop.)
+ DCHECK(private_thread_.message_loop());
+ DCHECK_NE(base::MessageLoop::current(), private_thread_.message_loop());
+}
+
+void MasterConnectionManager::AssertOnPrivateThread() const {
+ // This should only be called after |Init()| and before |Shutdown()|.
+ DCHECK(private_thread_.message_loop());
+ DCHECK_EQ(base::MessageLoop::current(), private_thread_.message_loop());
+}
+
+} // namespace system
+} // namespace mojo

Powered by Google App Engine
This is Rietveld 408576698