| OLD | NEW |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "mojo/public/cpp/bindings/lib/multiplex_router.h" | 5 #include "mojo/public/cpp/bindings/lib/multiplex_router.h" |
| 6 | 6 |
| 7 #include <stdint.h> | 7 #include <stdint.h> |
| 8 | 8 |
| 9 #include <utility> | 9 #include <utility> |
| 10 | 10 |
| (...skipping 256 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 267 // --------------------------------------------------------------------------- | 267 // --------------------------------------------------------------------------- |
| 268 // The following members are only valid while a client is attached. They are | 268 // The following members are only valid while a client is attached. They are |
| 269 // used exclusively on the client's thread. They may be accessed outside of | 269 // used exclusively on the client's thread. They may be accessed outside of |
| 270 // the router's lock. | 270 // the router's lock. |
| 271 | 271 |
| 272 std::unique_ptr<SyncHandleWatcher> sync_watcher_; | 272 std::unique_ptr<SyncHandleWatcher> sync_watcher_; |
| 273 | 273 |
| 274 DISALLOW_COPY_AND_ASSIGN(InterfaceEndpoint); | 274 DISALLOW_COPY_AND_ASSIGN(InterfaceEndpoint); |
| 275 }; | 275 }; |
| 276 | 276 |
| 277 // Message objects cannot be destroyed under the router's lock, if they contain | 277 // MessageWrapper objects are always destroyed under the router's lock. On |
| 278 // ScopedInterfaceEndpointHandle objects. | 278 // destruction, if the message it wrappers contains |
| 279 // IncomingMessageWrapper is used to wrap messages which haven't got the payload | 279 // ScopedInterfaceEndpointHandles (which cannot be destructed under the |
| 280 // interface IDs deserialized into ScopedInterfaceEndpointHandles. Wrapper | 280 // router's lock), the wrapper unlocks to clean them up. |
| 281 // objects are always destroyed under the router's lock. When a wrapper is | 281 class MultiplexRouter::MessageWrapper { |
| 282 // destroyed and the message hasn't been consumed, the wrapper is responsible | |
| 283 // to send endpoint closed notifications. | |
| 284 class MultiplexRouter::IncomingMessageWrapper { | |
| 285 public: | 282 public: |
| 286 IncomingMessageWrapper() = default; | 283 MessageWrapper() = default; |
| 287 | 284 |
| 288 IncomingMessageWrapper(MultiplexRouter* router, Message* message) | 285 MessageWrapper(MultiplexRouter* router, Message message) |
| 289 : router_(router), value_(std::move(*message)) { | 286 : router_(router), value_(std::move(message)) {} |
| 290 DCHECK(value_.associated_endpoint_handles()->empty()); | |
| 291 } | |
| 292 | 287 |
| 293 IncomingMessageWrapper(IncomingMessageWrapper&& other) | 288 MessageWrapper(MessageWrapper&& other) |
| 294 : router_(other.router_), value_(std::move(other.value_)) {} | 289 : router_(other.router_), value_(std::move(other.value_)) {} |
| 295 | 290 |
| 296 ~IncomingMessageWrapper() { | 291 ~MessageWrapper() { |
| 297 if (value_.IsNull()) | 292 if (value_.associated_endpoint_handles()->empty()) |
| 298 return; | 293 return; |
| 299 | 294 |
| 300 router_->AssertLockAcquired(); | 295 router_->AssertLockAcquired(); |
| 301 | 296 { |
| 302 uint32_t num_ids = value_.payload_num_interface_ids(); | |
| 303 const uint32_t* ids = value_.payload_interface_ids(); | |
| 304 for (uint32_t i = 0; i < num_ids; ++i) { | |
| 305 MayAutoUnlock unlocker(router_->lock_.get()); | 297 MayAutoUnlock unlocker(router_->lock_.get()); |
| 306 router_->control_message_proxy_.NotifyPeerEndpointClosed(ids[i], | 298 value_.mutable_associated_endpoint_handles()->clear(); |
| 307 base::nullopt); | |
| 308 } | 299 } |
| 309 } | 300 } |
| 310 | 301 |
| 311 IncomingMessageWrapper& operator=(IncomingMessageWrapper&& other) { | 302 MessageWrapper& operator=(MessageWrapper&& other) { |
| 312 router_ = other.router_; | 303 router_ = other.router_; |
| 313 value_ = std::move(other.value_); | 304 value_ = std::move(other.value_); |
| 314 return *this; | 305 return *this; |
| 315 } | 306 } |
| 316 | 307 |
| 317 // Must be called outside of the router's lock. | 308 Message& value() { return value_; } |
| 318 bool TakeMessage(Message* output) { | |
| 319 DCHECK(!value_.IsNull()); | |
| 320 | |
| 321 *output = std::move(value_); | |
| 322 return output->DeserializeAssociatedEndpointHandles(router_); | |
| 323 } | |
| 324 | |
| 325 const Message& value() const { return value_; } | |
| 326 | 309 |
| 327 private: | 310 private: |
| 328 MultiplexRouter* router_ = nullptr; | 311 MultiplexRouter* router_ = nullptr; |
| 329 // It must not hold any ScopedInterfaceEndpointHandle objects. | |
| 330 Message value_; | 312 Message value_; |
| 331 | 313 |
| 332 DISALLOW_COPY_AND_ASSIGN(IncomingMessageWrapper); | 314 DISALLOW_COPY_AND_ASSIGN(MessageWrapper); |
| 333 }; | 315 }; |
| 334 | 316 |
| 335 struct MultiplexRouter::Task { | 317 struct MultiplexRouter::Task { |
| 336 public: | 318 public: |
| 337 // Doesn't take ownership of |message| but takes its contents. | 319 // Doesn't take ownership of |message| but takes its contents. |
| 338 static std::unique_ptr<Task> CreateMessageTask( | 320 static std::unique_ptr<Task> CreateMessageTask( |
| 339 IncomingMessageWrapper message_wrapper) { | 321 MessageWrapper message_wrapper) { |
| 340 Task* task = new Task(MESSAGE); | 322 Task* task = new Task(MESSAGE); |
| 341 task->message_wrapper = std::move(message_wrapper); | 323 task->message_wrapper = std::move(message_wrapper); |
| 342 return base::WrapUnique(task); | 324 return base::WrapUnique(task); |
| 343 } | 325 } |
| 344 static std::unique_ptr<Task> CreateNotifyErrorTask( | 326 static std::unique_ptr<Task> CreateNotifyErrorTask( |
| 345 InterfaceEndpoint* endpoint) { | 327 InterfaceEndpoint* endpoint) { |
| 346 Task* task = new Task(NOTIFY_ERROR); | 328 Task* task = new Task(NOTIFY_ERROR); |
| 347 task->endpoint_to_notify = endpoint; | 329 task->endpoint_to_notify = endpoint; |
| 348 return base::WrapUnique(task); | 330 return base::WrapUnique(task); |
| 349 } | 331 } |
| 350 | 332 |
| 351 ~Task() {} | 333 ~Task() {} |
| 352 | 334 |
| 353 bool IsMessageTask() const { return type == MESSAGE; } | 335 bool IsMessageTask() const { return type == MESSAGE; } |
| 354 bool IsNotifyErrorTask() const { return type == NOTIFY_ERROR; } | 336 bool IsNotifyErrorTask() const { return type == NOTIFY_ERROR; } |
| 355 | 337 |
| 356 IncomingMessageWrapper message_wrapper; | 338 MessageWrapper message_wrapper; |
| 357 scoped_refptr<InterfaceEndpoint> endpoint_to_notify; | 339 scoped_refptr<InterfaceEndpoint> endpoint_to_notify; |
| 358 | 340 |
| 359 enum Type { MESSAGE, NOTIFY_ERROR }; | 341 enum Type { MESSAGE, NOTIFY_ERROR }; |
| 360 Type type; | 342 Type type; |
| 361 | 343 |
| 362 private: | 344 private: |
| 363 explicit Task(Type in_type) : type(in_type) {} | 345 explicit Task(Type in_type) : type(in_type) {} |
| 364 | 346 |
| 365 DISALLOW_COPY_AND_ASSIGN(Task); | 347 DISALLOW_COPY_AND_ASSIGN(Task); |
| 366 }; | 348 }; |
| (...skipping 87 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 454 id = next_interface_id_value_++; | 436 id = next_interface_id_value_++; |
| 455 if (set_interface_id_namespace_bit_) | 437 if (set_interface_id_namespace_bit_) |
| 456 id |= kInterfaceIdNamespaceMask; | 438 id |= kInterfaceIdNamespaceMask; |
| 457 } while (base::ContainsKey(endpoints_, id)); | 439 } while (base::ContainsKey(endpoints_, id)); |
| 458 | 440 |
| 459 InterfaceEndpoint* endpoint = new InterfaceEndpoint(this, id); | 441 InterfaceEndpoint* endpoint = new InterfaceEndpoint(this, id); |
| 460 endpoints_[id] = endpoint; | 442 endpoints_[id] = endpoint; |
| 461 if (encountered_error_) | 443 if (encountered_error_) |
| 462 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); | 444 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); |
| 463 | 445 |
| 446 endpoint->set_handle_created(); |
| 464 *local_endpoint = CreateScopedInterfaceEndpointHandle(id, true); | 447 *local_endpoint = CreateScopedInterfaceEndpointHandle(id, true); |
| 465 *remote_endpoint = CreateScopedInterfaceEndpointHandle(id, false); | 448 *remote_endpoint = CreateScopedInterfaceEndpointHandle(id, false); |
| 466 } | 449 } |
| 467 | 450 |
| 468 ScopedInterfaceEndpointHandle MultiplexRouter::CreateLocalEndpointHandle( | 451 ScopedInterfaceEndpointHandle MultiplexRouter::CreateLocalEndpointHandle( |
| 469 InterfaceId id) { | 452 InterfaceId id) { |
| 470 if (!IsValidInterfaceId(id)) | 453 if (!IsValidInterfaceId(id)) |
| 471 return ScopedInterfaceEndpointHandle(); | 454 return ScopedInterfaceEndpointHandle(); |
| 472 | 455 |
| 473 MayAutoLock locker(lock_.get()); | 456 MayAutoLock locker(lock_.get()); |
| (...skipping 148 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 622 DCHECK(thread_checker_.CalledOnValidThread()); | 605 DCHECK(thread_checker_.CalledOnValidThread()); |
| 623 MayAutoLock locker(lock_.get()); | 606 MayAutoLock locker(lock_.get()); |
| 624 | 607 |
| 625 testing_mode_ = true; | 608 testing_mode_ = true; |
| 626 connector_.set_enforce_errors_from_incoming_receiver(false); | 609 connector_.set_enforce_errors_from_incoming_receiver(false); |
| 627 } | 610 } |
| 628 | 611 |
| 629 bool MultiplexRouter::Accept(Message* message) { | 612 bool MultiplexRouter::Accept(Message* message) { |
| 630 DCHECK(thread_checker_.CalledOnValidThread()); | 613 DCHECK(thread_checker_.CalledOnValidThread()); |
| 631 | 614 |
| 615 if (!message->DeserializeAssociatedEndpointHandles(this)) |
| 616 return false; |
| 617 |
| 632 scoped_refptr<MultiplexRouter> protector(this); | 618 scoped_refptr<MultiplexRouter> protector(this); |
| 633 MayAutoLock locker(lock_.get()); | 619 MayAutoLock locker(lock_.get()); |
| 634 | 620 |
| 635 DCHECK(!paused_); | 621 DCHECK(!paused_); |
| 636 | 622 |
| 637 IncomingMessageWrapper message_wrapper(this, message); | |
| 638 | |
| 639 ClientCallBehavior client_call_behavior = | 623 ClientCallBehavior client_call_behavior = |
| 640 connector_.during_sync_handle_watcher_callback() | 624 connector_.during_sync_handle_watcher_callback() |
| 641 ? ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES | 625 ? ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES |
| 642 : ALLOW_DIRECT_CLIENT_CALLS; | 626 : ALLOW_DIRECT_CLIENT_CALLS; |
| 643 | 627 |
| 644 bool processed = tasks_.empty() && ProcessIncomingMessage( | 628 bool processed = |
| 645 &message_wrapper, client_call_behavior, | 629 tasks_.empty() && ProcessIncomingMessage(message, client_call_behavior, |
| 646 connector_.task_runner()); | 630 connector_.task_runner()); |
| 647 | 631 |
| 648 if (!processed) { | 632 if (!processed) { |
| 649 // Either the task queue is not empty or we cannot process the message | 633 // Either the task queue is not empty or we cannot process the message |
| 650 // directly. In both cases, there is no need to call ProcessTasks(). | 634 // directly. In both cases, there is no need to call ProcessTasks(). |
| 651 tasks_.push_back(Task::CreateMessageTask(std::move(message_wrapper))); | 635 tasks_.push_back( |
| 636 Task::CreateMessageTask(MessageWrapper(this, std::move(*message)))); |
| 652 Task* task = tasks_.back().get(); | 637 Task* task = tasks_.back().get(); |
| 653 | 638 |
| 654 if (task->message_wrapper.value().has_flag(Message::kFlagIsSync)) { | 639 if (task->message_wrapper.value().has_flag(Message::kFlagIsSync)) { |
| 655 InterfaceId id = task->message_wrapper.value().interface_id(); | 640 InterfaceId id = task->message_wrapper.value().interface_id(); |
| 656 sync_message_tasks_[id].push_back(task); | 641 sync_message_tasks_[id].push_back(task); |
| 657 auto iter = endpoints_.find(id); | 642 InterfaceEndpoint* endpoint = FindEndpoint(id); |
| 658 if (iter != endpoints_.end()) | 643 if (endpoint) |
| 659 iter->second->SignalSyncMessageEvent(); | 644 endpoint->SignalSyncMessageEvent(); |
| 660 } | 645 } |
| 661 } else if (!tasks_.empty()) { | 646 } else if (!tasks_.empty()) { |
| 662 // Processing the message may result in new tasks (for error notification) | 647 // Processing the message may result in new tasks (for error notification) |
| 663 // being added to the queue. In this case, we have to attempt to process the | 648 // being added to the queue. In this case, we have to attempt to process the |
| 664 // tasks. | 649 // tasks. |
| 665 ProcessTasks(client_call_behavior, connector_.task_runner()); | 650 ProcessTasks(client_call_behavior, connector_.task_runner()); |
| 666 } | 651 } |
| 667 | 652 |
| 668 // Always return true. If we see errors during message processing, we will | 653 // Always return true. If we see errors during message processing, we will |
| 669 // explicitly call Connector::RaiseError() to disconnect the message pipe. | 654 // explicitly call Connector::RaiseError() to disconnect the message pipe. |
| (...skipping 90 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 760 id = task->message_wrapper.value().interface_id(); | 745 id = task->message_wrapper.value().interface_id(); |
| 761 auto& sync_message_queue = sync_message_tasks_[id]; | 746 auto& sync_message_queue = sync_message_tasks_[id]; |
| 762 DCHECK_EQ(task.get(), sync_message_queue.front()); | 747 DCHECK_EQ(task.get(), sync_message_queue.front()); |
| 763 sync_message_queue.pop_front(); | 748 sync_message_queue.pop_front(); |
| 764 } | 749 } |
| 765 | 750 |
| 766 bool processed = | 751 bool processed = |
| 767 task->IsNotifyErrorTask() | 752 task->IsNotifyErrorTask() |
| 768 ? ProcessNotifyErrorTask(task.get(), client_call_behavior, | 753 ? ProcessNotifyErrorTask(task.get(), client_call_behavior, |
| 769 current_task_runner) | 754 current_task_runner) |
| 770 : ProcessIncomingMessage(&task->message_wrapper, | 755 : ProcessIncomingMessage(&task->message_wrapper.value(), |
| 771 client_call_behavior, current_task_runner); | 756 client_call_behavior, current_task_runner); |
| 772 | 757 |
| 773 if (!processed) { | 758 if (!processed) { |
| 774 if (sync_message) { | 759 if (sync_message) { |
| 775 auto& sync_message_queue = sync_message_tasks_[id]; | 760 auto& sync_message_queue = sync_message_tasks_[id]; |
| 776 sync_message_queue.push_front(task.get()); | 761 sync_message_queue.push_front(task.get()); |
| 777 } | 762 } |
| 778 tasks_.push_front(std::move(task)); | 763 tasks_.push_front(std::move(task)); |
| 779 break; | 764 break; |
| 780 } else { | 765 } else { |
| (...skipping 13 matching lines...) Expand all Loading... |
| 794 if (iter == sync_message_tasks_.end()) | 779 if (iter == sync_message_tasks_.end()) |
| 795 return false; | 780 return false; |
| 796 | 781 |
| 797 if (paused_) | 782 if (paused_) |
| 798 return true; | 783 return true; |
| 799 | 784 |
| 800 MultiplexRouter::Task* task = iter->second.front(); | 785 MultiplexRouter::Task* task = iter->second.front(); |
| 801 iter->second.pop_front(); | 786 iter->second.pop_front(); |
| 802 | 787 |
| 803 DCHECK(task->IsMessageTask()); | 788 DCHECK(task->IsMessageTask()); |
| 804 IncomingMessageWrapper message_wrapper = std::move(task->message_wrapper); | 789 MessageWrapper message_wrapper = std::move(task->message_wrapper); |
| 805 | 790 |
| 806 // Note: after this call, |task| and |iter| may be invalidated. | 791 // Note: after this call, |task| and |iter| may be invalidated. |
| 807 bool processed = ProcessIncomingMessage( | 792 bool processed = ProcessIncomingMessage( |
| 808 &message_wrapper, ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES, nullptr); | 793 &message_wrapper.value(), ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES, |
| 794 nullptr); |
| 809 DCHECK(processed); | 795 DCHECK(processed); |
| 810 | 796 |
| 811 iter = sync_message_tasks_.find(id); | 797 iter = sync_message_tasks_.find(id); |
| 812 if (iter == sync_message_tasks_.end()) | 798 if (iter == sync_message_tasks_.end()) |
| 813 return false; | 799 return false; |
| 814 | 800 |
| 815 if (iter->second.empty()) { | 801 if (iter->second.empty()) { |
| 816 sync_message_tasks_.erase(iter); | 802 sync_message_tasks_.erase(iter); |
| 817 return false; | 803 return false; |
| 818 } | 804 } |
| (...skipping 31 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 850 // | 836 // |
| 851 // It is safe to call into |client| without the lock. Because |client| is | 837 // It is safe to call into |client| without the lock. Because |client| is |
| 852 // always accessed on the same thread, including DetachEndpointClient(). | 838 // always accessed on the same thread, including DetachEndpointClient(). |
| 853 MayAutoUnlock unlocker(lock_.get()); | 839 MayAutoUnlock unlocker(lock_.get()); |
| 854 client->NotifyError(disconnect_reason); | 840 client->NotifyError(disconnect_reason); |
| 855 } | 841 } |
| 856 return true; | 842 return true; |
| 857 } | 843 } |
| 858 | 844 |
| 859 bool MultiplexRouter::ProcessIncomingMessage( | 845 bool MultiplexRouter::ProcessIncomingMessage( |
| 860 IncomingMessageWrapper* message_wrapper, | 846 Message* message, |
| 861 ClientCallBehavior client_call_behavior, | 847 ClientCallBehavior client_call_behavior, |
| 862 base::SingleThreadTaskRunner* current_task_runner) { | 848 base::SingleThreadTaskRunner* current_task_runner) { |
| 863 DCHECK(!current_task_runner || current_task_runner->BelongsToCurrentThread()); | 849 DCHECK(!current_task_runner || current_task_runner->BelongsToCurrentThread()); |
| 864 DCHECK(!paused_); | 850 DCHECK(!paused_); |
| 865 DCHECK(message_wrapper); | 851 DCHECK(message); |
| 866 AssertLockAcquired(); | 852 AssertLockAcquired(); |
| 867 | 853 |
| 868 if (message_wrapper->value().IsNull()) { | 854 if (message->IsNull()) { |
| 869 // This is a sync message and has been processed during sync handle | 855 // This is a sync message and has been processed during sync handle |
| 870 // watching. | 856 // watching. |
| 871 return true; | 857 return true; |
| 872 } | 858 } |
| 873 | 859 |
| 874 if (PipeControlMessageHandler::IsPipeControlMessage( | 860 if (PipeControlMessageHandler::IsPipeControlMessage(message)) { |
| 875 &message_wrapper->value())) { | |
| 876 bool result = false; | 861 bool result = false; |
| 877 | 862 |
| 878 { | 863 { |
| 879 MayAutoUnlock unlocker(lock_.get()); | 864 MayAutoUnlock unlocker(lock_.get()); |
| 880 Message message; | 865 result = control_message_handler_.Accept(message); |
| 881 result = message_wrapper->TakeMessage(&message) && | |
| 882 control_message_handler_.Accept(&message); | |
| 883 } | 866 } |
| 884 | 867 |
| 885 if (!result) | 868 if (!result) |
| 886 RaiseErrorInNonTestingMode(); | 869 RaiseErrorInNonTestingMode(); |
| 887 | 870 |
| 888 return true; | 871 return true; |
| 889 } | 872 } |
| 890 | 873 |
| 891 InterfaceId id = message_wrapper->value().interface_id(); | 874 InterfaceId id = message->interface_id(); |
| 892 DCHECK(IsValidInterfaceId(id)); | 875 DCHECK(IsValidInterfaceId(id)); |
| 893 | 876 |
| 894 bool inserted = false; | 877 InterfaceEndpoint* endpoint = FindEndpoint(id); |
| 895 InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, &inserted); | 878 if (!endpoint || endpoint->closed()) |
| 896 if (inserted) { | |
| 897 // Currently, it is legitimate to receive messages for an endpoint | |
| 898 // that is not registered. For example, the endpoint is transferred in | |
| 899 // a message that is discarded. Once we add support to specify all | |
| 900 // enclosing endpoints in message header, we should be able to remove | |
| 901 // this. | |
| 902 UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED); | |
| 903 | |
| 904 // It is also possible that this newly-inserted endpoint is the master | |
| 905 // endpoint. When the master InterfacePtr/Binding goes away, the message | |
| 906 // pipe is closed and we explicitly trigger a pipe connection error. The | |
| 907 // error updates all the endpoints, including the master endpoint, with | |
| 908 // PEER_ENDPOINT_CLOSED and removes the master endpoint from the | |
| 909 // registration. We continue to process remaining tasks in the queue, as | |
| 910 // long as there are refs keeping the router alive. If there are remaining | |
| 911 // messages for the master endpoint, we will get here. | |
| 912 MayAutoUnlock unlocker(lock_.get()); | |
| 913 if (!IsMasterInterfaceId(id)) | |
| 914 control_message_proxy_.NotifyPeerEndpointClosed(id, base::nullopt); | |
| 915 return true; | |
| 916 } | |
| 917 | |
| 918 if (endpoint->closed()) | |
| 919 return true; | 879 return true; |
| 920 | 880 |
| 921 if (!endpoint->client()) { | 881 if (!endpoint->client()) { |
| 922 // We need to wait until a client is attached in order to dispatch further | 882 // We need to wait until a client is attached in order to dispatch further |
| 923 // messages. | 883 // messages. |
| 924 return false; | 884 return false; |
| 925 } | 885 } |
| 926 | 886 |
| 927 bool can_direct_call; | 887 bool can_direct_call; |
| 928 if (message_wrapper->value().has_flag(Message::kFlagIsSync)) { | 888 if (message->has_flag(Message::kFlagIsSync)) { |
| 929 can_direct_call = client_call_behavior != NO_DIRECT_CLIENT_CALLS && | 889 can_direct_call = client_call_behavior != NO_DIRECT_CLIENT_CALLS && |
| 930 endpoint->task_runner()->BelongsToCurrentThread(); | 890 endpoint->task_runner()->BelongsToCurrentThread(); |
| 931 } else { | 891 } else { |
| 932 can_direct_call = client_call_behavior == ALLOW_DIRECT_CLIENT_CALLS && | 892 can_direct_call = client_call_behavior == ALLOW_DIRECT_CLIENT_CALLS && |
| 933 endpoint->task_runner() == current_task_runner; | 893 endpoint->task_runner() == current_task_runner; |
| 934 } | 894 } |
| 935 | 895 |
| 936 if (!can_direct_call) { | 896 if (!can_direct_call) { |
| 937 MaybePostToProcessTasks(endpoint->task_runner()); | 897 MaybePostToProcessTasks(endpoint->task_runner()); |
| 938 return false; | 898 return false; |
| 939 } | 899 } |
| 940 | 900 |
| 941 DCHECK(endpoint->task_runner()->BelongsToCurrentThread()); | 901 DCHECK(endpoint->task_runner()->BelongsToCurrentThread()); |
| 942 | 902 |
| 943 InterfaceEndpointClient* client = endpoint->client(); | 903 InterfaceEndpointClient* client = endpoint->client(); |
| 944 bool result = false; | 904 bool result = false; |
| 945 { | 905 { |
| 946 // We must unlock before calling into |client| because it may call this | 906 // We must unlock before calling into |client| because it may call this |
| 947 // object within HandleIncomingMessage(). Holding the lock will lead to | 907 // object within HandleIncomingMessage(). Holding the lock will lead to |
| 948 // deadlock. | 908 // deadlock. |
| 949 // | 909 // |
| 950 // It is safe to call into |client| without the lock. Because |client| is | 910 // It is safe to call into |client| without the lock. Because |client| is |
| 951 // always accessed on the same thread, including DetachEndpointClient(). | 911 // always accessed on the same thread, including DetachEndpointClient(). |
| 952 MayAutoUnlock unlocker(lock_.get()); | 912 MayAutoUnlock unlocker(lock_.get()); |
| 953 Message message; | 913 result = client->HandleIncomingMessage(message); |
| 954 result = message_wrapper->TakeMessage(&message) && | |
| 955 client->HandleIncomingMessage(&message); | |
| 956 } | 914 } |
| 957 if (!result) | 915 if (!result) |
| 958 RaiseErrorInNonTestingMode(); | 916 RaiseErrorInNonTestingMode(); |
| 959 | 917 |
| 960 return true; | 918 return true; |
| 961 } | 919 } |
| 962 | 920 |
| 963 void MultiplexRouter::MaybePostToProcessTasks( | 921 void MultiplexRouter::MaybePostToProcessTasks( |
| 964 base::SingleThreadTaskRunner* task_runner) { | 922 base::SingleThreadTaskRunner* task_runner) { |
| 965 AssertLockAcquired(); | 923 AssertLockAcquired(); |
| (...skipping 41 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1007 } | 965 } |
| 1008 | 966 |
| 1009 MultiplexRouter::InterfaceEndpoint* MultiplexRouter::FindOrInsertEndpoint( | 967 MultiplexRouter::InterfaceEndpoint* MultiplexRouter::FindOrInsertEndpoint( |
| 1010 InterfaceId id, | 968 InterfaceId id, |
| 1011 bool* inserted) { | 969 bool* inserted) { |
| 1012 AssertLockAcquired(); | 970 AssertLockAcquired(); |
| 1013 // Either |inserted| is nullptr or it points to a boolean initialized as | 971 // Either |inserted| is nullptr or it points to a boolean initialized as |
| 1014 // false. | 972 // false. |
| 1015 DCHECK(!inserted || !*inserted); | 973 DCHECK(!inserted || !*inserted); |
| 1016 | 974 |
| 1017 auto iter = endpoints_.find(id); | 975 InterfaceEndpoint* endpoint = FindEndpoint(id); |
| 1018 InterfaceEndpoint* endpoint; | 976 if (!endpoint) { |
| 1019 if (iter == endpoints_.end()) { | |
| 1020 endpoint = new InterfaceEndpoint(this, id); | 977 endpoint = new InterfaceEndpoint(this, id); |
| 1021 endpoints_[id] = endpoint; | 978 endpoints_[id] = endpoint; |
| 1022 if (inserted) | 979 if (inserted) |
| 1023 *inserted = true; | 980 *inserted = true; |
| 1024 } else { | |
| 1025 endpoint = iter->second.get(); | |
| 1026 } | 981 } |
| 1027 | 982 |
| 1028 return endpoint; | 983 return endpoint; |
| 1029 } | 984 } |
| 1030 | 985 |
| 986 MultiplexRouter::InterfaceEndpoint* MultiplexRouter::FindEndpoint( |
| 987 InterfaceId id) { |
| 988 AssertLockAcquired(); |
| 989 auto iter = endpoints_.find(id); |
| 990 return iter != endpoints_.end() ? iter->second.get() : nullptr; |
| 991 } |
| 992 |
| 1031 void MultiplexRouter::AssertLockAcquired() { | 993 void MultiplexRouter::AssertLockAcquired() { |
| 1032 #if DCHECK_IS_ON() | 994 #if DCHECK_IS_ON() |
| 1033 if (lock_) | 995 if (lock_) |
| 1034 lock_->AssertAcquired(); | 996 lock_->AssertAcquired(); |
| 1035 #endif | 997 #endif |
| 1036 } | 998 } |
| 1037 | 999 |
| 1038 } // namespace internal | 1000 } // namespace internal |
| 1039 } // namespace mojo | 1001 } // namespace mojo |
| OLD | NEW |