Index: mojo/public/cpp/bindings/lib/multiplex_router.cc |
diff --git a/mojo/public/cpp/bindings/lib/multiplex_router.cc b/mojo/public/cpp/bindings/lib/multiplex_router.cc |
new file mode 100644 |
index 0000000000000000000000000000000000000000..a2b057be9fbd7d3389254dca2cb25f3924d57156 |
--- /dev/null |
+++ b/mojo/public/cpp/bindings/lib/multiplex_router.cc |
@@ -0,0 +1,519 @@ |
+// Copyright 2015 The Chromium Authors. All rights reserved. |
+// Use of this source code is governed by a BSD-style license that can be |
+// found in the LICENSE file. |
+ |
+#include "mojo/public/cpp/bindings/lib/multiplex_router.h" |
+ |
+#include "base/bind.h" |
+#include "base/message_loop/message_loop.h" |
+#include "base/stl_util.h" |
+#include "mojo/public/cpp/bindings/lib/interface_endpoint_client.h" |
+ |
+namespace mojo { |
+namespace internal { |
+ |
+// InterfaceEndpoint stores the information of an interface endpoint registered |
+// with the router. Always accessed under the router's lock. |
+// No one other than the router's |endpoints_| and |tasks_| should hold refs to |
+// this object. |
+class MultiplexRouter::InterfaceEndpoint |
+ : public base::RefCounted<InterfaceEndpoint> { |
+ public: |
+ InterfaceEndpoint(MultiplexRouter* router, InterfaceId id) |
+ : router_lock_(&router->lock_), |
+ id_(id), |
+ closed_(false), |
+ peer_closed_(false), |
+ client_(nullptr) { |
+ router_lock_->AssertAcquired(); |
+ } |
+ |
+ InterfaceId id() const { return id_; } |
+ |
+ bool closed() const { return closed_; } |
+ void set_closed() { |
+ router_lock_->AssertAcquired(); |
+ closed_ = true; |
+ } |
+ |
+ bool peer_closed() const { return peer_closed_; } |
+ void set_peer_closed() { |
+ router_lock_->AssertAcquired(); |
+ peer_closed_ = true; |
+ } |
+ |
+ const scoped_refptr<base::SingleThreadTaskRunner> task_runner() const { |
+ return task_runner_; |
+ } |
+ void set_task_runner( |
+ scoped_refptr<base::SingleThreadTaskRunner> task_runner) { |
+ router_lock_->AssertAcquired(); |
+ task_runner_ = task_runner.Pass(); |
+ } |
+ |
+ InterfaceEndpointClient* client() const { return client_; } |
+ void set_client(InterfaceEndpointClient* client) { |
+ router_lock_->AssertAcquired(); |
+ client_ = client; |
+ } |
+ |
+ private: |
+ friend class base::RefCounted<InterfaceEndpoint>; |
+ |
+ ~InterfaceEndpoint() { |
+ router_lock_->AssertAcquired(); |
+ |
+ DCHECK(!client_); |
+ DCHECK(closed_); |
+ DCHECK(peer_closed_); |
+ } |
+ |
+ base::Lock* const router_lock_; |
+ const InterfaceId id_; |
+ |
+ // Whether the endpoint has been closed. |
+ bool closed_; |
+ // Whether the peer endpoint has been closed. |
+ bool peer_closed_; |
+ |
+ // The task runner on which |client_| can be accessed. |
+ scoped_refptr<base::SingleThreadTaskRunner> task_runner_; |
+ // Not owned. It is null if no client is attached to this endpoint. |
+ InterfaceEndpointClient* client_; |
+ |
+ DISALLOW_COPY_AND_ASSIGN(InterfaceEndpoint); |
+}; |
+ |
+struct MultiplexRouter::Task { |
+ public: |
+ // Doesn't take ownership of |message| but takes its contents. |
+ static scoped_ptr<Task> CreateIncomingMessageTask(Message* message) { |
+ Task* task = new Task(); |
+ task->message.reset(new Message); |
+ message->MoveTo(task->message.get()); |
+ return make_scoped_ptr(task); |
+ } |
+ static scoped_ptr<Task> CreateNotifyErrorTask(InterfaceEndpoint* endpoint) { |
+ Task* task = new Task(); |
+ task->endpoint_to_notify = endpoint; |
+ return make_scoped_ptr(task); |
+ } |
+ |
+ ~Task() {} |
+ |
+ bool IsIncomingMessageTask() const { return !!message; } |
+ bool IsNotifyErrorTask() const { return !!endpoint_to_notify; } |
+ |
+ scoped_ptr<Message> message; |
+ scoped_refptr<InterfaceEndpoint> endpoint_to_notify; |
+ |
+ private: |
+ Task() {} |
+}; |
+ |
+MultiplexRouter::MultiplexRouter(bool set_interface_id_namesapce_bit, |
+ ScopedMessagePipeHandle message_pipe, |
+ const MojoAsyncWaiter* waiter) |
+ : RefCountedDeleteOnMessageLoop(base::MessageLoop::current() |
+ ->task_runner()), |
+ set_interface_id_namespace_bit_(set_interface_id_namesapce_bit), |
+ header_validator_(this), |
+ connector_(message_pipe.Pass(), Connector::MULTI_THREADED_SEND, waiter), |
+ encountered_error_(false), |
+ control_message_handler_(this), |
+ control_message_proxy_(&connector_), |
+ next_interface_id_value_(1), |
+ testing_mode_(false) { |
+ connector_.set_incoming_receiver(&header_validator_); |
+ connector_.set_connection_error_handler( |
+ [this]() { OnPipeConnectionError(); }); |
+} |
+ |
+MultiplexRouter::~MultiplexRouter() { |
+ base::AutoLock locker(lock_); |
+ |
+ tasks_.clear(); |
+ |
+ for (auto iter = endpoints_.begin(); iter != endpoints_.end();) { |
+ InterfaceEndpoint* endpoint = iter->second.get(); |
+ // Increment the iterator before calling UpdateEndpointStateMayRemove() |
+ // because it may remove the corresponding value from the map. |
+ ++iter; |
+ |
+ DCHECK(endpoint->closed()); |
+ UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); |
+ } |
+ |
+ DCHECK(endpoints_.empty()); |
+} |
+ |
+void MultiplexRouter::CreateEndpointHandlePair( |
+ ScopedInterfaceEndpointHandle* local_endpoint, |
+ ScopedInterfaceEndpointHandle* remote_endpoint) { |
+ base::AutoLock locker(lock_); |
+ uint32_t id = 0; |
+ do { |
+ if (next_interface_id_value_ >= kInterfaceIdNamespaceMask) |
+ next_interface_id_value_ = 1; |
+ id = next_interface_id_value_++; |
+ if (set_interface_id_namespace_bit_) |
+ id |= kInterfaceIdNamespaceMask; |
+ } while (ContainsKey(endpoints_, id)); |
+ |
+ InterfaceEndpoint* endpoint = new InterfaceEndpoint(this, id); |
+ endpoints_[id] = endpoint; |
+ if (encountered_error_) |
+ UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); |
+ |
+ *local_endpoint = ScopedInterfaceEndpointHandle(id, true, this); |
+ *remote_endpoint = ScopedInterfaceEndpointHandle(id, false, this); |
+} |
+ |
+ScopedInterfaceEndpointHandle MultiplexRouter::CreateLocalEndpointHandle( |
+ InterfaceId id) { |
+ if (!IsValidInterfaceId(id)) |
+ return ScopedInterfaceEndpointHandle(); |
+ |
+ base::AutoLock locker(lock_); |
+ if (ContainsKey(endpoints_, id)) { |
+ // If the endpoint already exist, it is because we have received a |
+ // notification that the peer endpoint has closed. |
+ InterfaceEndpoint* endpoint = endpoints_[id].get(); |
+ CHECK(!endpoint->closed()); |
+ CHECK(endpoint->peer_closed()); |
+ } else { |
+ InterfaceEndpoint* endpoint = new InterfaceEndpoint(this, id); |
+ endpoints_[id] = endpoint; |
+ if (encountered_error_) |
+ UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); |
+ } |
+ return ScopedInterfaceEndpointHandle(id, true, this); |
+} |
+ |
+void MultiplexRouter::CloseEndpointHandle(InterfaceId id, bool is_local) { |
+ if (!IsValidInterfaceId(id)) |
+ return; |
+ |
+ base::AutoLock locker(lock_); |
+ |
+ if (!is_local) { |
+ DCHECK(ContainsKey(endpoints_, id)); |
+ DCHECK(!IsMasterInterfaceId(id)); |
+ |
+ // We will receive a NotifyPeerEndpointClosed message from the other side. |
+ control_message_proxy_.NotifyEndpointClosedBeforeSent(id); |
+ |
+ return; |
+ } |
+ |
+ DCHECK(ContainsKey(endpoints_, id)); |
+ InterfaceEndpoint* endpoint = endpoints_[id].get(); |
+ DCHECK(!endpoint->client()); |
+ DCHECK(!endpoint->closed()); |
+ UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED); |
+ |
+ if (!IsMasterInterfaceId(id)) |
+ control_message_proxy_.NotifyPeerEndpointClosed(id); |
+ |
+ ProcessTasks(true); |
+} |
+ |
+void MultiplexRouter::AttachEndpointClient( |
+ const ScopedInterfaceEndpointHandle& handle, |
+ InterfaceEndpointClient* client) { |
+ const InterfaceId id = handle.id(); |
+ |
+ DCHECK(IsValidInterfaceId(id)); |
+ DCHECK(client); |
+ |
+ base::AutoLock locker(lock_); |
+ DCHECK(ContainsKey(endpoints_, id)); |
+ |
+ InterfaceEndpoint* endpoint = endpoints_[id].get(); |
+ DCHECK(!endpoint->client()); |
+ DCHECK(!endpoint->closed()); |
+ |
+ endpoint->set_task_runner(base::MessageLoop::current()->task_runner()); |
+ endpoint->set_client(client); |
+ |
+ if (endpoint->peer_closed()) |
+ tasks_.push_back(Task::CreateNotifyErrorTask(endpoint)); |
+ ProcessTasks(true); |
+} |
+ |
+void MultiplexRouter::DetachEndpointClient( |
+ const ScopedInterfaceEndpointHandle& handle) { |
+ const InterfaceId id = handle.id(); |
+ |
+ DCHECK(IsValidInterfaceId(id)); |
+ |
+ base::AutoLock locker(lock_); |
+ DCHECK(ContainsKey(endpoints_, id)); |
+ |
+ InterfaceEndpoint* endpoint = endpoints_[id].get(); |
+ DCHECK(endpoint->client()); |
+ DCHECK(endpoint->task_runner()->BelongsToCurrentThread()); |
+ DCHECK(!endpoint->closed()); |
+ |
+ endpoint->set_task_runner(nullptr); |
+ endpoint->set_client(nullptr); |
+} |
+ |
+bool MultiplexRouter::SendMessage(const ScopedInterfaceEndpointHandle& handle, |
+ Message* message) { |
+ const InterfaceId id = handle.id(); |
+ |
+ base::AutoLock locker(lock_); |
+ if (!ContainsKey(endpoints_, id)) |
+ return false; |
+ |
+ InterfaceEndpoint* endpoint = endpoints_[id].get(); |
+ if (endpoint->peer_closed()) |
+ return false; |
+ |
+ message->set_interface_id(id); |
+ return connector_.Accept(message); |
+} |
+ |
+void MultiplexRouter::RaiseError() { |
+ if (task_runner_->BelongsToCurrentThread()) { |
+ connector_.RaiseError(); |
+ } else { |
+ task_runner_->PostTask(FROM_HERE, |
+ base::Bind(&MultiplexRouter::RaiseError, this)); |
+ } |
+} |
+ |
+ScopedMessagePipeHandle MultiplexRouter::PassMessagePipe() { |
+ DCHECK(thread_checker_.CalledOnValidThread()); |
+ { |
+ base::AutoLock locker(lock_); |
+ DCHECK(endpoints_.empty() || (endpoints_.size() == 1 && |
+ ContainsKey(endpoints_, kMasterInterfaceId))); |
+ } |
+ return connector_.PassMessagePipe(); |
+} |
+ |
+void MultiplexRouter::EnableTestingMode() { |
+ DCHECK(thread_checker_.CalledOnValidThread()); |
+ base::AutoLock locker(lock_); |
+ |
+ testing_mode_ = true; |
+ connector_.set_enforce_errors_from_incoming_receiver(false); |
+} |
+ |
+bool MultiplexRouter::Accept(Message* message) { |
+ DCHECK(thread_checker_.CalledOnValidThread()); |
+ |
+ scoped_refptr<MultiplexRouter> protector(this); |
+ base::AutoLock locker(lock_); |
+ tasks_.push_back(Task::CreateIncomingMessageTask(message)); |
+ ProcessTasks(false); |
+ |
+ // Always return true. If we see errors during message processing, we will |
+ // explicitly call Connector::RaiseError() to disconnect the message pipe. |
+ return true; |
+} |
+ |
+bool MultiplexRouter::OnPeerAssociatedEndpointClosed(InterfaceId id) { |
+ lock_.AssertAcquired(); |
+ |
+ if (IsMasterInterfaceId(id)) |
+ return false; |
+ |
+ if (!ContainsKey(endpoints_, id)) |
+ endpoints_[id] = new InterfaceEndpoint(this, id); |
+ |
+ InterfaceEndpoint* endpoint = endpoints_[id].get(); |
+ DCHECK(!endpoint->peer_closed()); |
+ |
+ if (endpoint->client()) |
+ tasks_.push_back(Task::CreateNotifyErrorTask(endpoint)); |
+ UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); |
+ |
+ // No need to trigger a ProcessTasks() because it is already on the stack. |
+ |
+ return true; |
+} |
+ |
+bool MultiplexRouter::OnAssociatedEndpointClosedBeforeSent(InterfaceId id) { |
+ lock_.AssertAcquired(); |
+ |
+ if (IsMasterInterfaceId(id)) |
+ return false; |
+ |
+ if (!ContainsKey(endpoints_, id)) |
+ endpoints_[id] = new InterfaceEndpoint(this, id); |
+ |
+ InterfaceEndpoint* endpoint = endpoints_[id].get(); |
+ DCHECK(!endpoint->closed()); |
+ UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED); |
+ |
+ control_message_proxy_.NotifyPeerEndpointClosed(id); |
+ |
+ return true; |
+} |
+ |
+void MultiplexRouter::OnPipeConnectionError() { |
+ DCHECK(thread_checker_.CalledOnValidThread()); |
+ |
+ scoped_refptr<MultiplexRouter> protector(this); |
+ base::AutoLock locker(lock_); |
+ |
+ encountered_error_ = true; |
+ |
+ for (auto iter = endpoints_.begin(); iter != endpoints_.end();) { |
+ InterfaceEndpoint* endpoint = iter->second.get(); |
+ // Increment the iterator before calling UpdateEndpointStateMayRemove() |
+ // because it may remove the corresponding value from the map. |
+ ++iter; |
+ |
+ if (endpoint->client()) |
+ tasks_.push_back(Task::CreateNotifyErrorTask(endpoint)); |
+ |
+ UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); |
+ } |
+ |
+ ProcessTasks(false); |
+} |
+ |
+void MultiplexRouter::ProcessTasks(bool force_async) { |
+ lock_.AssertAcquired(); |
+ |
+ while (!tasks_.empty()) { |
+ scoped_ptr<Task> task(tasks_.front().Pass()); |
+ tasks_.pop_front(); |
+ |
+ bool processed = task->IsNotifyErrorTask() |
+ ? ProcessNotifyErrorTask(task.get(), &force_async) |
+ : ProcessIncomingMessageTask(task.get(), &force_async); |
+ |
+ if (!processed) { |
+ tasks_.push_front(task.Pass()); |
+ break; |
+ } |
+ } |
+} |
+ |
+bool MultiplexRouter::ProcessNotifyErrorTask(Task* task, bool* force_async) { |
+ lock_.AssertAcquired(); |
+ InterfaceEndpoint* endpoint = task->endpoint_to_notify.get(); |
+ if (!endpoint->client()) |
+ return true; |
+ |
+ if (!endpoint->task_runner()->BelongsToCurrentThread() || *force_async) { |
+ endpoint->task_runner()->PostTask( |
+ FROM_HERE, base::Bind(&MultiplexRouter::LockAndCallProcessTasks, this)); |
+ return false; |
+ } |
+ |
+ *force_async = true; |
+ InterfaceEndpointClient* client = endpoint->client(); |
+ { |
+ // We must unlock before calling into |client| because it may call this |
+ // object within NotifyError(). Holding the lock will lead to deadlock. |
+ // |
+ // It is safe to call into |client| without the lock. Because |client| is |
+ // always accessed on the same thread, including DetachEndpointClient(). |
+ base::AutoUnlock unlocker(lock_); |
+ client->NotifyError(); |
+ } |
+ return true; |
+} |
+ |
+bool MultiplexRouter::ProcessIncomingMessageTask(Task* task, |
+ bool* force_async) { |
+ lock_.AssertAcquired(); |
+ Message* message = task->message.get(); |
+ |
+ if (PipeControlMessageHandler::IsPipeControlMessage(message)) { |
+ if (!control_message_handler_.Accept(message)) |
+ RaiseErrorInNonTestingMode(); |
+ return true; |
+ } |
+ |
+ InterfaceId id = message->interface_id(); |
+ DCHECK(IsValidInterfaceId(id)); |
+ |
+ if (!ContainsKey(endpoints_, id)) { |
+ DCHECK(!IsMasterInterfaceId(id)); |
+ |
+ // Currently, it is legitimate to receive messages for an endpoint |
+ // that is not registered. For example, the endpoint is transferred in |
+ // a message that is discarded. Once we add support to specify all |
+ // enclosing endpoints in message header, we should be able to remove |
+ // this. |
+ InterfaceEndpoint* endpoint = new InterfaceEndpoint(this, id); |
+ endpoints_[id] = endpoint; |
+ UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED); |
+ |
+ control_message_proxy_.NotifyPeerEndpointClosed(id); |
+ return true; |
+ } |
+ |
+ InterfaceEndpoint* endpoint = endpoints_[id].get(); |
+ if (endpoint->closed()) |
+ return true; |
+ |
+ if (!endpoint->client()) { |
+ // We need to wait until a client is attached in order to dispatch further |
+ // messages. |
+ return false; |
+ } |
+ |
+ if (!endpoint->task_runner()->BelongsToCurrentThread() || *force_async) { |
+ endpoint->task_runner()->PostTask( |
+ FROM_HERE, base::Bind(&MultiplexRouter::LockAndCallProcessTasks, this)); |
+ return false; |
+ } |
+ |
+ *force_async = true; |
+ InterfaceEndpointClient* client = endpoint->client(); |
+ scoped_ptr<Message> owned_message = task->message.Pass(); |
+ bool result = false; |
+ { |
+ // We must unlock before calling into |client| because it may call this |
+ // object within HandleIncomingMessage(). Holding the lock will lead to |
+ // deadlock. |
+ // |
+ // It is safe to call into |client| without the lock. Because |client| is |
+ // always accessed on the same thread, including DetachEndpointClient(). |
+ base::AutoUnlock unlocker(lock_); |
+ result = client->HandleIncomingMessage(owned_message.get()); |
+ } |
+ if (!result) |
+ RaiseErrorInNonTestingMode(); |
+ |
+ return true; |
+} |
+ |
+void MultiplexRouter::LockAndCallProcessTasks() { |
+ // There is no need to hold a ref to this class in this case because this is |
+ // always called using base::Bind(), which holds a ref. |
+ base::AutoLock locker(lock_); |
+ ProcessTasks(false); |
+} |
+ |
+void MultiplexRouter::UpdateEndpointStateMayRemove( |
+ InterfaceEndpoint* endpoint, |
+ EndpointStateUpdateType type) { |
+ switch (type) { |
+ case ENDPOINT_CLOSED: |
+ endpoint->set_closed(); |
+ break; |
+ case PEER_ENDPOINT_CLOSED: |
+ endpoint->set_peer_closed(); |
+ break; |
+ } |
+ if (endpoint->closed() && endpoint->peer_closed()) |
+ endpoints_.erase(endpoint->id()); |
+} |
+ |
+void MultiplexRouter::RaiseErrorInNonTestingMode() { |
+ lock_.AssertAcquired(); |
+ if (!testing_mode_) |
+ RaiseError(); |
+} |
+ |
+} // namespace internal |
+} // namespace mojo |