Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(892)

Side by Side Diff: ipc/mojo/ipc_channel_mojo.cc

Issue 382333002: Introduce ChannelMojo (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Another build fix Created 6 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "ipc/mojo/ipc_channel_mojo.h"
6
7 #include "base/bind.h"
8 #include "base/bind_helpers.h"
9 #include "base/lazy_instance.h"
10 #include "ipc/ipc_listener.h"
11 #include "mojo/embedder/embedder.h"
12
13 #if defined(OS_POSIX) && !defined(OS_NACL)
14 #include "ipc/file_descriptor_set_posix.h"
15 #endif
16
17 namespace IPC {
18
19 namespace {
20
21 // IPC::Listener for bootstrap channels.
22 // It should never receive any message.
23 class NullListener : public Listener {
24 public:
25 virtual bool OnMessageReceived(const Message&) OVERRIDE {
26 NOTREACHED();
27 return false;
28 }
29
30 virtual void OnChannelConnected(int32 peer_pid) OVERRIDE {
31 NOTREACHED();
32 }
33
34 virtual void OnChannelError() OVERRIDE {
35 NOTREACHED();
36 }
37
38 virtual void OnBadMessageReceived(const Message& message) OVERRIDE {
39 NOTREACHED();
40 }
41 };
42
43 base::LazyInstance<NullListener> g_null_listener = LAZY_INSTANCE_INITIALIZER;
44
45 class MojoChannelBuilder : public ChannelBuilder {
46 public:
47 MojoChannelBuilder(
48 ChannelHandle channel_handle,
49 Channel::Mode mode,
50 scoped_refptr<base::TaskRunner> io_thread_task_runner)
51 : channel_handle_(channel_handle),
52 mode_(mode),
53 io_thread_task_runner_(io_thread_task_runner) {
54 }
55
56 virtual std::string GetName() const OVERRIDE {
57 return channel_handle_.name;
58 }
59
60 virtual scoped_ptr<Channel> BuildChannel(Listener* listener) OVERRIDE {
61 return ChannelMojo::Create(
62 channel_handle_,
63 mode_,
64 listener,
65 io_thread_task_runner_).PassAs<Channel>();
66 }
67
68 private:
69 ChannelHandle channel_handle_;
70 Channel::Mode mode_;
71 scoped_refptr<base::TaskRunner> io_thread_task_runner_;
72 };
73
74 mojo::embedder::PlatformHandle ToPlatformHandle(
75 const ChannelHandle& handle) {
76 #if defined(OS_POSIX) && !defined(OS_NACL)
77 return mojo::embedder::PlatformHandle(handle.socket.fd);
78 #elif defined(OS_WIN)
79 return mojo::embedder::PlatformHandle(handle.pipe.handle);
80 #else
81 #error "Unsupported Platform!"
82 #endif
83 }
84
85 //------------------------------------------------------------------------------
86
87 // TODO(morrita): This should be built using higher-level Mojo construct
88 // for clarify and extensibility.
89 class HelloMessage {
viettrungluu 2014/07/17 15:11:54 Why do you need to send any hello messages, etc.?
Hajime Morrita 2014/07/17 18:32:12 I'm using the magic header so that we can detect a
90 public:
91 static Pickle CreateRequest(int32 pid) {
92 Pickle request;
93 request.WriteString(kHelloRequestMagic);
94 request.WriteInt(pid);
95 return request;
96 }
97
98 static bool ReadRequest(Pickle& pickle, int32* pid) {
99 PickleIterator iter(pickle);
100 std::string hello;
101 if (!iter.ReadString(&hello)) {
102 DLOG(WARNING) << "Failed to Read magic string.";
103 return false;
104 }
105
106 if (hello != kHelloRequestMagic) {
107 DLOG(WARNING) << "Magic mismatch:" << hello;
108 return false;
109 }
110
111 int read_pid;
112 if (!iter.ReadInt(&read_pid)) {
113 DLOG(WARNING) << "Failed to Read PID.";
114 return false;
115 }
116
117 *pid = read_pid;
118 return true;
119 }
120
121 static Pickle CreateResponse(int32 pid) {
122 Pickle request;
123 request.WriteString(kHelloResponseMagic);
124 request.WriteInt(pid);
125 return request;
126 }
127
128 static bool ReadResponse(Pickle& pickle, int32* pid) {
129 PickleIterator iter(pickle);
130 std::string hello;
131 if (!iter.ReadString(&hello)) {
132 DLOG(WARNING) << "Failed to read magic string.";
133 return false;
134 }
135
136 if (hello != kHelloResponseMagic) {
137 DLOG(WARNING) << "Magic mismatch:" << hello;
138 return false;
139 }
140
141 int read_pid;
142 if (!iter.ReadInt(&read_pid)) {
143 DLOG(WARNING) << "Failed to read PID.";
144 return false;
145 }
146
147 *pid = read_pid;
148 return true;
149 }
150
151 private:
152 static const char* kHelloRequestMagic;
153 static const char* kHelloResponseMagic;
154 };
155
156 const char* HelloMessage::kHelloRequestMagic = "MREQ";
157 const char* HelloMessage::kHelloResponseMagic = "MRES";
158
159 } // namespace
160
161 //------------------------------------------------------------------------------
162
163 // A MessagePipeReader implemenation for IPC::Message communication.
164 class ChannelMojo::MessageReader : public internal::MessagePipeReader {
165 public:
166 MessageReader(ChannelMojo* owner)
167 : owner_(owner) { }
168
169 bool Send(scoped_ptr<Message> message);
170 virtual void OnMessageArrived() OVERRIDE;
171 virtual void OnPipeClosed() OVERRIDE;
172 virtual void OnPipeError(MojoResult error) OVERRIDE;
173
174 private:
175 ChannelMojo* owner_;
176 };
177
178 void ChannelMojo::MessageReader::OnMessageArrived() {
179 Message message(data_buffer().empty() ? "" : &data_buffer()[0],
180 data_buffer().size());
181
182 #if defined(OS_POSIX) && !defined(OS_NACL)
183 for (size_t i = 0; i < handle_buffer().size(); ++i) {
184 mojo::embedder::ScopedPlatformHandle platform_handle;
185 MojoResult unwrap_result = mojo::embedder::PassWrappedPlatformHandle(
186 handle_buffer()[i], &platform_handle);
187 if (unwrap_result != MOJO_RESULT_OK) {
188 DLOG(WARNING) << "Pipe failed to covert handles. Closing: "
189 << unwrap_result;
190 CloseWithError(unwrap_result);
191 return;
192 }
193
194 bool ok = message.file_descriptor_set()->Add(platform_handle.release().fd);
195 DCHECK(ok);
196 }
197 #else
198 DCHECK(handle_buffer_.empty());
199 #endif
200
201 message.TraceMessageEnd();
202 owner_->OnMessageReceived(message);
203 }
204
205 void ChannelMojo::MessageReader::OnPipeClosed() {
206 if (!owner_)
207 return;
208 owner_->OnPipeClosed(this);
209 owner_ = NULL;
210 }
211
212 void ChannelMojo::MessageReader::OnPipeError(MojoResult error) {
213 if (!owner_)
214 return;
215 owner_->OnPipeError(this);
216 }
217
218 bool ChannelMojo::MessageReader::Send(scoped_ptr<Message> message) {
219 DCHECK(IsValid());
220
221 message->TraceMessageBegin();
222 std::vector<MojoHandle> handles;
223 #if defined(OS_POSIX) && !defined(OS_NACL)
224 if (message->HasFileDescriptors()) {
225 FileDescriptorSet* fdset = message->file_descriptor_set();
226 for (size_t i = 0; i < fdset->size(); ++i) {
227 MojoHandle wrapped_handle;
228 MojoResult wrap_result = CreatePlatformHandleWrapper(
229 mojo::embedder::ScopedPlatformHandle(
230 mojo::embedder::PlatformHandle(
231 fdset->GetDescriptorAt(i))),
232 &wrapped_handle);
233 if (MOJO_RESULT_OK != wrap_result) {
234 DLOG(WARNING) << "Pipe failed to wrap handles. Closing: "
235 << wrap_result;
236 CloseWithError(wrap_result);
237 return false;
238 }
239
240 handles.push_back(wrapped_handle);
241 }
242 }
243 #endif
244 MojoResult write_result = MojoWriteMessage(
245 handle(),
246 message->data(), message->size(),
247 handles.empty() ? NULL : &handles[0], handles.size(),
248 MOJO_WRITE_MESSAGE_FLAG_NONE);
249 if (MOJO_RESULT_OK != write_result) {
250 CloseWithError(write_result);
251 return false;
252 }
253
254 return true;
255 }
256
257 //------------------------------------------------------------------------------
258
259 // MessagePipeReader implemenation for control messages.
260 // Actual message handling is implemented by sublcasses.
261 class ChannelMojo::ControlReader : public internal::MessagePipeReader {
262 public:
263 ControlReader(ChannelMojo* owner)
264 : owner_(owner) { }
265
266 virtual bool Connect() { return true; }
267 virtual void OnPipeClosed() OVERRIDE;
268 virtual void OnPipeError(MojoResult error) OVERRIDE;
269
270 protected:
271 ChannelMojo* owner_;
272 };
273
274 void ChannelMojo::ControlReader::OnPipeClosed() {
275 if (!owner_)
276 return;
277 owner_->OnPipeClosed(this);
278 owner_ = NULL;
279 }
280
281 void ChannelMojo::ControlReader::OnPipeError(MojoResult error) {
282 if (!owner_)
283 return;
284 owner_->OnPipeError(this);
285 }
286
287 //------------------------------------------------------------------------------
288
289 // ControlReader for server-side ChannelMojo.
290 class ChannelMojo::ServerControlReader : public ChannelMojo::ControlReader {
291 public:
292 ServerControlReader(ChannelMojo* owner)
293 : ControlReader(owner) { }
294
295 virtual bool Connect() OVERRIDE;
296 virtual void OnMessageArrived() OVERRIDE;
297
298 private:
299 MojoResult SendHelloRequest();
300 MojoResult RespondHelloResponse();
301
302 mojo::ScopedMessagePipeHandle message_pipe_;
303 };
304
305 bool ChannelMojo::ServerControlReader::Connect() {
306 MojoResult result = SendHelloRequest();
307 if (result != MOJO_RESULT_OK) {
308 CloseWithError(result);
309 return false;
310 }
311
312 return true;
313 }
314
315 MojoResult ChannelMojo::ServerControlReader::SendHelloRequest() {
316 DCHECK(IsValid());
317 DCHECK(!message_pipe_.is_valid());
318
319 mojo::ScopedMessagePipeHandle self;
320 mojo::ScopedMessagePipeHandle peer;
321 MojoResult create_result = mojo::CreateMessagePipe(
322 NULL, &message_pipe_, &peer);
323 if (MOJO_RESULT_OK != create_result) {
324 DLOG(WARNING) << "mojo::CreateMessagePipe failed: " << create_result;
325 return create_result;
326 }
327
328 MojoHandle peer_to_send = peer.get().value();
329 Pickle request = HelloMessage::CreateRequest(owner_->GetSelfPID());
330 MojoResult write_result = MojoWriteMessage(
331 handle(),
332 request.data(), request.size(),
333 &peer_to_send, 1,
334 MOJO_WRITE_MESSAGE_FLAG_NONE);
335 if (MOJO_RESULT_OK != write_result) {
336 DLOG(WARNING) << "Writing Hello request failed: " << create_result;
337 return write_result;
338 }
339
340 // |peer| is sent and no longer owned by |this|.
341 (void)peer.release();
342 return MOJO_RESULT_OK;
343 }
344
345 MojoResult ChannelMojo::ServerControlReader::RespondHelloResponse() {
346 Pickle request(data_buffer_.empty() ? "" : data_buffer_.data(),
347 data_buffer_.size());
348
349 int32 read_pid = 0;
350 if (!HelloMessage::ReadResponse(request, &read_pid)) {
351 DLOG(ERROR) << "Failed to parse Hello response.";
352 return MOJO_RESULT_UNKNOWN;
353 }
354
355 base::ProcessId pid = static_cast<base::ProcessId>(read_pid);
356 owner_->set_peer_pid(pid);
357 owner_->OnConnected(message_pipe_.Pass());
358 return MOJO_RESULT_OK;
359 }
360
361 void ChannelMojo::ServerControlReader::OnMessageArrived() {
362 MojoResult result = RespondHelloResponse();
363 if (result != MOJO_RESULT_OK)
364 CloseWithError(result);
365 }
366
367 //------------------------------------------------------------------------------
368
369 // ControlReader for client-side ChannelMojo.
370 class ChannelMojo::ClientControlReader : public ChannelMojo::ControlReader {
371 public:
372 ClientControlReader(ChannelMojo* owner)
373 : ControlReader(owner) { }
374
375 virtual void OnMessageArrived() OVERRIDE;
376
377 private:
378 MojoResult RespondHelloRequest();
379 };
380
381 MojoResult ChannelMojo::ClientControlReader::RespondHelloRequest() {
382 DCHECK(IsValid());
383
384 if (handle_buffer_.size() != 1) {
385 DLOG(ERROR) << "Hello request doesn't contains required handle: "
386 << handle_buffer_.size();
387 return MOJO_RESULT_UNKNOWN;
388 }
389
390 mojo::ScopedMessagePipeHandle received_pipe(
391 (mojo::MessagePipeHandle(handle_buffer_[0])));
392
393 int32 read_request = 0;
394 Pickle request(data_buffer_.empty() ? "" : data_buffer_.data(),
395 data_buffer_.size());
396 if (!HelloMessage::ReadRequest(request, &read_request)) {
397 DLOG(ERROR) << "Hello request has wrong magic.";
398 return MOJO_RESULT_UNKNOWN;
399 }
400
401 base::ProcessId pid = read_request;
402 Pickle response = HelloMessage::CreateResponse(owner_->GetSelfPID());
403 MojoResult write_result = MojoWriteMessage(
404 handle(),
405 response.data(), response.size(),
406 NULL, 0,
407 MOJO_WRITE_MESSAGE_FLAG_NONE);
408 if (MOJO_RESULT_OK != write_result) {
409 DLOG(ERROR) << "Writing Hello response failed: " << write_result;
410 return write_result;
411 }
412
413 owner_->set_peer_pid(pid);
414 owner_->OnConnected(received_pipe.Pass());
415 return MOJO_RESULT_OK;
416 }
417
418 void ChannelMojo::ClientControlReader::OnMessageArrived() {
419 MojoResult result = RespondHelloRequest();
420 if (result != MOJO_RESULT_OK) {
421 DLOG(ERROR) << "Failed to respond Hello request. Closing: "
422 << result;
423 CloseWithError(result);
424 }
425 }
426
427 // static
428 scoped_ptr<ChannelMojo> ChannelMojo::Create(
429 scoped_ptr<Channel> bootstrap, Mode mode, Listener* listener,
430 scoped_refptr<base::TaskRunner> io_thread_task_runner) {
431 return make_scoped_ptr(new ChannelMojo(
432 bootstrap.Pass(), mode, listener, io_thread_task_runner));
433 }
434
435 // static
436 scoped_ptr<ChannelMojo> ChannelMojo::Create(
437 const ChannelHandle &channel_handle, Mode mode, Listener* listener,
438 scoped_refptr<base::TaskRunner> io_thread_task_runner) {
439 return Create(
440 Channel::Create(channel_handle, mode, g_null_listener.Pointer()),
441 mode, listener, io_thread_task_runner);
442 }
443
444 // static
445 scoped_ptr<ChannelBuilder> ChannelMojo::CreateBuilder(
446 const ChannelHandle &channel_handle, Mode mode,
447 scoped_refptr<base::TaskRunner> io_thread_task_runner) {
448 return make_scoped_ptr(
449 new MojoChannelBuilder(
450 channel_handle, mode,
451 io_thread_task_runner)).PassAs<ChannelBuilder>();
452 }
453
454 //------------------------------------------------------------------------------
455
456 ChannelMojo::ChannelMojo(
457 scoped_ptr<Channel> bootstrap, Mode mode, Listener* listener,
458 scoped_refptr<base::TaskRunner> io_thread_task_runner)
459 : weak_factory_(this),
460 bootstrap_(bootstrap.Pass()),
461 mode_(mode), listener_(listener),
462 peer_pid_(base::kNullProcessId) {
463 DCHECK(mode_ == MODE_SERVER || mode_ == MODE_CLIENT);
464 mojo::ScopedMessagePipeHandle control_pipe
465 = mojo::embedder::CreateChannel(
466 mojo::embedder::ScopedPlatformHandle(
467 ToPlatformHandle(bootstrap_->TakePipeHandle())),
468 io_thread_task_runner,
469 base::Bind(&ChannelMojo::DidCreateChannel, base::Unretained(this)),
470 io_thread_task_runner);
471
472 // MessagePipeReader, that is crated in InitOnIOThread(), should live only in
473 // IO thread, but IPC::Channel can be instantiated outside of it.
474 // So we move the creation to the appropriate thread.
475 if (base::MessageLoopProxy::current() == io_thread_task_runner) {
476 InitOnIOThread(control_pipe.Pass());
477 } else {
478 io_thread_task_runner->PostTask(
479 FROM_HERE,
480 base::Bind(&ChannelMojo::InitOnIOThread,
481 weak_factory_.GetWeakPtr(),
482 base::Passed(control_pipe.Pass())));
483 }
484 }
485
486 ChannelMojo::~ChannelMojo() {
487 Close();
488 }
489
490 void ChannelMojo::InitOnIOThread(mojo::ScopedMessagePipeHandle control_pipe) {
491 control_reader_ = CreateControlReader();
492 control_reader_->SetPipe(control_pipe.Pass());
493 }
494
495 scoped_ptr<ChannelMojo::ControlReader> ChannelMojo::CreateControlReader() {
496 if (MODE_SERVER == mode_) {
497 return make_scoped_ptr(
498 new ServerControlReader(this)).PassAs<ControlReader>();
499 }
500
501 DCHECK(mode_ == MODE_CLIENT);
502 return make_scoped_ptr(
503 new ClientControlReader(this)).PassAs<ControlReader>();
504 }
505
506 bool ChannelMojo::Connect() {
507 DCHECK(!message_reader_);
508 return control_reader_->Connect();
509 }
510
511 void ChannelMojo::Close() {
512 control_reader_.reset();
513 message_reader_.reset();
514 }
515
516 void ChannelMojo::OnConnected(mojo::ScopedMessagePipeHandle pipe) {
517 message_reader_ = make_scoped_ptr(new MessageReader(this));
518 message_reader_->SetPipe(pipe.Pass());
519
520 for (size_t i = 0; i < pending_messages_.size(); ++i) {
521 message_reader_->Send(make_scoped_ptr(pending_messages_[i]));
522 pending_messages_[i] = NULL;
523 }
524
525 pending_messages_.clear();
526
527 listener_->OnChannelConnected(GetPeerPID());
528 }
529
530 void ChannelMojo::OnPipeClosed(internal::MessagePipeReader* reader) {
531 Close();
532 }
533
534 void ChannelMojo::OnPipeError(internal::MessagePipeReader* reader) {
535 listener_->OnChannelError();
536 }
537
538
539 bool ChannelMojo::Send(Message* message) {
540 if (!message_reader_) {
541 pending_messages_.push_back(message);
542 return true;
543 }
544
545 return message_reader_->Send(make_scoped_ptr(message));
546 }
547
548 base::ProcessId ChannelMojo::GetPeerPID() const {
549 return peer_pid_;
550 }
551
552 base::ProcessId ChannelMojo::GetSelfPID() const {
553 return bootstrap_->GetSelfPID();
554 }
555
556 ChannelHandle ChannelMojo::TakePipeHandle() {
557 return bootstrap_->TakePipeHandle();
558 }
559
560 void ChannelMojo::DidCreateChannel(mojo::embedder::ChannelInfo*) {
561 // TODO(morrita): I'm not sure what should be done here.
562 // Apparently there is no way to delete ChannelInfo and it will leak.
viettrungluu 2014/07/17 15:11:54 See DestroyChannelOnIOThread(), which for some rea
Hajime Morrita 2014/07/17 18:32:12 Oh I see. Problem solved!
563 // Mojo Embedder API needs some way to release it.
564 }
565
566 void ChannelMojo::OnMessageReceived(Message& message) {
567 listener_->OnMessageReceived(message);
568 if (message.dispatch_error())
569 listener_->OnBadMessageReceived(message);
570 }
571
572 #if defined(OS_POSIX) && !defined(OS_NACL)
573 int ChannelMojo::GetClientFileDescriptor() const {
574 return bootstrap_->GetClientFileDescriptor();
575 }
576
577 int ChannelMojo::TakeClientFileDescriptor() {
578 return bootstrap_->TakeClientFileDescriptor();
579 }
580 #endif // defined(OS_POSIX) && !defined(OS_NACL)
581
582 } // namespace IPC
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698