Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(52)

Side by Side Diff: ipc/ipc_mojo_bootstrap.cc

Issue 2958703002: Rename TaskRunner::RunsTasksOnCurrentThread() in //ipc, //mojo (Closed)
Patch Set: Created 3 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « no previous file | mojo/public/cpp/bindings/lib/multiplex_router.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2014 The Chromium Authors. All rights reserved. 1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "ipc/ipc_mojo_bootstrap.h" 5 #include "ipc/ipc_mojo_bootstrap.h"
6 6
7 #include <stdint.h> 7 #include <stdint.h>
8 8
9 #include <map> 9 #include <map>
10 #include <memory> 10 #include <memory>
(...skipping 340 matching lines...) Expand 10 before | Expand all | Expand 10 after
351 mojo::InterfaceEndpointClient* client() const { 351 mojo::InterfaceEndpointClient* client() const {
352 controller_->lock_.AssertAcquired(); 352 controller_->lock_.AssertAcquired();
353 return client_; 353 return client_;
354 } 354 }
355 355
356 void AttachClient(mojo::InterfaceEndpointClient* client, 356 void AttachClient(mojo::InterfaceEndpointClient* client,
357 scoped_refptr<base::SequencedTaskRunner> runner) { 357 scoped_refptr<base::SequencedTaskRunner> runner) {
358 controller_->lock_.AssertAcquired(); 358 controller_->lock_.AssertAcquired();
359 DCHECK(!client_); 359 DCHECK(!client_);
360 DCHECK(!closed_); 360 DCHECK(!closed_);
361 DCHECK(runner->RunsTasksOnCurrentThread()); 361 DCHECK(runner->RunsTasksInCurrentSequence());
362 362
363 task_runner_ = std::move(runner); 363 task_runner_ = std::move(runner);
364 client_ = client; 364 client_ = client;
365 } 365 }
366 366
367 void DetachClient() { 367 void DetachClient() {
368 controller_->lock_.AssertAcquired(); 368 controller_->lock_.AssertAcquired();
369 DCHECK(client_); 369 DCHECK(client_);
370 DCHECK(task_runner_->RunsTasksOnCurrentThread()); 370 DCHECK(task_runner_->RunsTasksInCurrentSequence());
371 DCHECK(!closed_); 371 DCHECK(!closed_);
372 372
373 task_runner_ = nullptr; 373 task_runner_ = nullptr;
374 client_ = nullptr; 374 client_ = nullptr;
375 sync_watcher_.reset(); 375 sync_watcher_.reset();
376 } 376 }
377 377
378 uint32_t EnqueueSyncMessage(MessageWrapper message) { 378 uint32_t EnqueueSyncMessage(MessageWrapper message) {
379 controller_->lock_.AssertAcquired(); 379 controller_->lock_.AssertAcquired();
380 uint32_t id = GenerateSyncMessageId(); 380 uint32_t id = GenerateSyncMessageId();
(...skipping 13 matching lines...) Expand all
394 controller_->lock_.AssertAcquired(); 394 controller_->lock_.AssertAcquired();
395 if (sync_messages_.empty() || sync_messages_.front().first != id) 395 if (sync_messages_.empty() || sync_messages_.front().first != id)
396 return MessageWrapper(); 396 return MessageWrapper();
397 MessageWrapper message = std::move(sync_messages_.front().second); 397 MessageWrapper message = std::move(sync_messages_.front().second);
398 sync_messages_.pop(); 398 sync_messages_.pop();
399 return message; 399 return message;
400 } 400 }
401 401
402 // mojo::InterfaceEndpointController: 402 // mojo::InterfaceEndpointController:
403 bool SendMessage(mojo::Message* message) override { 403 bool SendMessage(mojo::Message* message) override {
404 DCHECK(task_runner_->RunsTasksOnCurrentThread()); 404 DCHECK(task_runner_->RunsTasksInCurrentSequence());
405 message->set_interface_id(id_); 405 message->set_interface_id(id_);
406 return controller_->SendMessage(message); 406 return controller_->SendMessage(message);
407 } 407 }
408 408
409 void AllowWokenUpBySyncWatchOnSameThread() override { 409 void AllowWokenUpBySyncWatchOnSameThread() override {
410 DCHECK(task_runner_->RunsTasksOnCurrentThread()); 410 DCHECK(task_runner_->RunsTasksInCurrentSequence());
411 411
412 EnsureSyncWatcherExists(); 412 EnsureSyncWatcherExists();
413 sync_watcher_->AllowWokenUpBySyncWatchOnSameThread(); 413 sync_watcher_->AllowWokenUpBySyncWatchOnSameThread();
414 } 414 }
415 415
416 bool SyncWatch(const bool* should_stop) override { 416 bool SyncWatch(const bool* should_stop) override {
417 DCHECK(task_runner_->RunsTasksOnCurrentThread()); 417 DCHECK(task_runner_->RunsTasksInCurrentSequence());
418 418
419 // It's not legal to make sync calls from the master endpoint's thread, 419 // It's not legal to make sync calls from the master endpoint's thread,
420 // and in fact they must only happen from the proxy task runner. 420 // and in fact they must only happen from the proxy task runner.
421 DCHECK(!controller_->task_runner_->BelongsToCurrentThread()); 421 DCHECK(!controller_->task_runner_->BelongsToCurrentThread());
422 DCHECK(controller_->proxy_task_runner_->BelongsToCurrentThread()); 422 DCHECK(controller_->proxy_task_runner_->BelongsToCurrentThread());
423 423
424 EnsureSyncWatcherExists(); 424 EnsureSyncWatcherExists();
425 return sync_watcher_->SyncWatch(should_stop); 425 return sync_watcher_->SyncWatch(should_stop);
426 } 426 }
427 427
428 private: 428 private:
429 friend class base::RefCountedThreadSafe<Endpoint>; 429 friend class base::RefCountedThreadSafe<Endpoint>;
430 430
431 ~Endpoint() override { 431 ~Endpoint() override {
432 controller_->lock_.AssertAcquired(); 432 controller_->lock_.AssertAcquired();
433 DCHECK(!client_); 433 DCHECK(!client_);
434 DCHECK(closed_); 434 DCHECK(closed_);
435 DCHECK(peer_closed_); 435 DCHECK(peer_closed_);
436 DCHECK(!sync_watcher_); 436 DCHECK(!sync_watcher_);
437 } 437 }
438 438
439 void OnSyncMessageEventReady() { 439 void OnSyncMessageEventReady() {
440 DCHECK(task_runner_->RunsTasksOnCurrentThread()); 440 DCHECK(task_runner_->RunsTasksInCurrentSequence());
441 441
442 scoped_refptr<Endpoint> keepalive(this); 442 scoped_refptr<Endpoint> keepalive(this);
443 scoped_refptr<AssociatedGroupController> controller_keepalive( 443 scoped_refptr<AssociatedGroupController> controller_keepalive(
444 controller_); 444 controller_);
445 445
446 bool reset_sync_watcher = false; 446 bool reset_sync_watcher = false;
447 { 447 {
448 base::AutoLock locker(controller_->lock_); 448 base::AutoLock locker(controller_->lock_);
449 bool more_to_process = false; 449 bool more_to_process = false;
450 if (!sync_messages_.empty()) { 450 if (!sync_messages_.empty()) {
(...skipping 26 matching lines...) Expand all
477 477
478 if (reset_sync_watcher) { 478 if (reset_sync_watcher) {
479 // If a SyncWatch() call (or multiple ones) of this interface endpoint 479 // If a SyncWatch() call (or multiple ones) of this interface endpoint
480 // is on the call stack, resetting the sync watcher will allow it to 480 // is on the call stack, resetting the sync watcher will allow it to
481 // exit when the call stack unwinds to that frame. 481 // exit when the call stack unwinds to that frame.
482 sync_watcher_.reset(); 482 sync_watcher_.reset();
483 } 483 }
484 } 484 }
485 485
486 void EnsureSyncWatcherExists() { 486 void EnsureSyncWatcherExists() {
487 DCHECK(task_runner_->RunsTasksOnCurrentThread()); 487 DCHECK(task_runner_->RunsTasksInCurrentSequence());
488 if (sync_watcher_) 488 if (sync_watcher_)
489 return; 489 return;
490 490
491 { 491 {
492 base::AutoLock locker(controller_->lock_); 492 base::AutoLock locker(controller_->lock_);
493 if (!sync_message_event_) { 493 if (!sync_message_event_) {
494 sync_message_event_ = base::MakeUnique<base::WaitableEvent>( 494 sync_message_event_ = base::MakeUnique<base::WaitableEvent>(
495 base::WaitableEvent::ResetPolicy::MANUAL, 495 base::WaitableEvent::ResetPolicy::MANUAL,
496 base::WaitableEvent::InitialState::NOT_SIGNALED); 496 base::WaitableEvent::InitialState::NOT_SIGNALED);
497 if (peer_closed_ || !sync_messages_.empty()) 497 if (peer_closed_ || !sync_messages_.empty())
(...skipping 122 matching lines...) Expand 10 before | Expand all | Expand 10 after
620 // Because a notification may in turn detach any endpoint, we have to 620 // Because a notification may in turn detach any endpoint, we have to
621 // check each client again here. 621 // check each client again here.
622 if (endpoint->client()) 622 if (endpoint->client())
623 NotifyEndpointOfError(endpoint.get(), false /* force_async */); 623 NotifyEndpointOfError(endpoint.get(), false /* force_async */);
624 } 624 }
625 } 625 }
626 626
627 void NotifyEndpointOfError(Endpoint* endpoint, bool force_async) { 627 void NotifyEndpointOfError(Endpoint* endpoint, bool force_async) {
628 lock_.AssertAcquired(); 628 lock_.AssertAcquired();
629 DCHECK(endpoint->task_runner() && endpoint->client()); 629 DCHECK(endpoint->task_runner() && endpoint->client());
630 if (endpoint->task_runner()->RunsTasksOnCurrentThread() && !force_async) { 630 if (endpoint->task_runner()->RunsTasksInCurrentSequence() && !force_async) {
631 mojo::InterfaceEndpointClient* client = endpoint->client(); 631 mojo::InterfaceEndpointClient* client = endpoint->client();
632 base::Optional<mojo::DisconnectReason> reason( 632 base::Optional<mojo::DisconnectReason> reason(
633 endpoint->disconnect_reason()); 633 endpoint->disconnect_reason());
634 634
635 base::AutoUnlock unlocker(lock_); 635 base::AutoUnlock unlocker(lock_);
636 client->NotifyError(reason); 636 client->NotifyError(reason);
637 } else { 637 } else {
638 endpoint->task_runner()->PostTask( 638 endpoint->task_runner()->PostTask(
639 FROM_HERE, 639 FROM_HERE,
640 base::Bind(&ChannelAssociatedGroupController 640 base::Bind(&ChannelAssociatedGroupController
641 ::NotifyEndpointOfErrorOnEndpointThread, this, endpoint->id(), 641 ::NotifyEndpointOfErrorOnEndpointThread, this, endpoint->id(),
642 endpoint)); 642 endpoint));
643 } 643 }
644 } 644 }
645 645
646 void NotifyEndpointOfErrorOnEndpointThread(mojo::InterfaceId id, 646 void NotifyEndpointOfErrorOnEndpointThread(mojo::InterfaceId id,
647 Endpoint* endpoint) { 647 Endpoint* endpoint) {
648 base::AutoLock locker(lock_); 648 base::AutoLock locker(lock_);
649 auto iter = endpoints_.find(id); 649 auto iter = endpoints_.find(id);
650 if (iter == endpoints_.end() || iter->second.get() != endpoint) 650 if (iter == endpoints_.end() || iter->second.get() != endpoint)
651 return; 651 return;
652 if (!endpoint->client()) 652 if (!endpoint->client())
653 return; 653 return;
654 654
655 DCHECK(endpoint->task_runner()->RunsTasksOnCurrentThread()); 655 DCHECK(endpoint->task_runner()->RunsTasksInCurrentSequence());
656 NotifyEndpointOfError(endpoint, false /* force_async */); 656 NotifyEndpointOfError(endpoint, false /* force_async */);
657 } 657 }
658 658
659 void MarkClosedAndMaybeRemove(Endpoint* endpoint) { 659 void MarkClosedAndMaybeRemove(Endpoint* endpoint) {
660 lock_.AssertAcquired(); 660 lock_.AssertAcquired();
661 endpoint->set_closed(); 661 endpoint->set_closed();
662 if (endpoint->closed() && endpoint->peer_closed()) 662 if (endpoint->closed() && endpoint->peer_closed())
663 endpoints_.erase(endpoint->id()); 663 endpoints_.erase(endpoint->id());
664 } 664 }
665 665
(...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after
703 703
704 mojo::InterfaceId id = message->interface_id(); 704 mojo::InterfaceId id = message->interface_id();
705 DCHECK(mojo::IsValidInterfaceId(id)); 705 DCHECK(mojo::IsValidInterfaceId(id));
706 706
707 base::AutoLock locker(lock_); 707 base::AutoLock locker(lock_);
708 Endpoint* endpoint = FindEndpoint(id); 708 Endpoint* endpoint = FindEndpoint(id);
709 if (!endpoint) 709 if (!endpoint)
710 return true; 710 return true;
711 711
712 mojo::InterfaceEndpointClient* client = endpoint->client(); 712 mojo::InterfaceEndpointClient* client = endpoint->client();
713 if (!client || !endpoint->task_runner()->RunsTasksOnCurrentThread()) { 713 if (!client || !endpoint->task_runner()->RunsTasksInCurrentSequence()) {
714 // No client has been bound yet or the client runs tasks on another 714 // No client has been bound yet or the client runs tasks on another
715 // thread. We assume the other thread must always be the one on which 715 // thread. We assume the other thread must always be the one on which
716 // |proxy_task_runner_| runs tasks, since that's the only valid scenario. 716 // |proxy_task_runner_| runs tasks, since that's the only valid scenario.
717 // 717 //
718 // If the client is not yet bound, it must be bound by the time this task 718 // If the client is not yet bound, it must be bound by the time this task
719 // runs or else it's programmer error. 719 // runs or else it's programmer error.
720 DCHECK(proxy_task_runner_); 720 DCHECK(proxy_task_runner_);
721 721
722 if (message->has_flag(mojo::Message::kFlagIsSync)) { 722 if (message->has_flag(mojo::Message::kFlagIsSync)) {
723 MessageWrapper message_wrapper(this, std::move(*message)); 723 MessageWrapper message_wrapper(this, std::move(*message));
(...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after
759 759
760 base::AutoLock locker(lock_); 760 base::AutoLock locker(lock_);
761 Endpoint* endpoint = FindEndpoint(id); 761 Endpoint* endpoint = FindEndpoint(id);
762 if (!endpoint) 762 if (!endpoint)
763 return; 763 return;
764 764
765 mojo::InterfaceEndpointClient* client = endpoint->client(); 765 mojo::InterfaceEndpointClient* client = endpoint->client();
766 if (!client) 766 if (!client)
767 return; 767 return;
768 768
769 DCHECK(endpoint->task_runner()->RunsTasksOnCurrentThread()); 769 DCHECK(endpoint->task_runner()->RunsTasksInCurrentSequence());
770 770
771 // Sync messages should never make their way to this method. 771 // Sync messages should never make their way to this method.
772 DCHECK(!message.has_flag(mojo::Message::kFlagIsSync)); 772 DCHECK(!message.has_flag(mojo::Message::kFlagIsSync));
773 773
774 bool result = false; 774 bool result = false;
775 { 775 {
776 base::AutoUnlock unlocker(lock_); 776 base::AutoUnlock unlocker(lock_);
777 result = client->HandleIncomingMessage(&message); 777 result = client->HandleIncomingMessage(&message);
778 } 778 }
779 779
780 if (!result) 780 if (!result)
781 RaiseError(); 781 RaiseError();
782 } 782 }
783 783
784 void AcceptSyncMessage(mojo::InterfaceId interface_id, uint32_t message_id) { 784 void AcceptSyncMessage(mojo::InterfaceId interface_id, uint32_t message_id) {
785 DCHECK(proxy_task_runner_->BelongsToCurrentThread()); 785 DCHECK(proxy_task_runner_->BelongsToCurrentThread());
786 786
787 base::AutoLock locker(lock_); 787 base::AutoLock locker(lock_);
788 Endpoint* endpoint = FindEndpoint(interface_id); 788 Endpoint* endpoint = FindEndpoint(interface_id);
789 if (!endpoint) 789 if (!endpoint)
790 return; 790 return;
791 791
792 // Careful, if the endpoint is detached its members are cleared. Check for 792 // Careful, if the endpoint is detached its members are cleared. Check for
793 // that before dereferencing. 793 // that before dereferencing.
794 mojo::InterfaceEndpointClient* client = endpoint->client(); 794 mojo::InterfaceEndpointClient* client = endpoint->client();
795 if (!client) 795 if (!client)
796 return; 796 return;
797 797
798 DCHECK(endpoint->task_runner()->RunsTasksOnCurrentThread()); 798 DCHECK(endpoint->task_runner()->RunsTasksInCurrentSequence());
799 MessageWrapper message_wrapper = endpoint->PopSyncMessage(message_id); 799 MessageWrapper message_wrapper = endpoint->PopSyncMessage(message_id);
800 800
801 // The message must have already been dequeued by the endpoint waking up 801 // The message must have already been dequeued by the endpoint waking up
802 // from a sync wait. Nothing to do. 802 // from a sync wait. Nothing to do.
803 if (message_wrapper.value().IsNull()) 803 if (message_wrapper.value().IsNull())
804 return; 804 return;
805 805
806 bool result = false; 806 bool result = false;
807 { 807 {
808 base::AutoUnlock unlocker(lock_); 808 base::AutoUnlock unlocker(lock_);
(...skipping 109 matching lines...) Expand 10 before | Expand all | Expand 10 after
918 std::unique_ptr<MojoBootstrap> MojoBootstrap::Create( 918 std::unique_ptr<MojoBootstrap> MojoBootstrap::Create(
919 mojo::ScopedMessagePipeHandle handle, 919 mojo::ScopedMessagePipeHandle handle,
920 Channel::Mode mode, 920 Channel::Mode mode,
921 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner) { 921 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner) {
922 return base::MakeUnique<MojoBootstrapImpl>( 922 return base::MakeUnique<MojoBootstrapImpl>(
923 std::move(handle), new ChannelAssociatedGroupController( 923 std::move(handle), new ChannelAssociatedGroupController(
924 mode == Channel::MODE_SERVER, ipc_task_runner)); 924 mode == Channel::MODE_SERVER, ipc_task_runner));
925 } 925 }
926 926
927 } // namespace IPC 927 } // namespace IPC
OLDNEW
« no previous file with comments | « no previous file | mojo/public/cpp/bindings/lib/multiplex_router.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698