Chromium Code Reviews| Index: mojo/public/cpp/bindings/lib/multiplex_router.cc |
| diff --git a/mojo/public/cpp/bindings/lib/multiplex_router.cc b/mojo/public/cpp/bindings/lib/multiplex_router.cc |
| index ca22e86a82fe49811621b5f0c34c7233373d2dff..64332ef65238e65c892d99fb21769ef8c5b258ab 100644 |
| --- a/mojo/public/cpp/bindings/lib/multiplex_router.cc |
| +++ b/mojo/public/cpp/bindings/lib/multiplex_router.cc |
| @@ -128,6 +128,7 @@ MultiplexRouter::MultiplexRouter(bool set_interface_id_namesapce_bit, |
| control_message_handler_(this), |
| control_message_proxy_(&connector_), |
| next_interface_id_value_(1), |
| + posted_to_process_tasks_(false), |
| testing_mode_(false) { |
| connector_.set_incoming_receiver(&header_validator_); |
| connector_.set_connection_error_handler( |
| @@ -180,17 +181,16 @@ ScopedInterfaceEndpointHandle MultiplexRouter::CreateLocalEndpointHandle( |
| return ScopedInterfaceEndpointHandle(); |
| base::AutoLock locker(lock_); |
| - if (ContainsKey(endpoints_, id)) { |
| + bool inserted = false; |
| + InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, &inserted); |
| + if (inserted) { |
| + if (encountered_error_) |
| + UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); |
| + } else { |
| // If the endpoint already exist, it is because we have received a |
| // notification that the peer endpoint has closed. |
| - InterfaceEndpoint* endpoint = endpoints_[id].get(); |
| CHECK(!endpoint->closed()); |
| CHECK(endpoint->peer_closed()); |
| - } else { |
| - InterfaceEndpoint* endpoint = new InterfaceEndpoint(this, id); |
| - endpoints_[id] = endpoint; |
| - if (encountered_error_) |
| - UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); |
| } |
| return ScopedInterfaceEndpointHandle(id, true, this); |
| } |
| @@ -266,17 +266,7 @@ void MultiplexRouter::DetachEndpointClient( |
| bool MultiplexRouter::SendMessage(const ScopedInterfaceEndpointHandle& handle, |
| Message* message) { |
| - const InterfaceId id = handle.id(); |
| - |
| - base::AutoLock locker(lock_); |
| - if (!ContainsKey(endpoints_, id)) |
| - return false; |
| - |
| - InterfaceEndpoint* endpoint = endpoints_[id].get(); |
| - if (endpoint->peer_closed()) |
| - return false; |
| - |
| - message->set_interface_id(id); |
| + message->set_interface_id(handle.id()); |
| return connector_.Accept(message); |
| } |
| @@ -325,8 +315,19 @@ bool MultiplexRouter::Accept(Message* message) { |
| scoped_refptr<MultiplexRouter> protector(this); |
| base::AutoLock locker(lock_); |
| - tasks_.push_back(Task::CreateIncomingMessageTask(message)); |
| - ProcessTasks(false); |
| + |
| + bool processed = tasks_.empty() && ProcessIncomingMessage(message, false); |
| + |
| + if (!processed) { |
| + // Either the task queue is not empty or we cannot process the message |
| + // directly. In both cases, there is no need to call ProcessTasks(). |
| + tasks_.push_back(Task::CreateIncomingMessageTask(message)); |
| + } else if (!tasks_.empty()) { |
| + // Processing the message may result in new tasks (for error notification) |
| + // being added to the queue. In this case, we have to attempt to process the |
| + // tasks. |
| + ProcessTasks(false); |
| + } |
| // Always return true. If we see errors during message processing, we will |
| // explicitly call Connector::RaiseError() to disconnect the message pipe. |
| @@ -339,10 +340,7 @@ bool MultiplexRouter::OnPeerAssociatedEndpointClosed(InterfaceId id) { |
| if (IsMasterInterfaceId(id)) |
| return false; |
| - if (!ContainsKey(endpoints_, id)) |
| - endpoints_[id] = new InterfaceEndpoint(this, id); |
| - |
| - InterfaceEndpoint* endpoint = endpoints_[id].get(); |
| + InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, nullptr); |
| DCHECK(!endpoint->peer_closed()); |
| if (endpoint->client()) |
| @@ -360,10 +358,7 @@ bool MultiplexRouter::OnAssociatedEndpointClosedBeforeSent(InterfaceId id) { |
| if (IsMasterInterfaceId(id)) |
| return false; |
| - if (!ContainsKey(endpoints_, id)) |
| - endpoints_[id] = new InterfaceEndpoint(this, id); |
| - |
| - InterfaceEndpoint* endpoint = endpoints_[id].get(); |
| + InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, nullptr); |
| DCHECK(!endpoint->closed()); |
| UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED); |
| @@ -398,13 +393,17 @@ void MultiplexRouter::OnPipeConnectionError() { |
| void MultiplexRouter::ProcessTasks(bool force_async) { |
| lock_.AssertAcquired(); |
| + if (posted_to_process_tasks_) |
| + return; |
| + |
| while (!tasks_.empty()) { |
| scoped_ptr<Task> task(std::move(tasks_.front())); |
| tasks_.pop_front(); |
| - bool processed = task->IsNotifyErrorTask() |
| - ? ProcessNotifyErrorTask(task.get(), force_async) |
| - : ProcessIncomingMessageTask(task.get(), force_async); |
| + bool processed = |
| + task->IsNotifyErrorTask() |
| + ? ProcessNotifyErrorTask(task.get(), force_async) |
| + : ProcessIncomingMessage(task->message.get(), force_async); |
| if (!processed) { |
| tasks_.push_front(std::move(task)); |
| @@ -420,8 +419,12 @@ bool MultiplexRouter::ProcessNotifyErrorTask(Task* task, bool force_async) { |
| return true; |
| if (!endpoint->task_runner()->BelongsToCurrentThread() || force_async) { |
| - endpoint->task_runner()->PostTask( |
| - FROM_HERE, base::Bind(&MultiplexRouter::LockAndCallProcessTasks, this)); |
| + if (!posted_to_process_tasks_) { |
|
Ken Rockot(use gerrit already)
2016/03/23 19:15:55
nit: Perhaps you could move this logic into anothe
yzshen1
2016/03/23 20:24:12
Done.
|
| + posted_to_process_tasks_ = true; |
| + endpoint->task_runner()->PostTask( |
| + FROM_HERE, |
| + base::Bind(&MultiplexRouter::LockAndCallProcessTasks, this)); |
| + } |
| return false; |
| } |
| @@ -438,10 +441,9 @@ bool MultiplexRouter::ProcessNotifyErrorTask(Task* task, bool force_async) { |
| return true; |
| } |
| -bool MultiplexRouter::ProcessIncomingMessageTask(Task* task, bool force_async) { |
| +bool MultiplexRouter::ProcessIncomingMessage(Message* message, |
| + bool force_async) { |
| lock_.AssertAcquired(); |
| - Message* message = task->message.get(); |
| - |
| if (PipeControlMessageHandler::IsPipeControlMessage(message)) { |
| if (!control_message_handler_.Accept(message)) |
| RaiseErrorInNonTestingMode(); |
| @@ -451,7 +453,9 @@ bool MultiplexRouter::ProcessIncomingMessageTask(Task* task, bool force_async) { |
| InterfaceId id = message->interface_id(); |
| DCHECK(IsValidInterfaceId(id)); |
| - if (!ContainsKey(endpoints_, id)) { |
| + bool inserted = false; |
| + InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, &inserted); |
| + if (inserted) { |
| DCHECK(!IsMasterInterfaceId(id)); |
| // Currently, it is legitimate to receive messages for an endpoint |
| @@ -459,15 +463,12 @@ bool MultiplexRouter::ProcessIncomingMessageTask(Task* task, bool force_async) { |
| // a message that is discarded. Once we add support to specify all |
| // enclosing endpoints in message header, we should be able to remove |
| // this. |
| - InterfaceEndpoint* endpoint = new InterfaceEndpoint(this, id); |
| - endpoints_[id] = endpoint; |
| UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED); |
| control_message_proxy_.NotifyPeerEndpointClosed(id); |
| return true; |
| } |
| - InterfaceEndpoint* endpoint = endpoints_[id].get(); |
| if (endpoint->closed()) |
| return true; |
| @@ -478,13 +479,16 @@ bool MultiplexRouter::ProcessIncomingMessageTask(Task* task, bool force_async) { |
| } |
| if (!endpoint->task_runner()->BelongsToCurrentThread() || force_async) { |
| - endpoint->task_runner()->PostTask( |
| - FROM_HERE, base::Bind(&MultiplexRouter::LockAndCallProcessTasks, this)); |
| + if (!posted_to_process_tasks_) { |
| + posted_to_process_tasks_ = true; |
| + endpoint->task_runner()->PostTask( |
| + FROM_HERE, |
| + base::Bind(&MultiplexRouter::LockAndCallProcessTasks, this)); |
| + } |
| return false; |
| } |
| InterfaceEndpointClient* client = endpoint->client(); |
| - scoped_ptr<Message> owned_message = std::move(task->message); |
| bool result = false; |
| { |
| // We must unlock before calling into |client| because it may call this |
| @@ -494,7 +498,7 @@ bool MultiplexRouter::ProcessIncomingMessageTask(Task* task, bool force_async) { |
| // It is safe to call into |client| without the lock. Because |client| is |
| // always accessed on the same thread, including DetachEndpointClient(). |
| base::AutoUnlock unlocker(lock_); |
| - result = client->HandleIncomingMessage(owned_message.get()); |
| + result = client->HandleIncomingMessage(message); |
| } |
| if (!result) |
| RaiseErrorInNonTestingMode(); |
| @@ -506,6 +510,7 @@ void MultiplexRouter::LockAndCallProcessTasks() { |
| // There is no need to hold a ref to this class in this case because this is |
| // always called using base::Bind(), which holds a ref. |
| base::AutoLock locker(lock_); |
| + posted_to_process_tasks_ = false; |
| ProcessTasks(false); |
| } |
| @@ -530,5 +535,27 @@ void MultiplexRouter::RaiseErrorInNonTestingMode() { |
| RaiseError(); |
| } |
| +MultiplexRouter::InterfaceEndpoint* MultiplexRouter::FindOrInsertEndpoint( |
| + InterfaceId id, |
| + bool* inserted) { |
| + lock_.AssertAcquired(); |
| + // Either |inserted| is nullptr or it points to a boolean initialized as |
| + // false. |
| + DCHECK(!inserted || !*inserted); |
| + |
| + auto iter = endpoints_.find(id); |
| + InterfaceEndpoint* endpoint; |
| + if (iter == endpoints_.end()) { |
| + endpoint = new InterfaceEndpoint(this, id); |
| + endpoints_[id] = endpoint; |
| + if (inserted) |
| + *inserted = true; |
| + } else { |
| + endpoint = iter->second.get(); |
| + } |
| + |
| + return endpoint; |
| +} |
| + |
| } // namespace internal |
| } // namespace mojo |