Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(23)

Side by Side Diff: ipc/ipc_channel_mojo.cc

Issue 2163633003: Support early associated interface binding on ChannelMojo (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@explicit-channel-ipc-task-runner
Patch Set: . Created 4 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2014 The Chromium Authors. All rights reserved. 1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "ipc/ipc_channel_mojo.h" 5 #include "ipc/ipc_channel_mojo.h"
6 6
7 #include <stddef.h> 7 #include <stddef.h>
8 #include <stdint.h> 8 #include <stdint.h>
9 9
10 #include <memory> 10 #include <memory>
11 #include <utility> 11 #include <utility>
12 12
13 #include "base/bind.h" 13 #include "base/bind.h"
14 #include "base/bind_helpers.h" 14 #include "base/bind_helpers.h"
15 #include "base/command_line.h" 15 #include "base/command_line.h"
16 #include "base/lazy_instance.h" 16 #include "base/lazy_instance.h"
17 #include "base/macros.h" 17 #include "base/macros.h"
18 #include "base/memory/ptr_util.h" 18 #include "base/memory/ptr_util.h"
19 #include "base/process/process_handle.h"
19 #include "base/threading/thread_task_runner_handle.h" 20 #include "base/threading/thread_task_runner_handle.h"
20 #include "build/build_config.h" 21 #include "build/build_config.h"
21 #include "ipc/ipc_listener.h" 22 #include "ipc/ipc_listener.h"
22 #include "ipc/ipc_logging.h" 23 #include "ipc/ipc_logging.h"
23 #include "ipc/ipc_message_attachment_set.h" 24 #include "ipc/ipc_message_attachment_set.h"
24 #include "ipc/ipc_message_macros.h" 25 #include "ipc/ipc_message_macros.h"
25 #include "ipc/ipc_mojo_bootstrap.h" 26 #include "ipc/ipc_mojo_bootstrap.h"
26 #include "ipc/ipc_mojo_handle_attachment.h" 27 #include "ipc/ipc_mojo_handle_attachment.h"
27 #include "mojo/public/cpp/bindings/binding.h" 28 #include "mojo/public/cpp/bindings/binding.h"
28 #include "mojo/public/cpp/system/platform_handle.h" 29 #include "mojo/public/cpp/system/platform_handle.h"
(...skipping 233 matching lines...) Expand 10 before | Expand all | Expand 10 after
262 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner) { 263 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner) {
263 return base::WrapUnique(new MojoChannelFactory( 264 return base::WrapUnique(new MojoChannelFactory(
264 std::move(handle), Channel::MODE_CLIENT, ipc_task_runner)); 265 std::move(handle), Channel::MODE_CLIENT, ipc_task_runner));
265 } 266 }
266 267
267 ChannelMojo::ChannelMojo( 268 ChannelMojo::ChannelMojo(
268 mojo::ScopedMessagePipeHandle handle, 269 mojo::ScopedMessagePipeHandle handle,
269 Mode mode, 270 Mode mode,
270 Listener* listener, 271 Listener* listener,
271 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner) 272 const scoped_refptr<base::SingleThreadTaskRunner>& ipc_task_runner)
272 : pipe_(handle.get()), 273 : pipe_(handle.get()), listener_(listener), weak_factory_(this) {
273 listener_(listener),
274 waiting_connect_(true),
275 weak_factory_(this) {
276 // Create MojoBootstrap after all members are set as it touches 274 // Create MojoBootstrap after all members are set as it touches
277 // ChannelMojo from a different thread. 275 // ChannelMojo from a different thread.
278 bootstrap_ = MojoBootstrap::Create(std::move(handle), mode, this); 276 bootstrap_ =
277 MojoBootstrap::Create(std::move(handle), mode, this, ipc_task_runner);
279 } 278 }
280 279
281 ChannelMojo::~ChannelMojo() { 280 ChannelMojo::~ChannelMojo() {
282 Close(); 281 Close();
283 } 282 }
284 283
285 bool ChannelMojo::Connect() { 284 bool ChannelMojo::Connect() {
286 WillConnect(); 285 WillConnect();
287 { 286 {
288 base::AutoLock lock(lock_); 287 base::AutoLock lock(lock_);
289 DCHECK(!task_runner_); 288 DCHECK(!task_runner_);
290 task_runner_ = base::ThreadTaskRunnerHandle::Get(); 289 task_runner_ = base::ThreadTaskRunnerHandle::Get();
291 DCHECK(!message_reader_); 290 DCHECK(!message_reader_);
292 } 291 }
293 bootstrap_->Connect(); 292 bootstrap_->Connect();
294 return true; 293 return true;
295 } 294 }
296 295
297 void ChannelMojo::Close() { 296 void ChannelMojo::Close() {
298 std::unique_ptr<internal::MessagePipeReader, ReaderDeleter> reader; 297 std::unique_ptr<internal::MessagePipeReader, ReaderDeleter> reader;
299 { 298 {
300 base::AutoLock lock(lock_); 299 base::AutoLock lock(lock_);
300 associated_interfaces_.clear();
yzshen1 2016/07/20 17:13:15 A few questions about lock usage: - Is it safe to
Ken Rockot(use gerrit already) 2016/07/20 21:08:39 Good catch :)
301 if (!message_reader_) 301 if (!message_reader_)
302 return; 302 return;
303 // The reader's destructor may re-enter Close, so we swap it out first to 303 // The reader's destructor may re-enter Close, so we swap it out first to
304 // avoid deadlock when freeing it below. 304 // avoid deadlock when freeing it below.
305 std::swap(message_reader_, reader); 305 std::swap(message_reader_, reader);
306
307 // We might Close() before we Connect().
308 waiting_connect_ = false;
309 } 306 }
310 307
311 reader.reset(); 308 reader.reset();
312 } 309 }
313 310
314 // MojoBootstrap::Delegate implementation 311 // MojoBootstrap::Delegate implementation
315 void ChannelMojo::OnPipesAvailable( 312 void ChannelMojo::OnPipesAvailable(mojom::ChannelAssociatedPtr sender,
316 mojom::ChannelAssociatedPtrInfo send_channel, 313 mojom::ChannelAssociatedRequest receiver) {
317 mojom::ChannelAssociatedRequest receive_channel, 314 sender->SetPeerPid(GetSelfPID());
318 int32_t peer_pid) {
319 InitMessageReader(std::move(send_channel), std::move(receive_channel),
320 peer_pid);
321 }
322 315
323 void ChannelMojo::OnBootstrapError() {
324 listener_->OnChannelError();
325 }
326
327 void ChannelMojo::OnAssociatedInterfaceRequest(
328 const std::string& name,
329 mojo::ScopedInterfaceEndpointHandle handle) {
330 auto iter = associated_interfaces_.find(name);
331 if (iter != associated_interfaces_.end())
332 iter->second.Run(std::move(handle));
333 }
334
335 void ChannelMojo::InitMessageReader(mojom::ChannelAssociatedPtrInfo sender,
336 mojom::ChannelAssociatedRequest receiver,
337 base::ProcessId peer_pid) {
338 mojom::ChannelAssociatedPtr sender_ptr;
339 sender_ptr.Bind(std::move(sender));
340 std::unique_ptr<internal::MessagePipeReader, ChannelMojo::ReaderDeleter>
341 reader(new internal::MessagePipeReader(
342 pipe_, std::move(sender_ptr), std::move(receiver), peer_pid, this));
343
344 bool connected = true;
345 { 316 {
346 base::AutoLock lock(lock_); 317 base::AutoLock lock(lock_);
347 for (size_t i = 0; i < pending_messages_.size(); ++i) { 318 message_reader_.reset(new internal::MessagePipeReader(
348 if (!reader->Send(std::move(pending_messages_[i]))) { 319 pipe_, std::move(sender), std::move(receiver), this));
349 LOG(ERROR) << "Failed to flush pending messages";
350 pending_messages_.clear();
351 connected = false;
352 break;
353 }
354 }
355
356 if (connected) {
357 // We set |message_reader_| here and won't get any |pending_messages_|
358 // hereafter. Although we might have some if there is an error, we don't
359 // care. They cannot be sent anyway.
360 message_reader_ = std::move(reader);
361 pending_messages_.clear();
362 waiting_connect_ = false;
363 }
364 } 320 }
365
366 if (connected)
367 listener_->OnChannelConnected(static_cast<int32_t>(GetPeerPID()));
368 else
369 OnPipeError();
370 } 321 }
371 322
372 void ChannelMojo::OnPipeError() { 323 void ChannelMojo::OnPipeError() {
373 DCHECK(task_runner_); 324 DCHECK(task_runner_);
374 if (task_runner_->RunsTasksOnCurrentThread()) { 325 if (task_runner_->RunsTasksOnCurrentThread()) {
375 listener_->OnChannelError(); 326 listener_->OnChannelError();
376 } else { 327 } else {
377 task_runner_->PostTask( 328 task_runner_->PostTask(
378 FROM_HERE, 329 FROM_HERE,
379 base::Bind(&ChannelMojo::OnPipeError, weak_factory_.GetWeakPtr())); 330 base::Bind(&ChannelMojo::OnPipeError, weak_factory_.GetWeakPtr()));
380 } 331 }
381 } 332 }
382 333
334 void ChannelMojo::OnAssociatedInterfaceRequest(
335 const std::string& name,
336 mojo::ScopedInterfaceEndpointHandle handle) {
337 GenericAssociatedInterfaceFactory factory;
338 {
339 base::AutoLock locker(associated_interface_lock_);
340 auto iter = associated_interfaces_.find(name);
341 if (iter != associated_interfaces_.end())
342 factory = iter->second;
343 }
344
345 if (!factory.is_null())
346 factory.Run(std::move(handle));
347 }
348
383 bool ChannelMojo::Send(Message* message) { 349 bool ChannelMojo::Send(Message* message) {
350 std::unique_ptr<Message> scoped_message = base::WrapUnique(message);
384 base::AutoLock lock(lock_); 351 base::AutoLock lock(lock_);
385 if (!message_reader_) { 352 if (!message_reader_)
386 pending_messages_.push_back(base::WrapUnique(message)); 353 return false;
387 // Counts as OK before the connection is established, but it's an
388 // error otherwise.
389 return waiting_connect_;
390 }
391 354
392 // Comment copied from ipc_channel_posix.cc: 355 // Comment copied from ipc_channel_posix.cc:
393 // We can't close the pipe here, because calling OnChannelError may destroy 356 // We can't close the pipe here, because calling OnChannelError may destroy
394 // this object, and that would be bad if we are called from Send(). Instead, 357 // this object, and that would be bad if we are called from Send(). Instead,
395 // we return false and hope the caller will close the pipe. If they do not, 358 // we return false and hope the caller will close the pipe. If they do not,
396 // the pipe will still be closed next time OnFileCanReadWithoutBlocking is 359 // the pipe will still be closed next time OnFileCanReadWithoutBlocking is
397 // called. 360 // called.
398 // 361 //
399 // With Mojo, there's no OnFileCanReadWithoutBlocking, but we expect the 362 // With Mojo, there's no OnFileCanReadWithoutBlocking, but we expect the
400 // pipe's connection error handler will be invoked in its place. 363 // pipe's connection error handler will be invoked in its place.
401 return message_reader_->Send(base::WrapUnique(message)); 364 return message_reader_->Send(std::move(scoped_message));
402 } 365 }
403 366
404 bool ChannelMojo::IsSendThreadSafe() const { 367 bool ChannelMojo::IsSendThreadSafe() const {
405 return false; 368 return false;
406 } 369 }
407 370
408 base::ProcessId ChannelMojo::GetPeerPID() const { 371 base::ProcessId ChannelMojo::GetPeerPID() const {
409 base::AutoLock lock(lock_); 372 base::AutoLock lock(lock_);
410 if (!message_reader_) 373 if (!message_reader_)
411 return base::kNullProcessId; 374 return base::kNullProcessId;
412 375
413 return message_reader_->GetPeerPid(); 376 return message_reader_->GetPeerPid();
414 } 377 }
415 378
416 base::ProcessId ChannelMojo::GetSelfPID() const { 379 base::ProcessId ChannelMojo::GetSelfPID() const {
417 return bootstrap_->GetSelfPID(); 380 #if defined(OS_LINUX)
381 if (int global_pid = GetGlobalPid())
382 return global_pid;
383 #endif // OS_LINUX
384 #if defined(OS_NACL)
385 return -1;
386 #else
387 return base::GetCurrentProcId();
388 #endif // defined(OS_NACL)
418 } 389 }
419 390
420 Channel::AssociatedInterfaceSupport* 391 Channel::AssociatedInterfaceSupport*
421 ChannelMojo::GetAssociatedInterfaceSupport() { return this; } 392 ChannelMojo::GetAssociatedInterfaceSupport() { return this; }
422 393
394 void ChannelMojo::OnPeerPidReceived() {
395 listener_->OnChannelConnected(static_cast<int32_t>(GetPeerPID()));
396 }
397
423 void ChannelMojo::OnMessageReceived(const Message& message) { 398 void ChannelMojo::OnMessageReceived(const Message& message) {
424 TRACE_EVENT2("ipc,toplevel", "ChannelMojo::OnMessageReceived", 399 TRACE_EVENT2("ipc,toplevel", "ChannelMojo::OnMessageReceived",
425 "class", IPC_MESSAGE_ID_CLASS(message.type()), 400 "class", IPC_MESSAGE_ID_CLASS(message.type()),
426 "line", IPC_MESSAGE_ID_LINE(message.type())); 401 "line", IPC_MESSAGE_ID_LINE(message.type()));
427 if (AttachmentBroker* broker = AttachmentBroker::GetGlobal()) { 402 if (AttachmentBroker* broker = AttachmentBroker::GetGlobal()) {
428 if (broker->OnMessageReceived(message)) 403 if (broker->OnMessageReceived(message))
429 return; 404 return;
430 } 405 }
431 listener_->OnMessageReceived(message); 406 listener_->OnMessageReceived(message);
432 if (message.dispatch_error()) 407 if (message.dispatch_error())
(...skipping 64 matching lines...) Expand 10 before | Expand all | Expand 10 after
497 } 472 }
498 473
499 mojo::AssociatedGroup* ChannelMojo::GetAssociatedGroup() { 474 mojo::AssociatedGroup* ChannelMojo::GetAssociatedGroup() {
500 DCHECK(bootstrap_); 475 DCHECK(bootstrap_);
501 return bootstrap_->GetAssociatedGroup(); 476 return bootstrap_->GetAssociatedGroup();
502 } 477 }
503 478
504 void ChannelMojo::AddGenericAssociatedInterface( 479 void ChannelMojo::AddGenericAssociatedInterface(
505 const std::string& name, 480 const std::string& name,
506 const GenericAssociatedInterfaceFactory& factory) { 481 const GenericAssociatedInterfaceFactory& factory) {
482 base::AutoLock locker(associated_interface_lock_);
507 auto result = associated_interfaces_.insert({ name, factory }); 483 auto result = associated_interfaces_.insert({ name, factory });
508 DCHECK(result.second); 484 DCHECK(result.second);
509 } 485 }
510 486
511 void ChannelMojo::GetGenericRemoteAssociatedInterface( 487 void ChannelMojo::GetGenericRemoteAssociatedInterface(
512 const std::string& name, 488 const std::string& name,
513 mojo::ScopedInterfaceEndpointHandle handle) { 489 mojo::ScopedInterfaceEndpointHandle handle) {
514 DCHECK(message_reader_); 490 base::AutoLock locker(lock_);
515 message_reader_->GetRemoteInterface(name, std::move(handle)); 491 if (message_reader_)
516 } 492 message_reader_->GetRemoteInterface(name, std::move(handle));
517
518 void ChannelMojo::SetProxyTaskRunner(
519 scoped_refptr<base::SingleThreadTaskRunner> task_runner) {
520 DCHECK(bootstrap_);
521 bootstrap_->SetProxyTaskRunner(task_runner);
522 } 493 }
523 494
524 } // namespace IPC 495 } // namespace IPC
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698