Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1355)

Unified Diff: ipc/mojo/ipc_channel_mojo.cc

Issue 1130413002: Mojo IPC threading fixes (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Correct some outdated expectations during shutdown Created 5 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: ipc/mojo/ipc_channel_mojo.cc
diff --git a/ipc/mojo/ipc_channel_mojo.cc b/ipc/mojo/ipc_channel_mojo.cc
index cff2789c710301fbb5a251a0ab4f2288a06e2777..9badfdf5d4b4461b0bbcdb4fe571bc3566bd6c88 100644
--- a/ipc/mojo/ipc_channel_mojo.cc
+++ b/ipc/mojo/ipc_channel_mojo.cc
@@ -7,6 +7,7 @@
#include "base/bind.h"
#include "base/bind_helpers.h"
#include "base/lazy_instance.h"
+#include "base/thread_task_runner_handle.h"
#include "ipc/ipc_listener.h"
#include "ipc/ipc_logging.h"
#include "ipc/ipc_message_attachment_set.h"
@@ -74,7 +75,10 @@ class ClientChannelMojo : public ChannelMojo,
const mojo::Callback<void(int32_t)>& callback) override;
private:
+ void BindPipe(mojo::ScopedMessagePipeHandle handle);
+
mojo::Binding<ClientChannel> binding_;
+ base::WeakPtrFactory<ClientChannelMojo> weak_factory_;
DISALLOW_COPY_AND_ASSIGN(ClientChannelMojo);
};
@@ -84,7 +88,8 @@ ClientChannelMojo::ClientChannelMojo(ChannelMojo::Delegate* delegate,
const ChannelHandle& handle,
Listener* listener)
: ChannelMojo(delegate, io_runner, handle, Channel::MODE_CLIENT, listener),
- binding_(this) {
+ binding_(this),
+ weak_factory_(this) {
}
ClientChannelMojo::~ClientChannelMojo() {
@@ -92,7 +97,8 @@ ClientChannelMojo::~ClientChannelMojo() {
void ClientChannelMojo::OnPipeAvailable(
mojo::embedder::ScopedPlatformHandle handle) {
- binding_.Bind(CreateMessagingPipe(handle.Pass()));
+ CreateMessagingPipe(handle.Pass(), base::Bind(&ClientChannelMojo::BindPipe,
+ weak_factory_.GetWeakPtr()));
}
void ClientChannelMojo::OnConnectionError() {
@@ -107,6 +113,10 @@ void ClientChannelMojo::Init(
callback.Run(GetSelfPID());
}
+void ClientChannelMojo::BindPipe(mojo::ScopedMessagePipeHandle handle) {
+ binding_.Bind(handle.Pass());
+}
+
//------------------------------------------------------------------------------
class ServerChannelMojo : public ChannelMojo, public mojo::ErrorHandler {
@@ -125,11 +135,15 @@ class ServerChannelMojo : public ChannelMojo, public mojo::ErrorHandler {
void Close() override;
private:
+ void InitClientChannel(mojo::ScopedMessagePipeHandle peer_handle,
+ mojo::ScopedMessagePipeHandle handle);
+
// ClientChannelClient implementation
void ClientChannelWasInitialized(int32_t peer_pid);
mojo::InterfacePtr<ClientChannel> client_channel_;
mojo::ScopedMessagePipeHandle message_pipe_;
+ base::WeakPtrFactory<ServerChannelMojo> weak_factory_;
DISALLOW_COPY_AND_ASSIGN(ServerChannelMojo);
};
@@ -138,7 +152,8 @@ ServerChannelMojo::ServerChannelMojo(ChannelMojo::Delegate* delegate,
scoped_refptr<base::TaskRunner> io_runner,
const ChannelHandle& handle,
Listener* listener)
- : ChannelMojo(delegate, io_runner, handle, Channel::MODE_SERVER, listener) {
+ : ChannelMojo(delegate, io_runner, handle, Channel::MODE_SERVER, listener),
+ weak_factory_(this) {
}
ServerChannelMojo::~ServerChannelMojo() {
@@ -155,14 +170,20 @@ void ServerChannelMojo::OnPipeAvailable(
listener()->OnChannelError();
return;
}
+ CreateMessagingPipe(
+ handle.Pass(),
+ base::Bind(&ServerChannelMojo::InitClientChannel,
+ weak_factory_.GetWeakPtr(), base::Passed(&peer)));
+}
+void ServerChannelMojo::InitClientChannel(
+ mojo::ScopedMessagePipeHandle peer_handle,
+ mojo::ScopedMessagePipeHandle handle) {
client_channel_.Bind(
- mojo::InterfacePtrInfo<ClientChannel>(
- CreateMessagingPipe(handle.Pass()), 0u));
+ mojo::InterfacePtrInfo<ClientChannel>(handle.Pass(), 0u));
client_channel_.set_error_handler(this);
client_channel_->Init(
- peer.Pass(),
- static_cast<int32_t>(GetSelfPID()),
+ peer_handle.Pass(), static_cast<int32_t>(GetSelfPID()),
base::Bind(&ServerChannelMojo::ClientChannelWasInitialized,
base::Unretained(this)));
}
@@ -190,13 +211,26 @@ base::ScopedFD TakeOrDupFile(internal::PlatformFileAttachment* attachment) {
#endif
-} // namespace
+} // namespace
//------------------------------------------------------------------------------
+ChannelMojo::ChannelInfoDeleter::ChannelInfoDeleter(
+ scoped_refptr<base::TaskRunner> io_runner)
+ : io_runner(io_runner) {
+}
+
+ChannelMojo::ChannelInfoDeleter::~ChannelInfoDeleter() {
+}
+
void ChannelMojo::ChannelInfoDeleter::operator()(
mojo::embedder::ChannelInfo* ptr) const {
- mojo::embedder::DestroyChannelOnIOThread(ptr);
+ if (base::ThreadTaskRunnerHandle::Get() == io_runner) {
+ mojo::embedder::DestroyChannelOnIOThread(ptr);
+ } else {
+ io_runner->PostTask(
+ FROM_HERE, base::Bind(&mojo::embedder::DestroyChannelOnIOThread, ptr));
+ }
}
//------------------------------------------------------------------------------
@@ -254,6 +288,7 @@ ChannelMojo::ChannelMojo(ChannelMojo::Delegate* delegate,
listener_(listener),
peer_pid_(base::kNullProcessId),
io_runner_(io_runner),
+ channel_info_(nullptr, ChannelInfoDeleter(nullptr)),
weak_factory_(this) {
// Create MojoBootstrap after all members are set as it touches
// ChannelMojo from a different thread.
@@ -280,14 +315,47 @@ void ChannelMojo::InitOnIOThread(ChannelMojo::Delegate* delegate) {
delegate_->OnChannelCreated(weak_factory_.GetWeakPtr());
}
-mojo::ScopedMessagePipeHandle ChannelMojo::CreateMessagingPipe(
- mojo::embedder::ScopedPlatformHandle handle) {
- DCHECK(!channel_info_.get());
+void ChannelMojo::CreateMessagingPipe(
+ mojo::embedder::ScopedPlatformHandle handle,
+ const CreateMessagingPipeCallback& callback) {
+ auto return_callback = base::Bind(&ChannelMojo::OnMessagingPipeCreated,
+ weak_factory_.GetWeakPtr(), callback);
+ if (base::ThreadTaskRunnerHandle::Get() == io_runner_) {
+ CreateMessagingPipeOnIOThread(
+ handle.Pass(), base::ThreadTaskRunnerHandle::Get(), return_callback);
+ } else {
+ io_runner_->PostTask(
+ FROM_HERE,
+ base::Bind(&ChannelMojo::CreateMessagingPipeOnIOThread,
+ base::Passed(&handle), base::ThreadTaskRunnerHandle::Get(),
+ return_callback));
+ }
+}
+
+// static
+void ChannelMojo::CreateMessagingPipeOnIOThread(
+ mojo::embedder::ScopedPlatformHandle handle,
+ scoped_refptr<base::TaskRunner> callback_runner,
+ const CreateMessagingPipeOnIOThreadCallback& callback) {
mojo::embedder::ChannelInfo* channel_info;
mojo::ScopedMessagePipeHandle pipe =
mojo::embedder::CreateChannelOnIOThread(handle.Pass(), &channel_info);
- channel_info_.reset(channel_info);
- return pipe.Pass();
+ if (base::ThreadTaskRunnerHandle::Get() == callback_runner) {
+ callback.Run(pipe.Pass(), channel_info);
+ } else {
+ callback_runner->PostTask(
+ FROM_HERE, base::Bind(callback, base::Passed(&pipe), channel_info));
+ }
+}
+
+void ChannelMojo::OnMessagingPipeCreated(
+ const CreateMessagingPipeCallback& callback,
+ mojo::ScopedMessagePipeHandle handle,
+ mojo::embedder::ChannelInfo* channel_info) {
+ DCHECK(!channel_info_.get());
+ channel_info_ = scoped_ptr<mojo::embedder::ChannelInfo, ChannelInfoDeleter>(
+ channel_info, ChannelInfoDeleter(io_runner_));
+ callback.Run(handle.Pass());
}
bool ChannelMojo::Connect() {

Powered by Google App Engine
This is Rietveld 408576698