Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1105)

Side by Side Diff: mojo/public/cpp/bindings/lib/multiplex_router.cc

Issue 2165153002: Mojo C++ bindings: fix Pause/Resume for MultiplexRouter. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 4 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « mojo/public/cpp/bindings/lib/multiplex_router.h ('k') | mojo/public/cpp/bindings/lib/router.h » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/public/cpp/bindings/lib/multiplex_router.h" 5 #include "mojo/public/cpp/bindings/lib/multiplex_router.h"
6 6
7 #include <stdint.h> 7 #include <stdint.h>
8 8
9 #include <utility> 9 #include <utility>
10 10
(...skipping 85 matching lines...) Expand 10 before | Expand all | Expand 10 after
96 return; 96 return;
97 97
98 EnsureEventMessagePipeExists(); 98 EnsureEventMessagePipeExists();
99 event_signalled_ = true; 99 event_signalled_ = true;
100 MojoResult result = 100 MojoResult result =
101 WriteMessageRaw(sync_message_event_sender_.get(), nullptr, 0, nullptr, 101 WriteMessageRaw(sync_message_event_sender_.get(), nullptr, 0, nullptr,
102 0, MOJO_WRITE_MESSAGE_FLAG_NONE); 102 0, MOJO_WRITE_MESSAGE_FLAG_NONE);
103 DCHECK_EQ(MOJO_RESULT_OK, result); 103 DCHECK_EQ(MOJO_RESULT_OK, result);
104 } 104 }
105 105
106 void ResetSyncMessageSignal() {
107 router_->lock_.AssertAcquired();
108
109 if (!event_signalled_)
110 return;
111
112 DCHECK(sync_message_event_receiver_.is_valid());
113 MojoResult result =
114 ReadMessageRaw(sync_message_event_receiver_.get(), nullptr, nullptr,
115 nullptr, nullptr, MOJO_READ_MESSAGE_FLAG_MAY_DISCARD);
116 DCHECK_EQ(MOJO_RESULT_OK, result);
117 event_signalled_ = false;
118 }
119
106 // --------------------------------------------------------------------------- 120 // ---------------------------------------------------------------------------
107 // The following public methods (i.e., InterfaceEndpointController 121 // The following public methods (i.e., InterfaceEndpointController
108 // implementation) are called by the client on the same thread as the 122 // implementation) are called by the client on the same thread as the
109 // AttachClient() call. They are called outside of the router's lock. 123 // AttachClient() call. They are called outside of the router's lock.
110 124
111 bool SendMessage(Message* message) override { 125 bool SendMessage(Message* message) override {
112 DCHECK(task_runner_->BelongsToCurrentThread()); 126 DCHECK(task_runner_->BelongsToCurrentThread());
113 message->set_interface_id(id_); 127 message->set_interface_id(id_);
114 return router_->connector_.Accept(message); 128 return router_->connector_.Accept(message);
115 } 129 }
(...skipping 76 matching lines...) Expand 10 before | Expand all | Expand 10 after
192 router_->lock_.AssertAcquired(); 206 router_->lock_.AssertAcquired();
193 207
194 if (sync_message_event_receiver_.is_valid()) 208 if (sync_message_event_receiver_.is_valid())
195 return; 209 return;
196 210
197 MojoResult result = CreateMessagePipe(nullptr, &sync_message_event_sender_, 211 MojoResult result = CreateMessagePipe(nullptr, &sync_message_event_sender_,
198 &sync_message_event_receiver_); 212 &sync_message_event_receiver_);
199 DCHECK_EQ(MOJO_RESULT_OK, result); 213 DCHECK_EQ(MOJO_RESULT_OK, result);
200 } 214 }
201 215
202 void ResetSyncMessageSignal() {
203 router_->lock_.AssertAcquired();
204
205 if (!event_signalled_)
206 return;
207
208 DCHECK(sync_message_event_receiver_.is_valid());
209 MojoResult result = ReadMessageRaw(sync_message_event_receiver_.get(),
210 nullptr, nullptr, nullptr, nullptr,
211 MOJO_READ_MESSAGE_FLAG_MAY_DISCARD);
212 DCHECK_EQ(MOJO_RESULT_OK, result);
213 event_signalled_ = false;
214 }
215
216 // --------------------------------------------------------------------------- 216 // ---------------------------------------------------------------------------
217 // The following members are safe to access from any threads. 217 // The following members are safe to access from any threads.
218 218
219 MultiplexRouter* const router_; 219 MultiplexRouter* const router_;
220 const InterfaceId id_; 220 const InterfaceId id_;
221 221
222 // --------------------------------------------------------------------------- 222 // ---------------------------------------------------------------------------
223 // The following members are accessed under the router's lock. 223 // The following members are accessed under the router's lock.
224 224
225 // Whether the endpoint has been closed. 225 // Whether the endpoint has been closed.
(...skipping 63 matching lines...) Expand 10 before | Expand all | Expand 10 after
289 set_interface_id_namespace_bit_(set_interface_id_namesapce_bit), 289 set_interface_id_namespace_bit_(set_interface_id_namesapce_bit),
290 header_validator_(this), 290 header_validator_(this),
291 connector_(std::move(message_pipe), 291 connector_(std::move(message_pipe),
292 Connector::MULTI_THREADED_SEND, 292 Connector::MULTI_THREADED_SEND,
293 std::move(runner)), 293 std::move(runner)),
294 control_message_handler_(this), 294 control_message_handler_(this),
295 control_message_proxy_(&connector_), 295 control_message_proxy_(&connector_),
296 next_interface_id_value_(1), 296 next_interface_id_value_(1),
297 posted_to_process_tasks_(false), 297 posted_to_process_tasks_(false),
298 encountered_error_(false), 298 encountered_error_(false),
299 paused_(false),
299 testing_mode_(false) { 300 testing_mode_(false) {
300 // Always participate in sync handle watching, because even if it doesn't 301 // Always participate in sync handle watching, because even if it doesn't
301 // expect sync requests during sync handle watching, it may still need to 302 // expect sync requests during sync handle watching, it may still need to
302 // dispatch messages to associated endpoints on a different thread. 303 // dispatch messages to associated endpoints on a different thread.
303 connector_.AllowWokenUpBySyncWatchOnSameThread(); 304 connector_.AllowWokenUpBySyncWatchOnSameThread();
304 connector_.set_incoming_receiver(&header_validator_); 305 connector_.set_incoming_receiver(&header_validator_);
305 connector_.set_connection_error_handler( 306 connector_.set_connection_error_handler(
306 base::Bind(&MultiplexRouter::OnPipeConnectionError, 307 base::Bind(&MultiplexRouter::OnPipeConnectionError,
307 base::Unretained(this))); 308 base::Unretained(this)));
308 } 309 }
(...skipping 140 matching lines...) Expand 10 before | Expand all | Expand 10 after
449 450
450 void MultiplexRouter::CloseMessagePipe() { 451 void MultiplexRouter::CloseMessagePipe() {
451 DCHECK(thread_checker_.CalledOnValidThread()); 452 DCHECK(thread_checker_.CalledOnValidThread());
452 connector_.CloseMessagePipe(); 453 connector_.CloseMessagePipe();
453 // CloseMessagePipe() above won't trigger connection error handler. 454 // CloseMessagePipe() above won't trigger connection error handler.
454 // Explicitly call OnPipeConnectionError() so that associated endpoints will 455 // Explicitly call OnPipeConnectionError() so that associated endpoints will
455 // get notified. 456 // get notified.
456 OnPipeConnectionError(); 457 OnPipeConnectionError();
457 } 458 }
458 459
460 void MultiplexRouter::PauseIncomingMethodCallProcessing() {
461 DCHECK(thread_checker_.CalledOnValidThread());
462 connector_.PauseIncomingMethodCallProcessing();
463
464 base::AutoLock locker(lock_);
465 paused_ = true;
466
467 for (auto iter = endpoints_.begin(); iter != endpoints_.end(); ++iter)
468 iter->second->ResetSyncMessageSignal();
469 }
470
471 void MultiplexRouter::ResumeIncomingMethodCallProcessing() {
472 DCHECK(thread_checker_.CalledOnValidThread());
473 connector_.ResumeIncomingMethodCallProcessing();
474
475 base::AutoLock locker(lock_);
476 paused_ = false;
477
478 for (auto iter = endpoints_.begin(); iter != endpoints_.end(); ++iter) {
479 auto sync_iter = sync_message_tasks_.find(iter->first);
480 if (sync_iter != sync_message_tasks_.end() && !sync_iter->second.empty())
481 iter->second->SignalSyncMessageEvent();
482 }
483
484 ProcessTasks(NO_DIRECT_CLIENT_CALLS, nullptr);
485 }
486
459 bool MultiplexRouter::HasAssociatedEndpoints() const { 487 bool MultiplexRouter::HasAssociatedEndpoints() const {
460 DCHECK(thread_checker_.CalledOnValidThread()); 488 DCHECK(thread_checker_.CalledOnValidThread());
461 base::AutoLock locker(lock_); 489 base::AutoLock locker(lock_);
462 490
463 if (endpoints_.size() > 1) 491 if (endpoints_.size() > 1)
464 return true; 492 return true;
465 if (endpoints_.size() == 0) 493 if (endpoints_.size() == 0)
466 return false; 494 return false;
467 495
468 return !ContainsKey(endpoints_, kMasterInterfaceId); 496 return !ContainsKey(endpoints_, kMasterInterfaceId);
469 } 497 }
470 498
471 void MultiplexRouter::EnableTestingMode() { 499 void MultiplexRouter::EnableTestingMode() {
472 DCHECK(thread_checker_.CalledOnValidThread()); 500 DCHECK(thread_checker_.CalledOnValidThread());
473 base::AutoLock locker(lock_); 501 base::AutoLock locker(lock_);
474 502
475 testing_mode_ = true; 503 testing_mode_ = true;
476 connector_.set_enforce_errors_from_incoming_receiver(false); 504 connector_.set_enforce_errors_from_incoming_receiver(false);
477 } 505 }
478 506
479 bool MultiplexRouter::Accept(Message* message) { 507 bool MultiplexRouter::Accept(Message* message) {
480 DCHECK(thread_checker_.CalledOnValidThread()); 508 DCHECK(thread_checker_.CalledOnValidThread());
481 509
482 scoped_refptr<MultiplexRouter> protector(this); 510 scoped_refptr<MultiplexRouter> protector(this);
483 base::AutoLock locker(lock_); 511 base::AutoLock locker(lock_);
484 512
513 DCHECK(!paused_);
514
485 ClientCallBehavior client_call_behavior = 515 ClientCallBehavior client_call_behavior =
486 connector_.during_sync_handle_watcher_callback() 516 connector_.during_sync_handle_watcher_callback()
487 ? ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES 517 ? ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES
488 : ALLOW_DIRECT_CLIENT_CALLS; 518 : ALLOW_DIRECT_CLIENT_CALLS;
489 519
490 bool processed = 520 bool processed =
491 tasks_.empty() && ProcessIncomingMessage(message, client_call_behavior, 521 tasks_.empty() && ProcessIncomingMessage(message, client_call_behavior,
492 connector_.task_runner()); 522 connector_.task_runner());
493 523
494 if (!processed) { 524 if (!processed) {
(...skipping 88 matching lines...) Expand 10 before | Expand all | Expand 10 after
583 } 613 }
584 614
585 void MultiplexRouter::ProcessTasks( 615 void MultiplexRouter::ProcessTasks(
586 ClientCallBehavior client_call_behavior, 616 ClientCallBehavior client_call_behavior,
587 base::SingleThreadTaskRunner* current_task_runner) { 617 base::SingleThreadTaskRunner* current_task_runner) {
588 lock_.AssertAcquired(); 618 lock_.AssertAcquired();
589 619
590 if (posted_to_process_tasks_) 620 if (posted_to_process_tasks_)
591 return; 621 return;
592 622
593 while (!tasks_.empty()) { 623 while (!tasks_.empty() && !paused_) {
594 std::unique_ptr<Task> task(std::move(tasks_.front())); 624 std::unique_ptr<Task> task(std::move(tasks_.front()));
595 tasks_.pop_front(); 625 tasks_.pop_front();
596 626
597 InterfaceId id = kInvalidInterfaceId; 627 InterfaceId id = kInvalidInterfaceId;
598 bool sync_message = task->IsMessageTask() && task->message && 628 bool sync_message = task->IsMessageTask() && task->message &&
599 task->message->has_flag(Message::kFlagIsSync); 629 task->message->has_flag(Message::kFlagIsSync);
600 if (sync_message) { 630 if (sync_message) {
601 id = task->message->interface_id(); 631 id = task->message->interface_id();
602 auto& sync_message_queue = sync_message_tasks_[id]; 632 auto& sync_message_queue = sync_message_tasks_[id];
603 DCHECK_EQ(task.get(), sync_message_queue.front()); 633 DCHECK_EQ(task.get(), sync_message_queue.front());
(...skipping 24 matching lines...) Expand all
628 } 658 }
629 } 659 }
630 660
631 bool MultiplexRouter::ProcessFirstSyncMessageForEndpoint(InterfaceId id) { 661 bool MultiplexRouter::ProcessFirstSyncMessageForEndpoint(InterfaceId id) {
632 lock_.AssertAcquired(); 662 lock_.AssertAcquired();
633 663
634 auto iter = sync_message_tasks_.find(id); 664 auto iter = sync_message_tasks_.find(id);
635 if (iter == sync_message_tasks_.end()) 665 if (iter == sync_message_tasks_.end())
636 return false; 666 return false;
637 667
668 if (paused_)
669 return true;
670
638 MultiplexRouter::Task* task = iter->second.front(); 671 MultiplexRouter::Task* task = iter->second.front();
639 iter->second.pop_front(); 672 iter->second.pop_front();
640 673
641 DCHECK(task->IsMessageTask()); 674 DCHECK(task->IsMessageTask());
642 std::unique_ptr<Message> message(std::move(task->message)); 675 std::unique_ptr<Message> message(std::move(task->message));
643 676
644 // Note: after this call, |task| and |iter| may be invalidated. 677 // Note: after this call, |task| and |iter| may be invalidated.
645 bool processed = ProcessIncomingMessage( 678 bool processed = ProcessIncomingMessage(
646 message.get(), ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES, nullptr); 679 message.get(), ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES, nullptr);
647 DCHECK(processed); 680 DCHECK(processed);
648 681
649 iter = sync_message_tasks_.find(id); 682 iter = sync_message_tasks_.find(id);
650 if (iter == sync_message_tasks_.end()) 683 if (iter == sync_message_tasks_.end())
651 return false; 684 return false;
652 685
653 if (iter->second.empty()) { 686 if (iter->second.empty()) {
654 sync_message_tasks_.erase(iter); 687 sync_message_tasks_.erase(iter);
655 return false; 688 return false;
656 } 689 }
657 690
658 return true; 691 return true;
659 } 692 }
660 693
661 bool MultiplexRouter::ProcessNotifyErrorTask( 694 bool MultiplexRouter::ProcessNotifyErrorTask(
662 Task* task, 695 Task* task,
663 ClientCallBehavior client_call_behavior, 696 ClientCallBehavior client_call_behavior,
664 base::SingleThreadTaskRunner* current_task_runner) { 697 base::SingleThreadTaskRunner* current_task_runner) {
665 DCHECK(!current_task_runner || current_task_runner->BelongsToCurrentThread()); 698 DCHECK(!current_task_runner || current_task_runner->BelongsToCurrentThread());
699 DCHECK(!paused_);
700
666 lock_.AssertAcquired(); 701 lock_.AssertAcquired();
667 InterfaceEndpoint* endpoint = task->endpoint_to_notify.get(); 702 InterfaceEndpoint* endpoint = task->endpoint_to_notify.get();
668 if (!endpoint->client()) 703 if (!endpoint->client())
669 return true; 704 return true;
670 705
671 if (client_call_behavior != ALLOW_DIRECT_CLIENT_CALLS || 706 if (client_call_behavior != ALLOW_DIRECT_CLIENT_CALLS ||
672 endpoint->task_runner() != current_task_runner) { 707 endpoint->task_runner() != current_task_runner) {
673 MaybePostToProcessTasks(endpoint->task_runner()); 708 MaybePostToProcessTasks(endpoint->task_runner());
674 return false; 709 return false;
675 } 710 }
(...skipping 11 matching lines...) Expand all
687 client->NotifyError(); 722 client->NotifyError();
688 } 723 }
689 return true; 724 return true;
690 } 725 }
691 726
692 bool MultiplexRouter::ProcessIncomingMessage( 727 bool MultiplexRouter::ProcessIncomingMessage(
693 Message* message, 728 Message* message,
694 ClientCallBehavior client_call_behavior, 729 ClientCallBehavior client_call_behavior,
695 base::SingleThreadTaskRunner* current_task_runner) { 730 base::SingleThreadTaskRunner* current_task_runner) {
696 DCHECK(!current_task_runner || current_task_runner->BelongsToCurrentThread()); 731 DCHECK(!current_task_runner || current_task_runner->BelongsToCurrentThread());
732 DCHECK(!paused_);
697 lock_.AssertAcquired(); 733 lock_.AssertAcquired();
698 734
699 if (!message) { 735 if (!message) {
700 // This is a sync message and has been processed during sync handle 736 // This is a sync message and has been processed during sync handle
701 // watching. 737 // watching.
702 return true; 738 return true;
703 } 739 }
704 740
705 if (PipeControlMessageHandler::IsPipeControlMessage(message)) { 741 if (PipeControlMessageHandler::IsPipeControlMessage(message)) {
706 if (!control_message_handler_.Accept(message)) 742 if (!control_message_handler_.Accept(message))
(...skipping 133 matching lines...) Expand 10 before | Expand all | Expand 10 after
840 *inserted = true; 876 *inserted = true;
841 } else { 877 } else {
842 endpoint = iter->second.get(); 878 endpoint = iter->second.get();
843 } 879 }
844 880
845 return endpoint; 881 return endpoint;
846 } 882 }
847 883
848 } // namespace internal 884 } // namespace internal
849 } // namespace mojo 885 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/public/cpp/bindings/lib/multiplex_router.h ('k') | mojo/public/cpp/bindings/lib/router.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698