| OLD | NEW |
| 1 // Copyright 2014 The Chromium Authors. All rights reserved. | 1 // Copyright 2014 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "ipc/ipc_mojo_bootstrap.h" | 5 #include "ipc/ipc_mojo_bootstrap.h" |
| 6 | 6 |
| 7 #include <stdint.h> | 7 #include <stdint.h> |
| 8 | 8 |
| 9 #include <map> | 9 #include <map> |
| 10 #include <memory> | 10 #include <memory> |
| (...skipping 89 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 100 sender_id = 1; | 100 sender_id = 1; |
| 101 receiver_id = 1 | mojo::kInterfaceIdNamespaceMask; | 101 receiver_id = 1 | mojo::kInterfaceIdNamespaceMask; |
| 102 } | 102 } |
| 103 | 103 |
| 104 { | 104 { |
| 105 base::AutoLock locker(lock_); | 105 base::AutoLock locker(lock_); |
| 106 Endpoint* sender_endpoint = new Endpoint(this, sender_id); | 106 Endpoint* sender_endpoint = new Endpoint(this, sender_id); |
| 107 Endpoint* receiver_endpoint = new Endpoint(this, receiver_id); | 107 Endpoint* receiver_endpoint = new Endpoint(this, receiver_id); |
| 108 endpoints_.insert({ sender_id, sender_endpoint }); | 108 endpoints_.insert({ sender_id, sender_endpoint }); |
| 109 endpoints_.insert({ receiver_id, receiver_endpoint }); | 109 endpoints_.insert({ receiver_id, receiver_endpoint }); |
| 110 sender_endpoint->set_handle_created(); |
| 111 receiver_endpoint->set_handle_created(); |
| 110 } | 112 } |
| 111 | 113 |
| 112 mojo::ScopedInterfaceEndpointHandle sender_handle = | 114 mojo::ScopedInterfaceEndpointHandle sender_handle = |
| 113 CreateScopedInterfaceEndpointHandle(sender_id, true); | 115 CreateScopedInterfaceEndpointHandle(sender_id, true); |
| 114 mojo::ScopedInterfaceEndpointHandle receiver_handle = | 116 mojo::ScopedInterfaceEndpointHandle receiver_handle = |
| 115 CreateScopedInterfaceEndpointHandle(receiver_id, true); | 117 CreateScopedInterfaceEndpointHandle(receiver_id, true); |
| 116 | 118 |
| 117 sender->Bind(mojom::ChannelAssociatedPtrInfo(std::move(sender_handle), 0)); | 119 sender->Bind(mojom::ChannelAssociatedPtrInfo(std::move(sender_handle), 0)); |
| 118 receiver->Bind(std::move(receiver_handle)); | 120 receiver->Bind(std::move(receiver_handle)); |
| 119 } | 121 } |
| (...skipping 17 matching lines...) Expand all Loading... |
| 137 id = next_interface_id_++; | 139 id = next_interface_id_++; |
| 138 if (set_interface_id_namespace_bit_) | 140 if (set_interface_id_namespace_bit_) |
| 139 id |= mojo::kInterfaceIdNamespaceMask; | 141 id |= mojo::kInterfaceIdNamespaceMask; |
| 140 } while (ContainsKey(endpoints_, id)); | 142 } while (ContainsKey(endpoints_, id)); |
| 141 | 143 |
| 142 Endpoint* endpoint = new Endpoint(this, id); | 144 Endpoint* endpoint = new Endpoint(this, id); |
| 143 if (encountered_error_) | 145 if (encountered_error_) |
| 144 endpoint->set_peer_closed(); | 146 endpoint->set_peer_closed(); |
| 145 endpoints_.insert({ id, endpoint }); | 147 endpoints_.insert({ id, endpoint }); |
| 146 | 148 |
| 149 endpoint->set_handle_created(); |
| 147 *local_endpoint = CreateScopedInterfaceEndpointHandle(id, true); | 150 *local_endpoint = CreateScopedInterfaceEndpointHandle(id, true); |
| 148 *remote_endpoint = CreateScopedInterfaceEndpointHandle(id, false); | 151 *remote_endpoint = CreateScopedInterfaceEndpointHandle(id, false); |
| 149 } | 152 } |
| 150 | 153 |
| 151 mojo::ScopedInterfaceEndpointHandle CreateLocalEndpointHandle( | 154 mojo::ScopedInterfaceEndpointHandle CreateLocalEndpointHandle( |
| 152 mojo::InterfaceId id) override { | 155 mojo::InterfaceId id) override { |
| 153 if (!mojo::IsValidInterfaceId(id)) | 156 if (!mojo::IsValidInterfaceId(id)) |
| 154 return mojo::ScopedInterfaceEndpointHandle(); | 157 return mojo::ScopedInterfaceEndpointHandle(); |
| 155 | 158 |
| 156 base::AutoLock locker(lock_); | 159 base::AutoLock locker(lock_); |
| (...skipping 83 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 240 base::Bind(&ChannelAssociatedGroupController::RaiseError, this)); | 243 base::Bind(&ChannelAssociatedGroupController::RaiseError, this)); |
| 241 } | 244 } |
| 242 } | 245 } |
| 243 | 246 |
| 244 private: | 247 private: |
| 245 class Endpoint; | 248 class Endpoint; |
| 246 class ControlMessageProxyThunk; | 249 class ControlMessageProxyThunk; |
| 247 friend class Endpoint; | 250 friend class Endpoint; |
| 248 friend class ControlMessageProxyThunk; | 251 friend class ControlMessageProxyThunk; |
| 249 | 252 |
| 250 // Message objects cannot be destroyed under the controller's lock, if they | 253 // MessageWrapper objects are always destroyed under the controller's lock. On |
| 251 // contain ScopedInterfaceEndpointHandle objects. | 254 // destruction, if the message it wrappers contains |
| 252 // IncomingMessageWrapper is used to wrap messages which haven't got the | 255 // ScopedInterfaceEndpointHandles (which cannot be destructed under the |
| 253 // payload interface IDs deserialized into ScopedInterfaceEndpointHandles. | 256 // controller's lock), the wrapper unlocks to clean them up. |
| 254 // Wrapper objects are always destroyed under the controller's lock. When a | 257 class MessageWrapper { |
| 255 // wrapper is destroyed and the message hasn't been consumed, the wrapper is | |
| 256 // responsible to send endpoint closed notifications. | |
| 257 class IncomingMessageWrapper { | |
| 258 public: | 258 public: |
| 259 IncomingMessageWrapper() = default; | 259 MessageWrapper() = default; |
| 260 | 260 |
| 261 IncomingMessageWrapper(ChannelAssociatedGroupController* controller, | 261 MessageWrapper(ChannelAssociatedGroupController* controller, |
| 262 mojo::Message* message) | 262 mojo::Message message) |
| 263 : controller_(controller), value_(std::move(*message)) { | 263 : controller_(controller), value_(std::move(message)) {} |
| 264 DCHECK(value_.associated_endpoint_handles()->empty()); | |
| 265 } | |
| 266 | 264 |
| 267 IncomingMessageWrapper(IncomingMessageWrapper&& other) | 265 MessageWrapper(MessageWrapper&& other) |
| 268 : controller_(other.controller_), value_(std::move(other.value_)) {} | 266 : controller_(other.controller_), value_(std::move(other.value_)) {} |
| 269 | 267 |
| 270 ~IncomingMessageWrapper() { | 268 ~MessageWrapper() { |
| 271 if (value_.IsNull()) | 269 if (value_.associated_endpoint_handles()->empty()) |
| 272 return; | 270 return; |
| 273 | 271 |
| 274 controller_->lock_.AssertAcquired(); | 272 controller_->lock_.AssertAcquired(); |
| 275 | 273 { |
| 276 uint32_t num_ids = value_.payload_num_interface_ids(); | |
| 277 const uint32_t* ids = value_.payload_interface_ids(); | |
| 278 for (uint32_t i = 0; i < num_ids; ++i) { | |
| 279 base::AutoUnlock unlocker(controller_->lock_); | 274 base::AutoUnlock unlocker(controller_->lock_); |
| 280 controller_->control_message_proxy_.NotifyPeerEndpointClosed( | 275 value_.mutable_associated_endpoint_handles()->clear(); |
| 281 ids[i], base::nullopt); | |
| 282 } | 276 } |
| 283 } | 277 } |
| 284 | 278 |
| 285 IncomingMessageWrapper& operator=(IncomingMessageWrapper&& other) { | 279 MessageWrapper& operator=(MessageWrapper&& other) { |
| 286 controller_ = other.controller_; | 280 controller_ = other.controller_; |
| 287 value_ = std::move(other.value_); | 281 value_ = std::move(other.value_); |
| 288 return *this; | 282 return *this; |
| 289 } | 283 } |
| 290 | 284 |
| 291 // Must be called outside of the controller's lock. | 285 mojo::Message& value() { return value_; } |
| 292 bool TakeMessage(mojo::Message* output) { | |
| 293 DCHECK(!value_.IsNull()); | |
| 294 | |
| 295 *output = std::move(value_); | |
| 296 return output->DeserializeAssociatedEndpointHandles(controller_); | |
| 297 } | |
| 298 | |
| 299 const mojo::Message& value() const { return value_; } | |
| 300 | 286 |
| 301 private: | 287 private: |
| 302 ChannelAssociatedGroupController* controller_ = nullptr; | 288 ChannelAssociatedGroupController* controller_ = nullptr; |
| 303 // It must not hold any ScopedInterfaceEndpointHandle objects. | |
| 304 mojo::Message value_; | 289 mojo::Message value_; |
| 305 | 290 |
| 306 DISALLOW_COPY_AND_ASSIGN(IncomingMessageWrapper); | 291 DISALLOW_COPY_AND_ASSIGN(MessageWrapper); |
| 307 }; | 292 }; |
| 308 | 293 |
| 309 class Endpoint : public base::RefCountedThreadSafe<Endpoint>, | 294 class Endpoint : public base::RefCountedThreadSafe<Endpoint>, |
| 310 public mojo::InterfaceEndpointController { | 295 public mojo::InterfaceEndpointController { |
| 311 public: | 296 public: |
| 312 Endpoint(ChannelAssociatedGroupController* controller, mojo::InterfaceId id) | 297 Endpoint(ChannelAssociatedGroupController* controller, mojo::InterfaceId id) |
| 313 : controller_(controller), id_(id) {} | 298 : controller_(controller), id_(id) {} |
| 314 | 299 |
| 315 mojo::InterfaceId id() const { return id_; } | 300 mojo::InterfaceId id() const { return id_; } |
| 316 | 301 |
| (...skipping 60 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 377 controller_->lock_.AssertAcquired(); | 362 controller_->lock_.AssertAcquired(); |
| 378 DCHECK(client_); | 363 DCHECK(client_); |
| 379 DCHECK(task_runner_->BelongsToCurrentThread()); | 364 DCHECK(task_runner_->BelongsToCurrentThread()); |
| 380 DCHECK(!closed_); | 365 DCHECK(!closed_); |
| 381 | 366 |
| 382 task_runner_ = nullptr; | 367 task_runner_ = nullptr; |
| 383 client_ = nullptr; | 368 client_ = nullptr; |
| 384 sync_watcher_.reset(); | 369 sync_watcher_.reset(); |
| 385 } | 370 } |
| 386 | 371 |
| 387 uint32_t EnqueueSyncMessage(IncomingMessageWrapper message) { | 372 uint32_t EnqueueSyncMessage(MessageWrapper message) { |
| 388 controller_->lock_.AssertAcquired(); | 373 controller_->lock_.AssertAcquired(); |
| 389 uint32_t id = GenerateSyncMessageId(); | 374 uint32_t id = GenerateSyncMessageId(); |
| 390 sync_messages_.emplace(id, std::move(message)); | 375 sync_messages_.emplace(id, std::move(message)); |
| 391 SignalSyncMessageEvent(); | 376 SignalSyncMessageEvent(); |
| 392 return id; | 377 return id; |
| 393 } | 378 } |
| 394 | 379 |
| 395 void SignalSyncMessageEvent() { | 380 void SignalSyncMessageEvent() { |
| 396 controller_->lock_.AssertAcquired(); | 381 controller_->lock_.AssertAcquired(); |
| 397 EnsureSyncMessageEventExists(); | 382 EnsureSyncMessageEventExists(); |
| 398 sync_message_event_->Signal(); | 383 sync_message_event_->Signal(); |
| 399 } | 384 } |
| 400 | 385 |
| 401 IncomingMessageWrapper PopSyncMessage(uint32_t id) { | 386 MessageWrapper PopSyncMessage(uint32_t id) { |
| 402 controller_->lock_.AssertAcquired(); | 387 controller_->lock_.AssertAcquired(); |
| 403 if (sync_messages_.empty() || sync_messages_.front().first != id) | 388 if (sync_messages_.empty() || sync_messages_.front().first != id) |
| 404 return IncomingMessageWrapper(); | 389 return MessageWrapper(); |
| 405 IncomingMessageWrapper message = std::move(sync_messages_.front().second); | 390 MessageWrapper message = std::move(sync_messages_.front().second); |
| 406 sync_messages_.pop(); | 391 sync_messages_.pop(); |
| 407 return message; | 392 return message; |
| 408 } | 393 } |
| 409 | 394 |
| 410 // mojo::InterfaceEndpointController: | 395 // mojo::InterfaceEndpointController: |
| 411 bool SendMessage(mojo::Message* message) override { | 396 bool SendMessage(mojo::Message* message) override { |
| 412 DCHECK(task_runner_->BelongsToCurrentThread()); | 397 DCHECK(task_runner_->BelongsToCurrentThread()); |
| 413 message->set_interface_id(id_); | 398 message->set_interface_id(id_); |
| 414 message->SerializeAssociatedEndpointHandles(controller_); | 399 message->SerializeAssociatedEndpointHandles(controller_); |
| 415 return controller_->SendMessage(message); | 400 return controller_->SendMessage(message); |
| (...skipping 34 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 450 | 435 |
| 451 scoped_refptr<Endpoint> keepalive(this); | 436 scoped_refptr<Endpoint> keepalive(this); |
| 452 scoped_refptr<AssociatedGroupController> controller_keepalive( | 437 scoped_refptr<AssociatedGroupController> controller_keepalive( |
| 453 controller_); | 438 controller_); |
| 454 | 439 |
| 455 bool reset_sync_watcher = false; | 440 bool reset_sync_watcher = false; |
| 456 { | 441 { |
| 457 base::AutoLock locker(controller_->lock_); | 442 base::AutoLock locker(controller_->lock_); |
| 458 bool more_to_process = false; | 443 bool more_to_process = false; |
| 459 if (!sync_messages_.empty()) { | 444 if (!sync_messages_.empty()) { |
| 460 IncomingMessageWrapper message_wrapper = | 445 MessageWrapper message_wrapper = |
| 461 std::move(sync_messages_.front().second); | 446 std::move(sync_messages_.front().second); |
| 462 sync_messages_.pop(); | 447 sync_messages_.pop(); |
| 463 | 448 |
| 464 bool dispatch_succeeded; | 449 bool dispatch_succeeded; |
| 465 mojo::InterfaceEndpointClient* client = client_; | 450 mojo::InterfaceEndpointClient* client = client_; |
| 466 { | 451 { |
| 467 base::AutoUnlock unlocker(controller_->lock_); | 452 base::AutoUnlock unlocker(controller_->lock_); |
| 468 mojo::Message message; | 453 dispatch_succeeded = |
| 469 dispatch_succeeded = message_wrapper.TakeMessage(&message) && | 454 client->HandleIncomingMessage(&message_wrapper.value()); |
| 470 client->HandleIncomingMessage(&message); | |
| 471 } | 455 } |
| 472 | 456 |
| 473 if (!sync_messages_.empty()) | 457 if (!sync_messages_.empty()) |
| 474 more_to_process = true; | 458 more_to_process = true; |
| 475 | 459 |
| 476 if (!dispatch_succeeded) | 460 if (!dispatch_succeeded) |
| 477 controller_->RaiseError(); | 461 controller_->RaiseError(); |
| 478 } | 462 } |
| 479 | 463 |
| 480 if (!more_to_process) | 464 if (!more_to_process) |
| (...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 528 const mojo::InterfaceId id_; | 512 const mojo::InterfaceId id_; |
| 529 | 513 |
| 530 bool closed_ = false; | 514 bool closed_ = false; |
| 531 bool peer_closed_ = false; | 515 bool peer_closed_ = false; |
| 532 bool handle_created_ = false; | 516 bool handle_created_ = false; |
| 533 base::Optional<mojo::DisconnectReason> disconnect_reason_; | 517 base::Optional<mojo::DisconnectReason> disconnect_reason_; |
| 534 mojo::InterfaceEndpointClient* client_ = nullptr; | 518 mojo::InterfaceEndpointClient* client_ = nullptr; |
| 535 scoped_refptr<base::SingleThreadTaskRunner> task_runner_; | 519 scoped_refptr<base::SingleThreadTaskRunner> task_runner_; |
| 536 std::unique_ptr<mojo::SyncHandleWatcher> sync_watcher_; | 520 std::unique_ptr<mojo::SyncHandleWatcher> sync_watcher_; |
| 537 std::unique_ptr<MojoEvent> sync_message_event_; | 521 std::unique_ptr<MojoEvent> sync_message_event_; |
| 538 std::queue<std::pair<uint32_t, IncomingMessageWrapper>> sync_messages_; | 522 std::queue<std::pair<uint32_t, MessageWrapper>> sync_messages_; |
| 539 uint32_t next_sync_message_id_ = 0; | 523 uint32_t next_sync_message_id_ = 0; |
| 540 | 524 |
| 541 DISALLOW_COPY_AND_ASSIGN(Endpoint); | 525 DISALLOW_COPY_AND_ASSIGN(Endpoint); |
| 542 }; | 526 }; |
| 543 | 527 |
| 544 class ControlMessageProxyThunk : public MessageReceiver { | 528 class ControlMessageProxyThunk : public MessageReceiver { |
| 545 public: | 529 public: |
| 546 explicit ControlMessageProxyThunk( | 530 explicit ControlMessageProxyThunk( |
| 547 ChannelAssociatedGroupController* controller) | 531 ChannelAssociatedGroupController* controller) |
| 548 : controller_(controller) {} | 532 : controller_(controller) {} |
| (...skipping 133 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 682 endpoint->set_peer_closed(); | 666 endpoint->set_peer_closed(); |
| 683 endpoint->SignalSyncMessageEvent(); | 667 endpoint->SignalSyncMessageEvent(); |
| 684 if (endpoint->closed() && endpoint->peer_closed()) | 668 if (endpoint->closed() && endpoint->peer_closed()) |
| 685 endpoints_.erase(endpoint->id()); | 669 endpoints_.erase(endpoint->id()); |
| 686 } | 670 } |
| 687 | 671 |
| 688 Endpoint* FindOrInsertEndpoint(mojo::InterfaceId id, bool* inserted) { | 672 Endpoint* FindOrInsertEndpoint(mojo::InterfaceId id, bool* inserted) { |
| 689 lock_.AssertAcquired(); | 673 lock_.AssertAcquired(); |
| 690 DCHECK(!inserted || !*inserted); | 674 DCHECK(!inserted || !*inserted); |
| 691 | 675 |
| 676 Endpoint* endpoint = FindEndpoint(id); |
| 677 if (!endpoint) { |
| 678 endpoint = new Endpoint(this, id); |
| 679 endpoints_.insert({id, endpoint}); |
| 680 if (inserted) |
| 681 *inserted = true; |
| 682 } |
| 683 return endpoint; |
| 684 } |
| 685 |
| 686 Endpoint* FindEndpoint(mojo::InterfaceId id) { |
| 687 lock_.AssertAcquired(); |
| 692 auto iter = endpoints_.find(id); | 688 auto iter = endpoints_.find(id); |
| 693 if (iter != endpoints_.end()) | 689 return iter != endpoints_.end() ? iter->second.get() : nullptr; |
| 694 return iter->second.get(); | |
| 695 | |
| 696 Endpoint* endpoint = new Endpoint(this, id); | |
| 697 endpoints_.insert({ id, endpoint }); | |
| 698 if (inserted) | |
| 699 *inserted = true; | |
| 700 return endpoint; | |
| 701 } | 690 } |
| 702 | 691 |
| 703 // mojo::MessageReceiver: | 692 // mojo::MessageReceiver: |
| 704 bool Accept(mojo::Message* message) override { | 693 bool Accept(mojo::Message* message) override { |
| 705 DCHECK(thread_checker_.CalledOnValidThread()); | 694 DCHECK(thread_checker_.CalledOnValidThread()); |
| 706 | 695 |
| 707 if (mojo::PipeControlMessageHandler::IsPipeControlMessage(message)) { | 696 if (!message->DeserializeAssociatedEndpointHandles(this)) |
| 708 return message->DeserializeAssociatedEndpointHandles(this) && | 697 return false; |
| 709 control_message_handler_.Accept(message); | 698 |
| 710 } | 699 if (mojo::PipeControlMessageHandler::IsPipeControlMessage(message)) |
| 700 return control_message_handler_.Accept(message); |
| 711 | 701 |
| 712 mojo::InterfaceId id = message->interface_id(); | 702 mojo::InterfaceId id = message->interface_id(); |
| 713 DCHECK(mojo::IsValidInterfaceId(id)); | 703 DCHECK(mojo::IsValidInterfaceId(id)); |
| 714 | 704 |
| 715 base::AutoLock locker(lock_); | 705 base::AutoLock locker(lock_); |
| 716 Endpoint* endpoint = GetEndpointForDispatch(id, true /* create */); | 706 Endpoint* endpoint = FindEndpoint(id); |
| 717 mojo::InterfaceEndpointClient* client = | 707 if (!endpoint) |
| 718 endpoint ? endpoint->client() : nullptr; | 708 return true; |
| 709 |
| 710 mojo::InterfaceEndpointClient* client = endpoint->client(); |
| 719 if (!client || !endpoint->task_runner()->BelongsToCurrentThread()) { | 711 if (!client || !endpoint->task_runner()->BelongsToCurrentThread()) { |
| 720 // No client has been bound yet or the client runs tasks on another | 712 // No client has been bound yet or the client runs tasks on another |
| 721 // thread. We assume the other thread must always be the one on which | 713 // thread. We assume the other thread must always be the one on which |
| 722 // |proxy_task_runner_| runs tasks, since that's the only valid scenario. | 714 // |proxy_task_runner_| runs tasks, since that's the only valid scenario. |
| 723 // | 715 // |
| 724 // If the client is not yet bound, it must be bound by the time this task | 716 // If the client is not yet bound, it must be bound by the time this task |
| 725 // runs or else it's programmer error. | 717 // runs or else it's programmer error. |
| 726 DCHECK(proxy_task_runner_); | 718 DCHECK(proxy_task_runner_); |
| 727 | 719 |
| 728 if (message->has_flag(mojo::Message::kFlagIsSync)) { | 720 if (message->has_flag(mojo::Message::kFlagIsSync)) { |
| 729 IncomingMessageWrapper message_wrapper(this, message); | 721 MessageWrapper message_wrapper(this, std::move(*message)); |
| 730 // Sync messages may need to be handled by the endpoint if it's blocking | 722 // Sync messages may need to be handled by the endpoint if it's blocking |
| 731 // on a sync reply. We pass ownership of the message to the endpoint's | 723 // on a sync reply. We pass ownership of the message to the endpoint's |
| 732 // sync message queue. If the endpoint was blocking, it will dequeue the | 724 // sync message queue. If the endpoint was blocking, it will dequeue the |
| 733 // message and dispatch it. Otherwise the posted |AcceptSyncMessage()| | 725 // message and dispatch it. Otherwise the posted |AcceptSyncMessage()| |
| 734 // call will dequeue the message and dispatch it. | 726 // call will dequeue the message and dispatch it. |
| 735 uint32_t message_id = | 727 uint32_t message_id = |
| 736 endpoint->EnqueueSyncMessage(std::move(message_wrapper)); | 728 endpoint->EnqueueSyncMessage(std::move(message_wrapper)); |
| 737 proxy_task_runner_->PostTask( | 729 proxy_task_runner_->PostTask( |
| 738 FROM_HERE, | 730 FROM_HERE, |
| 739 base::Bind(&ChannelAssociatedGroupController::AcceptSyncMessage, | 731 base::Bind(&ChannelAssociatedGroupController::AcceptSyncMessage, |
| 740 this, id, message_id)); | 732 this, id, message_id)); |
| 741 return true; | 733 return true; |
| 742 } | 734 } |
| 743 | 735 |
| 744 proxy_task_runner_->PostTask( | 736 proxy_task_runner_->PostTask( |
| 745 FROM_HERE, | 737 FROM_HERE, |
| 746 base::Bind(&ChannelAssociatedGroupController::AcceptOnProxyThread, | 738 base::Bind(&ChannelAssociatedGroupController::AcceptOnProxyThread, |
| 747 this, base::Passed(message))); | 739 this, base::Passed(message))); |
| 748 return true; | 740 return true; |
| 749 } | 741 } |
| 750 | 742 |
| 751 // We do not expect to receive sync responses on the master endpoint thread. | 743 // We do not expect to receive sync responses on the master endpoint thread. |
| 752 // If it's happening, it's a bug. | 744 // If it's happening, it's a bug. |
| 753 DCHECK(!message->has_flag(mojo::Message::kFlagIsSync) || | 745 DCHECK(!message->has_flag(mojo::Message::kFlagIsSync) || |
| 754 !message->has_flag(mojo::Message::kFlagIsResponse)); | 746 !message->has_flag(mojo::Message::kFlagIsResponse)); |
| 755 | 747 |
| 756 base::AutoUnlock unlocker(lock_); | 748 base::AutoUnlock unlocker(lock_); |
| 757 return message->DeserializeAssociatedEndpointHandles(this) && | 749 return client->HandleIncomingMessage(message); |
| 758 client->HandleIncomingMessage(message); | |
| 759 } | 750 } |
| 760 | 751 |
| 761 void AcceptOnProxyThread(mojo::Message message) { | 752 void AcceptOnProxyThread(mojo::Message message) { |
| 762 DCHECK(proxy_task_runner_->BelongsToCurrentThread()); | 753 DCHECK(proxy_task_runner_->BelongsToCurrentThread()); |
| 763 | 754 |
| 764 mojo::InterfaceId id = message.interface_id(); | 755 mojo::InterfaceId id = message.interface_id(); |
| 765 DCHECK(mojo::IsValidInterfaceId(id) && !mojo::IsMasterInterfaceId(id)); | 756 DCHECK(mojo::IsValidInterfaceId(id) && !mojo::IsMasterInterfaceId(id)); |
| 766 | 757 |
| 767 base::AutoLock locker(lock_); | 758 base::AutoLock locker(lock_); |
| 768 IncomingMessageWrapper message_wrapper(this, &message); | 759 Endpoint* endpoint = FindEndpoint(id); |
| 769 | |
| 770 Endpoint* endpoint = GetEndpointForDispatch(id, false /* create */); | |
| 771 if (!endpoint) | 760 if (!endpoint) |
| 772 return; | 761 return; |
| 773 | 762 |
| 774 mojo::InterfaceEndpointClient* client = endpoint->client(); | 763 mojo::InterfaceEndpointClient* client = endpoint->client(); |
| 775 if (!client) | 764 if (!client) |
| 776 return; | 765 return; |
| 777 | 766 |
| 778 DCHECK(endpoint->task_runner()->BelongsToCurrentThread()); | 767 DCHECK(endpoint->task_runner()->BelongsToCurrentThread()); |
| 779 | 768 |
| 780 // Sync messages should never make their way to this method. | 769 // Sync messages should never make their way to this method. |
| 781 DCHECK(!message_wrapper.value().has_flag(mojo::Message::kFlagIsSync)); | 770 DCHECK(!message.has_flag(mojo::Message::kFlagIsSync)); |
| 782 | 771 |
| 783 bool result = false; | 772 bool result = false; |
| 784 { | 773 { |
| 785 base::AutoUnlock unlocker(lock_); | 774 base::AutoUnlock unlocker(lock_); |
| 786 mojo::Message message; | 775 result = client->HandleIncomingMessage(&message); |
| 787 result = message_wrapper.TakeMessage(&message) && | |
| 788 client->HandleIncomingMessage(&message); | |
| 789 } | 776 } |
| 790 | 777 |
| 791 if (!result) | 778 if (!result) |
| 792 RaiseError(); | 779 RaiseError(); |
| 793 } | 780 } |
| 794 | 781 |
| 795 void AcceptSyncMessage(mojo::InterfaceId interface_id, uint32_t message_id) { | 782 void AcceptSyncMessage(mojo::InterfaceId interface_id, uint32_t message_id) { |
| 796 DCHECK(proxy_task_runner_->BelongsToCurrentThread()); | 783 DCHECK(proxy_task_runner_->BelongsToCurrentThread()); |
| 797 | 784 |
| 798 base::AutoLock locker(lock_); | 785 base::AutoLock locker(lock_); |
| 799 Endpoint* endpoint = | 786 Endpoint* endpoint = FindEndpoint(interface_id); |
| 800 GetEndpointForDispatch(interface_id, false /* create */); | |
| 801 if (!endpoint) | 787 if (!endpoint) |
| 802 return; | 788 return; |
| 803 | 789 |
| 804 DCHECK(endpoint->task_runner()->BelongsToCurrentThread()); | 790 DCHECK(endpoint->task_runner()->BelongsToCurrentThread()); |
| 805 IncomingMessageWrapper message_wrapper = | 791 MessageWrapper message_wrapper = endpoint->PopSyncMessage(message_id); |
| 806 endpoint->PopSyncMessage(message_id); | |
| 807 | 792 |
| 808 // The message must have already been dequeued by the endpoint waking up | 793 // The message must have already been dequeued by the endpoint waking up |
| 809 // from a sync wait. Nothing to do. | 794 // from a sync wait. Nothing to do. |
| 810 if (message_wrapper.value().IsNull()) | 795 if (message_wrapper.value().IsNull()) |
| 811 return; | 796 return; |
| 812 | 797 |
| 813 mojo::InterfaceEndpointClient* client = endpoint->client(); | 798 mojo::InterfaceEndpointClient* client = endpoint->client(); |
| 814 if (!client) | 799 if (!client) |
| 815 return; | 800 return; |
| 816 | 801 |
| 817 bool result = false; | 802 bool result = false; |
| 818 { | 803 { |
| 819 base::AutoUnlock unlocker(lock_); | 804 base::AutoUnlock unlocker(lock_); |
| 820 mojo::Message message; | 805 result = client->HandleIncomingMessage(&message_wrapper.value()); |
| 821 result = message_wrapper.TakeMessage(&message) && | |
| 822 client->HandleIncomingMessage(&message); | |
| 823 } | 806 } |
| 824 | 807 |
| 825 if (!result) | 808 if (!result) |
| 826 RaiseError(); | 809 RaiseError(); |
| 827 } | 810 } |
| 828 | 811 |
| 829 Endpoint* GetEndpointForDispatch(mojo::InterfaceId id, bool create) { | |
| 830 lock_.AssertAcquired(); | |
| 831 auto iter = endpoints_.find(id); | |
| 832 if (iter != endpoints_.end()) | |
| 833 return iter->second.get(); | |
| 834 if (!create) | |
| 835 return nullptr; | |
| 836 bool inserted = false; | |
| 837 Endpoint* endpoint = FindOrInsertEndpoint(id, &inserted); | |
| 838 DCHECK(inserted); | |
| 839 return endpoint; | |
| 840 } | |
| 841 | |
| 842 // mojo::PipeControlMessageHandlerDelegate: | 812 // mojo::PipeControlMessageHandlerDelegate: |
| 843 bool OnPeerAssociatedEndpointClosed( | 813 bool OnPeerAssociatedEndpointClosed( |
| 844 mojo::InterfaceId id, | 814 mojo::InterfaceId id, |
| 845 const base::Optional<mojo::DisconnectReason>& reason) override { | 815 const base::Optional<mojo::DisconnectReason>& reason) override { |
| 846 DCHECK(thread_checker_.CalledOnValidThread()); | 816 DCHECK(thread_checker_.CalledOnValidThread()); |
| 847 | 817 |
| 848 DCHECK(!mojo::IsMasterInterfaceId(id) || reason); | 818 DCHECK(!mojo::IsMasterInterfaceId(id) || reason); |
| 849 | 819 |
| 850 scoped_refptr<ChannelAssociatedGroupController> keepalive(this); | 820 scoped_refptr<ChannelAssociatedGroupController> keepalive(this); |
| 851 base::AutoLock locker(lock_); | 821 base::AutoLock locker(lock_); |
| (...skipping 120 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 972 Channel::Mode mode, | 942 Channel::Mode mode, |
| 973 Delegate* delegate, | 943 Delegate* delegate, |
| 974 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner) { | 944 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner) { |
| 975 return base::MakeUnique<MojoBootstrapImpl>( | 945 return base::MakeUnique<MojoBootstrapImpl>( |
| 976 std::move(handle), delegate, | 946 std::move(handle), delegate, |
| 977 new ChannelAssociatedGroupController(mode == Channel::MODE_SERVER, | 947 new ChannelAssociatedGroupController(mode == Channel::MODE_SERVER, |
| 978 ipc_task_runner)); | 948 ipc_task_runner)); |
| 979 } | 949 } |
| 980 | 950 |
| 981 } // namespace IPC | 951 } // namespace IPC |
| OLD | NEW |