Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(20)

Side by Side Diff: ipc/ipc_mojo_bootstrap.cc

Issue 2147493006: Adds Channel-associated interface support on ChannelProxy's thread (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: . Created 4 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « ipc/ipc_mojo_bootstrap.h ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2014 The Chromium Authors. All rights reserved. 1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "ipc/ipc_mojo_bootstrap.h" 5 #include "ipc/ipc_mojo_bootstrap.h"
6 6
7 #include <stdint.h> 7 #include <stdint.h>
8 8
9 #include <map> 9 #include <map>
10 #include <memory> 10 #include <memory>
(...skipping 103 matching lines...) Expand 10 before | Expand all | Expand 10 after
114 } 114 }
115 115
116 void CloseEndpointHandle(mojo::InterfaceId id, bool is_local) override { 116 void CloseEndpointHandle(mojo::InterfaceId id, bool is_local) override {
117 if (!mojo::IsValidInterfaceId(id)) 117 if (!mojo::IsValidInterfaceId(id))
118 return; 118 return;
119 119
120 base::AutoLock locker(lock_); 120 base::AutoLock locker(lock_);
121 if (!is_local) { 121 if (!is_local) {
122 DCHECK(ContainsKey(endpoints_, id)); 122 DCHECK(ContainsKey(endpoints_, id));
123 DCHECK(!mojo::IsMasterInterfaceId(id)); 123 DCHECK(!mojo::IsMasterInterfaceId(id));
124 control_message_proxy_.NotifyEndpointClosedBeforeSent(id); 124 NotifyEndpointClosedBeforeSent(id);
125 return; 125 return;
126 } 126 }
127 127
128 DCHECK(ContainsKey(endpoints_, id)); 128 DCHECK(ContainsKey(endpoints_, id));
129 Endpoint* endpoint = endpoints_[id].get(); 129 Endpoint* endpoint = endpoints_[id].get();
130 DCHECK(!endpoint->client()); 130 DCHECK(!endpoint->client());
131 DCHECK(!endpoint->closed()); 131 DCHECK(!endpoint->closed());
132 MarkClosedAndMaybeRemove(endpoint); 132 MarkClosedAndMaybeRemove(endpoint);
133 133
134 if (!mojo::IsMasterInterfaceId(id)) 134 if (!mojo::IsMasterInterfaceId(id))
135 control_message_proxy_.NotifyPeerEndpointClosed(id); 135 NotifyPeerEndpointClosed(id);
136 } 136 }
137 137
138 mojo::InterfaceEndpointController* AttachEndpointClient( 138 mojo::InterfaceEndpointController* AttachEndpointClient(
139 const mojo::ScopedInterfaceEndpointHandle& handle, 139 const mojo::ScopedInterfaceEndpointHandle& handle,
140 mojo::InterfaceEndpointClient* client, 140 mojo::InterfaceEndpointClient* client,
141 scoped_refptr<base::SingleThreadTaskRunner> runner) override { 141 scoped_refptr<base::SingleThreadTaskRunner> runner) override {
142 const mojo::InterfaceId id = handle.id(); 142 const mojo::InterfaceId id = handle.id();
143 143
144 DCHECK(mojo::IsValidInterfaceId(id)); 144 DCHECK(mojo::IsValidInterfaceId(id));
145 DCHECK(client); 145 DCHECK(client);
(...skipping 239 matching lines...) Expand 10 before | Expand all | Expand 10 after
385 endpoints_.erase(endpoint->id()); 385 endpoints_.erase(endpoint->id());
386 } 386 }
387 387
388 void MarkPeerClosedAndMaybeRemove(Endpoint* endpoint) { 388 void MarkPeerClosedAndMaybeRemove(Endpoint* endpoint) {
389 lock_.AssertAcquired(); 389 lock_.AssertAcquired();
390 endpoint->set_peer_closed(); 390 endpoint->set_peer_closed();
391 if (endpoint->closed() && endpoint->peer_closed()) 391 if (endpoint->closed() && endpoint->peer_closed())
392 endpoints_.erase(endpoint->id()); 392 endpoints_.erase(endpoint->id());
393 } 393 }
394 394
395 void NotifyPeerEndpointClosed(mojo::InterfaceId id) {
396 if (task_runner_->BelongsToCurrentThread()) {
397 if (connector_.is_valid())
398 control_message_proxy_.NotifyPeerEndpointClosed(id);
399 } else {
400 task_runner_->PostTask(
401 FROM_HERE,
402 base::Bind(&ChannelAssociatedGroupController
403 ::NotifyPeerEndpointClosed, this, id));
404 }
405 }
406
407 void NotifyEndpointClosedBeforeSent(mojo::InterfaceId id) {
408 if (task_runner_->BelongsToCurrentThread()) {
409 if (connector_.is_valid())
410 control_message_proxy_.NotifyEndpointClosedBeforeSent(id);
411 } else {
412 task_runner_->PostTask(
413 FROM_HERE,
414 base::Bind(&ChannelAssociatedGroupController
415 ::NotifyEndpointClosedBeforeSent, this, id));
416 }
417 }
418
395 Endpoint* FindOrInsertEndpoint(mojo::InterfaceId id, bool* inserted) { 419 Endpoint* FindOrInsertEndpoint(mojo::InterfaceId id, bool* inserted) {
396 lock_.AssertAcquired(); 420 lock_.AssertAcquired();
397 DCHECK(!inserted || !*inserted); 421 DCHECK(!inserted || !*inserted);
398 422
399 auto iter = endpoints_.find(id); 423 auto iter = endpoints_.find(id);
400 if (iter != endpoints_.end()) 424 if (iter != endpoints_.end())
401 return iter->second.get(); 425 return iter->second.get();
402 426
403 Endpoint* endpoint = new Endpoint(this, id); 427 Endpoint* endpoint = new Endpoint(this, id);
404 endpoints_.insert({ id, endpoint }); 428 endpoints_.insert({ id, endpoint });
405 if (inserted) 429 if (inserted)
406 *inserted = true; 430 *inserted = true;
407 return endpoint; 431 return endpoint;
408 } 432 }
409 433
410 // mojo::MessageReceiver: 434 // mojo::MessageReceiver:
411 bool Accept(mojo::Message* message) override { 435 bool Accept(mojo::Message* message) override {
412 DCHECK(thread_checker_.CalledOnValidThread()); 436 DCHECK(thread_checker_.CalledOnValidThread());
413 437
414 if (mojo::PipeControlMessageHandler::IsPipeControlMessage(message)) { 438 if (mojo::PipeControlMessageHandler::IsPipeControlMessage(message)) {
415 if (!control_message_handler_.Accept(message)) 439 if (!control_message_handler_.Accept(message))
yzshen1 2016/07/14 15:04:04 can we directly return false instead of raising er
Ken Rockot(use gerrit already) 2016/07/14 15:27:37 Good point! Done
416 RaiseError(); 440 RaiseError();
417 return true; 441 return true;
418 } 442 }
419 443
420 mojo::InterfaceId id = message->interface_id(); 444 mojo::InterfaceId id = message->interface_id();
421 DCHECK(mojo::IsValidInterfaceId(id)); 445 DCHECK(mojo::IsValidInterfaceId(id));
422 446
423 base::AutoLock locker(lock_); 447 base::AutoLock locker(lock_);
424 bool inserted = false; 448 Endpoint* endpoint = GetEndpointForDispatch(id);
425 Endpoint* endpoint = FindOrInsertEndpoint(id, &inserted); 449 if (!endpoint)
426 if (inserted) {
427 MarkClosedAndMaybeRemove(endpoint);
428 if (!mojo::IsMasterInterfaceId(id))
429 control_message_proxy_.NotifyPeerEndpointClosed(id);
430 return true;
431 }
432
433 if (endpoint->closed())
434 return true; 450 return true;
435 451
436 mojo::InterfaceEndpointClient* client = endpoint->client(); 452 mojo::InterfaceEndpointClient* client = endpoint->client();
437 if (!client || !endpoint->task_runner()->BelongsToCurrentThread()) { 453 if (!client || !endpoint->task_runner()->BelongsToCurrentThread()) {
438 // No client has been bound yet or the client runs tasks on another 454 // No client has been bound yet or the client runs tasks on another
439 // thread. We assume the other thread must always be the one on which 455 // thread. We assume the other thread must always be the one on which
440 // |proxy_task_runner_| runs tasks, since that's the only valid scenario. 456 // |proxy_task_runner_| runs tasks, since that's the only valid scenario.
441 // 457 //
442 // If the client is not yet bound, it must be bound by the time this task 458 // If the client is not yet bound, it must be bound by the time this task
443 // runs or else it's programmer error. 459 // runs or else it's programmer error.
444 DCHECK(proxy_task_runner_); 460 DCHECK(proxy_task_runner_);
445 CHECK(false);
446 std::unique_ptr<mojo::Message> passed_message(new mojo::Message); 461 std::unique_ptr<mojo::Message> passed_message(new mojo::Message);
447 message->MoveTo(passed_message.get()); 462 message->MoveTo(passed_message.get());
448 proxy_task_runner_->PostTask( 463 proxy_task_runner_->PostTask(
449 FROM_HERE, 464 FROM_HERE,
450 base::Bind(&ChannelAssociatedGroupController::AcceptOnProxyThread, 465 base::Bind(&ChannelAssociatedGroupController::AcceptOnProxyThread,
451 this, base::Passed(&passed_message))); 466 this, base::Passed(&passed_message)));
452 return true; 467 return true;
453 } 468 }
454 469
455 // We do not expect to receive sync responses on the master endpoint thread. 470 // We do not expect to receive sync responses on the master endpoint thread.
456 // If it's happening, it's a bug. 471 // If it's happening, it's a bug.
457 DCHECK(!message->has_flag(mojo::Message::kFlagIsSync)); 472 DCHECK(!message->has_flag(mojo::Message::kFlagIsSync));
458 473
459 bool result = false; 474 bool result = false;
460 { 475 {
461 base::AutoUnlock unlocker(lock_); 476 base::AutoUnlock unlocker(lock_);
462 result = client->HandleIncomingMessage(message); 477 result = client->HandleIncomingMessage(message);
463 } 478 }
464 479
465 if (!result) 480 if (!result)
466 RaiseError(); 481 RaiseError();
yzshen1 2016/07/14 15:04:04 I think we could return false directly here?
Ken Rockot(use gerrit already) 2016/07/14 15:27:37 Yep, done
467 482
468 return true; 483 return true;
469 } 484 }
470 485
471 void AcceptOnProxyThread(std::unique_ptr<mojo::Message> message) { 486 void AcceptOnProxyThread(std::unique_ptr<mojo::Message> message) {
472 DCHECK(proxy_task_runner_->BelongsToCurrentThread()); 487 DCHECK(proxy_task_runner_->BelongsToCurrentThread());
473 488
474 // TODO(rockot): Implement this. 489 mojo::InterfaceId id = message->interface_id();
475 NOTREACHED(); 490 DCHECK(mojo::IsValidInterfaceId(id) && !mojo::IsMasterInterfaceId(id));
491
492 base::AutoLock locker(lock_);
493 Endpoint* endpoint = GetEndpointForDispatch(id);
494 if (!endpoint)
495 return;
496
497 mojo::InterfaceEndpointClient* client = endpoint->client();
498 if (!client)
499 return;
500
501 DCHECK(endpoint->task_runner()->BelongsToCurrentThread());
502
503 // TODO(rockot): Implement sync dispatch. For now, sync messages are
504 // unsupported here.
505 DCHECK(!message->has_flag(mojo::Message::kFlagIsSync));
506
507 bool result = false;
508 {
509 base::AutoUnlock unlocker(lock_);
510 result = client->HandleIncomingMessage(message.get());
511 }
512
513 if (!result)
514 RaiseError();
515 }
516
517 Endpoint* GetEndpointForDispatch(mojo::InterfaceId id) {
518 lock_.AssertAcquired();
519 bool inserted = false;
520 Endpoint* endpoint = FindOrInsertEndpoint(id, &inserted);
521 if (inserted) {
522 MarkClosedAndMaybeRemove(endpoint);
523 if (!mojo::IsMasterInterfaceId(id))
524 NotifyPeerEndpointClosed(id);
525 return nullptr;
526 }
527
528 if (endpoint->closed())
529 return nullptr;
530
531 return endpoint;
476 } 532 }
477 533
478 // mojo::PipeControlMessageHandlerDelegate: 534 // mojo::PipeControlMessageHandlerDelegate:
479 bool OnPeerAssociatedEndpointClosed(mojo::InterfaceId id) override { 535 bool OnPeerAssociatedEndpointClosed(mojo::InterfaceId id) override {
480 DCHECK(thread_checker_.CalledOnValidThread()); 536 DCHECK(thread_checker_.CalledOnValidThread());
481 537
482 if (mojo::IsMasterInterfaceId(id)) 538 if (mojo::IsMasterInterfaceId(id))
483 return false; 539 return false;
484 540
485 base::AutoLock locker(lock_); 541 base::AutoLock locker(lock_);
(...skipping 68 matching lines...) Expand 10 before | Expand all | Expand 10 after
554 void set_connection_error_handler(const base::Closure& handler) { 610 void set_connection_error_handler(const base::Closure& handler) {
555 DCHECK(endpoint_client_); 611 DCHECK(endpoint_client_);
556 endpoint_client_->set_connection_error_handler(handler); 612 endpoint_client_->set_connection_error_handler(handler);
557 } 613 }
558 614
559 mojo::AssociatedGroup* associated_group() { 615 mojo::AssociatedGroup* associated_group() {
560 DCHECK(controller_); 616 DCHECK(controller_);
561 return controller_->associated_group(); 617 return controller_->associated_group();
562 } 618 }
563 619
620 ChannelAssociatedGroupController* controller() {
621 DCHECK(controller_);
622 return controller_.get();
623 }
624
564 mojom::Bootstrap* operator->() { 625 mojom::Bootstrap* operator->() {
565 DCHECK(proxy_); 626 DCHECK(proxy_);
566 return proxy_.get(); 627 return proxy_.get();
567 } 628 }
568 629
569 private: 630 private:
570 std::unique_ptr<mojom::BootstrapProxy> proxy_; 631 std::unique_ptr<mojom::BootstrapProxy> proxy_;
571 scoped_refptr<ChannelAssociatedGroupController> controller_; 632 scoped_refptr<ChannelAssociatedGroupController> controller_;
572 std::unique_ptr<mojo::InterfaceEndpointClient> endpoint_client_; 633 std::unique_ptr<mojo::InterfaceEndpointClient> endpoint_client_;
573 634
(...skipping 15 matching lines...) Expand all
589 void set_connection_error_handler(const base::Closure& handler) { 650 void set_connection_error_handler(const base::Closure& handler) {
590 DCHECK(endpoint_client_); 651 DCHECK(endpoint_client_);
591 endpoint_client_->set_connection_error_handler(handler); 652 endpoint_client_->set_connection_error_handler(handler);
592 } 653 }
593 654
594 mojo::AssociatedGroup* associated_group() { 655 mojo::AssociatedGroup* associated_group() {
595 DCHECK(controller_); 656 DCHECK(controller_);
596 return controller_->associated_group(); 657 return controller_->associated_group();
597 } 658 }
598 659
660 ChannelAssociatedGroupController* controller() {
661 DCHECK(controller_);
662 return controller_.get();
663 }
664
599 void Bind(mojo::ScopedMessagePipeHandle handle) { 665 void Bind(mojo::ScopedMessagePipeHandle handle) {
600 DCHECK(!controller_); 666 DCHECK(!controller_);
601 controller_ = 667 controller_ =
602 new ChannelAssociatedGroupController(false, std::move(handle)); 668 new ChannelAssociatedGroupController(false, std::move(handle));
603 stub_.serialization_context()->group_controller = controller_; 669 stub_.serialization_context()->group_controller = controller_;
604 670
605 endpoint_client_.reset(new mojo::InterfaceEndpointClient( 671 endpoint_client_.reset(new mojo::InterfaceEndpointClient(
606 controller_->CreateLocalEndpointHandle(mojo::kMasterInterfaceId), 672 controller_->CreateLocalEndpointHandle(mojo::kMasterInterfaceId),
607 &stub_, 673 &stub_,
608 base::MakeUnique<typename mojom::Bootstrap::RequestValidator_>(), 674 base::MakeUnique<typename mojom::Bootstrap::RequestValidator_>(),
(...skipping 10 matching lines...) Expand all
619 685
620 // MojoBootstrap for the server process. You should create the instance 686 // MojoBootstrap for the server process. You should create the instance
621 // using MojoBootstrap::Create(). 687 // using MojoBootstrap::Create().
622 class MojoServerBootstrap : public MojoBootstrap { 688 class MojoServerBootstrap : public MojoBootstrap {
623 public: 689 public:
624 MojoServerBootstrap(); 690 MojoServerBootstrap();
625 691
626 private: 692 private:
627 // MojoBootstrap implementation. 693 // MojoBootstrap implementation.
628 void Connect() override; 694 void Connect() override;
695
629 mojo::AssociatedGroup* GetAssociatedGroup() override { 696 mojo::AssociatedGroup* GetAssociatedGroup() override {
630 return bootstrap_.associated_group(); 697 return bootstrap_.associated_group();
631 } 698 }
632 699
700 void SetProxyTaskRunner(
701 scoped_refptr<base::SingleThreadTaskRunner> task_runner) override {
702 bootstrap_.controller()->SetProxyTaskRunner(task_runner);
703 }
704
633 void OnInitDone(int32_t peer_pid); 705 void OnInitDone(int32_t peer_pid);
634 706
635 BootstrapMasterProxy bootstrap_; 707 BootstrapMasterProxy bootstrap_;
636 IPC::mojom::ChannelAssociatedPtrInfo send_channel_; 708 IPC::mojom::ChannelAssociatedPtrInfo send_channel_;
637 IPC::mojom::ChannelAssociatedRequest receive_channel_request_; 709 IPC::mojom::ChannelAssociatedRequest receive_channel_request_;
638 710
639 DISALLOW_COPY_AND_ASSIGN(MojoServerBootstrap); 711 DISALLOW_COPY_AND_ASSIGN(MojoServerBootstrap);
640 }; 712 };
641 713
642 MojoServerBootstrap::MojoServerBootstrap() = default; 714 MojoServerBootstrap::MojoServerBootstrap() = default;
(...skipping 38 matching lines...) Expand 10 before | Expand all | Expand 10 after
681 753
682 // MojoBootstrap for client processes. You should create the instance 754 // MojoBootstrap for client processes. You should create the instance
683 // using MojoBootstrap::Create(). 755 // using MojoBootstrap::Create().
684 class MojoClientBootstrap : public MojoBootstrap, public mojom::Bootstrap { 756 class MojoClientBootstrap : public MojoBootstrap, public mojom::Bootstrap {
685 public: 757 public:
686 MojoClientBootstrap(); 758 MojoClientBootstrap();
687 759
688 private: 760 private:
689 // MojoBootstrap implementation. 761 // MojoBootstrap implementation.
690 void Connect() override; 762 void Connect() override;
763
691 mojo::AssociatedGroup* GetAssociatedGroup() override { 764 mojo::AssociatedGroup* GetAssociatedGroup() override {
692 return binding_.associated_group(); 765 return binding_.associated_group();
693 } 766 }
694 767
768 void SetProxyTaskRunner(
769 scoped_refptr<base::SingleThreadTaskRunner> task_runner) override {
770 binding_.controller()->SetProxyTaskRunner(task_runner);
771 }
772
695 // mojom::Bootstrap implementation. 773 // mojom::Bootstrap implementation.
696 void Init(mojom::ChannelAssociatedRequest receive_channel, 774 void Init(mojom::ChannelAssociatedRequest receive_channel,
697 mojom::ChannelAssociatedPtrInfo send_channel, 775 mojom::ChannelAssociatedPtrInfo send_channel,
698 int32_t peer_pid, 776 int32_t peer_pid,
699 const InitCallback& callback) override; 777 const InitCallback& callback) override;
700 778
701 BootstrapMasterBinding binding_; 779 BootstrapMasterBinding binding_;
702 780
703 DISALLOW_COPY_AND_ASSIGN(MojoClientBootstrap); 781 DISALLOW_COPY_AND_ASSIGN(MojoClientBootstrap);
704 }; 782 };
(...skipping 66 matching lines...) Expand 10 before | Expand all | Expand 10 after
771 849
772 bool MojoBootstrap::HasFailed() const { 850 bool MojoBootstrap::HasFailed() const {
773 return state() == STATE_ERROR; 851 return state() == STATE_ERROR;
774 } 852 }
775 853
776 mojo::ScopedMessagePipeHandle MojoBootstrap::TakeHandle() { 854 mojo::ScopedMessagePipeHandle MojoBootstrap::TakeHandle() {
777 return std::move(handle_); 855 return std::move(handle_);
778 } 856 }
779 857
780 } // namespace IPC 858 } // namespace IPC
OLDNEW
« no previous file with comments | « ipc/ipc_mojo_bootstrap.h ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698