| OLD | NEW |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "mojo/public/cpp/bindings/lib/multiplex_router.h" | 5 #include "mojo/public/cpp/bindings/lib/multiplex_router.h" |
| 6 | 6 |
| 7 #include <stdint.h> | 7 #include <stdint.h> |
| 8 | 8 |
| 9 #include <utility> | 9 #include <utility> |
| 10 | 10 |
| (...skipping 85 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 96 return; | 96 return; |
| 97 | 97 |
| 98 EnsureEventMessagePipeExists(); | 98 EnsureEventMessagePipeExists(); |
| 99 event_signalled_ = true; | 99 event_signalled_ = true; |
| 100 MojoResult result = | 100 MojoResult result = |
| 101 WriteMessageRaw(sync_message_event_sender_.get(), nullptr, 0, nullptr, | 101 WriteMessageRaw(sync_message_event_sender_.get(), nullptr, 0, nullptr, |
| 102 0, MOJO_WRITE_MESSAGE_FLAG_NONE); | 102 0, MOJO_WRITE_MESSAGE_FLAG_NONE); |
| 103 DCHECK_EQ(MOJO_RESULT_OK, result); | 103 DCHECK_EQ(MOJO_RESULT_OK, result); |
| 104 } | 104 } |
| 105 | 105 |
| 106 void ResetSyncMessageSignal() { |
| 107 router_->lock_.AssertAcquired(); |
| 108 |
| 109 if (!event_signalled_) |
| 110 return; |
| 111 |
| 112 DCHECK(sync_message_event_receiver_.is_valid()); |
| 113 MojoResult result = |
| 114 ReadMessageRaw(sync_message_event_receiver_.get(), nullptr, nullptr, |
| 115 nullptr, nullptr, MOJO_READ_MESSAGE_FLAG_MAY_DISCARD); |
| 116 DCHECK_EQ(MOJO_RESULT_OK, result); |
| 117 event_signalled_ = false; |
| 118 } |
| 119 |
| 106 // --------------------------------------------------------------------------- | 120 // --------------------------------------------------------------------------- |
| 107 // The following public methods (i.e., InterfaceEndpointController | 121 // The following public methods (i.e., InterfaceEndpointController |
| 108 // implementation) are called by the client on the same thread as the | 122 // implementation) are called by the client on the same thread as the |
| 109 // AttachClient() call. They are called outside of the router's lock. | 123 // AttachClient() call. They are called outside of the router's lock. |
| 110 | 124 |
| 111 bool SendMessage(Message* message) override { | 125 bool SendMessage(Message* message) override { |
| 112 DCHECK(task_runner_->BelongsToCurrentThread()); | 126 DCHECK(task_runner_->BelongsToCurrentThread()); |
| 113 message->set_interface_id(id_); | 127 message->set_interface_id(id_); |
| 114 return router_->connector_.Accept(message); | 128 return router_->connector_.Accept(message); |
| 115 } | 129 } |
| (...skipping 76 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 192 router_->lock_.AssertAcquired(); | 206 router_->lock_.AssertAcquired(); |
| 193 | 207 |
| 194 if (sync_message_event_receiver_.is_valid()) | 208 if (sync_message_event_receiver_.is_valid()) |
| 195 return; | 209 return; |
| 196 | 210 |
| 197 MojoResult result = CreateMessagePipe(nullptr, &sync_message_event_sender_, | 211 MojoResult result = CreateMessagePipe(nullptr, &sync_message_event_sender_, |
| 198 &sync_message_event_receiver_); | 212 &sync_message_event_receiver_); |
| 199 DCHECK_EQ(MOJO_RESULT_OK, result); | 213 DCHECK_EQ(MOJO_RESULT_OK, result); |
| 200 } | 214 } |
| 201 | 215 |
| 202 void ResetSyncMessageSignal() { | |
| 203 router_->lock_.AssertAcquired(); | |
| 204 | |
| 205 if (!event_signalled_) | |
| 206 return; | |
| 207 | |
| 208 DCHECK(sync_message_event_receiver_.is_valid()); | |
| 209 MojoResult result = ReadMessageRaw(sync_message_event_receiver_.get(), | |
| 210 nullptr, nullptr, nullptr, nullptr, | |
| 211 MOJO_READ_MESSAGE_FLAG_MAY_DISCARD); | |
| 212 DCHECK_EQ(MOJO_RESULT_OK, result); | |
| 213 event_signalled_ = false; | |
| 214 } | |
| 215 | |
| 216 // --------------------------------------------------------------------------- | 216 // --------------------------------------------------------------------------- |
| 217 // The following members are safe to access from any threads. | 217 // The following members are safe to access from any threads. |
| 218 | 218 |
| 219 MultiplexRouter* const router_; | 219 MultiplexRouter* const router_; |
| 220 const InterfaceId id_; | 220 const InterfaceId id_; |
| 221 | 221 |
| 222 // --------------------------------------------------------------------------- | 222 // --------------------------------------------------------------------------- |
| 223 // The following members are accessed under the router's lock. | 223 // The following members are accessed under the router's lock. |
| 224 | 224 |
| 225 // Whether the endpoint has been closed. | 225 // Whether the endpoint has been closed. |
| (...skipping 63 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 289 set_interface_id_namespace_bit_(set_interface_id_namesapce_bit), | 289 set_interface_id_namespace_bit_(set_interface_id_namesapce_bit), |
| 290 header_validator_(this), | 290 header_validator_(this), |
| 291 connector_(std::move(message_pipe), | 291 connector_(std::move(message_pipe), |
| 292 Connector::MULTI_THREADED_SEND, | 292 Connector::MULTI_THREADED_SEND, |
| 293 std::move(runner)), | 293 std::move(runner)), |
| 294 control_message_handler_(this), | 294 control_message_handler_(this), |
| 295 control_message_proxy_(&connector_), | 295 control_message_proxy_(&connector_), |
| 296 next_interface_id_value_(1), | 296 next_interface_id_value_(1), |
| 297 posted_to_process_tasks_(false), | 297 posted_to_process_tasks_(false), |
| 298 encountered_error_(false), | 298 encountered_error_(false), |
| 299 paused_(false), |
| 299 testing_mode_(false) { | 300 testing_mode_(false) { |
| 300 // Always participate in sync handle watching, because even if it doesn't | 301 // Always participate in sync handle watching, because even if it doesn't |
| 301 // expect sync requests during sync handle watching, it may still need to | 302 // expect sync requests during sync handle watching, it may still need to |
| 302 // dispatch messages to associated endpoints on a different thread. | 303 // dispatch messages to associated endpoints on a different thread. |
| 303 connector_.AllowWokenUpBySyncWatchOnSameThread(); | 304 connector_.AllowWokenUpBySyncWatchOnSameThread(); |
| 304 connector_.set_incoming_receiver(&header_validator_); | 305 connector_.set_incoming_receiver(&header_validator_); |
| 305 connector_.set_connection_error_handler( | 306 connector_.set_connection_error_handler( |
| 306 base::Bind(&MultiplexRouter::OnPipeConnectionError, | 307 base::Bind(&MultiplexRouter::OnPipeConnectionError, |
| 307 base::Unretained(this))); | 308 base::Unretained(this))); |
| 308 } | 309 } |
| (...skipping 140 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 449 | 450 |
| 450 void MultiplexRouter::CloseMessagePipe() { | 451 void MultiplexRouter::CloseMessagePipe() { |
| 451 DCHECK(thread_checker_.CalledOnValidThread()); | 452 DCHECK(thread_checker_.CalledOnValidThread()); |
| 452 connector_.CloseMessagePipe(); | 453 connector_.CloseMessagePipe(); |
| 453 // CloseMessagePipe() above won't trigger connection error handler. | 454 // CloseMessagePipe() above won't trigger connection error handler. |
| 454 // Explicitly call OnPipeConnectionError() so that associated endpoints will | 455 // Explicitly call OnPipeConnectionError() so that associated endpoints will |
| 455 // get notified. | 456 // get notified. |
| 456 OnPipeConnectionError(); | 457 OnPipeConnectionError(); |
| 457 } | 458 } |
| 458 | 459 |
| 460 void MultiplexRouter::PauseIncomingMethodCallProcessing() { |
| 461 DCHECK(thread_checker_.CalledOnValidThread()); |
| 462 connector_.PauseIncomingMethodCallProcessing(); |
| 463 |
| 464 base::AutoLock locker(lock_); |
| 465 paused_ = true; |
| 466 |
| 467 for (auto iter = endpoints_.begin(); iter != endpoints_.end(); ++iter) |
| 468 iter->second->ResetSyncMessageSignal(); |
| 469 } |
| 470 |
| 471 void MultiplexRouter::ResumeIncomingMethodCallProcessing() { |
| 472 DCHECK(thread_checker_.CalledOnValidThread()); |
| 473 connector_.ResumeIncomingMethodCallProcessing(); |
| 474 |
| 475 base::AutoLock locker(lock_); |
| 476 paused_ = false; |
| 477 |
| 478 for (auto iter = endpoints_.begin(); iter != endpoints_.end(); ++iter) { |
| 479 auto sync_iter = sync_message_tasks_.find(iter->first); |
| 480 if (sync_iter != sync_message_tasks_.end() && !sync_iter->second.empty()) |
| 481 iter->second->SignalSyncMessageEvent(); |
| 482 } |
| 483 |
| 484 ProcessTasks(NO_DIRECT_CLIENT_CALLS, nullptr); |
| 485 } |
| 486 |
| 459 bool MultiplexRouter::HasAssociatedEndpoints() const { | 487 bool MultiplexRouter::HasAssociatedEndpoints() const { |
| 460 DCHECK(thread_checker_.CalledOnValidThread()); | 488 DCHECK(thread_checker_.CalledOnValidThread()); |
| 461 base::AutoLock locker(lock_); | 489 base::AutoLock locker(lock_); |
| 462 | 490 |
| 463 if (endpoints_.size() > 1) | 491 if (endpoints_.size() > 1) |
| 464 return true; | 492 return true; |
| 465 if (endpoints_.size() == 0) | 493 if (endpoints_.size() == 0) |
| 466 return false; | 494 return false; |
| 467 | 495 |
| 468 return !ContainsKey(endpoints_, kMasterInterfaceId); | 496 return !ContainsKey(endpoints_, kMasterInterfaceId); |
| 469 } | 497 } |
| 470 | 498 |
| 471 void MultiplexRouter::EnableTestingMode() { | 499 void MultiplexRouter::EnableTestingMode() { |
| 472 DCHECK(thread_checker_.CalledOnValidThread()); | 500 DCHECK(thread_checker_.CalledOnValidThread()); |
| 473 base::AutoLock locker(lock_); | 501 base::AutoLock locker(lock_); |
| 474 | 502 |
| 475 testing_mode_ = true; | 503 testing_mode_ = true; |
| 476 connector_.set_enforce_errors_from_incoming_receiver(false); | 504 connector_.set_enforce_errors_from_incoming_receiver(false); |
| 477 } | 505 } |
| 478 | 506 |
| 479 bool MultiplexRouter::Accept(Message* message) { | 507 bool MultiplexRouter::Accept(Message* message) { |
| 480 DCHECK(thread_checker_.CalledOnValidThread()); | 508 DCHECK(thread_checker_.CalledOnValidThread()); |
| 481 | 509 |
| 482 scoped_refptr<MultiplexRouter> protector(this); | 510 scoped_refptr<MultiplexRouter> protector(this); |
| 483 base::AutoLock locker(lock_); | 511 base::AutoLock locker(lock_); |
| 484 | 512 |
| 513 DCHECK(!paused_); |
| 514 |
| 485 ClientCallBehavior client_call_behavior = | 515 ClientCallBehavior client_call_behavior = |
| 486 connector_.during_sync_handle_watcher_callback() | 516 connector_.during_sync_handle_watcher_callback() |
| 487 ? ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES | 517 ? ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES |
| 488 : ALLOW_DIRECT_CLIENT_CALLS; | 518 : ALLOW_DIRECT_CLIENT_CALLS; |
| 489 | 519 |
| 490 bool processed = | 520 bool processed = |
| 491 tasks_.empty() && ProcessIncomingMessage(message, client_call_behavior, | 521 tasks_.empty() && ProcessIncomingMessage(message, client_call_behavior, |
| 492 connector_.task_runner()); | 522 connector_.task_runner()); |
| 493 | 523 |
| 494 if (!processed) { | 524 if (!processed) { |
| (...skipping 88 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 583 } | 613 } |
| 584 | 614 |
| 585 void MultiplexRouter::ProcessTasks( | 615 void MultiplexRouter::ProcessTasks( |
| 586 ClientCallBehavior client_call_behavior, | 616 ClientCallBehavior client_call_behavior, |
| 587 base::SingleThreadTaskRunner* current_task_runner) { | 617 base::SingleThreadTaskRunner* current_task_runner) { |
| 588 lock_.AssertAcquired(); | 618 lock_.AssertAcquired(); |
| 589 | 619 |
| 590 if (posted_to_process_tasks_) | 620 if (posted_to_process_tasks_) |
| 591 return; | 621 return; |
| 592 | 622 |
| 593 while (!tasks_.empty()) { | 623 while (!tasks_.empty() && !paused_) { |
| 594 std::unique_ptr<Task> task(std::move(tasks_.front())); | 624 std::unique_ptr<Task> task(std::move(tasks_.front())); |
| 595 tasks_.pop_front(); | 625 tasks_.pop_front(); |
| 596 | 626 |
| 597 InterfaceId id = kInvalidInterfaceId; | 627 InterfaceId id = kInvalidInterfaceId; |
| 598 bool sync_message = task->IsMessageTask() && task->message && | 628 bool sync_message = task->IsMessageTask() && task->message && |
| 599 task->message->has_flag(Message::kFlagIsSync); | 629 task->message->has_flag(Message::kFlagIsSync); |
| 600 if (sync_message) { | 630 if (sync_message) { |
| 601 id = task->message->interface_id(); | 631 id = task->message->interface_id(); |
| 602 auto& sync_message_queue = sync_message_tasks_[id]; | 632 auto& sync_message_queue = sync_message_tasks_[id]; |
| 603 DCHECK_EQ(task.get(), sync_message_queue.front()); | 633 DCHECK_EQ(task.get(), sync_message_queue.front()); |
| (...skipping 24 matching lines...) Expand all Loading... |
| 628 } | 658 } |
| 629 } | 659 } |
| 630 | 660 |
| 631 bool MultiplexRouter::ProcessFirstSyncMessageForEndpoint(InterfaceId id) { | 661 bool MultiplexRouter::ProcessFirstSyncMessageForEndpoint(InterfaceId id) { |
| 632 lock_.AssertAcquired(); | 662 lock_.AssertAcquired(); |
| 633 | 663 |
| 634 auto iter = sync_message_tasks_.find(id); | 664 auto iter = sync_message_tasks_.find(id); |
| 635 if (iter == sync_message_tasks_.end()) | 665 if (iter == sync_message_tasks_.end()) |
| 636 return false; | 666 return false; |
| 637 | 667 |
| 668 if (paused_) |
| 669 return true; |
| 670 |
| 638 MultiplexRouter::Task* task = iter->second.front(); | 671 MultiplexRouter::Task* task = iter->second.front(); |
| 639 iter->second.pop_front(); | 672 iter->second.pop_front(); |
| 640 | 673 |
| 641 DCHECK(task->IsMessageTask()); | 674 DCHECK(task->IsMessageTask()); |
| 642 std::unique_ptr<Message> message(std::move(task->message)); | 675 std::unique_ptr<Message> message(std::move(task->message)); |
| 643 | 676 |
| 644 // Note: after this call, |task| and |iter| may be invalidated. | 677 // Note: after this call, |task| and |iter| may be invalidated. |
| 645 bool processed = ProcessIncomingMessage( | 678 bool processed = ProcessIncomingMessage( |
| 646 message.get(), ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES, nullptr); | 679 message.get(), ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES, nullptr); |
| 647 DCHECK(processed); | 680 DCHECK(processed); |
| 648 | 681 |
| 649 iter = sync_message_tasks_.find(id); | 682 iter = sync_message_tasks_.find(id); |
| 650 if (iter == sync_message_tasks_.end()) | 683 if (iter == sync_message_tasks_.end()) |
| 651 return false; | 684 return false; |
| 652 | 685 |
| 653 if (iter->second.empty()) { | 686 if (iter->second.empty()) { |
| 654 sync_message_tasks_.erase(iter); | 687 sync_message_tasks_.erase(iter); |
| 655 return false; | 688 return false; |
| 656 } | 689 } |
| 657 | 690 |
| 658 return true; | 691 return true; |
| 659 } | 692 } |
| 660 | 693 |
| 661 bool MultiplexRouter::ProcessNotifyErrorTask( | 694 bool MultiplexRouter::ProcessNotifyErrorTask( |
| 662 Task* task, | 695 Task* task, |
| 663 ClientCallBehavior client_call_behavior, | 696 ClientCallBehavior client_call_behavior, |
| 664 base::SingleThreadTaskRunner* current_task_runner) { | 697 base::SingleThreadTaskRunner* current_task_runner) { |
| 665 DCHECK(!current_task_runner || current_task_runner->BelongsToCurrentThread()); | 698 DCHECK(!current_task_runner || current_task_runner->BelongsToCurrentThread()); |
| 699 DCHECK(!paused_); |
| 700 |
| 666 lock_.AssertAcquired(); | 701 lock_.AssertAcquired(); |
| 667 InterfaceEndpoint* endpoint = task->endpoint_to_notify.get(); | 702 InterfaceEndpoint* endpoint = task->endpoint_to_notify.get(); |
| 668 if (!endpoint->client()) | 703 if (!endpoint->client()) |
| 669 return true; | 704 return true; |
| 670 | 705 |
| 671 if (client_call_behavior != ALLOW_DIRECT_CLIENT_CALLS || | 706 if (client_call_behavior != ALLOW_DIRECT_CLIENT_CALLS || |
| 672 endpoint->task_runner() != current_task_runner) { | 707 endpoint->task_runner() != current_task_runner) { |
| 673 MaybePostToProcessTasks(endpoint->task_runner()); | 708 MaybePostToProcessTasks(endpoint->task_runner()); |
| 674 return false; | 709 return false; |
| 675 } | 710 } |
| (...skipping 11 matching lines...) Expand all Loading... |
| 687 client->NotifyError(); | 722 client->NotifyError(); |
| 688 } | 723 } |
| 689 return true; | 724 return true; |
| 690 } | 725 } |
| 691 | 726 |
| 692 bool MultiplexRouter::ProcessIncomingMessage( | 727 bool MultiplexRouter::ProcessIncomingMessage( |
| 693 Message* message, | 728 Message* message, |
| 694 ClientCallBehavior client_call_behavior, | 729 ClientCallBehavior client_call_behavior, |
| 695 base::SingleThreadTaskRunner* current_task_runner) { | 730 base::SingleThreadTaskRunner* current_task_runner) { |
| 696 DCHECK(!current_task_runner || current_task_runner->BelongsToCurrentThread()); | 731 DCHECK(!current_task_runner || current_task_runner->BelongsToCurrentThread()); |
| 732 DCHECK(!paused_); |
| 697 lock_.AssertAcquired(); | 733 lock_.AssertAcquired(); |
| 698 | 734 |
| 699 if (!message) { | 735 if (!message) { |
| 700 // This is a sync message and has been processed during sync handle | 736 // This is a sync message and has been processed during sync handle |
| 701 // watching. | 737 // watching. |
| 702 return true; | 738 return true; |
| 703 } | 739 } |
| 704 | 740 |
| 705 if (PipeControlMessageHandler::IsPipeControlMessage(message)) { | 741 if (PipeControlMessageHandler::IsPipeControlMessage(message)) { |
| 706 if (!control_message_handler_.Accept(message)) | 742 if (!control_message_handler_.Accept(message)) |
| (...skipping 133 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 840 *inserted = true; | 876 *inserted = true; |
| 841 } else { | 877 } else { |
| 842 endpoint = iter->second.get(); | 878 endpoint = iter->second.get(); |
| 843 } | 879 } |
| 844 | 880 |
| 845 return endpoint; | 881 return endpoint; |
| 846 } | 882 } |
| 847 | 883 |
| 848 } // namespace internal | 884 } // namespace internal |
| 849 } // namespace mojo | 885 } // namespace mojo |
| OLD | NEW |