| OLD | NEW |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "mojo/public/cpp/bindings/lib/multiplex_router.h" | 5 #include "mojo/public/cpp/bindings/lib/multiplex_router.h" |
| 6 | 6 |
| 7 #include <stdint.h> | 7 #include <stdint.h> |
| 8 | 8 |
| 9 #include <utility> | 9 #include <utility> |
| 10 | 10 |
| (...skipping 19 matching lines...) Expand all Loading... |
| 30 // this object. | 30 // this object. |
| 31 class MultiplexRouter::InterfaceEndpoint | 31 class MultiplexRouter::InterfaceEndpoint |
| 32 : public base::RefCounted<InterfaceEndpoint>, | 32 : public base::RefCounted<InterfaceEndpoint>, |
| 33 public InterfaceEndpointController { | 33 public InterfaceEndpointController { |
| 34 public: | 34 public: |
| 35 InterfaceEndpoint(MultiplexRouter* router, InterfaceId id) | 35 InterfaceEndpoint(MultiplexRouter* router, InterfaceId id) |
| 36 : router_(router), | 36 : router_(router), |
| 37 id_(id), | 37 id_(id), |
| 38 closed_(false), | 38 closed_(false), |
| 39 peer_closed_(false), | 39 peer_closed_(false), |
| 40 handle_created_(false), |
| 40 client_(nullptr), | 41 client_(nullptr), |
| 41 event_signalled_(false) {} | 42 event_signalled_(false) {} |
| 42 | 43 |
| 43 // --------------------------------------------------------------------------- | 44 // --------------------------------------------------------------------------- |
| 44 // The following public methods are safe to call from any threads without | 45 // The following public methods are safe to call from any threads without |
| 45 // locking. | 46 // locking. |
| 46 | 47 |
| 47 InterfaceId id() const { return id_; } | 48 InterfaceId id() const { return id_; } |
| 48 | 49 |
| 49 // --------------------------------------------------------------------------- | 50 // --------------------------------------------------------------------------- |
| 50 // The following public methods are called under the router's lock. | 51 // The following public methods are called under the router's lock. |
| 51 | 52 |
| 52 bool closed() const { return closed_; } | 53 bool closed() const { return closed_; } |
| 53 void set_closed() { | 54 void set_closed() { |
| 54 router_->AssertLockAcquired(); | 55 router_->AssertLockAcquired(); |
| 55 closed_ = true; | 56 closed_ = true; |
| 56 } | 57 } |
| 57 | 58 |
| 58 bool peer_closed() const { return peer_closed_; } | 59 bool peer_closed() const { return peer_closed_; } |
| 59 void set_peer_closed() { | 60 void set_peer_closed() { |
| 60 router_->AssertLockAcquired(); | 61 router_->AssertLockAcquired(); |
| 61 peer_closed_ = true; | 62 peer_closed_ = true; |
| 62 } | 63 } |
| 63 | 64 |
| 65 bool handle_created() const { return handle_created_; } |
| 66 void set_handle_created() { |
| 67 router_->AssertLockAcquired(); |
| 68 handle_created_ = true; |
| 69 } |
| 70 |
| 64 const base::Optional<DisconnectReason>& disconnect_reason() const { | 71 const base::Optional<DisconnectReason>& disconnect_reason() const { |
| 65 return disconnect_reason_; | 72 return disconnect_reason_; |
| 66 } | 73 } |
| 67 void set_disconnect_reason( | 74 void set_disconnect_reason( |
| 68 const base::Optional<DisconnectReason>& disconnect_reason) { | 75 const base::Optional<DisconnectReason>& disconnect_reason) { |
| 69 router_->AssertLockAcquired(); | 76 router_->AssertLockAcquired(); |
| 70 disconnect_reason_ = disconnect_reason; | 77 disconnect_reason_ = disconnect_reason; |
| 71 } | 78 } |
| 72 | 79 |
| 73 base::SingleThreadTaskRunner* task_runner() const { | 80 base::SingleThreadTaskRunner* task_runner() const { |
| (...skipping 53 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 127 event_signalled_ = false; | 134 event_signalled_ = false; |
| 128 } | 135 } |
| 129 | 136 |
| 130 // --------------------------------------------------------------------------- | 137 // --------------------------------------------------------------------------- |
| 131 // The following public methods (i.e., InterfaceEndpointController | 138 // The following public methods (i.e., InterfaceEndpointController |
| 132 // implementation) are called by the client on the same thread as the | 139 // implementation) are called by the client on the same thread as the |
| 133 // AttachClient() call. They are called outside of the router's lock. | 140 // AttachClient() call. They are called outside of the router's lock. |
| 134 | 141 |
| 135 bool SendMessage(Message* message) override { | 142 bool SendMessage(Message* message) override { |
| 136 DCHECK(task_runner_->BelongsToCurrentThread()); | 143 DCHECK(task_runner_->BelongsToCurrentThread()); |
| 144 message->SerializeAssociatedEndpointHandles(router_); |
| 137 message->set_interface_id(id_); | 145 message->set_interface_id(id_); |
| 138 return router_->connector_.Accept(message); | 146 return router_->connector_.Accept(message); |
| 139 } | 147 } |
| 140 | 148 |
| 141 void AllowWokenUpBySyncWatchOnSameThread() override { | 149 void AllowWokenUpBySyncWatchOnSameThread() override { |
| 142 DCHECK(task_runner_->BelongsToCurrentThread()); | 150 DCHECK(task_runner_->BelongsToCurrentThread()); |
| 143 | 151 |
| 144 EnsureSyncWatcherExists(); | 152 EnsureSyncWatcherExists(); |
| 145 sync_watcher_->AllowWokenUpBySyncWatchOnSameThread(); | 153 sync_watcher_->AllowWokenUpBySyncWatchOnSameThread(); |
| 146 } | 154 } |
| (...skipping 83 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 230 const InterfaceId id_; | 238 const InterfaceId id_; |
| 231 | 239 |
| 232 // --------------------------------------------------------------------------- | 240 // --------------------------------------------------------------------------- |
| 233 // The following members are accessed under the router's lock. | 241 // The following members are accessed under the router's lock. |
| 234 | 242 |
| 235 // Whether the endpoint has been closed. | 243 // Whether the endpoint has been closed. |
| 236 bool closed_; | 244 bool closed_; |
| 237 // Whether the peer endpoint has been closed. | 245 // Whether the peer endpoint has been closed. |
| 238 bool peer_closed_; | 246 bool peer_closed_; |
| 239 | 247 |
| 248 // Whether there is already a ScopedInterfaceEndpointHandle created for this |
| 249 // endpoint. |
| 250 bool handle_created_; |
| 251 |
| 240 base::Optional<DisconnectReason> disconnect_reason_; | 252 base::Optional<DisconnectReason> disconnect_reason_; |
| 241 | 253 |
| 242 // The task runner on which |client_|'s methods can be called. | 254 // The task runner on which |client_|'s methods can be called. |
| 243 scoped_refptr<base::SingleThreadTaskRunner> task_runner_; | 255 scoped_refptr<base::SingleThreadTaskRunner> task_runner_; |
| 244 // Not owned. It is null if no client is attached to this endpoint. | 256 // Not owned. It is null if no client is attached to this endpoint. |
| 245 InterfaceEndpointClient* client_; | 257 InterfaceEndpointClient* client_; |
| 246 | 258 |
| 247 // A message pipe used as an event to signal that sync messages are available. | 259 // A message pipe used as an event to signal that sync messages are available. |
| 248 // The message pipe handles are initialized under the router's lock and remain | 260 // The message pipe handles are initialized under the router's lock and remain |
| 249 // unchanged afterwards. They may be accessed outside of the router's lock | 261 // unchanged afterwards. They may be accessed outside of the router's lock |
| 250 // later. | 262 // later. |
| 251 ScopedMessagePipeHandle sync_message_event_sender_; | 263 ScopedMessagePipeHandle sync_message_event_sender_; |
| 252 ScopedMessagePipeHandle sync_message_event_receiver_; | 264 ScopedMessagePipeHandle sync_message_event_receiver_; |
| 253 bool event_signalled_; | 265 bool event_signalled_; |
| 254 | 266 |
| 255 // --------------------------------------------------------------------------- | 267 // --------------------------------------------------------------------------- |
| 256 // The following members are only valid while a client is attached. They are | 268 // The following members are only valid while a client is attached. They are |
| 257 // used exclusively on the client's thread. They may be accessed outside of | 269 // used exclusively on the client's thread. They may be accessed outside of |
| 258 // the router's lock. | 270 // the router's lock. |
| 259 | 271 |
| 260 std::unique_ptr<SyncHandleWatcher> sync_watcher_; | 272 std::unique_ptr<SyncHandleWatcher> sync_watcher_; |
| 261 | 273 |
| 262 DISALLOW_COPY_AND_ASSIGN(InterfaceEndpoint); | 274 DISALLOW_COPY_AND_ASSIGN(InterfaceEndpoint); |
| 263 }; | 275 }; |
| 264 | 276 |
| 277 // Message objects cannot be destroyed under the router's lock, if they contain |
| 278 // ScopedInterfaceEndpointHandle objects. |
| 279 // IncomingMessageWrapper is used to wrap messages which haven't got the payload |
| 280 // interface IDs deserialized into ScopedInterfaceEndpointHandles. Wrapper |
| 281 // objects are always destroyed under the router's lock. When a wrapper is |
| 282 // destroyed and the message hasn't been consumed, the wrapper is responsible |
| 283 // to send endpoint closed notifications. |
| 284 class MultiplexRouter::IncomingMessageWrapper { |
| 285 public: |
| 286 IncomingMessageWrapper() = default; |
| 287 |
| 288 IncomingMessageWrapper(MultiplexRouter* router, Message* message) |
| 289 : router_(router), value_(std::move(*message)) { |
| 290 DCHECK(value_.associated_endpoint_handles()->empty()); |
| 291 } |
| 292 |
| 293 IncomingMessageWrapper(IncomingMessageWrapper&& other) |
| 294 : router_(other.router_), value_(std::move(other.value_)) {} |
| 295 |
| 296 ~IncomingMessageWrapper() { |
| 297 if (value_.IsNull()) |
| 298 return; |
| 299 |
| 300 router_->AssertLockAcquired(); |
| 301 |
| 302 uint32_t num_ids = value_.payload_num_interface_ids(); |
| 303 const uint32_t* ids = value_.payload_interface_ids(); |
| 304 for (uint32_t i = 0; i < num_ids; ++i) { |
| 305 MayAutoUnlock unlocker(router_->lock_.get()); |
| 306 router_->control_message_proxy_.NotifyPeerEndpointClosed(ids[i], |
| 307 base::nullopt); |
| 308 } |
| 309 } |
| 310 |
| 311 IncomingMessageWrapper& operator=(IncomingMessageWrapper&& other) { |
| 312 router_ = other.router_; |
| 313 value_ = std::move(other.value_); |
| 314 return *this; |
| 315 } |
| 316 |
| 317 // Must be called outside of the router's lock. |
| 318 bool TakeMessage(Message* output) { |
| 319 DCHECK(!value_.IsNull()); |
| 320 |
| 321 *output = std::move(value_); |
| 322 return output->DeserializeAssociatedEndpointHandles(router_); |
| 323 } |
| 324 |
| 325 const Message& value() const { return value_; } |
| 326 |
| 327 private: |
| 328 MultiplexRouter* router_ = nullptr; |
| 329 // It must not hold any ScopedInterfaceEndpointHandle objects. |
| 330 Message value_; |
| 331 |
| 332 DISALLOW_COPY_AND_ASSIGN(IncomingMessageWrapper); |
| 333 }; |
| 334 |
| 265 struct MultiplexRouter::Task { | 335 struct MultiplexRouter::Task { |
| 266 public: | 336 public: |
| 267 // Doesn't take ownership of |message| but takes its contents. | 337 // Doesn't take ownership of |message| but takes its contents. |
| 268 static std::unique_ptr<Task> CreateMessageTask(Message* message) { | 338 static std::unique_ptr<Task> CreateMessageTask( |
| 339 IncomingMessageWrapper message_wrapper) { |
| 269 Task* task = new Task(MESSAGE); | 340 Task* task = new Task(MESSAGE); |
| 270 task->message = std::move(*message); | 341 task->message_wrapper = std::move(message_wrapper); |
| 271 return base::WrapUnique(task); | 342 return base::WrapUnique(task); |
| 272 } | 343 } |
| 273 static std::unique_ptr<Task> CreateNotifyErrorTask( | 344 static std::unique_ptr<Task> CreateNotifyErrorTask( |
| 274 InterfaceEndpoint* endpoint) { | 345 InterfaceEndpoint* endpoint) { |
| 275 Task* task = new Task(NOTIFY_ERROR); | 346 Task* task = new Task(NOTIFY_ERROR); |
| 276 task->endpoint_to_notify = endpoint; | 347 task->endpoint_to_notify = endpoint; |
| 277 return base::WrapUnique(task); | 348 return base::WrapUnique(task); |
| 278 } | 349 } |
| 279 | 350 |
| 280 ~Task() {} | 351 ~Task() {} |
| 281 | 352 |
| 282 bool IsMessageTask() const { return type == MESSAGE; } | 353 bool IsMessageTask() const { return type == MESSAGE; } |
| 283 bool IsNotifyErrorTask() const { return type == NOTIFY_ERROR; } | 354 bool IsNotifyErrorTask() const { return type == NOTIFY_ERROR; } |
| 284 | 355 |
| 285 Message message; | 356 IncomingMessageWrapper message_wrapper; |
| 286 scoped_refptr<InterfaceEndpoint> endpoint_to_notify; | 357 scoped_refptr<InterfaceEndpoint> endpoint_to_notify; |
| 287 | 358 |
| 288 enum Type { MESSAGE, NOTIFY_ERROR }; | 359 enum Type { MESSAGE, NOTIFY_ERROR }; |
| 289 Type type; | 360 Type type; |
| 290 | 361 |
| 291 private: | 362 private: |
| 292 explicit Task(Type in_type) : type(in_type) {} | 363 explicit Task(Type in_type) : type(in_type) {} |
| 364 |
| 365 DISALLOW_COPY_AND_ASSIGN(Task); |
| 293 }; | 366 }; |
| 294 | 367 |
| 295 MultiplexRouter::MultiplexRouter( | 368 MultiplexRouter::MultiplexRouter( |
| 296 ScopedMessagePipeHandle message_pipe, | 369 ScopedMessagePipeHandle message_pipe, |
| 297 Config config, | 370 Config config, |
| 298 bool set_interface_id_namesapce_bit, | 371 bool set_interface_id_namesapce_bit, |
| 299 scoped_refptr<base::SingleThreadTaskRunner> runner) | 372 scoped_refptr<base::SingleThreadTaskRunner> runner) |
| 300 : set_interface_id_namespace_bit_(set_interface_id_namesapce_bit), | 373 : set_interface_id_namespace_bit_(set_interface_id_namesapce_bit), |
| 301 task_runner_(runner), | 374 task_runner_(runner), |
| 302 header_validator_(nullptr), | 375 header_validator_(nullptr), |
| (...skipping 91 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 394 | 467 |
| 395 ScopedInterfaceEndpointHandle MultiplexRouter::CreateLocalEndpointHandle( | 468 ScopedInterfaceEndpointHandle MultiplexRouter::CreateLocalEndpointHandle( |
| 396 InterfaceId id) { | 469 InterfaceId id) { |
| 397 if (!IsValidInterfaceId(id)) | 470 if (!IsValidInterfaceId(id)) |
| 398 return ScopedInterfaceEndpointHandle(); | 471 return ScopedInterfaceEndpointHandle(); |
| 399 | 472 |
| 400 MayAutoLock locker(lock_.get()); | 473 MayAutoLock locker(lock_.get()); |
| 401 bool inserted = false; | 474 bool inserted = false; |
| 402 InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, &inserted); | 475 InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, &inserted); |
| 403 if (inserted) { | 476 if (inserted) { |
| 477 DCHECK(!endpoint->handle_created()); |
| 478 |
| 404 if (encountered_error_) | 479 if (encountered_error_) |
| 405 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); | 480 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); |
| 406 } else { | 481 } else { |
| 407 // If the endpoint already exist, it is because we have received a | 482 // If the endpoint already exist, it is because we have received a |
| 408 // notification that the peer endpoint has closed. | 483 // notification that the peer endpoint has closed. |
| 409 CHECK(!endpoint->closed()); | 484 CHECK(!endpoint->closed()); |
| 410 CHECK(endpoint->peer_closed()); | 485 CHECK(endpoint->peer_closed()); |
| 486 |
| 487 if (endpoint->handle_created()) |
| 488 return ScopedInterfaceEndpointHandle(); |
| 411 } | 489 } |
| 490 |
| 491 endpoint->set_handle_created(); |
| 412 return CreateScopedInterfaceEndpointHandle(id, true); | 492 return CreateScopedInterfaceEndpointHandle(id, true); |
| 413 } | 493 } |
| 414 | 494 |
| 415 void MultiplexRouter::CloseEndpointHandle( | 495 void MultiplexRouter::CloseEndpointHandle( |
| 416 InterfaceId id, | 496 InterfaceId id, |
| 417 bool is_local, | 497 bool is_local, |
| 418 const base::Optional<DisconnectReason>& reason) { | 498 const base::Optional<DisconnectReason>& reason) { |
| 419 if (!IsValidInterfaceId(id)) | 499 if (!IsValidInterfaceId(id)) |
| 420 return; | 500 return; |
| 421 | 501 |
| (...skipping 125 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 547 } | 627 } |
| 548 | 628 |
| 549 bool MultiplexRouter::Accept(Message* message) { | 629 bool MultiplexRouter::Accept(Message* message) { |
| 550 DCHECK(thread_checker_.CalledOnValidThread()); | 630 DCHECK(thread_checker_.CalledOnValidThread()); |
| 551 | 631 |
| 552 scoped_refptr<MultiplexRouter> protector(this); | 632 scoped_refptr<MultiplexRouter> protector(this); |
| 553 MayAutoLock locker(lock_.get()); | 633 MayAutoLock locker(lock_.get()); |
| 554 | 634 |
| 555 DCHECK(!paused_); | 635 DCHECK(!paused_); |
| 556 | 636 |
| 637 IncomingMessageWrapper message_wrapper(this, message); |
| 638 |
| 557 ClientCallBehavior client_call_behavior = | 639 ClientCallBehavior client_call_behavior = |
| 558 connector_.during_sync_handle_watcher_callback() | 640 connector_.during_sync_handle_watcher_callback() |
| 559 ? ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES | 641 ? ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES |
| 560 : ALLOW_DIRECT_CLIENT_CALLS; | 642 : ALLOW_DIRECT_CLIENT_CALLS; |
| 561 | 643 |
| 562 bool processed = | 644 bool processed = tasks_.empty() && ProcessIncomingMessage( |
| 563 tasks_.empty() && ProcessIncomingMessage(message, client_call_behavior, | 645 &message_wrapper, client_call_behavior, |
| 564 connector_.task_runner()); | 646 connector_.task_runner()); |
| 565 | 647 |
| 566 if (!processed) { | 648 if (!processed) { |
| 567 // Either the task queue is not empty or we cannot process the message | 649 // Either the task queue is not empty or we cannot process the message |
| 568 // directly. In both cases, there is no need to call ProcessTasks(). | 650 // directly. In both cases, there is no need to call ProcessTasks(). |
| 569 tasks_.push_back(Task::CreateMessageTask(message)); | 651 tasks_.push_back(Task::CreateMessageTask(std::move(message_wrapper))); |
| 570 Task* task = tasks_.back().get(); | 652 Task* task = tasks_.back().get(); |
| 571 | 653 |
| 572 if (task->message.has_flag(Message::kFlagIsSync)) { | 654 if (task->message_wrapper.value().has_flag(Message::kFlagIsSync)) { |
| 573 InterfaceId id = task->message.interface_id(); | 655 InterfaceId id = task->message_wrapper.value().interface_id(); |
| 574 sync_message_tasks_[id].push_back(task); | 656 sync_message_tasks_[id].push_back(task); |
| 575 auto iter = endpoints_.find(id); | 657 auto iter = endpoints_.find(id); |
| 576 if (iter != endpoints_.end()) | 658 if (iter != endpoints_.end()) |
| 577 iter->second->SignalSyncMessageEvent(); | 659 iter->second->SignalSyncMessageEvent(); |
| 578 } | 660 } |
| 579 } else if (!tasks_.empty()) { | 661 } else if (!tasks_.empty()) { |
| 580 // Processing the message may result in new tasks (for error notification) | 662 // Processing the message may result in new tasks (for error notification) |
| 581 // being added to the queue. In this case, we have to attempt to process the | 663 // being added to the queue. In this case, we have to attempt to process the |
| 582 // tasks. | 664 // tasks. |
| 583 ProcessTasks(client_call_behavior, connector_.task_runner()); | 665 ProcessTasks(client_call_behavior, connector_.task_runner()); |
| 584 } | 666 } |
| 585 | 667 |
| 586 // Always return true. If we see errors during message processing, we will | 668 // Always return true. If we see errors during message processing, we will |
| 587 // explicitly call Connector::RaiseError() to disconnect the message pipe. | 669 // explicitly call Connector::RaiseError() to disconnect the message pipe. |
| 588 return true; | 670 return true; |
| 589 } | 671 } |
| 590 | 672 |
| 591 bool MultiplexRouter::OnPeerAssociatedEndpointClosed( | 673 bool MultiplexRouter::OnPeerAssociatedEndpointClosed( |
| 592 InterfaceId id, | 674 InterfaceId id, |
| 593 const base::Optional<DisconnectReason>& reason) { | 675 const base::Optional<DisconnectReason>& reason) { |
| 594 AssertLockAcquired(); | |
| 595 | |
| 596 DCHECK(!IsMasterInterfaceId(id) || reason); | 676 DCHECK(!IsMasterInterfaceId(id) || reason); |
| 597 | 677 |
| 678 MayAutoLock locker(lock_.get()); |
| 598 InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, nullptr); | 679 InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, nullptr); |
| 599 | 680 |
| 600 if (reason) | 681 if (reason) |
| 601 endpoint->set_disconnect_reason(reason); | 682 endpoint->set_disconnect_reason(reason); |
| 602 | 683 |
| 603 // It is possible that this endpoint has been set as peer closed. That is | 684 // It is possible that this endpoint has been set as peer closed. That is |
| 604 // because when the message pipe is closed, all the endpoints are updated with | 685 // because when the message pipe is closed, all the endpoints are updated with |
| 605 // PEER_ENDPOINT_CLOSED. We continue to process remaining tasks in the queue, | 686 // PEER_ENDPOINT_CLOSED. We continue to process remaining tasks in the queue, |
| 606 // as long as there are refs keeping the router alive. If there is a | 687 // as long as there are refs keeping the router alive. If there is a |
| 607 // PeerAssociatedEndpointClosedEvent control message in the queue, we will get | 688 // PeerAssociatedEndpointClosedEvent control message in the queue, we will get |
| 608 // here and see that the endpoint has been marked as peer closed. | 689 // here and see that the endpoint has been marked as peer closed. |
| 609 if (!endpoint->peer_closed()) { | 690 if (!endpoint->peer_closed()) { |
| 610 if (endpoint->client()) | 691 if (endpoint->client()) |
| 611 tasks_.push_back(Task::CreateNotifyErrorTask(endpoint)); | 692 tasks_.push_back(Task::CreateNotifyErrorTask(endpoint)); |
| 612 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); | 693 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); |
| 613 } | 694 } |
| 614 | 695 |
| 615 // No need to trigger a ProcessTasks() because it is already on the stack. | 696 // No need to trigger a ProcessTasks() because it is already on the stack. |
| 616 | 697 |
| 617 return true; | 698 return true; |
| 618 } | 699 } |
| 619 | 700 |
| 620 bool MultiplexRouter::OnAssociatedEndpointClosedBeforeSent(InterfaceId id) { | 701 bool MultiplexRouter::OnAssociatedEndpointClosedBeforeSent(InterfaceId id) { |
| 621 AssertLockAcquired(); | |
| 622 | |
| 623 if (IsMasterInterfaceId(id)) | 702 if (IsMasterInterfaceId(id)) |
| 624 return false; | 703 return false; |
| 625 | 704 |
| 626 InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, nullptr); | 705 { |
| 627 DCHECK(!endpoint->closed()); | 706 MayAutoLock locker(lock_.get()); |
| 628 UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED); | |
| 629 | 707 |
| 630 MayAutoUnlock unlocker(lock_.get()); | 708 InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, nullptr); |
| 709 DCHECK(!endpoint->closed()); |
| 710 UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED); |
| 711 } |
| 712 |
| 631 control_message_proxy_.NotifyPeerEndpointClosed(id, base::nullopt); | 713 control_message_proxy_.NotifyPeerEndpointClosed(id, base::nullopt); |
| 632 | |
| 633 return true; | 714 return true; |
| 634 } | 715 } |
| 635 | 716 |
| 636 void MultiplexRouter::OnPipeConnectionError() { | 717 void MultiplexRouter::OnPipeConnectionError() { |
| 637 DCHECK(thread_checker_.CalledOnValidThread()); | 718 DCHECK(thread_checker_.CalledOnValidThread()); |
| 638 | 719 |
| 639 scoped_refptr<MultiplexRouter> protector(this); | 720 scoped_refptr<MultiplexRouter> protector(this); |
| 640 MayAutoLock locker(lock_.get()); | 721 MayAutoLock locker(lock_.get()); |
| 641 | 722 |
| 642 encountered_error_ = true; | 723 encountered_error_ = true; |
| (...skipping 22 matching lines...) Expand all Loading... |
| 665 AssertLockAcquired(); | 746 AssertLockAcquired(); |
| 666 | 747 |
| 667 if (posted_to_process_tasks_) | 748 if (posted_to_process_tasks_) |
| 668 return; | 749 return; |
| 669 | 750 |
| 670 while (!tasks_.empty() && !paused_) { | 751 while (!tasks_.empty() && !paused_) { |
| 671 std::unique_ptr<Task> task(std::move(tasks_.front())); | 752 std::unique_ptr<Task> task(std::move(tasks_.front())); |
| 672 tasks_.pop_front(); | 753 tasks_.pop_front(); |
| 673 | 754 |
| 674 InterfaceId id = kInvalidInterfaceId; | 755 InterfaceId id = kInvalidInterfaceId; |
| 675 bool sync_message = task->IsMessageTask() && !task->message.IsNull() && | 756 bool sync_message = |
| 676 task->message.has_flag(Message::kFlagIsSync); | 757 task->IsMessageTask() && !task->message_wrapper.value().IsNull() && |
| 758 task->message_wrapper.value().has_flag(Message::kFlagIsSync); |
| 677 if (sync_message) { | 759 if (sync_message) { |
| 678 id = task->message.interface_id(); | 760 id = task->message_wrapper.value().interface_id(); |
| 679 auto& sync_message_queue = sync_message_tasks_[id]; | 761 auto& sync_message_queue = sync_message_tasks_[id]; |
| 680 DCHECK_EQ(task.get(), sync_message_queue.front()); | 762 DCHECK_EQ(task.get(), sync_message_queue.front()); |
| 681 sync_message_queue.pop_front(); | 763 sync_message_queue.pop_front(); |
| 682 } | 764 } |
| 683 | 765 |
| 684 bool processed = | 766 bool processed = |
| 685 task->IsNotifyErrorTask() | 767 task->IsNotifyErrorTask() |
| 686 ? ProcessNotifyErrorTask(task.get(), client_call_behavior, | 768 ? ProcessNotifyErrorTask(task.get(), client_call_behavior, |
| 687 current_task_runner) | 769 current_task_runner) |
| 688 : ProcessIncomingMessage(&task->message, client_call_behavior, | 770 : ProcessIncomingMessage(&task->message_wrapper, |
| 689 current_task_runner); | 771 client_call_behavior, current_task_runner); |
| 690 | 772 |
| 691 if (!processed) { | 773 if (!processed) { |
| 692 if (sync_message) { | 774 if (sync_message) { |
| 693 auto& sync_message_queue = sync_message_tasks_[id]; | 775 auto& sync_message_queue = sync_message_tasks_[id]; |
| 694 sync_message_queue.push_front(task.get()); | 776 sync_message_queue.push_front(task.get()); |
| 695 } | 777 } |
| 696 tasks_.push_front(std::move(task)); | 778 tasks_.push_front(std::move(task)); |
| 697 break; | 779 break; |
| 698 } else { | 780 } else { |
| 699 if (sync_message) { | 781 if (sync_message) { |
| (...skipping 12 matching lines...) Expand all Loading... |
| 712 if (iter == sync_message_tasks_.end()) | 794 if (iter == sync_message_tasks_.end()) |
| 713 return false; | 795 return false; |
| 714 | 796 |
| 715 if (paused_) | 797 if (paused_) |
| 716 return true; | 798 return true; |
| 717 | 799 |
| 718 MultiplexRouter::Task* task = iter->second.front(); | 800 MultiplexRouter::Task* task = iter->second.front(); |
| 719 iter->second.pop_front(); | 801 iter->second.pop_front(); |
| 720 | 802 |
| 721 DCHECK(task->IsMessageTask()); | 803 DCHECK(task->IsMessageTask()); |
| 722 Message message = std::move(task->message); | 804 IncomingMessageWrapper message_wrapper = std::move(task->message_wrapper); |
| 723 | 805 |
| 724 // Note: after this call, |task| and |iter| may be invalidated. | 806 // Note: after this call, |task| and |iter| may be invalidated. |
| 725 bool processed = ProcessIncomingMessage( | 807 bool processed = ProcessIncomingMessage( |
| 726 &message, ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES, nullptr); | 808 &message_wrapper, ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES, nullptr); |
| 727 DCHECK(processed); | 809 DCHECK(processed); |
| 728 | 810 |
| 729 iter = sync_message_tasks_.find(id); | 811 iter = sync_message_tasks_.find(id); |
| 730 if (iter == sync_message_tasks_.end()) | 812 if (iter == sync_message_tasks_.end()) |
| 731 return false; | 813 return false; |
| 732 | 814 |
| 733 if (iter->second.empty()) { | 815 if (iter->second.empty()) { |
| 734 sync_message_tasks_.erase(iter); | 816 sync_message_tasks_.erase(iter); |
| 735 return false; | 817 return false; |
| 736 } | 818 } |
| (...skipping 31 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 768 // | 850 // |
| 769 // It is safe to call into |client| without the lock. Because |client| is | 851 // It is safe to call into |client| without the lock. Because |client| is |
| 770 // always accessed on the same thread, including DetachEndpointClient(). | 852 // always accessed on the same thread, including DetachEndpointClient(). |
| 771 MayAutoUnlock unlocker(lock_.get()); | 853 MayAutoUnlock unlocker(lock_.get()); |
| 772 client->NotifyError(disconnect_reason); | 854 client->NotifyError(disconnect_reason); |
| 773 } | 855 } |
| 774 return true; | 856 return true; |
| 775 } | 857 } |
| 776 | 858 |
| 777 bool MultiplexRouter::ProcessIncomingMessage( | 859 bool MultiplexRouter::ProcessIncomingMessage( |
| 778 Message* message, | 860 IncomingMessageWrapper* message_wrapper, |
| 779 ClientCallBehavior client_call_behavior, | 861 ClientCallBehavior client_call_behavior, |
| 780 base::SingleThreadTaskRunner* current_task_runner) { | 862 base::SingleThreadTaskRunner* current_task_runner) { |
| 781 DCHECK(!current_task_runner || current_task_runner->BelongsToCurrentThread()); | 863 DCHECK(!current_task_runner || current_task_runner->BelongsToCurrentThread()); |
| 782 DCHECK(!paused_); | 864 DCHECK(!paused_); |
| 783 DCHECK(message); | 865 DCHECK(message_wrapper); |
| 784 AssertLockAcquired(); | 866 AssertLockAcquired(); |
| 785 | 867 |
| 786 if (message->IsNull()) { | 868 if (message_wrapper->value().IsNull()) { |
| 787 // This is a sync message and has been processed during sync handle | 869 // This is a sync message and has been processed during sync handle |
| 788 // watching. | 870 // watching. |
| 789 return true; | 871 return true; |
| 790 } | 872 } |
| 791 | 873 |
| 792 if (PipeControlMessageHandler::IsPipeControlMessage(message)) { | 874 if (PipeControlMessageHandler::IsPipeControlMessage( |
| 793 if (!control_message_handler_.Accept(message)) | 875 &message_wrapper->value())) { |
| 876 bool result = false; |
| 877 |
| 878 { |
| 879 MayAutoUnlock unlocker(lock_.get()); |
| 880 Message message; |
| 881 result = message_wrapper->TakeMessage(&message) && |
| 882 control_message_handler_.Accept(&message); |
| 883 } |
| 884 |
| 885 if (!result) |
| 794 RaiseErrorInNonTestingMode(); | 886 RaiseErrorInNonTestingMode(); |
| 887 |
| 795 return true; | 888 return true; |
| 796 } | 889 } |
| 797 | 890 |
| 798 InterfaceId id = message->interface_id(); | 891 InterfaceId id = message_wrapper->value().interface_id(); |
| 799 DCHECK(IsValidInterfaceId(id)); | 892 DCHECK(IsValidInterfaceId(id)); |
| 800 | 893 |
| 801 bool inserted = false; | 894 bool inserted = false; |
| 802 InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, &inserted); | 895 InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, &inserted); |
| 803 if (inserted) { | 896 if (inserted) { |
| 804 // Currently, it is legitimate to receive messages for an endpoint | 897 // Currently, it is legitimate to receive messages for an endpoint |
| 805 // that is not registered. For example, the endpoint is transferred in | 898 // that is not registered. For example, the endpoint is transferred in |
| 806 // a message that is discarded. Once we add support to specify all | 899 // a message that is discarded. Once we add support to specify all |
| 807 // enclosing endpoints in message header, we should be able to remove | 900 // enclosing endpoints in message header, we should be able to remove |
| 808 // this. | 901 // this. |
| (...skipping 16 matching lines...) Expand all Loading... |
| 825 if (endpoint->closed()) | 918 if (endpoint->closed()) |
| 826 return true; | 919 return true; |
| 827 | 920 |
| 828 if (!endpoint->client()) { | 921 if (!endpoint->client()) { |
| 829 // We need to wait until a client is attached in order to dispatch further | 922 // We need to wait until a client is attached in order to dispatch further |
| 830 // messages. | 923 // messages. |
| 831 return false; | 924 return false; |
| 832 } | 925 } |
| 833 | 926 |
| 834 bool can_direct_call; | 927 bool can_direct_call; |
| 835 if (message->has_flag(Message::kFlagIsSync)) { | 928 if (message_wrapper->value().has_flag(Message::kFlagIsSync)) { |
| 836 can_direct_call = client_call_behavior != NO_DIRECT_CLIENT_CALLS && | 929 can_direct_call = client_call_behavior != NO_DIRECT_CLIENT_CALLS && |
| 837 endpoint->task_runner()->BelongsToCurrentThread(); | 930 endpoint->task_runner()->BelongsToCurrentThread(); |
| 838 } else { | 931 } else { |
| 839 can_direct_call = client_call_behavior == ALLOW_DIRECT_CLIENT_CALLS && | 932 can_direct_call = client_call_behavior == ALLOW_DIRECT_CLIENT_CALLS && |
| 840 endpoint->task_runner() == current_task_runner; | 933 endpoint->task_runner() == current_task_runner; |
| 841 } | 934 } |
| 842 | 935 |
| 843 if (!can_direct_call) { | 936 if (!can_direct_call) { |
| 844 MaybePostToProcessTasks(endpoint->task_runner()); | 937 MaybePostToProcessTasks(endpoint->task_runner()); |
| 845 return false; | 938 return false; |
| 846 } | 939 } |
| 847 | 940 |
| 848 DCHECK(endpoint->task_runner()->BelongsToCurrentThread()); | 941 DCHECK(endpoint->task_runner()->BelongsToCurrentThread()); |
| 849 | 942 |
| 850 InterfaceEndpointClient* client = endpoint->client(); | 943 InterfaceEndpointClient* client = endpoint->client(); |
| 851 bool result = false; | 944 bool result = false; |
| 852 { | 945 { |
| 853 // We must unlock before calling into |client| because it may call this | 946 // We must unlock before calling into |client| because it may call this |
| 854 // object within HandleIncomingMessage(). Holding the lock will lead to | 947 // object within HandleIncomingMessage(). Holding the lock will lead to |
| 855 // deadlock. | 948 // deadlock. |
| 856 // | 949 // |
| 857 // It is safe to call into |client| without the lock. Because |client| is | 950 // It is safe to call into |client| without the lock. Because |client| is |
| 858 // always accessed on the same thread, including DetachEndpointClient(). | 951 // always accessed on the same thread, including DetachEndpointClient(). |
| 859 MayAutoUnlock unlocker(lock_.get()); | 952 MayAutoUnlock unlocker(lock_.get()); |
| 860 result = client->HandleIncomingMessage(message); | 953 Message message; |
| 954 result = message_wrapper->TakeMessage(&message) && |
| 955 client->HandleIncomingMessage(&message); |
| 861 } | 956 } |
| 862 if (!result) | 957 if (!result) |
| 863 RaiseErrorInNonTestingMode(); | 958 RaiseErrorInNonTestingMode(); |
| 864 | 959 |
| 865 return true; | 960 return true; |
| 866 } | 961 } |
| 867 | 962 |
| 868 void MultiplexRouter::MaybePostToProcessTasks( | 963 void MultiplexRouter::MaybePostToProcessTasks( |
| 869 base::SingleThreadTaskRunner* task_runner) { | 964 base::SingleThreadTaskRunner* task_runner) { |
| 870 AssertLockAcquired(); | 965 AssertLockAcquired(); |
| (...skipping 64 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 935 | 1030 |
| 936 void MultiplexRouter::AssertLockAcquired() { | 1031 void MultiplexRouter::AssertLockAcquired() { |
| 937 #if DCHECK_IS_ON() | 1032 #if DCHECK_IS_ON() |
| 938 if (lock_) | 1033 if (lock_) |
| 939 lock_->AssertAcquired(); | 1034 lock_->AssertAcquired(); |
| 940 #endif | 1035 #endif |
| 941 } | 1036 } |
| 942 | 1037 |
| 943 } // namespace internal | 1038 } // namespace internal |
| 944 } // namespace mojo | 1039 } // namespace mojo |
| OLD | NEW |