Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(89)

Side by Side Diff: ipc/ipc_mojo_bootstrap.cc

Issue 2177933004: Remove thread TaskRunner hack from ChannelAssociatedGroupController (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@fix-browser-thread-task-runner
Patch Set: rebase Created 4 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « no previous file | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2014 The Chromium Authors. All rights reserved. 1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "ipc/ipc_mojo_bootstrap.h" 5 #include "ipc/ipc_mojo_bootstrap.h"
6 6
7 #include <stdint.h> 7 #include <stdint.h>
8 8
9 #include <map> 9 #include <map>
10 #include <memory> 10 #include <memory>
11 #include <utility> 11 #include <utility>
12 #include <vector> 12 #include <vector>
13 13
14 #include "base/atomicops.h"
15 #include "base/callback.h" 14 #include "base/callback.h"
16 #include "base/logging.h" 15 #include "base/logging.h"
17 #include "base/macros.h" 16 #include "base/macros.h"
18 #include "base/memory/ptr_util.h" 17 #include "base/memory/ptr_util.h"
19 #include "base/single_thread_task_runner.h" 18 #include "base/single_thread_task_runner.h"
20 #include "base/stl_util.h"
21 #include "base/synchronization/lock.h" 19 #include "base/synchronization/lock.h"
22 #include "base/threading/thread_task_runner_handle.h" 20 #include "base/threading/thread_task_runner_handle.h"
23 #include "build/build_config.h"
24 #include "ipc/ipc_message_utils.h"
25 #include "ipc/ipc_platform_file.h"
26 #include "mojo/public/cpp/bindings/associated_group.h" 21 #include "mojo/public/cpp/bindings/associated_group.h"
27 #include "mojo/public/cpp/bindings/associated_group_controller.h" 22 #include "mojo/public/cpp/bindings/associated_group_controller.h"
28 #include "mojo/public/cpp/bindings/binding.h"
29 #include "mojo/public/cpp/bindings/connector.h" 23 #include "mojo/public/cpp/bindings/connector.h"
30 #include "mojo/public/cpp/bindings/interface_endpoint_client.h" 24 #include "mojo/public/cpp/bindings/interface_endpoint_client.h"
31 #include "mojo/public/cpp/bindings/interface_endpoint_controller.h" 25 #include "mojo/public/cpp/bindings/interface_endpoint_controller.h"
32 #include "mojo/public/cpp/bindings/interface_id.h" 26 #include "mojo/public/cpp/bindings/interface_id.h"
33 #include "mojo/public/cpp/bindings/message.h" 27 #include "mojo/public/cpp/bindings/message.h"
34 #include "mojo/public/cpp/bindings/message_header_validator.h" 28 #include "mojo/public/cpp/bindings/message_header_validator.h"
35 #include "mojo/public/cpp/bindings/pipe_control_message_handler.h" 29 #include "mojo/public/cpp/bindings/pipe_control_message_handler.h"
36 #include "mojo/public/cpp/bindings/pipe_control_message_handler_delegate.h" 30 #include "mojo/public/cpp/bindings/pipe_control_message_handler_delegate.h"
37 #include "mojo/public/cpp/bindings/pipe_control_message_proxy.h" 31 #include "mojo/public/cpp/bindings/pipe_control_message_proxy.h"
38 32
(...skipping 18 matching lines...) Expand all
57 control_message_proxy_thunk_(this), 51 control_message_proxy_thunk_(this),
58 control_message_proxy_(&control_message_proxy_thunk_) { 52 control_message_proxy_(&control_message_proxy_thunk_) {
59 thread_checker_.DetachFromThread(); 53 thread_checker_.DetachFromThread();
60 control_message_handler_.SetDescription( 54 control_message_handler_.SetDescription(
61 "IPC::mojom::Bootstrap [master] PipeControlMessageHandler"); 55 "IPC::mojom::Bootstrap [master] PipeControlMessageHandler");
62 } 56 }
63 57
64 void Bind(mojo::ScopedMessagePipeHandle handle) { 58 void Bind(mojo::ScopedMessagePipeHandle handle) {
65 DCHECK(thread_checker_.CalledOnValidThread()); 59 DCHECK(thread_checker_.CalledOnValidThread());
66 DCHECK(task_runner_->BelongsToCurrentThread()); 60 DCHECK(task_runner_->BelongsToCurrentThread());
67 thread_task_runner_ = base::ThreadTaskRunnerHandle::Get();
68 base::subtle::Release_Store(&is_thread_task_runner_set_, 1);
69 61
70 connector_.reset(new mojo::Connector( 62 connector_.reset(new mojo::Connector(
71 std::move(handle), mojo::Connector::SINGLE_THREADED_SEND, 63 std::move(handle), mojo::Connector::SINGLE_THREADED_SEND,
72 task_runner_)); 64 task_runner_));
73 connector_->set_incoming_receiver(&header_validator_); 65 connector_->set_incoming_receiver(&header_validator_);
74 connector_->set_connection_error_handler( 66 connector_->set_connection_error_handler(
75 base::Bind(&ChannelAssociatedGroupController::OnPipeError, 67 base::Bind(&ChannelAssociatedGroupController::OnPipeError,
76 base::Unretained(this))); 68 base::Unretained(this)));
77 69
78 std::vector<std::unique_ptr<mojo::Message>> outgoing_messages; 70 std::vector<std::unique_ptr<mojo::Message>> outgoing_messages;
(...skipping 124 matching lines...) Expand 10 before | Expand all | Expand 10 after
203 DCHECK(mojo::IsValidInterfaceId(id)); 195 DCHECK(mojo::IsValidInterfaceId(id));
204 196
205 base::AutoLock locker(lock_); 197 base::AutoLock locker(lock_);
206 DCHECK(ContainsKey(endpoints_, id)); 198 DCHECK(ContainsKey(endpoints_, id));
207 199
208 Endpoint* endpoint = endpoints_[id].get(); 200 Endpoint* endpoint = endpoints_[id].get();
209 endpoint->DetachClient(); 201 endpoint->DetachClient();
210 } 202 }
211 203
212 void RaiseError() override { 204 void RaiseError() override {
213 if (IsRunningOnIPCThread()) { 205 if (task_runner_->BelongsToCurrentThread()) {
214 connector_->RaiseError(); 206 connector_->RaiseError();
215 } else { 207 } else {
216 task_runner_->PostTask( 208 task_runner_->PostTask(
217 FROM_HERE, 209 FROM_HERE,
218 base::Bind(&ChannelAssociatedGroupController::RaiseError, this)); 210 base::Bind(&ChannelAssociatedGroupController::RaiseError, this));
219 } 211 }
220 } 212 }
221 213
222 private: 214 private:
223 class Endpoint; 215 class Endpoint;
(...skipping 71 matching lines...) Expand 10 before | Expand all | Expand 10 after
295 287
296 // TODO(rockot): Implement sync waiting. 288 // TODO(rockot): Implement sync waiting.
297 NOTREACHED(); 289 NOTREACHED();
298 } 290 }
299 291
300 bool SyncWatch(const bool* should_stop) override { 292 bool SyncWatch(const bool* should_stop) override {
301 DCHECK(task_runner_->BelongsToCurrentThread()); 293 DCHECK(task_runner_->BelongsToCurrentThread());
302 294
303 // It's not legal to make sync calls from the master endpoint's thread, 295 // It's not legal to make sync calls from the master endpoint's thread,
304 // and in fact they must only happen from the proxy task runner. 296 // and in fact they must only happen from the proxy task runner.
305 DCHECK(!controller_->IsRunningOnIPCThread()); 297 DCHECK(!controller_->task_runner_->BelongsToCurrentThread());
306 DCHECK(controller_->proxy_task_runner_->BelongsToCurrentThread()); 298 DCHECK(controller_->proxy_task_runner_->BelongsToCurrentThread());
307 299
308 // TODO(rockot): Implement sync waiting. 300 // TODO(rockot): Implement sync waiting.
309 NOTREACHED(); 301 NOTREACHED();
310 return false; 302 return false;
311 } 303 }
312 304
313 private: 305 private:
314 friend class base::RefCountedThreadSafe<Endpoint>; 306 friend class base::RefCountedThreadSafe<Endpoint>;
315 307
(...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after
351 Endpoint* endpoint = iter->second.get(); 343 Endpoint* endpoint = iter->second.get();
352 ++iter; 344 ++iter;
353 345
354 DCHECK(endpoint->closed()); 346 DCHECK(endpoint->closed());
355 MarkPeerClosedAndMaybeRemove(endpoint); 347 MarkPeerClosedAndMaybeRemove(endpoint);
356 } 348 }
357 349
358 DCHECK(endpoints_.empty()); 350 DCHECK(endpoints_.empty());
359 } 351 }
360 352
361 bool IsRunningOnIPCThread() {
362 // |task_runner_| is always non-null but may incorrectly report that
363 // BelongsToCurrentThread() == false during shutdown. By the time shutdown
364 // occurs, |thread_task_runner_| will be non-null and is guaranteed to run
365 // tasks on the same thread as |task_runner_|.
366 base::subtle::Atomic32 has_thread_task_runner =
367 base::subtle::Acquire_Load(&is_thread_task_runner_set_);
368 if (has_thread_task_runner)
369 return thread_task_runner_->BelongsToCurrentThread();
370 return task_runner_->BelongsToCurrentThread();
371 }
372
373 bool SendMessage(mojo::Message* message) { 353 bool SendMessage(mojo::Message* message) {
374 if (IsRunningOnIPCThread()) { 354 if (task_runner_->BelongsToCurrentThread()) {
375 DCHECK(thread_checker_.CalledOnValidThread()); 355 DCHECK(thread_checker_.CalledOnValidThread());
376 if (!connector_) { 356 if (!connector_) {
377 // Pipe may not be bound yet, so we queue the message. 357 // Pipe may not be bound yet, so we queue the message.
378 std::unique_ptr<mojo::Message> queued_message(new mojo::Message); 358 std::unique_ptr<mojo::Message> queued_message(new mojo::Message);
379 message->MoveTo(queued_message.get()); 359 message->MoveTo(queued_message.get());
380 outgoing_messages_.emplace_back(std::move(queued_message)); 360 outgoing_messages_.emplace_back(std::move(queued_message));
381 return true; 361 return true;
382 } 362 }
383 return connector_->Accept(message); 363 return connector_->Accept(message);
384 } else { 364 } else {
(...skipping 220 matching lines...) Expand 10 before | Expand all | Expand 10 after
605 MarkClosedAndMaybeRemove(endpoint); 585 MarkClosedAndMaybeRemove(endpoint);
606 control_message_proxy_.NotifyPeerEndpointClosed(id); 586 control_message_proxy_.NotifyPeerEndpointClosed(id);
607 return true; 587 return true;
608 } 588 }
609 589
610 // Checked in places which must be run on the master endpoint's thread. 590 // Checked in places which must be run on the master endpoint's thread.
611 base::ThreadChecker thread_checker_; 591 base::ThreadChecker thread_checker_;
612 592
613 scoped_refptr<base::SingleThreadTaskRunner> task_runner_; 593 scoped_refptr<base::SingleThreadTaskRunner> task_runner_;
614 594
615 // A TaskRunner that runs tasks on the same thread as |task_runner_| but which
616 // is used exclusively to do thread safety checking. This is an unfortunate
617 // consequence of bad interaction between some TaskRunner implementations and
618 // MessageLoop destruction which may cause the user-provided |task_runner_| to
619 // incorrectly report that BelongsToCurrentThread() == false during shutdown.
620 scoped_refptr<base::SingleThreadTaskRunner> thread_task_runner_;
621 base::subtle::Atomic32 is_thread_task_runner_set_ = 0;
622
623 scoped_refptr<base::SingleThreadTaskRunner> proxy_task_runner_; 595 scoped_refptr<base::SingleThreadTaskRunner> proxy_task_runner_;
624 const bool set_interface_id_namespace_bit_; 596 const bool set_interface_id_namespace_bit_;
625 std::unique_ptr<mojo::Connector> connector_; 597 std::unique_ptr<mojo::Connector> connector_;
626 mojo::MessageHeaderValidator header_validator_; 598 mojo::MessageHeaderValidator header_validator_;
627 mojo::PipeControlMessageHandler control_message_handler_; 599 mojo::PipeControlMessageHandler control_message_handler_;
628 ControlMessageProxyThunk control_message_proxy_thunk_; 600 ControlMessageProxyThunk control_message_proxy_thunk_;
629 mojo::PipeControlMessageProxy control_message_proxy_; 601 mojo::PipeControlMessageProxy control_message_proxy_;
630 602
631 // Outgoing messages that were sent before this controller was bound to a 603 // Outgoing messages that were sent before this controller was bound to a
632 // real message pipe. 604 // real message pipe.
(...skipping 61 matching lines...) Expand 10 before | Expand all | Expand 10 after
694 Channel::Mode mode, 666 Channel::Mode mode,
695 Delegate* delegate, 667 Delegate* delegate,
696 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner) { 668 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner) {
697 return base::MakeUnique<MojoBootstrapImpl>( 669 return base::MakeUnique<MojoBootstrapImpl>(
698 std::move(handle), delegate, 670 std::move(handle), delegate,
699 new ChannelAssociatedGroupController(mode == Channel::MODE_SERVER, 671 new ChannelAssociatedGroupController(mode == Channel::MODE_SERVER,
700 ipc_task_runner)); 672 ipc_task_runner));
701 } 673 }
702 674
703 } // namespace IPC 675 } // namespace IPC
OLDNEW
« no previous file with comments | « no previous file | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698