| Index: mojo/public/cpp/bindings/lib/multiplex_router.cc
|
| diff --git a/mojo/public/cpp/bindings/lib/multiplex_router.cc b/mojo/public/cpp/bindings/lib/multiplex_router.cc
|
| index ca22e86a82fe49811621b5f0c34c7233373d2dff..b9e086c53e11b4371a2858ba0a808a2613cb69ad 100644
|
| --- a/mojo/public/cpp/bindings/lib/multiplex_router.cc
|
| +++ b/mojo/public/cpp/bindings/lib/multiplex_router.cc
|
| @@ -11,6 +11,7 @@
|
| #include "base/bind.h"
|
| #include "base/macros.h"
|
| #include "base/message_loop/message_loop.h"
|
| +#include "base/single_thread_task_runner.h"
|
| #include "base/stl_util.h"
|
| #include "mojo/public/cpp/bindings/associated_group.h"
|
| #include "mojo/public/cpp/bindings/lib/interface_endpoint_client.h"
|
| @@ -48,8 +49,8 @@ class MultiplexRouter::InterfaceEndpoint
|
| peer_closed_ = true;
|
| }
|
|
|
| - const scoped_refptr<base::SingleThreadTaskRunner> task_runner() const {
|
| - return task_runner_;
|
| + base::SingleThreadTaskRunner* task_runner() const {
|
| + return task_runner_.get();
|
| }
|
| void set_task_runner(
|
| scoped_refptr<base::SingleThreadTaskRunner> task_runner) {
|
| @@ -128,6 +129,7 @@ MultiplexRouter::MultiplexRouter(bool set_interface_id_namesapce_bit,
|
| control_message_handler_(this),
|
| control_message_proxy_(&connector_),
|
| next_interface_id_value_(1),
|
| + posted_to_process_tasks_(false),
|
| testing_mode_(false) {
|
| connector_.set_incoming_receiver(&header_validator_);
|
| connector_.set_connection_error_handler(
|
| @@ -180,17 +182,16 @@ ScopedInterfaceEndpointHandle MultiplexRouter::CreateLocalEndpointHandle(
|
| return ScopedInterfaceEndpointHandle();
|
|
|
| base::AutoLock locker(lock_);
|
| - if (ContainsKey(endpoints_, id)) {
|
| + bool inserted = false;
|
| + InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, &inserted);
|
| + if (inserted) {
|
| + if (encountered_error_)
|
| + UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED);
|
| + } else {
|
| // If the endpoint already exist, it is because we have received a
|
| // notification that the peer endpoint has closed.
|
| - InterfaceEndpoint* endpoint = endpoints_[id].get();
|
| CHECK(!endpoint->closed());
|
| CHECK(endpoint->peer_closed());
|
| - } else {
|
| - InterfaceEndpoint* endpoint = new InterfaceEndpoint(this, id);
|
| - endpoints_[id] = endpoint;
|
| - if (encountered_error_)
|
| - UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED);
|
| }
|
| return ScopedInterfaceEndpointHandle(id, true, this);
|
| }
|
| @@ -266,17 +267,7 @@ void MultiplexRouter::DetachEndpointClient(
|
|
|
| bool MultiplexRouter::SendMessage(const ScopedInterfaceEndpointHandle& handle,
|
| Message* message) {
|
| - const InterfaceId id = handle.id();
|
| -
|
| - base::AutoLock locker(lock_);
|
| - if (!ContainsKey(endpoints_, id))
|
| - return false;
|
| -
|
| - InterfaceEndpoint* endpoint = endpoints_[id].get();
|
| - if (endpoint->peer_closed())
|
| - return false;
|
| -
|
| - message->set_interface_id(id);
|
| + message->set_interface_id(handle.id());
|
| return connector_.Accept(message);
|
| }
|
|
|
| @@ -325,8 +316,19 @@ bool MultiplexRouter::Accept(Message* message) {
|
|
|
| scoped_refptr<MultiplexRouter> protector(this);
|
| base::AutoLock locker(lock_);
|
| - tasks_.push_back(Task::CreateIncomingMessageTask(message));
|
| - ProcessTasks(false);
|
| +
|
| + bool processed = tasks_.empty() && ProcessIncomingMessage(message, false);
|
| +
|
| + if (!processed) {
|
| + // Either the task queue is not empty or we cannot process the message
|
| + // directly. In both cases, there is no need to call ProcessTasks().
|
| + tasks_.push_back(Task::CreateIncomingMessageTask(message));
|
| + } else if (!tasks_.empty()) {
|
| + // Processing the message may result in new tasks (for error notification)
|
| + // being added to the queue. In this case, we have to attempt to process the
|
| + // tasks.
|
| + ProcessTasks(false);
|
| + }
|
|
|
| // Always return true. If we see errors during message processing, we will
|
| // explicitly call Connector::RaiseError() to disconnect the message pipe.
|
| @@ -339,10 +341,7 @@ bool MultiplexRouter::OnPeerAssociatedEndpointClosed(InterfaceId id) {
|
| if (IsMasterInterfaceId(id))
|
| return false;
|
|
|
| - if (!ContainsKey(endpoints_, id))
|
| - endpoints_[id] = new InterfaceEndpoint(this, id);
|
| -
|
| - InterfaceEndpoint* endpoint = endpoints_[id].get();
|
| + InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, nullptr);
|
| DCHECK(!endpoint->peer_closed());
|
|
|
| if (endpoint->client())
|
| @@ -360,10 +359,7 @@ bool MultiplexRouter::OnAssociatedEndpointClosedBeforeSent(InterfaceId id) {
|
| if (IsMasterInterfaceId(id))
|
| return false;
|
|
|
| - if (!ContainsKey(endpoints_, id))
|
| - endpoints_[id] = new InterfaceEndpoint(this, id);
|
| -
|
| - InterfaceEndpoint* endpoint = endpoints_[id].get();
|
| + InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, nullptr);
|
| DCHECK(!endpoint->closed());
|
| UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED);
|
|
|
| @@ -398,13 +394,17 @@ void MultiplexRouter::OnPipeConnectionError() {
|
| void MultiplexRouter::ProcessTasks(bool force_async) {
|
| lock_.AssertAcquired();
|
|
|
| + if (posted_to_process_tasks_)
|
| + return;
|
| +
|
| while (!tasks_.empty()) {
|
| scoped_ptr<Task> task(std::move(tasks_.front()));
|
| tasks_.pop_front();
|
|
|
| - bool processed = task->IsNotifyErrorTask()
|
| - ? ProcessNotifyErrorTask(task.get(), force_async)
|
| - : ProcessIncomingMessageTask(task.get(), force_async);
|
| + bool processed =
|
| + task->IsNotifyErrorTask()
|
| + ? ProcessNotifyErrorTask(task.get(), force_async)
|
| + : ProcessIncomingMessage(task->message.get(), force_async);
|
|
|
| if (!processed) {
|
| tasks_.push_front(std::move(task));
|
| @@ -420,8 +420,7 @@ bool MultiplexRouter::ProcessNotifyErrorTask(Task* task, bool force_async) {
|
| return true;
|
|
|
| if (!endpoint->task_runner()->BelongsToCurrentThread() || force_async) {
|
| - endpoint->task_runner()->PostTask(
|
| - FROM_HERE, base::Bind(&MultiplexRouter::LockAndCallProcessTasks, this));
|
| + MaybePostToProcessTasks(endpoint->task_runner());
|
| return false;
|
| }
|
|
|
| @@ -438,10 +437,9 @@ bool MultiplexRouter::ProcessNotifyErrorTask(Task* task, bool force_async) {
|
| return true;
|
| }
|
|
|
| -bool MultiplexRouter::ProcessIncomingMessageTask(Task* task, bool force_async) {
|
| +bool MultiplexRouter::ProcessIncomingMessage(Message* message,
|
| + bool force_async) {
|
| lock_.AssertAcquired();
|
| - Message* message = task->message.get();
|
| -
|
| if (PipeControlMessageHandler::IsPipeControlMessage(message)) {
|
| if (!control_message_handler_.Accept(message))
|
| RaiseErrorInNonTestingMode();
|
| @@ -451,7 +449,9 @@ bool MultiplexRouter::ProcessIncomingMessageTask(Task* task, bool force_async) {
|
| InterfaceId id = message->interface_id();
|
| DCHECK(IsValidInterfaceId(id));
|
|
|
| - if (!ContainsKey(endpoints_, id)) {
|
| + bool inserted = false;
|
| + InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, &inserted);
|
| + if (inserted) {
|
| DCHECK(!IsMasterInterfaceId(id));
|
|
|
| // Currently, it is legitimate to receive messages for an endpoint
|
| @@ -459,15 +459,12 @@ bool MultiplexRouter::ProcessIncomingMessageTask(Task* task, bool force_async) {
|
| // a message that is discarded. Once we add support to specify all
|
| // enclosing endpoints in message header, we should be able to remove
|
| // this.
|
| - InterfaceEndpoint* endpoint = new InterfaceEndpoint(this, id);
|
| - endpoints_[id] = endpoint;
|
| UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED);
|
|
|
| control_message_proxy_.NotifyPeerEndpointClosed(id);
|
| return true;
|
| }
|
|
|
| - InterfaceEndpoint* endpoint = endpoints_[id].get();
|
| if (endpoint->closed())
|
| return true;
|
|
|
| @@ -478,13 +475,11 @@ bool MultiplexRouter::ProcessIncomingMessageTask(Task* task, bool force_async) {
|
| }
|
|
|
| if (!endpoint->task_runner()->BelongsToCurrentThread() || force_async) {
|
| - endpoint->task_runner()->PostTask(
|
| - FROM_HERE, base::Bind(&MultiplexRouter::LockAndCallProcessTasks, this));
|
| + MaybePostToProcessTasks(endpoint->task_runner());
|
| return false;
|
| }
|
|
|
| InterfaceEndpointClient* client = endpoint->client();
|
| - scoped_ptr<Message> owned_message = std::move(task->message);
|
| bool result = false;
|
| {
|
| // We must unlock before calling into |client| because it may call this
|
| @@ -494,7 +489,7 @@ bool MultiplexRouter::ProcessIncomingMessageTask(Task* task, bool force_async) {
|
| // It is safe to call into |client| without the lock. Because |client| is
|
| // always accessed on the same thread, including DetachEndpointClient().
|
| base::AutoUnlock unlocker(lock_);
|
| - result = client->HandleIncomingMessage(owned_message.get());
|
| + result = client->HandleIncomingMessage(message);
|
| }
|
| if (!result)
|
| RaiseErrorInNonTestingMode();
|
| @@ -502,10 +497,22 @@ bool MultiplexRouter::ProcessIncomingMessageTask(Task* task, bool force_async) {
|
| return true;
|
| }
|
|
|
| +void MultiplexRouter::MaybePostToProcessTasks(
|
| + base::SingleThreadTaskRunner* task_runner) {
|
| + lock_.AssertAcquired();
|
| + if (posted_to_process_tasks_)
|
| + return;
|
| +
|
| + posted_to_process_tasks_ = true;
|
| + task_runner->PostTask(
|
| + FROM_HERE, base::Bind(&MultiplexRouter::LockAndCallProcessTasks, this));
|
| +}
|
| +
|
| void MultiplexRouter::LockAndCallProcessTasks() {
|
| // There is no need to hold a ref to this class in this case because this is
|
| // always called using base::Bind(), which holds a ref.
|
| base::AutoLock locker(lock_);
|
| + posted_to_process_tasks_ = false;
|
| ProcessTasks(false);
|
| }
|
|
|
| @@ -530,5 +537,27 @@ void MultiplexRouter::RaiseErrorInNonTestingMode() {
|
| RaiseError();
|
| }
|
|
|
| +MultiplexRouter::InterfaceEndpoint* MultiplexRouter::FindOrInsertEndpoint(
|
| + InterfaceId id,
|
| + bool* inserted) {
|
| + lock_.AssertAcquired();
|
| + // Either |inserted| is nullptr or it points to a boolean initialized as
|
| + // false.
|
| + DCHECK(!inserted || !*inserted);
|
| +
|
| + auto iter = endpoints_.find(id);
|
| + InterfaceEndpoint* endpoint;
|
| + if (iter == endpoints_.end()) {
|
| + endpoint = new InterfaceEndpoint(this, id);
|
| + endpoints_[id] = endpoint;
|
| + if (inserted)
|
| + *inserted = true;
|
| + } else {
|
| + endpoint = iter->second.get();
|
| + }
|
| +
|
| + return endpoint;
|
| +}
|
| +
|
| } // namespace internal
|
| } // namespace mojo
|
|
|