OLD | NEW |
1 // Copyright 2014 The Chromium Authors. All rights reserved. | 1 // Copyright 2014 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "ipc/ipc_mojo_bootstrap.h" | 5 #include "ipc/ipc_mojo_bootstrap.h" |
6 | 6 |
7 #include <stdint.h> | 7 #include <stdint.h> |
8 | 8 |
9 #include <map> | 9 #include <map> |
10 #include <memory> | 10 #include <memory> |
11 #include <utility> | 11 #include <utility> |
12 #include <vector> | 12 #include <vector> |
13 | 13 |
14 #include "base/atomicops.h" | |
15 #include "base/callback.h" | 14 #include "base/callback.h" |
16 #include "base/logging.h" | 15 #include "base/logging.h" |
17 #include "base/macros.h" | 16 #include "base/macros.h" |
18 #include "base/memory/ptr_util.h" | 17 #include "base/memory/ptr_util.h" |
19 #include "base/single_thread_task_runner.h" | 18 #include "base/single_thread_task_runner.h" |
20 #include "base/stl_util.h" | |
21 #include "base/synchronization/lock.h" | 19 #include "base/synchronization/lock.h" |
22 #include "base/threading/thread_task_runner_handle.h" | 20 #include "base/threading/thread_task_runner_handle.h" |
23 #include "build/build_config.h" | |
24 #include "ipc/ipc_message_utils.h" | |
25 #include "ipc/ipc_platform_file.h" | |
26 #include "mojo/public/cpp/bindings/associated_group.h" | 21 #include "mojo/public/cpp/bindings/associated_group.h" |
27 #include "mojo/public/cpp/bindings/associated_group_controller.h" | 22 #include "mojo/public/cpp/bindings/associated_group_controller.h" |
28 #include "mojo/public/cpp/bindings/binding.h" | |
29 #include "mojo/public/cpp/bindings/connector.h" | 23 #include "mojo/public/cpp/bindings/connector.h" |
30 #include "mojo/public/cpp/bindings/interface_endpoint_client.h" | 24 #include "mojo/public/cpp/bindings/interface_endpoint_client.h" |
31 #include "mojo/public/cpp/bindings/interface_endpoint_controller.h" | 25 #include "mojo/public/cpp/bindings/interface_endpoint_controller.h" |
32 #include "mojo/public/cpp/bindings/interface_id.h" | 26 #include "mojo/public/cpp/bindings/interface_id.h" |
33 #include "mojo/public/cpp/bindings/message.h" | 27 #include "mojo/public/cpp/bindings/message.h" |
34 #include "mojo/public/cpp/bindings/message_header_validator.h" | 28 #include "mojo/public/cpp/bindings/message_header_validator.h" |
35 #include "mojo/public/cpp/bindings/pipe_control_message_handler.h" | 29 #include "mojo/public/cpp/bindings/pipe_control_message_handler.h" |
36 #include "mojo/public/cpp/bindings/pipe_control_message_handler_delegate.h" | 30 #include "mojo/public/cpp/bindings/pipe_control_message_handler_delegate.h" |
37 #include "mojo/public/cpp/bindings/pipe_control_message_proxy.h" | 31 #include "mojo/public/cpp/bindings/pipe_control_message_proxy.h" |
38 | 32 |
(...skipping 18 matching lines...) Expand all Loading... |
57 control_message_proxy_thunk_(this), | 51 control_message_proxy_thunk_(this), |
58 control_message_proxy_(&control_message_proxy_thunk_) { | 52 control_message_proxy_(&control_message_proxy_thunk_) { |
59 thread_checker_.DetachFromThread(); | 53 thread_checker_.DetachFromThread(); |
60 control_message_handler_.SetDescription( | 54 control_message_handler_.SetDescription( |
61 "IPC::mojom::Bootstrap [master] PipeControlMessageHandler"); | 55 "IPC::mojom::Bootstrap [master] PipeControlMessageHandler"); |
62 } | 56 } |
63 | 57 |
64 void Bind(mojo::ScopedMessagePipeHandle handle) { | 58 void Bind(mojo::ScopedMessagePipeHandle handle) { |
65 DCHECK(thread_checker_.CalledOnValidThread()); | 59 DCHECK(thread_checker_.CalledOnValidThread()); |
66 DCHECK(task_runner_->BelongsToCurrentThread()); | 60 DCHECK(task_runner_->BelongsToCurrentThread()); |
67 thread_task_runner_ = base::ThreadTaskRunnerHandle::Get(); | |
68 base::subtle::Release_Store(&is_thread_task_runner_set_, 1); | |
69 | 61 |
70 connector_.reset(new mojo::Connector( | 62 connector_.reset(new mojo::Connector( |
71 std::move(handle), mojo::Connector::SINGLE_THREADED_SEND, | 63 std::move(handle), mojo::Connector::SINGLE_THREADED_SEND, |
72 task_runner_)); | 64 task_runner_)); |
73 connector_->set_incoming_receiver(&header_validator_); | 65 connector_->set_incoming_receiver(&header_validator_); |
74 connector_->set_connection_error_handler( | 66 connector_->set_connection_error_handler( |
75 base::Bind(&ChannelAssociatedGroupController::OnPipeError, | 67 base::Bind(&ChannelAssociatedGroupController::OnPipeError, |
76 base::Unretained(this))); | 68 base::Unretained(this))); |
77 | 69 |
78 std::vector<std::unique_ptr<mojo::Message>> outgoing_messages; | 70 std::vector<std::unique_ptr<mojo::Message>> outgoing_messages; |
(...skipping 124 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
203 DCHECK(mojo::IsValidInterfaceId(id)); | 195 DCHECK(mojo::IsValidInterfaceId(id)); |
204 | 196 |
205 base::AutoLock locker(lock_); | 197 base::AutoLock locker(lock_); |
206 DCHECK(ContainsKey(endpoints_, id)); | 198 DCHECK(ContainsKey(endpoints_, id)); |
207 | 199 |
208 Endpoint* endpoint = endpoints_[id].get(); | 200 Endpoint* endpoint = endpoints_[id].get(); |
209 endpoint->DetachClient(); | 201 endpoint->DetachClient(); |
210 } | 202 } |
211 | 203 |
212 void RaiseError() override { | 204 void RaiseError() override { |
213 if (IsRunningOnIPCThread()) { | 205 if (task_runner_->BelongsToCurrentThread()) { |
214 connector_->RaiseError(); | 206 connector_->RaiseError(); |
215 } else { | 207 } else { |
216 task_runner_->PostTask( | 208 task_runner_->PostTask( |
217 FROM_HERE, | 209 FROM_HERE, |
218 base::Bind(&ChannelAssociatedGroupController::RaiseError, this)); | 210 base::Bind(&ChannelAssociatedGroupController::RaiseError, this)); |
219 } | 211 } |
220 } | 212 } |
221 | 213 |
222 private: | 214 private: |
223 class Endpoint; | 215 class Endpoint; |
(...skipping 71 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
295 | 287 |
296 // TODO(rockot): Implement sync waiting. | 288 // TODO(rockot): Implement sync waiting. |
297 NOTREACHED(); | 289 NOTREACHED(); |
298 } | 290 } |
299 | 291 |
300 bool SyncWatch(const bool* should_stop) override { | 292 bool SyncWatch(const bool* should_stop) override { |
301 DCHECK(task_runner_->BelongsToCurrentThread()); | 293 DCHECK(task_runner_->BelongsToCurrentThread()); |
302 | 294 |
303 // It's not legal to make sync calls from the master endpoint's thread, | 295 // It's not legal to make sync calls from the master endpoint's thread, |
304 // and in fact they must only happen from the proxy task runner. | 296 // and in fact they must only happen from the proxy task runner. |
305 DCHECK(!controller_->IsRunningOnIPCThread()); | 297 DCHECK(!controller_->task_runner_->BelongsToCurrentThread()); |
306 DCHECK(controller_->proxy_task_runner_->BelongsToCurrentThread()); | 298 DCHECK(controller_->proxy_task_runner_->BelongsToCurrentThread()); |
307 | 299 |
308 // TODO(rockot): Implement sync waiting. | 300 // TODO(rockot): Implement sync waiting. |
309 NOTREACHED(); | 301 NOTREACHED(); |
310 return false; | 302 return false; |
311 } | 303 } |
312 | 304 |
313 private: | 305 private: |
314 friend class base::RefCountedThreadSafe<Endpoint>; | 306 friend class base::RefCountedThreadSafe<Endpoint>; |
315 | 307 |
(...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
351 Endpoint* endpoint = iter->second.get(); | 343 Endpoint* endpoint = iter->second.get(); |
352 ++iter; | 344 ++iter; |
353 | 345 |
354 DCHECK(endpoint->closed()); | 346 DCHECK(endpoint->closed()); |
355 MarkPeerClosedAndMaybeRemove(endpoint); | 347 MarkPeerClosedAndMaybeRemove(endpoint); |
356 } | 348 } |
357 | 349 |
358 DCHECK(endpoints_.empty()); | 350 DCHECK(endpoints_.empty()); |
359 } | 351 } |
360 | 352 |
361 bool IsRunningOnIPCThread() { | |
362 // |task_runner_| is always non-null but may incorrectly report that | |
363 // BelongsToCurrentThread() == false during shutdown. By the time shutdown | |
364 // occurs, |thread_task_runner_| will be non-null and is guaranteed to run | |
365 // tasks on the same thread as |task_runner_|. | |
366 base::subtle::Atomic32 has_thread_task_runner = | |
367 base::subtle::Acquire_Load(&is_thread_task_runner_set_); | |
368 if (has_thread_task_runner) | |
369 return thread_task_runner_->BelongsToCurrentThread(); | |
370 return task_runner_->BelongsToCurrentThread(); | |
371 } | |
372 | |
373 bool SendMessage(mojo::Message* message) { | 353 bool SendMessage(mojo::Message* message) { |
374 if (IsRunningOnIPCThread()) { | 354 if (task_runner_->BelongsToCurrentThread()) { |
375 DCHECK(thread_checker_.CalledOnValidThread()); | 355 DCHECK(thread_checker_.CalledOnValidThread()); |
376 if (!connector_) { | 356 if (!connector_) { |
377 // Pipe may not be bound yet, so we queue the message. | 357 // Pipe may not be bound yet, so we queue the message. |
378 std::unique_ptr<mojo::Message> queued_message(new mojo::Message); | 358 std::unique_ptr<mojo::Message> queued_message(new mojo::Message); |
379 message->MoveTo(queued_message.get()); | 359 message->MoveTo(queued_message.get()); |
380 outgoing_messages_.emplace_back(std::move(queued_message)); | 360 outgoing_messages_.emplace_back(std::move(queued_message)); |
381 return true; | 361 return true; |
382 } | 362 } |
383 return connector_->Accept(message); | 363 return connector_->Accept(message); |
384 } else { | 364 } else { |
(...skipping 220 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
605 MarkClosedAndMaybeRemove(endpoint); | 585 MarkClosedAndMaybeRemove(endpoint); |
606 control_message_proxy_.NotifyPeerEndpointClosed(id); | 586 control_message_proxy_.NotifyPeerEndpointClosed(id); |
607 return true; | 587 return true; |
608 } | 588 } |
609 | 589 |
610 // Checked in places which must be run on the master endpoint's thread. | 590 // Checked in places which must be run on the master endpoint's thread. |
611 base::ThreadChecker thread_checker_; | 591 base::ThreadChecker thread_checker_; |
612 | 592 |
613 scoped_refptr<base::SingleThreadTaskRunner> task_runner_; | 593 scoped_refptr<base::SingleThreadTaskRunner> task_runner_; |
614 | 594 |
615 // A TaskRunner that runs tasks on the same thread as |task_runner_| but which | |
616 // is used exclusively to do thread safety checking. This is an unfortunate | |
617 // consequence of bad interaction between some TaskRunner implementations and | |
618 // MessageLoop destruction which may cause the user-provided |task_runner_| to | |
619 // incorrectly report that BelongsToCurrentThread() == false during shutdown. | |
620 scoped_refptr<base::SingleThreadTaskRunner> thread_task_runner_; | |
621 base::subtle::Atomic32 is_thread_task_runner_set_ = 0; | |
622 | |
623 scoped_refptr<base::SingleThreadTaskRunner> proxy_task_runner_; | 595 scoped_refptr<base::SingleThreadTaskRunner> proxy_task_runner_; |
624 const bool set_interface_id_namespace_bit_; | 596 const bool set_interface_id_namespace_bit_; |
625 std::unique_ptr<mojo::Connector> connector_; | 597 std::unique_ptr<mojo::Connector> connector_; |
626 mojo::MessageHeaderValidator header_validator_; | 598 mojo::MessageHeaderValidator header_validator_; |
627 mojo::PipeControlMessageHandler control_message_handler_; | 599 mojo::PipeControlMessageHandler control_message_handler_; |
628 ControlMessageProxyThunk control_message_proxy_thunk_; | 600 ControlMessageProxyThunk control_message_proxy_thunk_; |
629 mojo::PipeControlMessageProxy control_message_proxy_; | 601 mojo::PipeControlMessageProxy control_message_proxy_; |
630 | 602 |
631 // Outgoing messages that were sent before this controller was bound to a | 603 // Outgoing messages that were sent before this controller was bound to a |
632 // real message pipe. | 604 // real message pipe. |
(...skipping 61 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
694 Channel::Mode mode, | 666 Channel::Mode mode, |
695 Delegate* delegate, | 667 Delegate* delegate, |
696 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner) { | 668 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner) { |
697 return base::MakeUnique<MojoBootstrapImpl>( | 669 return base::MakeUnique<MojoBootstrapImpl>( |
698 std::move(handle), delegate, | 670 std::move(handle), delegate, |
699 new ChannelAssociatedGroupController(mode == Channel::MODE_SERVER, | 671 new ChannelAssociatedGroupController(mode == Channel::MODE_SERVER, |
700 ipc_task_runner)); | 672 ipc_task_runner)); |
701 } | 673 } |
702 | 674 |
703 } // namespace IPC | 675 } // namespace IPC |
OLD | NEW |