Chromium Code Reviews| Index: mojo/public/cpp/bindings/lib/multiplex_router.cc |
| diff --git a/mojo/public/cpp/bindings/lib/multiplex_router.cc b/mojo/public/cpp/bindings/lib/multiplex_router.cc |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..75ed6cd61409d13a7ab7d835e8f7c088eea61830 |
| --- /dev/null |
| +++ b/mojo/public/cpp/bindings/lib/multiplex_router.cc |
| @@ -0,0 +1,492 @@ |
| +// Copyright 2015 The Chromium Authors. All rights reserved. |
| +// Use of this source code is governed by a BSD-style license that can be |
| +// found in the LICENSE file. |
| + |
| +#include "mojo/public/cpp/bindings/lib/multiplex_router.h" |
| + |
| +#include "base/bind.h" |
| +#include "base/memory/ref_counted.h" |
| +#include "base/message_loop/message_loop.h" |
| +#include "base/stl_util.h" |
| +#include "mojo/public/cpp/bindings/lib/interface_endpoint_client.h" |
| + |
| +namespace mojo { |
| +namespace internal { |
| + |
| +// InterfaceEndpoint stores the endpoint of an interface endpoint registered |
| +// with |
|
sky
2015/11/19 17:16:27
nit: fix wrapping.
yzshen1
2015/11/19 22:00:40
Done.
|
| +// the router. Always accessed under the router's lock. |
|
sky
2015/11/19 17:16:27
To help debugging could this object take the lock
yzshen1
2015/11/19 22:00:40
Done. I added the asserts.
It adds a little cost
|
| +// |
| +// On creation, this object holds two refs to itself. Those refs are released |
| +// when SetClosedMayDestruct() and SetPeerClosedMayDestruct() are called. |
| +// Other than that, the task queue in the router may hold refs to it, too. |
| +class MultiplexRouter::InterfaceEndpoint |
| + : public base::RefCounted<InterfaceEndpoint> { |
| + public: |
| + InterfaceEndpoint(MultiplexRouter* router, InterfaceId id) |
| + : router_(router), |
| + id_(id), |
| + closed_(false), |
| + peer_closed_(false), |
| + client_(nullptr) { |
| + AddRef(); |
| + AddRef(); |
|
sky
2015/11/19 17:16:27
I suspect a scoped_refptr to this member would be
yzshen1
2015/11/19 22:00:40
Done.
|
| + } |
| + |
| + InterfaceId id() const { return id_; } |
| + |
| + bool closed() const { return closed_; } |
| + void SetClosedMayDestruct() { |
| + if (closed_) |
| + return; |
| + |
| + closed_ = true; |
| + Release(); |
| + } |
| + |
| + bool peer_closed() const { return peer_closed_; } |
| + void SetPeerClosedMayDestruct() { |
| + if (peer_closed_) |
| + return; |
| + |
| + peer_closed_ = true; |
| + Release(); |
| + } |
| + |
| + const scoped_refptr<base::SingleThreadTaskRunner> task_runner() const { |
| + return task_runner_; |
| + } |
| + void set_task_runner( |
| + scoped_refptr<base::SingleThreadTaskRunner> task_runner) { |
| + task_runner_ = task_runner.Pass(); |
| + } |
| + |
| + InterfaceEndpointClient* client() const { return client_; } |
| + void set_client(InterfaceEndpointClient* client) { client_ = client; } |
| + |
| + private: |
| + friend class base::RefCounted<InterfaceEndpoint>; |
| + |
| + ~InterfaceEndpoint() { |
| + DCHECK(!client_); |
| + router_->OnEndpointDestructed(id_); |
| + } |
| + |
| + MultiplexRouter* const router_; |
| + const InterfaceId id_; |
| + |
| + // Whether the endpoint has been closed. |
| + bool closed_; |
| + // Whether the peer endpoint has been closed. |
| + bool peer_closed_; |
| + |
| + // The task runner on which |client_| can be accessed. |
| + scoped_refptr<base::SingleThreadTaskRunner> task_runner_; |
| + // Not owned. It is null if no client is attached to this endpoint. |
| + InterfaceEndpointClient* client_; |
| + |
| + DISALLOW_COPY_AND_ASSIGN(InterfaceEndpoint); |
| +}; |
| + |
| +struct MultiplexRouter::Task { |
| + public: |
| + // Doesn't take ownership of |message| but takes its contents. |
| + static Task* CreateIncomingMessageTask(Message* message) { |
| + Task* task = new Task(); |
| + task->message.reset(new Message); |
| + message->MoveTo(task->message.get()); |
| + return task; |
| + } |
| + static Task* CreateNotifyErrorTask(InterfaceEndpoint* endpoint) { |
| + Task* task = new Task(); |
| + task->endpoint_to_notify = endpoint; |
| + return task; |
| + } |
| + |
| + ~Task() {} |
| + |
| + bool IsIncomingMessageTask() const { return !!message; } |
| + bool IsNotifyErrorTask() const { return !!endpoint_to_notify; } |
| + |
| + scoped_ptr<Message> message; |
| + scoped_refptr<InterfaceEndpoint> endpoint_to_notify; |
| + |
| + private: |
| + Task() {} |
| +}; |
| + |
| +MultiplexRouter::MultiplexRouter(bool set_interface_id_namesapce_bit, |
| + ScopedMessagePipeHandle message_pipe, |
| + const MojoAsyncWaiter* waiter) |
| + : RefCountedDeleteOnMessageLoop(base::MessageLoop::current() |
| + ->task_runner()), |
| + set_interface_id_namespace_bit_(set_interface_id_namesapce_bit), |
| + header_validator_(this), |
| + connector_(message_pipe.Pass(), Connector::MULTI_THREADED_SEND, waiter), |
| + control_message_handler_(this), |
| + control_message_proxy_(&connector_), |
| + next_interface_id_value_(1), |
| + testing_mode_(false) { |
| + connector_.set_incoming_receiver(&header_validator_); |
| + connector_.set_connection_error_handler( |
| + [this]() { OnPipeConnectionError(); }); |
| +} |
| + |
| +MultiplexRouter::~MultiplexRouter() { |
| + base::AutoLock locker(lock_); |
| + |
| + while (!tasks_.empty()) { |
| + delete tasks_.front(); |
| + tasks_.pop_front(); |
| + } |
| + |
| + auto iter = endpoints_.begin(); |
|
sky
2015/11/19 17:16:27
Use a for loop (you can still increment inside the
yzshen1
2015/11/19 22:00:40
Done.
|
| + while (iter != endpoints_.end()) { |
| + InterfaceEndpoint* endpoint = iter->second; |
| + ++iter; |
|
sky
2015/11/19 17:16:27
This is subtle and worth a comment.
yzshen1
2015/11/19 22:00:40
Done.
|
| + |
| + DCHECK(endpoint->closed()); |
| + endpoint->SetPeerClosedMayDestruct(); |
| + } |
| + |
| + DCHECK(endpoints_.empty()); |
| +} |
| + |
| +void MultiplexRouter::CreateEndpointHandlePair( |
| + ScopedInterfaceEndpointHandle* local_endpoint, |
| + ScopedInterfaceEndpointHandle* remote_endpoint) { |
| + base::AutoLock locker(lock_); |
| + uint32_t id = 0; |
| + do { |
| + if (next_interface_id_value_ >= kInterfaceIdNamespaceMask) |
| + next_interface_id_value_ = 1; |
| + id = next_interface_id_value_++; |
| + if (set_interface_id_namespace_bit_) |
| + id |= kInterfaceIdNamespaceMask; |
| + } while (ContainsKey(endpoints_, id)); |
| + |
| + endpoints_[id] = new InterfaceEndpoint(this, id); |
| + *local_endpoint = ScopedInterfaceEndpointHandle(id, true, this); |
| + *remote_endpoint = ScopedInterfaceEndpointHandle(id, false, this); |
| +} |
| + |
| +ScopedInterfaceEndpointHandle MultiplexRouter::CreateLocalEndpointHandle( |
| + InterfaceId id) { |
| + if (!IsValidInterfaceId(id)) |
| + return ScopedInterfaceEndpointHandle(); |
| + |
| + base::AutoLock locker(lock_); |
| + if (ContainsKey(endpoints_, id)) { |
| + // If the endpoint already exist, it is because we have received a |
|
sky
2015/11/19 17:16:27
It seems weird that you have to detect this case i
yzshen1
2015/11/19 22:00:40
This is legitimate:
this method is a public method
|
| + // notification that the peer endpoint has closed. |
| + InterfaceEndpoint* endpoint = endpoints_[id]; |
| + CHECK(!endpoint->closed()); |
| + CHECK(endpoint->peer_closed()); |
| + } else { |
| + endpoints_[id] = new InterfaceEndpoint(this, id); |
| + } |
| + return ScopedInterfaceEndpointHandle(id, true, this); |
| +} |
| + |
| +void MultiplexRouter::CloseEndpointHandle(InterfaceId id, bool is_local) { |
| + if (!IsValidInterfaceId(id)) |
| + return; |
| + |
| + base::AutoLock locker(lock_); |
| + |
| + if (!is_local) { |
| + DCHECK(ContainsKey(endpoints_, id)); |
| + DCHECK(!IsMasterInterfaceId(id)); |
| + |
| + // We will receive a NotifyPeerEndpointClosed message from the other side. |
| + control_message_proxy_.NotifyEndpointClosedBeforeSent(id); |
| + |
| + return; |
| + } |
| + |
| + DCHECK(ContainsKey(endpoints_, id)); |
| + InterfaceEndpoint* endpoint = endpoints_[id]; |
| + DCHECK(!endpoint->client()); |
| + DCHECK(!endpoint->closed()); |
| + endpoint->SetClosedMayDestruct(); |
| + |
| + if (!IsMasterInterfaceId(id)) |
| + control_message_proxy_.NotifyPeerEndpointClosed(id); |
| + |
| + ProcessTasks(true); |
| +} |
| + |
| +void MultiplexRouter::AttachEndpointClient( |
| + const ScopedInterfaceEndpointHandle& handle, |
| + InterfaceEndpointClient* client) { |
| + const InterfaceId id = handle.id(); |
| + |
| + DCHECK(IsValidInterfaceId(id)); |
| + DCHECK(client); |
| + |
| + base::AutoLock locker(lock_); |
| + DCHECK(ContainsKey(endpoints_, id)); |
| + |
| + InterfaceEndpoint* endpoint = endpoints_[id]; |
| + DCHECK(!endpoint->client()); |
| + DCHECK(!endpoint->closed()); |
| + |
| + endpoint->set_task_runner(base::MessageLoop::current()->task_runner()); |
| + endpoint->set_client(client); |
| + |
| + if (endpoint->peer_closed()) |
| + tasks_.push_back(Task::CreateNotifyErrorTask(endpoint)); |
| + ProcessTasks(true); |
| +} |
| + |
| +void MultiplexRouter::DetachEndpointClient( |
| + const ScopedInterfaceEndpointHandle& handle) { |
| + const InterfaceId id = handle.id(); |
| + |
| + DCHECK(IsValidInterfaceId(id)); |
| + |
| + base::AutoLock locker(lock_); |
| + DCHECK(ContainsKey(endpoints_, id)); |
| + |
| + InterfaceEndpoint* endpoint = endpoints_[id]; |
| + DCHECK(endpoint->client()); |
| + DCHECK(!endpoint->closed()); |
| + |
| + endpoint->set_task_runner(nullptr); |
| + endpoint->set_client(nullptr); |
| +} |
| + |
| +bool MultiplexRouter::SendMessage(const ScopedInterfaceEndpointHandle& handle, |
| + Message* message) { |
| + const InterfaceId id = handle.id(); |
| + |
| + base::AutoLock locker(lock_); |
| + if (!ContainsKey(endpoints_, id)) |
| + return false; |
| + |
| + InterfaceEndpoint* endpoint = endpoints_[id]; |
| + if (endpoint->peer_closed()) |
| + return false; |
| + |
| + message->set_interface_id(id); |
| + return connector_.Accept(message); |
| +} |
| + |
| +void MultiplexRouter::RaiseError() { |
| + if (task_runner_->BelongsToCurrentThread()) { |
| + connector_.RaiseError(); |
| + } else { |
| + task_runner_->PostTask(FROM_HERE, |
| + base::Bind(&MultiplexRouter::RaiseError, this)); |
| + } |
| +} |
| + |
| +ScopedMessagePipeHandle MultiplexRouter::PassMessagePipe() { |
| + DCHECK(thread_checker_.CalledOnValidThread()); |
| + { |
| + base::AutoLock locker(lock_); |
| + DCHECK(endpoints_.empty() || (endpoints_.size() == 1 && |
| + ContainsKey(endpoints_, kMasterInterfaceId))); |
| + } |
| + return connector_.PassMessagePipe(); |
| +} |
| + |
| +void MultiplexRouter::EnableTestingMode() { |
| + DCHECK(thread_checker_.CalledOnValidThread()); |
| + base::AutoLock locker(lock_); |
| + |
| + testing_mode_ = true; |
| + connector_.set_enforce_errors_from_incoming_receiver(false); |
| +} |
| + |
| +bool MultiplexRouter::Accept(Message* message) { |
| + DCHECK(thread_checker_.CalledOnValidThread()); |
| + |
| + scoped_refptr<MultiplexRouter> protector(this); |
| + base::AutoLock locker(lock_); |
| + tasks_.push_back(Task::CreateIncomingMessageTask(message)); |
| + ProcessTasks(false); |
| + |
| + // Always return true. If we see errors during message processing, we will |
| + // explicitly call Connector::RaiseError() to disconnect the message pipe. |
| + return true; |
| +} |
| + |
| +bool MultiplexRouter::OnPeerAssociatedEndpointClosed(InterfaceId id) { |
| + lock_.AssertAcquired(); |
| + |
| + if (IsMasterInterfaceId(id)) |
| + return false; |
| + |
| + if (!ContainsKey(endpoints_, id)) |
| + endpoints_[id] = new InterfaceEndpoint(this, id); |
| + |
| + InterfaceEndpoint* endpoint = endpoints_[id]; |
| + DCHECK(!endpoint->peer_closed()); |
| + |
| + if (endpoint->client()) |
| + tasks_.push_back(Task::CreateNotifyErrorTask(endpoint)); |
| + endpoint->SetPeerClosedMayDestruct(); |
| + |
| + // No need to trigger a ProcessTasks() because it is already on the stack. |
| + |
| + return true; |
| +} |
| + |
| +bool MultiplexRouter::OnAssociatedEndpointClosedBeforeSent(InterfaceId id) { |
| + lock_.AssertAcquired(); |
| + |
| + if (IsMasterInterfaceId(id)) |
| + return false; |
| + |
| + if (!ContainsKey(endpoints_, id)) |
| + endpoints_[id] = new InterfaceEndpoint(this, id); |
| + |
| + InterfaceEndpoint* endpoint = endpoints_[id]; |
| + DCHECK(!endpoint->closed()); |
| + endpoint->SetClosedMayDestruct(); |
| + |
| + control_message_proxy_.NotifyPeerEndpointClosed(id); |
| + |
| + return true; |
| +} |
| + |
| +void MultiplexRouter::OnPipeConnectionError() { |
| + DCHECK(thread_checker_.CalledOnValidThread()); |
| + |
| + scoped_refptr<MultiplexRouter> protector(this); |
| + base::AutoLock locker(lock_); |
| + |
| + auto iter = endpoints_.begin(); |
| + while (iter != endpoints_.end()) { |
| + InterfaceEndpoint* endpoint = iter->second; |
| + ++iter; |
| + |
| + if (endpoint->client()) |
| + tasks_.push_back(Task::CreateNotifyErrorTask(endpoint)); |
| + |
| + endpoint->SetPeerClosedMayDestruct(); |
| + } |
| + |
| + ProcessTasks(false); |
| +} |
| + |
| +void MultiplexRouter::ProcessTasks(bool force_async) { |
| + lock_.AssertAcquired(); |
| + |
| + while (!tasks_.empty()) { |
| + scoped_ptr<Task> task(tasks_.front()); |
| + tasks_.pop_front(); |
| + |
| + bool processed = task->IsNotifyErrorTask() |
| + ? ProcessNotifyErrorTask(task.get(), &force_async) |
| + : ProcessIncomingMessageTask(task.get(), &force_async); |
| + |
| + if (!processed) { |
| + tasks_.push_front(task.release()); |
| + break; |
| + } |
| + } |
| +} |
| + |
| +bool MultiplexRouter::ProcessNotifyErrorTask(Task* task, bool* force_async) { |
| + lock_.AssertAcquired(); |
| + InterfaceEndpoint* endpoint = task->endpoint_to_notify.get(); |
| + if (!endpoint->client()) |
| + return true; |
| + |
| + if (!endpoint->task_runner()->BelongsToCurrentThread() || *force_async) { |
| + endpoint->task_runner()->PostTask( |
| + FROM_HERE, base::Bind(&MultiplexRouter::LockAndCallProcessTasks, this)); |
| + return false; |
| + } |
| + |
| + *force_async = true; |
| + InterfaceEndpointClient* client = endpoint->client(); |
| + { |
| + base::AutoUnlock unlocker(lock_); |
| + client->NotifyError(); |
|
sky
2015/11/19 17:16:27
Can bad things happen between the time you unlock
yzshen1
2015/11/19 22:00:40
|client| lives on a single thread. NotifyError() a
|
| + } |
| + return true; |
| +} |
| + |
| +bool MultiplexRouter::ProcessIncomingMessageTask(Task* task, |
| + bool* force_async) { |
| + lock_.AssertAcquired(); |
| + Message* message = task->message.get(); |
| + |
| + if (PipeControlMessageHandler::IsPipeControlMessage(message)) { |
| + if (!control_message_handler_.Accept(message)) |
| + RaiseErrorInNonTestingMode(); |
| + return true; |
| + } |
| + |
| + InterfaceId id = message->interface_id(); |
| + DCHECK(IsValidInterfaceId(id)); |
| + |
| + if (!ContainsKey(endpoints_, id)) { |
| + DCHECK(!IsMasterInterfaceId(id)); |
| + |
| + // Currently, it is legitimate to receive messages for an endpoint |
| + // that is not registered. For example, the endpoint is transferred in |
| + // a message that is discarded. Once we add support to specify all |
| + // enclosing endpoints in message header, we should be able to remove |
| + // this. |
| + InterfaceEndpoint* endpoint = new InterfaceEndpoint(this, id); |
| + endpoints_[id] = endpoint; |
| + endpoint->SetClosedMayDestruct(); |
| + |
| + control_message_proxy_.NotifyPeerEndpointClosed(id); |
| + return true; |
| + } |
| + |
| + InterfaceEndpoint* endpoint = endpoints_[id]; |
| + if (endpoint->closed()) |
| + return true; |
| + |
| + if (!endpoint->client()) { |
| + // We need to wait until a client is attached in order to dispatch further |
| + // messages. |
| + return false; |
| + } |
| + |
| + if (!endpoint->task_runner()->BelongsToCurrentThread() || *force_async) { |
| + endpoint->task_runner()->PostTask( |
| + FROM_HERE, base::Bind(&MultiplexRouter::LockAndCallProcessTasks, this)); |
| + return false; |
| + } |
| + |
| + *force_async = true; |
| + InterfaceEndpointClient* client = endpoint->client(); |
| + scoped_ptr<Message> owned_message = task->message.Pass(); |
| + bool result = false; |
| + { |
| + base::AutoUnlock unlocker(lock_); |
| + result = client->HandleIncomingMessage(owned_message.get()); |
|
sky
2015/11/19 17:16:27
Similar question to above about unlocking.
yzshen1
2015/11/19 22:00:40
(Please see my comment for the previous question.)
|
| + } |
| + if (!result) |
| + RaiseErrorInNonTestingMode(); |
| + |
| + return true; |
| +} |
| + |
| +void MultiplexRouter::LockAndCallProcessTasks() { |
| + // There is no need to hold a ref to this class in this case because this is |
| + // always called using base::Bind(), which holds a ref. |
| + base::AutoLock locker(lock_); |
| + ProcessTasks(false); |
| +} |
| + |
| +void MultiplexRouter::OnEndpointDestructed(InterfaceId id) { |
| + lock_.AssertAcquired(); |
| + endpoints_.erase(id); |
| +} |
| + |
| +void MultiplexRouter::RaiseErrorInNonTestingMode() { |
| + lock_.AssertAcquired(); |
| + if (!testing_mode_) |
| + RaiseError(); |
| +} |
| + |
| +} // namespace internal |
| +} // namespace mojo |