OLD | NEW |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "mojo/public/cpp/bindings/lib/multiplex_router.h" | 5 #include "mojo/public/cpp/bindings/lib/multiplex_router.h" |
6 | 6 |
7 #include <stdint.h> | 7 #include <stdint.h> |
8 | 8 |
9 #include <utility> | 9 #include <utility> |
10 | 10 |
(...skipping 85 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
96 return; | 96 return; |
97 | 97 |
98 EnsureEventMessagePipeExists(); | 98 EnsureEventMessagePipeExists(); |
99 event_signalled_ = true; | 99 event_signalled_ = true; |
100 MojoResult result = | 100 MojoResult result = |
101 WriteMessageRaw(sync_message_event_sender_.get(), nullptr, 0, nullptr, | 101 WriteMessageRaw(sync_message_event_sender_.get(), nullptr, 0, nullptr, |
102 0, MOJO_WRITE_MESSAGE_FLAG_NONE); | 102 0, MOJO_WRITE_MESSAGE_FLAG_NONE); |
103 DCHECK_EQ(MOJO_RESULT_OK, result); | 103 DCHECK_EQ(MOJO_RESULT_OK, result); |
104 } | 104 } |
105 | 105 |
| 106 void ResetSyncMessageSignal() { |
| 107 router_->lock_.AssertAcquired(); |
| 108 |
| 109 if (!event_signalled_) |
| 110 return; |
| 111 |
| 112 DCHECK(sync_message_event_receiver_.is_valid()); |
| 113 MojoResult result = |
| 114 ReadMessageRaw(sync_message_event_receiver_.get(), nullptr, nullptr, |
| 115 nullptr, nullptr, MOJO_READ_MESSAGE_FLAG_MAY_DISCARD); |
| 116 DCHECK_EQ(MOJO_RESULT_OK, result); |
| 117 event_signalled_ = false; |
| 118 } |
| 119 |
106 // --------------------------------------------------------------------------- | 120 // --------------------------------------------------------------------------- |
107 // The following public methods (i.e., InterfaceEndpointController | 121 // The following public methods (i.e., InterfaceEndpointController |
108 // implementation) are called by the client on the same thread as the | 122 // implementation) are called by the client on the same thread as the |
109 // AttachClient() call. They are called outside of the router's lock. | 123 // AttachClient() call. They are called outside of the router's lock. |
110 | 124 |
111 bool SendMessage(Message* message) override { | 125 bool SendMessage(Message* message) override { |
112 DCHECK(task_runner_->BelongsToCurrentThread()); | 126 DCHECK(task_runner_->BelongsToCurrentThread()); |
113 message->set_interface_id(id_); | 127 message->set_interface_id(id_); |
114 return router_->connector_.Accept(message); | 128 return router_->connector_.Accept(message); |
115 } | 129 } |
(...skipping 76 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
192 router_->lock_.AssertAcquired(); | 206 router_->lock_.AssertAcquired(); |
193 | 207 |
194 if (sync_message_event_receiver_.is_valid()) | 208 if (sync_message_event_receiver_.is_valid()) |
195 return; | 209 return; |
196 | 210 |
197 MojoResult result = CreateMessagePipe(nullptr, &sync_message_event_sender_, | 211 MojoResult result = CreateMessagePipe(nullptr, &sync_message_event_sender_, |
198 &sync_message_event_receiver_); | 212 &sync_message_event_receiver_); |
199 DCHECK_EQ(MOJO_RESULT_OK, result); | 213 DCHECK_EQ(MOJO_RESULT_OK, result); |
200 } | 214 } |
201 | 215 |
202 void ResetSyncMessageSignal() { | |
203 router_->lock_.AssertAcquired(); | |
204 | |
205 if (!event_signalled_) | |
206 return; | |
207 | |
208 DCHECK(sync_message_event_receiver_.is_valid()); | |
209 MojoResult result = ReadMessageRaw(sync_message_event_receiver_.get(), | |
210 nullptr, nullptr, nullptr, nullptr, | |
211 MOJO_READ_MESSAGE_FLAG_MAY_DISCARD); | |
212 DCHECK_EQ(MOJO_RESULT_OK, result); | |
213 event_signalled_ = false; | |
214 } | |
215 | |
216 // --------------------------------------------------------------------------- | 216 // --------------------------------------------------------------------------- |
217 // The following members are safe to access from any threads. | 217 // The following members are safe to access from any threads. |
218 | 218 |
219 MultiplexRouter* const router_; | 219 MultiplexRouter* const router_; |
220 const InterfaceId id_; | 220 const InterfaceId id_; |
221 | 221 |
222 // --------------------------------------------------------------------------- | 222 // --------------------------------------------------------------------------- |
223 // The following members are accessed under the router's lock. | 223 // The following members are accessed under the router's lock. |
224 | 224 |
225 // Whether the endpoint has been closed. | 225 // Whether the endpoint has been closed. |
(...skipping 63 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
289 set_interface_id_namespace_bit_(set_interface_id_namesapce_bit), | 289 set_interface_id_namespace_bit_(set_interface_id_namesapce_bit), |
290 header_validator_(this), | 290 header_validator_(this), |
291 connector_(std::move(message_pipe), | 291 connector_(std::move(message_pipe), |
292 Connector::MULTI_THREADED_SEND, | 292 Connector::MULTI_THREADED_SEND, |
293 std::move(runner)), | 293 std::move(runner)), |
294 control_message_handler_(this), | 294 control_message_handler_(this), |
295 control_message_proxy_(&connector_), | 295 control_message_proxy_(&connector_), |
296 next_interface_id_value_(1), | 296 next_interface_id_value_(1), |
297 posted_to_process_tasks_(false), | 297 posted_to_process_tasks_(false), |
298 encountered_error_(false), | 298 encountered_error_(false), |
| 299 paused_(false), |
299 testing_mode_(false) { | 300 testing_mode_(false) { |
300 // Always participate in sync handle watching, because even if it doesn't | 301 // Always participate in sync handle watching, because even if it doesn't |
301 // expect sync requests during sync handle watching, it may still need to | 302 // expect sync requests during sync handle watching, it may still need to |
302 // dispatch messages to associated endpoints on a different thread. | 303 // dispatch messages to associated endpoints on a different thread. |
303 connector_.AllowWokenUpBySyncWatchOnSameThread(); | 304 connector_.AllowWokenUpBySyncWatchOnSameThread(); |
304 connector_.set_incoming_receiver(&header_validator_); | 305 connector_.set_incoming_receiver(&header_validator_); |
305 connector_.set_connection_error_handler( | 306 connector_.set_connection_error_handler( |
306 base::Bind(&MultiplexRouter::OnPipeConnectionError, | 307 base::Bind(&MultiplexRouter::OnPipeConnectionError, |
307 base::Unretained(this))); | 308 base::Unretained(this))); |
308 } | 309 } |
(...skipping 140 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
449 | 450 |
450 void MultiplexRouter::CloseMessagePipe() { | 451 void MultiplexRouter::CloseMessagePipe() { |
451 DCHECK(thread_checker_.CalledOnValidThread()); | 452 DCHECK(thread_checker_.CalledOnValidThread()); |
452 connector_.CloseMessagePipe(); | 453 connector_.CloseMessagePipe(); |
453 // CloseMessagePipe() above won't trigger connection error handler. | 454 // CloseMessagePipe() above won't trigger connection error handler. |
454 // Explicitly call OnPipeConnectionError() so that associated endpoints will | 455 // Explicitly call OnPipeConnectionError() so that associated endpoints will |
455 // get notified. | 456 // get notified. |
456 OnPipeConnectionError(); | 457 OnPipeConnectionError(); |
457 } | 458 } |
458 | 459 |
| 460 void MultiplexRouter::PauseIncomingMethodCallProcessing() { |
| 461 DCHECK(thread_checker_.CalledOnValidThread()); |
| 462 connector_.PauseIncomingMethodCallProcessing(); |
| 463 |
| 464 base::AutoLock locker(lock_); |
| 465 paused_ = true; |
| 466 |
| 467 for (auto iter = endpoints_.begin(); iter != endpoints_.end(); ++iter) |
| 468 iter->second->ResetSyncMessageSignal(); |
| 469 } |
| 470 |
| 471 void MultiplexRouter::ResumeIncomingMethodCallProcessing() { |
| 472 DCHECK(thread_checker_.CalledOnValidThread()); |
| 473 connector_.ResumeIncomingMethodCallProcessing(); |
| 474 |
| 475 base::AutoLock locker(lock_); |
| 476 paused_ = false; |
| 477 |
| 478 for (auto iter = endpoints_.begin(); iter != endpoints_.end(); ++iter) { |
| 479 auto sync_iter = sync_message_tasks_.find(iter->first); |
| 480 if (sync_iter != sync_message_tasks_.end() && !sync_iter->second.empty()) |
| 481 iter->second->SignalSyncMessageEvent(); |
| 482 } |
| 483 |
| 484 ProcessTasks(NO_DIRECT_CLIENT_CALLS, nullptr); |
| 485 } |
| 486 |
459 bool MultiplexRouter::HasAssociatedEndpoints() const { | 487 bool MultiplexRouter::HasAssociatedEndpoints() const { |
460 DCHECK(thread_checker_.CalledOnValidThread()); | 488 DCHECK(thread_checker_.CalledOnValidThread()); |
461 base::AutoLock locker(lock_); | 489 base::AutoLock locker(lock_); |
462 | 490 |
463 if (endpoints_.size() > 1) | 491 if (endpoints_.size() > 1) |
464 return true; | 492 return true; |
465 if (endpoints_.size() == 0) | 493 if (endpoints_.size() == 0) |
466 return false; | 494 return false; |
467 | 495 |
468 return !ContainsKey(endpoints_, kMasterInterfaceId); | 496 return !ContainsKey(endpoints_, kMasterInterfaceId); |
469 } | 497 } |
470 | 498 |
471 void MultiplexRouter::EnableTestingMode() { | 499 void MultiplexRouter::EnableTestingMode() { |
472 DCHECK(thread_checker_.CalledOnValidThread()); | 500 DCHECK(thread_checker_.CalledOnValidThread()); |
473 base::AutoLock locker(lock_); | 501 base::AutoLock locker(lock_); |
474 | 502 |
475 testing_mode_ = true; | 503 testing_mode_ = true; |
476 connector_.set_enforce_errors_from_incoming_receiver(false); | 504 connector_.set_enforce_errors_from_incoming_receiver(false); |
477 } | 505 } |
478 | 506 |
479 bool MultiplexRouter::Accept(Message* message) { | 507 bool MultiplexRouter::Accept(Message* message) { |
480 DCHECK(thread_checker_.CalledOnValidThread()); | 508 DCHECK(thread_checker_.CalledOnValidThread()); |
481 | 509 |
482 scoped_refptr<MultiplexRouter> protector(this); | 510 scoped_refptr<MultiplexRouter> protector(this); |
483 base::AutoLock locker(lock_); | 511 base::AutoLock locker(lock_); |
484 | 512 |
| 513 DCHECK(!paused_); |
| 514 |
485 ClientCallBehavior client_call_behavior = | 515 ClientCallBehavior client_call_behavior = |
486 connector_.during_sync_handle_watcher_callback() | 516 connector_.during_sync_handle_watcher_callback() |
487 ? ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES | 517 ? ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES |
488 : ALLOW_DIRECT_CLIENT_CALLS; | 518 : ALLOW_DIRECT_CLIENT_CALLS; |
489 | 519 |
490 bool processed = | 520 bool processed = |
491 tasks_.empty() && ProcessIncomingMessage(message, client_call_behavior, | 521 tasks_.empty() && ProcessIncomingMessage(message, client_call_behavior, |
492 connector_.task_runner()); | 522 connector_.task_runner()); |
493 | 523 |
494 if (!processed) { | 524 if (!processed) { |
(...skipping 88 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
583 } | 613 } |
584 | 614 |
585 void MultiplexRouter::ProcessTasks( | 615 void MultiplexRouter::ProcessTasks( |
586 ClientCallBehavior client_call_behavior, | 616 ClientCallBehavior client_call_behavior, |
587 base::SingleThreadTaskRunner* current_task_runner) { | 617 base::SingleThreadTaskRunner* current_task_runner) { |
588 lock_.AssertAcquired(); | 618 lock_.AssertAcquired(); |
589 | 619 |
590 if (posted_to_process_tasks_) | 620 if (posted_to_process_tasks_) |
591 return; | 621 return; |
592 | 622 |
593 while (!tasks_.empty()) { | 623 while (!tasks_.empty() && !paused_) { |
594 std::unique_ptr<Task> task(std::move(tasks_.front())); | 624 std::unique_ptr<Task> task(std::move(tasks_.front())); |
595 tasks_.pop_front(); | 625 tasks_.pop_front(); |
596 | 626 |
597 InterfaceId id = kInvalidInterfaceId; | 627 InterfaceId id = kInvalidInterfaceId; |
598 bool sync_message = task->IsMessageTask() && task->message && | 628 bool sync_message = task->IsMessageTask() && task->message && |
599 task->message->has_flag(Message::kFlagIsSync); | 629 task->message->has_flag(Message::kFlagIsSync); |
600 if (sync_message) { | 630 if (sync_message) { |
601 id = task->message->interface_id(); | 631 id = task->message->interface_id(); |
602 auto& sync_message_queue = sync_message_tasks_[id]; | 632 auto& sync_message_queue = sync_message_tasks_[id]; |
603 DCHECK_EQ(task.get(), sync_message_queue.front()); | 633 DCHECK_EQ(task.get(), sync_message_queue.front()); |
(...skipping 24 matching lines...) Expand all Loading... |
628 } | 658 } |
629 } | 659 } |
630 | 660 |
631 bool MultiplexRouter::ProcessFirstSyncMessageForEndpoint(InterfaceId id) { | 661 bool MultiplexRouter::ProcessFirstSyncMessageForEndpoint(InterfaceId id) { |
632 lock_.AssertAcquired(); | 662 lock_.AssertAcquired(); |
633 | 663 |
634 auto iter = sync_message_tasks_.find(id); | 664 auto iter = sync_message_tasks_.find(id); |
635 if (iter == sync_message_tasks_.end()) | 665 if (iter == sync_message_tasks_.end()) |
636 return false; | 666 return false; |
637 | 667 |
| 668 if (paused_) |
| 669 return true; |
| 670 |
638 MultiplexRouter::Task* task = iter->second.front(); | 671 MultiplexRouter::Task* task = iter->second.front(); |
639 iter->second.pop_front(); | 672 iter->second.pop_front(); |
640 | 673 |
641 DCHECK(task->IsMessageTask()); | 674 DCHECK(task->IsMessageTask()); |
642 std::unique_ptr<Message> message(std::move(task->message)); | 675 std::unique_ptr<Message> message(std::move(task->message)); |
643 | 676 |
644 // Note: after this call, |task| and |iter| may be invalidated. | 677 // Note: after this call, |task| and |iter| may be invalidated. |
645 bool processed = ProcessIncomingMessage( | 678 bool processed = ProcessIncomingMessage( |
646 message.get(), ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES, nullptr); | 679 message.get(), ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES, nullptr); |
647 DCHECK(processed); | 680 DCHECK(processed); |
648 | 681 |
649 iter = sync_message_tasks_.find(id); | 682 iter = sync_message_tasks_.find(id); |
650 if (iter == sync_message_tasks_.end()) | 683 if (iter == sync_message_tasks_.end()) |
651 return false; | 684 return false; |
652 | 685 |
653 if (iter->second.empty()) { | 686 if (iter->second.empty()) { |
654 sync_message_tasks_.erase(iter); | 687 sync_message_tasks_.erase(iter); |
655 return false; | 688 return false; |
656 } | 689 } |
657 | 690 |
658 return true; | 691 return true; |
659 } | 692 } |
660 | 693 |
661 bool MultiplexRouter::ProcessNotifyErrorTask( | 694 bool MultiplexRouter::ProcessNotifyErrorTask( |
662 Task* task, | 695 Task* task, |
663 ClientCallBehavior client_call_behavior, | 696 ClientCallBehavior client_call_behavior, |
664 base::SingleThreadTaskRunner* current_task_runner) { | 697 base::SingleThreadTaskRunner* current_task_runner) { |
665 DCHECK(!current_task_runner || current_task_runner->BelongsToCurrentThread()); | 698 DCHECK(!current_task_runner || current_task_runner->BelongsToCurrentThread()); |
| 699 DCHECK(!paused_); |
| 700 |
666 lock_.AssertAcquired(); | 701 lock_.AssertAcquired(); |
667 InterfaceEndpoint* endpoint = task->endpoint_to_notify.get(); | 702 InterfaceEndpoint* endpoint = task->endpoint_to_notify.get(); |
668 if (!endpoint->client()) | 703 if (!endpoint->client()) |
669 return true; | 704 return true; |
670 | 705 |
671 if (client_call_behavior != ALLOW_DIRECT_CLIENT_CALLS || | 706 if (client_call_behavior != ALLOW_DIRECT_CLIENT_CALLS || |
672 endpoint->task_runner() != current_task_runner) { | 707 endpoint->task_runner() != current_task_runner) { |
673 MaybePostToProcessTasks(endpoint->task_runner()); | 708 MaybePostToProcessTasks(endpoint->task_runner()); |
674 return false; | 709 return false; |
675 } | 710 } |
(...skipping 11 matching lines...) Expand all Loading... |
687 client->NotifyError(); | 722 client->NotifyError(); |
688 } | 723 } |
689 return true; | 724 return true; |
690 } | 725 } |
691 | 726 |
692 bool MultiplexRouter::ProcessIncomingMessage( | 727 bool MultiplexRouter::ProcessIncomingMessage( |
693 Message* message, | 728 Message* message, |
694 ClientCallBehavior client_call_behavior, | 729 ClientCallBehavior client_call_behavior, |
695 base::SingleThreadTaskRunner* current_task_runner) { | 730 base::SingleThreadTaskRunner* current_task_runner) { |
696 DCHECK(!current_task_runner || current_task_runner->BelongsToCurrentThread()); | 731 DCHECK(!current_task_runner || current_task_runner->BelongsToCurrentThread()); |
| 732 DCHECK(!paused_); |
697 lock_.AssertAcquired(); | 733 lock_.AssertAcquired(); |
698 | 734 |
699 if (!message) { | 735 if (!message) { |
700 // This is a sync message and has been processed during sync handle | 736 // This is a sync message and has been processed during sync handle |
701 // watching. | 737 // watching. |
702 return true; | 738 return true; |
703 } | 739 } |
704 | 740 |
705 if (PipeControlMessageHandler::IsPipeControlMessage(message)) { | 741 if (PipeControlMessageHandler::IsPipeControlMessage(message)) { |
706 if (!control_message_handler_.Accept(message)) | 742 if (!control_message_handler_.Accept(message)) |
(...skipping 133 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
840 *inserted = true; | 876 *inserted = true; |
841 } else { | 877 } else { |
842 endpoint = iter->second.get(); | 878 endpoint = iter->second.get(); |
843 } | 879 } |
844 | 880 |
845 return endpoint; | 881 return endpoint; |
846 } | 882 } |
847 | 883 |
848 } // namespace internal | 884 } // namespace internal |
849 } // namespace mojo | 885 } // namespace mojo |
OLD | NEW |