| OLD | NEW |
| 1 // Copyright 2014 The Chromium Authors. All rights reserved. | 1 // Copyright 2014 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "ipc/ipc_mojo_bootstrap.h" | 5 #include "ipc/ipc_mojo_bootstrap.h" |
| 6 | 6 |
| 7 #include <stdint.h> | 7 #include <stdint.h> |
| 8 | 8 |
| 9 #include <map> | 9 #include <map> |
| 10 #include <memory> | 10 #include <memory> |
| 11 #include <queue> |
| 11 #include <utility> | 12 #include <utility> |
| 12 #include <vector> | 13 #include <vector> |
| 13 | 14 |
| 14 #include "base/callback.h" | 15 #include "base/callback.h" |
| 15 #include "base/logging.h" | 16 #include "base/logging.h" |
| 16 #include "base/macros.h" | 17 #include "base/macros.h" |
| 17 #include "base/memory/ptr_util.h" | 18 #include "base/memory/ptr_util.h" |
| 18 #include "base/single_thread_task_runner.h" | 19 #include "base/single_thread_task_runner.h" |
| 19 #include "base/synchronization/lock.h" | 20 #include "base/synchronization/lock.h" |
| 20 #include "base/threading/thread_task_runner_handle.h" | 21 #include "base/threading/thread_task_runner_handle.h" |
| 22 #include "ipc/mojo_event.h" |
| 21 #include "mojo/public/cpp/bindings/associated_group.h" | 23 #include "mojo/public/cpp/bindings/associated_group.h" |
| 22 #include "mojo/public/cpp/bindings/associated_group_controller.h" | 24 #include "mojo/public/cpp/bindings/associated_group_controller.h" |
| 23 #include "mojo/public/cpp/bindings/connector.h" | 25 #include "mojo/public/cpp/bindings/connector.h" |
| 24 #include "mojo/public/cpp/bindings/interface_endpoint_client.h" | 26 #include "mojo/public/cpp/bindings/interface_endpoint_client.h" |
| 25 #include "mojo/public/cpp/bindings/interface_endpoint_controller.h" | 27 #include "mojo/public/cpp/bindings/interface_endpoint_controller.h" |
| 26 #include "mojo/public/cpp/bindings/interface_id.h" | 28 #include "mojo/public/cpp/bindings/interface_id.h" |
| 27 #include "mojo/public/cpp/bindings/message.h" | 29 #include "mojo/public/cpp/bindings/message.h" |
| 28 #include "mojo/public/cpp/bindings/message_header_validator.h" | 30 #include "mojo/public/cpp/bindings/message_header_validator.h" |
| 29 #include "mojo/public/cpp/bindings/pipe_control_message_handler.h" | 31 #include "mojo/public/cpp/bindings/pipe_control_message_handler.h" |
| 30 #include "mojo/public/cpp/bindings/pipe_control_message_handler_delegate.h" | 32 #include "mojo/public/cpp/bindings/pipe_control_message_handler_delegate.h" |
| 31 #include "mojo/public/cpp/bindings/pipe_control_message_proxy.h" | 33 #include "mojo/public/cpp/bindings/pipe_control_message_proxy.h" |
| 34 #include "mojo/public/cpp/bindings/sync_handle_watcher.h" |
| 32 | 35 |
| 33 namespace IPC { | 36 namespace IPC { |
| 34 | 37 |
| 35 namespace { | 38 namespace { |
| 36 | 39 |
| 37 class ChannelAssociatedGroupController | 40 class ChannelAssociatedGroupController |
| 38 : public mojo::AssociatedGroupController, | 41 : public mojo::AssociatedGroupController, |
| 39 public mojo::MessageReceiver, | 42 public mojo::MessageReceiver, |
| 40 public mojo::PipeControlMessageHandlerDelegate { | 43 public mojo::PipeControlMessageHandlerDelegate { |
| 41 public: | 44 public: |
| (...skipping 224 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 266 } | 269 } |
| 267 | 270 |
| 268 void DetachClient() { | 271 void DetachClient() { |
| 269 controller_->lock_.AssertAcquired(); | 272 controller_->lock_.AssertAcquired(); |
| 270 DCHECK(client_); | 273 DCHECK(client_); |
| 271 DCHECK(task_runner_->BelongsToCurrentThread()); | 274 DCHECK(task_runner_->BelongsToCurrentThread()); |
| 272 DCHECK(!closed_); | 275 DCHECK(!closed_); |
| 273 | 276 |
| 274 task_runner_ = nullptr; | 277 task_runner_ = nullptr; |
| 275 client_ = nullptr; | 278 client_ = nullptr; |
| 279 sync_watcher_.reset(); |
| 280 } |
| 281 |
| 282 uint32_t EnqueueSyncMessage(std::unique_ptr<mojo::Message> message) { |
| 283 controller_->lock_.AssertAcquired(); |
| 284 uint32_t id = GenerateSyncMessageId(); |
| 285 sync_messages_.emplace(id, std::move(message)); |
| 286 SignalSyncMessageEvent(); |
| 287 return id; |
| 288 } |
| 289 |
| 290 void SignalSyncMessageEvent() { |
| 291 controller_->lock_.AssertAcquired(); |
| 292 EnsureSyncMessageEventExists(); |
| 293 sync_message_event_->Signal(); |
| 294 } |
| 295 |
| 296 std::unique_ptr<mojo::Message> PopSyncMessage(uint32_t id) { |
| 297 controller_->lock_.AssertAcquired(); |
| 298 if (sync_messages_.empty() || sync_messages_.front().first != id) |
| 299 return nullptr; |
| 300 std::unique_ptr<mojo::Message> message = |
| 301 std::move(sync_messages_.front().second); |
| 302 sync_messages_.pop(); |
| 303 return message; |
| 276 } | 304 } |
| 277 | 305 |
| 278 // mojo::InterfaceEndpointController: | 306 // mojo::InterfaceEndpointController: |
| 279 bool SendMessage(mojo::Message* message) override { | 307 bool SendMessage(mojo::Message* message) override { |
| 280 DCHECK(task_runner_->BelongsToCurrentThread()); | 308 DCHECK(task_runner_->BelongsToCurrentThread()); |
| 281 message->set_interface_id(id_); | 309 message->set_interface_id(id_); |
| 282 return controller_->SendMessage(message); | 310 return controller_->SendMessage(message); |
| 283 } | 311 } |
| 284 | 312 |
| 285 void AllowWokenUpBySyncWatchOnSameThread() override { | 313 void AllowWokenUpBySyncWatchOnSameThread() override { |
| 286 DCHECK(task_runner_->BelongsToCurrentThread()); | 314 DCHECK(task_runner_->BelongsToCurrentThread()); |
| 287 | 315 |
| 288 // TODO(rockot): Implement sync waiting. | 316 EnsureSyncWatcherExists(); |
| 289 NOTREACHED(); | 317 sync_watcher_->AllowWokenUpBySyncWatchOnSameThread(); |
| 290 } | 318 } |
| 291 | 319 |
| 292 bool SyncWatch(const bool* should_stop) override { | 320 bool SyncWatch(const bool* should_stop) override { |
| 293 DCHECK(task_runner_->BelongsToCurrentThread()); | 321 DCHECK(task_runner_->BelongsToCurrentThread()); |
| 294 | 322 |
| 295 // It's not legal to make sync calls from the master endpoint's thread, | 323 // It's not legal to make sync calls from the master endpoint's thread, |
| 296 // and in fact they must only happen from the proxy task runner. | 324 // and in fact they must only happen from the proxy task runner. |
| 297 DCHECK(!controller_->task_runner_->BelongsToCurrentThread()); | 325 DCHECK(!controller_->task_runner_->BelongsToCurrentThread()); |
| 298 DCHECK(controller_->proxy_task_runner_->BelongsToCurrentThread()); | 326 DCHECK(controller_->proxy_task_runner_->BelongsToCurrentThread()); |
| 299 | 327 |
| 300 // TODO(rockot): Implement sync waiting. | 328 EnsureSyncWatcherExists(); |
| 301 NOTREACHED(); | 329 return sync_watcher_->SyncWatch(should_stop); |
| 302 return false; | |
| 303 } | 330 } |
| 304 | 331 |
| 305 private: | 332 private: |
| 306 friend class base::RefCountedThreadSafe<Endpoint>; | 333 friend class base::RefCountedThreadSafe<Endpoint>; |
| 307 | 334 |
| 308 ~Endpoint() override {} | 335 ~Endpoint() override { |
| 336 controller_->lock_.AssertAcquired(); |
| 337 DCHECK(!client_); |
| 338 DCHECK(closed_); |
| 339 DCHECK(peer_closed_); |
| 340 DCHECK(!sync_watcher_); |
| 341 } |
| 342 |
| 343 void OnSyncMessageEventHandleReady(MojoResult result) { |
| 344 DCHECK(task_runner_->BelongsToCurrentThread()); |
| 345 |
| 346 scoped_refptr<Endpoint> keepalive(this); |
| 347 scoped_refptr<AssociatedGroupController> controller_keepalive( |
| 348 controller_); |
| 349 |
| 350 bool reset_sync_watcher = false; |
| 351 { |
| 352 base::AutoLock locker(controller_->lock_); |
| 353 bool more_to_process = false; |
| 354 if (!sync_messages_.empty()) { |
| 355 std::unique_ptr<mojo::Message> message( |
| 356 std::move(sync_messages_.front().second)); |
| 357 sync_messages_.pop(); |
| 358 |
| 359 bool dispatch_succeeded; |
| 360 mojo::InterfaceEndpointClient* client = client_; |
| 361 { |
| 362 base::AutoUnlock unlocker(controller_->lock_); |
| 363 dispatch_succeeded = client->HandleIncomingMessage(message.get()); |
| 364 } |
| 365 |
| 366 if (!sync_messages_.empty()) |
| 367 more_to_process = true; |
| 368 |
| 369 if (!dispatch_succeeded) |
| 370 controller_->RaiseError(); |
| 371 } |
| 372 |
| 373 if (!more_to_process) |
| 374 sync_message_event_->Reset(); |
| 375 |
| 376 // If there are no queued sync messages and the peer has closed, there |
| 377 // there won't be incoming sync messages in the future. |
| 378 reset_sync_watcher = !more_to_process && peer_closed_; |
| 379 } |
| 380 |
| 381 if (reset_sync_watcher) { |
| 382 // If a SyncWatch() call (or multiple ones) of this interface endpoint |
| 383 // is on the call stack, resetting the sync watcher will allow it to |
| 384 // exit when the call stack unwinds to that frame. |
| 385 sync_watcher_.reset(); |
| 386 } |
| 387 } |
| 388 |
| 389 void EnsureSyncWatcherExists() { |
| 390 DCHECK(task_runner_->BelongsToCurrentThread()); |
| 391 if (sync_watcher_) |
| 392 return; |
| 393 |
| 394 { |
| 395 base::AutoLock locker(controller_->lock_); |
| 396 EnsureSyncMessageEventExists(); |
| 397 if (!sync_messages_.empty()) |
| 398 SignalSyncMessageEvent(); |
| 399 } |
| 400 |
| 401 sync_watcher_.reset(new mojo::SyncHandleWatcher( |
| 402 sync_message_event_->GetHandle(), MOJO_HANDLE_SIGNAL_READABLE, |
| 403 base::Bind(&Endpoint::OnSyncMessageEventHandleReady, |
| 404 base::Unretained(this)))); |
| 405 } |
| 406 |
| 407 void EnsureSyncMessageEventExists() { |
| 408 controller_->lock_.AssertAcquired(); |
| 409 if (!sync_message_event_) |
| 410 sync_message_event_.reset(new MojoEvent); |
| 411 } |
| 412 |
| 413 uint32_t GenerateSyncMessageId() { |
| 414 // Overflow is fine. |
| 415 uint32_t id = next_sync_message_id_++; |
| 416 DCHECK(sync_messages_.empty() || sync_messages_.front().first != id); |
| 417 return id; |
| 418 } |
| 309 | 419 |
| 310 ChannelAssociatedGroupController* const controller_; | 420 ChannelAssociatedGroupController* const controller_; |
| 311 const mojo::InterfaceId id_; | 421 const mojo::InterfaceId id_; |
| 312 | 422 |
| 313 bool closed_ = false; | 423 bool closed_ = false; |
| 314 bool peer_closed_ = false; | 424 bool peer_closed_ = false; |
| 315 mojo::InterfaceEndpointClient* client_ = nullptr; | 425 mojo::InterfaceEndpointClient* client_ = nullptr; |
| 316 scoped_refptr<base::SingleThreadTaskRunner> task_runner_; | 426 scoped_refptr<base::SingleThreadTaskRunner> task_runner_; |
| 427 std::unique_ptr<mojo::SyncHandleWatcher> sync_watcher_; |
| 428 std::unique_ptr<MojoEvent> sync_message_event_; |
| 429 std::queue<std::pair<uint32_t, std::unique_ptr<mojo::Message>>> |
| 430 sync_messages_; |
| 431 uint32_t next_sync_message_id_ = 0; |
| 317 | 432 |
| 318 DISALLOW_COPY_AND_ASSIGN(Endpoint); | 433 DISALLOW_COPY_AND_ASSIGN(Endpoint); |
| 319 }; | 434 }; |
| 320 | 435 |
| 321 class ControlMessageProxyThunk : public MessageReceiver { | 436 class ControlMessageProxyThunk : public MessageReceiver { |
| 322 public: | 437 public: |
| 323 explicit ControlMessageProxyThunk( | 438 explicit ControlMessageProxyThunk( |
| 324 ChannelAssociatedGroupController* controller) | 439 ChannelAssociatedGroupController* controller) |
| 325 : controller_(controller) {} | 440 : controller_(controller) {} |
| 326 | 441 |
| (...skipping 89 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 416 DCHECK(endpoint->task_runner() && endpoint->client()); | 531 DCHECK(endpoint->task_runner() && endpoint->client()); |
| 417 if (endpoint->task_runner()->BelongsToCurrentThread() && !force_async) { | 532 if (endpoint->task_runner()->BelongsToCurrentThread() && !force_async) { |
| 418 mojo::InterfaceEndpointClient* client = endpoint->client(); | 533 mojo::InterfaceEndpointClient* client = endpoint->client(); |
| 419 | 534 |
| 420 base::AutoUnlock unlocker(lock_); | 535 base::AutoUnlock unlocker(lock_); |
| 421 client->NotifyError(); | 536 client->NotifyError(); |
| 422 } else { | 537 } else { |
| 423 endpoint->task_runner()->PostTask( | 538 endpoint->task_runner()->PostTask( |
| 424 FROM_HERE, | 539 FROM_HERE, |
| 425 base::Bind(&ChannelAssociatedGroupController | 540 base::Bind(&ChannelAssociatedGroupController |
| 426 ::NotifyEndpointOfErrorOnEndpointThread, this, | 541 ::NotifyEndpointOfErrorOnEndpointThread, this, endpoint->id(), |
| 427 make_scoped_refptr(endpoint))); | 542 endpoint)); |
| 428 } | 543 } |
| 429 } | 544 } |
| 430 | 545 |
| 431 void NotifyEndpointOfErrorOnEndpointThread(scoped_refptr<Endpoint> endpoint) { | 546 void NotifyEndpointOfErrorOnEndpointThread(mojo::InterfaceId id, |
| 547 Endpoint* endpoint) { |
| 432 base::AutoLock locker(lock_); | 548 base::AutoLock locker(lock_); |
| 549 auto iter = endpoints_.find(id); |
| 550 if (iter == endpoints_.end() || iter->second.get() != endpoint) |
| 551 return; |
| 433 if (!endpoint->client()) | 552 if (!endpoint->client()) |
| 434 return; | 553 return; |
| 554 |
| 435 DCHECK(endpoint->task_runner()->BelongsToCurrentThread()); | 555 DCHECK(endpoint->task_runner()->BelongsToCurrentThread()); |
| 436 NotifyEndpointOfError(endpoint.get(), false /* force_async */); | 556 NotifyEndpointOfError(endpoint, false /* force_async */); |
| 437 } | 557 } |
| 438 | 558 |
| 439 void MarkClosedAndMaybeRemove(Endpoint* endpoint) { | 559 void MarkClosedAndMaybeRemove(Endpoint* endpoint) { |
| 440 lock_.AssertAcquired(); | 560 lock_.AssertAcquired(); |
| 441 endpoint->set_closed(); | 561 endpoint->set_closed(); |
| 442 if (endpoint->closed() && endpoint->peer_closed()) | 562 if (endpoint->closed() && endpoint->peer_closed()) |
| 443 endpoints_.erase(endpoint->id()); | 563 endpoints_.erase(endpoint->id()); |
| 444 } | 564 } |
| 445 | 565 |
| 446 void MarkPeerClosedAndMaybeRemove(Endpoint* endpoint) { | 566 void MarkPeerClosedAndMaybeRemove(Endpoint* endpoint) { |
| 447 lock_.AssertAcquired(); | 567 lock_.AssertAcquired(); |
| 448 endpoint->set_peer_closed(); | 568 endpoint->set_peer_closed(); |
| 569 endpoint->SignalSyncMessageEvent(); |
| 449 if (endpoint->closed() && endpoint->peer_closed()) | 570 if (endpoint->closed() && endpoint->peer_closed()) |
| 450 endpoints_.erase(endpoint->id()); | 571 endpoints_.erase(endpoint->id()); |
| 451 } | 572 } |
| 452 | 573 |
| 453 Endpoint* FindOrInsertEndpoint(mojo::InterfaceId id, bool* inserted) { | 574 Endpoint* FindOrInsertEndpoint(mojo::InterfaceId id, bool* inserted) { |
| 454 lock_.AssertAcquired(); | 575 lock_.AssertAcquired(); |
| 455 DCHECK(!inserted || !*inserted); | 576 DCHECK(!inserted || !*inserted); |
| 456 | 577 |
| 457 auto iter = endpoints_.find(id); | 578 auto iter = endpoints_.find(id); |
| 458 if (iter != endpoints_.end()) | 579 if (iter != endpoints_.end()) |
| (...skipping 23 matching lines...) Expand all Loading... |
| 482 | 603 |
| 483 mojo::InterfaceEndpointClient* client = endpoint->client(); | 604 mojo::InterfaceEndpointClient* client = endpoint->client(); |
| 484 if (!client || !endpoint->task_runner()->BelongsToCurrentThread()) { | 605 if (!client || !endpoint->task_runner()->BelongsToCurrentThread()) { |
| 485 // No client has been bound yet or the client runs tasks on another | 606 // No client has been bound yet or the client runs tasks on another |
| 486 // thread. We assume the other thread must always be the one on which | 607 // thread. We assume the other thread must always be the one on which |
| 487 // |proxy_task_runner_| runs tasks, since that's the only valid scenario. | 608 // |proxy_task_runner_| runs tasks, since that's the only valid scenario. |
| 488 // | 609 // |
| 489 // If the client is not yet bound, it must be bound by the time this task | 610 // If the client is not yet bound, it must be bound by the time this task |
| 490 // runs or else it's programmer error. | 611 // runs or else it's programmer error. |
| 491 DCHECK(proxy_task_runner_); | 612 DCHECK(proxy_task_runner_); |
| 613 |
| 492 std::unique_ptr<mojo::Message> passed_message(new mojo::Message); | 614 std::unique_ptr<mojo::Message> passed_message(new mojo::Message); |
| 493 message->MoveTo(passed_message.get()); | 615 message->MoveTo(passed_message.get()); |
| 616 |
| 617 if (passed_message->has_flag(mojo::Message::kFlagIsSync)) { |
| 618 // Sync messages may need to be handled by the endpoint if it's blocking |
| 619 // on a sync reply. We pass ownership of the message to the endpoint's |
| 620 // sync message queue. If the endpoint was blocking, it will dequeue the |
| 621 // message and dispatch it. Otherwise the posted |AcceptSyncMessage()| |
| 622 // call will dequeue the message and dispatch it. |
| 623 uint32_t message_id = |
| 624 endpoint->EnqueueSyncMessage(std::move(passed_message)); |
| 625 proxy_task_runner_->PostTask( |
| 626 FROM_HERE, |
| 627 base::Bind(&ChannelAssociatedGroupController::AcceptSyncMessage, |
| 628 this, id, message_id)); |
| 629 return true; |
| 630 } |
| 631 |
| 494 proxy_task_runner_->PostTask( | 632 proxy_task_runner_->PostTask( |
| 495 FROM_HERE, | 633 FROM_HERE, |
| 496 base::Bind(&ChannelAssociatedGroupController::AcceptOnProxyThread, | 634 base::Bind(&ChannelAssociatedGroupController::AcceptOnProxyThread, |
| 497 this, base::Passed(&passed_message))); | 635 this, base::Passed(&passed_message))); |
| 498 return true; | 636 return true; |
| 499 } | 637 } |
| 500 | 638 |
| 501 // We do not expect to receive sync responses on the master endpoint thread. | 639 // We do not expect to receive sync responses on the master endpoint thread. |
| 502 // If it's happening, it's a bug. | 640 // If it's happening, it's a bug. |
| 503 DCHECK(!message->has_flag(mojo::Message::kFlagIsSync)); | 641 DCHECK(!message->has_flag(mojo::Message::kFlagIsSync) || |
| 642 !message->has_flag(mojo::Message::kFlagIsResponse)); |
| 504 | 643 |
| 505 base::AutoUnlock unlocker(lock_); | 644 base::AutoUnlock unlocker(lock_); |
| 506 return client->HandleIncomingMessage(message); | 645 return client->HandleIncomingMessage(message); |
| 507 } | 646 } |
| 508 | 647 |
| 509 void AcceptOnProxyThread(std::unique_ptr<mojo::Message> message) { | 648 void AcceptOnProxyThread(std::unique_ptr<mojo::Message> message) { |
| 510 DCHECK(proxy_task_runner_->BelongsToCurrentThread()); | 649 DCHECK(proxy_task_runner_->BelongsToCurrentThread()); |
| 511 | 650 |
| 512 mojo::InterfaceId id = message->interface_id(); | 651 mojo::InterfaceId id = message->interface_id(); |
| 513 DCHECK(mojo::IsValidInterfaceId(id) && !mojo::IsMasterInterfaceId(id)); | 652 DCHECK(mojo::IsValidInterfaceId(id) && !mojo::IsMasterInterfaceId(id)); |
| 514 | 653 |
| 515 base::AutoLock locker(lock_); | 654 base::AutoLock locker(lock_); |
| 516 Endpoint* endpoint = GetEndpointForDispatch(id); | 655 Endpoint* endpoint = GetEndpointForDispatch(id); |
| 517 if (!endpoint) | 656 if (!endpoint) |
| 518 return; | 657 return; |
| 519 | 658 |
| 520 mojo::InterfaceEndpointClient* client = endpoint->client(); | 659 mojo::InterfaceEndpointClient* client = endpoint->client(); |
| 521 if (!client) | 660 if (!client) |
| 522 return; | 661 return; |
| 523 | 662 |
| 524 DCHECK(endpoint->task_runner()->BelongsToCurrentThread()); | 663 DCHECK(endpoint->task_runner()->BelongsToCurrentThread()); |
| 525 | 664 |
| 526 // TODO(rockot): Implement sync dispatch. For now, sync messages are | 665 // Sync messages should never make their way to this method. |
| 527 // unsupported here. | |
| 528 DCHECK(!message->has_flag(mojo::Message::kFlagIsSync)); | 666 DCHECK(!message->has_flag(mojo::Message::kFlagIsSync)); |
| 529 | 667 |
| 530 bool result = false; | 668 bool result = false; |
| 531 { | 669 { |
| 532 base::AutoUnlock unlocker(lock_); | 670 base::AutoUnlock unlocker(lock_); |
| 533 result = client->HandleIncomingMessage(message.get()); | 671 result = client->HandleIncomingMessage(message.get()); |
| 534 } | 672 } |
| 535 | 673 |
| 536 if (!result) | 674 if (!result) |
| 537 RaiseError(); | 675 RaiseError(); |
| 538 } | 676 } |
| 539 | 677 |
| 678 void AcceptSyncMessage(mojo::InterfaceId interface_id, uint32_t message_id) { |
| 679 DCHECK(proxy_task_runner_->BelongsToCurrentThread()); |
| 680 |
| 681 base::AutoLock locker(lock_); |
| 682 Endpoint* endpoint = GetEndpointForDispatch(interface_id); |
| 683 if (!endpoint) |
| 684 return; |
| 685 |
| 686 DCHECK(endpoint->task_runner()->BelongsToCurrentThread()); |
| 687 std::unique_ptr<mojo::Message> message = |
| 688 endpoint->PopSyncMessage(message_id); |
| 689 |
| 690 // The message must have already been dequeued by the endpoint waking up |
| 691 // from a sync wait. Nothing to do. |
| 692 if (!message) |
| 693 return; |
| 694 |
| 695 mojo::InterfaceEndpointClient* client = endpoint->client(); |
| 696 if (!client) |
| 697 return; |
| 698 |
| 699 bool result = false; |
| 700 { |
| 701 base::AutoUnlock unlocker(lock_); |
| 702 result = client->HandleIncomingMessage(message.get()); |
| 703 } |
| 704 |
| 705 if (!result) |
| 706 RaiseError(); |
| 707 } |
| 708 |
| 540 Endpoint* GetEndpointForDispatch(mojo::InterfaceId id) { | 709 Endpoint* GetEndpointForDispatch(mojo::InterfaceId id) { |
| 541 lock_.AssertAcquired(); | 710 lock_.AssertAcquired(); |
| 542 bool inserted = false; | 711 bool inserted = false; |
| 543 Endpoint* endpoint = FindOrInsertEndpoint(id, &inserted); | 712 Endpoint* endpoint = FindOrInsertEndpoint(id, &inserted); |
| 544 if (inserted) { | 713 if (inserted) { |
| 545 MarkClosedAndMaybeRemove(endpoint); | 714 MarkClosedAndMaybeRemove(endpoint); |
| 546 if (!mojo::IsMasterInterfaceId(id)) | 715 if (!mojo::IsMasterInterfaceId(id)) |
| 547 control_message_proxy_.NotifyPeerEndpointClosed(id); | 716 control_message_proxy_.NotifyPeerEndpointClosed(id); |
| 548 return nullptr; | 717 return nullptr; |
| 549 } | 718 } |
| (...skipping 116 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 666 Channel::Mode mode, | 835 Channel::Mode mode, |
| 667 Delegate* delegate, | 836 Delegate* delegate, |
| 668 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner) { | 837 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner) { |
| 669 return base::MakeUnique<MojoBootstrapImpl>( | 838 return base::MakeUnique<MojoBootstrapImpl>( |
| 670 std::move(handle), delegate, | 839 std::move(handle), delegate, |
| 671 new ChannelAssociatedGroupController(mode == Channel::MODE_SERVER, | 840 new ChannelAssociatedGroupController(mode == Channel::MODE_SERVER, |
| 672 ipc_task_runner)); | 841 ipc_task_runner)); |
| 673 } | 842 } |
| 674 | 843 |
| 675 } // namespace IPC | 844 } // namespace IPC |
| OLD | NEW |