Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(417)

Side by Side Diff: ipc/ipc_mojo_bootstrap.cc

Issue 2674483002: Mojo C++ bindings: fix MultiplexRouter and ChannelAssociatedGroupController. (Closed)
Patch Set: . Created 3 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « no previous file | mojo/public/cpp/bindings/lib/multiplex_router.h » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2014 The Chromium Authors. All rights reserved. 1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "ipc/ipc_mojo_bootstrap.h" 5 #include "ipc/ipc_mojo_bootstrap.h"
6 6
7 #include <stdint.h> 7 #include <stdint.h>
8 8
9 #include <map> 9 #include <map>
10 #include <memory> 10 #include <memory>
(...skipping 89 matching lines...) Expand 10 before | Expand all | Expand 10 after
100 sender_id = 1; 100 sender_id = 1;
101 receiver_id = 1 | mojo::kInterfaceIdNamespaceMask; 101 receiver_id = 1 | mojo::kInterfaceIdNamespaceMask;
102 } 102 }
103 103
104 { 104 {
105 base::AutoLock locker(lock_); 105 base::AutoLock locker(lock_);
106 Endpoint* sender_endpoint = new Endpoint(this, sender_id); 106 Endpoint* sender_endpoint = new Endpoint(this, sender_id);
107 Endpoint* receiver_endpoint = new Endpoint(this, receiver_id); 107 Endpoint* receiver_endpoint = new Endpoint(this, receiver_id);
108 endpoints_.insert({ sender_id, sender_endpoint }); 108 endpoints_.insert({ sender_id, sender_endpoint });
109 endpoints_.insert({ receiver_id, receiver_endpoint }); 109 endpoints_.insert({ receiver_id, receiver_endpoint });
110 sender_endpoint->set_handle_created();
111 receiver_endpoint->set_handle_created();
110 } 112 }
111 113
112 mojo::ScopedInterfaceEndpointHandle sender_handle = 114 mojo::ScopedInterfaceEndpointHandle sender_handle =
113 CreateScopedInterfaceEndpointHandle(sender_id, true); 115 CreateScopedInterfaceEndpointHandle(sender_id, true);
114 mojo::ScopedInterfaceEndpointHandle receiver_handle = 116 mojo::ScopedInterfaceEndpointHandle receiver_handle =
115 CreateScopedInterfaceEndpointHandle(receiver_id, true); 117 CreateScopedInterfaceEndpointHandle(receiver_id, true);
116 118
117 sender->Bind(mojom::ChannelAssociatedPtrInfo(std::move(sender_handle), 0)); 119 sender->Bind(mojom::ChannelAssociatedPtrInfo(std::move(sender_handle), 0));
118 receiver->Bind(std::move(receiver_handle)); 120 receiver->Bind(std::move(receiver_handle));
119 } 121 }
(...skipping 17 matching lines...) Expand all
137 id = next_interface_id_++; 139 id = next_interface_id_++;
138 if (set_interface_id_namespace_bit_) 140 if (set_interface_id_namespace_bit_)
139 id |= mojo::kInterfaceIdNamespaceMask; 141 id |= mojo::kInterfaceIdNamespaceMask;
140 } while (ContainsKey(endpoints_, id)); 142 } while (ContainsKey(endpoints_, id));
141 143
142 Endpoint* endpoint = new Endpoint(this, id); 144 Endpoint* endpoint = new Endpoint(this, id);
143 if (encountered_error_) 145 if (encountered_error_)
144 endpoint->set_peer_closed(); 146 endpoint->set_peer_closed();
145 endpoints_.insert({ id, endpoint }); 147 endpoints_.insert({ id, endpoint });
146 148
149 endpoint->set_handle_created();
147 *local_endpoint = CreateScopedInterfaceEndpointHandle(id, true); 150 *local_endpoint = CreateScopedInterfaceEndpointHandle(id, true);
148 *remote_endpoint = CreateScopedInterfaceEndpointHandle(id, false); 151 *remote_endpoint = CreateScopedInterfaceEndpointHandle(id, false);
149 } 152 }
150 153
151 mojo::ScopedInterfaceEndpointHandle CreateLocalEndpointHandle( 154 mojo::ScopedInterfaceEndpointHandle CreateLocalEndpointHandle(
152 mojo::InterfaceId id) override { 155 mojo::InterfaceId id) override {
153 if (!mojo::IsValidInterfaceId(id)) 156 if (!mojo::IsValidInterfaceId(id))
154 return mojo::ScopedInterfaceEndpointHandle(); 157 return mojo::ScopedInterfaceEndpointHandle();
155 158
156 base::AutoLock locker(lock_); 159 base::AutoLock locker(lock_);
(...skipping 83 matching lines...) Expand 10 before | Expand all | Expand 10 after
240 base::Bind(&ChannelAssociatedGroupController::RaiseError, this)); 243 base::Bind(&ChannelAssociatedGroupController::RaiseError, this));
241 } 244 }
242 } 245 }
243 246
244 private: 247 private:
245 class Endpoint; 248 class Endpoint;
246 class ControlMessageProxyThunk; 249 class ControlMessageProxyThunk;
247 friend class Endpoint; 250 friend class Endpoint;
248 friend class ControlMessageProxyThunk; 251 friend class ControlMessageProxyThunk;
249 252
250 // Message objects cannot be destroyed under the controller's lock, if they 253 // MessageWrapper objects are always destroyed under the controller's lock. On
251 // contain ScopedInterfaceEndpointHandle objects. 254 // destruction, if the message it wrappers contains
252 // IncomingMessageWrapper is used to wrap messages which haven't got the 255 // ScopedInterfaceEndpointHandles (which cannot be destructed under the
253 // payload interface IDs deserialized into ScopedInterfaceEndpointHandles. 256 // controller's lock), the wrapper unlocks to clean them up.
254 // Wrapper objects are always destroyed under the controller's lock. When a 257 class MessageWrapper {
255 // wrapper is destroyed and the message hasn't been consumed, the wrapper is
256 // responsible to send endpoint closed notifications.
257 class IncomingMessageWrapper {
258 public: 258 public:
259 IncomingMessageWrapper() = default; 259 MessageWrapper() = default;
260 260
261 IncomingMessageWrapper(ChannelAssociatedGroupController* controller, 261 MessageWrapper(ChannelAssociatedGroupController* controller,
262 mojo::Message* message) 262 mojo::Message message)
263 : controller_(controller), value_(std::move(*message)) { 263 : controller_(controller), value_(std::move(message)) {}
264 DCHECK(value_.associated_endpoint_handles()->empty());
265 }
266 264
267 IncomingMessageWrapper(IncomingMessageWrapper&& other) 265 MessageWrapper(MessageWrapper&& other)
268 : controller_(other.controller_), value_(std::move(other.value_)) {} 266 : controller_(other.controller_), value_(std::move(other.value_)) {}
269 267
270 ~IncomingMessageWrapper() { 268 ~MessageWrapper() {
271 if (value_.IsNull()) 269 if (value_.associated_endpoint_handles()->empty())
272 return; 270 return;
273 271
274 controller_->lock_.AssertAcquired(); 272 controller_->lock_.AssertAcquired();
275 273 {
276 uint32_t num_ids = value_.payload_num_interface_ids();
277 const uint32_t* ids = value_.payload_interface_ids();
278 for (uint32_t i = 0; i < num_ids; ++i) {
279 base::AutoUnlock unlocker(controller_->lock_); 274 base::AutoUnlock unlocker(controller_->lock_);
280 controller_->control_message_proxy_.NotifyPeerEndpointClosed( 275 value_.mutable_associated_endpoint_handles()->clear();
281 ids[i], base::nullopt);
282 } 276 }
283 } 277 }
284 278
285 IncomingMessageWrapper& operator=(IncomingMessageWrapper&& other) { 279 MessageWrapper& operator=(MessageWrapper&& other) {
286 controller_ = other.controller_; 280 controller_ = other.controller_;
287 value_ = std::move(other.value_); 281 value_ = std::move(other.value_);
288 return *this; 282 return *this;
289 } 283 }
290 284
291 // Must be called outside of the controller's lock. 285 mojo::Message& value() { return value_; }
292 bool TakeMessage(mojo::Message* output) {
293 DCHECK(!value_.IsNull());
294
295 *output = std::move(value_);
296 return output->DeserializeAssociatedEndpointHandles(controller_);
297 }
298
299 const mojo::Message& value() const { return value_; }
300 286
301 private: 287 private:
302 ChannelAssociatedGroupController* controller_ = nullptr; 288 ChannelAssociatedGroupController* controller_ = nullptr;
303 // It must not hold any ScopedInterfaceEndpointHandle objects.
304 mojo::Message value_; 289 mojo::Message value_;
305 290
306 DISALLOW_COPY_AND_ASSIGN(IncomingMessageWrapper); 291 DISALLOW_COPY_AND_ASSIGN(MessageWrapper);
307 }; 292 };
308 293
309 class Endpoint : public base::RefCountedThreadSafe<Endpoint>, 294 class Endpoint : public base::RefCountedThreadSafe<Endpoint>,
310 public mojo::InterfaceEndpointController { 295 public mojo::InterfaceEndpointController {
311 public: 296 public:
312 Endpoint(ChannelAssociatedGroupController* controller, mojo::InterfaceId id) 297 Endpoint(ChannelAssociatedGroupController* controller, mojo::InterfaceId id)
313 : controller_(controller), id_(id) {} 298 : controller_(controller), id_(id) {}
314 299
315 mojo::InterfaceId id() const { return id_; } 300 mojo::InterfaceId id() const { return id_; }
316 301
(...skipping 60 matching lines...) Expand 10 before | Expand all | Expand 10 after
377 controller_->lock_.AssertAcquired(); 362 controller_->lock_.AssertAcquired();
378 DCHECK(client_); 363 DCHECK(client_);
379 DCHECK(task_runner_->BelongsToCurrentThread()); 364 DCHECK(task_runner_->BelongsToCurrentThread());
380 DCHECK(!closed_); 365 DCHECK(!closed_);
381 366
382 task_runner_ = nullptr; 367 task_runner_ = nullptr;
383 client_ = nullptr; 368 client_ = nullptr;
384 sync_watcher_.reset(); 369 sync_watcher_.reset();
385 } 370 }
386 371
387 uint32_t EnqueueSyncMessage(IncomingMessageWrapper message) { 372 uint32_t EnqueueSyncMessage(MessageWrapper message) {
388 controller_->lock_.AssertAcquired(); 373 controller_->lock_.AssertAcquired();
389 uint32_t id = GenerateSyncMessageId(); 374 uint32_t id = GenerateSyncMessageId();
390 sync_messages_.emplace(id, std::move(message)); 375 sync_messages_.emplace(id, std::move(message));
391 SignalSyncMessageEvent(); 376 SignalSyncMessageEvent();
392 return id; 377 return id;
393 } 378 }
394 379
395 void SignalSyncMessageEvent() { 380 void SignalSyncMessageEvent() {
396 controller_->lock_.AssertAcquired(); 381 controller_->lock_.AssertAcquired();
397 EnsureSyncMessageEventExists(); 382 EnsureSyncMessageEventExists();
398 sync_message_event_->Signal(); 383 sync_message_event_->Signal();
399 } 384 }
400 385
401 IncomingMessageWrapper PopSyncMessage(uint32_t id) { 386 MessageWrapper PopSyncMessage(uint32_t id) {
402 controller_->lock_.AssertAcquired(); 387 controller_->lock_.AssertAcquired();
403 if (sync_messages_.empty() || sync_messages_.front().first != id) 388 if (sync_messages_.empty() || sync_messages_.front().first != id)
404 return IncomingMessageWrapper(); 389 return MessageWrapper();
405 IncomingMessageWrapper message = std::move(sync_messages_.front().second); 390 MessageWrapper message = std::move(sync_messages_.front().second);
406 sync_messages_.pop(); 391 sync_messages_.pop();
407 return message; 392 return message;
408 } 393 }
409 394
410 // mojo::InterfaceEndpointController: 395 // mojo::InterfaceEndpointController:
411 bool SendMessage(mojo::Message* message) override { 396 bool SendMessage(mojo::Message* message) override {
412 DCHECK(task_runner_->BelongsToCurrentThread()); 397 DCHECK(task_runner_->BelongsToCurrentThread());
413 message->set_interface_id(id_); 398 message->set_interface_id(id_);
414 message->SerializeAssociatedEndpointHandles(controller_); 399 message->SerializeAssociatedEndpointHandles(controller_);
415 return controller_->SendMessage(message); 400 return controller_->SendMessage(message);
(...skipping 34 matching lines...) Expand 10 before | Expand all | Expand 10 after
450 435
451 scoped_refptr<Endpoint> keepalive(this); 436 scoped_refptr<Endpoint> keepalive(this);
452 scoped_refptr<AssociatedGroupController> controller_keepalive( 437 scoped_refptr<AssociatedGroupController> controller_keepalive(
453 controller_); 438 controller_);
454 439
455 bool reset_sync_watcher = false; 440 bool reset_sync_watcher = false;
456 { 441 {
457 base::AutoLock locker(controller_->lock_); 442 base::AutoLock locker(controller_->lock_);
458 bool more_to_process = false; 443 bool more_to_process = false;
459 if (!sync_messages_.empty()) { 444 if (!sync_messages_.empty()) {
460 IncomingMessageWrapper message_wrapper = 445 MessageWrapper message_wrapper =
461 std::move(sync_messages_.front().second); 446 std::move(sync_messages_.front().second);
462 sync_messages_.pop(); 447 sync_messages_.pop();
463 448
464 bool dispatch_succeeded; 449 bool dispatch_succeeded;
465 mojo::InterfaceEndpointClient* client = client_; 450 mojo::InterfaceEndpointClient* client = client_;
466 { 451 {
467 base::AutoUnlock unlocker(controller_->lock_); 452 base::AutoUnlock unlocker(controller_->lock_);
468 mojo::Message message; 453 dispatch_succeeded =
469 dispatch_succeeded = message_wrapper.TakeMessage(&message) && 454 client->HandleIncomingMessage(&message_wrapper.value());
470 client->HandleIncomingMessage(&message);
471 } 455 }
472 456
473 if (!sync_messages_.empty()) 457 if (!sync_messages_.empty())
474 more_to_process = true; 458 more_to_process = true;
475 459
476 if (!dispatch_succeeded) 460 if (!dispatch_succeeded)
477 controller_->RaiseError(); 461 controller_->RaiseError();
478 } 462 }
479 463
480 if (!more_to_process) 464 if (!more_to_process)
(...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after
528 const mojo::InterfaceId id_; 512 const mojo::InterfaceId id_;
529 513
530 bool closed_ = false; 514 bool closed_ = false;
531 bool peer_closed_ = false; 515 bool peer_closed_ = false;
532 bool handle_created_ = false; 516 bool handle_created_ = false;
533 base::Optional<mojo::DisconnectReason> disconnect_reason_; 517 base::Optional<mojo::DisconnectReason> disconnect_reason_;
534 mojo::InterfaceEndpointClient* client_ = nullptr; 518 mojo::InterfaceEndpointClient* client_ = nullptr;
535 scoped_refptr<base::SingleThreadTaskRunner> task_runner_; 519 scoped_refptr<base::SingleThreadTaskRunner> task_runner_;
536 std::unique_ptr<mojo::SyncHandleWatcher> sync_watcher_; 520 std::unique_ptr<mojo::SyncHandleWatcher> sync_watcher_;
537 std::unique_ptr<MojoEvent> sync_message_event_; 521 std::unique_ptr<MojoEvent> sync_message_event_;
538 std::queue<std::pair<uint32_t, IncomingMessageWrapper>> sync_messages_; 522 std::queue<std::pair<uint32_t, MessageWrapper>> sync_messages_;
539 uint32_t next_sync_message_id_ = 0; 523 uint32_t next_sync_message_id_ = 0;
540 524
541 DISALLOW_COPY_AND_ASSIGN(Endpoint); 525 DISALLOW_COPY_AND_ASSIGN(Endpoint);
542 }; 526 };
543 527
544 class ControlMessageProxyThunk : public MessageReceiver { 528 class ControlMessageProxyThunk : public MessageReceiver {
545 public: 529 public:
546 explicit ControlMessageProxyThunk( 530 explicit ControlMessageProxyThunk(
547 ChannelAssociatedGroupController* controller) 531 ChannelAssociatedGroupController* controller)
548 : controller_(controller) {} 532 : controller_(controller) {}
(...skipping 133 matching lines...) Expand 10 before | Expand all | Expand 10 after
682 endpoint->set_peer_closed(); 666 endpoint->set_peer_closed();
683 endpoint->SignalSyncMessageEvent(); 667 endpoint->SignalSyncMessageEvent();
684 if (endpoint->closed() && endpoint->peer_closed()) 668 if (endpoint->closed() && endpoint->peer_closed())
685 endpoints_.erase(endpoint->id()); 669 endpoints_.erase(endpoint->id());
686 } 670 }
687 671
688 Endpoint* FindOrInsertEndpoint(mojo::InterfaceId id, bool* inserted) { 672 Endpoint* FindOrInsertEndpoint(mojo::InterfaceId id, bool* inserted) {
689 lock_.AssertAcquired(); 673 lock_.AssertAcquired();
690 DCHECK(!inserted || !*inserted); 674 DCHECK(!inserted || !*inserted);
691 675
676 Endpoint* endpoint = FindEndpoint(id);
677 if (!endpoint) {
678 endpoint = new Endpoint(this, id);
679 endpoints_.insert({id, endpoint});
680 if (inserted)
681 *inserted = true;
682 }
683 return endpoint;
684 }
685
686 Endpoint* FindEndpoint(mojo::InterfaceId id) {
687 lock_.AssertAcquired();
692 auto iter = endpoints_.find(id); 688 auto iter = endpoints_.find(id);
693 if (iter != endpoints_.end()) 689 return iter != endpoints_.end() ? iter->second.get() : nullptr;
694 return iter->second.get();
695
696 Endpoint* endpoint = new Endpoint(this, id);
697 endpoints_.insert({ id, endpoint });
698 if (inserted)
699 *inserted = true;
700 return endpoint;
701 } 690 }
702 691
703 // mojo::MessageReceiver: 692 // mojo::MessageReceiver:
704 bool Accept(mojo::Message* message) override { 693 bool Accept(mojo::Message* message) override {
705 DCHECK(thread_checker_.CalledOnValidThread()); 694 DCHECK(thread_checker_.CalledOnValidThread());
706 695
707 if (mojo::PipeControlMessageHandler::IsPipeControlMessage(message)) { 696 if (!message->DeserializeAssociatedEndpointHandles(this))
708 return message->DeserializeAssociatedEndpointHandles(this) && 697 return false;
709 control_message_handler_.Accept(message); 698
710 } 699 if (mojo::PipeControlMessageHandler::IsPipeControlMessage(message))
700 return control_message_handler_.Accept(message);
711 701
712 mojo::InterfaceId id = message->interface_id(); 702 mojo::InterfaceId id = message->interface_id();
713 DCHECK(mojo::IsValidInterfaceId(id)); 703 DCHECK(mojo::IsValidInterfaceId(id));
714 704
715 base::AutoLock locker(lock_); 705 base::AutoLock locker(lock_);
716 Endpoint* endpoint = GetEndpointForDispatch(id, true /* create */); 706 Endpoint* endpoint = FindEndpoint(id);
717 mojo::InterfaceEndpointClient* client = 707 if (!endpoint)
718 endpoint ? endpoint->client() : nullptr; 708 return true;
709
710 mojo::InterfaceEndpointClient* client = endpoint->client();
719 if (!client || !endpoint->task_runner()->BelongsToCurrentThread()) { 711 if (!client || !endpoint->task_runner()->BelongsToCurrentThread()) {
720 // No client has been bound yet or the client runs tasks on another 712 // No client has been bound yet or the client runs tasks on another
721 // thread. We assume the other thread must always be the one on which 713 // thread. We assume the other thread must always be the one on which
722 // |proxy_task_runner_| runs tasks, since that's the only valid scenario. 714 // |proxy_task_runner_| runs tasks, since that's the only valid scenario.
723 // 715 //
724 // If the client is not yet bound, it must be bound by the time this task 716 // If the client is not yet bound, it must be bound by the time this task
725 // runs or else it's programmer error. 717 // runs or else it's programmer error.
726 DCHECK(proxy_task_runner_); 718 DCHECK(proxy_task_runner_);
727 719
728 if (message->has_flag(mojo::Message::kFlagIsSync)) { 720 if (message->has_flag(mojo::Message::kFlagIsSync)) {
729 IncomingMessageWrapper message_wrapper(this, message); 721 MessageWrapper message_wrapper(this, std::move(*message));
730 // Sync messages may need to be handled by the endpoint if it's blocking 722 // Sync messages may need to be handled by the endpoint if it's blocking
731 // on a sync reply. We pass ownership of the message to the endpoint's 723 // on a sync reply. We pass ownership of the message to the endpoint's
732 // sync message queue. If the endpoint was blocking, it will dequeue the 724 // sync message queue. If the endpoint was blocking, it will dequeue the
733 // message and dispatch it. Otherwise the posted |AcceptSyncMessage()| 725 // message and dispatch it. Otherwise the posted |AcceptSyncMessage()|
734 // call will dequeue the message and dispatch it. 726 // call will dequeue the message and dispatch it.
735 uint32_t message_id = 727 uint32_t message_id =
736 endpoint->EnqueueSyncMessage(std::move(message_wrapper)); 728 endpoint->EnqueueSyncMessage(std::move(message_wrapper));
737 proxy_task_runner_->PostTask( 729 proxy_task_runner_->PostTask(
738 FROM_HERE, 730 FROM_HERE,
739 base::Bind(&ChannelAssociatedGroupController::AcceptSyncMessage, 731 base::Bind(&ChannelAssociatedGroupController::AcceptSyncMessage,
740 this, id, message_id)); 732 this, id, message_id));
741 return true; 733 return true;
742 } 734 }
743 735
744 proxy_task_runner_->PostTask( 736 proxy_task_runner_->PostTask(
745 FROM_HERE, 737 FROM_HERE,
746 base::Bind(&ChannelAssociatedGroupController::AcceptOnProxyThread, 738 base::Bind(&ChannelAssociatedGroupController::AcceptOnProxyThread,
747 this, base::Passed(message))); 739 this, base::Passed(message)));
748 return true; 740 return true;
749 } 741 }
750 742
751 // We do not expect to receive sync responses on the master endpoint thread. 743 // We do not expect to receive sync responses on the master endpoint thread.
752 // If it's happening, it's a bug. 744 // If it's happening, it's a bug.
753 DCHECK(!message->has_flag(mojo::Message::kFlagIsSync) || 745 DCHECK(!message->has_flag(mojo::Message::kFlagIsSync) ||
754 !message->has_flag(mojo::Message::kFlagIsResponse)); 746 !message->has_flag(mojo::Message::kFlagIsResponse));
755 747
756 base::AutoUnlock unlocker(lock_); 748 base::AutoUnlock unlocker(lock_);
757 return message->DeserializeAssociatedEndpointHandles(this) && 749 return client->HandleIncomingMessage(message);
758 client->HandleIncomingMessage(message);
759 } 750 }
760 751
761 void AcceptOnProxyThread(mojo::Message message) { 752 void AcceptOnProxyThread(mojo::Message message) {
762 DCHECK(proxy_task_runner_->BelongsToCurrentThread()); 753 DCHECK(proxy_task_runner_->BelongsToCurrentThread());
763 754
764 mojo::InterfaceId id = message.interface_id(); 755 mojo::InterfaceId id = message.interface_id();
765 DCHECK(mojo::IsValidInterfaceId(id) && !mojo::IsMasterInterfaceId(id)); 756 DCHECK(mojo::IsValidInterfaceId(id) && !mojo::IsMasterInterfaceId(id));
766 757
767 base::AutoLock locker(lock_); 758 base::AutoLock locker(lock_);
768 IncomingMessageWrapper message_wrapper(this, &message); 759 Endpoint* endpoint = FindEndpoint(id);
769
770 Endpoint* endpoint = GetEndpointForDispatch(id, false /* create */);
771 if (!endpoint) 760 if (!endpoint)
772 return; 761 return;
773 762
774 mojo::InterfaceEndpointClient* client = endpoint->client(); 763 mojo::InterfaceEndpointClient* client = endpoint->client();
775 if (!client) 764 if (!client)
776 return; 765 return;
777 766
778 DCHECK(endpoint->task_runner()->BelongsToCurrentThread()); 767 DCHECK(endpoint->task_runner()->BelongsToCurrentThread());
779 768
780 // Sync messages should never make their way to this method. 769 // Sync messages should never make their way to this method.
781 DCHECK(!message_wrapper.value().has_flag(mojo::Message::kFlagIsSync)); 770 DCHECK(!message.has_flag(mojo::Message::kFlagIsSync));
782 771
783 bool result = false; 772 bool result = false;
784 { 773 {
785 base::AutoUnlock unlocker(lock_); 774 base::AutoUnlock unlocker(lock_);
786 mojo::Message message; 775 result = client->HandleIncomingMessage(&message);
787 result = message_wrapper.TakeMessage(&message) &&
788 client->HandleIncomingMessage(&message);
789 } 776 }
790 777
791 if (!result) 778 if (!result)
792 RaiseError(); 779 RaiseError();
793 } 780 }
794 781
795 void AcceptSyncMessage(mojo::InterfaceId interface_id, uint32_t message_id) { 782 void AcceptSyncMessage(mojo::InterfaceId interface_id, uint32_t message_id) {
796 DCHECK(proxy_task_runner_->BelongsToCurrentThread()); 783 DCHECK(proxy_task_runner_->BelongsToCurrentThread());
797 784
798 base::AutoLock locker(lock_); 785 base::AutoLock locker(lock_);
799 Endpoint* endpoint = 786 Endpoint* endpoint = FindEndpoint(interface_id);
800 GetEndpointForDispatch(interface_id, false /* create */);
801 if (!endpoint) 787 if (!endpoint)
802 return; 788 return;
803 789
804 DCHECK(endpoint->task_runner()->BelongsToCurrentThread()); 790 DCHECK(endpoint->task_runner()->BelongsToCurrentThread());
805 IncomingMessageWrapper message_wrapper = 791 MessageWrapper message_wrapper = endpoint->PopSyncMessage(message_id);
806 endpoint->PopSyncMessage(message_id);
807 792
808 // The message must have already been dequeued by the endpoint waking up 793 // The message must have already been dequeued by the endpoint waking up
809 // from a sync wait. Nothing to do. 794 // from a sync wait. Nothing to do.
810 if (message_wrapper.value().IsNull()) 795 if (message_wrapper.value().IsNull())
811 return; 796 return;
812 797
813 mojo::InterfaceEndpointClient* client = endpoint->client(); 798 mojo::InterfaceEndpointClient* client = endpoint->client();
814 if (!client) 799 if (!client)
815 return; 800 return;
816 801
817 bool result = false; 802 bool result = false;
818 { 803 {
819 base::AutoUnlock unlocker(lock_); 804 base::AutoUnlock unlocker(lock_);
820 mojo::Message message; 805 result = client->HandleIncomingMessage(&message_wrapper.value());
821 result = message_wrapper.TakeMessage(&message) &&
822 client->HandleIncomingMessage(&message);
823 } 806 }
824 807
825 if (!result) 808 if (!result)
826 RaiseError(); 809 RaiseError();
827 } 810 }
828 811
829 Endpoint* GetEndpointForDispatch(mojo::InterfaceId id, bool create) {
830 lock_.AssertAcquired();
831 auto iter = endpoints_.find(id);
832 if (iter != endpoints_.end())
833 return iter->second.get();
834 if (!create)
835 return nullptr;
836 bool inserted = false;
837 Endpoint* endpoint = FindOrInsertEndpoint(id, &inserted);
838 DCHECK(inserted);
839 return endpoint;
840 }
841
842 // mojo::PipeControlMessageHandlerDelegate: 812 // mojo::PipeControlMessageHandlerDelegate:
843 bool OnPeerAssociatedEndpointClosed( 813 bool OnPeerAssociatedEndpointClosed(
844 mojo::InterfaceId id, 814 mojo::InterfaceId id,
845 const base::Optional<mojo::DisconnectReason>& reason) override { 815 const base::Optional<mojo::DisconnectReason>& reason) override {
846 DCHECK(thread_checker_.CalledOnValidThread()); 816 DCHECK(thread_checker_.CalledOnValidThread());
847 817
848 DCHECK(!mojo::IsMasterInterfaceId(id) || reason); 818 DCHECK(!mojo::IsMasterInterfaceId(id) || reason);
849 819
850 scoped_refptr<ChannelAssociatedGroupController> keepalive(this); 820 scoped_refptr<ChannelAssociatedGroupController> keepalive(this);
851 base::AutoLock locker(lock_); 821 base::AutoLock locker(lock_);
(...skipping 120 matching lines...) Expand 10 before | Expand all | Expand 10 after
972 Channel::Mode mode, 942 Channel::Mode mode,
973 Delegate* delegate, 943 Delegate* delegate,
974 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner) { 944 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner) {
975 return base::MakeUnique<MojoBootstrapImpl>( 945 return base::MakeUnique<MojoBootstrapImpl>(
976 std::move(handle), delegate, 946 std::move(handle), delegate,
977 new ChannelAssociatedGroupController(mode == Channel::MODE_SERVER, 947 new ChannelAssociatedGroupController(mode == Channel::MODE_SERVER,
978 ipc_task_runner)); 948 ipc_task_runner));
979 } 949 }
980 950
981 } // namespace IPC 951 } // namespace IPC
OLDNEW
« no previous file with comments | « no previous file | mojo/public/cpp/bindings/lib/multiplex_router.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698