OLD | NEW |
1 // Copyright 2014 The Chromium Authors. All rights reserved. | 1 // Copyright 2014 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "ipc/ipc_mojo_bootstrap.h" | 5 #include "ipc/ipc_mojo_bootstrap.h" |
6 | 6 |
7 #include <stdint.h> | 7 #include <stdint.h> |
8 | 8 |
9 #include <map> | 9 #include <map> |
10 #include <memory> | 10 #include <memory> |
(...skipping 340 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
351 mojo::InterfaceEndpointClient* client() const { | 351 mojo::InterfaceEndpointClient* client() const { |
352 controller_->lock_.AssertAcquired(); | 352 controller_->lock_.AssertAcquired(); |
353 return client_; | 353 return client_; |
354 } | 354 } |
355 | 355 |
356 void AttachClient(mojo::InterfaceEndpointClient* client, | 356 void AttachClient(mojo::InterfaceEndpointClient* client, |
357 scoped_refptr<base::SequencedTaskRunner> runner) { | 357 scoped_refptr<base::SequencedTaskRunner> runner) { |
358 controller_->lock_.AssertAcquired(); | 358 controller_->lock_.AssertAcquired(); |
359 DCHECK(!client_); | 359 DCHECK(!client_); |
360 DCHECK(!closed_); | 360 DCHECK(!closed_); |
361 DCHECK(runner->RunsTasksOnCurrentThread()); | 361 DCHECK(runner->RunsTasksInCurrentSequence()); |
362 | 362 |
363 task_runner_ = std::move(runner); | 363 task_runner_ = std::move(runner); |
364 client_ = client; | 364 client_ = client; |
365 } | 365 } |
366 | 366 |
367 void DetachClient() { | 367 void DetachClient() { |
368 controller_->lock_.AssertAcquired(); | 368 controller_->lock_.AssertAcquired(); |
369 DCHECK(client_); | 369 DCHECK(client_); |
370 DCHECK(task_runner_->RunsTasksOnCurrentThread()); | 370 DCHECK(task_runner_->RunsTasksInCurrentSequence()); |
371 DCHECK(!closed_); | 371 DCHECK(!closed_); |
372 | 372 |
373 task_runner_ = nullptr; | 373 task_runner_ = nullptr; |
374 client_ = nullptr; | 374 client_ = nullptr; |
375 sync_watcher_.reset(); | 375 sync_watcher_.reset(); |
376 } | 376 } |
377 | 377 |
378 uint32_t EnqueueSyncMessage(MessageWrapper message) { | 378 uint32_t EnqueueSyncMessage(MessageWrapper message) { |
379 controller_->lock_.AssertAcquired(); | 379 controller_->lock_.AssertAcquired(); |
380 uint32_t id = GenerateSyncMessageId(); | 380 uint32_t id = GenerateSyncMessageId(); |
(...skipping 13 matching lines...) Expand all Loading... |
394 controller_->lock_.AssertAcquired(); | 394 controller_->lock_.AssertAcquired(); |
395 if (sync_messages_.empty() || sync_messages_.front().first != id) | 395 if (sync_messages_.empty() || sync_messages_.front().first != id) |
396 return MessageWrapper(); | 396 return MessageWrapper(); |
397 MessageWrapper message = std::move(sync_messages_.front().second); | 397 MessageWrapper message = std::move(sync_messages_.front().second); |
398 sync_messages_.pop(); | 398 sync_messages_.pop(); |
399 return message; | 399 return message; |
400 } | 400 } |
401 | 401 |
402 // mojo::InterfaceEndpointController: | 402 // mojo::InterfaceEndpointController: |
403 bool SendMessage(mojo::Message* message) override { | 403 bool SendMessage(mojo::Message* message) override { |
404 DCHECK(task_runner_->RunsTasksOnCurrentThread()); | 404 DCHECK(task_runner_->RunsTasksInCurrentSequence()); |
405 message->set_interface_id(id_); | 405 message->set_interface_id(id_); |
406 return controller_->SendMessage(message); | 406 return controller_->SendMessage(message); |
407 } | 407 } |
408 | 408 |
409 void AllowWokenUpBySyncWatchOnSameThread() override { | 409 void AllowWokenUpBySyncWatchOnSameThread() override { |
410 DCHECK(task_runner_->RunsTasksOnCurrentThread()); | 410 DCHECK(task_runner_->RunsTasksInCurrentSequence()); |
411 | 411 |
412 EnsureSyncWatcherExists(); | 412 EnsureSyncWatcherExists(); |
413 sync_watcher_->AllowWokenUpBySyncWatchOnSameThread(); | 413 sync_watcher_->AllowWokenUpBySyncWatchOnSameThread(); |
414 } | 414 } |
415 | 415 |
416 bool SyncWatch(const bool* should_stop) override { | 416 bool SyncWatch(const bool* should_stop) override { |
417 DCHECK(task_runner_->RunsTasksOnCurrentThread()); | 417 DCHECK(task_runner_->RunsTasksInCurrentSequence()); |
418 | 418 |
419 // It's not legal to make sync calls from the master endpoint's thread, | 419 // It's not legal to make sync calls from the master endpoint's thread, |
420 // and in fact they must only happen from the proxy task runner. | 420 // and in fact they must only happen from the proxy task runner. |
421 DCHECK(!controller_->task_runner_->BelongsToCurrentThread()); | 421 DCHECK(!controller_->task_runner_->BelongsToCurrentThread()); |
422 DCHECK(controller_->proxy_task_runner_->BelongsToCurrentThread()); | 422 DCHECK(controller_->proxy_task_runner_->BelongsToCurrentThread()); |
423 | 423 |
424 EnsureSyncWatcherExists(); | 424 EnsureSyncWatcherExists(); |
425 return sync_watcher_->SyncWatch(should_stop); | 425 return sync_watcher_->SyncWatch(should_stop); |
426 } | 426 } |
427 | 427 |
428 private: | 428 private: |
429 friend class base::RefCountedThreadSafe<Endpoint>; | 429 friend class base::RefCountedThreadSafe<Endpoint>; |
430 | 430 |
431 ~Endpoint() override { | 431 ~Endpoint() override { |
432 controller_->lock_.AssertAcquired(); | 432 controller_->lock_.AssertAcquired(); |
433 DCHECK(!client_); | 433 DCHECK(!client_); |
434 DCHECK(closed_); | 434 DCHECK(closed_); |
435 DCHECK(peer_closed_); | 435 DCHECK(peer_closed_); |
436 DCHECK(!sync_watcher_); | 436 DCHECK(!sync_watcher_); |
437 } | 437 } |
438 | 438 |
439 void OnSyncMessageEventReady() { | 439 void OnSyncMessageEventReady() { |
440 DCHECK(task_runner_->RunsTasksOnCurrentThread()); | 440 DCHECK(task_runner_->RunsTasksInCurrentSequence()); |
441 | 441 |
442 scoped_refptr<Endpoint> keepalive(this); | 442 scoped_refptr<Endpoint> keepalive(this); |
443 scoped_refptr<AssociatedGroupController> controller_keepalive( | 443 scoped_refptr<AssociatedGroupController> controller_keepalive( |
444 controller_); | 444 controller_); |
445 | 445 |
446 bool reset_sync_watcher = false; | 446 bool reset_sync_watcher = false; |
447 { | 447 { |
448 base::AutoLock locker(controller_->lock_); | 448 base::AutoLock locker(controller_->lock_); |
449 bool more_to_process = false; | 449 bool more_to_process = false; |
450 if (!sync_messages_.empty()) { | 450 if (!sync_messages_.empty()) { |
(...skipping 26 matching lines...) Expand all Loading... |
477 | 477 |
478 if (reset_sync_watcher) { | 478 if (reset_sync_watcher) { |
479 // If a SyncWatch() call (or multiple ones) of this interface endpoint | 479 // If a SyncWatch() call (or multiple ones) of this interface endpoint |
480 // is on the call stack, resetting the sync watcher will allow it to | 480 // is on the call stack, resetting the sync watcher will allow it to |
481 // exit when the call stack unwinds to that frame. | 481 // exit when the call stack unwinds to that frame. |
482 sync_watcher_.reset(); | 482 sync_watcher_.reset(); |
483 } | 483 } |
484 } | 484 } |
485 | 485 |
486 void EnsureSyncWatcherExists() { | 486 void EnsureSyncWatcherExists() { |
487 DCHECK(task_runner_->RunsTasksOnCurrentThread()); | 487 DCHECK(task_runner_->RunsTasksInCurrentSequence()); |
488 if (sync_watcher_) | 488 if (sync_watcher_) |
489 return; | 489 return; |
490 | 490 |
491 { | 491 { |
492 base::AutoLock locker(controller_->lock_); | 492 base::AutoLock locker(controller_->lock_); |
493 if (!sync_message_event_) { | 493 if (!sync_message_event_) { |
494 sync_message_event_ = base::MakeUnique<base::WaitableEvent>( | 494 sync_message_event_ = base::MakeUnique<base::WaitableEvent>( |
495 base::WaitableEvent::ResetPolicy::MANUAL, | 495 base::WaitableEvent::ResetPolicy::MANUAL, |
496 base::WaitableEvent::InitialState::NOT_SIGNALED); | 496 base::WaitableEvent::InitialState::NOT_SIGNALED); |
497 if (peer_closed_ || !sync_messages_.empty()) | 497 if (peer_closed_ || !sync_messages_.empty()) |
(...skipping 122 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
620 // Because a notification may in turn detach any endpoint, we have to | 620 // Because a notification may in turn detach any endpoint, we have to |
621 // check each client again here. | 621 // check each client again here. |
622 if (endpoint->client()) | 622 if (endpoint->client()) |
623 NotifyEndpointOfError(endpoint.get(), false /* force_async */); | 623 NotifyEndpointOfError(endpoint.get(), false /* force_async */); |
624 } | 624 } |
625 } | 625 } |
626 | 626 |
627 void NotifyEndpointOfError(Endpoint* endpoint, bool force_async) { | 627 void NotifyEndpointOfError(Endpoint* endpoint, bool force_async) { |
628 lock_.AssertAcquired(); | 628 lock_.AssertAcquired(); |
629 DCHECK(endpoint->task_runner() && endpoint->client()); | 629 DCHECK(endpoint->task_runner() && endpoint->client()); |
630 if (endpoint->task_runner()->RunsTasksOnCurrentThread() && !force_async) { | 630 if (endpoint->task_runner()->RunsTasksInCurrentSequence() && !force_async) { |
631 mojo::InterfaceEndpointClient* client = endpoint->client(); | 631 mojo::InterfaceEndpointClient* client = endpoint->client(); |
632 base::Optional<mojo::DisconnectReason> reason( | 632 base::Optional<mojo::DisconnectReason> reason( |
633 endpoint->disconnect_reason()); | 633 endpoint->disconnect_reason()); |
634 | 634 |
635 base::AutoUnlock unlocker(lock_); | 635 base::AutoUnlock unlocker(lock_); |
636 client->NotifyError(reason); | 636 client->NotifyError(reason); |
637 } else { | 637 } else { |
638 endpoint->task_runner()->PostTask( | 638 endpoint->task_runner()->PostTask( |
639 FROM_HERE, | 639 FROM_HERE, |
640 base::Bind(&ChannelAssociatedGroupController | 640 base::Bind(&ChannelAssociatedGroupController |
641 ::NotifyEndpointOfErrorOnEndpointThread, this, endpoint->id(), | 641 ::NotifyEndpointOfErrorOnEndpointThread, this, endpoint->id(), |
642 endpoint)); | 642 endpoint)); |
643 } | 643 } |
644 } | 644 } |
645 | 645 |
646 void NotifyEndpointOfErrorOnEndpointThread(mojo::InterfaceId id, | 646 void NotifyEndpointOfErrorOnEndpointThread(mojo::InterfaceId id, |
647 Endpoint* endpoint) { | 647 Endpoint* endpoint) { |
648 base::AutoLock locker(lock_); | 648 base::AutoLock locker(lock_); |
649 auto iter = endpoints_.find(id); | 649 auto iter = endpoints_.find(id); |
650 if (iter == endpoints_.end() || iter->second.get() != endpoint) | 650 if (iter == endpoints_.end() || iter->second.get() != endpoint) |
651 return; | 651 return; |
652 if (!endpoint->client()) | 652 if (!endpoint->client()) |
653 return; | 653 return; |
654 | 654 |
655 DCHECK(endpoint->task_runner()->RunsTasksOnCurrentThread()); | 655 DCHECK(endpoint->task_runner()->RunsTasksInCurrentSequence()); |
656 NotifyEndpointOfError(endpoint, false /* force_async */); | 656 NotifyEndpointOfError(endpoint, false /* force_async */); |
657 } | 657 } |
658 | 658 |
659 void MarkClosedAndMaybeRemove(Endpoint* endpoint) { | 659 void MarkClosedAndMaybeRemove(Endpoint* endpoint) { |
660 lock_.AssertAcquired(); | 660 lock_.AssertAcquired(); |
661 endpoint->set_closed(); | 661 endpoint->set_closed(); |
662 if (endpoint->closed() && endpoint->peer_closed()) | 662 if (endpoint->closed() && endpoint->peer_closed()) |
663 endpoints_.erase(endpoint->id()); | 663 endpoints_.erase(endpoint->id()); |
664 } | 664 } |
665 | 665 |
(...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
703 | 703 |
704 mojo::InterfaceId id = message->interface_id(); | 704 mojo::InterfaceId id = message->interface_id(); |
705 DCHECK(mojo::IsValidInterfaceId(id)); | 705 DCHECK(mojo::IsValidInterfaceId(id)); |
706 | 706 |
707 base::AutoLock locker(lock_); | 707 base::AutoLock locker(lock_); |
708 Endpoint* endpoint = FindEndpoint(id); | 708 Endpoint* endpoint = FindEndpoint(id); |
709 if (!endpoint) | 709 if (!endpoint) |
710 return true; | 710 return true; |
711 | 711 |
712 mojo::InterfaceEndpointClient* client = endpoint->client(); | 712 mojo::InterfaceEndpointClient* client = endpoint->client(); |
713 if (!client || !endpoint->task_runner()->RunsTasksOnCurrentThread()) { | 713 if (!client || !endpoint->task_runner()->RunsTasksInCurrentSequence()) { |
714 // No client has been bound yet or the client runs tasks on another | 714 // No client has been bound yet or the client runs tasks on another |
715 // thread. We assume the other thread must always be the one on which | 715 // thread. We assume the other thread must always be the one on which |
716 // |proxy_task_runner_| runs tasks, since that's the only valid scenario. | 716 // |proxy_task_runner_| runs tasks, since that's the only valid scenario. |
717 // | 717 // |
718 // If the client is not yet bound, it must be bound by the time this task | 718 // If the client is not yet bound, it must be bound by the time this task |
719 // runs or else it's programmer error. | 719 // runs or else it's programmer error. |
720 DCHECK(proxy_task_runner_); | 720 DCHECK(proxy_task_runner_); |
721 | 721 |
722 if (message->has_flag(mojo::Message::kFlagIsSync)) { | 722 if (message->has_flag(mojo::Message::kFlagIsSync)) { |
723 MessageWrapper message_wrapper(this, std::move(*message)); | 723 MessageWrapper message_wrapper(this, std::move(*message)); |
(...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
759 | 759 |
760 base::AutoLock locker(lock_); | 760 base::AutoLock locker(lock_); |
761 Endpoint* endpoint = FindEndpoint(id); | 761 Endpoint* endpoint = FindEndpoint(id); |
762 if (!endpoint) | 762 if (!endpoint) |
763 return; | 763 return; |
764 | 764 |
765 mojo::InterfaceEndpointClient* client = endpoint->client(); | 765 mojo::InterfaceEndpointClient* client = endpoint->client(); |
766 if (!client) | 766 if (!client) |
767 return; | 767 return; |
768 | 768 |
769 DCHECK(endpoint->task_runner()->RunsTasksOnCurrentThread()); | 769 DCHECK(endpoint->task_runner()->RunsTasksInCurrentSequence()); |
770 | 770 |
771 // Sync messages should never make their way to this method. | 771 // Sync messages should never make their way to this method. |
772 DCHECK(!message.has_flag(mojo::Message::kFlagIsSync)); | 772 DCHECK(!message.has_flag(mojo::Message::kFlagIsSync)); |
773 | 773 |
774 bool result = false; | 774 bool result = false; |
775 { | 775 { |
776 base::AutoUnlock unlocker(lock_); | 776 base::AutoUnlock unlocker(lock_); |
777 result = client->HandleIncomingMessage(&message); | 777 result = client->HandleIncomingMessage(&message); |
778 } | 778 } |
779 | 779 |
780 if (!result) | 780 if (!result) |
781 RaiseError(); | 781 RaiseError(); |
782 } | 782 } |
783 | 783 |
784 void AcceptSyncMessage(mojo::InterfaceId interface_id, uint32_t message_id) { | 784 void AcceptSyncMessage(mojo::InterfaceId interface_id, uint32_t message_id) { |
785 DCHECK(proxy_task_runner_->BelongsToCurrentThread()); | 785 DCHECK(proxy_task_runner_->BelongsToCurrentThread()); |
786 | 786 |
787 base::AutoLock locker(lock_); | 787 base::AutoLock locker(lock_); |
788 Endpoint* endpoint = FindEndpoint(interface_id); | 788 Endpoint* endpoint = FindEndpoint(interface_id); |
789 if (!endpoint) | 789 if (!endpoint) |
790 return; | 790 return; |
791 | 791 |
792 // Careful, if the endpoint is detached its members are cleared. Check for | 792 // Careful, if the endpoint is detached its members are cleared. Check for |
793 // that before dereferencing. | 793 // that before dereferencing. |
794 mojo::InterfaceEndpointClient* client = endpoint->client(); | 794 mojo::InterfaceEndpointClient* client = endpoint->client(); |
795 if (!client) | 795 if (!client) |
796 return; | 796 return; |
797 | 797 |
798 DCHECK(endpoint->task_runner()->RunsTasksOnCurrentThread()); | 798 DCHECK(endpoint->task_runner()->RunsTasksInCurrentSequence()); |
799 MessageWrapper message_wrapper = endpoint->PopSyncMessage(message_id); | 799 MessageWrapper message_wrapper = endpoint->PopSyncMessage(message_id); |
800 | 800 |
801 // The message must have already been dequeued by the endpoint waking up | 801 // The message must have already been dequeued by the endpoint waking up |
802 // from a sync wait. Nothing to do. | 802 // from a sync wait. Nothing to do. |
803 if (message_wrapper.value().IsNull()) | 803 if (message_wrapper.value().IsNull()) |
804 return; | 804 return; |
805 | 805 |
806 bool result = false; | 806 bool result = false; |
807 { | 807 { |
808 base::AutoUnlock unlocker(lock_); | 808 base::AutoUnlock unlocker(lock_); |
(...skipping 109 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
918 std::unique_ptr<MojoBootstrap> MojoBootstrap::Create( | 918 std::unique_ptr<MojoBootstrap> MojoBootstrap::Create( |
919 mojo::ScopedMessagePipeHandle handle, | 919 mojo::ScopedMessagePipeHandle handle, |
920 Channel::Mode mode, | 920 Channel::Mode mode, |
921 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner) { | 921 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner) { |
922 return base::MakeUnique<MojoBootstrapImpl>( | 922 return base::MakeUnique<MojoBootstrapImpl>( |
923 std::move(handle), new ChannelAssociatedGroupController( | 923 std::move(handle), new ChannelAssociatedGroupController( |
924 mode == Channel::MODE_SERVER, ipc_task_runner)); | 924 mode == Channel::MODE_SERVER, ipc_task_runner)); |
925 } | 925 } |
926 | 926 |
927 } // namespace IPC | 927 } // namespace IPC |
OLD | NEW |