Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(34)

Side by Side Diff: ipc/ipc_mojo_bootstrap.cc

Issue 2608163003: Change single-interface mojo bindings to use SequencedTaskRunner. (Closed)
Patch Set: Created 3 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « device/wake_lock/wake_lock_service_context.h ('k') | media/base/android/media_codec_loop.h » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2014 The Chromium Authors. All rights reserved. 1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "ipc/ipc_mojo_bootstrap.h" 5 #include "ipc/ipc_mojo_bootstrap.h"
6 6
7 #include <stdint.h> 7 #include <stdint.h>
8 8
9 #include <map> 9 #include <map>
10 #include <memory> 10 #include <memory>
(...skipping 175 matching lines...) Expand 10 before | Expand all | Expand 10 after
186 MarkClosedAndMaybeRemove(endpoint); 186 MarkClosedAndMaybeRemove(endpoint);
187 187
188 base::AutoUnlock unlocker(lock_); 188 base::AutoUnlock unlocker(lock_);
189 if (!mojo::IsMasterInterfaceId(id) || reason) 189 if (!mojo::IsMasterInterfaceId(id) || reason)
190 control_message_proxy_.NotifyPeerEndpointClosed(id, reason); 190 control_message_proxy_.NotifyPeerEndpointClosed(id, reason);
191 } 191 }
192 192
193 mojo::InterfaceEndpointController* AttachEndpointClient( 193 mojo::InterfaceEndpointController* AttachEndpointClient(
194 const mojo::ScopedInterfaceEndpointHandle& handle, 194 const mojo::ScopedInterfaceEndpointHandle& handle,
195 mojo::InterfaceEndpointClient* client, 195 mojo::InterfaceEndpointClient* client,
196 scoped_refptr<base::SingleThreadTaskRunner> runner) override { 196 scoped_refptr<base::SequencedTaskRunner> runner) override {
197 const mojo::InterfaceId id = handle.id(); 197 const mojo::InterfaceId id = handle.id();
198 198
199 DCHECK(mojo::IsValidInterfaceId(id)); 199 DCHECK(mojo::IsValidInterfaceId(id));
200 DCHECK(client); 200 DCHECK(client);
201 201
202 base::AutoLock locker(lock_); 202 base::AutoLock locker(lock_);
203 DCHECK(ContainsKey(endpoints_, id)); 203 DCHECK(ContainsKey(endpoints_, id));
204 204
205 Endpoint* endpoint = endpoints_[id].get(); 205 Endpoint* endpoint = endpoints_[id].get();
206 endpoint->AttachClient(client, std::move(runner)); 206 endpoint->AttachClient(client, std::move(runner));
(...skipping 63 matching lines...) Expand 10 before | Expand all | Expand 10 after
270 270
271 const base::Optional<mojo::DisconnectReason>& disconnect_reason() const { 271 const base::Optional<mojo::DisconnectReason>& disconnect_reason() const {
272 return disconnect_reason_; 272 return disconnect_reason_;
273 } 273 }
274 274
275 void set_disconnect_reason( 275 void set_disconnect_reason(
276 const base::Optional<mojo::DisconnectReason>& disconnect_reason) { 276 const base::Optional<mojo::DisconnectReason>& disconnect_reason) {
277 disconnect_reason_ = disconnect_reason; 277 disconnect_reason_ = disconnect_reason;
278 } 278 }
279 279
280 base::SingleThreadTaskRunner* task_runner() const { 280 base::SequencedTaskRunner* task_runner() const {
281 return task_runner_.get(); 281 return task_runner_.get();
282 } 282 }
283 283
284 mojo::InterfaceEndpointClient* client() const { 284 mojo::InterfaceEndpointClient* client() const {
285 controller_->lock_.AssertAcquired(); 285 controller_->lock_.AssertAcquired();
286 return client_; 286 return client_;
287 } 287 }
288 288
289 void AttachClient(mojo::InterfaceEndpointClient* client, 289 void AttachClient(mojo::InterfaceEndpointClient* client,
290 scoped_refptr<base::SingleThreadTaskRunner> runner) { 290 scoped_refptr<base::SequencedTaskRunner> runner) {
291 controller_->lock_.AssertAcquired(); 291 controller_->lock_.AssertAcquired();
292 DCHECK(!client_); 292 DCHECK(!client_);
293 DCHECK(!closed_); 293 DCHECK(!closed_);
294 DCHECK(runner->BelongsToCurrentThread()); 294 DCHECK(runner->RunsTasksOnCurrentThread());
295 295
296 task_runner_ = std::move(runner); 296 task_runner_ = std::move(runner);
297 client_ = client; 297 client_ = client;
298 } 298 }
299 299
300 void DetachClient() { 300 void DetachClient() {
301 controller_->lock_.AssertAcquired(); 301 controller_->lock_.AssertAcquired();
302 DCHECK(client_); 302 DCHECK(client_);
303 DCHECK(task_runner_->BelongsToCurrentThread()); 303 DCHECK(task_runner_->RunsTasksOnCurrentThread());
304 DCHECK(!closed_); 304 DCHECK(!closed_);
305 305
306 task_runner_ = nullptr; 306 task_runner_ = nullptr;
307 client_ = nullptr; 307 client_ = nullptr;
308 sync_watcher_.reset(); 308 sync_watcher_.reset();
309 } 309 }
310 310
311 uint32_t EnqueueSyncMessage(mojo::Message message) { 311 uint32_t EnqueueSyncMessage(mojo::Message message) {
312 controller_->lock_.AssertAcquired(); 312 controller_->lock_.AssertAcquired();
313 uint32_t id = GenerateSyncMessageId(); 313 uint32_t id = GenerateSyncMessageId();
(...skipping 12 matching lines...) Expand all
326 controller_->lock_.AssertAcquired(); 326 controller_->lock_.AssertAcquired();
327 if (sync_messages_.empty() || sync_messages_.front().first != id) 327 if (sync_messages_.empty() || sync_messages_.front().first != id)
328 return mojo::Message(); 328 return mojo::Message();
329 mojo::Message message = std::move(sync_messages_.front().second); 329 mojo::Message message = std::move(sync_messages_.front().second);
330 sync_messages_.pop(); 330 sync_messages_.pop();
331 return message; 331 return message;
332 } 332 }
333 333
334 // mojo::InterfaceEndpointController: 334 // mojo::InterfaceEndpointController:
335 bool SendMessage(mojo::Message* message) override { 335 bool SendMessage(mojo::Message* message) override {
336 DCHECK(task_runner_->BelongsToCurrentThread()); 336 DCHECK(task_runner_->RunsTasksOnCurrentThread());
337 message->set_interface_id(id_); 337 message->set_interface_id(id_);
338 return controller_->SendMessage(message); 338 return controller_->SendMessage(message);
339 } 339 }
340 340
341 void AllowWokenUpBySyncWatchOnSameThread() override { 341 void AllowWokenUpBySyncWatchOnSameThread() override {
342 DCHECK(task_runner_->BelongsToCurrentThread()); 342 DCHECK(task_runner_->RunsTasksOnCurrentThread());
343 343
344 EnsureSyncWatcherExists(); 344 EnsureSyncWatcherExists();
345 sync_watcher_->AllowWokenUpBySyncWatchOnSameThread(); 345 sync_watcher_->AllowWokenUpBySyncWatchOnSameThread();
346 } 346 }
347 347
348 bool SyncWatch(const bool* should_stop) override { 348 bool SyncWatch(const bool* should_stop) override {
349 DCHECK(task_runner_->BelongsToCurrentThread()); 349 DCHECK(task_runner_->RunsTasksOnCurrentThread());
350 350
351 // It's not legal to make sync calls from the master endpoint's thread, 351 // It's not legal to make sync calls from the master endpoint's thread,
352 // and in fact they must only happen from the proxy task runner. 352 // and in fact they must only happen from the proxy task runner.
353 DCHECK(!controller_->task_runner_->BelongsToCurrentThread()); 353 DCHECK(!controller_->task_runner_->BelongsToCurrentThread());
354 DCHECK(controller_->proxy_task_runner_->BelongsToCurrentThread()); 354 DCHECK(controller_->proxy_task_runner_->BelongsToCurrentThread());
355 355
356 EnsureSyncWatcherExists(); 356 EnsureSyncWatcherExists();
357 return sync_watcher_->SyncWatch(should_stop); 357 return sync_watcher_->SyncWatch(should_stop);
358 } 358 }
359 359
360 private: 360 private:
361 friend class base::RefCountedThreadSafe<Endpoint>; 361 friend class base::RefCountedThreadSafe<Endpoint>;
362 362
363 ~Endpoint() override { 363 ~Endpoint() override {
364 controller_->lock_.AssertAcquired(); 364 controller_->lock_.AssertAcquired();
365 DCHECK(!client_); 365 DCHECK(!client_);
366 DCHECK(closed_); 366 DCHECK(closed_);
367 DCHECK(peer_closed_); 367 DCHECK(peer_closed_);
368 DCHECK(!sync_watcher_); 368 DCHECK(!sync_watcher_);
369 } 369 }
370 370
371 void OnSyncMessageEventHandleReady(MojoResult result) { 371 void OnSyncMessageEventHandleReady(MojoResult result) {
372 DCHECK(task_runner_->BelongsToCurrentThread()); 372 DCHECK(task_runner_->RunsTasksOnCurrentThread());
373 373
374 scoped_refptr<Endpoint> keepalive(this); 374 scoped_refptr<Endpoint> keepalive(this);
375 scoped_refptr<AssociatedGroupController> controller_keepalive( 375 scoped_refptr<AssociatedGroupController> controller_keepalive(
376 controller_); 376 controller_);
377 377
378 bool reset_sync_watcher = false; 378 bool reset_sync_watcher = false;
379 { 379 {
380 base::AutoLock locker(controller_->lock_); 380 base::AutoLock locker(controller_->lock_);
381 bool more_to_process = false; 381 bool more_to_process = false;
382 if (!sync_messages_.empty()) { 382 if (!sync_messages_.empty()) {
(...skipping 24 matching lines...) Expand all
407 407
408 if (reset_sync_watcher) { 408 if (reset_sync_watcher) {
409 // If a SyncWatch() call (or multiple ones) of this interface endpoint 409 // If a SyncWatch() call (or multiple ones) of this interface endpoint
410 // is on the call stack, resetting the sync watcher will allow it to 410 // is on the call stack, resetting the sync watcher will allow it to
411 // exit when the call stack unwinds to that frame. 411 // exit when the call stack unwinds to that frame.
412 sync_watcher_.reset(); 412 sync_watcher_.reset();
413 } 413 }
414 } 414 }
415 415
416 void EnsureSyncWatcherExists() { 416 void EnsureSyncWatcherExists() {
417 DCHECK(task_runner_->BelongsToCurrentThread()); 417 DCHECK(task_runner_->RunsTasksOnCurrentThread());
418 if (sync_watcher_) 418 if (sync_watcher_)
419 return; 419 return;
420 420
421 { 421 {
422 base::AutoLock locker(controller_->lock_); 422 base::AutoLock locker(controller_->lock_);
423 EnsureSyncMessageEventExists(); 423 EnsureSyncMessageEventExists();
424 if (!sync_messages_.empty()) 424 if (!sync_messages_.empty())
425 SignalSyncMessageEvent(); 425 SignalSyncMessageEvent();
426 } 426 }
427 427
(...skipping 16 matching lines...) Expand all
444 return id; 444 return id;
445 } 445 }
446 446
447 ChannelAssociatedGroupController* const controller_; 447 ChannelAssociatedGroupController* const controller_;
448 const mojo::InterfaceId id_; 448 const mojo::InterfaceId id_;
449 449
450 bool closed_ = false; 450 bool closed_ = false;
451 bool peer_closed_ = false; 451 bool peer_closed_ = false;
452 base::Optional<mojo::DisconnectReason> disconnect_reason_; 452 base::Optional<mojo::DisconnectReason> disconnect_reason_;
453 mojo::InterfaceEndpointClient* client_ = nullptr; 453 mojo::InterfaceEndpointClient* client_ = nullptr;
454 scoped_refptr<base::SingleThreadTaskRunner> task_runner_; 454 scoped_refptr<base::SequencedTaskRunner> task_runner_;
455 std::unique_ptr<mojo::SyncHandleWatcher> sync_watcher_; 455 std::unique_ptr<mojo::SyncHandleWatcher> sync_watcher_;
456 std::unique_ptr<MojoEvent> sync_message_event_; 456 std::unique_ptr<MojoEvent> sync_message_event_;
457 std::queue<std::pair<uint32_t, mojo::Message>> sync_messages_; 457 std::queue<std::pair<uint32_t, mojo::Message>> sync_messages_;
458 uint32_t next_sync_message_id_ = 0; 458 uint32_t next_sync_message_id_ = 0;
459 459
460 DISALLOW_COPY_AND_ASSIGN(Endpoint); 460 DISALLOW_COPY_AND_ASSIGN(Endpoint);
461 }; 461 };
462 462
463 class ControlMessageProxyThunk : public MessageReceiver { 463 class ControlMessageProxyThunk : public MessageReceiver {
464 public: 464 public:
(...skipping 87 matching lines...) Expand 10 before | Expand all | Expand 10 after
552 // Because a notification may in turn detach any endpoint, we have to 552 // Because a notification may in turn detach any endpoint, we have to
553 // check each client again here. 553 // check each client again here.
554 if (endpoint->client()) 554 if (endpoint->client())
555 NotifyEndpointOfError(endpoint.get(), false /* force_async */); 555 NotifyEndpointOfError(endpoint.get(), false /* force_async */);
556 } 556 }
557 } 557 }
558 558
559 void NotifyEndpointOfError(Endpoint* endpoint, bool force_async) { 559 void NotifyEndpointOfError(Endpoint* endpoint, bool force_async) {
560 lock_.AssertAcquired(); 560 lock_.AssertAcquired();
561 DCHECK(endpoint->task_runner() && endpoint->client()); 561 DCHECK(endpoint->task_runner() && endpoint->client());
562 if (endpoint->task_runner()->BelongsToCurrentThread() && !force_async) { 562 if (endpoint->task_runner()->RunsTasksOnCurrentThread() && !force_async) {
563 mojo::InterfaceEndpointClient* client = endpoint->client(); 563 mojo::InterfaceEndpointClient* client = endpoint->client();
564 base::Optional<mojo::DisconnectReason> reason( 564 base::Optional<mojo::DisconnectReason> reason(
565 endpoint->disconnect_reason()); 565 endpoint->disconnect_reason());
566 566
567 base::AutoUnlock unlocker(lock_); 567 base::AutoUnlock unlocker(lock_);
568 client->NotifyError(reason); 568 client->NotifyError(reason);
569 } else { 569 } else {
570 endpoint->task_runner()->PostTask( 570 endpoint->task_runner()->PostTask(
571 FROM_HERE, 571 FROM_HERE,
572 base::Bind(&ChannelAssociatedGroupController 572 base::Bind(&ChannelAssociatedGroupController
573 ::NotifyEndpointOfErrorOnEndpointThread, this, endpoint->id(), 573 ::NotifyEndpointOfErrorOnEndpointThread, this, endpoint->id(),
574 endpoint)); 574 endpoint));
575 } 575 }
576 } 576 }
577 577
578 void NotifyEndpointOfErrorOnEndpointThread(mojo::InterfaceId id, 578 void NotifyEndpointOfErrorOnEndpointThread(mojo::InterfaceId id,
579 Endpoint* endpoint) { 579 Endpoint* endpoint) {
580 base::AutoLock locker(lock_); 580 base::AutoLock locker(lock_);
581 auto iter = endpoints_.find(id); 581 auto iter = endpoints_.find(id);
582 if (iter == endpoints_.end() || iter->second.get() != endpoint) 582 if (iter == endpoints_.end() || iter->second.get() != endpoint)
583 return; 583 return;
584 if (!endpoint->client()) 584 if (!endpoint->client())
585 return; 585 return;
586 586
587 DCHECK(endpoint->task_runner()->BelongsToCurrentThread()); 587 DCHECK(endpoint->task_runner()->RunsTasksOnCurrentThread());
588 NotifyEndpointOfError(endpoint, false /* force_async */); 588 NotifyEndpointOfError(endpoint, false /* force_async */);
589 } 589 }
590 590
591 void MarkClosedAndMaybeRemove(Endpoint* endpoint) { 591 void MarkClosedAndMaybeRemove(Endpoint* endpoint) {
592 lock_.AssertAcquired(); 592 lock_.AssertAcquired();
593 endpoint->set_closed(); 593 endpoint->set_closed();
594 if (endpoint->closed() && endpoint->peer_closed()) 594 if (endpoint->closed() && endpoint->peer_closed())
595 endpoints_.erase(endpoint->id()); 595 endpoints_.erase(endpoint->id());
596 } 596 }
597 597
(...skipping 27 matching lines...) Expand all
625 if (mojo::PipeControlMessageHandler::IsPipeControlMessage(message)) 625 if (mojo::PipeControlMessageHandler::IsPipeControlMessage(message))
626 return control_message_handler_.Accept(message); 626 return control_message_handler_.Accept(message);
627 627
628 mojo::InterfaceId id = message->interface_id(); 628 mojo::InterfaceId id = message->interface_id();
629 DCHECK(mojo::IsValidInterfaceId(id)); 629 DCHECK(mojo::IsValidInterfaceId(id));
630 630
631 base::AutoLock locker(lock_); 631 base::AutoLock locker(lock_);
632 Endpoint* endpoint = GetEndpointForDispatch(id, true /* create */); 632 Endpoint* endpoint = GetEndpointForDispatch(id, true /* create */);
633 mojo::InterfaceEndpointClient* client = 633 mojo::InterfaceEndpointClient* client =
634 endpoint ? endpoint->client() : nullptr; 634 endpoint ? endpoint->client() : nullptr;
635 if (!client || !endpoint->task_runner()->BelongsToCurrentThread()) { 635 if (!client || !endpoint->task_runner()->RunsTasksOnCurrentThread()) {
636 // No client has been bound yet or the client runs tasks on another 636 // No client has been bound yet or the client runs tasks on another
637 // thread. We assume the other thread must always be the one on which 637 // thread. We assume the other thread must always be the one on which
638 // |proxy_task_runner_| runs tasks, since that's the only valid scenario. 638 // |proxy_task_runner_| runs tasks, since that's the only valid scenario.
639 // 639 //
640 // If the client is not yet bound, it must be bound by the time this task 640 // If the client is not yet bound, it must be bound by the time this task
641 // runs or else it's programmer error. 641 // runs or else it's programmer error.
642 DCHECK(proxy_task_runner_); 642 DCHECK(proxy_task_runner_);
643 643
644 if (message->has_flag(mojo::Message::kFlagIsSync)) { 644 if (message->has_flag(mojo::Message::kFlagIsSync)) {
645 // Sync messages may need to be handled by the endpoint if it's blocking 645 // Sync messages may need to be handled by the endpoint if it's blocking
(...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after
679 679
680 base::AutoLock locker(lock_); 680 base::AutoLock locker(lock_);
681 Endpoint* endpoint = GetEndpointForDispatch(id, false /* create */); 681 Endpoint* endpoint = GetEndpointForDispatch(id, false /* create */);
682 if (!endpoint) 682 if (!endpoint)
683 return; 683 return;
684 684
685 mojo::InterfaceEndpointClient* client = endpoint->client(); 685 mojo::InterfaceEndpointClient* client = endpoint->client();
686 if (!client) 686 if (!client)
687 return; 687 return;
688 688
689 DCHECK(endpoint->task_runner()->BelongsToCurrentThread()); 689 DCHECK(endpoint->task_runner()->RunsTasksOnCurrentThread());
690 690
691 // Sync messages should never make their way to this method. 691 // Sync messages should never make their way to this method.
692 DCHECK(!message.has_flag(mojo::Message::kFlagIsSync)); 692 DCHECK(!message.has_flag(mojo::Message::kFlagIsSync));
693 693
694 bool result = false; 694 bool result = false;
695 { 695 {
696 base::AutoUnlock unlocker(lock_); 696 base::AutoUnlock unlocker(lock_);
697 result = client->HandleIncomingMessage(&message); 697 result = client->HandleIncomingMessage(&message);
698 } 698 }
699 699
700 if (!result) 700 if (!result)
701 RaiseError(); 701 RaiseError();
702 } 702 }
703 703
704 void AcceptSyncMessage(mojo::InterfaceId interface_id, uint32_t message_id) { 704 void AcceptSyncMessage(mojo::InterfaceId interface_id, uint32_t message_id) {
705 DCHECK(proxy_task_runner_->BelongsToCurrentThread()); 705 DCHECK(proxy_task_runner_->BelongsToCurrentThread());
706 706
707 base::AutoLock locker(lock_); 707 base::AutoLock locker(lock_);
708 Endpoint* endpoint = 708 Endpoint* endpoint =
709 GetEndpointForDispatch(interface_id, false /* create */); 709 GetEndpointForDispatch(interface_id, false /* create */);
710 if (!endpoint) 710 if (!endpoint)
711 return; 711 return;
712 712
713 DCHECK(endpoint->task_runner()->BelongsToCurrentThread()); 713 DCHECK(endpoint->task_runner()->RunsTasksOnCurrentThread());
714 mojo::Message message = endpoint->PopSyncMessage(message_id); 714 mojo::Message message = endpoint->PopSyncMessage(message_id);
715 715
716 // The message must have already been dequeued by the endpoint waking up 716 // The message must have already been dequeued by the endpoint waking up
717 // from a sync wait. Nothing to do. 717 // from a sync wait. Nothing to do.
718 if (message.IsNull()) 718 if (message.IsNull())
719 return; 719 return;
720 720
721 mojo::InterfaceEndpointClient* client = endpoint->client(); 721 mojo::InterfaceEndpointClient* client = endpoint->client();
722 if (!client) 722 if (!client)
723 return; 723 return;
(...skipping 154 matching lines...) Expand 10 before | Expand all | Expand 10 after
878 Channel::Mode mode, 878 Channel::Mode mode,
879 Delegate* delegate, 879 Delegate* delegate,
880 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner) { 880 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner) {
881 return base::MakeUnique<MojoBootstrapImpl>( 881 return base::MakeUnique<MojoBootstrapImpl>(
882 std::move(handle), delegate, 882 std::move(handle), delegate,
883 new ChannelAssociatedGroupController(mode == Channel::MODE_SERVER, 883 new ChannelAssociatedGroupController(mode == Channel::MODE_SERVER,
884 ipc_task_runner)); 884 ipc_task_runner));
885 } 885 }
886 886
887 } // namespace IPC 887 } // namespace IPC
OLDNEW
« no previous file with comments | « device/wake_lock/wake_lock_service_context.h ('k') | media/base/android/media_codec_loop.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698