| OLD | NEW |
| 1 // Copyright 2014 The Chromium Authors. All rights reserved. | 1 // Copyright 2014 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "ipc/ipc_mojo_bootstrap.h" | 5 #include "ipc/ipc_mojo_bootstrap.h" |
| 6 | 6 |
| 7 #include <stdint.h> | 7 #include <stdint.h> |
| 8 | 8 |
| 9 #include <map> | 9 #include <map> |
| 10 #include <memory> | 10 #include <memory> |
| (...skipping 340 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 351 mojo::InterfaceEndpointClient* client() const { | 351 mojo::InterfaceEndpointClient* client() const { |
| 352 controller_->lock_.AssertAcquired(); | 352 controller_->lock_.AssertAcquired(); |
| 353 return client_; | 353 return client_; |
| 354 } | 354 } |
| 355 | 355 |
| 356 void AttachClient(mojo::InterfaceEndpointClient* client, | 356 void AttachClient(mojo::InterfaceEndpointClient* client, |
| 357 scoped_refptr<base::SequencedTaskRunner> runner) { | 357 scoped_refptr<base::SequencedTaskRunner> runner) { |
| 358 controller_->lock_.AssertAcquired(); | 358 controller_->lock_.AssertAcquired(); |
| 359 DCHECK(!client_); | 359 DCHECK(!client_); |
| 360 DCHECK(!closed_); | 360 DCHECK(!closed_); |
| 361 DCHECK(runner->RunsTasksOnCurrentThread()); | 361 DCHECK(runner->RunsTasksInCurrentSequence()); |
| 362 | 362 |
| 363 task_runner_ = std::move(runner); | 363 task_runner_ = std::move(runner); |
| 364 client_ = client; | 364 client_ = client; |
| 365 } | 365 } |
| 366 | 366 |
| 367 void DetachClient() { | 367 void DetachClient() { |
| 368 controller_->lock_.AssertAcquired(); | 368 controller_->lock_.AssertAcquired(); |
| 369 DCHECK(client_); | 369 DCHECK(client_); |
| 370 DCHECK(task_runner_->RunsTasksOnCurrentThread()); | 370 DCHECK(task_runner_->RunsTasksInCurrentSequence()); |
| 371 DCHECK(!closed_); | 371 DCHECK(!closed_); |
| 372 | 372 |
| 373 task_runner_ = nullptr; | 373 task_runner_ = nullptr; |
| 374 client_ = nullptr; | 374 client_ = nullptr; |
| 375 sync_watcher_.reset(); | 375 sync_watcher_.reset(); |
| 376 } | 376 } |
| 377 | 377 |
| 378 uint32_t EnqueueSyncMessage(MessageWrapper message) { | 378 uint32_t EnqueueSyncMessage(MessageWrapper message) { |
| 379 controller_->lock_.AssertAcquired(); | 379 controller_->lock_.AssertAcquired(); |
| 380 uint32_t id = GenerateSyncMessageId(); | 380 uint32_t id = GenerateSyncMessageId(); |
| (...skipping 13 matching lines...) Expand all Loading... |
| 394 controller_->lock_.AssertAcquired(); | 394 controller_->lock_.AssertAcquired(); |
| 395 if (sync_messages_.empty() || sync_messages_.front().first != id) | 395 if (sync_messages_.empty() || sync_messages_.front().first != id) |
| 396 return MessageWrapper(); | 396 return MessageWrapper(); |
| 397 MessageWrapper message = std::move(sync_messages_.front().second); | 397 MessageWrapper message = std::move(sync_messages_.front().second); |
| 398 sync_messages_.pop(); | 398 sync_messages_.pop(); |
| 399 return message; | 399 return message; |
| 400 } | 400 } |
| 401 | 401 |
| 402 // mojo::InterfaceEndpointController: | 402 // mojo::InterfaceEndpointController: |
| 403 bool SendMessage(mojo::Message* message) override { | 403 bool SendMessage(mojo::Message* message) override { |
| 404 DCHECK(task_runner_->RunsTasksOnCurrentThread()); | 404 DCHECK(task_runner_->RunsTasksInCurrentSequence()); |
| 405 message->set_interface_id(id_); | 405 message->set_interface_id(id_); |
| 406 return controller_->SendMessage(message); | 406 return controller_->SendMessage(message); |
| 407 } | 407 } |
| 408 | 408 |
| 409 void AllowWokenUpBySyncWatchOnSameThread() override { | 409 void AllowWokenUpBySyncWatchOnSameThread() override { |
| 410 DCHECK(task_runner_->RunsTasksOnCurrentThread()); | 410 DCHECK(task_runner_->RunsTasksInCurrentSequence()); |
| 411 | 411 |
| 412 EnsureSyncWatcherExists(); | 412 EnsureSyncWatcherExists(); |
| 413 sync_watcher_->AllowWokenUpBySyncWatchOnSameThread(); | 413 sync_watcher_->AllowWokenUpBySyncWatchOnSameThread(); |
| 414 } | 414 } |
| 415 | 415 |
| 416 bool SyncWatch(const bool* should_stop) override { | 416 bool SyncWatch(const bool* should_stop) override { |
| 417 DCHECK(task_runner_->RunsTasksOnCurrentThread()); | 417 DCHECK(task_runner_->RunsTasksInCurrentSequence()); |
| 418 | 418 |
| 419 // It's not legal to make sync calls from the master endpoint's thread, | 419 // It's not legal to make sync calls from the master endpoint's thread, |
| 420 // and in fact they must only happen from the proxy task runner. | 420 // and in fact they must only happen from the proxy task runner. |
| 421 DCHECK(!controller_->task_runner_->BelongsToCurrentThread()); | 421 DCHECK(!controller_->task_runner_->BelongsToCurrentThread()); |
| 422 DCHECK(controller_->proxy_task_runner_->BelongsToCurrentThread()); | 422 DCHECK(controller_->proxy_task_runner_->BelongsToCurrentThread()); |
| 423 | 423 |
| 424 EnsureSyncWatcherExists(); | 424 EnsureSyncWatcherExists(); |
| 425 return sync_watcher_->SyncWatch(should_stop); | 425 return sync_watcher_->SyncWatch(should_stop); |
| 426 } | 426 } |
| 427 | 427 |
| 428 private: | 428 private: |
| 429 friend class base::RefCountedThreadSafe<Endpoint>; | 429 friend class base::RefCountedThreadSafe<Endpoint>; |
| 430 | 430 |
| 431 ~Endpoint() override { | 431 ~Endpoint() override { |
| 432 controller_->lock_.AssertAcquired(); | 432 controller_->lock_.AssertAcquired(); |
| 433 DCHECK(!client_); | 433 DCHECK(!client_); |
| 434 DCHECK(closed_); | 434 DCHECK(closed_); |
| 435 DCHECK(peer_closed_); | 435 DCHECK(peer_closed_); |
| 436 DCHECK(!sync_watcher_); | 436 DCHECK(!sync_watcher_); |
| 437 } | 437 } |
| 438 | 438 |
| 439 void OnSyncMessageEventReady() { | 439 void OnSyncMessageEventReady() { |
| 440 DCHECK(task_runner_->RunsTasksOnCurrentThread()); | 440 DCHECK(task_runner_->RunsTasksInCurrentSequence()); |
| 441 | 441 |
| 442 scoped_refptr<Endpoint> keepalive(this); | 442 scoped_refptr<Endpoint> keepalive(this); |
| 443 scoped_refptr<AssociatedGroupController> controller_keepalive( | 443 scoped_refptr<AssociatedGroupController> controller_keepalive( |
| 444 controller_); | 444 controller_); |
| 445 | 445 |
| 446 bool reset_sync_watcher = false; | 446 bool reset_sync_watcher = false; |
| 447 { | 447 { |
| 448 base::AutoLock locker(controller_->lock_); | 448 base::AutoLock locker(controller_->lock_); |
| 449 bool more_to_process = false; | 449 bool more_to_process = false; |
| 450 if (!sync_messages_.empty()) { | 450 if (!sync_messages_.empty()) { |
| (...skipping 26 matching lines...) Expand all Loading... |
| 477 | 477 |
| 478 if (reset_sync_watcher) { | 478 if (reset_sync_watcher) { |
| 479 // If a SyncWatch() call (or multiple ones) of this interface endpoint | 479 // If a SyncWatch() call (or multiple ones) of this interface endpoint |
| 480 // is on the call stack, resetting the sync watcher will allow it to | 480 // is on the call stack, resetting the sync watcher will allow it to |
| 481 // exit when the call stack unwinds to that frame. | 481 // exit when the call stack unwinds to that frame. |
| 482 sync_watcher_.reset(); | 482 sync_watcher_.reset(); |
| 483 } | 483 } |
| 484 } | 484 } |
| 485 | 485 |
| 486 void EnsureSyncWatcherExists() { | 486 void EnsureSyncWatcherExists() { |
| 487 DCHECK(task_runner_->RunsTasksOnCurrentThread()); | 487 DCHECK(task_runner_->RunsTasksInCurrentSequence()); |
| 488 if (sync_watcher_) | 488 if (sync_watcher_) |
| 489 return; | 489 return; |
| 490 | 490 |
| 491 { | 491 { |
| 492 base::AutoLock locker(controller_->lock_); | 492 base::AutoLock locker(controller_->lock_); |
| 493 if (!sync_message_event_) { | 493 if (!sync_message_event_) { |
| 494 sync_message_event_ = base::MakeUnique<base::WaitableEvent>( | 494 sync_message_event_ = base::MakeUnique<base::WaitableEvent>( |
| 495 base::WaitableEvent::ResetPolicy::MANUAL, | 495 base::WaitableEvent::ResetPolicy::MANUAL, |
| 496 base::WaitableEvent::InitialState::NOT_SIGNALED); | 496 base::WaitableEvent::InitialState::NOT_SIGNALED); |
| 497 if (peer_closed_ || !sync_messages_.empty()) | 497 if (peer_closed_ || !sync_messages_.empty()) |
| (...skipping 122 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 620 // Because a notification may in turn detach any endpoint, we have to | 620 // Because a notification may in turn detach any endpoint, we have to |
| 621 // check each client again here. | 621 // check each client again here. |
| 622 if (endpoint->client()) | 622 if (endpoint->client()) |
| 623 NotifyEndpointOfError(endpoint.get(), false /* force_async */); | 623 NotifyEndpointOfError(endpoint.get(), false /* force_async */); |
| 624 } | 624 } |
| 625 } | 625 } |
| 626 | 626 |
| 627 void NotifyEndpointOfError(Endpoint* endpoint, bool force_async) { | 627 void NotifyEndpointOfError(Endpoint* endpoint, bool force_async) { |
| 628 lock_.AssertAcquired(); | 628 lock_.AssertAcquired(); |
| 629 DCHECK(endpoint->task_runner() && endpoint->client()); | 629 DCHECK(endpoint->task_runner() && endpoint->client()); |
| 630 if (endpoint->task_runner()->RunsTasksOnCurrentThread() && !force_async) { | 630 if (endpoint->task_runner()->RunsTasksInCurrentSequence() && !force_async) { |
| 631 mojo::InterfaceEndpointClient* client = endpoint->client(); | 631 mojo::InterfaceEndpointClient* client = endpoint->client(); |
| 632 base::Optional<mojo::DisconnectReason> reason( | 632 base::Optional<mojo::DisconnectReason> reason( |
| 633 endpoint->disconnect_reason()); | 633 endpoint->disconnect_reason()); |
| 634 | 634 |
| 635 base::AutoUnlock unlocker(lock_); | 635 base::AutoUnlock unlocker(lock_); |
| 636 client->NotifyError(reason); | 636 client->NotifyError(reason); |
| 637 } else { | 637 } else { |
| 638 endpoint->task_runner()->PostTask( | 638 endpoint->task_runner()->PostTask( |
| 639 FROM_HERE, | 639 FROM_HERE, |
| 640 base::Bind(&ChannelAssociatedGroupController | 640 base::Bind(&ChannelAssociatedGroupController |
| 641 ::NotifyEndpointOfErrorOnEndpointThread, this, endpoint->id(), | 641 ::NotifyEndpointOfErrorOnEndpointThread, this, endpoint->id(), |
| 642 endpoint)); | 642 endpoint)); |
| 643 } | 643 } |
| 644 } | 644 } |
| 645 | 645 |
| 646 void NotifyEndpointOfErrorOnEndpointThread(mojo::InterfaceId id, | 646 void NotifyEndpointOfErrorOnEndpointThread(mojo::InterfaceId id, |
| 647 Endpoint* endpoint) { | 647 Endpoint* endpoint) { |
| 648 base::AutoLock locker(lock_); | 648 base::AutoLock locker(lock_); |
| 649 auto iter = endpoints_.find(id); | 649 auto iter = endpoints_.find(id); |
| 650 if (iter == endpoints_.end() || iter->second.get() != endpoint) | 650 if (iter == endpoints_.end() || iter->second.get() != endpoint) |
| 651 return; | 651 return; |
| 652 if (!endpoint->client()) | 652 if (!endpoint->client()) |
| 653 return; | 653 return; |
| 654 | 654 |
| 655 DCHECK(endpoint->task_runner()->RunsTasksOnCurrentThread()); | 655 DCHECK(endpoint->task_runner()->RunsTasksInCurrentSequence()); |
| 656 NotifyEndpointOfError(endpoint, false /* force_async */); | 656 NotifyEndpointOfError(endpoint, false /* force_async */); |
| 657 } | 657 } |
| 658 | 658 |
| 659 void MarkClosedAndMaybeRemove(Endpoint* endpoint) { | 659 void MarkClosedAndMaybeRemove(Endpoint* endpoint) { |
| 660 lock_.AssertAcquired(); | 660 lock_.AssertAcquired(); |
| 661 endpoint->set_closed(); | 661 endpoint->set_closed(); |
| 662 if (endpoint->closed() && endpoint->peer_closed()) | 662 if (endpoint->closed() && endpoint->peer_closed()) |
| 663 endpoints_.erase(endpoint->id()); | 663 endpoints_.erase(endpoint->id()); |
| 664 } | 664 } |
| 665 | 665 |
| (...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 703 | 703 |
| 704 mojo::InterfaceId id = message->interface_id(); | 704 mojo::InterfaceId id = message->interface_id(); |
| 705 DCHECK(mojo::IsValidInterfaceId(id)); | 705 DCHECK(mojo::IsValidInterfaceId(id)); |
| 706 | 706 |
| 707 base::AutoLock locker(lock_); | 707 base::AutoLock locker(lock_); |
| 708 Endpoint* endpoint = FindEndpoint(id); | 708 Endpoint* endpoint = FindEndpoint(id); |
| 709 if (!endpoint) | 709 if (!endpoint) |
| 710 return true; | 710 return true; |
| 711 | 711 |
| 712 mojo::InterfaceEndpointClient* client = endpoint->client(); | 712 mojo::InterfaceEndpointClient* client = endpoint->client(); |
| 713 if (!client || !endpoint->task_runner()->RunsTasksOnCurrentThread()) { | 713 if (!client || !endpoint->task_runner()->RunsTasksInCurrentSequence()) { |
| 714 // No client has been bound yet or the client runs tasks on another | 714 // No client has been bound yet or the client runs tasks on another |
| 715 // thread. We assume the other thread must always be the one on which | 715 // thread. We assume the other thread must always be the one on which |
| 716 // |proxy_task_runner_| runs tasks, since that's the only valid scenario. | 716 // |proxy_task_runner_| runs tasks, since that's the only valid scenario. |
| 717 // | 717 // |
| 718 // If the client is not yet bound, it must be bound by the time this task | 718 // If the client is not yet bound, it must be bound by the time this task |
| 719 // runs or else it's programmer error. | 719 // runs or else it's programmer error. |
| 720 DCHECK(proxy_task_runner_); | 720 DCHECK(proxy_task_runner_); |
| 721 | 721 |
| 722 if (message->has_flag(mojo::Message::kFlagIsSync)) { | 722 if (message->has_flag(mojo::Message::kFlagIsSync)) { |
| 723 MessageWrapper message_wrapper(this, std::move(*message)); | 723 MessageWrapper message_wrapper(this, std::move(*message)); |
| (...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 759 | 759 |
| 760 base::AutoLock locker(lock_); | 760 base::AutoLock locker(lock_); |
| 761 Endpoint* endpoint = FindEndpoint(id); | 761 Endpoint* endpoint = FindEndpoint(id); |
| 762 if (!endpoint) | 762 if (!endpoint) |
| 763 return; | 763 return; |
| 764 | 764 |
| 765 mojo::InterfaceEndpointClient* client = endpoint->client(); | 765 mojo::InterfaceEndpointClient* client = endpoint->client(); |
| 766 if (!client) | 766 if (!client) |
| 767 return; | 767 return; |
| 768 | 768 |
| 769 DCHECK(endpoint->task_runner()->RunsTasksOnCurrentThread()); | 769 DCHECK(endpoint->task_runner()->RunsTasksInCurrentSequence()); |
| 770 | 770 |
| 771 // Sync messages should never make their way to this method. | 771 // Sync messages should never make their way to this method. |
| 772 DCHECK(!message.has_flag(mojo::Message::kFlagIsSync)); | 772 DCHECK(!message.has_flag(mojo::Message::kFlagIsSync)); |
| 773 | 773 |
| 774 bool result = false; | 774 bool result = false; |
| 775 { | 775 { |
| 776 base::AutoUnlock unlocker(lock_); | 776 base::AutoUnlock unlocker(lock_); |
| 777 result = client->HandleIncomingMessage(&message); | 777 result = client->HandleIncomingMessage(&message); |
| 778 } | 778 } |
| 779 | 779 |
| 780 if (!result) | 780 if (!result) |
| 781 RaiseError(); | 781 RaiseError(); |
| 782 } | 782 } |
| 783 | 783 |
| 784 void AcceptSyncMessage(mojo::InterfaceId interface_id, uint32_t message_id) { | 784 void AcceptSyncMessage(mojo::InterfaceId interface_id, uint32_t message_id) { |
| 785 DCHECK(proxy_task_runner_->BelongsToCurrentThread()); | 785 DCHECK(proxy_task_runner_->BelongsToCurrentThread()); |
| 786 | 786 |
| 787 base::AutoLock locker(lock_); | 787 base::AutoLock locker(lock_); |
| 788 Endpoint* endpoint = FindEndpoint(interface_id); | 788 Endpoint* endpoint = FindEndpoint(interface_id); |
| 789 if (!endpoint) | 789 if (!endpoint) |
| 790 return; | 790 return; |
| 791 | 791 |
| 792 // Careful, if the endpoint is detached its members are cleared. Check for | 792 // Careful, if the endpoint is detached its members are cleared. Check for |
| 793 // that before dereferencing. | 793 // that before dereferencing. |
| 794 mojo::InterfaceEndpointClient* client = endpoint->client(); | 794 mojo::InterfaceEndpointClient* client = endpoint->client(); |
| 795 if (!client) | 795 if (!client) |
| 796 return; | 796 return; |
| 797 | 797 |
| 798 DCHECK(endpoint->task_runner()->RunsTasksOnCurrentThread()); | 798 DCHECK(endpoint->task_runner()->RunsTasksInCurrentSequence()); |
| 799 MessageWrapper message_wrapper = endpoint->PopSyncMessage(message_id); | 799 MessageWrapper message_wrapper = endpoint->PopSyncMessage(message_id); |
| 800 | 800 |
| 801 // The message must have already been dequeued by the endpoint waking up | 801 // The message must have already been dequeued by the endpoint waking up |
| 802 // from a sync wait. Nothing to do. | 802 // from a sync wait. Nothing to do. |
| 803 if (message_wrapper.value().IsNull()) | 803 if (message_wrapper.value().IsNull()) |
| 804 return; | 804 return; |
| 805 | 805 |
| 806 bool result = false; | 806 bool result = false; |
| 807 { | 807 { |
| 808 base::AutoUnlock unlocker(lock_); | 808 base::AutoUnlock unlocker(lock_); |
| (...skipping 109 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 918 std::unique_ptr<MojoBootstrap> MojoBootstrap::Create( | 918 std::unique_ptr<MojoBootstrap> MojoBootstrap::Create( |
| 919 mojo::ScopedMessagePipeHandle handle, | 919 mojo::ScopedMessagePipeHandle handle, |
| 920 Channel::Mode mode, | 920 Channel::Mode mode, |
| 921 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner) { | 921 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner) { |
| 922 return base::MakeUnique<MojoBootstrapImpl>( | 922 return base::MakeUnique<MojoBootstrapImpl>( |
| 923 std::move(handle), new ChannelAssociatedGroupController( | 923 std::move(handle), new ChannelAssociatedGroupController( |
| 924 mode == Channel::MODE_SERVER, ipc_task_runner)); | 924 mode == Channel::MODE_SERVER, ipc_task_runner)); |
| 925 } | 925 } |
| 926 | 926 |
| 927 } // namespace IPC | 927 } // namespace IPC |
| OLD | NEW |