| OLD | NEW |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "mojo/public/cpp/bindings/lib/multiplex_router.h" | 5 #include "mojo/public/cpp/bindings/lib/multiplex_router.h" |
| 6 | 6 |
| 7 #include <stdint.h> | 7 #include <stdint.h> |
| 8 | 8 |
| 9 #include <utility> | 9 #include <utility> |
| 10 | 10 |
| (...skipping 385 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 396 } | 396 } |
| 397 | 397 |
| 398 void MultiplexRouter::ProcessTasks(bool force_async) { | 398 void MultiplexRouter::ProcessTasks(bool force_async) { |
| 399 lock_.AssertAcquired(); | 399 lock_.AssertAcquired(); |
| 400 | 400 |
| 401 while (!tasks_.empty()) { | 401 while (!tasks_.empty()) { |
| 402 scoped_ptr<Task> task(std::move(tasks_.front())); | 402 scoped_ptr<Task> task(std::move(tasks_.front())); |
| 403 tasks_.pop_front(); | 403 tasks_.pop_front(); |
| 404 | 404 |
| 405 bool processed = task->IsNotifyErrorTask() | 405 bool processed = task->IsNotifyErrorTask() |
| 406 ? ProcessNotifyErrorTask(task.get(), &force_async) | 406 ? ProcessNotifyErrorTask(task.get(), force_async) |
| 407 : ProcessIncomingMessageTask(task.get(), &force_async); | 407 : ProcessIncomingMessageTask(task.get(), force_async); |
| 408 | 408 |
| 409 if (!processed) { | 409 if (!processed) { |
| 410 tasks_.push_front(std::move(task)); | 410 tasks_.push_front(std::move(task)); |
| 411 break; | 411 break; |
| 412 } | 412 } |
| 413 } | 413 } |
| 414 } | 414 } |
| 415 | 415 |
| 416 bool MultiplexRouter::ProcessNotifyErrorTask(Task* task, bool* force_async) { | 416 bool MultiplexRouter::ProcessNotifyErrorTask(Task* task, bool force_async) { |
| 417 lock_.AssertAcquired(); | 417 lock_.AssertAcquired(); |
| 418 InterfaceEndpoint* endpoint = task->endpoint_to_notify.get(); | 418 InterfaceEndpoint* endpoint = task->endpoint_to_notify.get(); |
| 419 if (!endpoint->client()) | 419 if (!endpoint->client()) |
| 420 return true; | 420 return true; |
| 421 | 421 |
| 422 if (!endpoint->task_runner()->BelongsToCurrentThread() || *force_async) { | 422 if (!endpoint->task_runner()->BelongsToCurrentThread() || force_async) { |
| 423 endpoint->task_runner()->PostTask( | 423 endpoint->task_runner()->PostTask( |
| 424 FROM_HERE, base::Bind(&MultiplexRouter::LockAndCallProcessTasks, this)); | 424 FROM_HERE, base::Bind(&MultiplexRouter::LockAndCallProcessTasks, this)); |
| 425 return false; | 425 return false; |
| 426 } | 426 } |
| 427 | 427 |
| 428 *force_async = true; | |
| 429 InterfaceEndpointClient* client = endpoint->client(); | 428 InterfaceEndpointClient* client = endpoint->client(); |
| 430 { | 429 { |
| 431 // We must unlock before calling into |client| because it may call this | 430 // We must unlock before calling into |client| because it may call this |
| 432 // object within NotifyError(). Holding the lock will lead to deadlock. | 431 // object within NotifyError(). Holding the lock will lead to deadlock. |
| 433 // | 432 // |
| 434 // It is safe to call into |client| without the lock. Because |client| is | 433 // It is safe to call into |client| without the lock. Because |client| is |
| 435 // always accessed on the same thread, including DetachEndpointClient(). | 434 // always accessed on the same thread, including DetachEndpointClient(). |
| 436 base::AutoUnlock unlocker(lock_); | 435 base::AutoUnlock unlocker(lock_); |
| 437 client->NotifyError(); | 436 client->NotifyError(); |
| 438 } | 437 } |
| 439 return true; | 438 return true; |
| 440 } | 439 } |
| 441 | 440 |
| 442 bool MultiplexRouter::ProcessIncomingMessageTask(Task* task, | 441 bool MultiplexRouter::ProcessIncomingMessageTask(Task* task, bool force_async) { |
| 443 bool* force_async) { | |
| 444 lock_.AssertAcquired(); | 442 lock_.AssertAcquired(); |
| 445 Message* message = task->message.get(); | 443 Message* message = task->message.get(); |
| 446 | 444 |
| 447 if (PipeControlMessageHandler::IsPipeControlMessage(message)) { | 445 if (PipeControlMessageHandler::IsPipeControlMessage(message)) { |
| 448 if (!control_message_handler_.Accept(message)) | 446 if (!control_message_handler_.Accept(message)) |
| 449 RaiseErrorInNonTestingMode(); | 447 RaiseErrorInNonTestingMode(); |
| 450 return true; | 448 return true; |
| 451 } | 449 } |
| 452 | 450 |
| 453 InterfaceId id = message->interface_id(); | 451 InterfaceId id = message->interface_id(); |
| (...skipping 18 matching lines...) Expand all Loading... |
| 472 InterfaceEndpoint* endpoint = endpoints_[id].get(); | 470 InterfaceEndpoint* endpoint = endpoints_[id].get(); |
| 473 if (endpoint->closed()) | 471 if (endpoint->closed()) |
| 474 return true; | 472 return true; |
| 475 | 473 |
| 476 if (!endpoint->client()) { | 474 if (!endpoint->client()) { |
| 477 // We need to wait until a client is attached in order to dispatch further | 475 // We need to wait until a client is attached in order to dispatch further |
| 478 // messages. | 476 // messages. |
| 479 return false; | 477 return false; |
| 480 } | 478 } |
| 481 | 479 |
| 482 if (!endpoint->task_runner()->BelongsToCurrentThread() || *force_async) { | 480 if (!endpoint->task_runner()->BelongsToCurrentThread() || force_async) { |
| 483 endpoint->task_runner()->PostTask( | 481 endpoint->task_runner()->PostTask( |
| 484 FROM_HERE, base::Bind(&MultiplexRouter::LockAndCallProcessTasks, this)); | 482 FROM_HERE, base::Bind(&MultiplexRouter::LockAndCallProcessTasks, this)); |
| 485 return false; | 483 return false; |
| 486 } | 484 } |
| 487 | 485 |
| 488 *force_async = true; | |
| 489 InterfaceEndpointClient* client = endpoint->client(); | 486 InterfaceEndpointClient* client = endpoint->client(); |
| 490 scoped_ptr<Message> owned_message = std::move(task->message); | 487 scoped_ptr<Message> owned_message = std::move(task->message); |
| 491 bool result = false; | 488 bool result = false; |
| 492 { | 489 { |
| 493 // We must unlock before calling into |client| because it may call this | 490 // We must unlock before calling into |client| because it may call this |
| 494 // object within HandleIncomingMessage(). Holding the lock will lead to | 491 // object within HandleIncomingMessage(). Holding the lock will lead to |
| 495 // deadlock. | 492 // deadlock. |
| 496 // | 493 // |
| 497 // It is safe to call into |client| without the lock. Because |client| is | 494 // It is safe to call into |client| without the lock. Because |client| is |
| 498 // always accessed on the same thread, including DetachEndpointClient(). | 495 // always accessed on the same thread, including DetachEndpointClient(). |
| (...skipping 29 matching lines...) Expand all Loading... |
| 528 } | 525 } |
| 529 | 526 |
| 530 void MultiplexRouter::RaiseErrorInNonTestingMode() { | 527 void MultiplexRouter::RaiseErrorInNonTestingMode() { |
| 531 lock_.AssertAcquired(); | 528 lock_.AssertAcquired(); |
| 532 if (!testing_mode_) | 529 if (!testing_mode_) |
| 533 RaiseError(); | 530 RaiseError(); |
| 534 } | 531 } |
| 535 | 532 |
| 536 } // namespace internal | 533 } // namespace internal |
| 537 } // namespace mojo | 534 } // namespace mojo |
| OLD | NEW |