Index: third_party/mojo/src/mojo/edk/system/master_connection_manager.cc |
diff --git a/third_party/mojo/src/mojo/edk/system/master_connection_manager.cc b/third_party/mojo/src/mojo/edk/system/master_connection_manager.cc |
new file mode 100644 |
index 0000000000000000000000000000000000000000..b78dfb618fd83f24f854836d4aa0801c5f86765a |
--- /dev/null |
+++ b/third_party/mojo/src/mojo/edk/system/master_connection_manager.cc |
@@ -0,0 +1,572 @@ |
+// Copyright 2015 The Chromium Authors. All rights reserved. |
+// Use of this source code is governed by a BSD-style license that can be |
+// found in the LICENSE file. |
+ |
+#include "mojo/edk/system/master_connection_manager.h" |
+ |
+#include "base/bind.h" |
+#include "base/bind_helpers.h" |
+#include "base/location.h" |
+#include "base/logging.h" |
+#include "base/message_loop/message_loop.h" |
+#include "base/synchronization/waitable_event.h" |
+#include "mojo/edk/embedder/master_process_delegate.h" |
+#include "mojo/edk/embedder/platform_channel_pair.h" |
+#include "mojo/edk/embedder/platform_handle_vector.h" |
+#include "mojo/edk/system/message_in_transit.h" |
+#include "mojo/edk/system/raw_channel.h" |
+#include "mojo/edk/system/transport_data.h" |
+ |
+namespace mojo { |
+namespace system { |
+ |
+const ProcessIdentifier kFirstProcessIdentifier = 1; |
+const ProcessIdentifier kMasterProcessIdentifier = |
+ static_cast<ProcessIdentifier>(-1); |
+ |
+// MasterConnectionManager::Helper --------------------------------------------- |
+ |
+// |MasterConnectionManager::Helper| is not thread-safe, and must only be used |
+// on its |owner_|'s private thread. |
+class MasterConnectionManager::Helper : public RawChannel::Delegate { |
+ public: |
+ Helper(MasterConnectionManager* owner, |
+ ProcessIdentifier process_identifier, |
+ scoped_ptr<embedder::SlaveInfo> slave_info, |
+ embedder::ScopedPlatformHandle platform_handle); |
+ ~Helper() override; |
+ |
+ void Init(); |
+ scoped_ptr<embedder::SlaveInfo> Shutdown(); |
+ |
+ private: |
+ // |RawChannel::Delegate| methods: |
+ void OnReadMessage( |
+ const MessageInTransit::View& message_view, |
+ embedder::ScopedPlatformHandleVectorPtr platform_handles) override; |
+ void OnError(Error error) override; |
+ |
+ // Handles an error that's fatal to this object. Note that this probably |
+ // results in |Shutdown()| being called (in the nested context) and then this |
+ // object being destroyed. |
+ void FatalError(); |
+ |
+ MasterConnectionManager* const owner_; |
+ const ProcessIdentifier process_identifier_; |
+ scoped_ptr<embedder::SlaveInfo> slave_info_; |
+ scoped_ptr<RawChannel> raw_channel_; |
+ |
+ DISALLOW_COPY_AND_ASSIGN(Helper); |
+}; |
+ |
+MasterConnectionManager::Helper::Helper( |
+ MasterConnectionManager* owner, |
+ ProcessIdentifier process_identifier, |
+ scoped_ptr<embedder::SlaveInfo> slave_info, |
+ embedder::ScopedPlatformHandle platform_handle) |
+ : owner_(owner), |
+ process_identifier_(process_identifier), |
+ slave_info_(slave_info.Pass()), |
+ raw_channel_(RawChannel::Create(platform_handle.Pass())) { |
+} |
+ |
+MasterConnectionManager::Helper::~Helper() { |
+ DCHECK(!slave_info_); |
+} |
+ |
+void MasterConnectionManager::Helper::Init() { |
+ raw_channel_->Init(this); |
+} |
+ |
+scoped_ptr<embedder::SlaveInfo> MasterConnectionManager::Helper::Shutdown() { |
+ raw_channel_->Shutdown(); |
+ raw_channel_.reset(); |
+ return slave_info_.Pass(); |
+} |
+ |
+void MasterConnectionManager::Helper::OnReadMessage( |
+ const MessageInTransit::View& message_view, |
+ embedder::ScopedPlatformHandleVectorPtr platform_handles) { |
+ if (message_view.type() != MessageInTransit::kTypeConnectionManager) { |
+ LOG(ERROR) << "Invalid message type " << message_view.type(); |
+ FatalError(); // WARNING: This destroys us. |
+ return; |
+ } |
+ |
+ // Currently, all the messages simply have a |ConnectionIdentifier| as data. |
+ if (message_view.num_bytes() != sizeof(ConnectionIdentifier)) { |
+ LOG(ERROR) << "Invalid message size " << message_view.num_bytes(); |
+ FatalError(); // WARNING: This destroys us. |
+ return; |
+ } |
+ |
+ // And none of them should have any platform handles attached. |
+ if (message_view.transport_data_buffer()) { |
+ LOG(ERROR) << "Invalid message with transport data"; |
+ FatalError(); // WARNING: This destroys us. |
+ return; |
+ } |
+ |
+ const ConnectionIdentifier* connection_id = |
+ reinterpret_cast<const ConnectionIdentifier*>(message_view.bytes()); |
+ bool result; |
+ ProcessIdentifier peer_process_identifier = kInvalidProcessIdentifier; |
+ embedder::ScopedPlatformHandle platform_handle; |
+ uint32_t num_bytes = 0; |
+ const void* bytes = nullptr; |
+ switch (message_view.subtype()) { |
+ case MessageInTransit::kSubtypeConnectionManagerAllowConnect: |
+ result = owner_->AllowConnectImpl(process_identifier_, *connection_id); |
+ break; |
+ case MessageInTransit::kSubtypeConnectionManagerCancelConnect: |
+ result = owner_->CancelConnectImpl(process_identifier_, *connection_id); |
+ break; |
+ case MessageInTransit::kSubtypeConnectionManagerConnect: |
+ result = owner_->ConnectImpl(process_identifier_, *connection_id, |
+ &peer_process_identifier, &platform_handle); |
+ // Success acks for "connect" have the peer process identifier as data |
+ // (and maybe also a platform handle). |
+ if (result) { |
+ num_bytes = static_cast<uint32_t>(sizeof(peer_process_identifier)); |
+ bytes = &peer_process_identifier; |
+ } |
+ break; |
+ default: |
+ LOG(ERROR) << "Invalid message subtype " << message_view.subtype(); |
+ FatalError(); // WARNING: This destroys us. |
+ return; |
+ } |
+ |
+ scoped_ptr<MessageInTransit> response(new MessageInTransit( |
+ MessageInTransit::kTypeConnectionManagerAck, |
+ result ? MessageInTransit::kSubtypeConnectionManagerAckSuccess |
+ : MessageInTransit::kSubtypeConnectionManagerAckFailure, |
+ num_bytes, bytes)); |
+ |
+ if (platform_handle.is_valid()) { |
+ // Only success acks for "connect" *may* have a platform handle attached. |
+ DCHECK(result); |
+ DCHECK_EQ(message_view.subtype(), |
+ MessageInTransit::kSubtypeConnectionManagerConnect); |
+ |
+ embedder::ScopedPlatformHandleVectorPtr platform_handles( |
+ new embedder::PlatformHandleVector()); |
+ platform_handles->push_back(platform_handle.release()); |
+ response->SetTransportData( |
+ make_scoped_ptr(new TransportData(platform_handles.Pass()))); |
+ } |
+ |
+ if (!raw_channel_->WriteMessage(response.Pass())) { |
+ LOG(ERROR) << "WriteMessage failed"; |
+ FatalError(); // WARNING: This destroys us. |
+ return; |
+ } |
+} |
+ |
+void MasterConnectionManager::Helper::OnError(Error /*error*/) { |
+ // Every error (read or write) is fatal (for that particular connection). Read |
+ // errors are fatal since no more commands will be received from that |
+ // connection. Write errors are fatal since it is no longer possible to send |
+ // responses. |
+ FatalError(); // WARNING: This destroys us. |
+} |
+ |
+void MasterConnectionManager::Helper::FatalError() { |
+ owner_->OnError(process_identifier_); // WARNING: This destroys us. |
+} |
+ |
+// MasterConnectionManager::PendingConnectionInfo ------------------------------ |
+ |
+struct MasterConnectionManager::PendingConnectionInfo { |
+ // States: |
+ // - This is created upon a first "allow connect" (with |first| set |
+ // immediately). We then wait for a second "allow connect". |
+ // - After the second "allow connect" (and |second| is set), we wait for |
+ // "connects" from both |first| and |second|. |
+ // - We may then receive "connect" from either |first| or |second|, at which |
+ // which point it remains to wait for "connect" from the other. |
+ // I.e., the valid state transitions are: |
+ // AWAITING_SECOND_ALLOW_CONNECT -> AWAITING_CONNECTS_FROM_BOTH |
+ // -> {AWAITING_CONNECT_FROM_FIRST,AWAITING_CONNECT_FROM_SECOND} |
+ enum State { |
+ AWAITING_SECOND_ALLOW_CONNECT, |
+ AWAITING_CONNECTS_FROM_BOTH, |
+ AWAITING_CONNECT_FROM_FIRST, |
+ AWAITING_CONNECT_FROM_SECOND |
+ }; |
+ |
+ explicit PendingConnectionInfo(ProcessIdentifier first) |
+ : state(AWAITING_SECOND_ALLOW_CONNECT), |
+ first(first), |
+ second(kInvalidProcessIdentifier) { |
+ DCHECK_NE(first, kInvalidProcessIdentifier); |
+ } |
+ ~PendingConnectionInfo() {} |
+ |
+ State state; |
+ |
+ ProcessIdentifier first; |
+ ProcessIdentifier second; |
+ |
+ // Valid in AWAITING_CONNECT_FROM_{FIRST, SECOND} states. |
+ embedder::ScopedPlatformHandle remaining_handle; |
+}; |
+ |
+// MasterConnectionManager ----------------------------------------------------- |
+ |
+MasterConnectionManager::MasterConnectionManager() |
+ : creation_thread_task_runner_(base::MessageLoop::current()->task_runner()), |
+ master_process_delegate_(), |
+ private_thread_("MasterConnectionManagerPrivateThread"), |
+ next_process_identifier_(kFirstProcessIdentifier) { |
+ DCHECK(creation_thread_task_runner_); |
+ AssertOnCreationThread(); // Just make sure this assertion works correctly. |
+} |
+ |
+MasterConnectionManager::~MasterConnectionManager() { |
+ AssertOnCreationThread(); |
+ DCHECK(!master_process_delegate_); |
+ DCHECK(!private_thread_.message_loop()); |
+ DCHECK(helpers_.empty()); |
+ DCHECK(pending_connections_.empty()); |
+} |
+ |
+void MasterConnectionManager::Init( |
+ embedder::MasterProcessDelegate* master_process_delegate) { |
+ AssertOnCreationThread(); |
+ DCHECK(master_process_delegate); |
+ DCHECK(!master_process_delegate_); |
+ DCHECK(!private_thread_.message_loop()); |
+ |
+ master_process_delegate_ = master_process_delegate; |
+ CHECK(private_thread_.StartWithOptions( |
+ base::Thread::Options(base::MessageLoop::TYPE_IO, 0))); |
+} |
+ |
+void MasterConnectionManager::Shutdown() { |
+ AssertOnCreationThread(); |
+ DCHECK(master_process_delegate_); |
+ DCHECK(private_thread_.message_loop()); |
+ |
+ // The |Stop()| will actually finish all posted tasks. |
+ private_thread_.message_loop()->PostTask( |
+ FROM_HERE, base::Bind(&MasterConnectionManager::ShutdownOnPrivateThread, |
+ base::Unretained(this))); |
+ private_thread_.Stop(); |
+ DCHECK(helpers_.empty()); |
+ DCHECK(pending_connections_.empty()); |
+ master_process_delegate_ = nullptr; |
+} |
+ |
+void MasterConnectionManager::AddSlave( |
+ scoped_ptr<embedder::SlaveInfo> slave_info, |
+ embedder::ScopedPlatformHandle platform_handle) { |
+ // We don't really care if |slave_info| is non-null or not. |
+ DCHECK(platform_handle.is_valid()); |
+ AssertNotOnPrivateThread(); |
+ |
+ // We have to wait for the task to be executed, in case someone calls |
+ // |AddSlave()| followed immediately by |Shutdown()|. |
+ base::WaitableEvent event(false, false); |
+ private_thread_.message_loop()->PostTask( |
+ FROM_HERE, |
+ base::Bind(&MasterConnectionManager::AddSlaveOnPrivateThread, |
+ base::Unretained(this), base::Passed(&slave_info), |
+ base::Passed(&platform_handle), base::Unretained(&event))); |
+ event.Wait(); |
+} |
+ |
+bool MasterConnectionManager::AllowConnect( |
+ const ConnectionIdentifier& connection_id) { |
+ AssertNotOnPrivateThread(); |
+ return AllowConnectImpl(kMasterProcessIdentifier, connection_id); |
+} |
+ |
+bool MasterConnectionManager::CancelConnect( |
+ const ConnectionIdentifier& connection_id) { |
+ AssertNotOnPrivateThread(); |
+ return CancelConnectImpl(kMasterProcessIdentifier, connection_id); |
+} |
+ |
+bool MasterConnectionManager::Connect( |
+ const ConnectionIdentifier& connection_id, |
+ ProcessIdentifier* peer_process_identifier, |
+ embedder::ScopedPlatformHandle* platform_handle) { |
+ AssertNotOnPrivateThread(); |
+ return ConnectImpl(kMasterProcessIdentifier, connection_id, |
+ peer_process_identifier, platform_handle); |
+} |
+ |
+bool MasterConnectionManager::AllowConnectImpl( |
+ ProcessIdentifier process_identifier, |
+ const ConnectionIdentifier& connection_id) { |
+ DCHECK_NE(process_identifier, kInvalidProcessIdentifier); |
+ |
+ base::AutoLock locker(lock_); |
+ |
+ auto it = pending_connections_.find(connection_id); |
+ if (it == pending_connections_.end()) { |
+ pending_connections_[connection_id] = |
+ new PendingConnectionInfo(process_identifier); |
+ // TODO(vtl): Track process identifier -> pending connections also (so these |
+ // can be removed efficiently if that process disconnects). |
+ DVLOG(1) << "New pending connection ID " << connection_id |
+ << ": AllowConnect() from first process identifier " |
+ << process_identifier; |
+ return true; |
+ } |
+ |
+ PendingConnectionInfo* info = it->second; |
+ if (info->state == PendingConnectionInfo::AWAITING_SECOND_ALLOW_CONNECT) { |
+ info->state = PendingConnectionInfo::AWAITING_CONNECTS_FROM_BOTH; |
+ info->second = process_identifier; |
+ DVLOG(1) << "Pending connection ID " << connection_id |
+ << ": AllowConnect() from second process identifier " |
+ << process_identifier; |
+ return true; |
+ } |
+ |
+ // Someone's behaving badly, but we don't know who (it might not be the |
+ // caller). |
+ LOG(ERROR) << "AllowConnect() from process " << process_identifier |
+ << " for connection ID " << connection_id << " already in state " |
+ << info->state; |
+ pending_connections_.erase(it); |
+ delete info; |
+ return false; |
+} |
+ |
+bool MasterConnectionManager::CancelConnectImpl( |
+ ProcessIdentifier process_identifier, |
+ const ConnectionIdentifier& connection_id) { |
+ DCHECK_NE(process_identifier, kInvalidProcessIdentifier); |
+ |
+ base::AutoLock locker(lock_); |
+ |
+ auto it = pending_connections_.find(connection_id); |
+ if (it == pending_connections_.end()) { |
+ // Not necessarily the caller's fault, and not necessarily an error. |
+ DVLOG(1) << "CancelConnect() from process " << process_identifier |
+ << " for connection ID " << connection_id |
+ << " which is not (or no longer) pending"; |
+ return true; |
+ } |
+ |
+ PendingConnectionInfo* info = it->second; |
+ if (process_identifier != info->first && process_identifier != info->second) { |
+ LOG(ERROR) << "CancelConnect() from process " << process_identifier |
+ << " for connection ID " << connection_id |
+ << " which is neither connectee"; |
+ return false; |
+ } |
+ |
+ // Just erase it. The other side may also try to cancel, in which case it'll |
+ // "fail" in the first if statement above (we assume that connection IDs never |
+ // collide, so there's no need to carefully track both sides). |
+ pending_connections_.erase(it); |
+ delete info; |
+ return true; |
+} |
+ |
+bool MasterConnectionManager::ConnectImpl( |
+ ProcessIdentifier process_identifier, |
+ const ConnectionIdentifier& connection_id, |
+ ProcessIdentifier* peer_process_identifier, |
+ embedder::ScopedPlatformHandle* platform_handle) { |
+ DCHECK_NE(process_identifier, kInvalidProcessIdentifier); |
+ DCHECK(peer_process_identifier); |
+ DCHECK(platform_handle); |
+ DCHECK(!platform_handle->is_valid()); // Not technically wrong, but unlikely. |
+ |
+ base::AutoLock locker(lock_); |
+ |
+ auto it = pending_connections_.find(connection_id); |
+ if (it == pending_connections_.end()) { |
+ // Not necessarily the caller's fault. |
+ LOG(ERROR) << "Connect() from process " << process_identifier |
+ << " for connection ID " << connection_id |
+ << " which is not pending"; |
+ return false; |
+ } |
+ |
+ PendingConnectionInfo* info = it->second; |
+ if (info->state == PendingConnectionInfo::AWAITING_CONNECTS_FROM_BOTH) { |
+ DCHECK(!info->remaining_handle.is_valid()); |
+ |
+ if (process_identifier == info->first) { |
+ info->state = PendingConnectionInfo::AWAITING_CONNECT_FROM_SECOND; |
+ *peer_process_identifier = info->second; |
+ } else if (process_identifier == info->second) { |
+ info->state = PendingConnectionInfo::AWAITING_CONNECT_FROM_FIRST; |
+ *peer_process_identifier = info->first; |
+ } else { |
+ LOG(ERROR) << "Connect() from process " << process_identifier |
+ << " for connection ID " << connection_id |
+ << " which is neither connectee"; |
+ return false; |
+ } |
+ |
+ if (info->first == info->second) { |
+ platform_handle->reset(); |
+ DCHECK(!info->remaining_handle.is_valid()); |
+ } else { |
+ embedder::PlatformChannelPair platform_channel_pair; |
+ *platform_handle = platform_channel_pair.PassServerHandle(); |
+ DCHECK(platform_handle->is_valid()); |
+ info->remaining_handle = platform_channel_pair.PassClientHandle(); |
+ DCHECK(info->remaining_handle.is_valid()); |
+ } |
+ DVLOG(1) << "Connection ID " << connection_id |
+ << ": first Connect() from process identifier " |
+ << process_identifier; |
+ return true; |
+ } |
+ |
+ ProcessIdentifier remaining_connectee; |
+ ProcessIdentifier peer; |
+ if (info->state == PendingConnectionInfo::AWAITING_CONNECT_FROM_FIRST) { |
+ remaining_connectee = info->first; |
+ peer = info->second; |
+ } else if (info->state == |
+ PendingConnectionInfo::AWAITING_CONNECT_FROM_SECOND) { |
+ remaining_connectee = info->second; |
+ peer = info->first; |
+ } else { |
+ // Someone's behaving badly, but we don't know who (it might not be the |
+ // caller). |
+ LOG(ERROR) << "Connect() from process " << process_identifier |
+ << " for connection ID " << connection_id << " in state " |
+ << info->state; |
+ pending_connections_.erase(it); |
+ delete info; |
+ return false; |
+ } |
+ |
+ if (process_identifier != remaining_connectee) { |
+ LOG(ERROR) << "Connect() from process " << process_identifier |
+ << " for connection ID " << connection_id |
+ << " which is not the remaining connectee"; |
+ pending_connections_.erase(it); |
+ delete info; |
+ return false; |
+ } |
+ |
+ *peer_process_identifier = peer; |
+ *platform_handle = info->remaining_handle.Pass(); |
+ DCHECK((info->first == info->second) ^ platform_handle->is_valid()); |
+ pending_connections_.erase(it); |
+ delete info; |
+ DVLOG(1) << "Connection ID " << connection_id |
+ << ": second Connect() from process identifier " |
+ << process_identifier; |
+ return true; |
+} |
+ |
+void MasterConnectionManager::ShutdownOnPrivateThread() { |
+ AssertOnPrivateThread(); |
+ |
+ if (!pending_connections_.empty()) { |
+ DVLOG(1) << "Shutting down with connections pending"; |
+ for (auto& p : pending_connections_) |
+ delete p.second; |
+ pending_connections_.clear(); |
+ } |
+ |
+ if (!helpers_.empty()) { |
+ DVLOG(1) << "Shutting down with slaves still connected"; |
+ for (auto& p : helpers_) { |
+ scoped_ptr<embedder::SlaveInfo> slave_info = p.second->Shutdown(); |
+ delete p.second; |
+ CallOnSlaveDisconnect(slave_info.Pass()); |
+ } |
+ helpers_.clear(); |
+ } |
+} |
+ |
+void MasterConnectionManager::AddSlaveOnPrivateThread( |
+ scoped_ptr<embedder::SlaveInfo> slave_info, |
+ embedder::ScopedPlatformHandle platform_handle, |
+ base::WaitableEvent* event) { |
+ DCHECK(platform_handle.is_valid()); |
+ DCHECK(event); |
+ AssertOnPrivateThread(); |
+ |
+ CHECK_NE(next_process_identifier_, kMasterProcessIdentifier); |
+ ProcessIdentifier process_identifier = next_process_identifier_; |
+ next_process_identifier_++; |
+ |
+ scoped_ptr<Helper> helper(new Helper( |
+ this, process_identifier, slave_info.Pass(), platform_handle.Pass())); |
+ helper->Init(); |
+ |
+ DCHECK(helpers_.find(process_identifier) == helpers_.end()); |
+ helpers_[process_identifier] = helper.release(); |
+ |
+ DVLOG(1) << "Added process identifier " << process_identifier; |
+ event->Signal(); |
+} |
+ |
+void MasterConnectionManager::OnError(ProcessIdentifier process_identifier) { |
+ DCHECK_NE(process_identifier, kInvalidProcessIdentifier); |
+ AssertOnPrivateThread(); |
+ |
+ auto it = helpers_.find(process_identifier); |
+ DCHECK(it != helpers_.end()); |
+ Helper* helper = it->second; |
+ scoped_ptr<embedder::SlaveInfo> slave_info = helper->Shutdown(); |
+ helpers_.erase(it); |
+ delete helper; |
+ |
+ { |
+ base::AutoLock locker(lock_); |
+ |
+ // TODO(vtl): This isn't very efficient. |
+ for (auto it = pending_connections_.begin(); |
+ it != pending_connections_.end();) { |
+ if (it->second->first == process_identifier || |
+ it->second->second == process_identifier) { |
+ auto it_to_erase = it; |
+ ++it; |
+ delete it_to_erase->second; |
+ pending_connections_.erase(it_to_erase); |
+ } else { |
+ ++it; |
+ } |
+ } |
+ } |
+ |
+ CallOnSlaveDisconnect(slave_info.Pass()); |
+} |
+ |
+void MasterConnectionManager::CallOnSlaveDisconnect( |
+ scoped_ptr<embedder::SlaveInfo> slave_info) { |
+ AssertOnPrivateThread(); |
+ DCHECK(master_process_delegate_); |
+ creation_thread_task_runner_->PostTask( |
+ FROM_HERE, base::Bind(&embedder::MasterProcessDelegate::OnSlaveDisconnect, |
+ base::Unretained(master_process_delegate_), |
+ base::Passed(&slave_info))); |
+} |
+ |
+void MasterConnectionManager::AssertOnCreationThread() const { |
+ DCHECK(base::MessageLoop::current()); |
+ DCHECK_EQ(base::MessageLoop::current()->task_runner(), |
+ creation_thread_task_runner_); |
+} |
+ |
+void MasterConnectionManager::AssertNotOnPrivateThread() const { |
+ // This should only be called after |Init()| and before |Shutdown()|. (If not, |
+ // the subsequent |DCHECK_NE()| is invalid, since the current thread may not |
+ // have a message loop.) |
+ DCHECK(private_thread_.message_loop()); |
+ DCHECK_NE(base::MessageLoop::current(), private_thread_.message_loop()); |
+} |
+ |
+void MasterConnectionManager::AssertOnPrivateThread() const { |
+ // This should only be called after |Init()| and before |Shutdown()|. |
+ DCHECK(private_thread_.message_loop()); |
+ DCHECK_EQ(base::MessageLoop::current(), private_thread_.message_loop()); |
+} |
+ |
+} // namespace system |
+} // namespace mojo |