Index: mojo/public/cpp/bindings/lib/multiplex_router.cc |
diff --git a/mojo/public/cpp/bindings/lib/multiplex_router.cc b/mojo/public/cpp/bindings/lib/multiplex_router.cc |
new file mode 100644 |
index 0000000000000000000000000000000000000000..75ed6cd61409d13a7ab7d835e8f7c088eea61830 |
--- /dev/null |
+++ b/mojo/public/cpp/bindings/lib/multiplex_router.cc |
@@ -0,0 +1,492 @@ |
+// Copyright 2015 The Chromium Authors. All rights reserved. |
+// Use of this source code is governed by a BSD-style license that can be |
+// found in the LICENSE file. |
+ |
+#include "mojo/public/cpp/bindings/lib/multiplex_router.h" |
+ |
+#include "base/bind.h" |
+#include "base/memory/ref_counted.h" |
+#include "base/message_loop/message_loop.h" |
+#include "base/stl_util.h" |
+#include "mojo/public/cpp/bindings/lib/interface_endpoint_client.h" |
+ |
+namespace mojo { |
+namespace internal { |
+ |
+// InterfaceEndpoint stores the endpoint of an interface endpoint registered |
+// with |
sky
2015/11/19 17:16:27
nit: fix wrapping.
yzshen1
2015/11/19 22:00:40
Done.
|
+// the router. Always accessed under the router's lock. |
sky
2015/11/19 17:16:27
To help debugging could this object take the lock
yzshen1
2015/11/19 22:00:40
Done. I added the asserts.
It adds a little cost
|
+// |
+// On creation, this object holds two refs to itself. Those refs are released |
+// when SetClosedMayDestruct() and SetPeerClosedMayDestruct() are called. |
+// Other than that, the task queue in the router may hold refs to it, too. |
+class MultiplexRouter::InterfaceEndpoint |
+ : public base::RefCounted<InterfaceEndpoint> { |
+ public: |
+ InterfaceEndpoint(MultiplexRouter* router, InterfaceId id) |
+ : router_(router), |
+ id_(id), |
+ closed_(false), |
+ peer_closed_(false), |
+ client_(nullptr) { |
+ AddRef(); |
+ AddRef(); |
sky
2015/11/19 17:16:27
I suspect a scoped_refptr to this member would be
yzshen1
2015/11/19 22:00:40
Done.
|
+ } |
+ |
+ InterfaceId id() const { return id_; } |
+ |
+ bool closed() const { return closed_; } |
+ void SetClosedMayDestruct() { |
+ if (closed_) |
+ return; |
+ |
+ closed_ = true; |
+ Release(); |
+ } |
+ |
+ bool peer_closed() const { return peer_closed_; } |
+ void SetPeerClosedMayDestruct() { |
+ if (peer_closed_) |
+ return; |
+ |
+ peer_closed_ = true; |
+ Release(); |
+ } |
+ |
+ const scoped_refptr<base::SingleThreadTaskRunner> task_runner() const { |
+ return task_runner_; |
+ } |
+ void set_task_runner( |
+ scoped_refptr<base::SingleThreadTaskRunner> task_runner) { |
+ task_runner_ = task_runner.Pass(); |
+ } |
+ |
+ InterfaceEndpointClient* client() const { return client_; } |
+ void set_client(InterfaceEndpointClient* client) { client_ = client; } |
+ |
+ private: |
+ friend class base::RefCounted<InterfaceEndpoint>; |
+ |
+ ~InterfaceEndpoint() { |
+ DCHECK(!client_); |
+ router_->OnEndpointDestructed(id_); |
+ } |
+ |
+ MultiplexRouter* const router_; |
+ const InterfaceId id_; |
+ |
+ // Whether the endpoint has been closed. |
+ bool closed_; |
+ // Whether the peer endpoint has been closed. |
+ bool peer_closed_; |
+ |
+ // The task runner on which |client_| can be accessed. |
+ scoped_refptr<base::SingleThreadTaskRunner> task_runner_; |
+ // Not owned. It is null if no client is attached to this endpoint. |
+ InterfaceEndpointClient* client_; |
+ |
+ DISALLOW_COPY_AND_ASSIGN(InterfaceEndpoint); |
+}; |
+ |
+struct MultiplexRouter::Task { |
+ public: |
+ // Doesn't take ownership of |message| but takes its contents. |
+ static Task* CreateIncomingMessageTask(Message* message) { |
+ Task* task = new Task(); |
+ task->message.reset(new Message); |
+ message->MoveTo(task->message.get()); |
+ return task; |
+ } |
+ static Task* CreateNotifyErrorTask(InterfaceEndpoint* endpoint) { |
+ Task* task = new Task(); |
+ task->endpoint_to_notify = endpoint; |
+ return task; |
+ } |
+ |
+ ~Task() {} |
+ |
+ bool IsIncomingMessageTask() const { return !!message; } |
+ bool IsNotifyErrorTask() const { return !!endpoint_to_notify; } |
+ |
+ scoped_ptr<Message> message; |
+ scoped_refptr<InterfaceEndpoint> endpoint_to_notify; |
+ |
+ private: |
+ Task() {} |
+}; |
+ |
+MultiplexRouter::MultiplexRouter(bool set_interface_id_namesapce_bit, |
+ ScopedMessagePipeHandle message_pipe, |
+ const MojoAsyncWaiter* waiter) |
+ : RefCountedDeleteOnMessageLoop(base::MessageLoop::current() |
+ ->task_runner()), |
+ set_interface_id_namespace_bit_(set_interface_id_namesapce_bit), |
+ header_validator_(this), |
+ connector_(message_pipe.Pass(), Connector::MULTI_THREADED_SEND, waiter), |
+ control_message_handler_(this), |
+ control_message_proxy_(&connector_), |
+ next_interface_id_value_(1), |
+ testing_mode_(false) { |
+ connector_.set_incoming_receiver(&header_validator_); |
+ connector_.set_connection_error_handler( |
+ [this]() { OnPipeConnectionError(); }); |
+} |
+ |
MultiplexRouter::~MultiplexRouter() {
  base::AutoLock locker(lock_);

  // Drop any queued tasks that were never dispatched.
  while (!tasks_.empty()) {
    delete tasks_.front();
    tasks_.pop_front();
  }

  auto iter = endpoints_.begin();
  while (iter != endpoints_.end()) {
    InterfaceEndpoint* endpoint = iter->second;
    // Advance the iterator BEFORE touching |endpoint|:
    // SetPeerClosedMayDestruct() below may destruct the endpoint, which
    // erases its entry from |endpoints_| (via OnEndpointDestructed) and
    // would invalidate |iter|.
    ++iter;

    DCHECK(endpoint->closed());
    endpoint->SetPeerClosedMayDestruct();
  }

  // Every endpoint must already have been closed, so releasing the peer
  // side above destructs all of them.
  DCHECK(endpoints_.empty());
}
+ |
// Allocates a fresh interface ID and hands back the (local, remote) pair of
// endpoint handles for it.
void MultiplexRouter::CreateEndpointHandlePair(
    ScopedInterfaceEndpointHandle* local_endpoint,
    ScopedInterfaceEndpointHandle* remote_endpoint) {
  base::AutoLock locker(lock_);
  uint32_t id = 0;
  do {
    // Wrap around (skipping 0) instead of spilling into the namespace bit.
    if (next_interface_id_value_ >= kInterfaceIdNamespaceMask)
      next_interface_id_value_ = 1;
    id = next_interface_id_value_++;
    // Only one side of the pipe sets this bit, so IDs allocated by the two
    // sides never collide.
    if (set_interface_id_namespace_bit_)
      id |= kInterfaceIdNamespaceMask;
  } while (ContainsKey(endpoints_, id));

  endpoints_[id] = new InterfaceEndpoint(this, id);
  *local_endpoint = ScopedInterfaceEndpointHandle(id, true, this);
  *remote_endpoint = ScopedInterfaceEndpointHandle(id, false, this);
}
+ |
+ScopedInterfaceEndpointHandle MultiplexRouter::CreateLocalEndpointHandle( |
+ InterfaceId id) { |
+ if (!IsValidInterfaceId(id)) |
+ return ScopedInterfaceEndpointHandle(); |
+ |
+ base::AutoLock locker(lock_); |
+ if (ContainsKey(endpoints_, id)) { |
+ // If the endpoint already exist, it is because we have received a |
sky
2015/11/19 17:16:27
It seems weird that you have to detect this case i
yzshen1
2015/11/19 22:00:40
This is legitimate:
this method is a public method
|
+ // notification that the peer endpoint has closed. |
+ InterfaceEndpoint* endpoint = endpoints_[id]; |
+ CHECK(!endpoint->closed()); |
+ CHECK(endpoint->peer_closed()); |
+ } else { |
+ endpoints_[id] = new InterfaceEndpoint(this, id); |
+ } |
+ return ScopedInterfaceEndpointHandle(id, true, this); |
+} |
+ |
// Closes one side of the endpoint identified by |id|. |is_local| indicates
// whether the handle being closed is the local or the remote one.
void MultiplexRouter::CloseEndpointHandle(InterfaceId id, bool is_local) {
  if (!IsValidInterfaceId(id))
    return;

  base::AutoLock locker(lock_);

  if (!is_local) {
    DCHECK(ContainsKey(endpoints_, id));
    DCHECK(!IsMasterInterfaceId(id));

    // We will receive a NotifyPeerEndpointClosed message from the other side.
    control_message_proxy_.NotifyEndpointClosedBeforeSent(id);

    return;
  }

  DCHECK(ContainsKey(endpoints_, id));
  InterfaceEndpoint* endpoint = endpoints_[id];
  DCHECK(!endpoint->client());
  DCHECK(!endpoint->closed());
  // May destruct |endpoint|; don't touch it afterwards.
  endpoint->SetClosedMayDestruct();

  if (!IsMasterInterfaceId(id))
    control_message_proxy_.NotifyPeerEndpointClosed(id);

  // Re-process the queue; closing this endpoint may affect pending tasks.
  ProcessTasks(true);
}
+ |
// Attaches |client| to the endpoint identified by |handle|. The client will
// subsequently be called on the current thread's task runner.
void MultiplexRouter::AttachEndpointClient(
    const ScopedInterfaceEndpointHandle& handle,
    InterfaceEndpointClient* client) {
  const InterfaceId id = handle.id();

  DCHECK(IsValidInterfaceId(id));
  DCHECK(client);

  base::AutoLock locker(lock_);
  DCHECK(ContainsKey(endpoints_, id));

  InterfaceEndpoint* endpoint = endpoints_[id];
  DCHECK(!endpoint->client());
  DCHECK(!endpoint->closed());

  // Record which thread |client| lives on so tasks are dispatched there.
  endpoint->set_task_runner(base::MessageLoop::current()->task_runner());
  endpoint->set_client(client);

  // If the peer closed before a client was attached, queue an error
  // notification for the newly attached client.
  if (endpoint->peer_closed())
    tasks_.push_back(Task::CreateNotifyErrorTask(endpoint));
  ProcessTasks(true);
}
+ |
// Detaches the currently attached client from the endpoint identified by
// |handle|. The endpoint must not have been closed yet.
void MultiplexRouter::DetachEndpointClient(
    const ScopedInterfaceEndpointHandle& handle) {
  const InterfaceId id = handle.id();

  DCHECK(IsValidInterfaceId(id));

  base::AutoLock locker(lock_);
  DCHECK(ContainsKey(endpoints_, id));

  InterfaceEndpoint* endpoint = endpoints_[id];
  DCHECK(endpoint->client());
  DCHECK(!endpoint->closed());

  endpoint->set_task_runner(nullptr);
  endpoint->set_client(nullptr);
}
+ |
+bool MultiplexRouter::SendMessage(const ScopedInterfaceEndpointHandle& handle, |
+ Message* message) { |
+ const InterfaceId id = handle.id(); |
+ |
+ base::AutoLock locker(lock_); |
+ if (!ContainsKey(endpoints_, id)) |
+ return false; |
+ |
+ InterfaceEndpoint* endpoint = endpoints_[id]; |
+ if (endpoint->peer_closed()) |
+ return false; |
+ |
+ message->set_interface_id(id); |
+ return connector_.Accept(message); |
+} |
+ |
+void MultiplexRouter::RaiseError() { |
+ if (task_runner_->BelongsToCurrentThread()) { |
+ connector_.RaiseError(); |
+ } else { |
+ task_runner_->PostTask(FROM_HERE, |
+ base::Bind(&MultiplexRouter::RaiseError, this)); |
+ } |
+} |
+ |
// Extracts the underlying message pipe from the router.
ScopedMessagePipeHandle MultiplexRouter::PassMessagePipe() {
  DCHECK(thread_checker_.CalledOnValidThread());
  {
    base::AutoLock locker(lock_);
    // At most the master endpoint may still be registered when the pipe is
    // extracted.
    DCHECK(endpoints_.empty() || (endpoints_.size() == 1 &&
                                  ContainsKey(endpoints_, kMasterInterfaceId)));
  }
  return connector_.PassMessagePipe();
}
+ |
// Puts the router into testing mode: validation errors from the incoming
// receiver no longer tear down the pipe (see RaiseErrorInNonTestingMode()).
void MultiplexRouter::EnableTestingMode() {
  DCHECK(thread_checker_.CalledOnValidThread());
  base::AutoLock locker(lock_);

  testing_mode_ = true;
  connector_.set_enforce_errors_from_incoming_receiver(false);
}
+ |
// MessageReceiver implementation: entry point for every incoming message
// (after header validation). Runs on the router's thread.
bool MultiplexRouter::Accept(Message* message) {
  DCHECK(thread_checker_.CalledOnValidThread());

  // Processing tasks can run arbitrary client code, which may drop external
  // refs to this router; keep it alive for the duration of the call.
  scoped_refptr<MultiplexRouter> protector(this);
  base::AutoLock locker(lock_);
  tasks_.push_back(Task::CreateIncomingMessageTask(message));
  ProcessTasks(false);

  // Always return true. If we see errors during message processing, we will
  // explicitly call Connector::RaiseError() to disconnect the message pipe.
  return true;
}
+ |
// PipeControlMessageHandlerDelegate implementation. Called with |lock_|
// already held. Returns false to reject closing the master interface via a
// control message.
bool MultiplexRouter::OnPeerAssociatedEndpointClosed(InterfaceId id) {
  lock_.AssertAcquired();

  if (IsMasterInterfaceId(id))
    return false;

  // The notification may arrive before this side has used the endpoint;
  // create a placeholder entry in that case.
  if (!ContainsKey(endpoints_, id))
    endpoints_[id] = new InterfaceEndpoint(this, id);

  InterfaceEndpoint* endpoint = endpoints_[id];
  DCHECK(!endpoint->peer_closed());

  if (endpoint->client())
    tasks_.push_back(Task::CreateNotifyErrorTask(endpoint));
  // May destruct |endpoint|; don't touch it afterwards.
  endpoint->SetPeerClosedMayDestruct();

  // No need to trigger a ProcessTasks() because it is already on the stack.

  return true;
}
+ |
// PipeControlMessageHandlerDelegate implementation. Called with |lock_|
// already held. Handles the case where the peer closed an endpoint handle
// before it was ever sent over the pipe.
bool MultiplexRouter::OnAssociatedEndpointClosedBeforeSent(InterfaceId id) {
  lock_.AssertAcquired();

  if (IsMasterInterfaceId(id))
    return false;

  // Create a placeholder entry if this side hasn't seen the endpoint yet.
  if (!ContainsKey(endpoints_, id))
    endpoints_[id] = new InterfaceEndpoint(this, id);

  InterfaceEndpoint* endpoint = endpoints_[id];
  DCHECK(!endpoint->closed());
  // May destruct |endpoint|; don't touch it afterwards.
  endpoint->SetClosedMayDestruct();

  // Tell the peer the local side of this endpoint is now closed as well.
  control_message_proxy_.NotifyPeerEndpointClosed(id);

  return true;
}
+ |
// Invoked by the connector when the message pipe is disconnected. Treats
// every registered endpoint's peer as closed and notifies attached clients.
void MultiplexRouter::OnPipeConnectionError() {
  DCHECK(thread_checker_.CalledOnValidThread());

  // Task processing can run client code that may drop external refs to this
  // router; keep it alive for the duration of the call.
  scoped_refptr<MultiplexRouter> protector(this);
  base::AutoLock locker(lock_);

  auto iter = endpoints_.begin();
  while (iter != endpoints_.end()) {
    InterfaceEndpoint* endpoint = iter->second;
    // Advance before SetPeerClosedMayDestruct(): destructing the endpoint
    // erases its entry from |endpoints_| and would invalidate |iter|.
    ++iter;

    if (endpoint->client())
      tasks_.push_back(Task::CreateNotifyErrorTask(endpoint));

    endpoint->SetPeerClosedMayDestruct();
  }

  ProcessTasks(false);
}
+ |
// Drains the task queue in order. Must be called with |lock_| held. Stops as
// soon as a task cannot be processed yet (e.g., it must run on a different
// thread, or no client is attached); that task is put back at the front so
// ordering is preserved. When |force_async| is true, tasks are not run
// synchronously on the current thread even when they otherwise could be.
void MultiplexRouter::ProcessTasks(bool force_async) {
  lock_.AssertAcquired();

  while (!tasks_.empty()) {
    scoped_ptr<Task> task(tasks_.front());
    tasks_.pop_front();

    bool processed = task->IsNotifyErrorTask()
                         ? ProcessNotifyErrorTask(task.get(), &force_async)
                         : ProcessIncomingMessageTask(task.get(), &force_async);

    if (!processed) {
      // Retry this task first on the next ProcessTasks() call.
      tasks_.push_front(task.release());
      break;
    }
  }
}
+ |
// Attempts to run a notify-error task. Returns true if the task was consumed
// (dispatched, or dropped because the client detached); false if it must be
// retried later, in which case a ProcessTasks() call has been posted to the
// endpoint's task runner. Must be called with |lock_| held.
bool MultiplexRouter::ProcessNotifyErrorTask(Task* task, bool* force_async) {
  lock_.AssertAcquired();
  InterfaceEndpoint* endpoint = task->endpoint_to_notify.get();
  // Nothing to notify if the client has already detached.
  if (!endpoint->client())
    return true;

  if (!endpoint->task_runner()->BelongsToCurrentThread() || *force_async) {
    endpoint->task_runner()->PostTask(
        FROM_HERE, base::Bind(&MultiplexRouter::LockAndCallProcessTasks, this));
    return false;
  }

  // We are about to re-enter client code below; any further tasks processed
  // on this stack frame must go async.
  *force_async = true;
  InterfaceEndpointClient* client = endpoint->client();
  {
    // Release the lock while calling into client code. Per the review
    // discussion, the client lives on a single thread (this one), so it
    // cannot be detached concurrently while we are unlocked.
    base::AutoUnlock unlocker(lock_);
    client->NotifyError();
  }
  return true;
}
+ |
// Attempts to dispatch an incoming-message task. Returns true if the task
// was consumed; false if it must be retried later (client not attached yet,
// or dispatch must happen on another thread — a ProcessTasks() call is then
// posted to the endpoint's task runner). Must be called with |lock_| held.
bool MultiplexRouter::ProcessIncomingMessageTask(Task* task,
                                                 bool* force_async) {
  lock_.AssertAcquired();
  Message* message = task->message.get();

  // Pipe control messages are handled by the router itself.
  if (PipeControlMessageHandler::IsPipeControlMessage(message)) {
    if (!control_message_handler_.Accept(message))
      RaiseErrorInNonTestingMode();
    return true;
  }

  InterfaceId id = message->interface_id();
  DCHECK(IsValidInterfaceId(id));

  if (!ContainsKey(endpoints_, id)) {
    DCHECK(!IsMasterInterfaceId(id));

    // Currently, it is legitimate to receive messages for an endpoint
    // that is not registered. For example, the endpoint is transferred in
    // a message that is discarded. Once we add support to specify all
    // enclosing endpoints in message header, we should be able to remove
    // this.
    InterfaceEndpoint* endpoint = new InterfaceEndpoint(this, id);
    endpoints_[id] = endpoint;
    // Immediately close the placeholder locally and tell the peer.
    endpoint->SetClosedMayDestruct();

    control_message_proxy_.NotifyPeerEndpointClosed(id);
    return true;
  }

  InterfaceEndpoint* endpoint = endpoints_[id];
  // Messages for a locally-closed endpoint are dropped.
  if (endpoint->closed())
    return true;

  if (!endpoint->client()) {
    // We need to wait until a client is attached in order to dispatch further
    // messages.
    return false;
  }

  if (!endpoint->task_runner()->BelongsToCurrentThread() || *force_async) {
    endpoint->task_runner()->PostTask(
        FROM_HERE, base::Bind(&MultiplexRouter::LockAndCallProcessTasks, this));
    return false;
  }

  // We are about to re-enter client code below; any further tasks processed
  // on this stack frame must go async.
  *force_async = true;
  InterfaceEndpointClient* client = endpoint->client();
  scoped_ptr<Message> owned_message = task->message.Pass();
  bool result = false;
  {
    // Release the lock while dispatching into client code (see the matching
    // note in ProcessNotifyErrorTask()).
    base::AutoUnlock unlocker(lock_);
    result = client->HandleIncomingMessage(owned_message.get());
  }
  if (!result)
    RaiseErrorInNonTestingMode();

  return true;
}
+ |
// Posted-task trampoline: acquires the lock and resumes queue processing.
void MultiplexRouter::LockAndCallProcessTasks() {
  // There is no need to hold a ref to this class in this case because this is
  // always called using base::Bind(), which holds a ref.
  base::AutoLock locker(lock_);
  ProcessTasks(false);
}
+ |
// Called from InterfaceEndpoint's destructor (with |lock_| held) to remove
// the endpoint's entry from the map.
void MultiplexRouter::OnEndpointDestructed(InterfaceId id) {
  lock_.AssertAcquired();
  endpoints_.erase(id);
}
+ |
// Raises a pipe error unless the router is in testing mode (see
// EnableTestingMode()), in which case errors are swallowed.
void MultiplexRouter::RaiseErrorInNonTestingMode() {
  lock_.AssertAcquired();
  if (!testing_mode_)
    RaiseError();
}
+ |
+} // namespace internal |
+} // namespace mojo |