Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1025)

Side by Side Diff: ipc/ipc_mojo_bootstrap.cc

Issue 2147493006: Adds Channel-associated interface support on ChannelProxy's thread (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: . Created 4 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « ipc/ipc_mojo_bootstrap.h ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2014 The Chromium Authors. All rights reserved. 1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "ipc/ipc_mojo_bootstrap.h" 5 #include "ipc/ipc_mojo_bootstrap.h"
6 6
7 #include <stdint.h> 7 #include <stdint.h>
8 8
9 #include <map> 9 #include <map>
10 #include <memory> 10 #include <memory>
(...skipping 103 matching lines...) Expand 10 before | Expand all | Expand 10 after
114 } 114 }
115 115
116 void CloseEndpointHandle(mojo::InterfaceId id, bool is_local) override { 116 void CloseEndpointHandle(mojo::InterfaceId id, bool is_local) override {
117 if (!mojo::IsValidInterfaceId(id)) 117 if (!mojo::IsValidInterfaceId(id))
118 return; 118 return;
119 119
120 base::AutoLock locker(lock_); 120 base::AutoLock locker(lock_);
121 if (!is_local) { 121 if (!is_local) {
122 DCHECK(ContainsKey(endpoints_, id)); 122 DCHECK(ContainsKey(endpoints_, id));
123 DCHECK(!mojo::IsMasterInterfaceId(id)); 123 DCHECK(!mojo::IsMasterInterfaceId(id));
124 control_message_proxy_.NotifyEndpointClosedBeforeSent(id); 124 NotifyEndpointClosedBeforeSent(id);
125 return; 125 return;
126 } 126 }
127 127
128 DCHECK(ContainsKey(endpoints_, id)); 128 DCHECK(ContainsKey(endpoints_, id));
129 Endpoint* endpoint = endpoints_[id].get(); 129 Endpoint* endpoint = endpoints_[id].get();
130 DCHECK(!endpoint->client()); 130 DCHECK(!endpoint->client());
131 DCHECK(!endpoint->closed()); 131 DCHECK(!endpoint->closed());
132 MarkClosedAndMaybeRemove(endpoint); 132 MarkClosedAndMaybeRemove(endpoint);
133 133
134 if (!mojo::IsMasterInterfaceId(id)) 134 if (!mojo::IsMasterInterfaceId(id))
135 control_message_proxy_.NotifyPeerEndpointClosed(id); 135 NotifyPeerEndpointClosed(id);
136 } 136 }
137 137
138 mojo::InterfaceEndpointController* AttachEndpointClient( 138 mojo::InterfaceEndpointController* AttachEndpointClient(
139 const mojo::ScopedInterfaceEndpointHandle& handle, 139 const mojo::ScopedInterfaceEndpointHandle& handle,
140 mojo::InterfaceEndpointClient* client, 140 mojo::InterfaceEndpointClient* client,
141 scoped_refptr<base::SingleThreadTaskRunner> runner) override { 141 scoped_refptr<base::SingleThreadTaskRunner> runner) override {
142 const mojo::InterfaceId id = handle.id(); 142 const mojo::InterfaceId id = handle.id();
143 143
144 DCHECK(mojo::IsValidInterfaceId(id)); 144 DCHECK(mojo::IsValidInterfaceId(id));
145 DCHECK(client); 145 DCHECK(client);
(...skipping 239 matching lines...) Expand 10 before | Expand all | Expand 10 after
385 endpoints_.erase(endpoint->id()); 385 endpoints_.erase(endpoint->id());
386 } 386 }
387 387
388 void MarkPeerClosedAndMaybeRemove(Endpoint* endpoint) { 388 void MarkPeerClosedAndMaybeRemove(Endpoint* endpoint) {
389 lock_.AssertAcquired(); 389 lock_.AssertAcquired();
390 endpoint->set_peer_closed(); 390 endpoint->set_peer_closed();
391 if (endpoint->closed() && endpoint->peer_closed()) 391 if (endpoint->closed() && endpoint->peer_closed())
392 endpoints_.erase(endpoint->id()); 392 endpoints_.erase(endpoint->id());
393 } 393 }
394 394
395 void NotifyPeerEndpointClosed(mojo::InterfaceId id) {
396 if (task_runner_->BelongsToCurrentThread()) {
397 if (connector_.is_valid())
398 control_message_proxy_.NotifyPeerEndpointClosed(id);
399 } else {
400 task_runner_->PostTask(
401 FROM_HERE,
402 base::Bind(&ChannelAssociatedGroupController
403 ::NotifyPeerEndpointClosed, this, id));
404 }
405 }
406
407 void NotifyEndpointClosedBeforeSent(mojo::InterfaceId id) {
408 if (task_runner_->BelongsToCurrentThread()) {
409 if (connector_.is_valid())
410 control_message_proxy_.NotifyEndpointClosedBeforeSent(id);
411 } else {
412 task_runner_->PostTask(
413 FROM_HERE,
414 base::Bind(&ChannelAssociatedGroupController
415 ::NotifyEndpointClosedBeforeSent, this, id));
416 }
417 }
418
395 Endpoint* FindOrInsertEndpoint(mojo::InterfaceId id, bool* inserted) { 419 Endpoint* FindOrInsertEndpoint(mojo::InterfaceId id, bool* inserted) {
396 lock_.AssertAcquired(); 420 lock_.AssertAcquired();
397 DCHECK(!inserted || !*inserted); 421 DCHECK(!inserted || !*inserted);
398 422
399 auto iter = endpoints_.find(id); 423 auto iter = endpoints_.find(id);
400 if (iter != endpoints_.end()) 424 if (iter != endpoints_.end())
401 return iter->second.get(); 425 return iter->second.get();
402 426
403 Endpoint* endpoint = new Endpoint(this, id); 427 Endpoint* endpoint = new Endpoint(this, id);
404 endpoints_.insert({ id, endpoint }); 428 endpoints_.insert({ id, endpoint });
405 if (inserted) 429 if (inserted)
406 *inserted = true; 430 *inserted = true;
407 return endpoint; 431 return endpoint;
408 } 432 }
409 433
410 // mojo::MessageReceiver: 434 // mojo::MessageReceiver:
411 bool Accept(mojo::Message* message) override { 435 bool Accept(mojo::Message* message) override {
412 DCHECK(thread_checker_.CalledOnValidThread()); 436 DCHECK(thread_checker_.CalledOnValidThread());
413 437
414 if (mojo::PipeControlMessageHandler::IsPipeControlMessage(message)) { 438 if (mojo::PipeControlMessageHandler::IsPipeControlMessage(message))
415 if (!control_message_handler_.Accept(message)) 439 return control_message_handler_.Accept(message);
416 RaiseError();
417 return true;
418 }
419 440
420 mojo::InterfaceId id = message->interface_id(); 441 mojo::InterfaceId id = message->interface_id();
421 DCHECK(mojo::IsValidInterfaceId(id)); 442 DCHECK(mojo::IsValidInterfaceId(id));
422 443
423 base::AutoLock locker(lock_); 444 base::AutoLock locker(lock_);
424 bool inserted = false; 445 Endpoint* endpoint = GetEndpointForDispatch(id);
425 Endpoint* endpoint = FindOrInsertEndpoint(id, &inserted); 446 if (!endpoint)
426 if (inserted) {
427 MarkClosedAndMaybeRemove(endpoint);
428 if (!mojo::IsMasterInterfaceId(id))
429 control_message_proxy_.NotifyPeerEndpointClosed(id);
430 return true;
431 }
432
433 if (endpoint->closed())
434 return true; 447 return true;
435 448
436 mojo::InterfaceEndpointClient* client = endpoint->client(); 449 mojo::InterfaceEndpointClient* client = endpoint->client();
437 if (!client || !endpoint->task_runner()->BelongsToCurrentThread()) { 450 if (!client || !endpoint->task_runner()->BelongsToCurrentThread()) {
438 // No client has been bound yet or the client runs tasks on another 451 // No client has been bound yet or the client runs tasks on another
439 // thread. We assume the other thread must always be the one on which 452 // thread. We assume the other thread must always be the one on which
440 // |proxy_task_runner_| runs tasks, since that's the only valid scenario. 453 // |proxy_task_runner_| runs tasks, since that's the only valid scenario.
441 // 454 //
442 // If the client is not yet bound, it must be bound by the time this task 455 // If the client is not yet bound, it must be bound by the time this task
443 // runs or else it's programmer error. 456 // runs or else it's programmer error.
444 DCHECK(proxy_task_runner_); 457 DCHECK(proxy_task_runner_);
445 CHECK(false);
446 std::unique_ptr<mojo::Message> passed_message(new mojo::Message); 458 std::unique_ptr<mojo::Message> passed_message(new mojo::Message);
447 message->MoveTo(passed_message.get()); 459 message->MoveTo(passed_message.get());
448 proxy_task_runner_->PostTask( 460 proxy_task_runner_->PostTask(
449 FROM_HERE, 461 FROM_HERE,
450 base::Bind(&ChannelAssociatedGroupController::AcceptOnProxyThread, 462 base::Bind(&ChannelAssociatedGroupController::AcceptOnProxyThread,
451 this, base::Passed(&passed_message))); 463 this, base::Passed(&passed_message)));
452 return true; 464 return true;
453 } 465 }
454 466
455 // We do not expect to receive sync responses on the master endpoint thread. 467 // We do not expect to receive sync responses on the master endpoint thread.
456 // If it's happening, it's a bug. 468 // If it's happening, it's a bug.
457 DCHECK(!message->has_flag(mojo::Message::kFlagIsSync)); 469 DCHECK(!message->has_flag(mojo::Message::kFlagIsSync));
458 470
471 base::AutoUnlock unlocker(lock_);
472 return client->HandleIncomingMessage(message);
473 }
474
475 void AcceptOnProxyThread(std::unique_ptr<mojo::Message> message) {
476 DCHECK(proxy_task_runner_->BelongsToCurrentThread());
477
478 mojo::InterfaceId id = message->interface_id();
479 DCHECK(mojo::IsValidInterfaceId(id) && !mojo::IsMasterInterfaceId(id));
480
481 base::AutoLock locker(lock_);
482 Endpoint* endpoint = GetEndpointForDispatch(id);
483 if (!endpoint)
484 return;
485
486 mojo::InterfaceEndpointClient* client = endpoint->client();
487 if (!client)
488 return;
489
490 DCHECK(endpoint->task_runner()->BelongsToCurrentThread());
491
492 // TODO(rockot): Implement sync dispatch. For now, sync messages are
493 // unsupported here.
494 DCHECK(!message->has_flag(mojo::Message::kFlagIsSync));
495
459 bool result = false; 496 bool result = false;
460 { 497 {
461 base::AutoUnlock unlocker(lock_); 498 base::AutoUnlock unlocker(lock_);
462 result = client->HandleIncomingMessage(message); 499 result = client->HandleIncomingMessage(message.get());
463 } 500 }
464 501
465 if (!result) 502 if (!result)
466 RaiseError(); 503 RaiseError();
467
468 return true;
469 } 504 }
470 505
471 void AcceptOnProxyThread(std::unique_ptr<mojo::Message> message) { 506 Endpoint* GetEndpointForDispatch(mojo::InterfaceId id) {
472 DCHECK(proxy_task_runner_->BelongsToCurrentThread()); 507 lock_.AssertAcquired();
508 bool inserted = false;
509 Endpoint* endpoint = FindOrInsertEndpoint(id, &inserted);
510 if (inserted) {
511 MarkClosedAndMaybeRemove(endpoint);
512 if (!mojo::IsMasterInterfaceId(id))
513 NotifyPeerEndpointClosed(id);
514 return nullptr;
515 }
473 516
474 // TODO(rockot): Implement this. 517 if (endpoint->closed())
475 NOTREACHED(); 518 return nullptr;
519
520 return endpoint;
476 } 521 }
477 522
478 // mojo::PipeControlMessageHandlerDelegate: 523 // mojo::PipeControlMessageHandlerDelegate:
479 bool OnPeerAssociatedEndpointClosed(mojo::InterfaceId id) override { 524 bool OnPeerAssociatedEndpointClosed(mojo::InterfaceId id) override {
480 DCHECK(thread_checker_.CalledOnValidThread()); 525 DCHECK(thread_checker_.CalledOnValidThread());
481 526
482 if (mojo::IsMasterInterfaceId(id)) 527 if (mojo::IsMasterInterfaceId(id))
483 return false; 528 return false;
484 529
485 base::AutoLock locker(lock_); 530 base::AutoLock locker(lock_);
(...skipping 68 matching lines...) Expand 10 before | Expand all | Expand 10 after
554 void set_connection_error_handler(const base::Closure& handler) { 599 void set_connection_error_handler(const base::Closure& handler) {
555 DCHECK(endpoint_client_); 600 DCHECK(endpoint_client_);
556 endpoint_client_->set_connection_error_handler(handler); 601 endpoint_client_->set_connection_error_handler(handler);
557 } 602 }
558 603
559 mojo::AssociatedGroup* associated_group() { 604 mojo::AssociatedGroup* associated_group() {
560 DCHECK(controller_); 605 DCHECK(controller_);
561 return controller_->associated_group(); 606 return controller_->associated_group();
562 } 607 }
563 608
609 ChannelAssociatedGroupController* controller() {
610 DCHECK(controller_);
611 return controller_.get();
612 }
613
564 mojom::Bootstrap* operator->() { 614 mojom::Bootstrap* operator->() {
565 DCHECK(proxy_); 615 DCHECK(proxy_);
566 return proxy_.get(); 616 return proxy_.get();
567 } 617 }
568 618
569 private: 619 private:
570 std::unique_ptr<mojom::BootstrapProxy> proxy_; 620 std::unique_ptr<mojom::BootstrapProxy> proxy_;
571 scoped_refptr<ChannelAssociatedGroupController> controller_; 621 scoped_refptr<ChannelAssociatedGroupController> controller_;
572 std::unique_ptr<mojo::InterfaceEndpointClient> endpoint_client_; 622 std::unique_ptr<mojo::InterfaceEndpointClient> endpoint_client_;
573 623
(...skipping 15 matching lines...) Expand all
589 void set_connection_error_handler(const base::Closure& handler) { 639 void set_connection_error_handler(const base::Closure& handler) {
590 DCHECK(endpoint_client_); 640 DCHECK(endpoint_client_);
591 endpoint_client_->set_connection_error_handler(handler); 641 endpoint_client_->set_connection_error_handler(handler);
592 } 642 }
593 643
594 mojo::AssociatedGroup* associated_group() { 644 mojo::AssociatedGroup* associated_group() {
595 DCHECK(controller_); 645 DCHECK(controller_);
596 return controller_->associated_group(); 646 return controller_->associated_group();
597 } 647 }
598 648
649 ChannelAssociatedGroupController* controller() {
650 DCHECK(controller_);
651 return controller_.get();
652 }
653
599 void Bind(mojo::ScopedMessagePipeHandle handle) { 654 void Bind(mojo::ScopedMessagePipeHandle handle) {
600 DCHECK(!controller_); 655 DCHECK(!controller_);
601 controller_ = 656 controller_ =
602 new ChannelAssociatedGroupController(false, std::move(handle)); 657 new ChannelAssociatedGroupController(false, std::move(handle));
603 stub_.serialization_context()->group_controller = controller_; 658 stub_.serialization_context()->group_controller = controller_;
604 659
605 endpoint_client_.reset(new mojo::InterfaceEndpointClient( 660 endpoint_client_.reset(new mojo::InterfaceEndpointClient(
606 controller_->CreateLocalEndpointHandle(mojo::kMasterInterfaceId), 661 controller_->CreateLocalEndpointHandle(mojo::kMasterInterfaceId),
607 &stub_, 662 &stub_,
608 base::MakeUnique<typename mojom::Bootstrap::RequestValidator_>(), 663 base::MakeUnique<typename mojom::Bootstrap::RequestValidator_>(),
(...skipping 10 matching lines...) Expand all
619 674
620 // MojoBootstrap for the server process. You should create the instance 675 // MojoBootstrap for the server process. You should create the instance
621 // using MojoBootstrap::Create(). 676 // using MojoBootstrap::Create().
622 class MojoServerBootstrap : public MojoBootstrap { 677 class MojoServerBootstrap : public MojoBootstrap {
623 public: 678 public:
624 MojoServerBootstrap(); 679 MojoServerBootstrap();
625 680
626 private: 681 private:
627 // MojoBootstrap implementation. 682 // MojoBootstrap implementation.
628 void Connect() override; 683 void Connect() override;
684
629 mojo::AssociatedGroup* GetAssociatedGroup() override { 685 mojo::AssociatedGroup* GetAssociatedGroup() override {
630 return bootstrap_.associated_group(); 686 return bootstrap_.associated_group();
631 } 687 }
632 688
689 void SetProxyTaskRunner(
690 scoped_refptr<base::SingleThreadTaskRunner> task_runner) override {
691 bootstrap_.controller()->SetProxyTaskRunner(task_runner);
692 }
693
633 void OnInitDone(int32_t peer_pid); 694 void OnInitDone(int32_t peer_pid);
634 695
635 BootstrapMasterProxy bootstrap_; 696 BootstrapMasterProxy bootstrap_;
636 IPC::mojom::ChannelAssociatedPtrInfo send_channel_; 697 IPC::mojom::ChannelAssociatedPtrInfo send_channel_;
637 IPC::mojom::ChannelAssociatedRequest receive_channel_request_; 698 IPC::mojom::ChannelAssociatedRequest receive_channel_request_;
638 699
639 DISALLOW_COPY_AND_ASSIGN(MojoServerBootstrap); 700 DISALLOW_COPY_AND_ASSIGN(MojoServerBootstrap);
640 }; 701 };
641 702
642 MojoServerBootstrap::MojoServerBootstrap() = default; 703 MojoServerBootstrap::MojoServerBootstrap() = default;
(...skipping 38 matching lines...) Expand 10 before | Expand all | Expand 10 after
681 742
682 // MojoBootstrap for client processes. You should create the instance 743 // MojoBootstrap for client processes. You should create the instance
683 // using MojoBootstrap::Create(). 744 // using MojoBootstrap::Create().
684 class MojoClientBootstrap : public MojoBootstrap, public mojom::Bootstrap { 745 class MojoClientBootstrap : public MojoBootstrap, public mojom::Bootstrap {
685 public: 746 public:
686 MojoClientBootstrap(); 747 MojoClientBootstrap();
687 748
688 private: 749 private:
689 // MojoBootstrap implementation. 750 // MojoBootstrap implementation.
690 void Connect() override; 751 void Connect() override;
752
691 mojo::AssociatedGroup* GetAssociatedGroup() override { 753 mojo::AssociatedGroup* GetAssociatedGroup() override {
692 return binding_.associated_group(); 754 return binding_.associated_group();
693 } 755 }
694 756
757 void SetProxyTaskRunner(
758 scoped_refptr<base::SingleThreadTaskRunner> task_runner) override {
759 binding_.controller()->SetProxyTaskRunner(task_runner);
760 }
761
695 // mojom::Bootstrap implementation. 762 // mojom::Bootstrap implementation.
696 void Init(mojom::ChannelAssociatedRequest receive_channel, 763 void Init(mojom::ChannelAssociatedRequest receive_channel,
697 mojom::ChannelAssociatedPtrInfo send_channel, 764 mojom::ChannelAssociatedPtrInfo send_channel,
698 int32_t peer_pid, 765 int32_t peer_pid,
699 const InitCallback& callback) override; 766 const InitCallback& callback) override;
700 767
701 BootstrapMasterBinding binding_; 768 BootstrapMasterBinding binding_;
702 769
703 DISALLOW_COPY_AND_ASSIGN(MojoClientBootstrap); 770 DISALLOW_COPY_AND_ASSIGN(MojoClientBootstrap);
704 }; 771 };
(...skipping 66 matching lines...) Expand 10 before | Expand all | Expand 10 after
771 838
772 bool MojoBootstrap::HasFailed() const { 839 bool MojoBootstrap::HasFailed() const {
773 return state() == STATE_ERROR; 840 return state() == STATE_ERROR;
774 } 841 }
775 842
776 mojo::ScopedMessagePipeHandle MojoBootstrap::TakeHandle() { 843 mojo::ScopedMessagePipeHandle MojoBootstrap::TakeHandle() {
777 return std::move(handle_); 844 return std::move(handle_);
778 } 845 }
779 846
780 } // namespace IPC 847 } // namespace IPC
OLDNEW
« no previous file with comments | « ipc/ipc_mojo_bootstrap.h ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698