Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(526)

Side by Side Diff: mojo/public/cpp/bindings/lib/multiplex_router.cc

Issue 2707483002: Mojo C++ bindings: change some std::unique_ptr<base::Lock> to base::Optional<base::Lock>. (Closed)
Patch Set: Created 3 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/public/cpp/bindings/lib/multiplex_router.h" 5 #include "mojo/public/cpp/bindings/lib/multiplex_router.h"
6 6
7 #include <stdint.h> 7 #include <stdint.h>
8 8
9 #include <utility> 9 #include <utility>
10 10
(...skipping 160 matching lines...) Expand 10 before | Expand all | Expand 10 after
171 } 171 }
172 172
173 void OnHandleReady(MojoResult result) { 173 void OnHandleReady(MojoResult result) {
174 DCHECK(task_runner_->BelongsToCurrentThread()); 174 DCHECK(task_runner_->BelongsToCurrentThread());
175 scoped_refptr<MultiplexRouter> router_protector(router_); 175 scoped_refptr<MultiplexRouter> router_protector(router_);
176 176
177 // Because we never close |sync_message_event_{sender,receiver}_| before 177 // Because we never close |sync_message_event_{sender,receiver}_| before
178 // destruction or set a deadline, |result| should always be MOJO_RESULT_OK. 178 // destruction or set a deadline, |result| should always be MOJO_RESULT_OK.
179 DCHECK_EQ(MOJO_RESULT_OK, result); 179 DCHECK_EQ(MOJO_RESULT_OK, result);
180 180
181 MayAutoLock locker(router_->lock_.get()); 181 MayAutoLock locker(&router_->lock_);
182 scoped_refptr<InterfaceEndpoint> self_protector(this); 182 scoped_refptr<InterfaceEndpoint> self_protector(this);
183 183
184 bool more_to_process = router_->ProcessFirstSyncMessageForEndpoint(id_); 184 bool more_to_process = router_->ProcessFirstSyncMessageForEndpoint(id_);
185 185
186 if (!more_to_process) 186 if (!more_to_process)
187 ResetSyncMessageSignal(); 187 ResetSyncMessageSignal();
188 188
189 // Currently there are no queued sync messages and the peer has closed so 189 // Currently there are no queued sync messages and the peer has closed so
190 // there won't be incoming sync messages in the future. 190 // there won't be incoming sync messages in the future.
191 if (!more_to_process && peer_closed_) { 191 if (!more_to_process && peer_closed_) {
192 // If a SyncWatch() call (or multiple ones) of this interface endpoint is 192 // If a SyncWatch() call (or multiple ones) of this interface endpoint is
193 // on the call stack, resetting the sync watcher will allow it to exit 193 // on the call stack, resetting the sync watcher will allow it to exit
194 // when the call stack unwinds to that frame. 194 // when the call stack unwinds to that frame.
195 sync_watcher_.reset(); 195 sync_watcher_.reset();
196 } 196 }
197 } 197 }
198 198
199 void EnsureSyncWatcherExists() { 199 void EnsureSyncWatcherExists() {
200 DCHECK(task_runner_->BelongsToCurrentThread()); 200 DCHECK(task_runner_->BelongsToCurrentThread());
201 if (sync_watcher_) 201 if (sync_watcher_)
202 return; 202 return;
203 203
204 { 204 {
205 MayAutoLock locker(router_->lock_.get()); 205 MayAutoLock locker(&router_->lock_);
206 EnsureEventMessagePipeExists(); 206 EnsureEventMessagePipeExists();
207 207
208 auto iter = router_->sync_message_tasks_.find(id_); 208 auto iter = router_->sync_message_tasks_.find(id_);
209 if (iter != router_->sync_message_tasks_.end() && !iter->second.empty()) 209 if (iter != router_->sync_message_tasks_.end() && !iter->second.empty())
210 SignalSyncMessageEvent(); 210 SignalSyncMessageEvent();
211 } 211 }
212 212
213 sync_watcher_.reset(new SyncHandleWatcher( 213 sync_watcher_.reset(new SyncHandleWatcher(
214 sync_message_event_receiver_.get(), MOJO_HANDLE_SIGNAL_READABLE, 214 sync_message_event_receiver_.get(), MOJO_HANDLE_SIGNAL_READABLE,
215 base::Bind(&InterfaceEndpoint::OnHandleReady, base::Unretained(this)))); 215 base::Bind(&InterfaceEndpoint::OnHandleReady, base::Unretained(this))));
(...skipping 66 matching lines...) Expand 10 before | Expand all | Expand 10 after
282 282
283 MessageWrapper(MessageWrapper&& other) 283 MessageWrapper(MessageWrapper&& other)
284 : router_(other.router_), value_(std::move(other.value_)) {} 284 : router_(other.router_), value_(std::move(other.value_)) {}
285 285
286 ~MessageWrapper() { 286 ~MessageWrapper() {
287 if (value_.associated_endpoint_handles()->empty()) 287 if (value_.associated_endpoint_handles()->empty())
288 return; 288 return;
289 289
290 router_->AssertLockAcquired(); 290 router_->AssertLockAcquired();
291 { 291 {
292 MayAutoUnlock unlocker(router_->lock_.get()); 292 MayAutoUnlock unlocker(&router_->lock_);
293 value_.mutable_associated_endpoint_handles()->clear(); 293 value_.mutable_associated_endpoint_handles()->clear();
294 } 294 }
295 } 295 }
296 296
297 MessageWrapper& operator=(MessageWrapper&& other) { 297 MessageWrapper& operator=(MessageWrapper&& other) {
298 router_ = other.router_; 298 router_ = other.router_;
299 value_ = std::move(other.value_); 299 value_ = std::move(other.value_);
300 return *this; 300 return *this;
301 } 301 }
302 302
(...skipping 45 matching lines...) Expand 10 before | Expand all | Expand 10 after
348 bool set_interface_id_namesapce_bit, 348 bool set_interface_id_namesapce_bit,
349 scoped_refptr<base::SingleThreadTaskRunner> runner) 349 scoped_refptr<base::SingleThreadTaskRunner> runner)
350 : set_interface_id_namespace_bit_(set_interface_id_namesapce_bit), 350 : set_interface_id_namespace_bit_(set_interface_id_namesapce_bit),
351 task_runner_(runner), 351 task_runner_(runner),
352 header_validator_(nullptr), 352 header_validator_(nullptr),
353 filters_(this), 353 filters_(this),
354 connector_(std::move(message_pipe), 354 connector_(std::move(message_pipe),
355 config == MULTI_INTERFACE ? Connector::MULTI_THREADED_SEND 355 config == MULTI_INTERFACE ? Connector::MULTI_THREADED_SEND
356 : Connector::SINGLE_THREADED_SEND, 356 : Connector::SINGLE_THREADED_SEND,
357 std::move(runner)), 357 std::move(runner)),
358 lock_(config == MULTI_INTERFACE ? new base::Lock : nullptr),
359 control_message_handler_(this), 358 control_message_handler_(this),
360 control_message_proxy_(&connector_), 359 control_message_proxy_(&connector_),
361 next_interface_id_value_(1), 360 next_interface_id_value_(1),
362 posted_to_process_tasks_(false), 361 posted_to_process_tasks_(false),
363 encountered_error_(false), 362 encountered_error_(false),
364 paused_(false), 363 paused_(false),
365 testing_mode_(false) { 364 testing_mode_(false) {
366 DCHECK(task_runner_->BelongsToCurrentThread()); 365 DCHECK(task_runner_->BelongsToCurrentThread());
367 366
367 if (config == MULTI_INTERFACE)
368 lock_.emplace();
369
368 if (config == SINGLE_INTERFACE_WITH_SYNC_METHODS || 370 if (config == SINGLE_INTERFACE_WITH_SYNC_METHODS ||
369 config == MULTI_INTERFACE) { 371 config == MULTI_INTERFACE) {
370 // Always participate in sync handle watching in multi-interface mode, 372 // Always participate in sync handle watching in multi-interface mode,
371 // because even if it doesn't expect sync requests during sync handle 373 // because even if it doesn't expect sync requests during sync handle
372 // watching, it may still need to dispatch messages to associated endpoints 374 // watching, it may still need to dispatch messages to associated endpoints
373 // on a different thread. 375 // on a different thread.
374 connector_.AllowWokenUpBySyncWatchOnSameThread(); 376 connector_.AllowWokenUpBySyncWatchOnSameThread();
375 } 377 }
376 connector_.set_incoming_receiver(&filters_); 378 connector_.set_incoming_receiver(&filters_);
377 connector_.set_connection_error_handler( 379 connector_.set_connection_error_handler(
378 base::Bind(&MultiplexRouter::OnPipeConnectionError, 380 base::Bind(&MultiplexRouter::OnPipeConnectionError,
379 base::Unretained(this))); 381 base::Unretained(this)));
380 382
381 std::unique_ptr<MessageHeaderValidator> header_validator = 383 std::unique_ptr<MessageHeaderValidator> header_validator =
382 base::MakeUnique<MessageHeaderValidator>(); 384 base::MakeUnique<MessageHeaderValidator>();
383 header_validator_ = header_validator.get(); 385 header_validator_ = header_validator.get();
384 filters_.Append(std::move(header_validator)); 386 filters_.Append(std::move(header_validator));
385 } 387 }
386 388
387 MultiplexRouter::~MultiplexRouter() { 389 MultiplexRouter::~MultiplexRouter() {
388 MayAutoLock locker(lock_.get()); 390 MayAutoLock locker(&lock_);
389 391
390 sync_message_tasks_.clear(); 392 sync_message_tasks_.clear();
391 tasks_.clear(); 393 tasks_.clear();
392 394
393 for (auto iter = endpoints_.begin(); iter != endpoints_.end();) { 395 for (auto iter = endpoints_.begin(); iter != endpoints_.end();) {
394 InterfaceEndpoint* endpoint = iter->second.get(); 396 InterfaceEndpoint* endpoint = iter->second.get();
395 // Increment the iterator before calling UpdateEndpointStateMayRemove() 397 // Increment the iterator before calling UpdateEndpointStateMayRemove()
396 // because it may remove the corresponding value from the map. 398 // because it may remove the corresponding value from the map.
397 ++iter; 399 ++iter;
398 400
(...skipping 20 matching lines...) Expand all
419 connector_.SetWatcherHeapProfilerTag(name); 421 connector_.SetWatcherHeapProfilerTag(name);
420 } 422 }
421 423
422 InterfaceId MultiplexRouter::AssociateInterface( 424 InterfaceId MultiplexRouter::AssociateInterface(
423 ScopedInterfaceEndpointHandle handle_to_send) { 425 ScopedInterfaceEndpointHandle handle_to_send) {
424 if (!handle_to_send.pending_association()) 426 if (!handle_to_send.pending_association())
425 return kInvalidInterfaceId; 427 return kInvalidInterfaceId;
426 428
427 uint32_t id = 0; 429 uint32_t id = 0;
428 { 430 {
429 MayAutoLock locker(lock_.get()); 431 MayAutoLock locker(&lock_);
430 do { 432 do {
431 if (next_interface_id_value_ >= kInterfaceIdNamespaceMask) 433 if (next_interface_id_value_ >= kInterfaceIdNamespaceMask)
432 next_interface_id_value_ = 1; 434 next_interface_id_value_ = 1;
433 id = next_interface_id_value_++; 435 id = next_interface_id_value_++;
434 if (set_interface_id_namespace_bit_) 436 if (set_interface_id_namespace_bit_)
435 id |= kInterfaceIdNamespaceMask; 437 id |= kInterfaceIdNamespaceMask;
436 } while (base::ContainsKey(endpoints_, id)); 438 } while (base::ContainsKey(endpoints_, id));
437 439
438 InterfaceEndpoint* endpoint = new InterfaceEndpoint(this, id); 440 InterfaceEndpoint* endpoint = new InterfaceEndpoint(this, id);
439 endpoints_[id] = endpoint; 441 endpoints_[id] = endpoint;
440 if (encountered_error_) 442 if (encountered_error_)
441 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); 443 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED);
442 endpoint->set_handle_created(); 444 endpoint->set_handle_created();
443 } 445 }
444 446
445 if (!NotifyAssociation(&handle_to_send, id)) { 447 if (!NotifyAssociation(&handle_to_send, id)) {
446 // The peer handle of |handle_to_send|, which is supposed to join this 448 // The peer handle of |handle_to_send|, which is supposed to join this
447 // associated group, has been closed. 449 // associated group, has been closed.
448 { 450 {
449 MayAutoLock locker(lock_.get()); 451 MayAutoLock locker(&lock_);
450 InterfaceEndpoint* endpoint = FindEndpoint(id); 452 InterfaceEndpoint* endpoint = FindEndpoint(id);
451 if (endpoint) 453 if (endpoint)
452 UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED); 454 UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED);
453 } 455 }
454 456
455 control_message_proxy_.NotifyPeerEndpointClosed( 457 control_message_proxy_.NotifyPeerEndpointClosed(
456 id, handle_to_send.disconnect_reason()); 458 id, handle_to_send.disconnect_reason());
457 } 459 }
458 return id; 460 return id;
459 } 461 }
460 462
461 ScopedInterfaceEndpointHandle MultiplexRouter::CreateLocalEndpointHandle( 463 ScopedInterfaceEndpointHandle MultiplexRouter::CreateLocalEndpointHandle(
462 InterfaceId id) { 464 InterfaceId id) {
463 if (!IsValidInterfaceId(id)) 465 if (!IsValidInterfaceId(id))
464 return ScopedInterfaceEndpointHandle(); 466 return ScopedInterfaceEndpointHandle();
465 467
466 MayAutoLock locker(lock_.get()); 468 MayAutoLock locker(&lock_);
467 bool inserted = false; 469 bool inserted = false;
468 InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, &inserted); 470 InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, &inserted);
469 if (inserted) { 471 if (inserted) {
470 DCHECK(!endpoint->handle_created()); 472 DCHECK(!endpoint->handle_created());
471 473
472 if (encountered_error_) 474 if (encountered_error_)
473 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); 475 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED);
474 } else { 476 } else {
475 // If the endpoint already exist, it is because we have received a 477 // If the endpoint already exist, it is because we have received a
476 // notification that the peer endpoint has closed. 478 // notification that the peer endpoint has closed.
477 CHECK(!endpoint->closed()); 479 CHECK(!endpoint->closed());
478 CHECK(endpoint->peer_closed()); 480 CHECK(endpoint->peer_closed());
479 481
480 if (endpoint->handle_created()) 482 if (endpoint->handle_created())
481 return ScopedInterfaceEndpointHandle(); 483 return ScopedInterfaceEndpointHandle();
482 } 484 }
483 485
484 endpoint->set_handle_created(); 486 endpoint->set_handle_created();
485 return CreateScopedInterfaceEndpointHandle(id); 487 return CreateScopedInterfaceEndpointHandle(id);
486 } 488 }
487 489
488 void MultiplexRouter::CloseEndpointHandle( 490 void MultiplexRouter::CloseEndpointHandle(
489 InterfaceId id, 491 InterfaceId id,
490 const base::Optional<DisconnectReason>& reason) { 492 const base::Optional<DisconnectReason>& reason) {
491 if (!IsValidInterfaceId(id)) 493 if (!IsValidInterfaceId(id))
492 return; 494 return;
493 495
494 MayAutoLock locker(lock_.get()); 496 MayAutoLock locker(&lock_);
495 DCHECK(base::ContainsKey(endpoints_, id)); 497 DCHECK(base::ContainsKey(endpoints_, id));
496 InterfaceEndpoint* endpoint = endpoints_[id].get(); 498 InterfaceEndpoint* endpoint = endpoints_[id].get();
497 DCHECK(!endpoint->client()); 499 DCHECK(!endpoint->client());
498 DCHECK(!endpoint->closed()); 500 DCHECK(!endpoint->closed());
499 UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED); 501 UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED);
500 502
501 if (!IsMasterInterfaceId(id) || reason) { 503 if (!IsMasterInterfaceId(id) || reason) {
502 MayAutoUnlock unlocker(lock_.get()); 504 MayAutoUnlock unlocker(&lock_);
503 control_message_proxy_.NotifyPeerEndpointClosed(id, reason); 505 control_message_proxy_.NotifyPeerEndpointClosed(id, reason);
504 } 506 }
505 507
506 ProcessTasks(NO_DIRECT_CLIENT_CALLS, nullptr); 508 ProcessTasks(NO_DIRECT_CLIENT_CALLS, nullptr);
507 } 509 }
508 510
509 InterfaceEndpointController* MultiplexRouter::AttachEndpointClient( 511 InterfaceEndpointController* MultiplexRouter::AttachEndpointClient(
510 const ScopedInterfaceEndpointHandle& handle, 512 const ScopedInterfaceEndpointHandle& handle,
511 InterfaceEndpointClient* client, 513 InterfaceEndpointClient* client,
512 scoped_refptr<base::SingleThreadTaskRunner> runner) { 514 scoped_refptr<base::SingleThreadTaskRunner> runner) {
513 const InterfaceId id = handle.id(); 515 const InterfaceId id = handle.id();
514 516
515 DCHECK(IsValidInterfaceId(id)); 517 DCHECK(IsValidInterfaceId(id));
516 DCHECK(client); 518 DCHECK(client);
517 519
518 MayAutoLock locker(lock_.get()); 520 MayAutoLock locker(&lock_);
519 DCHECK(base::ContainsKey(endpoints_, id)); 521 DCHECK(base::ContainsKey(endpoints_, id));
520 522
521 InterfaceEndpoint* endpoint = endpoints_[id].get(); 523 InterfaceEndpoint* endpoint = endpoints_[id].get();
522 endpoint->AttachClient(client, std::move(runner)); 524 endpoint->AttachClient(client, std::move(runner));
523 525
524 if (endpoint->peer_closed()) 526 if (endpoint->peer_closed())
525 tasks_.push_back(Task::CreateNotifyErrorTask(endpoint)); 527 tasks_.push_back(Task::CreateNotifyErrorTask(endpoint));
526 ProcessTasks(NO_DIRECT_CLIENT_CALLS, nullptr); 528 ProcessTasks(NO_DIRECT_CLIENT_CALLS, nullptr);
527 529
528 return endpoint; 530 return endpoint;
529 } 531 }
530 532
531 void MultiplexRouter::DetachEndpointClient( 533 void MultiplexRouter::DetachEndpointClient(
532 const ScopedInterfaceEndpointHandle& handle) { 534 const ScopedInterfaceEndpointHandle& handle) {
533 const InterfaceId id = handle.id(); 535 const InterfaceId id = handle.id();
534 536
535 DCHECK(IsValidInterfaceId(id)); 537 DCHECK(IsValidInterfaceId(id));
536 538
537 MayAutoLock locker(lock_.get()); 539 MayAutoLock locker(&lock_);
538 DCHECK(base::ContainsKey(endpoints_, id)); 540 DCHECK(base::ContainsKey(endpoints_, id));
539 541
540 InterfaceEndpoint* endpoint = endpoints_[id].get(); 542 InterfaceEndpoint* endpoint = endpoints_[id].get();
541 endpoint->DetachClient(); 543 endpoint->DetachClient();
542 } 544 }
543 545
544 void MultiplexRouter::RaiseError() { 546 void MultiplexRouter::RaiseError() {
545 if (task_runner_->BelongsToCurrentThread()) { 547 if (task_runner_->BelongsToCurrentThread()) {
546 connector_.RaiseError(); 548 connector_.RaiseError();
547 } else { 549 } else {
548 task_runner_->PostTask(FROM_HERE, 550 task_runner_->PostTask(FROM_HERE,
549 base::Bind(&MultiplexRouter::RaiseError, this)); 551 base::Bind(&MultiplexRouter::RaiseError, this));
550 } 552 }
551 } 553 }
552 554
553 void MultiplexRouter::CloseMessagePipe() { 555 void MultiplexRouter::CloseMessagePipe() {
554 DCHECK(thread_checker_.CalledOnValidThread()); 556 DCHECK(thread_checker_.CalledOnValidThread());
555 connector_.CloseMessagePipe(); 557 connector_.CloseMessagePipe();
556 // CloseMessagePipe() above won't trigger connection error handler. 558 // CloseMessagePipe() above won't trigger connection error handler.
557 // Explicitly call OnPipeConnectionError() so that associated endpoints will 559 // Explicitly call OnPipeConnectionError() so that associated endpoints will
558 // get notified. 560 // get notified.
559 OnPipeConnectionError(); 561 OnPipeConnectionError();
560 } 562 }
561 563
562 void MultiplexRouter::PauseIncomingMethodCallProcessing() { 564 void MultiplexRouter::PauseIncomingMethodCallProcessing() {
563 DCHECK(thread_checker_.CalledOnValidThread()); 565 DCHECK(thread_checker_.CalledOnValidThread());
564 connector_.PauseIncomingMethodCallProcessing(); 566 connector_.PauseIncomingMethodCallProcessing();
565 567
566 MayAutoLock locker(lock_.get()); 568 MayAutoLock locker(&lock_);
567 paused_ = true; 569 paused_ = true;
568 570
569 for (auto iter = endpoints_.begin(); iter != endpoints_.end(); ++iter) 571 for (auto iter = endpoints_.begin(); iter != endpoints_.end(); ++iter)
570 iter->second->ResetSyncMessageSignal(); 572 iter->second->ResetSyncMessageSignal();
571 } 573 }
572 574
573 void MultiplexRouter::ResumeIncomingMethodCallProcessing() { 575 void MultiplexRouter::ResumeIncomingMethodCallProcessing() {
574 DCHECK(thread_checker_.CalledOnValidThread()); 576 DCHECK(thread_checker_.CalledOnValidThread());
575 connector_.ResumeIncomingMethodCallProcessing(); 577 connector_.ResumeIncomingMethodCallProcessing();
576 578
577 MayAutoLock locker(lock_.get()); 579 MayAutoLock locker(&lock_);
578 paused_ = false; 580 paused_ = false;
579 581
580 for (auto iter = endpoints_.begin(); iter != endpoints_.end(); ++iter) { 582 for (auto iter = endpoints_.begin(); iter != endpoints_.end(); ++iter) {
581 auto sync_iter = sync_message_tasks_.find(iter->first); 583 auto sync_iter = sync_message_tasks_.find(iter->first);
582 if (sync_iter != sync_message_tasks_.end() && !sync_iter->second.empty()) 584 if (sync_iter != sync_message_tasks_.end() && !sync_iter->second.empty())
583 iter->second->SignalSyncMessageEvent(); 585 iter->second->SignalSyncMessageEvent();
584 } 586 }
585 587
586 ProcessTasks(NO_DIRECT_CLIENT_CALLS, nullptr); 588 ProcessTasks(NO_DIRECT_CLIENT_CALLS, nullptr);
587 } 589 }
588 590
589 bool MultiplexRouter::HasAssociatedEndpoints() const { 591 bool MultiplexRouter::HasAssociatedEndpoints() const {
590 DCHECK(thread_checker_.CalledOnValidThread()); 592 DCHECK(thread_checker_.CalledOnValidThread());
591 MayAutoLock locker(lock_.get()); 593 MayAutoLock locker(&lock_);
592 594
593 if (endpoints_.size() > 1) 595 if (endpoints_.size() > 1)
594 return true; 596 return true;
595 if (endpoints_.size() == 0) 597 if (endpoints_.size() == 0)
596 return false; 598 return false;
597 599
598 return !base::ContainsKey(endpoints_, kMasterInterfaceId); 600 return !base::ContainsKey(endpoints_, kMasterInterfaceId);
599 } 601 }
600 602
601 void MultiplexRouter::EnableTestingMode() { 603 void MultiplexRouter::EnableTestingMode() {
602 DCHECK(thread_checker_.CalledOnValidThread()); 604 DCHECK(thread_checker_.CalledOnValidThread());
603 MayAutoLock locker(lock_.get()); 605 MayAutoLock locker(&lock_);
604 606
605 testing_mode_ = true; 607 testing_mode_ = true;
606 connector_.set_enforce_errors_from_incoming_receiver(false); 608 connector_.set_enforce_errors_from_incoming_receiver(false);
607 } 609 }
608 610
609 bool MultiplexRouter::Accept(Message* message) { 611 bool MultiplexRouter::Accept(Message* message) {
610 DCHECK(thread_checker_.CalledOnValidThread()); 612 DCHECK(thread_checker_.CalledOnValidThread());
611 613
612 if (!message->DeserializeAssociatedEndpointHandles(this)) 614 if (!message->DeserializeAssociatedEndpointHandles(this))
613 return false; 615 return false;
614 616
615 scoped_refptr<MultiplexRouter> protector(this); 617 scoped_refptr<MultiplexRouter> protector(this);
616 MayAutoLock locker(lock_.get()); 618 MayAutoLock locker(&lock_);
617 619
618 DCHECK(!paused_); 620 DCHECK(!paused_);
619 621
620 ClientCallBehavior client_call_behavior = 622 ClientCallBehavior client_call_behavior =
621 connector_.during_sync_handle_watcher_callback() 623 connector_.during_sync_handle_watcher_callback()
622 ? ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES 624 ? ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES
623 : ALLOW_DIRECT_CLIENT_CALLS; 625 : ALLOW_DIRECT_CLIENT_CALLS;
624 626
625 bool processed = 627 bool processed =
626 tasks_.empty() && ProcessIncomingMessage(message, client_call_behavior, 628 tasks_.empty() && ProcessIncomingMessage(message, client_call_behavior,
(...skipping 23 matching lines...) Expand all
650 // Always return true. If we see errors during message processing, we will 652 // Always return true. If we see errors during message processing, we will
651 // explicitly call Connector::RaiseError() to disconnect the message pipe. 653 // explicitly call Connector::RaiseError() to disconnect the message pipe.
652 return true; 654 return true;
653 } 655 }
654 656
655 bool MultiplexRouter::OnPeerAssociatedEndpointClosed( 657 bool MultiplexRouter::OnPeerAssociatedEndpointClosed(
656 InterfaceId id, 658 InterfaceId id,
657 const base::Optional<DisconnectReason>& reason) { 659 const base::Optional<DisconnectReason>& reason) {
658 DCHECK(!IsMasterInterfaceId(id) || reason); 660 DCHECK(!IsMasterInterfaceId(id) || reason);
659 661
660 MayAutoLock locker(lock_.get()); 662 MayAutoLock locker(&lock_);
661 InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, nullptr); 663 InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, nullptr);
662 664
663 if (reason) 665 if (reason)
664 endpoint->set_disconnect_reason(reason); 666 endpoint->set_disconnect_reason(reason);
665 667
666 // It is possible that this endpoint has been set as peer closed. That is 668 // It is possible that this endpoint has been set as peer closed. That is
667 // because when the message pipe is closed, all the endpoints are updated with 669 // because when the message pipe is closed, all the endpoints are updated with
668 // PEER_ENDPOINT_CLOSED. We continue to process remaining tasks in the queue, 670 // PEER_ENDPOINT_CLOSED. We continue to process remaining tasks in the queue,
669 // as long as there are refs keeping the router alive. If there is a 671 // as long as there are refs keeping the router alive. If there is a
670 // PeerAssociatedEndpointClosedEvent control message in the queue, we will get 672 // PeerAssociatedEndpointClosedEvent control message in the queue, we will get
671 // here and see that the endpoint has been marked as peer closed. 673 // here and see that the endpoint has been marked as peer closed.
672 if (!endpoint->peer_closed()) { 674 if (!endpoint->peer_closed()) {
673 if (endpoint->client()) 675 if (endpoint->client())
674 tasks_.push_back(Task::CreateNotifyErrorTask(endpoint)); 676 tasks_.push_back(Task::CreateNotifyErrorTask(endpoint));
675 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); 677 UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED);
676 } 678 }
677 679
678 // No need to trigger a ProcessTasks() because it is already on the stack. 680 // No need to trigger a ProcessTasks() because it is already on the stack.
679 681
680 return true; 682 return true;
681 } 683 }
682 684
683 void MultiplexRouter::OnPipeConnectionError() { 685 void MultiplexRouter::OnPipeConnectionError() {
684 DCHECK(thread_checker_.CalledOnValidThread()); 686 DCHECK(thread_checker_.CalledOnValidThread());
685 687
686 scoped_refptr<MultiplexRouter> protector(this); 688 scoped_refptr<MultiplexRouter> protector(this);
687 MayAutoLock locker(lock_.get()); 689 MayAutoLock locker(&lock_);
688 690
689 encountered_error_ = true; 691 encountered_error_ = true;
690 692
691 for (auto iter = endpoints_.begin(); iter != endpoints_.end();) { 693 for (auto iter = endpoints_.begin(); iter != endpoints_.end();) {
692 InterfaceEndpoint* endpoint = iter->second.get(); 694 InterfaceEndpoint* endpoint = iter->second.get();
693 // Increment the iterator before calling UpdateEndpointStateMayRemove() 695 // Increment the iterator before calling UpdateEndpointStateMayRemove()
694 // because it may remove the corresponding value from the map. 696 // because it may remove the corresponding value from the map.
695 ++iter; 697 ++iter;
696 698
697 if (endpoint->client()) 699 if (endpoint->client())
(...skipping 112 matching lines...) Expand 10 before | Expand all | Expand 10 after
810 InterfaceEndpointClient* client = endpoint->client(); 812 InterfaceEndpointClient* client = endpoint->client();
811 base::Optional<DisconnectReason> disconnect_reason( 813 base::Optional<DisconnectReason> disconnect_reason(
812 endpoint->disconnect_reason()); 814 endpoint->disconnect_reason());
813 815
814 { 816 {
815 // We must unlock before calling into |client| because it may call this 817 // We must unlock before calling into |client| because it may call this
816 // object within NotifyError(). Holding the lock will lead to deadlock. 818 // object within NotifyError(). Holding the lock will lead to deadlock.
817 // 819 //
818 // It is safe to call into |client| without the lock. Because |client| is 820 // It is safe to call into |client| without the lock. Because |client| is
819 // always accessed on the same thread, including DetachEndpointClient(). 821 // always accessed on the same thread, including DetachEndpointClient().
820 MayAutoUnlock unlocker(lock_.get()); 822 MayAutoUnlock unlocker(&lock_);
821 client->NotifyError(disconnect_reason); 823 client->NotifyError(disconnect_reason);
822 } 824 }
823 return true; 825 return true;
824 } 826 }
825 827
826 bool MultiplexRouter::ProcessIncomingMessage( 828 bool MultiplexRouter::ProcessIncomingMessage(
827 Message* message, 829 Message* message,
828 ClientCallBehavior client_call_behavior, 830 ClientCallBehavior client_call_behavior,
829 base::SingleThreadTaskRunner* current_task_runner) { 831 base::SingleThreadTaskRunner* current_task_runner) {
830 DCHECK(!current_task_runner || current_task_runner->BelongsToCurrentThread()); 832 DCHECK(!current_task_runner || current_task_runner->BelongsToCurrentThread());
831 DCHECK(!paused_); 833 DCHECK(!paused_);
832 DCHECK(message); 834 DCHECK(message);
833 AssertLockAcquired(); 835 AssertLockAcquired();
834 836
835 if (message->IsNull()) { 837 if (message->IsNull()) {
836 // This is a sync message and has been processed during sync handle 838 // This is a sync message and has been processed during sync handle
837 // watching. 839 // watching.
838 return true; 840 return true;
839 } 841 }
840 842
841 if (PipeControlMessageHandler::IsPipeControlMessage(message)) { 843 if (PipeControlMessageHandler::IsPipeControlMessage(message)) {
842 bool result = false; 844 bool result = false;
843 845
844 { 846 {
845 MayAutoUnlock unlocker(lock_.get()); 847 MayAutoUnlock unlocker(&lock_);
846 result = control_message_handler_.Accept(message); 848 result = control_message_handler_.Accept(message);
847 } 849 }
848 850
849 if (!result) 851 if (!result)
850 RaiseErrorInNonTestingMode(); 852 RaiseErrorInNonTestingMode();
851 853
852 return true; 854 return true;
853 } 855 }
854 856
855 InterfaceId id = message->interface_id(); 857 InterfaceId id = message->interface_id();
(...skipping 27 matching lines...) Expand all
883 885
884 InterfaceEndpointClient* client = endpoint->client(); 886 InterfaceEndpointClient* client = endpoint->client();
885 bool result = false; 887 bool result = false;
886 { 888 {
887 // We must unlock before calling into |client| because it may call this 889 // We must unlock before calling into |client| because it may call this
888 // object within HandleIncomingMessage(). Holding the lock will lead to 890 // object within HandleIncomingMessage(). Holding the lock will lead to
889 // deadlock. 891 // deadlock.
890 // 892 //
891 // It is safe to call into |client| without the lock. Because |client| is 893 // It is safe to call into |client| without the lock. Because |client| is
892 // always accessed on the same thread, including DetachEndpointClient(). 894 // always accessed on the same thread, including DetachEndpointClient().
893 MayAutoUnlock unlocker(lock_.get()); 895 MayAutoUnlock unlocker(&lock_);
894 result = client->HandleIncomingMessage(message); 896 result = client->HandleIncomingMessage(message);
895 } 897 }
896 if (!result) 898 if (!result)
897 RaiseErrorInNonTestingMode(); 899 RaiseErrorInNonTestingMode();
898 900
899 return true; 901 return true;
900 } 902 }
901 903
902 void MultiplexRouter::MaybePostToProcessTasks( 904 void MultiplexRouter::MaybePostToProcessTasks(
903 base::SingleThreadTaskRunner* task_runner) { 905 base::SingleThreadTaskRunner* task_runner) {
904 AssertLockAcquired(); 906 AssertLockAcquired();
905 if (posted_to_process_tasks_) 907 if (posted_to_process_tasks_)
906 return; 908 return;
907 909
908 posted_to_process_tasks_ = true; 910 posted_to_process_tasks_ = true;
909 posted_to_task_runner_ = task_runner; 911 posted_to_task_runner_ = task_runner;
910 task_runner->PostTask( 912 task_runner->PostTask(
911 FROM_HERE, base::Bind(&MultiplexRouter::LockAndCallProcessTasks, this)); 913 FROM_HERE, base::Bind(&MultiplexRouter::LockAndCallProcessTasks, this));
912 } 914 }
913 915
914 void MultiplexRouter::LockAndCallProcessTasks() { 916 void MultiplexRouter::LockAndCallProcessTasks() {
915 // There is no need to hold a ref to this class in this case because this is 917 // There is no need to hold a ref to this class in this case because this is
916 // always called using base::Bind(), which holds a ref. 918 // always called using base::Bind(), which holds a ref.
917 MayAutoLock locker(lock_.get()); 919 MayAutoLock locker(&lock_);
918 posted_to_process_tasks_ = false; 920 posted_to_process_tasks_ = false;
919 scoped_refptr<base::SingleThreadTaskRunner> runner( 921 scoped_refptr<base::SingleThreadTaskRunner> runner(
920 std::move(posted_to_task_runner_)); 922 std::move(posted_to_task_runner_));
921 ProcessTasks(ALLOW_DIRECT_CLIENT_CALLS, runner.get()); 923 ProcessTasks(ALLOW_DIRECT_CLIENT_CALLS, runner.get());
922 } 924 }
923 925
924 void MultiplexRouter::UpdateEndpointStateMayRemove( 926 void MultiplexRouter::UpdateEndpointStateMayRemove(
925 InterfaceEndpoint* endpoint, 927 InterfaceEndpoint* endpoint,
926 EndpointStateUpdateType type) { 928 EndpointStateUpdateType type) {
927 switch (type) { 929 switch (type) {
(...skipping 45 matching lines...) Expand 10 before | Expand all | Expand 10 after
973 975
974 void MultiplexRouter::AssertLockAcquired() { 976 void MultiplexRouter::AssertLockAcquired() {
975 #if DCHECK_IS_ON() 977 #if DCHECK_IS_ON()
976 if (lock_) 978 if (lock_)
977 lock_->AssertAcquired(); 979 lock_->AssertAcquired();
978 #endif 980 #endif
979 } 981 }
980 982
981 } // namespace internal 983 } // namespace internal
982 } // namespace mojo 984 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/public/cpp/bindings/lib/multiplex_router.h ('k') | mojo/public/cpp/bindings/lib/scoped_interface_endpoint_handle.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698