Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright 2014 The Chromium Authors. All rights reserved. | 1 // Copyright 2014 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "ipc/ipc_mojo_bootstrap.h" | 5 #include "ipc/ipc_mojo_bootstrap.h" |
| 6 | 6 |
| 7 #include <stdint.h> | 7 #include <stdint.h> |
| 8 | 8 |
| 9 #include <map> | 9 #include <map> |
| 10 #include <memory> | 10 #include <memory> |
| 11 #include <queue> | |
| 11 #include <utility> | 12 #include <utility> |
| 12 #include <vector> | 13 #include <vector> |
| 13 | 14 |
| 14 #include "base/callback.h" | 15 #include "base/callback.h" |
| 15 #include "base/logging.h" | 16 #include "base/logging.h" |
| 16 #include "base/macros.h" | 17 #include "base/macros.h" |
| 17 #include "base/memory/ptr_util.h" | 18 #include "base/memory/ptr_util.h" |
| 18 #include "base/single_thread_task_runner.h" | 19 #include "base/single_thread_task_runner.h" |
| 19 #include "base/synchronization/lock.h" | 20 #include "base/synchronization/lock.h" |
| 20 #include "base/threading/thread_task_runner_handle.h" | 21 #include "base/threading/thread_task_runner_handle.h" |
| 22 #include "ipc/mojo_event.h" | |
| 21 #include "mojo/public/cpp/bindings/associated_group.h" | 23 #include "mojo/public/cpp/bindings/associated_group.h" |
| 22 #include "mojo/public/cpp/bindings/associated_group_controller.h" | 24 #include "mojo/public/cpp/bindings/associated_group_controller.h" |
| 23 #include "mojo/public/cpp/bindings/connector.h" | 25 #include "mojo/public/cpp/bindings/connector.h" |
| 24 #include "mojo/public/cpp/bindings/interface_endpoint_client.h" | 26 #include "mojo/public/cpp/bindings/interface_endpoint_client.h" |
| 25 #include "mojo/public/cpp/bindings/interface_endpoint_controller.h" | 27 #include "mojo/public/cpp/bindings/interface_endpoint_controller.h" |
| 26 #include "mojo/public/cpp/bindings/interface_id.h" | 28 #include "mojo/public/cpp/bindings/interface_id.h" |
| 27 #include "mojo/public/cpp/bindings/message.h" | 29 #include "mojo/public/cpp/bindings/message.h" |
| 28 #include "mojo/public/cpp/bindings/message_header_validator.h" | 30 #include "mojo/public/cpp/bindings/message_header_validator.h" |
| 29 #include "mojo/public/cpp/bindings/pipe_control_message_handler.h" | 31 #include "mojo/public/cpp/bindings/pipe_control_message_handler.h" |
| 30 #include "mojo/public/cpp/bindings/pipe_control_message_handler_delegate.h" | 32 #include "mojo/public/cpp/bindings/pipe_control_message_handler_delegate.h" |
| 31 #include "mojo/public/cpp/bindings/pipe_control_message_proxy.h" | 33 #include "mojo/public/cpp/bindings/pipe_control_message_proxy.h" |
| 34 #include "mojo/public/cpp/bindings/sync_handle_watcher.h" | |
| 32 | 35 |
| 33 namespace IPC { | 36 namespace IPC { |
| 34 | 37 |
| 35 namespace { | 38 namespace { |
| 36 | 39 |
| 37 class ChannelAssociatedGroupController | 40 class ChannelAssociatedGroupController |
| 38 : public mojo::AssociatedGroupController, | 41 : public mojo::AssociatedGroupController, |
| 39 public mojo::MessageReceiver, | 42 public mojo::MessageReceiver, |
| 40 public mojo::PipeControlMessageHandlerDelegate { | 43 public mojo::PipeControlMessageHandlerDelegate { |
| 41 public: | 44 public: |
| (...skipping 224 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 266 } | 269 } |
| 267 | 270 |
| 268 void DetachClient() { | 271 void DetachClient() { |
| 269 controller_->lock_.AssertAcquired(); | 272 controller_->lock_.AssertAcquired(); |
| 270 DCHECK(client_); | 273 DCHECK(client_); |
| 271 DCHECK(task_runner_->BelongsToCurrentThread()); | 274 DCHECK(task_runner_->BelongsToCurrentThread()); |
| 272 DCHECK(!closed_); | 275 DCHECK(!closed_); |
| 273 | 276 |
| 274 task_runner_ = nullptr; | 277 task_runner_ = nullptr; |
| 275 client_ = nullptr; | 278 client_ = nullptr; |
| 279 sync_watcher_.reset(); | |
| 280 } | |
| 281 | |
| 282 void EnqueueSyncMessage(std::unique_ptr<mojo::Message> message) { | |
| 283 controller_->lock_.AssertAcquired(); | |
| 284 sync_messages_.emplace(std::move(message)); | |
| 285 SignalSyncMessageEvent(); | |
| 286 } | |
| 287 | |
| 288 void SignalSyncMessageEvent() { | |
| 289 controller_->lock_.AssertAcquired(); | |
| 290 EnsureSyncMessageEventExists(); | |
| 291 sync_message_event_->Signal(); | |
| 292 } | |
| 293 | |
| 294 std::unique_ptr<mojo::Message> PopSyncMessage(mojo::Message* raw_message) { | |
| 295 controller_->lock_.AssertAcquired(); | |
| 296 if (sync_messages_.empty() || sync_messages_.front().get() != raw_message) | |
| 297 return nullptr; | |
| 298 std::unique_ptr<mojo::Message> message = | |
| 299 std::move(sync_messages_.front()); | |
| 300 sync_messages_.pop(); | |
| 301 return message; | |
| 276 } | 302 } |
| 277 | 303 |
| 278 // mojo::InterfaceEndpointController: | 304 // mojo::InterfaceEndpointController: |
| 279 bool SendMessage(mojo::Message* message) override { | 305 bool SendMessage(mojo::Message* message) override { |
| 280 DCHECK(task_runner_->BelongsToCurrentThread()); | 306 DCHECK(task_runner_->BelongsToCurrentThread()); |
| 281 message->set_interface_id(id_); | 307 message->set_interface_id(id_); |
| 282 return controller_->SendMessage(message); | 308 return controller_->SendMessage(message); |
| 283 } | 309 } |
| 284 | 310 |
| 285 void AllowWokenUpBySyncWatchOnSameThread() override { | 311 void AllowWokenUpBySyncWatchOnSameThread() override { |
| 286 DCHECK(task_runner_->BelongsToCurrentThread()); | 312 DCHECK(task_runner_->BelongsToCurrentThread()); |
| 287 | 313 |
| 288 // TODO(rockot): Implement sync waiting. | 314 EnsureSyncWatcherExists(); |
| 289 NOTREACHED(); | 315 sync_watcher_->AllowWokenUpBySyncWatchOnSameThread(); |
| 290 } | 316 } |
| 291 | 317 |
| 292 bool SyncWatch(const bool* should_stop) override { | 318 bool SyncWatch(const bool* should_stop) override { |
| 293 DCHECK(task_runner_->BelongsToCurrentThread()); | 319 DCHECK(task_runner_->BelongsToCurrentThread()); |
| 294 | 320 |
| 295 // It's not legal to make sync calls from the master endpoint's thread, | 321 // It's not legal to make sync calls from the master endpoint's thread, |
| 296 // and in fact they must only happen from the proxy task runner. | 322 // and in fact they must only happen from the proxy task runner. |
| 297 DCHECK(!controller_->task_runner_->BelongsToCurrentThread()); | 323 DCHECK(!controller_->task_runner_->BelongsToCurrentThread()); |
| 298 DCHECK(controller_->proxy_task_runner_->BelongsToCurrentThread()); | 324 DCHECK(controller_->proxy_task_runner_->BelongsToCurrentThread()); |
| 299 | 325 |
| 300 // TODO(rockot): Implement sync waiting. | 326 EnsureSyncWatcherExists(); |
| 301 NOTREACHED(); | 327 return sync_watcher_->SyncWatch(should_stop); |
| 302 return false; | |
| 303 } | 328 } |
| 304 | 329 |
| 305 private: | 330 private: |
| 306 friend class base::RefCountedThreadSafe<Endpoint>; | 331 friend class base::RefCountedThreadSafe<Endpoint>; |
| 307 | 332 |
| 308 ~Endpoint() override {} | 333 ~Endpoint() override { |
| 334 controller_->lock_.AssertAcquired(); | |
| 335 DCHECK(!client_); | |
| 336 DCHECK(closed_); | |
| 337 DCHECK(peer_closed_); | |
| 338 DCHECK(!sync_watcher_); | |
| 339 } | |
| 340 | |
| 341 void OnSyncMessageEventHandleReady(MojoResult result) { | |
| 342 DCHECK(task_runner_->BelongsToCurrentThread()); | |
| 343 | |
| 344 scoped_refptr<Endpoint> keepalive(this); | |
| 345 scoped_refptr<AssociatedGroupController> controller_keepalive( | |
| 346 controller_); | |
| 347 | |
| 348 bool reset_sync_watcher = false; | |
| 349 { | |
| 350 base::AutoLock locker(controller_->lock_); | |
| 351 bool more_to_process = false; | |
| 352 if (!sync_messages_.empty()) { | |
| 353 std::unique_ptr<mojo::Message> message( | |
| 354 std::move(sync_messages_.front())); | |
| 355 sync_messages_.pop(); | |
| 356 | |
| 357 bool dispatch_succeeded; | |
| 358 mojo::InterfaceEndpointClient* client = client_; | |
| 359 { | |
| 360 base::AutoUnlock unlocker(controller_->lock_); | |
| 361 dispatch_succeeded = client->HandleIncomingMessage(message.get()); | |
| 362 } | |
| 363 | |
| 364 if (!sync_messages_.empty()) | |
| 365 more_to_process = true; | |
| 366 | |
| 367 if (!dispatch_succeeded) | |
| 368 controller_->RaiseError(); | |
| 369 } | |
| 370 | |
| 371 if (!more_to_process) | |
| 372 sync_message_event_->Reset(); | |
| 373 | |
| 374 // If there are no queued sync messages and the peer has closed, there | |
| 375 // there won't be incoming sync messages in the future. | |
| 376 reset_sync_watcher = !more_to_process && peer_closed_; | |
| 377 } | |
| 378 | |
| 379 if (reset_sync_watcher) { | |
| 380 // If a SyncWatch() call (or multiple ones) of this interface endpoint | |
| 381 // is on the call stack, resetting the sync watcher will allow it to | |
| 382 // exit when the call stack unwinds to that frame. | |
| 383 sync_watcher_.reset(); | |
| 384 } | |
| 385 } | |
| 386 | |
| 387 void EnsureSyncWatcherExists() { | |
| 388 DCHECK(task_runner_->BelongsToCurrentThread()); | |
| 389 if (sync_watcher_) | |
| 390 return; | |
| 391 | |
| 392 { | |
| 393 base::AutoLock locker(controller_->lock_); | |
| 394 EnsureSyncMessageEventExists(); | |
| 395 if (!sync_messages_.empty()) | |
| 396 SignalSyncMessageEvent(); | |
| 397 } | |
| 398 | |
| 399 sync_watcher_.reset(new mojo::SyncHandleWatcher( | |
| 400 sync_message_event_->GetHandle(), MOJO_HANDLE_SIGNAL_READABLE, | |
| 401 base::Bind(&Endpoint::OnSyncMessageEventHandleReady, | |
| 402 base::Unretained(this)))); | |
| 403 } | |
| 404 | |
| 405 void EnsureSyncMessageEventExists() { | |
| 406 controller_->lock_.AssertAcquired(); | |
| 407 if (!sync_message_event_) | |
| 408 sync_message_event_.reset(new MojoEvent); | |
| 409 } | |
| 309 | 410 |
| 310 ChannelAssociatedGroupController* const controller_; | 411 ChannelAssociatedGroupController* const controller_; |
| 311 const mojo::InterfaceId id_; | 412 const mojo::InterfaceId id_; |
| 312 | 413 |
| 313 bool closed_ = false; | 414 bool closed_ = false; |
| 314 bool peer_closed_ = false; | 415 bool peer_closed_ = false; |
| 315 mojo::InterfaceEndpointClient* client_ = nullptr; | 416 mojo::InterfaceEndpointClient* client_ = nullptr; |
| 316 scoped_refptr<base::SingleThreadTaskRunner> task_runner_; | 417 scoped_refptr<base::SingleThreadTaskRunner> task_runner_; |
| 418 std::unique_ptr<mojo::SyncHandleWatcher> sync_watcher_; | |
| 419 std::unique_ptr<MojoEvent> sync_message_event_; | |
| 420 std::queue<std::unique_ptr<mojo::Message>> sync_messages_; | |
| 317 | 421 |
| 318 DISALLOW_COPY_AND_ASSIGN(Endpoint); | 422 DISALLOW_COPY_AND_ASSIGN(Endpoint); |
| 319 }; | 423 }; |
| 320 | 424 |
| 321 class ControlMessageProxyThunk : public MessageReceiver { | 425 class ControlMessageProxyThunk : public MessageReceiver { |
| 322 public: | 426 public: |
| 323 explicit ControlMessageProxyThunk( | 427 explicit ControlMessageProxyThunk( |
| 324 ChannelAssociatedGroupController* controller) | 428 ChannelAssociatedGroupController* controller) |
| 325 : controller_(controller) {} | 429 : controller_(controller) {} |
| 326 | 430 |
| (...skipping 89 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 416 DCHECK(endpoint->task_runner() && endpoint->client()); | 520 DCHECK(endpoint->task_runner() && endpoint->client()); |
| 417 if (endpoint->task_runner()->BelongsToCurrentThread() && !force_async) { | 521 if (endpoint->task_runner()->BelongsToCurrentThread() && !force_async) { |
| 418 mojo::InterfaceEndpointClient* client = endpoint->client(); | 522 mojo::InterfaceEndpointClient* client = endpoint->client(); |
| 419 | 523 |
| 420 base::AutoUnlock unlocker(lock_); | 524 base::AutoUnlock unlocker(lock_); |
| 421 client->NotifyError(); | 525 client->NotifyError(); |
| 422 } else { | 526 } else { |
| 423 endpoint->task_runner()->PostTask( | 527 endpoint->task_runner()->PostTask( |
| 424 FROM_HERE, | 528 FROM_HERE, |
| 425 base::Bind(&ChannelAssociatedGroupController | 529 base::Bind(&ChannelAssociatedGroupController |
| 426 ::NotifyEndpointOfErrorOnEndpointThread, this, | 530 ::NotifyEndpointOfErrorOnEndpointThread, this, endpoint->id(), |
| 427 make_scoped_refptr(endpoint))); | 531 endpoint)); |
| 428 } | 532 } |
| 429 } | 533 } |
| 430 | 534 |
| 431 void NotifyEndpointOfErrorOnEndpointThread(scoped_refptr<Endpoint> endpoint) { | 535 void NotifyEndpointOfErrorOnEndpointThread(mojo::InterfaceId id, |
| 536 Endpoint* endpoint) { | |
| 432 base::AutoLock locker(lock_); | 537 base::AutoLock locker(lock_); |
| 538 auto iter = endpoints_.find(id); | |
| 539 if (iter == endpoints_.end() || iter->second.get() != endpoint) | |
| 540 return; | |
| 433 if (!endpoint->client()) | 541 if (!endpoint->client()) |
| 434 return; | 542 return; |
| 543 | |
| 435 DCHECK(endpoint->task_runner()->BelongsToCurrentThread()); | 544 DCHECK(endpoint->task_runner()->BelongsToCurrentThread()); |
| 436 NotifyEndpointOfError(endpoint.get(), false /* force_async */); | 545 NotifyEndpointOfError(endpoint, false /* force_async */); |
| 437 } | 546 } |
| 438 | 547 |
| 439 void MarkClosedAndMaybeRemove(Endpoint* endpoint) { | 548 void MarkClosedAndMaybeRemove(Endpoint* endpoint) { |
| 440 lock_.AssertAcquired(); | 549 lock_.AssertAcquired(); |
| 441 endpoint->set_closed(); | 550 endpoint->set_closed(); |
| 442 if (endpoint->closed() && endpoint->peer_closed()) | 551 if (endpoint->closed() && endpoint->peer_closed()) |
| 443 endpoints_.erase(endpoint->id()); | 552 endpoints_.erase(endpoint->id()); |
| 444 } | 553 } |
| 445 | 554 |
| 446 void MarkPeerClosedAndMaybeRemove(Endpoint* endpoint) { | 555 void MarkPeerClosedAndMaybeRemove(Endpoint* endpoint) { |
| 447 lock_.AssertAcquired(); | 556 lock_.AssertAcquired(); |
| 448 endpoint->set_peer_closed(); | 557 endpoint->set_peer_closed(); |
| 558 endpoint->SignalSyncMessageEvent(); | |
| 449 if (endpoint->closed() && endpoint->peer_closed()) | 559 if (endpoint->closed() && endpoint->peer_closed()) |
| 450 endpoints_.erase(endpoint->id()); | 560 endpoints_.erase(endpoint->id()); |
| 451 } | 561 } |
| 452 | 562 |
| 453 Endpoint* FindOrInsertEndpoint(mojo::InterfaceId id, bool* inserted) { | 563 Endpoint* FindOrInsertEndpoint(mojo::InterfaceId id, bool* inserted) { |
| 454 lock_.AssertAcquired(); | 564 lock_.AssertAcquired(); |
| 455 DCHECK(!inserted || !*inserted); | 565 DCHECK(!inserted || !*inserted); |
| 456 | 566 |
| 457 auto iter = endpoints_.find(id); | 567 auto iter = endpoints_.find(id); |
| 458 if (iter != endpoints_.end()) | 568 if (iter != endpoints_.end()) |
| (...skipping 23 matching lines...) Expand all Loading... | |
| 482 | 592 |
| 483 mojo::InterfaceEndpointClient* client = endpoint->client(); | 593 mojo::InterfaceEndpointClient* client = endpoint->client(); |
| 484 if (!client || !endpoint->task_runner()->BelongsToCurrentThread()) { | 594 if (!client || !endpoint->task_runner()->BelongsToCurrentThread()) { |
| 485 // No client has been bound yet or the client runs tasks on another | 595 // No client has been bound yet or the client runs tasks on another |
| 486 // thread. We assume the other thread must always be the one on which | 596 // thread. We assume the other thread must always be the one on which |
| 487 // |proxy_task_runner_| runs tasks, since that's the only valid scenario. | 597 // |proxy_task_runner_| runs tasks, since that's the only valid scenario. |
| 488 // | 598 // |
| 489 // If the client is not yet bound, it must be bound by the time this task | 599 // If the client is not yet bound, it must be bound by the time this task |
| 490 // runs or else it's programmer error. | 600 // runs or else it's programmer error. |
| 491 DCHECK(proxy_task_runner_); | 601 DCHECK(proxy_task_runner_); |
| 602 | |
| 492 std::unique_ptr<mojo::Message> passed_message(new mojo::Message); | 603 std::unique_ptr<mojo::Message> passed_message(new mojo::Message); |
| 493 message->MoveTo(passed_message.get()); | 604 message->MoveTo(passed_message.get()); |
| 605 | |
| 606 if (passed_message->has_flag(mojo::Message::kFlagIsSync)) { | |
| 607 // Sync messages may need to be handled by the endpoint if it's blocking | |
| 608 // on a sync reply. We pass ownership of the message to the endpoint's | |
| 609 // sync message queue. If the endpoint was blocking, it will dequeue the | |
| 610 // message and dispatch it. Otherwise the posted |AcceptSyncMessage()| | |
| 611 // call will dequeue the message and dispatch it. | |
| 612 mojo::Message* raw_message = passed_message.get(); | |
| 613 endpoint->EnqueueSyncMessage(std::move(passed_message)); | |
| 614 proxy_task_runner_->PostTask( | |
| 615 FROM_HERE, | |
| 616 base::Bind(&ChannelAssociatedGroupController::AcceptSyncMessage, | |
| 617 this, id, raw_message)); | |
|
yzshen1
2016/08/02 17:38:37
Raw message pointer may not be an appropriate ID:
Ken Rockot(use gerrit already)
2016/08/02 18:21:24
I've switched to using sequential integers. Overfl
yzshen1
2016/08/02 18:34:11
SGTM
| |
| 618 return true; | |
| 619 } | |
| 620 | |
| 494 proxy_task_runner_->PostTask( | 621 proxy_task_runner_->PostTask( |
| 495 FROM_HERE, | 622 FROM_HERE, |
| 496 base::Bind(&ChannelAssociatedGroupController::AcceptOnProxyThread, | 623 base::Bind(&ChannelAssociatedGroupController::AcceptOnProxyThread, |
| 497 this, base::Passed(&passed_message))); | 624 this, base::Passed(&passed_message))); |
| 498 return true; | 625 return true; |
| 499 } | 626 } |
| 500 | 627 |
| 501 // We do not expect to receive sync responses on the master endpoint thread. | 628 // We do not expect to receive sync responses on the master endpoint thread. |
| 502 // If it's happening, it's a bug. | 629 // If it's happening, it's a bug. |
| 503 DCHECK(!message->has_flag(mojo::Message::kFlagIsSync)); | 630 DCHECK(!message->has_flag(mojo::Message::kFlagIsSync) || |
| 631 !message->has_flag(mojo::Message::kFlagIsResponse)); | |
| 504 | 632 |
| 505 base::AutoUnlock unlocker(lock_); | 633 base::AutoUnlock unlocker(lock_); |
| 506 return client->HandleIncomingMessage(message); | 634 return client->HandleIncomingMessage(message); |
| 507 } | 635 } |
| 508 | 636 |
| 509 void AcceptOnProxyThread(std::unique_ptr<mojo::Message> message) { | 637 void AcceptOnProxyThread(std::unique_ptr<mojo::Message> message) { |
| 510 DCHECK(proxy_task_runner_->BelongsToCurrentThread()); | 638 DCHECK(proxy_task_runner_->BelongsToCurrentThread()); |
| 511 | 639 |
| 512 mojo::InterfaceId id = message->interface_id(); | 640 mojo::InterfaceId id = message->interface_id(); |
| 513 DCHECK(mojo::IsValidInterfaceId(id) && !mojo::IsMasterInterfaceId(id)); | 641 DCHECK(mojo::IsValidInterfaceId(id) && !mojo::IsMasterInterfaceId(id)); |
| 514 | 642 |
| 643 scoped_refptr<ChannelAssociatedGroupController> keepalive(this); | |
|
yzshen1
2016/08/02 17:38:37
I think this line is not needed because the base::
Ken Rockot(use gerrit already)
2016/08/02 18:21:24
Done
| |
| 515 base::AutoLock locker(lock_); | 644 base::AutoLock locker(lock_); |
| 516 Endpoint* endpoint = GetEndpointForDispatch(id); | 645 Endpoint* endpoint = GetEndpointForDispatch(id); |
| 517 if (!endpoint) | 646 if (!endpoint) |
| 518 return; | 647 return; |
| 519 | 648 |
| 520 mojo::InterfaceEndpointClient* client = endpoint->client(); | 649 mojo::InterfaceEndpointClient* client = endpoint->client(); |
| 521 if (!client) | 650 if (!client) |
| 522 return; | 651 return; |
| 523 | 652 |
| 524 DCHECK(endpoint->task_runner()->BelongsToCurrentThread()); | 653 DCHECK(endpoint->task_runner()->BelongsToCurrentThread()); |
| 525 | 654 |
| 526 // TODO(rockot): Implement sync dispatch. For now, sync messages are | 655 // Sync messages should never make their way to this method. |
| 527 // unsupported here. | |
| 528 DCHECK(!message->has_flag(mojo::Message::kFlagIsSync)); | 656 DCHECK(!message->has_flag(mojo::Message::kFlagIsSync)); |
| 529 | 657 |
| 530 bool result = false; | 658 bool result = false; |
| 531 { | 659 { |
| 532 base::AutoUnlock unlocker(lock_); | 660 base::AutoUnlock unlocker(lock_); |
| 533 result = client->HandleIncomingMessage(message.get()); | 661 result = client->HandleIncomingMessage(message.get()); |
| 534 } | 662 } |
| 535 | 663 |
| 536 if (!result) | 664 if (!result) |
| 537 RaiseError(); | 665 RaiseError(); |
| 538 } | 666 } |
| 539 | 667 |
| 668 void AcceptSyncMessage(mojo::InterfaceId interface_id, | |
| 669 mojo::Message* raw_message) { | |
| 670 DCHECK(proxy_task_runner_->BelongsToCurrentThread()); | |
| 671 | |
| 672 scoped_refptr<ChannelAssociatedGroupController> keepalive(this); | |
| 673 base::AutoLock locker(lock_); | |
| 674 Endpoint* endpoint = GetEndpointForDispatch(interface_id); | |
| 675 if (!endpoint) | |
| 676 return; | |
| 677 | |
| 678 mojo::InterfaceEndpointClient* client = endpoint->client(); | |
| 679 if (!client) | |
|
yzshen1
2016/08/02 17:38:37
If there is not client, what should we do here? Sh
Ken Rockot(use gerrit already)
2016/08/02 18:21:24
Fixed to drop the message in this case
yzshen1
2016/08/02 18:34:11
Btw, I think we don't have code to disallow re-att
| |
| 680 return; | |
| 681 | |
| 682 DCHECK(endpoint->task_runner()->BelongsToCurrentThread()); | |
| 683 std::unique_ptr<mojo::Message> message = | |
| 684 endpoint->PopSyncMessage(raw_message); | |
| 685 | |
| 686 // The message must have already been dequeued by the endpoint waking up | |
| 687 // from a sync wait. Nothing to do. | |
| 688 if (!message) | |
| 689 return; | |
| 690 | |
| 691 bool result = false; | |
| 692 { | |
| 693 base::AutoUnlock unlocker(lock_); | |
| 694 result = client->HandleIncomingMessage(message.get()); | |
| 695 } | |
| 696 | |
| 697 if (!result) | |
| 698 RaiseError(); | |
| 699 } | |
| 700 | |
| 540 Endpoint* GetEndpointForDispatch(mojo::InterfaceId id) { | 701 Endpoint* GetEndpointForDispatch(mojo::InterfaceId id) { |
| 541 lock_.AssertAcquired(); | 702 lock_.AssertAcquired(); |
| 542 bool inserted = false; | 703 bool inserted = false; |
| 543 Endpoint* endpoint = FindOrInsertEndpoint(id, &inserted); | 704 Endpoint* endpoint = FindOrInsertEndpoint(id, &inserted); |
| 544 if (inserted) { | 705 if (inserted) { |
| 545 MarkClosedAndMaybeRemove(endpoint); | 706 MarkClosedAndMaybeRemove(endpoint); |
| 546 if (!mojo::IsMasterInterfaceId(id)) | 707 if (!mojo::IsMasterInterfaceId(id)) |
| 547 control_message_proxy_.NotifyPeerEndpointClosed(id); | 708 control_message_proxy_.NotifyPeerEndpointClosed(id); |
| 548 return nullptr; | 709 return nullptr; |
| 549 } | 710 } |
| (...skipping 116 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 666 Channel::Mode mode, | 827 Channel::Mode mode, |
| 667 Delegate* delegate, | 828 Delegate* delegate, |
| 668 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner) { | 829 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner) { |
| 669 return base::MakeUnique<MojoBootstrapImpl>( | 830 return base::MakeUnique<MojoBootstrapImpl>( |
| 670 std::move(handle), delegate, | 831 std::move(handle), delegate, |
| 671 new ChannelAssociatedGroupController(mode == Channel::MODE_SERVER, | 832 new ChannelAssociatedGroupController(mode == Channel::MODE_SERVER, |
| 672 ipc_task_runner)); | 833 ipc_task_runner)); |
| 673 } | 834 } |
| 674 | 835 |
| 675 } // namespace IPC | 836 } // namespace IPC |
| OLD | NEW |