OLD | NEW |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "mojo/public/cpp/bindings/lib/multiplex_router.h" | 5 #include "mojo/public/cpp/bindings/lib/multiplex_router.h" |
6 | 6 |
7 #include <utility> | 7 #include <utility> |
8 | 8 |
9 #include "base/bind.h" | 9 #include "base/bind.h" |
10 #include "base/message_loop/message_loop.h" | 10 #include "base/message_loop/message_loop.h" |
(...skipping 103 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
114 Task() {} | 114 Task() {} |
115 }; | 115 }; |
116 | 116 |
117 MultiplexRouter::MultiplexRouter(bool set_interface_id_namesapce_bit, | 117 MultiplexRouter::MultiplexRouter(bool set_interface_id_namesapce_bit, |
118 ScopedMessagePipeHandle message_pipe, | 118 ScopedMessagePipeHandle message_pipe, |
119 const MojoAsyncWaiter* waiter) | 119 const MojoAsyncWaiter* waiter) |
120 : RefCountedDeleteOnMessageLoop(base::MessageLoop::current() | 120 : RefCountedDeleteOnMessageLoop(base::MessageLoop::current() |
121 ->task_runner()), | 121 ->task_runner()), |
122 set_interface_id_namespace_bit_(set_interface_id_namesapce_bit), | 122 set_interface_id_namespace_bit_(set_interface_id_namesapce_bit), |
123 header_validator_(this), | 123 header_validator_(this), |
124 connector_(message_pipe.Pass(), Connector::MULTI_THREADED_SEND, waiter), | 124 connector_(std::move(message_pipe), |
| 125 Connector::MULTI_THREADED_SEND, |
| 126 waiter), |
125 encountered_error_(false), | 127 encountered_error_(false), |
126 control_message_handler_(this), | 128 control_message_handler_(this), |
127 control_message_proxy_(&connector_), | 129 control_message_proxy_(&connector_), |
128 next_interface_id_value_(1), | 130 next_interface_id_value_(1), |
129 testing_mode_(false) { | 131 testing_mode_(false) { |
130 connector_.set_incoming_receiver(&header_validator_); | 132 connector_.set_incoming_receiver(&header_validator_); |
131 connector_.set_connection_error_handler( | 133 connector_.set_connection_error_handler( |
132 [this]() { OnPipeConnectionError(); }); | 134 [this]() { OnPipeConnectionError(); }); |
133 } | 135 } |
134 | 136 |
(...skipping 148 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
283 connector_.RaiseError(); | 285 connector_.RaiseError(); |
284 } else { | 286 } else { |
285 task_runner_->PostTask(FROM_HERE, | 287 task_runner_->PostTask(FROM_HERE, |
286 base::Bind(&MultiplexRouter::RaiseError, this)); | 288 base::Bind(&MultiplexRouter::RaiseError, this)); |
287 } | 289 } |
288 } | 290 } |
289 | 291 |
290 scoped_ptr<AssociatedGroup> MultiplexRouter::CreateAssociatedGroup() { | 292 scoped_ptr<AssociatedGroup> MultiplexRouter::CreateAssociatedGroup() { |
291 scoped_ptr<AssociatedGroup> group(new AssociatedGroup); | 293 scoped_ptr<AssociatedGroup> group(new AssociatedGroup); |
292 group->router_ = this; | 294 group->router_ = this; |
293 return group.Pass(); | 295 return group; |
294 } | 296 } |
295 | 297 |
296 // static | 298 // static |
297 MultiplexRouter* MultiplexRouter::GetRouter(AssociatedGroup* associated_group) { | 299 MultiplexRouter* MultiplexRouter::GetRouter(AssociatedGroup* associated_group) { |
298 return associated_group->router_.get(); | 300 return associated_group->router_.get(); |
299 } | 301 } |
300 | 302 |
301 bool MultiplexRouter::HasAssociatedEndpoints() const { | 303 bool MultiplexRouter::HasAssociatedEndpoints() const { |
302 DCHECK(thread_checker_.CalledOnValidThread()); | 304 DCHECK(thread_checker_.CalledOnValidThread()); |
303 base::AutoLock locker(lock_); | 305 base::AutoLock locker(lock_); |
(...skipping 86 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
390 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); | 392 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); |
391 } | 393 } |
392 | 394 |
393 ProcessTasks(false); | 395 ProcessTasks(false); |
394 } | 396 } |
395 | 397 |
396 void MultiplexRouter::ProcessTasks(bool force_async) { | 398 void MultiplexRouter::ProcessTasks(bool force_async) { |
397 lock_.AssertAcquired(); | 399 lock_.AssertAcquired(); |
398 | 400 |
399 while (!tasks_.empty()) { | 401 while (!tasks_.empty()) { |
400 scoped_ptr<Task> task(tasks_.front().Pass()); | 402 scoped_ptr<Task> task(std::move(tasks_.front())); |
401 tasks_.pop_front(); | 403 tasks_.pop_front(); |
402 | 404 |
403 bool processed = task->IsNotifyErrorTask() | 405 bool processed = task->IsNotifyErrorTask() |
404 ? ProcessNotifyErrorTask(task.get(), &force_async) | 406 ? ProcessNotifyErrorTask(task.get(), &force_async) |
405 : ProcessIncomingMessageTask(task.get(), &force_async); | 407 : ProcessIncomingMessageTask(task.get(), &force_async); |
406 | 408 |
407 if (!processed) { | 409 if (!processed) { |
408 tasks_.push_front(task.Pass()); | 410 tasks_.push_front(std::move(task)); |
409 break; | 411 break; |
410 } | 412 } |
411 } | 413 } |
412 } | 414 } |
413 | 415 |
414 bool MultiplexRouter::ProcessNotifyErrorTask(Task* task, bool* force_async) { | 416 bool MultiplexRouter::ProcessNotifyErrorTask(Task* task, bool* force_async) { |
415 lock_.AssertAcquired(); | 417 lock_.AssertAcquired(); |
416 InterfaceEndpoint* endpoint = task->endpoint_to_notify.get(); | 418 InterfaceEndpoint* endpoint = task->endpoint_to_notify.get(); |
417 if (!endpoint->client()) | 419 if (!endpoint->client()) |
418 return true; | 420 return true; |
(...skipping 59 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
478 } | 480 } |
479 | 481 |
480 if (!endpoint->task_runner()->BelongsToCurrentThread() || *force_async) { | 482 if (!endpoint->task_runner()->BelongsToCurrentThread() || *force_async) { |
481 endpoint->task_runner()->PostTask( | 483 endpoint->task_runner()->PostTask( |
482 FROM_HERE, base::Bind(&MultiplexRouter::LockAndCallProcessTasks, this)); | 484 FROM_HERE, base::Bind(&MultiplexRouter::LockAndCallProcessTasks, this)); |
483 return false; | 485 return false; |
484 } | 486 } |
485 | 487 |
486 *force_async = true; | 488 *force_async = true; |
487 InterfaceEndpointClient* client = endpoint->client(); | 489 InterfaceEndpointClient* client = endpoint->client(); |
488 scoped_ptr<Message> owned_message = task->message.Pass(); | 490 scoped_ptr<Message> owned_message = std::move(task->message); |
489 bool result = false; | 491 bool result = false; |
490 { | 492 { |
491 // We must unlock before calling into |client| because it may call this | 493 // We must unlock before calling into |client| because it may call this |
492 // object within HandleIncomingMessage(). Holding the lock will lead to | 494 // object within HandleIncomingMessage(). Holding the lock will lead to |
493 // deadlock. | 495 // deadlock. |
494 // | 496 // |
495 // It is safe to call into |client| without the lock. Because |client| is | 497 // It is safe to call into |client| without the lock. Because |client| is |
496 // always accessed on the same thread, including DetachEndpointClient(). | 498 // always accessed on the same thread, including DetachEndpointClient(). |
497 base::AutoUnlock unlocker(lock_); | 499 base::AutoUnlock unlocker(lock_); |
498 result = client->HandleIncomingMessage(owned_message.get()); | 500 result = client->HandleIncomingMessage(owned_message.get()); |
(...skipping 27 matching lines...) Expand all Loading... |
526 } | 528 } |
527 | 529 |
528 void MultiplexRouter::RaiseErrorInNonTestingMode() { | 530 void MultiplexRouter::RaiseErrorInNonTestingMode() { |
529 lock_.AssertAcquired(); | 531 lock_.AssertAcquired(); |
530 if (!testing_mode_) | 532 if (!testing_mode_) |
531 RaiseError(); | 533 RaiseError(); |
532 } | 534 } |
533 | 535 |
534 } // namespace internal | 536 } // namespace internal |
535 } // namespace mojo | 537 } // namespace mojo |
OLD | NEW |