Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(59)

Side by Side Diff: ipc/mojo/ipc_channel_mojo.cc

Issue 1130413002: Mojo IPC threading fixes (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Force shutdown on the correct thread Created 5 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « ipc/mojo/ipc_channel_mojo.h ('k') | ipc/mojo/scoped_ipc_support.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2014 The Chromium Authors. All rights reserved. 1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "ipc/mojo/ipc_channel_mojo.h" 5 #include "ipc/mojo/ipc_channel_mojo.h"
6 6
7 #include "base/bind.h" 7 #include "base/bind.h"
8 #include "base/bind_helpers.h" 8 #include "base/bind_helpers.h"
9 #include "base/lazy_instance.h" 9 #include "base/lazy_instance.h"
10 #include "base/thread_task_runner_handle.h"
10 #include "ipc/ipc_listener.h" 11 #include "ipc/ipc_listener.h"
11 #include "ipc/ipc_logging.h" 12 #include "ipc/ipc_logging.h"
12 #include "ipc/ipc_message_attachment_set.h" 13 #include "ipc/ipc_message_attachment_set.h"
13 #include "ipc/ipc_message_macros.h" 14 #include "ipc/ipc_message_macros.h"
14 #include "ipc/mojo/client_channel.mojom.h" 15 #include "ipc/mojo/client_channel.mojom.h"
15 #include "ipc/mojo/ipc_mojo_bootstrap.h" 16 #include "ipc/mojo/ipc_mojo_bootstrap.h"
16 #include "ipc/mojo/ipc_mojo_handle_attachment.h" 17 #include "ipc/mojo/ipc_mojo_handle_attachment.h"
17 #include "third_party/mojo/src/mojo/edk/embedder/embedder.h" 18 #include "third_party/mojo/src/mojo/edk/embedder/embedder.h"
18 #include "third_party/mojo/src/mojo/public/cpp/bindings/error_handler.h" 19 #include "third_party/mojo/src/mojo/public/cpp/bindings/error_handler.h"
19 20
(...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after
67 void OnPipeAvailable(mojo::embedder::ScopedPlatformHandle handle) override; 68 void OnPipeAvailable(mojo::embedder::ScopedPlatformHandle handle) override;
68 // mojo::ErrorHandler implementation 69 // mojo::ErrorHandler implementation
69 void OnConnectionError() override; 70 void OnConnectionError() override;
70 // ClientChannel implementation 71 // ClientChannel implementation
71 void Init( 72 void Init(
72 mojo::ScopedMessagePipeHandle pipe, 73 mojo::ScopedMessagePipeHandle pipe,
73 int32_t peer_pid, 74 int32_t peer_pid,
74 const mojo::Callback<void(int32_t)>& callback) override; 75 const mojo::Callback<void(int32_t)>& callback) override;
75 76
76 private: 77 private:
78 void BindPipe(mojo::ScopedMessagePipeHandle handle);
79
77 mojo::Binding<ClientChannel> binding_; 80 mojo::Binding<ClientChannel> binding_;
81 base::WeakPtrFactory<ClientChannelMojo> weak_factory_;
78 82
79 DISALLOW_COPY_AND_ASSIGN(ClientChannelMojo); 83 DISALLOW_COPY_AND_ASSIGN(ClientChannelMojo);
80 }; 84 };
81 85
82 ClientChannelMojo::ClientChannelMojo(ChannelMojo::Delegate* delegate, 86 ClientChannelMojo::ClientChannelMojo(ChannelMojo::Delegate* delegate,
83 scoped_refptr<base::TaskRunner> io_runner, 87 scoped_refptr<base::TaskRunner> io_runner,
84 const ChannelHandle& handle, 88 const ChannelHandle& handle,
85 Listener* listener) 89 Listener* listener)
86 : ChannelMojo(delegate, io_runner, handle, Channel::MODE_CLIENT, listener), 90 : ChannelMojo(delegate, io_runner, handle, Channel::MODE_CLIENT, listener),
87 binding_(this) { 91 binding_(this),
92 weak_factory_(this) {
88 } 93 }
89 94
90 ClientChannelMojo::~ClientChannelMojo() { 95 ClientChannelMojo::~ClientChannelMojo() {
91 } 96 }
92 97
93 void ClientChannelMojo::OnPipeAvailable( 98 void ClientChannelMojo::OnPipeAvailable(
94 mojo::embedder::ScopedPlatformHandle handle) { 99 mojo::embedder::ScopedPlatformHandle handle) {
95 binding_.Bind(CreateMessagingPipe(handle.Pass())); 100 CreateMessagingPipe(handle.Pass(), base::Bind(&ClientChannelMojo::BindPipe,
101 weak_factory_.GetWeakPtr()));
96 } 102 }
97 103
98 void ClientChannelMojo::OnConnectionError() { 104 void ClientChannelMojo::OnConnectionError() {
99 listener()->OnChannelError(); 105 listener()->OnChannelError();
100 } 106 }
101 107
102 void ClientChannelMojo::Init( 108 void ClientChannelMojo::Init(
103 mojo::ScopedMessagePipeHandle pipe, 109 mojo::ScopedMessagePipeHandle pipe,
104 int32_t peer_pid, 110 int32_t peer_pid,
105 const mojo::Callback<void(int32_t)>& callback) { 111 const mojo::Callback<void(int32_t)>& callback) {
106 InitMessageReader(pipe.Pass(), static_cast<base::ProcessId>(peer_pid)); 112 InitMessageReader(pipe.Pass(), static_cast<base::ProcessId>(peer_pid));
107 callback.Run(GetSelfPID()); 113 callback.Run(GetSelfPID());
108 } 114 }
109 115
116 void ClientChannelMojo::BindPipe(mojo::ScopedMessagePipeHandle handle) {
117 binding_.Bind(handle.Pass());
118 }
119
110 //------------------------------------------------------------------------------ 120 //------------------------------------------------------------------------------
111 121
112 class ServerChannelMojo : public ChannelMojo, public mojo::ErrorHandler { 122 class ServerChannelMojo : public ChannelMojo, public mojo::ErrorHandler {
113 public: 123 public:
114 ServerChannelMojo(ChannelMojo::Delegate* delegate, 124 ServerChannelMojo(ChannelMojo::Delegate* delegate,
115 scoped_refptr<base::TaskRunner> io_runner, 125 scoped_refptr<base::TaskRunner> io_runner,
116 const ChannelHandle& handle, 126 const ChannelHandle& handle,
117 Listener* listener); 127 Listener* listener);
118 ~ServerChannelMojo() override; 128 ~ServerChannelMojo() override;
119 129
120 // MojoBootstrap::Delegate implementation 130 // MojoBootstrap::Delegate implementation
121 void OnPipeAvailable(mojo::embedder::ScopedPlatformHandle handle) override; 131 void OnPipeAvailable(mojo::embedder::ScopedPlatformHandle handle) override;
122 // mojo::ErrorHandler implementation 132 // mojo::ErrorHandler implementation
123 void OnConnectionError() override; 133 void OnConnectionError() override;
124 // Channel override 134 // Channel override
125 void Close() override; 135 void Close() override;
126 136
127 private: 137 private:
138 void InitClientChannel(mojo::ScopedMessagePipeHandle peer_handle,
139 mojo::ScopedMessagePipeHandle handle);
140
128 // ClientChannelClient implementation 141 // ClientChannelClient implementation
129 void ClientChannelWasInitialized(int32_t peer_pid); 142 void ClientChannelWasInitialized(int32_t peer_pid);
130 143
131 mojo::InterfacePtr<ClientChannel> client_channel_; 144 mojo::InterfacePtr<ClientChannel> client_channel_;
132 mojo::ScopedMessagePipeHandle message_pipe_; 145 mojo::ScopedMessagePipeHandle message_pipe_;
146 base::WeakPtrFactory<ServerChannelMojo> weak_factory_;
Hajime Morrita 2015/05/11 22:19:21 Having weak_factory_ for Channel is tricky but I b
133 147
134 DISALLOW_COPY_AND_ASSIGN(ServerChannelMojo); 148 DISALLOW_COPY_AND_ASSIGN(ServerChannelMojo);
135 }; 149 };
136 150
137 ServerChannelMojo::ServerChannelMojo(ChannelMojo::Delegate* delegate, 151 ServerChannelMojo::ServerChannelMojo(ChannelMojo::Delegate* delegate,
138 scoped_refptr<base::TaskRunner> io_runner, 152 scoped_refptr<base::TaskRunner> io_runner,
139 const ChannelHandle& handle, 153 const ChannelHandle& handle,
140 Listener* listener) 154 Listener* listener)
141 : ChannelMojo(delegate, io_runner, handle, Channel::MODE_SERVER, listener) { 155 : ChannelMojo(delegate, io_runner, handle, Channel::MODE_SERVER, listener),
156 weak_factory_(this) {
142 } 157 }
143 158
144 ServerChannelMojo::~ServerChannelMojo() { 159 ServerChannelMojo::~ServerChannelMojo() {
145 Close(); 160 Close();
146 } 161 }
147 162
148 void ServerChannelMojo::OnPipeAvailable( 163 void ServerChannelMojo::OnPipeAvailable(
149 mojo::embedder::ScopedPlatformHandle handle) { 164 mojo::embedder::ScopedPlatformHandle handle) {
150 mojo::ScopedMessagePipeHandle peer; 165 mojo::ScopedMessagePipeHandle peer;
151 MojoResult create_result = 166 MojoResult create_result =
152 mojo::CreateMessagePipe(nullptr, &message_pipe_, &peer); 167 mojo::CreateMessagePipe(nullptr, &message_pipe_, &peer);
153 if (create_result != MOJO_RESULT_OK) { 168 if (create_result != MOJO_RESULT_OK) {
154 LOG(WARNING) << "mojo::CreateMessagePipe failed: " << create_result; 169 LOG(WARNING) << "mojo::CreateMessagePipe failed: " << create_result;
155 listener()->OnChannelError(); 170 listener()->OnChannelError();
156 return; 171 return;
157 } 172 }
173 CreateMessagingPipe(
174 handle.Pass(),
175 base::Bind(&ServerChannelMojo::InitClientChannel,
176 weak_factory_.GetWeakPtr(), base::Passed(&peer)));
177 }
158 178
179 void ServerChannelMojo::InitClientChannel(
180 mojo::ScopedMessagePipeHandle peer_handle,
181 mojo::ScopedMessagePipeHandle handle) {
159 client_channel_.Bind( 182 client_channel_.Bind(
160 mojo::InterfacePtrInfo<ClientChannel>( 183 mojo::InterfacePtrInfo<ClientChannel>(handle.Pass(), 0u));
161 CreateMessagingPipe(handle.Pass()), 0u));
162 client_channel_.set_error_handler(this); 184 client_channel_.set_error_handler(this);
163 client_channel_->Init( 185 client_channel_->Init(
164 peer.Pass(), 186 peer_handle.Pass(), static_cast<int32_t>(GetSelfPID()),
165 static_cast<int32_t>(GetSelfPID()),
166 base::Bind(&ServerChannelMojo::ClientChannelWasInitialized, 187 base::Bind(&ServerChannelMojo::ClientChannelWasInitialized,
167 base::Unretained(this))); 188 base::Unretained(this)));
168 } 189 }
169 190
170 void ServerChannelMojo::ClientChannelWasInitialized(int32_t peer_pid) { 191 void ServerChannelMojo::ClientChannelWasInitialized(int32_t peer_pid) {
171 InitMessageReader(message_pipe_.Pass(), peer_pid); 192 InitMessageReader(message_pipe_.Pass(), peer_pid);
172 } 193 }
173 194
174 void ServerChannelMojo::OnConnectionError() { 195 void ServerChannelMojo::OnConnectionError() {
175 listener()->OnChannelError(); 196 listener()->OnChannelError();
176 } 197 }
177 198
178 void ServerChannelMojo::Close() { 199 void ServerChannelMojo::Close() {
179 client_channel_.reset(); 200 client_channel_.reset();
180 message_pipe_.reset(); 201 message_pipe_.reset();
181 ChannelMojo::Close(); 202 ChannelMojo::Close();
182 } 203 }
183 204
184 #if defined(OS_POSIX) && !defined(OS_NACL) 205 #if defined(OS_POSIX) && !defined(OS_NACL)
185 206
186 base::ScopedFD TakeOrDupFile(internal::PlatformFileAttachment* attachment) { 207 base::ScopedFD TakeOrDupFile(internal::PlatformFileAttachment* attachment) {
187 return attachment->Owns() ? base::ScopedFD(attachment->TakePlatformFile()) 208 return attachment->Owns() ? base::ScopedFD(attachment->TakePlatformFile())
188 : base::ScopedFD(dup(attachment->file())); 209 : base::ScopedFD(dup(attachment->file()));
189 } 210 }
190 211
191 #endif 212 #endif
192 213
193 } // namespace 214 } // namespace
194 215
195 //------------------------------------------------------------------------------ 216 //------------------------------------------------------------------------------
196 217
218 ChannelMojo::ChannelInfoDeleter::ChannelInfoDeleter(
219 scoped_refptr<base::TaskRunner> io_runner)
220 : io_runner(io_runner) {
221 }
222
223 ChannelMojo::ChannelInfoDeleter::~ChannelInfoDeleter() {
224 }
225
197 void ChannelMojo::ChannelInfoDeleter::operator()( 226 void ChannelMojo::ChannelInfoDeleter::operator()(
198 mojo::embedder::ChannelInfo* ptr) const { 227 mojo::embedder::ChannelInfo* ptr) const {
199 mojo::embedder::DestroyChannelOnIOThread(ptr); 228 if (base::ThreadTaskRunnerHandle::Get() == io_runner) {
229 mojo::embedder::DestroyChannelOnIOThread(ptr);
230 } else {
231 io_runner->PostTask(
232 FROM_HERE, base::Bind(&mojo::embedder::DestroyChannelOnIOThread, ptr));
233 }
200 } 234 }
201 235
202 //------------------------------------------------------------------------------ 236 //------------------------------------------------------------------------------
203 237
204 // static 238 // static
205 bool ChannelMojo::ShouldBeUsed() { 239 bool ChannelMojo::ShouldBeUsed() {
206 // TODO(morrita): Remove this if it sticks. 240 // TODO(morrita): Remove this if it sticks.
207 return true; 241 return true;
208 } 242 }
209 243
(...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after
247 281
248 ChannelMojo::ChannelMojo(ChannelMojo::Delegate* delegate, 282 ChannelMojo::ChannelMojo(ChannelMojo::Delegate* delegate,
249 scoped_refptr<base::TaskRunner> io_runner, 283 scoped_refptr<base::TaskRunner> io_runner,
250 const ChannelHandle& handle, 284 const ChannelHandle& handle,
251 Mode mode, 285 Mode mode,
252 Listener* listener) 286 Listener* listener)
253 : mode_(mode), 287 : mode_(mode),
254 listener_(listener), 288 listener_(listener),
255 peer_pid_(base::kNullProcessId), 289 peer_pid_(base::kNullProcessId),
256 io_runner_(io_runner), 290 io_runner_(io_runner),
291 channel_info_(nullptr, ChannelInfoDeleter(nullptr)),
257 weak_factory_(this) { 292 weak_factory_(this) {
258 // Create MojoBootstrap after all members are set as it touches 293 // Create MojoBootstrap after all members are set as it touches
259 // ChannelMojo from a different thread. 294 // ChannelMojo from a different thread.
260 bootstrap_ = MojoBootstrap::Create(handle, mode, this); 295 bootstrap_ = MojoBootstrap::Create(handle, mode, this);
261 if (io_runner == base::MessageLoop::current()->message_loop_proxy()) { 296 if (io_runner == base::MessageLoop::current()->message_loop_proxy()) {
262 InitOnIOThread(delegate); 297 InitOnIOThread(delegate);
263 } else { 298 } else {
264 io_runner->PostTask(FROM_HERE, 299 io_runner->PostTask(FROM_HERE,
265 base::Bind(&ChannelMojo::InitOnIOThread, 300 base::Bind(&ChannelMojo::InitOnIOThread,
266 base::Unretained(this), delegate)); 301 base::Unretained(this), delegate));
267 } 302 }
268 } 303 }
269 304
270 ChannelMojo::~ChannelMojo() { 305 ChannelMojo::~ChannelMojo() {
271 Close(); 306 Close();
272 } 307 }
273 308
274 void ChannelMojo::InitOnIOThread(ChannelMojo::Delegate* delegate) { 309 void ChannelMojo::InitOnIOThread(ChannelMojo::Delegate* delegate) {
275 ipc_support_.reset( 310 ipc_support_.reset(
276 new ScopedIPCSupport(base::MessageLoop::current()->task_runner())); 311 new ScopedIPCSupport(base::MessageLoop::current()->task_runner()));
277 if (!delegate) 312 if (!delegate)
278 return; 313 return;
279 delegate_ = delegate->ToWeakPtr(); 314 delegate_ = delegate->ToWeakPtr();
280 delegate_->OnChannelCreated(weak_factory_.GetWeakPtr()); 315 delegate_->OnChannelCreated(weak_factory_.GetWeakPtr());
281 } 316 }
282 317
283 mojo::ScopedMessagePipeHandle ChannelMojo::CreateMessagingPipe( 318 void ChannelMojo::CreateMessagingPipe(
284 mojo::embedder::ScopedPlatformHandle handle) { 319 mojo::embedder::ScopedPlatformHandle handle,
285 DCHECK(!channel_info_.get()); 320 const CreateMessagingPipeCallback& callback) {
321 auto return_callback = base::Bind(&ChannelMojo::OnMessagingPipeCreated,
322 weak_factory_.GetWeakPtr(), callback);
323 if (base::ThreadTaskRunnerHandle::Get() == io_runner_) {
324 CreateMessagingPipeOnIOThread(
325 handle.Pass(), base::ThreadTaskRunnerHandle::Get(), return_callback);
326 } else {
327 io_runner_->PostTask(
328 FROM_HERE,
329 base::Bind(&ChannelMojo::CreateMessagingPipeOnIOThread,
330 base::Passed(&handle), base::ThreadTaskRunnerHandle::Get(),
331 return_callback));
332 }
333 }
334
335 // static
336 void ChannelMojo::CreateMessagingPipeOnIOThread(
337 mojo::embedder::ScopedPlatformHandle handle,
338 scoped_refptr<base::TaskRunner> callback_runner,
339 const CreateMessagingPipeOnIOThreadCallback& callback) {
286 mojo::embedder::ChannelInfo* channel_info; 340 mojo::embedder::ChannelInfo* channel_info;
287 mojo::ScopedMessagePipeHandle pipe = 341 mojo::ScopedMessagePipeHandle pipe =
288 mojo::embedder::CreateChannelOnIOThread(handle.Pass(), &channel_info); 342 mojo::embedder::CreateChannelOnIOThread(handle.Pass(), &channel_info);
289 channel_info_.reset(channel_info); 343 if (base::ThreadTaskRunnerHandle::Get() == callback_runner) {
290 return pipe.Pass(); 344 callback.Run(pipe.Pass(), channel_info);
345 } else {
346 callback_runner->PostTask(
347 FROM_HERE, base::Bind(callback, base::Passed(&pipe), channel_info));
348 }
349 }
350
351 void ChannelMojo::OnMessagingPipeCreated(
352 const CreateMessagingPipeCallback& callback,
353 mojo::ScopedMessagePipeHandle handle,
354 mojo::embedder::ChannelInfo* channel_info) {
355 DCHECK(!channel_info_.get());
356 channel_info_ = scoped_ptr<mojo::embedder::ChannelInfo, ChannelInfoDeleter>(
357 channel_info, ChannelInfoDeleter(io_runner_));
358 callback.Run(handle.Pass());
291 } 359 }
292 360
293 bool ChannelMojo::Connect() { 361 bool ChannelMojo::Connect() {
294 DCHECK(!message_reader_); 362 DCHECK(!message_reader_);
295 return bootstrap_->Connect(); 363 return bootstrap_->Connect();
296 } 364 }
297 365
298 void ChannelMojo::Close() { 366 void ChannelMojo::Close() {
299 scoped_ptr<internal::MessagePipeReader, ReaderDeleter> to_be_deleted; 367 scoped_ptr<internal::MessagePipeReader, ReaderDeleter> to_be_deleted;
300 368
(...skipping 183 matching lines...) Expand 10 before | Expand all | Expand 10 after
484 if (!ok) { 552 if (!ok) {
485 LOG(ERROR) << "Failed to add new Mojo handle."; 553 LOG(ERROR) << "Failed to add new Mojo handle.";
486 return MOJO_RESULT_UNKNOWN; 554 return MOJO_RESULT_UNKNOWN;
487 } 555 }
488 } 556 }
489 557
490 return MOJO_RESULT_OK; 558 return MOJO_RESULT_OK;
491 } 559 }
492 560
493 } // namespace IPC 561 } // namespace IPC
OLDNEW
« no previous file with comments | « ipc/mojo/ipc_channel_mojo.h ('k') | ipc/mojo/scoped_ipc_support.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698