Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(157)

Side by Side Diff: mojo/edk/system/raw_channel_win.cc

Issue 1350023003: Add a Mojo EDK for Chrome that uses one OS pipe per message pipe. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 5 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/edk/system/raw_channel.h"
6
7 #include <windows.h>
8
9 #include "base/bind.h"
10 #include "base/lazy_instance.h"
11 #include "base/location.h"
12 #include "base/logging.h"
13 #include "base/memory/scoped_ptr.h"
14 #include "base/message_loop/message_loop.h"
15 #include "base/process/process.h"
16 #include "base/synchronization/lock.h"
17 #include "base/win/object_watcher.h"
18 #include "base/win/windows_version.h"
19 #include "mojo/edk/embedder/platform_handle.h"
20 #include "mojo/public/cpp/system/macros.h"
21
22 #define STATUS_CANCELLED 0xC0000120
23 #define STATUS_PIPE_BROKEN 0xC000014B
24
25 // We can't use IO completion ports if we send a message pipe. The reason is
26 // that the only way to stop an existing IOCP is by closing the pipe handle.
27 // See https://msdn.microsoft.com/en-us/library/windows/hardware/ff545834(v=vs.8 5).aspx
28 bool g_use_iocp = false;
29
30 // Manual reset per
31 // Doc for overlapped I/O says use manual per
32 // https://msdn.microsoft.com/en-us/library/windows/desktop/ms684342(v=vs.85). aspx
33 // However using an auto-reset event makes the perf test 5x faster and also
34 // works since we don't wait on the event elsewhere or call GetOverlappedResult
35 // before it fires.
36 bool g_use_autoreset_event = true;
37
38 namespace mojo {
39 namespace system {
40
41 namespace {
42
43 class VistaOrHigherFunctions {
44 public:
45 VistaOrHigherFunctions()
46 : is_vista_or_higher_(
47 base::win::GetVersion() >= base::win::VERSION_VISTA),
48 set_file_completion_notification_modes_(nullptr),
49 cancel_io_ex_(nullptr) {
50 if (!is_vista_or_higher_)
51 return;
52
53 HMODULE module = GetModuleHandleW(L"kernel32.dll");
54 set_file_completion_notification_modes_ =
55 reinterpret_cast<SetFileCompletionNotificationModesFunc>(
56 GetProcAddress(module, "SetFileCompletionNotificationModes"));
57 DCHECK(set_file_completion_notification_modes_);
58
59 cancel_io_ex_ =
60 reinterpret_cast<CancelIoExFunc>(GetProcAddress(module, "CancelIoEx"));
61 DCHECK(cancel_io_ex_);
62 }
63
64 bool is_vista_or_higher() const { return is_vista_or_higher_; }
65
66 BOOL SetFileCompletionNotificationModes(HANDLE handle, UCHAR flags) {
67 return set_file_completion_notification_modes_(handle, flags);
68 }
69
70 BOOL CancelIoEx(HANDLE handle, LPOVERLAPPED overlapped) {
71 return cancel_io_ex_(handle, overlapped);
72 }
73
74 private:
75 using SetFileCompletionNotificationModesFunc = BOOL(WINAPI*)(HANDLE, UCHAR);
76 using CancelIoExFunc = BOOL(WINAPI*)(HANDLE, LPOVERLAPPED);
77
78 bool is_vista_or_higher_;
79 SetFileCompletionNotificationModesFunc
80 set_file_completion_notification_modes_;
81 CancelIoExFunc cancel_io_ex_;
82 };
83
84 base::LazyInstance<VistaOrHigherFunctions> g_vista_or_higher_functions =
85 LAZY_INSTANCE_INITIALIZER;
86
87 class RawChannelWin final : public RawChannel {
88 public:
89 RawChannelWin(embedder::ScopedPlatformHandle handle)
90 : handle_(handle.Pass()),
91 io_handler_(nullptr),
92 skip_completion_port_on_success_(
93 g_use_iocp &&
94 g_vista_or_higher_functions.Get().is_vista_or_higher()) {
95 DCHECK(handle_.is_valid());
96 }
97 ~RawChannelWin() override {
98 DCHECK(!io_handler_);
99 }
100
101 private:
102 // RawChannelIOHandler receives OS notifications for I/O completion. It must
103 // be created on the I/O thread.
104 //
105 // It manages its own destruction. Destruction happens on the I/O thread when
106 // all the following conditions are satisfied:
107 // - |DetachFromOwnerNoLock()| has been called;
108 // - there is no pending read;
109 // - there is no pending write.
110 class RawChannelIOHandler : public base::MessageLoopForIO::IOHandler,
111 public base::win::ObjectWatcher::Delegate {
112 public:
113 RawChannelIOHandler(RawChannelWin* owner,
114 embedder::ScopedPlatformHandle handle)
115 : handle_(handle.Pass()),
116 owner_(owner),
117 suppress_self_destruct_(false),
118 pending_read_(false),
119 pending_write_(false),
120 platform_handles_written_(0),
121 pipe_disconnected_(false) {
122 memset(&read_context_.overlapped, 0, sizeof(read_context_.overlapped));
123 memset(&write_context_.overlapped, 0, sizeof(write_context_.overlapped));
124 if (g_use_iocp) {
125 owner_->message_loop_for_io()->RegisterIOHandler(
126 handle_.get().handle, this);
127 read_context_.handler = this;
128 write_context_.handler = this;
129 } else {
130 read_event = CreateEvent(
131 NULL, g_use_autoreset_event ? FALSE : TRUE, FALSE, NULL);
132 write_event = CreateEvent(
133 NULL, g_use_autoreset_event ? FALSE : TRUE, FALSE, NULL);
134 read_context_.overlapped.hEvent = read_event;
135 write_context_.overlapped.hEvent = write_event;
136
137
138 if (g_use_autoreset_event) {
139 read_watcher_.StartWatching(read_event, this, true);
140 write_watcher_.StartWatching(write_event, this, true);
141 }
142 }
143 }
144
145 ~RawChannelIOHandler() override {
146 DCHECK(ShouldSelfDestruct());
147 }
148
149 HANDLE handle() const { return handle_.get().handle; }
150
151 // The following methods are only called by the owner on the I/O thread.
152 bool pending_read() const {
153 DCHECK(owner_);
154 DCHECK_EQ(base::MessageLoop::current(), owner_->message_loop_for_io());
155 return pending_read_;
156 }
157
158 base::MessageLoopForIO::IOContext* read_context() {
159 DCHECK(owner_);
160 DCHECK_EQ(base::MessageLoop::current(), owner_->message_loop_for_io());
161 return &read_context_;
162 }
163
164 // Instructs the object to wait for an |OnIOCompleted()| notification.
165 void OnPendingReadStarted() {
166 DCHECK(owner_);
167 DCHECK_EQ(base::MessageLoop::current(), owner_->message_loop_for_io());
168 DCHECK(!pending_read_);
169 pending_read_ = true;
170 }
171
172 // The following methods are only called by the owner under
173 // |owner_->write_lock()|.
174 bool pending_write_no_lock() const {
175 DCHECK(owner_);
176 owner_->write_lock().AssertAcquired();
177 return pending_write_;
178 }
179
180 base::MessageLoopForIO::IOContext* write_context_no_lock() {
181 DCHECK(owner_);
182 owner_->write_lock().AssertAcquired();
183 return &write_context_;
184 }
185 // Instructs the object to wait for an |OnIOCompleted()| notification.
186 void OnPendingWriteStartedNoLock(size_t platform_handles_written) {
187 DCHECK(owner_);
188 owner_->write_lock().AssertAcquired();
189 DCHECK(!pending_write_);
190 pending_write_ = true;
191 platform_handles_written_ = platform_handles_written;
192 }
193
194 // |base::MessageLoopForIO::IOHandler| implementation:
195 // Must be called on the I/O thread. It could be called before or after
196 // detached from the owner.
197 void OnIOCompleted(base::MessageLoopForIO::IOContext* context,
198 DWORD bytes_transferred,
199 DWORD error) override {
200 DCHECK(owner_);
201 DCHECK(base::MessageLoop::current() == owner_->message_loop_for_io());
202
203 // Suppress self-destruction inside |OnReadCompleted()|, etc. (in case
204 // they result in a call to |Shutdown()|).
205 bool old_suppress_self_destruct = suppress_self_destruct_;
206 suppress_self_destruct_ = true;
207
208 if (context == &read_context_)
209 OnReadCompleted(bytes_transferred, error);
210 else if (context == &write_context_)
211 OnWriteCompleted(bytes_transferred, error);
212 else
213 NOTREACHED();
214
215 // Maybe allow self-destruction again.
216 suppress_self_destruct_ = old_suppress_self_destruct;
217
218 if (ShouldSelfDestruct())
219 delete this;
220 }
221
222 // Must be called on the I/O thread under |owner_->write_lock()|.
223 // After this call, the owner must not make any further calls on this
224 // object, and therefore the object is used on the I/O thread exclusively
225 // (if it stays alive).
226 void DetachFromOwnerNoLock(scoped_ptr<ReadBuffer> read_buffer,
227 scoped_ptr<WriteBuffer> write_buffer) {
228 DCHECK(owner_);
229 DCHECK_EQ(base::MessageLoop::current(), owner_->message_loop_for_io());
230 //owner_->write_lock().AssertAcquired();
231
232 // If read/write is pending, we have to retain the corresponding buffer.
233 if (pending_read_)
234 preserved_read_buffer_after_detach_ = read_buffer.Pass();
235 if (pending_write_)
236 preserved_write_buffer_after_detach_ = write_buffer.Pass();
237
238 owner_ = nullptr;
239 read_watcher_.StopWatching();
240 write_watcher_.StopWatching();
241 bool read_signalled =
242 WaitForSingleObjectEx(read_event, 0, true) != WAIT_TIMEOUT;
243 bool write_signalled =
244 WaitForSingleObjectEx(write_event, 0, true) != WAIT_TIMEOUT;
245
246 if (read_signalled) {
247 if (read_context_.overlapped.Internal == STATUS_PIPE_BROKEN) {
248 pipe_disconnected_ = true;
249 } else if (read_context_.overlapped.Internal == STATUS_CANCELLED) {
250 // fine condition
251 } else {
252 NOTREACHED() << "TODO(jam)";
253 }
254 }
255
256 // Otherwise we will never quit since both endpoints of the channel could
257 // be waiting for their Read calls.
258 pending_read_ = false;
259
260 if (write_signalled) {
261 NOTREACHED() << "TODO(jam)";
262 }
263
264 if (ShouldSelfDestruct())
265 delete this;
266 }
267
268 embedder::ScopedPlatformHandle ReleaseHandle(
269 std::vector<char>* read_buffer) {
270 // TODO(jam): handle XP
271 CancelIoEx(handle(), NULL);
272 // NOTE!!!!
273 // The above call will cancel pending IO calls.
274 // HOWEVER, some could have already finished and posted task to IO thread
275 // that will execute
276
277
278 size_t read_buffer_byte_size = owner_->read_buffer()->num_valid_bytes();
279
280 if (pending_read_) {
281 DWORD bytes_read_dword = 0;
282
283 DWORD old_bytes = read_context_.overlapped.InternalHigh;
284
285 //TODO(jam): for XP, can return TRUE here to wait. also below.
286 BOOL rv = GetOverlappedResult(
287 handle(), &read_context_.overlapped, &bytes_read_dword, FALSE);
288 DCHECK_EQ(old_bytes, bytes_read_dword);
289 if (rv) {
290 if (read_context_.overlapped.Internal != STATUS_CANCELLED) {
291 read_buffer_byte_size += read_context_.overlapped.InternalHigh;
292 }
293 }
294 pending_read_ = false;
295 }
296
297 RawChannel::WriteBuffer* write_buffer = owner_->write_buffer_no_lock();
298
299 if (pending_write_) {
300 DWORD bytes_written_dword = 0;
301 DWORD old_bytes = write_context_.overlapped.InternalHigh;
302
303
304 BOOL rv = GetOverlappedResult(
305 handle(), &write_context_.overlapped, &bytes_written_dword, FALSE);
306
307 if (old_bytes != bytes_written_dword) {
308 NOTREACHED();
309 }
310
311 if (rv) {
312 if (write_context_.overlapped.Internal != STATUS_CANCELLED) {
313 CHECK(write_buffer->queue_size() != 0);
314
315 // TODO(jam)
316 DCHECK(!write_buffer->HavePlatformHandlesToSend());
317
318 write_buffer->data_offset_ += bytes_written_dword;
319
320 // TODO(jam): copied from OnWriteCompletedNoLock
321 MessageInTransit* message =
322 write_buffer->message_queue()->PeekMessage();
323 if (write_buffer->data_offset_ >= message->total_size()) {
324 // Complete write.
325 CHECK_EQ(write_buffer->data_offset_, message->total_size());
326 write_buffer->message_queue_.DiscardMessage();
327 write_buffer->platform_handles_offset_ = 0;
328 write_buffer->data_offset_ = 0;
329 }
330
331
332 //TODO(jam): handle more write msgs
333 DCHECK(write_buffer->message_queue_.IsEmpty());
334 }
335 }
336 pending_write_ = false;
337 }
338
339 if (read_buffer_byte_size) {
340 read_buffer->resize(read_buffer_byte_size);
341 memcpy(&(*read_buffer)[0], owner_->read_buffer()->buffer(),
342 read_buffer_byte_size);
343 owner_->read_buffer()->Reset();
344 }
345
346 return embedder::ScopedPlatformHandle(handle_.release());
347 }
348
349 void OnObjectSignaled(HANDLE object) override {
350 // TODO(jam): need to call out to owner?
351
352
353 //not needed per doc, since reset by readfile & writefile
354 //ResetEvent(object);
355 if (object == read_event) {
356 if (read_context_.overlapped.Internal == STATUS_CANCELLED)
357 return;
358
359 {
360 // TODO(jam): improve this locking
361 base::AutoLock locker(owner_->read_lock());
362 if (!handle_.is_valid()) {
363
364
365
366
367 // TODO(jam): have way to cancel objectwatcher posttask so that we
368 // can have a dcheck here and catch any erronous conditions. right
369 // now this fires for non-problems.
370 return;
371
372
373
374 }
375 // this isn't covered with above lock
376 // DCHECK(!owner_->debug_started_sending());
377 DCHECK(handle_.is_valid());
378 }
379 OnIOCompleted(&read_context_, read_context_.overlapped.InternalHigh,
380 read_context_.overlapped.Internal);
381
382 } else {
383 if (write_context_.overlapped.Internal == STATUS_CANCELLED)
384 return;
385 DCHECK(!owner_->debug_started_sending());
386 DCHECK(handle_.is_valid());
387 CHECK(object == write_event);
388 OnIOCompleted(&write_context_, write_context_.overlapped.InternalHigh,
yzshen1 2015/09/23 22:47:09 Since write could happen on any thread, and we sta
389 write_context_.overlapped.Internal);
390 }
391 }
392 HANDLE read_event, write_event;
393 base::win::ObjectWatcher read_watcher_, write_watcher_;
394
395 bool pipe_disconnected() { return pipe_disconnected_; }
396
397 void QuitNow() {
398 pipe_disconnected_ = true;
399 owner_ = nullptr;
400 base::MessageLoop::current()->DeleteSoon(FROM_HERE, this);
401 }
402
403 private:
404 // Returns true if |owner_| has been reset and there is not pending read or
405 // write.
406 // Must be called on the I/O thread.
407 bool ShouldSelfDestruct() const {
408 if (owner_ || suppress_self_destruct_)
409 return false;
410
411 if (pipe_disconnected_)
412 return true;
413
414 // Note: Detached, hence no lock needed for |pending_write_|.
415 return !pending_read_ && !pending_write_;
416 }
417
418 // Must be called on the I/O thread. It may be called before or after
419 // detaching from the owner.
420 void OnReadCompleted(DWORD bytes_read, DWORD error) {
421 DCHECK(owner_);
422 DCHECK(base::MessageLoop::current() == owner_->message_loop_for_io());
423 DCHECK(suppress_self_destruct_);
424
425 if (g_use_autoreset_event && !pending_read_)
426 return;
427
428 CHECK(pending_read_);
429 pending_read_ = false;
430 if (!owner_)
431 return;
432
433 // Note: |OnReadCompleted()| may detach us from |owner_|.
434 if (error == ERROR_SUCCESS) {
435 DCHECK_GT(bytes_read, 0u);
436 owner_->OnReadCompleted(IO_SUCCEEDED, bytes_read);
437 } else if (error == ERROR_BROKEN_PIPE ||
438 (g_use_autoreset_event && error == STATUS_PIPE_BROKEN)) {
439 DCHECK_EQ(bytes_read, 0u);
440 pipe_disconnected_ = true;
441 owner_->OnReadCompleted(IO_FAILED_SHUTDOWN, 0);
442 } else if (error == ERROR_NO_MORE_ITEMS && g_use_autoreset_event) {
443 return owner_->OnReadCompleted(IO_SUCCEEDED, bytes_read);
444 } else {
445 DCHECK_EQ(bytes_read, 0u);
446 LOG(WARNING) << "ReadFile: " << logging::SystemErrorCodeToString(error);
447 owner_->OnReadCompleted(IO_FAILED_UNKNOWN, 0);
448 }
449 }
450
451 // Must be called on the I/O thread. It may be called before or after
452 // detaching from the owner.
453 void OnWriteCompleted(DWORD bytes_written, DWORD error) {
454 DCHECK(base::MessageLoop::current() == owner_->message_loop_for_io());
455 DCHECK(suppress_self_destruct_);
456
457 if (!owner_) {
458 // No lock needed.
459 CHECK(pending_write_);
460 pending_write_ = false;
461 return;
462 }
463
464 {
465 base::AutoLock locker(owner_->write_lock());
466 if (g_use_autoreset_event && !pending_write_)
467 return;
468
469 CHECK(pending_write_);
470 pending_write_ = false;
471 }
472
473 // Note: |OnWriteCompleted()| may detach us from |owner_|.
474 if (error == ERROR_SUCCESS) {
475 // Reset |platform_handles_written_| before calling |OnWriteCompleted()|
476 // since that function may call back to this class and set it again.
477 size_t local_platform_handles_written_ = platform_handles_written_;
478 platform_handles_written_ = 0;
479 owner_->OnWriteCompleted(IO_SUCCEEDED, local_platform_handles_written_,
480 bytes_written);
481 } else if (error == ERROR_BROKEN_PIPE ||
482 (g_use_autoreset_event && error ==STATUS_PIPE_BROKEN)) {
yzshen1 2015/09/23 22:47:09 It seems nice to unify the error code used.
483 pipe_disconnected_ = true;
484 owner_->OnWriteCompleted(IO_FAILED_SHUTDOWN, 0, 0);
485 } else if (error == ERROR_NO_MORE_ITEMS && g_use_autoreset_event) {
486 size_t local_platform_handles_written_ = platform_handles_written_;
487 platform_handles_written_ = 0;
488 owner_->OnWriteCompleted(IO_SUCCEEDED, local_platform_handles_written_,
489 bytes_written);
490 } else {
491 LOG(WARNING) << "WriteFile: "
492 << logging::SystemErrorCodeToString(error);
493 owner_->OnWriteCompleted(IO_FAILED_UNKNOWN, 0, 0);
494 }
495 }
496
497 embedder::ScopedPlatformHandle handle_;
498
499 // |owner_| is reset on the I/O thread under |owner_->write_lock()|.
500 // Therefore, it may be used on any thread under lock; or on the I/O thread
501 // without locking.
502 RawChannelWin* owner_;
503
504 // The following members must be used on the I/O thread.
505 scoped_ptr<ReadBuffer> preserved_read_buffer_after_detach_;
506 scoped_ptr<WriteBuffer> preserved_write_buffer_after_detach_;
507 bool suppress_self_destruct_;
508
509 bool pending_read_;
510 base::MessageLoopForIO::IOContext read_context_;
511
512 // The following members must be used under |owner_->write_lock()| while the
513 // object is still attached to the owner, and only on the I/O thread
514 // afterwards.
515 bool pending_write_;
516 size_t platform_handles_written_;
517 base::MessageLoopForIO::IOContext write_context_;
518
519 bool pipe_disconnected_;
520
521 MOJO_DISALLOW_COPY_AND_ASSIGN(RawChannelIOHandler);
522 };
523
524 embedder::ScopedPlatformHandle ReleaseHandleNoLock(
525 std::vector<char>* read_buffer_out) override {
526 std::vector<WriteBuffer::Buffer> buffers;
527 write_buffer_no_lock()->GetBuffers(&buffers);
528 if (!buffers.empty()) {
529 // TODO(jam): copy code in OnShutdownNoLock
530 NOTREACHED() << "releasing handle with pending write buffer";
531 }
532
533
534 if( handle_.is_valid()) {
535 // SetInitialBuffer could have been called on main thread before OnInit
536 // is called on Io thread. and in meantime releasehandle called.
537 //DCHECK(read_buffer()->num_valid_bytes() == 0);
538 if (read_buffer()->num_valid_bytes()) {
539 read_buffer_out->resize(read_buffer()->num_valid_bytes());
540 memcpy(&(*read_buffer_out)[0], read_buffer()->buffer(),
541 read_buffer()->num_valid_bytes());
542 read_buffer()->Reset();
543 }
544 DCHECK(write_buffer_no_lock()->queue_size() == 0);
545 return embedder::ScopedPlatformHandle(
546 embedder::PlatformHandle(handle_.release().handle));
547 }
548
549 return io_handler_->ReleaseHandle(read_buffer_out);
550 }
551 embedder::PlatformHandle HandleForDebuggingNoLock() override {
552 if (handle_.is_valid())
553 return handle_.get();
554
555 if (!io_handler_)
556 return embedder::PlatformHandle();
557
558 return embedder::PlatformHandle(io_handler_->handle());
559 }
560
561 IOResult Read(size_t* bytes_read) override {
562 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
563
564 char* buffer = nullptr;
565 size_t bytes_to_read = 0;
566 read_buffer()->GetBuffer(&buffer, &bytes_to_read);
567
568 DCHECK(io_handler_);
569 DCHECK(!io_handler_->pending_read());
570 BOOL result = ReadFile(
571 io_handler_->handle(), buffer, static_cast<DWORD>(bytes_to_read),
572 nullptr, &io_handler_->read_context()->overlapped);
573 if (!result) {
574 DWORD error = GetLastError();
575 if (error == ERROR_BROKEN_PIPE)
576 return IO_FAILED_SHUTDOWN;
577 if (error != ERROR_IO_PENDING) {
578 LOG(WARNING) << "ReadFile: " << logging::SystemErrorCodeToString(error);
579 return IO_FAILED_UNKNOWN;
580 }
581 }
582
583 if (result && skip_completion_port_on_success_) {
584 DWORD bytes_read_dword = 0;
585 BOOL get_size_result = GetOverlappedResult(
586 io_handler_->handle(), &io_handler_->read_context()->overlapped,
587 &bytes_read_dword, FALSE);
588 DPCHECK(get_size_result);
589 *bytes_read = bytes_read_dword;
590 return IO_SUCCEEDED;
591 }
592
593 if (!g_use_autoreset_event) {
594 if (!g_use_iocp) {
595 io_handler_->read_watcher_.StartWatching(
596 io_handler_->read_event, io_handler_, false);
597 }
598 }
599 // If the read is pending or the read has succeeded but we don't skip
600 // completion port on success, instruct |io_handler_| to wait for the
601 // completion packet.
602 //
603 // TODO(yzshen): It seems there isn't document saying that all error cases
604 // (other than ERROR_IO_PENDING) are guaranteed to *not* queue a completion
605 // packet. If we do get one for errors,
606 // |RawChannelIOHandler::OnIOCompleted()| will crash so we will learn about
607 // it.
608
609 io_handler_->OnPendingReadStarted();
610 return IO_PENDING;
611 }
612
613 IOResult ScheduleRead() override {
614 if (!io_handler_)
615 return IO_PENDING; // OnInit could have earlied out.
616
617 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
618 DCHECK(io_handler_);
619 DCHECK(!io_handler_->pending_read());
620
621 size_t bytes_read = 0;
622 IOResult io_result = Read(&bytes_read);
623 if (io_result == IO_SUCCEEDED) {
624 DCHECK(skip_completion_port_on_success_);
625
626 // We have finished reading successfully. Queue a notification manually.
627 io_handler_->OnPendingReadStarted();
628 // |io_handler_| won't go away before the task is run, so it is safe to
629 // use |base::Unretained()|.
630 message_loop_for_io()->PostTask(
631 FROM_HERE, base::Bind(&RawChannelIOHandler::OnIOCompleted,
632 base::Unretained(io_handler_),
633 base::Unretained(io_handler_->read_context()),
634 static_cast<DWORD>(bytes_read), ERROR_SUCCESS));
635 return IO_PENDING;
636 }
637
638 return io_result;
639 }
640 embedder::ScopedPlatformHandleVectorPtr GetReadPlatformHandles(
641 size_t num_platform_handles,
642 const void* platform_handle_table) override {
643 // TODO(jam): this code will have to be updated once it's used in a sandbox
644 // and the receiving process doesn't have duplicate permission for the
645 // receiver. Once there's a broker and we have a connection to it (possibly
646 // through ConnectionManager), then we can make a sync IPC to it here to get
647 // a token for this handle, and it will duplicate the handle to is process.
648 // Then we pass the token to the receiver, which will then make a sync call
649 // to the broker to get a duplicated handle. This will also allow us to
650 // avoid leaks of the handle if the receiver dies, since the broker can
651 // notice that.
652 DCHECK_GT(num_platform_handles, 0u);
653 embedder::ScopedPlatformHandleVectorPtr rv(
654 new embedder::PlatformHandleVector());
655
656 const char* serialization_data =
657 static_cast<const char*>(platform_handle_table);
658 for (size_t i = 0; i < num_platform_handles; i++) {
659 DWORD pid = *reinterpret_cast<const DWORD*>(serialization_data);
660 serialization_data += sizeof(DWORD);
661 HANDLE source_handle =
662 *reinterpret_cast<const HANDLE*>(serialization_data);
663 serialization_data += sizeof(HANDLE);
664 base::Process sender =
665 base::Process::OpenWithAccess(pid, PROCESS_DUP_HANDLE);
666 DCHECK(sender.IsValid());
667 HANDLE target_handle = NULL;
668 BOOL dup_result = DuplicateHandle(
669 sender.Handle(), source_handle,
670 base::GetCurrentProcessHandle(), &target_handle, 0,
671 FALSE, DUPLICATE_SAME_ACCESS | DUPLICATE_CLOSE_SOURCE);
672 DCHECK(dup_result);
673 rv->push_back(embedder::PlatformHandle(target_handle));
674 }
675 return rv.Pass();
676 }
677
678 IOResult WriteNoLock(size_t* platform_handles_written,
679 size_t* bytes_written) override {
680 write_lock().AssertAcquired();
681
682 DCHECK(io_handler_);
683 DCHECK(!io_handler_->pending_write_no_lock());
684
685 size_t num_platform_handles = 0;
686 if (write_buffer_no_lock()->HavePlatformHandlesToSend()) {
687 // Since we're not sure which process might ultimately deserialize this
688 // message, we can't duplicate the handle now. Instead, write the process
689 // ID and handle now and let the receiver duplicate it.
690 embedder::PlatformHandle* platform_handles;
691 void* serialization_data_temp;
692 write_buffer_no_lock()->GetPlatformHandlesToSend(
693 &num_platform_handles, &platform_handles, &serialization_data_temp);
694 char* serialization_data = static_cast<char*>(serialization_data_temp);
695 DCHECK_GT(num_platform_handles, 0u);
696 DCHECK(platform_handles);
697
698 DWORD current_process_id = base::GetCurrentProcId();
699 for (size_t i = 0; i < num_platform_handles; i++) {
700 *reinterpret_cast<DWORD*>(serialization_data) = current_process_id;
701 serialization_data += sizeof(DWORD);
702 *reinterpret_cast<HANDLE*>(serialization_data) =
703 platform_handles[i].handle;
704 serialization_data += sizeof(HANDLE);
705 platform_handles[i] = embedder::PlatformHandle();
706 }
707 }
708
709 std::vector<WriteBuffer::Buffer> buffers;
710 write_buffer_no_lock()->GetBuffers(&buffers);
711 DCHECK(!buffers.empty());
712
713 // TODO(yzshen): Handle multi-segment writes more efficiently.
714 DWORD bytes_written_dword = 0;
715
716
717
718
719 // TODO(jam): right now we get in bad situation where we might first write
yzshen1 2015/09/23 22:47:09 I haven't understood: I thought we synchronously f
720 // the main buffer and then the MP gets sent before we write the transport
721 // buffer. We can fix this by sending information about partially written
722 // messages, or by teaching transport buffer how to grow the main buffer and
723 // write its data there.
724 // Until that's done, for now make another copy.
725
726 size_t total_size = buffers[0].size;
727 if (buffers.size() > 1)
728 total_size+=buffers[1].size;
729 char* buf = new char[total_size];
730 memcpy(buf, buffers[0].addr, buffers[0].size);
731 if (buffers.size() > 1)
732 memcpy(buf + buffers[0].size, buffers[1].addr, buffers[1].size);
733
734 BOOL result = WriteFile(
735 io_handler_->handle(), buf,
736 static_cast<DWORD>(total_size),
737 &bytes_written_dword,
738 &io_handler_->write_context_no_lock()->overlapped);
739 delete [] buf;
yzshen1 2015/09/23 22:47:09 I think we need to keep the buffer valid until the
740
741 if (!result) {
742 DWORD error = GetLastError();
743 if (error == ERROR_BROKEN_PIPE)
744 return IO_FAILED_SHUTDOWN;
745 if (error != ERROR_IO_PENDING) {
746 LOG(WARNING) << "WriteFile: "
747 << logging::SystemErrorCodeToString(error);
748 return IO_FAILED_UNKNOWN;
749 }
750 }
751
752 if (result && skip_completion_port_on_success_) {
753 *platform_handles_written = num_platform_handles;
754 *bytes_written = bytes_written_dword;
755 return IO_SUCCEEDED;
756 }
757
758 if (!g_use_autoreset_event) {
759 if (!g_use_iocp) {
760 io_handler_->write_watcher_.StartWatching(
761 io_handler_->write_event, io_handler_, false);
762 }
763 }
764 // If the write is pending or the write has succeeded but we don't skip
765 // completion port on success, instruct |io_handler_| to wait for the
766 // completion packet.
767 //
768 // TODO(yzshen): it seems there isn't document saying that all error cases
769 // (other than ERROR_IO_PENDING) are guaranteed to *not* queue a completion
770 // packet. If we do get one for errors,
771 // |RawChannelIOHandler::OnIOCompleted()| will crash so we will learn about
772 // it.
773
774 io_handler_->OnPendingWriteStartedNoLock(num_platform_handles);
775 return IO_PENDING;
776 }
777
778 IOResult ScheduleWriteNoLock() override {
779 write_lock().AssertAcquired();
780
781 DCHECK(io_handler_);
782 DCHECK(!io_handler_->pending_write_no_lock());
783
784 size_t platform_handles_written = 0;
785 size_t bytes_written = 0;
786 IOResult io_result = WriteNoLock(&platform_handles_written, &bytes_written);
787 if (io_result == IO_SUCCEEDED) {
788 DCHECK(skip_completion_port_on_success_);
789
790 // We have finished writing successfully. Queue a notification manually.
791 io_handler_->OnPendingWriteStartedNoLock(platform_handles_written);
792 // |io_handler_| won't go away before that task is run, so it is safe to
793 // use |base::Unretained()|.
794 message_loop_for_io()->PostTask(
795 FROM_HERE,
796 base::Bind(&RawChannelIOHandler::OnIOCompleted,
797 base::Unretained(io_handler_),
798 base::Unretained(io_handler_->write_context_no_lock()),
799 static_cast<DWORD>(bytes_written), ERROR_SUCCESS));
800 return IO_PENDING;
801 }
802
803 return io_result;
804 }
805
806 void OnInit() override {
807 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
808
809 if (!handle_.is_valid()) {
810 LOG(ERROR) << "Note: RawChannelWin " << this
811 << " early exiting in OnInit because no handle";
812 return;
813 }
814
815 DCHECK(handle_.is_valid());
816 if (skip_completion_port_on_success_) {
817 // I don't know how this can fail (unless |handle_| is bad, in which case
818 // it's a bug in our code).
819 CHECK(g_vista_or_higher_functions.Get().
820 SetFileCompletionNotificationModes(
821 handle_.get().handle, FILE_SKIP_COMPLETION_PORT_ON_SUCCESS));
822 }
823
824 DCHECK(!io_handler_);
825 io_handler_ = new RawChannelIOHandler(this, handle_.Pass());
826 }
827
828 void OnShutdownNoLock(scoped_ptr<ReadBuffer> read_buffer,
829 scoped_ptr<WriteBuffer> write_buffer) override {
830 // happens on shutdown if didn't call init when doing createduplicate
831 if (message_loop_for_io()) {
832 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
833 }
834
835 if (!io_handler_) {
836 // This is hit when creating a duplicate dispatcher since we don't call
yzshen1 2015/09/23 22:47:09 Before this CL, Init() is restricted to be called
837 // Init on it.
838 DCHECK_EQ(read_buffer->num_valid_bytes(), 0U);
839 DCHECK_EQ(write_buffer->queue_size(), 0U);
840 return;
841 }
842
843 if (io_handler_->pending_read() || io_handler_->pending_write_no_lock()) {
844 // |io_handler_| will be alive until pending read/write (if any)
845 // completes. Call |CancelIoEx()| or |CancelIo()| so that resources can be
846 // freed up as soon as possible.
847 // Note: |CancelIo()| only cancels read/write requests started from this
848 // thread.
849 if (g_vista_or_higher_functions.Get().is_vista_or_higher()) {
850 g_vista_or_higher_functions.Get().CancelIoEx(io_handler_->handle(),
851 nullptr);
852 } else {
853 CancelIo(io_handler_->handle());
854 }
855 }
856
857 io_handler_->DetachFromOwnerNoLock(read_buffer.Pass(), write_buffer.Pass());
858 io_handler_ = nullptr;
859 }
860
861 // Passed to |io_handler_| during initialization.
862 embedder::ScopedPlatformHandle handle_;
863
864 RawChannelIOHandler* io_handler_;
865
866 const bool skip_completion_port_on_success_;
867
868 MOJO_DISALLOW_COPY_AND_ASSIGN(RawChannelWin);
869 };
870
871
872 } // namespace
873
874 // -----------------------------------------------------------------------------
875
876 RawChannel* RawChannel::Create(embedder::ScopedPlatformHandle handle) {
877 return new RawChannelWin(handle.Pass());
878 }
879
880 size_t RawChannel::GetSerializedPlatformHandleSize() {
881 return sizeof(DWORD) + sizeof(HANDLE);
882 }
883
884 } // namespace system
885 } // namespace mojo
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698