Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(500)

Side by Side Diff: mojo/public/cpp/bindings/lib/multiplex_router.cc

Issue 2660733002: Mojo C++ bindings: introduce an optional array to store transferred interface IDs in messages. (Closed)
Patch Set: . Created 3 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/public/cpp/bindings/lib/multiplex_router.h" 5 #include "mojo/public/cpp/bindings/lib/multiplex_router.h"
6 6
7 #include <stdint.h> 7 #include <stdint.h>
8 8
9 #include <utility> 9 #include <utility>
10 10
(...skipping 19 matching lines...) Expand all
30 // this object. 30 // this object.
31 class MultiplexRouter::InterfaceEndpoint 31 class MultiplexRouter::InterfaceEndpoint
32 : public base::RefCounted<InterfaceEndpoint>, 32 : public base::RefCounted<InterfaceEndpoint>,
33 public InterfaceEndpointController { 33 public InterfaceEndpointController {
34 public: 34 public:
35 InterfaceEndpoint(MultiplexRouter* router, InterfaceId id) 35 InterfaceEndpoint(MultiplexRouter* router, InterfaceId id)
36 : router_(router), 36 : router_(router),
37 id_(id), 37 id_(id),
38 closed_(false), 38 closed_(false),
39 peer_closed_(false), 39 peer_closed_(false),
40 handle_created_(false),
40 client_(nullptr), 41 client_(nullptr),
41 event_signalled_(false) {} 42 event_signalled_(false) {}
42 43
43 // --------------------------------------------------------------------------- 44 // ---------------------------------------------------------------------------
44 // The following public methods are safe to call from any threads without 45 // The following public methods are safe to call from any threads without
45 // locking. 46 // locking.
46 47
47 InterfaceId id() const { return id_; } 48 InterfaceId id() const { return id_; }
48 49
49 // --------------------------------------------------------------------------- 50 // ---------------------------------------------------------------------------
50 // The following public methods are called under the router's lock. 51 // The following public methods are called under the router's lock.
51 52
52 bool closed() const { return closed_; } 53 bool closed() const { return closed_; }
53 void set_closed() { 54 void set_closed() {
54 router_->AssertLockAcquired(); 55 router_->AssertLockAcquired();
55 closed_ = true; 56 closed_ = true;
56 } 57 }
57 58
58 bool peer_closed() const { return peer_closed_; } 59 bool peer_closed() const { return peer_closed_; }
59 void set_peer_closed() { 60 void set_peer_closed() {
60 router_->AssertLockAcquired(); 61 router_->AssertLockAcquired();
61 peer_closed_ = true; 62 peer_closed_ = true;
62 } 63 }
63 64
65 bool handle_created() const { return handle_created_; }
66 void set_handle_created() {
67 router_->AssertLockAcquired();
68 handle_created_ = true;
69 }
70
64 const base::Optional<DisconnectReason>& disconnect_reason() const { 71 const base::Optional<DisconnectReason>& disconnect_reason() const {
65 return disconnect_reason_; 72 return disconnect_reason_;
66 } 73 }
67 void set_disconnect_reason( 74 void set_disconnect_reason(
68 const base::Optional<DisconnectReason>& disconnect_reason) { 75 const base::Optional<DisconnectReason>& disconnect_reason) {
69 router_->AssertLockAcquired(); 76 router_->AssertLockAcquired();
70 disconnect_reason_ = disconnect_reason; 77 disconnect_reason_ = disconnect_reason;
71 } 78 }
72 79
73 base::SingleThreadTaskRunner* task_runner() const { 80 base::SingleThreadTaskRunner* task_runner() const {
(...skipping 53 matching lines...) Expand 10 before | Expand all | Expand 10 after
127 event_signalled_ = false; 134 event_signalled_ = false;
128 } 135 }
129 136
130 // --------------------------------------------------------------------------- 137 // ---------------------------------------------------------------------------
131 // The following public methods (i.e., InterfaceEndpointController 138 // The following public methods (i.e., InterfaceEndpointController
132 // implementation) are called by the client on the same thread as the 139 // implementation) are called by the client on the same thread as the
133 // AttachClient() call. They are called outside of the router's lock. 140 // AttachClient() call. They are called outside of the router's lock.
134 141
135 bool SendMessage(Message* message) override { 142 bool SendMessage(Message* message) override {
136 DCHECK(task_runner_->BelongsToCurrentThread()); 143 DCHECK(task_runner_->BelongsToCurrentThread());
144 message->SerializeAssociatedEndpointHandles(router_);
137 message->set_interface_id(id_); 145 message->set_interface_id(id_);
138 return router_->connector_.Accept(message); 146 return router_->connector_.Accept(message);
139 } 147 }
140 148
141 void AllowWokenUpBySyncWatchOnSameThread() override { 149 void AllowWokenUpBySyncWatchOnSameThread() override {
142 DCHECK(task_runner_->BelongsToCurrentThread()); 150 DCHECK(task_runner_->BelongsToCurrentThread());
143 151
144 EnsureSyncWatcherExists(); 152 EnsureSyncWatcherExists();
145 sync_watcher_->AllowWokenUpBySyncWatchOnSameThread(); 153 sync_watcher_->AllowWokenUpBySyncWatchOnSameThread();
146 } 154 }
(...skipping 83 matching lines...) Expand 10 before | Expand all | Expand 10 after
230 const InterfaceId id_; 238 const InterfaceId id_;
231 239
232 // --------------------------------------------------------------------------- 240 // ---------------------------------------------------------------------------
233 // The following members are accessed under the router's lock. 241 // The following members are accessed under the router's lock.
234 242
235 // Whether the endpoint has been closed. 243 // Whether the endpoint has been closed.
236 bool closed_; 244 bool closed_;
237 // Whether the peer endpoint has been closed. 245 // Whether the peer endpoint has been closed.
238 bool peer_closed_; 246 bool peer_closed_;
239 247
248 // Whether there is already a ScopedInterfaceEndpointHandle created for this
249 // endpoint.
250 bool handle_created_;
251
240 base::Optional<DisconnectReason> disconnect_reason_; 252 base::Optional<DisconnectReason> disconnect_reason_;
241 253
242 // The task runner on which |client_|'s methods can be called. 254 // The task runner on which |client_|'s methods can be called.
243 scoped_refptr<base::SingleThreadTaskRunner> task_runner_; 255 scoped_refptr<base::SingleThreadTaskRunner> task_runner_;
244 // Not owned. It is null if no client is attached to this endpoint. 256 // Not owned. It is null if no client is attached to this endpoint.
245 InterfaceEndpointClient* client_; 257 InterfaceEndpointClient* client_;
246 258
247 // A message pipe used as an event to signal that sync messages are available. 259 // A message pipe used as an event to signal that sync messages are available.
248 // The message pipe handles are initialized under the router's lock and remain 260 // The message pipe handles are initialized under the router's lock and remain
249 // unchanged afterwards. They may be accessed outside of the router's lock 261 // unchanged afterwards. They may be accessed outside of the router's lock
250 // later. 262 // later.
251 ScopedMessagePipeHandle sync_message_event_sender_; 263 ScopedMessagePipeHandle sync_message_event_sender_;
252 ScopedMessagePipeHandle sync_message_event_receiver_; 264 ScopedMessagePipeHandle sync_message_event_receiver_;
253 bool event_signalled_; 265 bool event_signalled_;
254 266
255 // --------------------------------------------------------------------------- 267 // ---------------------------------------------------------------------------
256 // The following members are only valid while a client is attached. They are 268 // The following members are only valid while a client is attached. They are
257 // used exclusively on the client's thread. They may be accessed outside of 269 // used exclusively on the client's thread. They may be accessed outside of
258 // the router's lock. 270 // the router's lock.
259 271
260 std::unique_ptr<SyncHandleWatcher> sync_watcher_; 272 std::unique_ptr<SyncHandleWatcher> sync_watcher_;
261 273
262 DISALLOW_COPY_AND_ASSIGN(InterfaceEndpoint); 274 DISALLOW_COPY_AND_ASSIGN(InterfaceEndpoint);
263 }; 275 };
264 276
277 // Message objects cannot be destroyed under the router's lock, if they contain
278 // ScopedInterfaceEndpointHandle objects.
279 // IncomingMessageWrapper is used to wrap messages which haven't got the payload
280 // interface IDs deserialized into ScopedInterfaceEndpointHandles. Wrapper
281 // objects are always destroyed under the router's lock. When a wrapper is
282 // destroyed and the message hasn't been consumed, the wrapper is responsible
283 // to send endpoint closed notifications.
284 class MultiplexRouter::IncomingMessageWrapper {
285 public:
286 IncomingMessageWrapper() = default;
287
288 IncomingMessageWrapper(MultiplexRouter* router, Message* message)
289 : router_(router), value_(std::move(*message)) {
290 DCHECK(value_.associated_endpoint_handles()->empty());
291 }
292
293 IncomingMessageWrapper(IncomingMessageWrapper&& other)
294 : router_(other.router_), value_(std::move(other.value_)) {}
295
296 ~IncomingMessageWrapper() {
297 if (value_.IsNull())
298 return;
299
300 router_->AssertLockAcquired();
301
302 uint32_t num_ids = value_.payload_num_interface_ids();
303 const uint32_t* ids = value_.payload_interface_ids();
304 for (uint32_t i = 0; i < num_ids; ++i) {
305 MayAutoUnlock unlocker(router_->lock_.get());
306 router_->control_message_proxy_.NotifyPeerEndpointClosed(ids[i],
307 base::nullopt);
308 }
309 }
310
311 IncomingMessageWrapper& operator=(IncomingMessageWrapper&& other) {
312 router_ = other.router_;
313 value_ = std::move(other.value_);
314 return *this;
315 }
316
317 // Must be called outside of the router's lock.
318 bool TakeMessage(Message* output) {
319 DCHECK(!value_.IsNull());
320
321 *output = std::move(value_);
322 return output->DeserializeAssociatedEndpointHandles(router_);
323 }
324
325 const Message& value() const { return value_; }
326
327 private:
328 MultiplexRouter* router_ = nullptr;
329 // It must not hold any ScopedInterfaceEndpointHandle objects.
330 Message value_;
331
332 DISALLOW_COPY_AND_ASSIGN(IncomingMessageWrapper);
333 };
334
265 struct MultiplexRouter::Task { 335 struct MultiplexRouter::Task {
266 public: 336 public:
267 // Doesn't take ownership of |message| but takes its contents. 337 // Doesn't take ownership of |message| but takes its contents.
268 static std::unique_ptr<Task> CreateMessageTask(Message* message) { 338 static std::unique_ptr<Task> CreateMessageTask(
339 IncomingMessageWrapper message_wrapper) {
269 Task* task = new Task(MESSAGE); 340 Task* task = new Task(MESSAGE);
270 task->message = std::move(*message); 341 task->message_wrapper = std::move(message_wrapper);
271 return base::WrapUnique(task); 342 return base::WrapUnique(task);
272 } 343 }
273 static std::unique_ptr<Task> CreateNotifyErrorTask( 344 static std::unique_ptr<Task> CreateNotifyErrorTask(
274 InterfaceEndpoint* endpoint) { 345 InterfaceEndpoint* endpoint) {
275 Task* task = new Task(NOTIFY_ERROR); 346 Task* task = new Task(NOTIFY_ERROR);
276 task->endpoint_to_notify = endpoint; 347 task->endpoint_to_notify = endpoint;
277 return base::WrapUnique(task); 348 return base::WrapUnique(task);
278 } 349 }
279 350
280 ~Task() {} 351 ~Task() {}
281 352
282 bool IsMessageTask() const { return type == MESSAGE; } 353 bool IsMessageTask() const { return type == MESSAGE; }
283 bool IsNotifyErrorTask() const { return type == NOTIFY_ERROR; } 354 bool IsNotifyErrorTask() const { return type == NOTIFY_ERROR; }
284 355
285 Message message; 356 IncomingMessageWrapper message_wrapper;
286 scoped_refptr<InterfaceEndpoint> endpoint_to_notify; 357 scoped_refptr<InterfaceEndpoint> endpoint_to_notify;
287 358
288 enum Type { MESSAGE, NOTIFY_ERROR }; 359 enum Type { MESSAGE, NOTIFY_ERROR };
289 Type type; 360 Type type;
290 361
291 private: 362 private:
292 explicit Task(Type in_type) : type(in_type) {} 363 explicit Task(Type in_type) : type(in_type) {}
364
365 DISALLOW_COPY_AND_ASSIGN(Task);
293 }; 366 };
294 367
295 MultiplexRouter::MultiplexRouter( 368 MultiplexRouter::MultiplexRouter(
296 ScopedMessagePipeHandle message_pipe, 369 ScopedMessagePipeHandle message_pipe,
297 Config config, 370 Config config,
298 bool set_interface_id_namesapce_bit, 371 bool set_interface_id_namesapce_bit,
299 scoped_refptr<base::SingleThreadTaskRunner> runner) 372 scoped_refptr<base::SingleThreadTaskRunner> runner)
300 : set_interface_id_namespace_bit_(set_interface_id_namesapce_bit), 373 : set_interface_id_namespace_bit_(set_interface_id_namesapce_bit),
301 task_runner_(runner), 374 task_runner_(runner),
302 header_validator_(nullptr), 375 header_validator_(nullptr),
(...skipping 91 matching lines...) Expand 10 before | Expand all | Expand 10 after
394 467
395 ScopedInterfaceEndpointHandle MultiplexRouter::CreateLocalEndpointHandle( 468 ScopedInterfaceEndpointHandle MultiplexRouter::CreateLocalEndpointHandle(
396 InterfaceId id) { 469 InterfaceId id) {
397 if (!IsValidInterfaceId(id)) 470 if (!IsValidInterfaceId(id))
398 return ScopedInterfaceEndpointHandle(); 471 return ScopedInterfaceEndpointHandle();
399 472
400 MayAutoLock locker(lock_.get()); 473 MayAutoLock locker(lock_.get());
401 bool inserted = false; 474 bool inserted = false;
402 InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, &inserted); 475 InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, &inserted);
403 if (inserted) { 476 if (inserted) {
477 DCHECK(!endpoint->handle_created());
478
404 if (encountered_error_) 479 if (encountered_error_)
405 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); 480 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED);
406 } else { 481 } else {
407 // If the endpoint already exist, it is because we have received a 482 // If the endpoint already exist, it is because we have received a
408 // notification that the peer endpoint has closed. 483 // notification that the peer endpoint has closed.
409 CHECK(!endpoint->closed()); 484 CHECK(!endpoint->closed());
410 CHECK(endpoint->peer_closed()); 485 CHECK(endpoint->peer_closed());
486
487 if (endpoint->handle_created())
488 return ScopedInterfaceEndpointHandle();
411 } 489 }
490
491 endpoint->set_handle_created();
412 return CreateScopedInterfaceEndpointHandle(id, true); 492 return CreateScopedInterfaceEndpointHandle(id, true);
413 } 493 }
414 494
415 void MultiplexRouter::CloseEndpointHandle( 495 void MultiplexRouter::CloseEndpointHandle(
416 InterfaceId id, 496 InterfaceId id,
417 bool is_local, 497 bool is_local,
418 const base::Optional<DisconnectReason>& reason) { 498 const base::Optional<DisconnectReason>& reason) {
419 if (!IsValidInterfaceId(id)) 499 if (!IsValidInterfaceId(id))
420 return; 500 return;
421 501
(...skipping 125 matching lines...) Expand 10 before | Expand all | Expand 10 after
547 } 627 }
548 628
549 bool MultiplexRouter::Accept(Message* message) { 629 bool MultiplexRouter::Accept(Message* message) {
550 DCHECK(thread_checker_.CalledOnValidThread()); 630 DCHECK(thread_checker_.CalledOnValidThread());
551 631
552 scoped_refptr<MultiplexRouter> protector(this); 632 scoped_refptr<MultiplexRouter> protector(this);
553 MayAutoLock locker(lock_.get()); 633 MayAutoLock locker(lock_.get());
554 634
555 DCHECK(!paused_); 635 DCHECK(!paused_);
556 636
637 IncomingMessageWrapper message_wrapper(this, message);
638
557 ClientCallBehavior client_call_behavior = 639 ClientCallBehavior client_call_behavior =
558 connector_.during_sync_handle_watcher_callback() 640 connector_.during_sync_handle_watcher_callback()
559 ? ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES 641 ? ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES
560 : ALLOW_DIRECT_CLIENT_CALLS; 642 : ALLOW_DIRECT_CLIENT_CALLS;
561 643
562 bool processed = 644 bool processed = tasks_.empty() && ProcessIncomingMessage(
563 tasks_.empty() && ProcessIncomingMessage(message, client_call_behavior, 645 &message_wrapper, client_call_behavior,
564 connector_.task_runner()); 646 connector_.task_runner());
565 647
566 if (!processed) { 648 if (!processed) {
567 // Either the task queue is not empty or we cannot process the message 649 // Either the task queue is not empty or we cannot process the message
568 // directly. In both cases, there is no need to call ProcessTasks(). 650 // directly. In both cases, there is no need to call ProcessTasks().
569 tasks_.push_back(Task::CreateMessageTask(message)); 651 tasks_.push_back(Task::CreateMessageTask(std::move(message_wrapper)));
570 Task* task = tasks_.back().get(); 652 Task* task = tasks_.back().get();
571 653
572 if (task->message.has_flag(Message::kFlagIsSync)) { 654 if (task->message_wrapper.value().has_flag(Message::kFlagIsSync)) {
573 InterfaceId id = task->message.interface_id(); 655 InterfaceId id = task->message_wrapper.value().interface_id();
574 sync_message_tasks_[id].push_back(task); 656 sync_message_tasks_[id].push_back(task);
575 auto iter = endpoints_.find(id); 657 auto iter = endpoints_.find(id);
576 if (iter != endpoints_.end()) 658 if (iter != endpoints_.end())
577 iter->second->SignalSyncMessageEvent(); 659 iter->second->SignalSyncMessageEvent();
578 } 660 }
579 } else if (!tasks_.empty()) { 661 } else if (!tasks_.empty()) {
580 // Processing the message may result in new tasks (for error notification) 662 // Processing the message may result in new tasks (for error notification)
581 // being added to the queue. In this case, we have to attempt to process the 663 // being added to the queue. In this case, we have to attempt to process the
582 // tasks. 664 // tasks.
583 ProcessTasks(client_call_behavior, connector_.task_runner()); 665 ProcessTasks(client_call_behavior, connector_.task_runner());
584 } 666 }
585 667
586 // Always return true. If we see errors during message processing, we will 668 // Always return true. If we see errors during message processing, we will
587 // explicitly call Connector::RaiseError() to disconnect the message pipe. 669 // explicitly call Connector::RaiseError() to disconnect the message pipe.
588 return true; 670 return true;
589 } 671 }
590 672
591 bool MultiplexRouter::OnPeerAssociatedEndpointClosed( 673 bool MultiplexRouter::OnPeerAssociatedEndpointClosed(
592 InterfaceId id, 674 InterfaceId id,
593 const base::Optional<DisconnectReason>& reason) { 675 const base::Optional<DisconnectReason>& reason) {
594 AssertLockAcquired();
595
596 DCHECK(!IsMasterInterfaceId(id) || reason); 676 DCHECK(!IsMasterInterfaceId(id) || reason);
597 677
678 MayAutoLock locker(lock_.get());
598 InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, nullptr); 679 InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, nullptr);
599 680
600 if (reason) 681 if (reason)
601 endpoint->set_disconnect_reason(reason); 682 endpoint->set_disconnect_reason(reason);
602 683
603 // It is possible that this endpoint has been set as peer closed. That is 684 // It is possible that this endpoint has been set as peer closed. That is
604 // because when the message pipe is closed, all the endpoints are updated with 685 // because when the message pipe is closed, all the endpoints are updated with
605 // PEER_ENDPOINT_CLOSED. We continue to process remaining tasks in the queue, 686 // PEER_ENDPOINT_CLOSED. We continue to process remaining tasks in the queue,
606 // as long as there are refs keeping the router alive. If there is a 687 // as long as there are refs keeping the router alive. If there is a
607 // PeerAssociatedEndpointClosedEvent control message in the queue, we will get 688 // PeerAssociatedEndpointClosedEvent control message in the queue, we will get
608 // here and see that the endpoint has been marked as peer closed. 689 // here and see that the endpoint has been marked as peer closed.
609 if (!endpoint->peer_closed()) { 690 if (!endpoint->peer_closed()) {
610 if (endpoint->client()) 691 if (endpoint->client())
611 tasks_.push_back(Task::CreateNotifyErrorTask(endpoint)); 692 tasks_.push_back(Task::CreateNotifyErrorTask(endpoint));
612 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); 693 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED);
613 } 694 }
614 695
615 // No need to trigger a ProcessTasks() because it is already on the stack. 696 // No need to trigger a ProcessTasks() because it is already on the stack.
616 697
617 return true; 698 return true;
618 } 699 }
619 700
620 bool MultiplexRouter::OnAssociatedEndpointClosedBeforeSent(InterfaceId id) { 701 bool MultiplexRouter::OnAssociatedEndpointClosedBeforeSent(InterfaceId id) {
621 AssertLockAcquired();
622
623 if (IsMasterInterfaceId(id)) 702 if (IsMasterInterfaceId(id))
624 return false; 703 return false;
625 704
626 InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, nullptr); 705 {
627 DCHECK(!endpoint->closed()); 706 MayAutoLock locker(lock_.get());
628 UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED);
629 707
630 MayAutoUnlock unlocker(lock_.get()); 708 InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, nullptr);
709 DCHECK(!endpoint->closed());
710 UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED);
711 }
712
631 control_message_proxy_.NotifyPeerEndpointClosed(id, base::nullopt); 713 control_message_proxy_.NotifyPeerEndpointClosed(id, base::nullopt);
632
633 return true; 714 return true;
634 } 715 }
635 716
636 void MultiplexRouter::OnPipeConnectionError() { 717 void MultiplexRouter::OnPipeConnectionError() {
637 DCHECK(thread_checker_.CalledOnValidThread()); 718 DCHECK(thread_checker_.CalledOnValidThread());
638 719
639 scoped_refptr<MultiplexRouter> protector(this); 720 scoped_refptr<MultiplexRouter> protector(this);
640 MayAutoLock locker(lock_.get()); 721 MayAutoLock locker(lock_.get());
641 722
642 encountered_error_ = true; 723 encountered_error_ = true;
(...skipping 22 matching lines...) Expand all
665 AssertLockAcquired(); 746 AssertLockAcquired();
666 747
667 if (posted_to_process_tasks_) 748 if (posted_to_process_tasks_)
668 return; 749 return;
669 750
670 while (!tasks_.empty() && !paused_) { 751 while (!tasks_.empty() && !paused_) {
671 std::unique_ptr<Task> task(std::move(tasks_.front())); 752 std::unique_ptr<Task> task(std::move(tasks_.front()));
672 tasks_.pop_front(); 753 tasks_.pop_front();
673 754
674 InterfaceId id = kInvalidInterfaceId; 755 InterfaceId id = kInvalidInterfaceId;
675 bool sync_message = task->IsMessageTask() && !task->message.IsNull() && 756 bool sync_message =
676 task->message.has_flag(Message::kFlagIsSync); 757 task->IsMessageTask() && !task->message_wrapper.value().IsNull() &&
758 task->message_wrapper.value().has_flag(Message::kFlagIsSync);
677 if (sync_message) { 759 if (sync_message) {
678 id = task->message.interface_id(); 760 id = task->message_wrapper.value().interface_id();
679 auto& sync_message_queue = sync_message_tasks_[id]; 761 auto& sync_message_queue = sync_message_tasks_[id];
680 DCHECK_EQ(task.get(), sync_message_queue.front()); 762 DCHECK_EQ(task.get(), sync_message_queue.front());
681 sync_message_queue.pop_front(); 763 sync_message_queue.pop_front();
682 } 764 }
683 765
684 bool processed = 766 bool processed =
685 task->IsNotifyErrorTask() 767 task->IsNotifyErrorTask()
686 ? ProcessNotifyErrorTask(task.get(), client_call_behavior, 768 ? ProcessNotifyErrorTask(task.get(), client_call_behavior,
687 current_task_runner) 769 current_task_runner)
688 : ProcessIncomingMessage(&task->message, client_call_behavior, 770 : ProcessIncomingMessage(&task->message_wrapper,
689 current_task_runner); 771 client_call_behavior, current_task_runner);
690 772
691 if (!processed) { 773 if (!processed) {
692 if (sync_message) { 774 if (sync_message) {
693 auto& sync_message_queue = sync_message_tasks_[id]; 775 auto& sync_message_queue = sync_message_tasks_[id];
694 sync_message_queue.push_front(task.get()); 776 sync_message_queue.push_front(task.get());
695 } 777 }
696 tasks_.push_front(std::move(task)); 778 tasks_.push_front(std::move(task));
697 break; 779 break;
698 } else { 780 } else {
699 if (sync_message) { 781 if (sync_message) {
(...skipping 12 matching lines...) Expand all
712 if (iter == sync_message_tasks_.end()) 794 if (iter == sync_message_tasks_.end())
713 return false; 795 return false;
714 796
715 if (paused_) 797 if (paused_)
716 return true; 798 return true;
717 799
718 MultiplexRouter::Task* task = iter->second.front(); 800 MultiplexRouter::Task* task = iter->second.front();
719 iter->second.pop_front(); 801 iter->second.pop_front();
720 802
721 DCHECK(task->IsMessageTask()); 803 DCHECK(task->IsMessageTask());
722 Message message = std::move(task->message); 804 IncomingMessageWrapper message_wrapper = std::move(task->message_wrapper);
723 805
724 // Note: after this call, |task| and |iter| may be invalidated. 806 // Note: after this call, |task| and |iter| may be invalidated.
725 bool processed = ProcessIncomingMessage( 807 bool processed = ProcessIncomingMessage(
726 &message, ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES, nullptr); 808 &message_wrapper, ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES, nullptr);
727 DCHECK(processed); 809 DCHECK(processed);
728 810
729 iter = sync_message_tasks_.find(id); 811 iter = sync_message_tasks_.find(id);
730 if (iter == sync_message_tasks_.end()) 812 if (iter == sync_message_tasks_.end())
731 return false; 813 return false;
732 814
733 if (iter->second.empty()) { 815 if (iter->second.empty()) {
734 sync_message_tasks_.erase(iter); 816 sync_message_tasks_.erase(iter);
735 return false; 817 return false;
736 } 818 }
(...skipping 31 matching lines...) Expand 10 before | Expand all | Expand 10 after
768 // 850 //
769 // It is safe to call into |client| without the lock. Because |client| is 851 // It is safe to call into |client| without the lock. Because |client| is
770 // always accessed on the same thread, including DetachEndpointClient(). 852 // always accessed on the same thread, including DetachEndpointClient().
771 MayAutoUnlock unlocker(lock_.get()); 853 MayAutoUnlock unlocker(lock_.get());
772 client->NotifyError(disconnect_reason); 854 client->NotifyError(disconnect_reason);
773 } 855 }
774 return true; 856 return true;
775 } 857 }
776 858
777 bool MultiplexRouter::ProcessIncomingMessage( 859 bool MultiplexRouter::ProcessIncomingMessage(
778 Message* message, 860 IncomingMessageWrapper* message_wrapper,
779 ClientCallBehavior client_call_behavior, 861 ClientCallBehavior client_call_behavior,
780 base::SingleThreadTaskRunner* current_task_runner) { 862 base::SingleThreadTaskRunner* current_task_runner) {
781 DCHECK(!current_task_runner || current_task_runner->BelongsToCurrentThread()); 863 DCHECK(!current_task_runner || current_task_runner->BelongsToCurrentThread());
782 DCHECK(!paused_); 864 DCHECK(!paused_);
783 DCHECK(message); 865 DCHECK(message_wrapper);
784 AssertLockAcquired(); 866 AssertLockAcquired();
785 867
786 if (message->IsNull()) { 868 if (message_wrapper->value().IsNull()) {
787 // This is a sync message and has been processed during sync handle 869 // This is a sync message and has been processed during sync handle
788 // watching. 870 // watching.
789 return true; 871 return true;
790 } 872 }
791 873
792 if (PipeControlMessageHandler::IsPipeControlMessage(message)) { 874 if (PipeControlMessageHandler::IsPipeControlMessage(
793 if (!control_message_handler_.Accept(message)) 875 &message_wrapper->value())) {
876 bool result = false;
877
878 {
879 MayAutoUnlock unlocker(lock_.get());
880 Message message;
881 result = message_wrapper->TakeMessage(&message) &&
882 control_message_handler_.Accept(&message);
883 }
884
885 if (!result)
794 RaiseErrorInNonTestingMode(); 886 RaiseErrorInNonTestingMode();
887
795 return true; 888 return true;
796 } 889 }
797 890
798 InterfaceId id = message->interface_id(); 891 InterfaceId id = message_wrapper->value().interface_id();
799 DCHECK(IsValidInterfaceId(id)); 892 DCHECK(IsValidInterfaceId(id));
800 893
801 bool inserted = false; 894 bool inserted = false;
802 InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, &inserted); 895 InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, &inserted);
803 if (inserted) { 896 if (inserted) {
804 // Currently, it is legitimate to receive messages for an endpoint 897 // Currently, it is legitimate to receive messages for an endpoint
805 // that is not registered. For example, the endpoint is transferred in 898 // that is not registered. For example, the endpoint is transferred in
806 // a message that is discarded. Once we add support to specify all 899 // a message that is discarded. Once we add support to specify all
807 // enclosing endpoints in message header, we should be able to remove 900 // enclosing endpoints in message header, we should be able to remove
808 // this. 901 // this.
(...skipping 16 matching lines...) Expand all
825 if (endpoint->closed()) 918 if (endpoint->closed())
826 return true; 919 return true;
827 920
828 if (!endpoint->client()) { 921 if (!endpoint->client()) {
829 // We need to wait until a client is attached in order to dispatch further 922 // We need to wait until a client is attached in order to dispatch further
830 // messages. 923 // messages.
831 return false; 924 return false;
832 } 925 }
833 926
834 bool can_direct_call; 927 bool can_direct_call;
835 if (message->has_flag(Message::kFlagIsSync)) { 928 if (message_wrapper->value().has_flag(Message::kFlagIsSync)) {
836 can_direct_call = client_call_behavior != NO_DIRECT_CLIENT_CALLS && 929 can_direct_call = client_call_behavior != NO_DIRECT_CLIENT_CALLS &&
837 endpoint->task_runner()->BelongsToCurrentThread(); 930 endpoint->task_runner()->BelongsToCurrentThread();
838 } else { 931 } else {
839 can_direct_call = client_call_behavior == ALLOW_DIRECT_CLIENT_CALLS && 932 can_direct_call = client_call_behavior == ALLOW_DIRECT_CLIENT_CALLS &&
840 endpoint->task_runner() == current_task_runner; 933 endpoint->task_runner() == current_task_runner;
841 } 934 }
842 935
843 if (!can_direct_call) { 936 if (!can_direct_call) {
844 MaybePostToProcessTasks(endpoint->task_runner()); 937 MaybePostToProcessTasks(endpoint->task_runner());
845 return false; 938 return false;
846 } 939 }
847 940
848 DCHECK(endpoint->task_runner()->BelongsToCurrentThread()); 941 DCHECK(endpoint->task_runner()->BelongsToCurrentThread());
849 942
850 InterfaceEndpointClient* client = endpoint->client(); 943 InterfaceEndpointClient* client = endpoint->client();
851 bool result = false; 944 bool result = false;
852 { 945 {
853 // We must unlock before calling into |client| because it may call this 946 // We must unlock before calling into |client| because it may call this
854 // object within HandleIncomingMessage(). Holding the lock will lead to 947 // object within HandleIncomingMessage(). Holding the lock will lead to
855 // deadlock. 948 // deadlock.
856 // 949 //
857 // It is safe to call into |client| without the lock. Because |client| is 950 // It is safe to call into |client| without the lock. Because |client| is
858 // always accessed on the same thread, including DetachEndpointClient(). 951 // always accessed on the same thread, including DetachEndpointClient().
859 MayAutoUnlock unlocker(lock_.get()); 952 MayAutoUnlock unlocker(lock_.get());
860 result = client->HandleIncomingMessage(message); 953 Message message;
954 result = message_wrapper->TakeMessage(&message) &&
955 client->HandleIncomingMessage(&message);
861 } 956 }
862 if (!result) 957 if (!result)
863 RaiseErrorInNonTestingMode(); 958 RaiseErrorInNonTestingMode();
864 959
865 return true; 960 return true;
866 } 961 }
867 962
868 void MultiplexRouter::MaybePostToProcessTasks( 963 void MultiplexRouter::MaybePostToProcessTasks(
869 base::SingleThreadTaskRunner* task_runner) { 964 base::SingleThreadTaskRunner* task_runner) {
870 AssertLockAcquired(); 965 AssertLockAcquired();
(...skipping 64 matching lines...) Expand 10 before | Expand all | Expand 10 after
935 1030
936 void MultiplexRouter::AssertLockAcquired() { 1031 void MultiplexRouter::AssertLockAcquired() {
937 #if DCHECK_IS_ON() 1032 #if DCHECK_IS_ON()
938 if (lock_) 1033 if (lock_)
939 lock_->AssertAcquired(); 1034 lock_->AssertAcquired();
940 #endif 1035 #endif
941 } 1036 }
942 1037
943 } // namespace internal 1038 } // namespace internal
944 } // namespace mojo 1039 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/public/cpp/bindings/lib/multiplex_router.h ('k') | mojo/public/cpp/bindings/lib/pipe_control_message_handler.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698