Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(224)

Side by Side Diff: mojo/public/cpp/bindings/lib/multiplex_router.cc

Issue 2608163003: Change single-interface mojo bindings to use SequencedTaskRunner. (Closed)
Patch Set: Created 3 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/public/cpp/bindings/lib/multiplex_router.h" 5 #include "mojo/public/cpp/bindings/lib/multiplex_router.h"
6 6
7 #include <stdint.h> 7 #include <stdint.h>
8 8
9 #include <utility> 9 #include <utility>
10 10
11 #include "base/bind.h" 11 #include "base/bind.h"
12 #include "base/location.h" 12 #include "base/location.h"
13 #include "base/macros.h" 13 #include "base/macros.h"
14 #include "base/memory/ptr_util.h" 14 #include "base/memory/ptr_util.h"
15 #include "base/single_thread_task_runner.h" 15 #include "base/sequenced_task_runner.h"
16 #include "base/stl_util.h" 16 #include "base/stl_util.h"
17 #include "base/threading/sequenced_task_runner_handle.h"
17 #include "base/threading/thread_task_runner_handle.h" 18 #include "base/threading/thread_task_runner_handle.h"
18 #include "mojo/public/cpp/bindings/associated_group.h" 19 #include "mojo/public/cpp/bindings/associated_group.h"
19 #include "mojo/public/cpp/bindings/interface_endpoint_client.h" 20 #include "mojo/public/cpp/bindings/interface_endpoint_client.h"
20 #include "mojo/public/cpp/bindings/interface_endpoint_controller.h" 21 #include "mojo/public/cpp/bindings/interface_endpoint_controller.h"
21 #include "mojo/public/cpp/bindings/lib/may_auto_lock.h" 22 #include "mojo/public/cpp/bindings/lib/may_auto_lock.h"
22 #include "mojo/public/cpp/bindings/sync_handle_watcher.h" 23 #include "mojo/public/cpp/bindings/sync_handle_watcher.h"
23 24
24 namespace mojo { 25 namespace mojo {
25 namespace internal { 26 namespace internal {
26 27
(...skipping 36 matching lines...) Expand 10 before | Expand all | Expand 10 after
63 64
64 const base::Optional<DisconnectReason>& disconnect_reason() const { 65 const base::Optional<DisconnectReason>& disconnect_reason() const {
65 return disconnect_reason_; 66 return disconnect_reason_;
66 } 67 }
67 void set_disconnect_reason( 68 void set_disconnect_reason(
68 const base::Optional<DisconnectReason>& disconnect_reason) { 69 const base::Optional<DisconnectReason>& disconnect_reason) {
69 router_->AssertLockAcquired(); 70 router_->AssertLockAcquired();
70 disconnect_reason_ = disconnect_reason; 71 disconnect_reason_ = disconnect_reason;
71 } 72 }
72 73
73 base::SingleThreadTaskRunner* task_runner() const { 74 base::SequencedTaskRunner* task_runner() const { return task_runner_.get(); }
74 return task_runner_.get();
75 }
76 75
77 InterfaceEndpointClient* client() const { return client_; } 76 InterfaceEndpointClient* client() const { return client_; }
78 77
79 void AttachClient(InterfaceEndpointClient* client, 78 void AttachClient(InterfaceEndpointClient* client,
80 scoped_refptr<base::SingleThreadTaskRunner> runner) { 79 scoped_refptr<base::SequencedTaskRunner> runner) {
81 router_->AssertLockAcquired(); 80 router_->AssertLockAcquired();
82 DCHECK(!client_); 81 DCHECK(!client_);
83 DCHECK(!closed_); 82 DCHECK(!closed_);
84 DCHECK(runner->BelongsToCurrentThread()); 83 DCHECK(runner->RunsTasksOnCurrentThread());
85 84
86 task_runner_ = std::move(runner); 85 task_runner_ = std::move(runner);
87 client_ = client; 86 client_ = client;
88 } 87 }
89 88
90 // This method must be called on the same thread as the corresponding 89 // This method must be called on the same thread as the corresponding
91 // AttachClient() call. 90 // AttachClient() call.
92 void DetachClient() { 91 void DetachClient() {
93 router_->AssertLockAcquired(); 92 router_->AssertLockAcquired();
94 DCHECK(client_); 93 DCHECK(client_);
95 DCHECK(task_runner_->BelongsToCurrentThread()); 94 DCHECK(task_runner_->RunsTasksOnCurrentThread());
96 DCHECK(!closed_); 95 DCHECK(!closed_);
97 96
98 task_runner_ = nullptr; 97 task_runner_ = nullptr;
99 client_ = nullptr; 98 client_ = nullptr;
100 sync_watcher_.reset(); 99 sync_watcher_.reset();
101 } 100 }
102 101
103 void SignalSyncMessageEvent() { 102 void SignalSyncMessageEvent() {
104 router_->AssertLockAcquired(); 103 router_->AssertLockAcquired();
105 if (event_signalled_) 104 if (event_signalled_)
(...skipping 20 matching lines...) Expand all
126 DCHECK_EQ(MOJO_RESULT_OK, result); 125 DCHECK_EQ(MOJO_RESULT_OK, result);
127 event_signalled_ = false; 126 event_signalled_ = false;
128 } 127 }
129 128
130 // --------------------------------------------------------------------------- 129 // ---------------------------------------------------------------------------
131 // The following public methods (i.e., InterfaceEndpointController 130 // The following public methods (i.e., InterfaceEndpointController
132 // implementation) are called by the client on the same thread as the 131 // implementation) are called by the client on the same thread as the
133 // AttachClient() call. They are called outside of the router's lock. 132 // AttachClient() call. They are called outside of the router's lock.
134 133
135 bool SendMessage(Message* message) override { 134 bool SendMessage(Message* message) override {
136 DCHECK(task_runner_->BelongsToCurrentThread()); 135 DCHECK(task_runner_->RunsTasksOnCurrentThread());
137 message->set_interface_id(id_); 136 message->set_interface_id(id_);
138 return router_->connector_.Accept(message); 137 return router_->connector_.Accept(message);
139 } 138 }
140 139
141 void AllowWokenUpBySyncWatchOnSameThread() override { 140 void AllowWokenUpBySyncWatchOnSameThread() override {
142 DCHECK(task_runner_->BelongsToCurrentThread()); 141 DCHECK(task_runner_->RunsTasksOnCurrentThread());
143 142
144 EnsureSyncWatcherExists(); 143 EnsureSyncWatcherExists();
145 sync_watcher_->AllowWokenUpBySyncWatchOnSameThread(); 144 sync_watcher_->AllowWokenUpBySyncWatchOnSameThread();
146 } 145 }
147 146
148 bool SyncWatch(const bool* should_stop) override { 147 bool SyncWatch(const bool* should_stop) override {
149 DCHECK(task_runner_->BelongsToCurrentThread()); 148 DCHECK(task_runner_->RunsTasksOnCurrentThread());
150 149
151 EnsureSyncWatcherExists(); 150 EnsureSyncWatcherExists();
152 return sync_watcher_->SyncWatch(should_stop); 151 return sync_watcher_->SyncWatch(should_stop);
153 } 152 }
154 153
155 private: 154 private:
156 friend class base::RefCounted<InterfaceEndpoint>; 155 friend class base::RefCounted<InterfaceEndpoint>;
157 156
158 ~InterfaceEndpoint() override { 157 ~InterfaceEndpoint() override {
159 router_->AssertLockAcquired(); 158 router_->AssertLockAcquired();
160 159
161 DCHECK(!client_); 160 DCHECK(!client_);
162 DCHECK(closed_); 161 DCHECK(closed_);
163 DCHECK(peer_closed_); 162 DCHECK(peer_closed_);
164 DCHECK(!sync_watcher_); 163 DCHECK(!sync_watcher_);
165 } 164 }
166 165
167 void OnHandleReady(MojoResult result) { 166 void OnHandleReady(MojoResult result) {
168 DCHECK(task_runner_->BelongsToCurrentThread()); 167 DCHECK(task_runner_->RunsTasksOnCurrentThread());
169 scoped_refptr<InterfaceEndpoint> self_protector(this); 168 scoped_refptr<InterfaceEndpoint> self_protector(this);
170 scoped_refptr<MultiplexRouter> router_protector(router_); 169 scoped_refptr<MultiplexRouter> router_protector(router_);
171 170
172 // Because we never close |sync_message_event_{sender,receiver}_| before 171 // Because we never close |sync_message_event_{sender,receiver}_| before
173 // destruction or set a deadline, |result| should always be MOJO_RESULT_OK. 172 // destruction or set a deadline, |result| should always be MOJO_RESULT_OK.
174 DCHECK_EQ(MOJO_RESULT_OK, result); 173 DCHECK_EQ(MOJO_RESULT_OK, result);
175 bool reset_sync_watcher = false; 174 bool reset_sync_watcher = false;
176 { 175 {
177 MayAutoLock locker(router_->lock_.get()); 176 MayAutoLock locker(router_->lock_.get());
178 177
179 bool more_to_process = router_->ProcessFirstSyncMessageForEndpoint(id_); 178 bool more_to_process = router_->ProcessFirstSyncMessageForEndpoint(id_);
180 179
181 if (!more_to_process) 180 if (!more_to_process)
182 ResetSyncMessageSignal(); 181 ResetSyncMessageSignal();
183 182
184 // Currently there are no queued sync messages and the peer has closed so 183 // Currently there are no queued sync messages and the peer has closed so
185 // there won't be incoming sync messages in the future. 184 // there won't be incoming sync messages in the future.
186 reset_sync_watcher = !more_to_process && peer_closed_; 185 reset_sync_watcher = !more_to_process && peer_closed_;
187 } 186 }
188 if (reset_sync_watcher) { 187 if (reset_sync_watcher) {
189 // If a SyncWatch() call (or multiple ones) of this interface endpoint is 188 // If a SyncWatch() call (or multiple ones) of this interface endpoint is
190 // on the call stack, resetting the sync watcher will allow it to exit 189 // on the call stack, resetting the sync watcher will allow it to exit
191 // when the call stack unwinds to that frame. 190 // when the call stack unwinds to that frame.
192 sync_watcher_.reset(); 191 sync_watcher_.reset();
193 } 192 }
194 } 193 }
195 194
196 void EnsureSyncWatcherExists() { 195 void EnsureSyncWatcherExists() {
197 DCHECK(task_runner_->BelongsToCurrentThread()); 196 DCHECK(task_runner_->RunsTasksOnCurrentThread());
198 if (sync_watcher_) 197 if (sync_watcher_)
199 return; 198 return;
200 199
201 { 200 {
202 MayAutoLock locker(router_->lock_.get()); 201 MayAutoLock locker(router_->lock_.get());
203 EnsureEventMessagePipeExists(); 202 EnsureEventMessagePipeExists();
204 203
205 auto iter = router_->sync_message_tasks_.find(id_); 204 auto iter = router_->sync_message_tasks_.find(id_);
206 if (iter != router_->sync_message_tasks_.end() && !iter->second.empty()) 205 if (iter != router_->sync_message_tasks_.end() && !iter->second.empty())
207 SignalSyncMessageEvent(); 206 SignalSyncMessageEvent();
(...skipping 25 matching lines...) Expand all
233 // The following members are accessed under the router's lock. 232 // The following members are accessed under the router's lock.
234 233
235 // Whether the endpoint has been closed. 234 // Whether the endpoint has been closed.
236 bool closed_; 235 bool closed_;
237 // Whether the peer endpoint has been closed. 236 // Whether the peer endpoint has been closed.
238 bool peer_closed_; 237 bool peer_closed_;
239 238
240 base::Optional<DisconnectReason> disconnect_reason_; 239 base::Optional<DisconnectReason> disconnect_reason_;
241 240
242 // The task runner on which |client_|'s methods can be called. 241 // The task runner on which |client_|'s methods can be called.
243 scoped_refptr<base::SingleThreadTaskRunner> task_runner_; 242 scoped_refptr<base::SequencedTaskRunner> task_runner_;
244 // Not owned. It is null if no client is attached to this endpoint. 243 // Not owned. It is null if no client is attached to this endpoint.
245 InterfaceEndpointClient* client_; 244 InterfaceEndpointClient* client_;
246 245
247 // A message pipe used as an event to signal that sync messages are available. 246 // A message pipe used as an event to signal that sync messages are available.
248 // The message pipe handles are initialized under the router's lock and remain 247 // The message pipe handles are initialized under the router's lock and remain
249 // unchanged afterwards. They may be accessed outside of the router's lock 248 // unchanged afterwards. They may be accessed outside of the router's lock
250 // later. 249 // later.
251 ScopedMessagePipeHandle sync_message_event_sender_; 250 ScopedMessagePipeHandle sync_message_event_sender_;
252 ScopedMessagePipeHandle sync_message_event_receiver_; 251 ScopedMessagePipeHandle sync_message_event_receiver_;
253 bool event_signalled_; 252 bool event_signalled_;
(...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after
289 Type type; 288 Type type;
290 289
291 private: 290 private:
292 explicit Task(Type in_type) : type(in_type) {} 291 explicit Task(Type in_type) : type(in_type) {}
293 }; 292 };
294 293
295 MultiplexRouter::MultiplexRouter( 294 MultiplexRouter::MultiplexRouter(
296 ScopedMessagePipeHandle message_pipe, 295 ScopedMessagePipeHandle message_pipe,
297 Config config, 296 Config config,
298 bool set_interface_id_namesapce_bit, 297 bool set_interface_id_namesapce_bit,
299 scoped_refptr<base::SingleThreadTaskRunner> runner) 298 scoped_refptr<base::SequencedTaskRunner> runner)
300 : set_interface_id_namespace_bit_(set_interface_id_namesapce_bit), 299 : set_interface_id_namespace_bit_(set_interface_id_namesapce_bit),
301 task_runner_(runner), 300 task_runner_(runner),
302 header_validator_(nullptr), 301 header_validator_(nullptr),
303 filters_(this), 302 filters_(this),
304 connector_(std::move(message_pipe), 303 connector_(std::move(message_pipe),
305 config == MULTI_INTERFACE ? Connector::MULTI_THREADED_SEND 304 config == MULTI_INTERFACE ? Connector::MULTI_THREADED_SEND
306 : Connector::SINGLE_THREADED_SEND, 305 : Connector::SINGLE_THREADED_SEND,
307 std::move(runner)), 306 std::move(runner)),
308 lock_(config == MULTI_INTERFACE ? new base::Lock : nullptr), 307 lock_(config == MULTI_INTERFACE ? new base::Lock : nullptr),
309 control_message_handler_(this), 308 control_message_handler_(this),
310 control_message_proxy_(&connector_), 309 control_message_proxy_(&connector_),
311 next_interface_id_value_(1), 310 next_interface_id_value_(1),
312 posted_to_process_tasks_(false), 311 posted_to_process_tasks_(false),
313 encountered_error_(false), 312 encountered_error_(false),
314 paused_(false), 313 paused_(false),
315 testing_mode_(false) { 314 testing_mode_(false) {
316 DCHECK(task_runner_->BelongsToCurrentThread()); 315 DCHECK(task_runner_->RunsTasksOnCurrentThread());
316 DCHECK(config == SINGLE_INTERFACE || base::ThreadTaskRunnerHandle::IsSet());
317 317
318 if (config == SINGLE_INTERFACE_WITH_SYNC_METHODS || 318 if (config == SINGLE_INTERFACE_WITH_SYNC_METHODS ||
319 config == MULTI_INTERFACE) { 319 config == MULTI_INTERFACE) {
320 // Always participate in sync handle watching in multi-interface mode, 320 // Always participate in sync handle watching in multi-interface mode,
321 // because even if it doesn't expect sync requests during sync handle 321 // because even if it doesn't expect sync requests during sync handle
322 // watching, it may still need to dispatch messages to associated endpoints 322 // watching, it may still need to dispatch messages to associated endpoints
323 // on a different thread. 323 // on a different thread.
324 connector_.AllowWokenUpBySyncWatchOnSameThread(); 324 connector_.AllowWokenUpBySyncWatchOnSameThread();
325 } 325 }
326 connector_.set_incoming_receiver(&filters_); 326 connector_.set_incoming_receiver(&filters_);
(...skipping 28 matching lines...) Expand all
355 UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED); 355 UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED);
356 } else { 356 } else {
357 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); 357 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED);
358 } 358 }
359 } 359 }
360 360
361 DCHECK(endpoints_.empty()); 361 DCHECK(endpoints_.empty());
362 } 362 }
363 363
364 void MultiplexRouter::SetMasterInterfaceName(const char* name) { 364 void MultiplexRouter::SetMasterInterfaceName(const char* name) {
365 DCHECK(thread_checker_.CalledOnValidThread()); 365 DCHECK(sequence_checker_.CalledOnValidSequence());
366 header_validator_->SetDescription( 366 header_validator_->SetDescription(
367 std::string(name) + " [master] MessageHeaderValidator"); 367 std::string(name) + " [master] MessageHeaderValidator");
368 control_message_handler_.SetDescription( 368 control_message_handler_.SetDescription(
369 std::string(name) + " [master] PipeControlMessageHandler"); 369 std::string(name) + " [master] PipeControlMessageHandler");
370 connector_.SetWatcherHeapProfilerTag(name); 370 connector_.SetWatcherHeapProfilerTag(name);
371 } 371 }
372 372
373 void MultiplexRouter::CreateEndpointHandlePair( 373 void MultiplexRouter::CreateEndpointHandlePair(
374 ScopedInterfaceEndpointHandle* local_endpoint, 374 ScopedInterfaceEndpointHandle* local_endpoint,
375 ScopedInterfaceEndpointHandle* remote_endpoint) { 375 ScopedInterfaceEndpointHandle* remote_endpoint) {
(...skipping 66 matching lines...) Expand 10 before | Expand all | Expand 10 after
442 MayAutoUnlock unlocker(lock_.get()); 442 MayAutoUnlock unlocker(lock_.get());
443 control_message_proxy_.NotifyPeerEndpointClosed(id, reason); 443 control_message_proxy_.NotifyPeerEndpointClosed(id, reason);
444 } 444 }
445 445
446 ProcessTasks(NO_DIRECT_CLIENT_CALLS, nullptr); 446 ProcessTasks(NO_DIRECT_CLIENT_CALLS, nullptr);
447 } 447 }
448 448
449 InterfaceEndpointController* MultiplexRouter::AttachEndpointClient( 449 InterfaceEndpointController* MultiplexRouter::AttachEndpointClient(
450 const ScopedInterfaceEndpointHandle& handle, 450 const ScopedInterfaceEndpointHandle& handle,
451 InterfaceEndpointClient* client, 451 InterfaceEndpointClient* client,
452 scoped_refptr<base::SingleThreadTaskRunner> runner) { 452 scoped_refptr<base::SequencedTaskRunner> runner) {
453 const InterfaceId id = handle.id(); 453 const InterfaceId id = handle.id();
454 454
455 DCHECK(IsValidInterfaceId(id)); 455 DCHECK(IsValidInterfaceId(id));
456 DCHECK(client); 456 DCHECK(client);
457 457
458 MayAutoLock locker(lock_.get()); 458 MayAutoLock locker(lock_.get());
459 DCHECK(base::ContainsKey(endpoints_, id)); 459 DCHECK(base::ContainsKey(endpoints_, id));
460 460
461 InterfaceEndpoint* endpoint = endpoints_[id].get(); 461 InterfaceEndpoint* endpoint = endpoints_[id].get();
462 endpoint->AttachClient(client, std::move(runner)); 462 endpoint->AttachClient(client, std::move(runner));
(...skipping 12 matching lines...) Expand all
475 DCHECK(IsValidInterfaceId(id)); 475 DCHECK(IsValidInterfaceId(id));
476 476
477 MayAutoLock locker(lock_.get()); 477 MayAutoLock locker(lock_.get());
478 DCHECK(base::ContainsKey(endpoints_, id)); 478 DCHECK(base::ContainsKey(endpoints_, id));
479 479
480 InterfaceEndpoint* endpoint = endpoints_[id].get(); 480 InterfaceEndpoint* endpoint = endpoints_[id].get();
481 endpoint->DetachClient(); 481 endpoint->DetachClient();
482 } 482 }
483 483
484 void MultiplexRouter::RaiseError() { 484 void MultiplexRouter::RaiseError() {
485 if (task_runner_->BelongsToCurrentThread()) { 485 if (task_runner_->RunsTasksOnCurrentThread()) {
486 connector_.RaiseError(); 486 connector_.RaiseError();
487 } else { 487 } else {
488 task_runner_->PostTask(FROM_HERE, 488 task_runner_->PostTask(FROM_HERE,
489 base::Bind(&MultiplexRouter::RaiseError, this)); 489 base::Bind(&MultiplexRouter::RaiseError, this));
490 } 490 }
491 } 491 }
492 492
493 void MultiplexRouter::CloseMessagePipe() { 493 void MultiplexRouter::CloseMessagePipe() {
494 DCHECK(thread_checker_.CalledOnValidThread()); 494 DCHECK(sequence_checker_.CalledOnValidSequence());
495 connector_.CloseMessagePipe(); 495 connector_.CloseMessagePipe();
496 // CloseMessagePipe() above won't trigger connection error handler. 496 // CloseMessagePipe() above won't trigger connection error handler.
497 // Explicitly call OnPipeConnectionError() so that associated endpoints will 497 // Explicitly call OnPipeConnectionError() so that associated endpoints will
498 // get notified. 498 // get notified.
499 OnPipeConnectionError(); 499 OnPipeConnectionError();
500 } 500 }
501 501
502 void MultiplexRouter::PauseIncomingMethodCallProcessing() { 502 void MultiplexRouter::PauseIncomingMethodCallProcessing() {
503 DCHECK(thread_checker_.CalledOnValidThread()); 503 DCHECK(sequence_checker_.CalledOnValidSequence());
504 connector_.PauseIncomingMethodCallProcessing(); 504 connector_.PauseIncomingMethodCallProcessing();
505 505
506 MayAutoLock locker(lock_.get()); 506 MayAutoLock locker(lock_.get());
507 paused_ = true; 507 paused_ = true;
508 508
509 for (auto iter = endpoints_.begin(); iter != endpoints_.end(); ++iter) 509 for (auto iter = endpoints_.begin(); iter != endpoints_.end(); ++iter)
510 iter->second->ResetSyncMessageSignal(); 510 iter->second->ResetSyncMessageSignal();
511 } 511 }
512 512
513 void MultiplexRouter::ResumeIncomingMethodCallProcessing() { 513 void MultiplexRouter::ResumeIncomingMethodCallProcessing() {
514 DCHECK(thread_checker_.CalledOnValidThread()); 514 DCHECK(sequence_checker_.CalledOnValidSequence());
515 connector_.ResumeIncomingMethodCallProcessing(); 515 connector_.ResumeIncomingMethodCallProcessing();
516 516
517 MayAutoLock locker(lock_.get()); 517 MayAutoLock locker(lock_.get());
518 paused_ = false; 518 paused_ = false;
519 519
520 for (auto iter = endpoints_.begin(); iter != endpoints_.end(); ++iter) { 520 for (auto iter = endpoints_.begin(); iter != endpoints_.end(); ++iter) {
521 auto sync_iter = sync_message_tasks_.find(iter->first); 521 auto sync_iter = sync_message_tasks_.find(iter->first);
522 if (sync_iter != sync_message_tasks_.end() && !sync_iter->second.empty()) 522 if (sync_iter != sync_message_tasks_.end() && !sync_iter->second.empty())
523 iter->second->SignalSyncMessageEvent(); 523 iter->second->SignalSyncMessageEvent();
524 } 524 }
525 525
526 ProcessTasks(NO_DIRECT_CLIENT_CALLS, nullptr); 526 ProcessTasks(NO_DIRECT_CLIENT_CALLS, nullptr);
527 } 527 }
528 528
529 bool MultiplexRouter::HasAssociatedEndpoints() const { 529 bool MultiplexRouter::HasAssociatedEndpoints() const {
530 DCHECK(thread_checker_.CalledOnValidThread()); 530 DCHECK(sequence_checker_.CalledOnValidSequence());
531 MayAutoLock locker(lock_.get()); 531 MayAutoLock locker(lock_.get());
532 532
533 if (endpoints_.size() > 1) 533 if (endpoints_.size() > 1)
534 return true; 534 return true;
535 if (endpoints_.size() == 0) 535 if (endpoints_.size() == 0)
536 return false; 536 return false;
537 537
538 return !base::ContainsKey(endpoints_, kMasterInterfaceId); 538 return !base::ContainsKey(endpoints_, kMasterInterfaceId);
539 } 539 }
540 540
541 void MultiplexRouter::EnableTestingMode() { 541 void MultiplexRouter::EnableTestingMode() {
542 DCHECK(thread_checker_.CalledOnValidThread()); 542 DCHECK(sequence_checker_.CalledOnValidSequence());
543 MayAutoLock locker(lock_.get()); 543 MayAutoLock locker(lock_.get());
544 544
545 testing_mode_ = true; 545 testing_mode_ = true;
546 connector_.set_enforce_errors_from_incoming_receiver(false); 546 connector_.set_enforce_errors_from_incoming_receiver(false);
547 } 547 }
548 548
549 bool MultiplexRouter::Accept(Message* message) { 549 bool MultiplexRouter::Accept(Message* message) {
550 DCHECK(thread_checker_.CalledOnValidThread()); 550 DCHECK(sequence_checker_.CalledOnValidSequence());
551 551
552 scoped_refptr<MultiplexRouter> protector(this); 552 scoped_refptr<MultiplexRouter> protector(this);
553 MayAutoLock locker(lock_.get()); 553 MayAutoLock locker(lock_.get());
554 554
555 DCHECK(!paused_); 555 DCHECK(!paused_);
556 556
557 ClientCallBehavior client_call_behavior = 557 ClientCallBehavior client_call_behavior =
558 connector_.during_sync_handle_watcher_callback() 558 connector_.during_sync_handle_watcher_callback()
559 ? ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES 559 ? ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES
560 : ALLOW_DIRECT_CLIENT_CALLS; 560 : ALLOW_DIRECT_CLIENT_CALLS;
(...skipping 66 matching lines...) Expand 10 before | Expand all | Expand 10 after
627 DCHECK(!endpoint->closed()); 627 DCHECK(!endpoint->closed());
628 UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED); 628 UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED);
629 629
630 MayAutoUnlock unlocker(lock_.get()); 630 MayAutoUnlock unlocker(lock_.get());
631 control_message_proxy_.NotifyPeerEndpointClosed(id, base::nullopt); 631 control_message_proxy_.NotifyPeerEndpointClosed(id, base::nullopt);
632 632
633 return true; 633 return true;
634 } 634 }
635 635
636 void MultiplexRouter::OnPipeConnectionError() { 636 void MultiplexRouter::OnPipeConnectionError() {
637 DCHECK(thread_checker_.CalledOnValidThread()); 637 DCHECK(sequence_checker_.CalledOnValidSequence());
638 638
639 scoped_refptr<MultiplexRouter> protector(this); 639 scoped_refptr<MultiplexRouter> protector(this);
640 MayAutoLock locker(lock_.get()); 640 MayAutoLock locker(lock_.get());
641 641
642 encountered_error_ = true; 642 encountered_error_ = true;
643 643
644 for (auto iter = endpoints_.begin(); iter != endpoints_.end();) { 644 for (auto iter = endpoints_.begin(); iter != endpoints_.end();) {
645 InterfaceEndpoint* endpoint = iter->second.get(); 645 InterfaceEndpoint* endpoint = iter->second.get();
646 // Increment the iterator before calling UpdateEndpointStateMayRemove() 646 // Increment the iterator before calling UpdateEndpointStateMayRemove()
647 // because it may remove the corresponding value from the map. 647 // because it may remove the corresponding value from the map.
648 ++iter; 648 ++iter;
649 649
650 if (endpoint->client()) 650 if (endpoint->client())
651 tasks_.push_back(Task::CreateNotifyErrorTask(endpoint)); 651 tasks_.push_back(Task::CreateNotifyErrorTask(endpoint));
652 652
653 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); 653 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED);
654 } 654 }
655 655
656 ProcessTasks(connector_.during_sync_handle_watcher_callback() 656 ProcessTasks(connector_.during_sync_handle_watcher_callback()
657 ? ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES 657 ? ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES
658 : ALLOW_DIRECT_CLIENT_CALLS, 658 : ALLOW_DIRECT_CLIENT_CALLS,
659 connector_.task_runner()); 659 connector_.task_runner());
660 } 660 }
661 661
662 void MultiplexRouter::ProcessTasks( 662 void MultiplexRouter::ProcessTasks(
663 ClientCallBehavior client_call_behavior, 663 ClientCallBehavior client_call_behavior,
664 base::SingleThreadTaskRunner* current_task_runner) { 664 base::SequencedTaskRunner* current_task_runner) {
665 AssertLockAcquired(); 665 AssertLockAcquired();
666 666
667 if (posted_to_process_tasks_) 667 if (posted_to_process_tasks_)
668 return; 668 return;
669 669
670 while (!tasks_.empty() && !paused_) { 670 while (!tasks_.empty() && !paused_) {
671 std::unique_ptr<Task> task(std::move(tasks_.front())); 671 std::unique_ptr<Task> task(std::move(tasks_.front()));
672 tasks_.pop_front(); 672 tasks_.pop_front();
673 673
674 InterfaceId id = kInvalidInterfaceId; 674 InterfaceId id = kInvalidInterfaceId;
(...skipping 59 matching lines...) Expand 10 before | Expand all | Expand 10 after
734 sync_message_tasks_.erase(iter); 734 sync_message_tasks_.erase(iter);
735 return false; 735 return false;
736 } 736 }
737 737
738 return true; 738 return true;
739 } 739 }
740 740
741 bool MultiplexRouter::ProcessNotifyErrorTask( 741 bool MultiplexRouter::ProcessNotifyErrorTask(
742 Task* task, 742 Task* task,
743 ClientCallBehavior client_call_behavior, 743 ClientCallBehavior client_call_behavior,
744 base::SingleThreadTaskRunner* current_task_runner) { 744 base::SequencedTaskRunner* current_task_runner) {
745 DCHECK(!current_task_runner || current_task_runner->BelongsToCurrentThread()); 745 DCHECK(!current_task_runner ||
746 current_task_runner->RunsTasksOnCurrentThread());
746 DCHECK(!paused_); 747 DCHECK(!paused_);
747 748
748 AssertLockAcquired(); 749 AssertLockAcquired();
749 InterfaceEndpoint* endpoint = task->endpoint_to_notify.get(); 750 InterfaceEndpoint* endpoint = task->endpoint_to_notify.get();
750 if (!endpoint->client()) 751 if (!endpoint->client())
751 return true; 752 return true;
752 753
753 if (client_call_behavior != ALLOW_DIRECT_CLIENT_CALLS || 754 if (client_call_behavior != ALLOW_DIRECT_CLIENT_CALLS ||
754 endpoint->task_runner() != current_task_runner) { 755 endpoint->task_runner() != current_task_runner) {
755 MaybePostToProcessTasks(endpoint->task_runner()); 756 MaybePostToProcessTasks(endpoint->task_runner());
756 return false; 757 return false;
757 } 758 }
758 759
759 DCHECK(endpoint->task_runner()->BelongsToCurrentThread()); 760 DCHECK(endpoint->task_runner()->RunsTasksOnCurrentThread());
760 761
761 InterfaceEndpointClient* client = endpoint->client(); 762 InterfaceEndpointClient* client = endpoint->client();
762 base::Optional<DisconnectReason> disconnect_reason( 763 base::Optional<DisconnectReason> disconnect_reason(
763 endpoint->disconnect_reason()); 764 endpoint->disconnect_reason());
764 765
765 { 766 {
766 // We must unlock before calling into |client| because it may call this 767 // We must unlock before calling into |client| because it may call this
767 // object within NotifyError(). Holding the lock will lead to deadlock. 768 // object within NotifyError(). Holding the lock will lead to deadlock.
768 // 769 //
769 // It is safe to call into |client| without the lock. Because |client| is 770 // It is safe to call into |client| without the lock. Because |client| is
770 // always accessed on the same thread, including DetachEndpointClient(). 771 // always accessed on the same thread, including DetachEndpointClient().
771 MayAutoUnlock unlocker(lock_.get()); 772 MayAutoUnlock unlocker(lock_.get());
772 client->NotifyError(disconnect_reason); 773 client->NotifyError(disconnect_reason);
773 } 774 }
774 return true; 775 return true;
775 } 776 }
776 777
777 bool MultiplexRouter::ProcessIncomingMessage( 778 bool MultiplexRouter::ProcessIncomingMessage(
778 Message* message, 779 Message* message,
779 ClientCallBehavior client_call_behavior, 780 ClientCallBehavior client_call_behavior,
780 base::SingleThreadTaskRunner* current_task_runner) { 781 base::SequencedTaskRunner* current_task_runner) {
781 DCHECK(!current_task_runner || current_task_runner->BelongsToCurrentThread()); 782 DCHECK(!current_task_runner ||
783 current_task_runner->RunsTasksOnCurrentThread());
782 DCHECK(!paused_); 784 DCHECK(!paused_);
783 DCHECK(message); 785 DCHECK(message);
784 AssertLockAcquired(); 786 AssertLockAcquired();
785 787
786 if (message->IsNull()) { 788 if (message->IsNull()) {
787 // This is a sync message and has been processed during sync handle 789 // This is a sync message and has been processed during sync handle
788 // watching. 790 // watching.
789 return true; 791 return true;
790 } 792 }
791 793
(...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after
827 829
828 if (!endpoint->client()) { 830 if (!endpoint->client()) {
829 // We need to wait until a client is attached in order to dispatch further 831 // We need to wait until a client is attached in order to dispatch further
830 // messages. 832 // messages.
831 return false; 833 return false;
832 } 834 }
833 835
834 bool can_direct_call; 836 bool can_direct_call;
835 if (message->has_flag(Message::kFlagIsSync)) { 837 if (message->has_flag(Message::kFlagIsSync)) {
836 can_direct_call = client_call_behavior != NO_DIRECT_CLIENT_CALLS && 838 can_direct_call = client_call_behavior != NO_DIRECT_CLIENT_CALLS &&
837 endpoint->task_runner()->BelongsToCurrentThread(); 839 endpoint->task_runner()->RunsTasksOnCurrentThread();
838 } else { 840 } else {
839 can_direct_call = client_call_behavior == ALLOW_DIRECT_CLIENT_CALLS && 841 can_direct_call = client_call_behavior == ALLOW_DIRECT_CLIENT_CALLS &&
840 endpoint->task_runner() == current_task_runner; 842 endpoint->task_runner() == current_task_runner;
841 } 843 }
842 844
843 if (!can_direct_call) { 845 if (!can_direct_call) {
844 MaybePostToProcessTasks(endpoint->task_runner()); 846 MaybePostToProcessTasks(endpoint->task_runner());
845 return false; 847 return false;
846 } 848 }
847 849
848 DCHECK(endpoint->task_runner()->BelongsToCurrentThread()); 850 DCHECK(endpoint->task_runner()->RunsTasksOnCurrentThread());
849 851
850 InterfaceEndpointClient* client = endpoint->client(); 852 InterfaceEndpointClient* client = endpoint->client();
851 bool result = false; 853 bool result = false;
852 { 854 {
853 // We must unlock before calling into |client| because it may call this 855 // We must unlock before calling into |client| because it may call this
854 // object within HandleIncomingMessage(). Holding the lock will lead to 856 // object within HandleIncomingMessage(). Holding the lock will lead to
855 // deadlock. 857 // deadlock.
856 // 858 //
857 // It is safe to call into |client| without the lock. Because |client| is 859 // It is safe to call into |client| without the lock. Because |client| is
858 // always accessed on the same thread, including DetachEndpointClient(). 860 // always accessed on the same thread, including DetachEndpointClient().
859 MayAutoUnlock unlocker(lock_.get()); 861 MayAutoUnlock unlocker(lock_.get());
860 result = client->HandleIncomingMessage(message); 862 result = client->HandleIncomingMessage(message);
861 } 863 }
862 if (!result) 864 if (!result)
863 RaiseErrorInNonTestingMode(); 865 RaiseErrorInNonTestingMode();
864 866
865 return true; 867 return true;
866 } 868 }
867 869
868 void MultiplexRouter::MaybePostToProcessTasks( 870 void MultiplexRouter::MaybePostToProcessTasks(
869 base::SingleThreadTaskRunner* task_runner) { 871 base::SequencedTaskRunner* task_runner) {
870 AssertLockAcquired(); 872 AssertLockAcquired();
871 if (posted_to_process_tasks_) 873 if (posted_to_process_tasks_)
872 return; 874 return;
873 875
874 posted_to_process_tasks_ = true; 876 posted_to_process_tasks_ = true;
875 posted_to_task_runner_ = task_runner; 877 posted_to_task_runner_ = task_runner;
876 task_runner->PostTask( 878 task_runner->PostTask(
877 FROM_HERE, base::Bind(&MultiplexRouter::LockAndCallProcessTasks, this)); 879 FROM_HERE, base::Bind(&MultiplexRouter::LockAndCallProcessTasks, this));
878 } 880 }
879 881
880 void MultiplexRouter::LockAndCallProcessTasks() { 882 void MultiplexRouter::LockAndCallProcessTasks() {
881 // There is no need to hold a ref to this class in this case because this is 883 // There is no need to hold a ref to this class in this case because this is
882 // always called using base::Bind(), which holds a ref. 884 // always called using base::Bind(), which holds a ref.
883 MayAutoLock locker(lock_.get()); 885 MayAutoLock locker(lock_.get());
884 posted_to_process_tasks_ = false; 886 posted_to_process_tasks_ = false;
885 scoped_refptr<base::SingleThreadTaskRunner> runner( 887 scoped_refptr<base::SequencedTaskRunner> runner(
886 std::move(posted_to_task_runner_)); 888 std::move(posted_to_task_runner_));
887 ProcessTasks(ALLOW_DIRECT_CLIENT_CALLS, runner.get()); 889 ProcessTasks(ALLOW_DIRECT_CLIENT_CALLS, runner.get());
888 } 890 }
889 891
890 void MultiplexRouter::UpdateEndpointStateMayRemove( 892 void MultiplexRouter::UpdateEndpointStateMayRemove(
891 InterfaceEndpoint* endpoint, 893 InterfaceEndpoint* endpoint,
892 EndpointStateUpdateType type) { 894 EndpointStateUpdateType type) {
893 switch (type) { 895 switch (type) {
894 case ENDPOINT_CLOSED: 896 case ENDPOINT_CLOSED:
895 endpoint->set_closed(); 897 endpoint->set_closed();
(...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after
935 937
936 void MultiplexRouter::AssertLockAcquired() { 938 void MultiplexRouter::AssertLockAcquired() {
937 #if DCHECK_IS_ON() 939 #if DCHECK_IS_ON()
938 if (lock_) 940 if (lock_)
939 lock_->AssertAcquired(); 941 lock_->AssertAcquired();
940 #endif 942 #endif
941 } 943 }
942 944
943 } // namespace internal 945 } // namespace internal
944 } // namespace mojo 946 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/public/cpp/bindings/lib/multiplex_router.h ('k') | mojo/public/cpp/bindings/tests/BUILD.gn » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698