OLD | NEW |
1 // Copyright 2014 The Chromium Authors. All rights reserved. | 1 // Copyright 2014 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "ipc/ipc_mojo_bootstrap.h" | 5 #include "ipc/ipc_mojo_bootstrap.h" |
6 | 6 |
7 #include <stdint.h> | 7 #include <stdint.h> |
8 | 8 |
9 #include <map> | 9 #include <map> |
10 #include <memory> | 10 #include <memory> |
(...skipping 175 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
186 MarkClosedAndMaybeRemove(endpoint); | 186 MarkClosedAndMaybeRemove(endpoint); |
187 | 187 |
188 base::AutoUnlock unlocker(lock_); | 188 base::AutoUnlock unlocker(lock_); |
189 if (!mojo::IsMasterInterfaceId(id) || reason) | 189 if (!mojo::IsMasterInterfaceId(id) || reason) |
190 control_message_proxy_.NotifyPeerEndpointClosed(id, reason); | 190 control_message_proxy_.NotifyPeerEndpointClosed(id, reason); |
191 } | 191 } |
192 | 192 |
193 mojo::InterfaceEndpointController* AttachEndpointClient( | 193 mojo::InterfaceEndpointController* AttachEndpointClient( |
194 const mojo::ScopedInterfaceEndpointHandle& handle, | 194 const mojo::ScopedInterfaceEndpointHandle& handle, |
195 mojo::InterfaceEndpointClient* client, | 195 mojo::InterfaceEndpointClient* client, |
196 scoped_refptr<base::SingleThreadTaskRunner> runner) override { | 196 scoped_refptr<base::SequencedTaskRunner> runner) override { |
197 const mojo::InterfaceId id = handle.id(); | 197 const mojo::InterfaceId id = handle.id(); |
198 | 198 |
199 DCHECK(mojo::IsValidInterfaceId(id)); | 199 DCHECK(mojo::IsValidInterfaceId(id)); |
200 DCHECK(client); | 200 DCHECK(client); |
201 | 201 |
202 base::AutoLock locker(lock_); | 202 base::AutoLock locker(lock_); |
203 DCHECK(ContainsKey(endpoints_, id)); | 203 DCHECK(ContainsKey(endpoints_, id)); |
204 | 204 |
205 Endpoint* endpoint = endpoints_[id].get(); | 205 Endpoint* endpoint = endpoints_[id].get(); |
206 endpoint->AttachClient(client, std::move(runner)); | 206 endpoint->AttachClient(client, std::move(runner)); |
(...skipping 63 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
270 | 270 |
271 const base::Optional<mojo::DisconnectReason>& disconnect_reason() const { | 271 const base::Optional<mojo::DisconnectReason>& disconnect_reason() const { |
272 return disconnect_reason_; | 272 return disconnect_reason_; |
273 } | 273 } |
274 | 274 |
275 void set_disconnect_reason( | 275 void set_disconnect_reason( |
276 const base::Optional<mojo::DisconnectReason>& disconnect_reason) { | 276 const base::Optional<mojo::DisconnectReason>& disconnect_reason) { |
277 disconnect_reason_ = disconnect_reason; | 277 disconnect_reason_ = disconnect_reason; |
278 } | 278 } |
279 | 279 |
280 base::SingleThreadTaskRunner* task_runner() const { | 280 base::SequencedTaskRunner* task_runner() const { |
281 return task_runner_.get(); | 281 return task_runner_.get(); |
282 } | 282 } |
283 | 283 |
284 mojo::InterfaceEndpointClient* client() const { | 284 mojo::InterfaceEndpointClient* client() const { |
285 controller_->lock_.AssertAcquired(); | 285 controller_->lock_.AssertAcquired(); |
286 return client_; | 286 return client_; |
287 } | 287 } |
288 | 288 |
289 void AttachClient(mojo::InterfaceEndpointClient* client, | 289 void AttachClient(mojo::InterfaceEndpointClient* client, |
290 scoped_refptr<base::SingleThreadTaskRunner> runner) { | 290 scoped_refptr<base::SequencedTaskRunner> runner) { |
291 controller_->lock_.AssertAcquired(); | 291 controller_->lock_.AssertAcquired(); |
292 DCHECK(!client_); | 292 DCHECK(!client_); |
293 DCHECK(!closed_); | 293 DCHECK(!closed_); |
294 DCHECK(runner->BelongsToCurrentThread()); | 294 DCHECK(runner->RunsTasksOnCurrentThread()); |
295 | 295 |
296 task_runner_ = std::move(runner); | 296 task_runner_ = std::move(runner); |
297 client_ = client; | 297 client_ = client; |
298 } | 298 } |
299 | 299 |
300 void DetachClient() { | 300 void DetachClient() { |
301 controller_->lock_.AssertAcquired(); | 301 controller_->lock_.AssertAcquired(); |
302 DCHECK(client_); | 302 DCHECK(client_); |
303 DCHECK(task_runner_->BelongsToCurrentThread()); | 303 DCHECK(task_runner_->RunsTasksOnCurrentThread()); |
304 DCHECK(!closed_); | 304 DCHECK(!closed_); |
305 | 305 |
306 task_runner_ = nullptr; | 306 task_runner_ = nullptr; |
307 client_ = nullptr; | 307 client_ = nullptr; |
308 sync_watcher_.reset(); | 308 sync_watcher_.reset(); |
309 } | 309 } |
310 | 310 |
311 uint32_t EnqueueSyncMessage(mojo::Message message) { | 311 uint32_t EnqueueSyncMessage(mojo::Message message) { |
312 controller_->lock_.AssertAcquired(); | 312 controller_->lock_.AssertAcquired(); |
313 uint32_t id = GenerateSyncMessageId(); | 313 uint32_t id = GenerateSyncMessageId(); |
(...skipping 12 matching lines...) Expand all Loading... |
326 controller_->lock_.AssertAcquired(); | 326 controller_->lock_.AssertAcquired(); |
327 if (sync_messages_.empty() || sync_messages_.front().first != id) | 327 if (sync_messages_.empty() || sync_messages_.front().first != id) |
328 return mojo::Message(); | 328 return mojo::Message(); |
329 mojo::Message message = std::move(sync_messages_.front().second); | 329 mojo::Message message = std::move(sync_messages_.front().second); |
330 sync_messages_.pop(); | 330 sync_messages_.pop(); |
331 return message; | 331 return message; |
332 } | 332 } |
333 | 333 |
334 // mojo::InterfaceEndpointController: | 334 // mojo::InterfaceEndpointController: |
335 bool SendMessage(mojo::Message* message) override { | 335 bool SendMessage(mojo::Message* message) override { |
336 DCHECK(task_runner_->BelongsToCurrentThread()); | 336 DCHECK(task_runner_->RunsTasksOnCurrentThread()); |
337 message->set_interface_id(id_); | 337 message->set_interface_id(id_); |
338 return controller_->SendMessage(message); | 338 return controller_->SendMessage(message); |
339 } | 339 } |
340 | 340 |
341 void AllowWokenUpBySyncWatchOnSameThread() override { | 341 void AllowWokenUpBySyncWatchOnSameThread() override { |
342 DCHECK(task_runner_->BelongsToCurrentThread()); | 342 DCHECK(task_runner_->RunsTasksOnCurrentThread()); |
343 | 343 |
344 EnsureSyncWatcherExists(); | 344 EnsureSyncWatcherExists(); |
345 sync_watcher_->AllowWokenUpBySyncWatchOnSameThread(); | 345 sync_watcher_->AllowWokenUpBySyncWatchOnSameThread(); |
346 } | 346 } |
347 | 347 |
348 bool SyncWatch(const bool* should_stop) override { | 348 bool SyncWatch(const bool* should_stop) override { |
349 DCHECK(task_runner_->BelongsToCurrentThread()); | 349 DCHECK(task_runner_->RunsTasksOnCurrentThread()); |
350 | 350 |
351 // It's not legal to make sync calls from the master endpoint's thread, | 351 // It's not legal to make sync calls from the master endpoint's thread, |
352 // and in fact they must only happen from the proxy task runner. | 352 // and in fact they must only happen from the proxy task runner. |
353 DCHECK(!controller_->task_runner_->BelongsToCurrentThread()); | 353 DCHECK(!controller_->task_runner_->BelongsToCurrentThread()); |
354 DCHECK(controller_->proxy_task_runner_->BelongsToCurrentThread()); | 354 DCHECK(controller_->proxy_task_runner_->BelongsToCurrentThread()); |
355 | 355 |
356 EnsureSyncWatcherExists(); | 356 EnsureSyncWatcherExists(); |
357 return sync_watcher_->SyncWatch(should_stop); | 357 return sync_watcher_->SyncWatch(should_stop); |
358 } | 358 } |
359 | 359 |
360 private: | 360 private: |
361 friend class base::RefCountedThreadSafe<Endpoint>; | 361 friend class base::RefCountedThreadSafe<Endpoint>; |
362 | 362 |
363 ~Endpoint() override { | 363 ~Endpoint() override { |
364 controller_->lock_.AssertAcquired(); | 364 controller_->lock_.AssertAcquired(); |
365 DCHECK(!client_); | 365 DCHECK(!client_); |
366 DCHECK(closed_); | 366 DCHECK(closed_); |
367 DCHECK(peer_closed_); | 367 DCHECK(peer_closed_); |
368 DCHECK(!sync_watcher_); | 368 DCHECK(!sync_watcher_); |
369 } | 369 } |
370 | 370 |
371 void OnSyncMessageEventHandleReady(MojoResult result) { | 371 void OnSyncMessageEventHandleReady(MojoResult result) { |
372 DCHECK(task_runner_->BelongsToCurrentThread()); | 372 DCHECK(task_runner_->RunsTasksOnCurrentThread()); |
373 | 373 |
374 scoped_refptr<Endpoint> keepalive(this); | 374 scoped_refptr<Endpoint> keepalive(this); |
375 scoped_refptr<AssociatedGroupController> controller_keepalive( | 375 scoped_refptr<AssociatedGroupController> controller_keepalive( |
376 controller_); | 376 controller_); |
377 | 377 |
378 bool reset_sync_watcher = false; | 378 bool reset_sync_watcher = false; |
379 { | 379 { |
380 base::AutoLock locker(controller_->lock_); | 380 base::AutoLock locker(controller_->lock_); |
381 bool more_to_process = false; | 381 bool more_to_process = false; |
382 if (!sync_messages_.empty()) { | 382 if (!sync_messages_.empty()) { |
(...skipping 24 matching lines...) Expand all Loading... |
407 | 407 |
408 if (reset_sync_watcher) { | 408 if (reset_sync_watcher) { |
409 // If a SyncWatch() call (or multiple ones) of this interface endpoint | 409 // If a SyncWatch() call (or multiple ones) of this interface endpoint |
410 // is on the call stack, resetting the sync watcher will allow it to | 410 // is on the call stack, resetting the sync watcher will allow it to |
411 // exit when the call stack unwinds to that frame. | 411 // exit when the call stack unwinds to that frame. |
412 sync_watcher_.reset(); | 412 sync_watcher_.reset(); |
413 } | 413 } |
414 } | 414 } |
415 | 415 |
416 void EnsureSyncWatcherExists() { | 416 void EnsureSyncWatcherExists() { |
417 DCHECK(task_runner_->BelongsToCurrentThread()); | 417 DCHECK(task_runner_->RunsTasksOnCurrentThread()); |
418 if (sync_watcher_) | 418 if (sync_watcher_) |
419 return; | 419 return; |
420 | 420 |
421 { | 421 { |
422 base::AutoLock locker(controller_->lock_); | 422 base::AutoLock locker(controller_->lock_); |
423 EnsureSyncMessageEventExists(); | 423 EnsureSyncMessageEventExists(); |
424 if (!sync_messages_.empty()) | 424 if (!sync_messages_.empty()) |
425 SignalSyncMessageEvent(); | 425 SignalSyncMessageEvent(); |
426 } | 426 } |
427 | 427 |
(...skipping 16 matching lines...) Expand all Loading... |
444 return id; | 444 return id; |
445 } | 445 } |
446 | 446 |
447 ChannelAssociatedGroupController* const controller_; | 447 ChannelAssociatedGroupController* const controller_; |
448 const mojo::InterfaceId id_; | 448 const mojo::InterfaceId id_; |
449 | 449 |
450 bool closed_ = false; | 450 bool closed_ = false; |
451 bool peer_closed_ = false; | 451 bool peer_closed_ = false; |
452 base::Optional<mojo::DisconnectReason> disconnect_reason_; | 452 base::Optional<mojo::DisconnectReason> disconnect_reason_; |
453 mojo::InterfaceEndpointClient* client_ = nullptr; | 453 mojo::InterfaceEndpointClient* client_ = nullptr; |
454 scoped_refptr<base::SingleThreadTaskRunner> task_runner_; | 454 scoped_refptr<base::SequencedTaskRunner> task_runner_; |
455 std::unique_ptr<mojo::SyncHandleWatcher> sync_watcher_; | 455 std::unique_ptr<mojo::SyncHandleWatcher> sync_watcher_; |
456 std::unique_ptr<MojoEvent> sync_message_event_; | 456 std::unique_ptr<MojoEvent> sync_message_event_; |
457 std::queue<std::pair<uint32_t, mojo::Message>> sync_messages_; | 457 std::queue<std::pair<uint32_t, mojo::Message>> sync_messages_; |
458 uint32_t next_sync_message_id_ = 0; | 458 uint32_t next_sync_message_id_ = 0; |
459 | 459 |
460 DISALLOW_COPY_AND_ASSIGN(Endpoint); | 460 DISALLOW_COPY_AND_ASSIGN(Endpoint); |
461 }; | 461 }; |
462 | 462 |
463 class ControlMessageProxyThunk : public MessageReceiver { | 463 class ControlMessageProxyThunk : public MessageReceiver { |
464 public: | 464 public: |
(...skipping 87 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
552 // Because a notification may in turn detach any endpoint, we have to | 552 // Because a notification may in turn detach any endpoint, we have to |
553 // check each client again here. | 553 // check each client again here. |
554 if (endpoint->client()) | 554 if (endpoint->client()) |
555 NotifyEndpointOfError(endpoint.get(), false /* force_async */); | 555 NotifyEndpointOfError(endpoint.get(), false /* force_async */); |
556 } | 556 } |
557 } | 557 } |
558 | 558 |
559 void NotifyEndpointOfError(Endpoint* endpoint, bool force_async) { | 559 void NotifyEndpointOfError(Endpoint* endpoint, bool force_async) { |
560 lock_.AssertAcquired(); | 560 lock_.AssertAcquired(); |
561 DCHECK(endpoint->task_runner() && endpoint->client()); | 561 DCHECK(endpoint->task_runner() && endpoint->client()); |
562 if (endpoint->task_runner()->BelongsToCurrentThread() && !force_async) { | 562 if (endpoint->task_runner()->RunsTasksOnCurrentThread() && !force_async) { |
563 mojo::InterfaceEndpointClient* client = endpoint->client(); | 563 mojo::InterfaceEndpointClient* client = endpoint->client(); |
564 base::Optional<mojo::DisconnectReason> reason( | 564 base::Optional<mojo::DisconnectReason> reason( |
565 endpoint->disconnect_reason()); | 565 endpoint->disconnect_reason()); |
566 | 566 |
567 base::AutoUnlock unlocker(lock_); | 567 base::AutoUnlock unlocker(lock_); |
568 client->NotifyError(reason); | 568 client->NotifyError(reason); |
569 } else { | 569 } else { |
570 endpoint->task_runner()->PostTask( | 570 endpoint->task_runner()->PostTask( |
571 FROM_HERE, | 571 FROM_HERE, |
572 base::Bind(&ChannelAssociatedGroupController | 572 base::Bind(&ChannelAssociatedGroupController |
573 ::NotifyEndpointOfErrorOnEndpointThread, this, endpoint->id(), | 573 ::NotifyEndpointOfErrorOnEndpointThread, this, endpoint->id(), |
574 endpoint)); | 574 endpoint)); |
575 } | 575 } |
576 } | 576 } |
577 | 577 |
578 void NotifyEndpointOfErrorOnEndpointThread(mojo::InterfaceId id, | 578 void NotifyEndpointOfErrorOnEndpointThread(mojo::InterfaceId id, |
579 Endpoint* endpoint) { | 579 Endpoint* endpoint) { |
580 base::AutoLock locker(lock_); | 580 base::AutoLock locker(lock_); |
581 auto iter = endpoints_.find(id); | 581 auto iter = endpoints_.find(id); |
582 if (iter == endpoints_.end() || iter->second.get() != endpoint) | 582 if (iter == endpoints_.end() || iter->second.get() != endpoint) |
583 return; | 583 return; |
584 if (!endpoint->client()) | 584 if (!endpoint->client()) |
585 return; | 585 return; |
586 | 586 |
587 DCHECK(endpoint->task_runner()->BelongsToCurrentThread()); | 587 DCHECK(endpoint->task_runner()->RunsTasksOnCurrentThread()); |
588 NotifyEndpointOfError(endpoint, false /* force_async */); | 588 NotifyEndpointOfError(endpoint, false /* force_async */); |
589 } | 589 } |
590 | 590 |
591 void MarkClosedAndMaybeRemove(Endpoint* endpoint) { | 591 void MarkClosedAndMaybeRemove(Endpoint* endpoint) { |
592 lock_.AssertAcquired(); | 592 lock_.AssertAcquired(); |
593 endpoint->set_closed(); | 593 endpoint->set_closed(); |
594 if (endpoint->closed() && endpoint->peer_closed()) | 594 if (endpoint->closed() && endpoint->peer_closed()) |
595 endpoints_.erase(endpoint->id()); | 595 endpoints_.erase(endpoint->id()); |
596 } | 596 } |
597 | 597 |
(...skipping 27 matching lines...) Expand all Loading... |
625 if (mojo::PipeControlMessageHandler::IsPipeControlMessage(message)) | 625 if (mojo::PipeControlMessageHandler::IsPipeControlMessage(message)) |
626 return control_message_handler_.Accept(message); | 626 return control_message_handler_.Accept(message); |
627 | 627 |
628 mojo::InterfaceId id = message->interface_id(); | 628 mojo::InterfaceId id = message->interface_id(); |
629 DCHECK(mojo::IsValidInterfaceId(id)); | 629 DCHECK(mojo::IsValidInterfaceId(id)); |
630 | 630 |
631 base::AutoLock locker(lock_); | 631 base::AutoLock locker(lock_); |
632 Endpoint* endpoint = GetEndpointForDispatch(id, true /* create */); | 632 Endpoint* endpoint = GetEndpointForDispatch(id, true /* create */); |
633 mojo::InterfaceEndpointClient* client = | 633 mojo::InterfaceEndpointClient* client = |
634 endpoint ? endpoint->client() : nullptr; | 634 endpoint ? endpoint->client() : nullptr; |
635 if (!client || !endpoint->task_runner()->BelongsToCurrentThread()) { | 635 if (!client || !endpoint->task_runner()->RunsTasksOnCurrentThread()) { |
636 // No client has been bound yet or the client runs tasks on another | 636 // No client has been bound yet or the client runs tasks on another |
637 // thread. We assume the other thread must always be the one on which | 637 // thread. We assume the other thread must always be the one on which |
638 // |proxy_task_runner_| runs tasks, since that's the only valid scenario. | 638 // |proxy_task_runner_| runs tasks, since that's the only valid scenario. |
639 // | 639 // |
640 // If the client is not yet bound, it must be bound by the time this task | 640 // If the client is not yet bound, it must be bound by the time this task |
641 // runs or else it's programmer error. | 641 // runs or else it's programmer error. |
642 DCHECK(proxy_task_runner_); | 642 DCHECK(proxy_task_runner_); |
643 | 643 |
644 if (message->has_flag(mojo::Message::kFlagIsSync)) { | 644 if (message->has_flag(mojo::Message::kFlagIsSync)) { |
645 // Sync messages may need to be handled by the endpoint if it's blocking | 645 // Sync messages may need to be handled by the endpoint if it's blocking |
(...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
679 | 679 |
680 base::AutoLock locker(lock_); | 680 base::AutoLock locker(lock_); |
681 Endpoint* endpoint = GetEndpointForDispatch(id, false /* create */); | 681 Endpoint* endpoint = GetEndpointForDispatch(id, false /* create */); |
682 if (!endpoint) | 682 if (!endpoint) |
683 return; | 683 return; |
684 | 684 |
685 mojo::InterfaceEndpointClient* client = endpoint->client(); | 685 mojo::InterfaceEndpointClient* client = endpoint->client(); |
686 if (!client) | 686 if (!client) |
687 return; | 687 return; |
688 | 688 |
689 DCHECK(endpoint->task_runner()->BelongsToCurrentThread()); | 689 DCHECK(endpoint->task_runner()->RunsTasksOnCurrentThread()); |
690 | 690 |
691 // Sync messages should never make their way to this method. | 691 // Sync messages should never make their way to this method. |
692 DCHECK(!message.has_flag(mojo::Message::kFlagIsSync)); | 692 DCHECK(!message.has_flag(mojo::Message::kFlagIsSync)); |
693 | 693 |
694 bool result = false; | 694 bool result = false; |
695 { | 695 { |
696 base::AutoUnlock unlocker(lock_); | 696 base::AutoUnlock unlocker(lock_); |
697 result = client->HandleIncomingMessage(&message); | 697 result = client->HandleIncomingMessage(&message); |
698 } | 698 } |
699 | 699 |
700 if (!result) | 700 if (!result) |
701 RaiseError(); | 701 RaiseError(); |
702 } | 702 } |
703 | 703 |
704 void AcceptSyncMessage(mojo::InterfaceId interface_id, uint32_t message_id) { | 704 void AcceptSyncMessage(mojo::InterfaceId interface_id, uint32_t message_id) { |
705 DCHECK(proxy_task_runner_->BelongsToCurrentThread()); | 705 DCHECK(proxy_task_runner_->BelongsToCurrentThread()); |
706 | 706 |
707 base::AutoLock locker(lock_); | 707 base::AutoLock locker(lock_); |
708 Endpoint* endpoint = | 708 Endpoint* endpoint = |
709 GetEndpointForDispatch(interface_id, false /* create */); | 709 GetEndpointForDispatch(interface_id, false /* create */); |
710 if (!endpoint) | 710 if (!endpoint) |
711 return; | 711 return; |
712 | 712 |
713 DCHECK(endpoint->task_runner()->BelongsToCurrentThread()); | 713 DCHECK(endpoint->task_runner()->RunsTasksOnCurrentThread()); |
714 mojo::Message message = endpoint->PopSyncMessage(message_id); | 714 mojo::Message message = endpoint->PopSyncMessage(message_id); |
715 | 715 |
716 // The message must have already been dequeued by the endpoint waking up | 716 // The message must have already been dequeued by the endpoint waking up |
717 // from a sync wait. Nothing to do. | 717 // from a sync wait. Nothing to do. |
718 if (message.IsNull()) | 718 if (message.IsNull()) |
719 return; | 719 return; |
720 | 720 |
721 mojo::InterfaceEndpointClient* client = endpoint->client(); | 721 mojo::InterfaceEndpointClient* client = endpoint->client(); |
722 if (!client) | 722 if (!client) |
723 return; | 723 return; |
(...skipping 154 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
878 Channel::Mode mode, | 878 Channel::Mode mode, |
879 Delegate* delegate, | 879 Delegate* delegate, |
880 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner) { | 880 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner) { |
881 return base::MakeUnique<MojoBootstrapImpl>( | 881 return base::MakeUnique<MojoBootstrapImpl>( |
882 std::move(handle), delegate, | 882 std::move(handle), delegate, |
883 new ChannelAssociatedGroupController(mode == Channel::MODE_SERVER, | 883 new ChannelAssociatedGroupController(mode == Channel::MODE_SERVER, |
884 ipc_task_runner)); | 884 ipc_task_runner)); |
885 } | 885 } |
886 | 886 |
887 } // namespace IPC | 887 } // namespace IPC |
OLD | NEW |