| OLD | NEW |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "mojo/public/cpp/bindings/lib/multiplex_router.h" | 5 #include "mojo/public/cpp/bindings/lib/multiplex_router.h" |
| 6 | 6 |
| 7 #include <utility> | 7 #include <utility> |
| 8 | 8 |
| 9 #include "base/bind.h" | 9 #include "base/bind.h" |
| 10 #include "base/message_loop/message_loop.h" | 10 #include "base/message_loop/message_loop.h" |
| (...skipping 103 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 114 Task() {} | 114 Task() {} |
| 115 }; | 115 }; |
| 116 | 116 |
| 117 MultiplexRouter::MultiplexRouter(bool set_interface_id_namesapce_bit, | 117 MultiplexRouter::MultiplexRouter(bool set_interface_id_namesapce_bit, |
| 118 ScopedMessagePipeHandle message_pipe, | 118 ScopedMessagePipeHandle message_pipe, |
| 119 const MojoAsyncWaiter* waiter) | 119 const MojoAsyncWaiter* waiter) |
| 120 : RefCountedDeleteOnMessageLoop(base::MessageLoop::current() | 120 : RefCountedDeleteOnMessageLoop(base::MessageLoop::current() |
| 121 ->task_runner()), | 121 ->task_runner()), |
| 122 set_interface_id_namespace_bit_(set_interface_id_namesapce_bit), | 122 set_interface_id_namespace_bit_(set_interface_id_namesapce_bit), |
| 123 header_validator_(this), | 123 header_validator_(this), |
| 124 connector_(message_pipe.Pass(), Connector::MULTI_THREADED_SEND, waiter), | 124 connector_(std::move(message_pipe), |
| 125 Connector::MULTI_THREADED_SEND, |
| 126 waiter), |
| 125 encountered_error_(false), | 127 encountered_error_(false), |
| 126 control_message_handler_(this), | 128 control_message_handler_(this), |
| 127 control_message_proxy_(&connector_), | 129 control_message_proxy_(&connector_), |
| 128 next_interface_id_value_(1), | 130 next_interface_id_value_(1), |
| 129 testing_mode_(false) { | 131 testing_mode_(false) { |
| 130 connector_.set_incoming_receiver(&header_validator_); | 132 connector_.set_incoming_receiver(&header_validator_); |
| 131 connector_.set_connection_error_handler( | 133 connector_.set_connection_error_handler( |
| 132 [this]() { OnPipeConnectionError(); }); | 134 [this]() { OnPipeConnectionError(); }); |
| 133 } | 135 } |
| 134 | 136 |
| (...skipping 148 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 283 connector_.RaiseError(); | 285 connector_.RaiseError(); |
| 284 } else { | 286 } else { |
| 285 task_runner_->PostTask(FROM_HERE, | 287 task_runner_->PostTask(FROM_HERE, |
| 286 base::Bind(&MultiplexRouter::RaiseError, this)); | 288 base::Bind(&MultiplexRouter::RaiseError, this)); |
| 287 } | 289 } |
| 288 } | 290 } |
| 289 | 291 |
| 290 scoped_ptr<AssociatedGroup> MultiplexRouter::CreateAssociatedGroup() { | 292 scoped_ptr<AssociatedGroup> MultiplexRouter::CreateAssociatedGroup() { |
| 291 scoped_ptr<AssociatedGroup> group(new AssociatedGroup); | 293 scoped_ptr<AssociatedGroup> group(new AssociatedGroup); |
| 292 group->router_ = this; | 294 group->router_ = this; |
| 293 return group.Pass(); | 295 return group; |
| 294 } | 296 } |
| 295 | 297 |
| 296 // static | 298 // static |
| 297 MultiplexRouter* MultiplexRouter::GetRouter(AssociatedGroup* associated_group) { | 299 MultiplexRouter* MultiplexRouter::GetRouter(AssociatedGroup* associated_group) { |
| 298 return associated_group->router_.get(); | 300 return associated_group->router_.get(); |
| 299 } | 301 } |
| 300 | 302 |
| 301 bool MultiplexRouter::HasAssociatedEndpoints() const { | 303 bool MultiplexRouter::HasAssociatedEndpoints() const { |
| 302 DCHECK(thread_checker_.CalledOnValidThread()); | 304 DCHECK(thread_checker_.CalledOnValidThread()); |
| 303 base::AutoLock locker(lock_); | 305 base::AutoLock locker(lock_); |
| (...skipping 86 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 390 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); | 392 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); |
| 391 } | 393 } |
| 392 | 394 |
| 393 ProcessTasks(false); | 395 ProcessTasks(false); |
| 394 } | 396 } |
| 395 | 397 |
| 396 void MultiplexRouter::ProcessTasks(bool force_async) { | 398 void MultiplexRouter::ProcessTasks(bool force_async) { |
| 397 lock_.AssertAcquired(); | 399 lock_.AssertAcquired(); |
| 398 | 400 |
| 399 while (!tasks_.empty()) { | 401 while (!tasks_.empty()) { |
| 400 scoped_ptr<Task> task(tasks_.front().Pass()); | 402 scoped_ptr<Task> task(std::move(tasks_.front())); |
| 401 tasks_.pop_front(); | 403 tasks_.pop_front(); |
| 402 | 404 |
| 403 bool processed = task->IsNotifyErrorTask() | 405 bool processed = task->IsNotifyErrorTask() |
| 404 ? ProcessNotifyErrorTask(task.get(), &force_async) | 406 ? ProcessNotifyErrorTask(task.get(), &force_async) |
| 405 : ProcessIncomingMessageTask(task.get(), &force_async); | 407 : ProcessIncomingMessageTask(task.get(), &force_async); |
| 406 | 408 |
| 407 if (!processed) { | 409 if (!processed) { |
| 408 tasks_.push_front(task.Pass()); | 410 tasks_.push_front(std::move(task)); |
| 409 break; | 411 break; |
| 410 } | 412 } |
| 411 } | 413 } |
| 412 } | 414 } |
| 413 | 415 |
| 414 bool MultiplexRouter::ProcessNotifyErrorTask(Task* task, bool* force_async) { | 416 bool MultiplexRouter::ProcessNotifyErrorTask(Task* task, bool* force_async) { |
| 415 lock_.AssertAcquired(); | 417 lock_.AssertAcquired(); |
| 416 InterfaceEndpoint* endpoint = task->endpoint_to_notify.get(); | 418 InterfaceEndpoint* endpoint = task->endpoint_to_notify.get(); |
| 417 if (!endpoint->client()) | 419 if (!endpoint->client()) |
| 418 return true; | 420 return true; |
| (...skipping 59 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 478 } | 480 } |
| 479 | 481 |
| 480 if (!endpoint->task_runner()->BelongsToCurrentThread() || *force_async) { | 482 if (!endpoint->task_runner()->BelongsToCurrentThread() || *force_async) { |
| 481 endpoint->task_runner()->PostTask( | 483 endpoint->task_runner()->PostTask( |
| 482 FROM_HERE, base::Bind(&MultiplexRouter::LockAndCallProcessTasks, this)); | 484 FROM_HERE, base::Bind(&MultiplexRouter::LockAndCallProcessTasks, this)); |
| 483 return false; | 485 return false; |
| 484 } | 486 } |
| 485 | 487 |
| 486 *force_async = true; | 488 *force_async = true; |
| 487 InterfaceEndpointClient* client = endpoint->client(); | 489 InterfaceEndpointClient* client = endpoint->client(); |
| 488 scoped_ptr<Message> owned_message = task->message.Pass(); | 490 scoped_ptr<Message> owned_message = std::move(task->message); |
| 489 bool result = false; | 491 bool result = false; |
| 490 { | 492 { |
| 491 // We must unlock before calling into |client| because it may call this | 493 // We must unlock before calling into |client| because it may call this |
| 492 // object within HandleIncomingMessage(). Holding the lock will lead to | 494 // object within HandleIncomingMessage(). Holding the lock will lead to |
| 493 // deadlock. | 495 // deadlock. |
| 494 // | 496 // |
| 495 // It is safe to call into |client| without the lock. Because |client| is | 497 // It is safe to call into |client| without the lock. Because |client| is |
| 496 // always accessed on the same thread, including DetachEndpointClient(). | 498 // always accessed on the same thread, including DetachEndpointClient(). |
| 497 base::AutoUnlock unlocker(lock_); | 499 base::AutoUnlock unlocker(lock_); |
| 498 result = client->HandleIncomingMessage(owned_message.get()); | 500 result = client->HandleIncomingMessage(owned_message.get()); |
| (...skipping 27 matching lines...) Expand all Loading... |
| 526 } | 528 } |
| 527 | 529 |
| 528 void MultiplexRouter::RaiseErrorInNonTestingMode() { | 530 void MultiplexRouter::RaiseErrorInNonTestingMode() { |
| 529 lock_.AssertAcquired(); | 531 lock_.AssertAcquired(); |
| 530 if (!testing_mode_) | 532 if (!testing_mode_) |
| 531 RaiseError(); | 533 RaiseError(); |
| 532 } | 534 } |
| 533 | 535 |
| 534 } // namespace internal | 536 } // namespace internal |
| 535 } // namespace mojo | 537 } // namespace mojo |
| OLD | NEW |