Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(354)

Side by Side Diff: mojo/edk/system/raw_channel_win.cc

Issue 1350023003: Add a Mojo EDK for Chrome that uses one OS pipe per message pipe. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: convert remaining MP tests and simplify RawChannel destruction Created 5 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/edk/system/raw_channel.h"
6
7 #include <windows.h>
8
9 #include "base/bind.h"
10 #include "base/lazy_instance.h"
11 #include "base/location.h"
12 #include "base/logging.h"
13 #include "base/memory/scoped_ptr.h"
14 #include "base/message_loop/message_loop.h"
15 #include "base/process/process.h"
16 #include "base/synchronization/lock.h"
17 #include "base/win/object_watcher.h"
18 #include "base/win/windows_version.h"
19 #include "mojo/edk/embedder/platform_handle.h"
20 #include "mojo/public/cpp/system/macros.h"
21
22 #define STATUS_CANCELLED 0xC0000120
23 #define STATUS_PIPE_BROKEN 0xC000014B
24
25 // We can't use IO completion ports if we send a message pipe. The reason is
26 // that the only way to stop an existing IOCP is by closing the pipe handle.
27 // See https://msdn.microsoft.com/en-us/library/windows/hardware/ff545834(v=vs.8 5).aspx
28 bool g_use_iocp = false;
29
30 // Manual reset per
31 // Doc for overlapped I/O says use manual per
32 // https://msdn.microsoft.com/en-us/library/windows/desktop/ms684342(v=vs.85). aspx
33 // However using an auto-reset event makes the perf test 5x faster and also
34 // works since we don't wait on the event elsewhere or call GetOverlappedResult
35 // before it fires.
36 bool g_use_autoreset_event = true;
37
38 namespace mojo {
39 namespace system {
40
41 namespace {
42
43 class VistaOrHigherFunctions {
44 public:
45 VistaOrHigherFunctions()
46 : is_vista_or_higher_(
47 base::win::GetVersion() >= base::win::VERSION_VISTA),
48 set_file_completion_notification_modes_(nullptr),
49 cancel_io_ex_(nullptr) {
50 if (!is_vista_or_higher_)
51 return;
52
53 HMODULE module = GetModuleHandleW(L"kernel32.dll");
54 set_file_completion_notification_modes_ =
55 reinterpret_cast<SetFileCompletionNotificationModesFunc>(
56 GetProcAddress(module, "SetFileCompletionNotificationModes"));
57 DCHECK(set_file_completion_notification_modes_);
58
59 cancel_io_ex_ =
60 reinterpret_cast<CancelIoExFunc>(GetProcAddress(module, "CancelIoEx"));
61 DCHECK(cancel_io_ex_);
62 }
63
64 bool is_vista_or_higher() const { return is_vista_or_higher_; }
65
66 BOOL SetFileCompletionNotificationModes(HANDLE handle, UCHAR flags) {
67 return set_file_completion_notification_modes_(handle, flags);
68 }
69
70 BOOL CancelIoEx(HANDLE handle, LPOVERLAPPED overlapped) {
71 return cancel_io_ex_(handle, overlapped);
72 }
73
74 private:
75 using SetFileCompletionNotificationModesFunc = BOOL(WINAPI*)(HANDLE, UCHAR);
76 using CancelIoExFunc = BOOL(WINAPI*)(HANDLE, LPOVERLAPPED);
77
78 bool is_vista_or_higher_;
79 SetFileCompletionNotificationModesFunc
80 set_file_completion_notification_modes_;
81 CancelIoExFunc cancel_io_ex_;
82 };
83
84 base::LazyInstance<VistaOrHigherFunctions> g_vista_or_higher_functions =
85 LAZY_INSTANCE_INITIALIZER;
86
87 class RawChannelWin final : public RawChannel {
88 public:
89 RawChannelWin(embedder::ScopedPlatformHandle handle)
90 : handle_(handle.Pass()),
91 io_handler_(nullptr),
92 skip_completion_port_on_success_(
93 g_use_iocp &&
94 g_vista_or_higher_functions.Get().is_vista_or_higher()) {
95 DCHECK(handle_.is_valid());
96 }
97 ~RawChannelWin() override {
98 DCHECK(!io_handler_);
99 }
100
101 private:
102 // RawChannelIOHandler receives OS notifications for I/O completion. It must
103 // be created on the I/O thread.
104 //
105 // It manages its own destruction. Destruction happens on the I/O thread when
106 // all the following conditions are satisfied:
107 // - |DetachFromOwnerNoLock()| has been called;
108 // - there is no pending read;
109 // - there is no pending write.
110 class RawChannelIOHandler : public base::MessageLoopForIO::IOHandler,
111 public base::win::ObjectWatcher::Delegate {
112 public:
113 RawChannelIOHandler(RawChannelWin* owner,
114 embedder::ScopedPlatformHandle handle)
115 : handle_(handle.Pass()),
116 owner_(owner),
117 suppress_self_destruct_(false),
118 pending_read_(false),
119 pending_write_(false),
120 platform_handles_written_(0) {
121 memset(&read_context_.overlapped, 0, sizeof(read_context_.overlapped));
122 memset(&write_context_.overlapped, 0, sizeof(write_context_.overlapped));
123 if (g_use_iocp) {
124 owner_->message_loop_for_io()->RegisterIOHandler(
125 handle_.get().handle, this);
126 read_context_.handler = this;
127 write_context_.handler = this;
128 } else {
129 read_event = CreateEvent(
130 NULL, g_use_autoreset_event ? FALSE : TRUE, FALSE, NULL);
131 write_event = CreateEvent(
132 NULL, g_use_autoreset_event ? FALSE : TRUE, FALSE, NULL);
133 read_context_.overlapped.hEvent = read_event;
134 write_context_.overlapped.hEvent = write_event;
135
136
137 if (g_use_autoreset_event) {
138 read_watcher_.StartWatching(read_event, this, true);
139 write_watcher_.StartWatching(write_event, this, true);
140 }
141 }
142 }
143
144 ~RawChannelIOHandler() override {
145 DCHECK(ShouldSelfDestruct());
146 }
147
148 HANDLE handle() const { return handle_.get().handle; }
149
150 // The following methods are only called by the owner on the I/O thread.
151 bool pending_read() const {
152 DCHECK(owner_);
153 DCHECK_EQ(base::MessageLoop::current(), owner_->message_loop_for_io());
154 return pending_read_;
155 }
156
157 base::MessageLoopForIO::IOContext* read_context() {
158 DCHECK(owner_);
159 DCHECK_EQ(base::MessageLoop::current(), owner_->message_loop_for_io());
160 return &read_context_;
161 }
162
163 // Instructs the object to wait for an |OnIOCompleted()| notification.
164 void OnPendingReadStarted() {
165 DCHECK(owner_);
166 DCHECK_EQ(base::MessageLoop::current(), owner_->message_loop_for_io());
167 DCHECK(!pending_read_);
168 pending_read_ = true;
169 }
170
171 // The following methods are only called by the owner under
172 // |owner_->write_lock()|.
173 bool pending_write_no_lock() const {
174 DCHECK(owner_);
175 owner_->write_lock().AssertAcquired();
176 return pending_write_;
177 }
178
179 base::MessageLoopForIO::IOContext* write_context_no_lock() {
180 DCHECK(owner_);
181 owner_->write_lock().AssertAcquired();
182 return &write_context_;
183 }
184 // Instructs the object to wait for an |OnIOCompleted()| notification.
185 void OnPendingWriteStartedNoLock(size_t platform_handles_written) {
186 DCHECK(owner_);
187 owner_->write_lock().AssertAcquired();
188 DCHECK(!pending_write_);
189 pending_write_ = true;
190 platform_handles_written_ = platform_handles_written;
191 }
192
193 // |base::MessageLoopForIO::IOHandler| implementation:
194 // Must be called on the I/O thread. It could be called before or after
195 // detached from the owner.
196 void OnIOCompleted(base::MessageLoopForIO::IOContext* context,
197 DWORD bytes_transferred,
198 DWORD error) override {
199 DCHECK(!owner_ ||
200 base::MessageLoop::current() == owner_->message_loop_for_io());
201
202 // Suppress self-destruction inside |OnReadCompleted()|, etc. (in case
203 // they result in a call to |Shutdown()|).
204 bool old_suppress_self_destruct = suppress_self_destruct_;
205 suppress_self_destruct_ = true;
206
207 if (context == &read_context_)
208 OnReadCompleted(bytes_transferred, error);
209 else if (context == &write_context_)
210 OnWriteCompleted(bytes_transferred, error);
211 else
212 NOTREACHED();
213
214 // Maybe allow self-destruction again.
215 suppress_self_destruct_ = old_suppress_self_destruct;
216
217 if (ShouldSelfDestruct())
218 delete this;
219 }
220
221 // Must be called on the I/O thread under |owner_->write_lock()|.
222 // After this call, the owner must not make any further calls on this
223 // object, and therefore the object is used on the I/O thread exclusively
224 // (if it stays alive).
225 void DetachFromOwnerNoLock(scoped_ptr<ReadBuffer> read_buffer,
226 scoped_ptr<WriteBuffer> write_buffer) {
227 DCHECK(owner_);
228 DCHECK_EQ(base::MessageLoop::current(), owner_->message_loop_for_io());
229 //owner_->write_lock().AssertAcquired();
230
231 // If read/write is pending, we have to retain the corresponding buffer.
232 if (pending_read_)
233 preserved_read_buffer_after_detach_ = read_buffer.Pass();
234 if (pending_write_)
235 preserved_write_buffer_after_detach_ = write_buffer.Pass();
236
237 owner_ = nullptr;
238 if (ShouldSelfDestruct())
239 delete this;
240 }
241
242 embedder::ScopedPlatformHandle ReleaseHandle(
243 std::vector<char>* read_buffer) {
244 // TODO(jam): handle XP
245 CancelIoEx(handle(), NULL);
246 // NOTE!!!!
247 // The above call will cancel pending IO calls.
248 // HOWEVER, some could have already finished and posted task to IO thread
249 // that will execute
250
251
252 size_t read_buffer_byte_size = owner_->read_buffer()->num_valid_bytes();
253
254 if (pending_read_) {
255 DWORD bytes_read_dword = 0;
256
257 DWORD old_bytes = read_context_.overlapped.InternalHigh;
258
259 //TODO(jam): for XP, can return TRUE here to wait. also below.
260 BOOL rv = GetOverlappedResult(
yzshen1 2015/09/23 22:47:09 Please add a comment about why the if (pending_rea
261 handle(), &read_context_.overlapped, &bytes_read_dword, FALSE);
262 DCHECK_EQ(old_bytes, bytes_read_dword);
263 if (rv) {
264 if (read_context_.overlapped.Internal != STATUS_CANCELLED) {
265 read_buffer_byte_size += read_context_.overlapped.InternalHigh;
266 }
267 }
268 pending_read_ = false;
269 }
270
271 RawChannel::WriteBuffer* write_buffer = owner_->write_buffer_no_lock();
272
273 if (pending_write_) {
274 DWORD bytes_written_dword = 0;
275 DWORD old_bytes = write_context_.overlapped.InternalHigh;
276
277
278 BOOL rv = GetOverlappedResult(
279 handle(), &write_context_.overlapped, &bytes_written_dword, FALSE);
280
281 if (old_bytes != bytes_written_dword) {
282 NOTREACHED();
283 }
284
285 if (rv) {
286 if (write_context_.overlapped.Internal != STATUS_CANCELLED) {
287 CHECK(write_buffer->queue_size() != 0);
288
289 // TODO(jam)
290 DCHECK(!write_buffer->HavePlatformHandlesToSend());
291
292 write_buffer->data_offset_ += bytes_written_dword;
293
294 // TODO(jam): copied from OnWriteCompletedNoLock
295 MessageInTransit* message =
296 write_buffer->message_queue()->PeekMessage();
297 if (write_buffer->data_offset_ >= message->total_size()) {
298 // Complete write.
299 CHECK_EQ(write_buffer->data_offset_, message->total_size());
300 write_buffer->message_queue_.DiscardMessage();
301 write_buffer->platform_handles_offset_ = 0;
302 write_buffer->data_offset_ = 0;
303 }
304
305
306 //TODO(jam): handle more write msgs
307 DCHECK(write_buffer->message_queue_.IsEmpty());
308 }
309 }
310 pending_write_ = false;
311 }
312
313 if (read_buffer_byte_size) {
314 read_buffer->resize(read_buffer_byte_size);
315 memcpy(&(*read_buffer)[0], owner_->read_buffer()->buffer(),
316 read_buffer_byte_size);
317 owner_->read_buffer()->Reset();
318 }
319
320 return embedder::ScopedPlatformHandle(handle_.release());
321 }
322
323 void OnObjectSignaled(HANDLE object) override {
324
325 // Since this is called on the IO thread, no locks needed for owner_.
326 bool handle_is_valid = false;
327 if (owner_)
328 owner_->read_lock().Acquire();
329 handle_is_valid = handle_.is_valid();
330 if (owner_)
331 owner_->read_lock().Release();
332 if (!handle_is_valid) {
333 if (object == read_event)
334 pending_read_ = false;
335 else
336 pending_write_ = false;
337 if (ShouldSelfDestruct())
338 delete this;
339 return;
340 }
341
342 if (object == read_event) {
343 OnIOCompleted(&read_context_, read_context_.overlapped.InternalHigh,
344 read_context_.overlapped.Internal);
345
346 } else {
347 CHECK(object == write_event);
348 OnIOCompleted(&write_context_, write_context_.overlapped.InternalHigh,
349 write_context_.overlapped.Internal);
350 }
351 }
352 HANDLE read_event, write_event;
353 base::win::ObjectWatcher read_watcher_, write_watcher_;
354
355 private:
356 // Returns true if |owner_| has been reset and there is not pending read or
357 // write.
358 // Must be called on the I/O thread.
359 bool ShouldSelfDestruct() const {
360 if (owner_ || suppress_self_destruct_)
361 return false;
362
363 // Note: Detached, hence no lock needed for |pending_write_|.
364 return !pending_read_ && !pending_write_;
365 }
366
367 // Must be called on the I/O thread. It may be called before or after
368 // detaching from the owner.
369 void OnReadCompleted(DWORD bytes_read, DWORD error) {
370 DCHECK(!owner_ ||
371 base::MessageLoop::current() == owner_->message_loop_for_io());
372 DCHECK(suppress_self_destruct_);
373
374 if (g_use_autoreset_event && !pending_read_)
375 return;
376
377 CHECK(pending_read_);
378 pending_read_ = false;
379 if (!owner_)
380 return;
381
382 // Note: |OnReadCompleted()| may detach us from |owner_|.
383 if (error == ERROR_SUCCESS) {
384 DCHECK_GT(bytes_read, 0u);
385 owner_->OnReadCompleted(IO_SUCCEEDED, bytes_read);
386 } else if (error == ERROR_BROKEN_PIPE ||
387 (g_use_autoreset_event && error == STATUS_PIPE_BROKEN)) {
388 DCHECK_EQ(bytes_read, 0u);
389 owner_->OnReadCompleted(IO_FAILED_SHUTDOWN, 0);
390 } else if (error == ERROR_NO_MORE_ITEMS && g_use_autoreset_event) {
391 return owner_->OnReadCompleted(IO_SUCCEEDED, bytes_read);
392 } else {
393 DCHECK_EQ(bytes_read, 0u);
394 LOG(WARNING) << "ReadFile: " << logging::SystemErrorCodeToString(error);
395 owner_->OnReadCompleted(IO_FAILED_UNKNOWN, 0);
396 }
397 }
398
399 // Must be called on the I/O thread. It may be called before or after
400 // detaching from the owner.
401 void OnWriteCompleted(DWORD bytes_written, DWORD error) {
402 DCHECK(!owner_ ||
403 base::MessageLoop::current() == owner_->message_loop_for_io());
404 DCHECK(suppress_self_destruct_);
405
406 if (!owner_) {
407 // No lock needed.
408 CHECK(pending_write_);
409 pending_write_ = false;
410 return;
411 }
412
413 {
414 base::AutoLock locker(owner_->write_lock());
415 if (g_use_autoreset_event && !pending_write_)
416 return;
417
418 CHECK(pending_write_);
419 pending_write_ = false;
420 }
421
422 // Note: |OnWriteCompleted()| may detach us from |owner_|.
423 if (error == ERROR_SUCCESS) {
424 // Reset |platform_handles_written_| before calling |OnWriteCompleted()|
425 // since that function may call back to this class and set it again.
426 size_t local_platform_handles_written_ = platform_handles_written_;
427 platform_handles_written_ = 0;
428 owner_->OnWriteCompleted(IO_SUCCEEDED, local_platform_handles_written_,
429 bytes_written);
430 } else if (error == ERROR_BROKEN_PIPE ||
431 (g_use_autoreset_event && error == STATUS_PIPE_BROKEN)) {
432 owner_->OnWriteCompleted(IO_FAILED_SHUTDOWN, 0, 0);
433 } else if (error == ERROR_NO_MORE_ITEMS && g_use_autoreset_event) {
434 size_t local_platform_handles_written_ = platform_handles_written_;
435 platform_handles_written_ = 0;
436 owner_->OnWriteCompleted(IO_SUCCEEDED, local_platform_handles_written_,
437 bytes_written);
438 } else {
439 LOG(WARNING) << "WriteFile: "
440 << logging::SystemErrorCodeToString(error);
441 owner_->OnWriteCompleted(IO_FAILED_UNKNOWN, 0, 0);
442 }
443 }
444
445 embedder::ScopedPlatformHandle handle_;
446
447 // |owner_| is reset on the I/O thread under |owner_->write_lock()|.
448 // Therefore, it may be used on any thread under lock; or on the I/O thread
449 // without locking.
450 RawChannelWin* owner_;
451
452 // The following members must be used on the I/O thread.
453 scoped_ptr<ReadBuffer> preserved_read_buffer_after_detach_;
454 scoped_ptr<WriteBuffer> preserved_write_buffer_after_detach_;
455 bool suppress_self_destruct_;
456
457 bool pending_read_;
458 base::MessageLoopForIO::IOContext read_context_;
459
460 // The following members must be used under |owner_->write_lock()| while the
461 // object is still attached to the owner, and only on the I/O thread
462 // afterwards.
463 bool pending_write_;
464 size_t platform_handles_written_;
465 base::MessageLoopForIO::IOContext write_context_;
466
467 MOJO_DISALLOW_COPY_AND_ASSIGN(RawChannelIOHandler);
468 };
469
470 embedder::ScopedPlatformHandle ReleaseHandleNoLock(
471 std::vector<char>* read_buffer_out) override {
472 std::vector<WriteBuffer::Buffer> buffers;
473 write_buffer_no_lock()->GetBuffers(&buffers);
474 if (!buffers.empty()) {
475 // TODO(jam): copy code in OnShutdownNoLock
476 NOTREACHED() << "releasing handle with pending write buffer";
477 }
478
479
480 if( handle_.is_valid()) {
481 // SetInitialBuffer could have been called on main thread before OnInit
482 // is called on Io thread. and in meantime releasehandle called.
483 //DCHECK(read_buffer()->num_valid_bytes() == 0);
484 if (read_buffer()->num_valid_bytes()) {
485 read_buffer_out->resize(read_buffer()->num_valid_bytes());
486 memcpy(&(*read_buffer_out)[0], read_buffer()->buffer(),
487 read_buffer()->num_valid_bytes());
488 read_buffer()->Reset();
yzshen1 2015/09/23 22:47:09 Before we cancel IO (which happens in io_handler_-
489 }
490 DCHECK(write_buffer_no_lock()->queue_size() == 0);
491 return embedder::ScopedPlatformHandle(
492 embedder::PlatformHandle(handle_.release().handle));
493 }
494
495 return io_handler_->ReleaseHandle(read_buffer_out);
496 }
497 embedder::PlatformHandle HandleForDebuggingNoLock() override {
498 if (handle_.is_valid())
499 return handle_.get();
500
501 if (!io_handler_)
502 return embedder::PlatformHandle();
503
504 return embedder::PlatformHandle(io_handler_->handle());
505 }
506
507 IOResult Read(size_t* bytes_read) override {
508 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
509
510 char* buffer = nullptr;
511 size_t bytes_to_read = 0;
512 read_buffer()->GetBuffer(&buffer, &bytes_to_read);
513
514 DCHECK(io_handler_);
515 DCHECK(!io_handler_->pending_read());
516 BOOL result = ReadFile(
517 io_handler_->handle(), buffer, static_cast<DWORD>(bytes_to_read),
518 nullptr, &io_handler_->read_context()->overlapped);
519 if (!result) {
520 DWORD error = GetLastError();
521 if (error == ERROR_BROKEN_PIPE)
522 return IO_FAILED_SHUTDOWN;
523 if (error != ERROR_IO_PENDING) {
524 LOG(WARNING) << "ReadFile: " << logging::SystemErrorCodeToString(error);
525 return IO_FAILED_UNKNOWN;
526 }
527 }
528
529 if (result && skip_completion_port_on_success_) {
530 DWORD bytes_read_dword = 0;
531 BOOL get_size_result = GetOverlappedResult(
532 io_handler_->handle(), &io_handler_->read_context()->overlapped,
533 &bytes_read_dword, FALSE);
534 DPCHECK(get_size_result);
535 *bytes_read = bytes_read_dword;
536 return IO_SUCCEEDED;
537 }
538
539 if (!g_use_autoreset_event) {
540 if (!g_use_iocp) {
541 io_handler_->read_watcher_.StartWatching(
542 io_handler_->read_event, io_handler_, false);
543 }
544 }
545 // If the read is pending or the read has succeeded but we don't skip
546 // completion port on success, instruct |io_handler_| to wait for the
547 // completion packet.
548 //
549 // TODO(yzshen): It seems there isn't document saying that all error cases
550 // (other than ERROR_IO_PENDING) are guaranteed to *not* queue a completion
551 // packet. If we do get one for errors,
552 // |RawChannelIOHandler::OnIOCompleted()| will crash so we will learn about
553 // it.
554
555 io_handler_->OnPendingReadStarted();
556 return IO_PENDING;
557 }
558
559 IOResult ScheduleRead() override {
560 if (!io_handler_)
561 return IO_PENDING; // OnInit could have earlied out.
562
563 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
564 DCHECK(io_handler_);
565 DCHECK(!io_handler_->pending_read());
566
567 size_t bytes_read = 0;
568 IOResult io_result = Read(&bytes_read);
569 if (io_result == IO_SUCCEEDED) {
570 DCHECK(skip_completion_port_on_success_);
571
572 // We have finished reading successfully. Queue a notification manually.
573 io_handler_->OnPendingReadStarted();
574 // |io_handler_| won't go away before the task is run, so it is safe to
575 // use |base::Unretained()|.
576 message_loop_for_io()->PostTask(
577 FROM_HERE, base::Bind(&RawChannelIOHandler::OnIOCompleted,
578 base::Unretained(io_handler_),
579 base::Unretained(io_handler_->read_context()),
580 static_cast<DWORD>(bytes_read), ERROR_SUCCESS));
581 return IO_PENDING;
582 }
583
584 return io_result;
585 }
586 embedder::ScopedPlatformHandleVectorPtr GetReadPlatformHandles(
587 size_t num_platform_handles,
588 const void* platform_handle_table) override {
589 // TODO(jam): this code will have to be updated once it's used in a sandbox
590 // and the receiving process doesn't have duplicate permission for the
591 // receiver. Once there's a broker and we have a connection to it (possibly
592 // through ConnectionManager), then we can make a sync IPC to it here to get
593 // a token for this handle, and it will duplicate the handle to is process.
594 // Then we pass the token to the receiver, which will then make a sync call
595 // to the broker to get a duplicated handle. This will also allow us to
596 // avoid leaks of the handle if the receiver dies, since the broker can
597 // notice that.
598 DCHECK_GT(num_platform_handles, 0u);
599 embedder::ScopedPlatformHandleVectorPtr rv(
600 new embedder::PlatformHandleVector());
601
602 const char* serialization_data =
603 static_cast<const char*>(platform_handle_table);
604 for (size_t i = 0; i < num_platform_handles; i++) {
605 DWORD pid = *reinterpret_cast<const DWORD*>(serialization_data);
606 serialization_data += sizeof(DWORD);
607 HANDLE source_handle =
608 *reinterpret_cast<const HANDLE*>(serialization_data);
609 serialization_data += sizeof(HANDLE);
610 base::Process sender =
611 base::Process::OpenWithAccess(pid, PROCESS_DUP_HANDLE);
612 DCHECK(sender.IsValid());
613 HANDLE target_handle = NULL;
614 BOOL dup_result = DuplicateHandle(
615 sender.Handle(), source_handle,
616 base::GetCurrentProcessHandle(), &target_handle, 0,
617 FALSE, DUPLICATE_SAME_ACCESS | DUPLICATE_CLOSE_SOURCE);
618 DCHECK(dup_result);
619 rv->push_back(embedder::PlatformHandle(target_handle));
620 }
621 return rv.Pass();
622 }
623
624 IOResult WriteNoLock(size_t* platform_handles_written,
625 size_t* bytes_written) override {
626 write_lock().AssertAcquired();
627
628 DCHECK(io_handler_);
629 DCHECK(!io_handler_->pending_write_no_lock());
630
631 size_t num_platform_handles = 0;
632 if (write_buffer_no_lock()->HavePlatformHandlesToSend()) {
633 // Since we're not sure which process might ultimately deserialize this
634 // message, we can't duplicate the handle now. Instead, write the process
635 // ID and handle now and let the receiver duplicate it.
636 embedder::PlatformHandle* platform_handles;
637 void* serialization_data_temp;
638 write_buffer_no_lock()->GetPlatformHandlesToSend(
639 &num_platform_handles, &platform_handles, &serialization_data_temp);
640 char* serialization_data = static_cast<char*>(serialization_data_temp);
641 DCHECK_GT(num_platform_handles, 0u);
642 DCHECK(platform_handles);
643
644 DWORD current_process_id = base::GetCurrentProcId();
645 for (size_t i = 0; i < num_platform_handles; i++) {
646 *reinterpret_cast<DWORD*>(serialization_data) = current_process_id;
647 serialization_data += sizeof(DWORD);
648 *reinterpret_cast<HANDLE*>(serialization_data) =
649 platform_handles[i].handle;
650 serialization_data += sizeof(HANDLE);
651 platform_handles[i] = embedder::PlatformHandle();
652 }
653 }
654
655 std::vector<WriteBuffer::Buffer> buffers;
656 write_buffer_no_lock()->GetBuffers(&buffers);
657 DCHECK(!buffers.empty());
658
659 // TODO(yzshen): Handle multi-segment writes more efficiently.
660 DWORD bytes_written_dword = 0;
661
662
663
664
665 // TODO(jam): right now we get in bad situation where we might first write
666 // the main buffer and then the MP gets sent before we write the transport
667 // buffer. We can fix this by sending information about partially written
668 // messages, or by teaching transport buffer how to grow the main buffer and
669 // write its data there.
670 // Until that's done, for now make another copy.
671
672 size_t total_size = buffers[0].size;
673 if (buffers.size() > 1)
674 total_size+=buffers[1].size;
675 char* buf = new char[total_size];
676 memcpy(buf, buffers[0].addr, buffers[0].size);
677 if (buffers.size() > 1)
678 memcpy(buf + buffers[0].size, buffers[1].addr, buffers[1].size);
679
680 BOOL result = WriteFile(
681 io_handler_->handle(), buf,
682 static_cast<DWORD>(total_size),
683 &bytes_written_dword,
684 &io_handler_->write_context_no_lock()->overlapped);
685 delete [] buf;
686
687 if (!result) {
688 DWORD error = GetLastError();
689 if (error == ERROR_BROKEN_PIPE)
690 return IO_FAILED_SHUTDOWN;
691 if (error != ERROR_IO_PENDING) {
692 LOG(WARNING) << "WriteFile: "
693 << logging::SystemErrorCodeToString(error);
694 return IO_FAILED_UNKNOWN;
695 }
696 }
697
698 if (result && skip_completion_port_on_success_) {
699 *platform_handles_written = num_platform_handles;
700 *bytes_written = bytes_written_dword;
701 return IO_SUCCEEDED;
702 }
703
704 if (!g_use_autoreset_event) {
705 if (!g_use_iocp) {
706 io_handler_->write_watcher_.StartWatching(
707 io_handler_->write_event, io_handler_, false);
708 }
709 }
710 // If the write is pending or the write has succeeded but we don't skip
711 // completion port on success, instruct |io_handler_| to wait for the
712 // completion packet.
713 //
714 // TODO(yzshen): it seems there isn't document saying that all error cases
715 // (other than ERROR_IO_PENDING) are guaranteed to *not* queue a completion
716 // packet. If we do get one for errors,
717 // |RawChannelIOHandler::OnIOCompleted()| will crash so we will learn about
718 // it.
719
720 io_handler_->OnPendingWriteStartedNoLock(num_platform_handles);
721 return IO_PENDING;
722 }
723
724 IOResult ScheduleWriteNoLock() override {
725 write_lock().AssertAcquired();
726
727 DCHECK(io_handler_);
728 DCHECK(!io_handler_->pending_write_no_lock());
729
730 size_t platform_handles_written = 0;
731 size_t bytes_written = 0;
732 IOResult io_result = WriteNoLock(&platform_handles_written, &bytes_written);
733 if (io_result == IO_SUCCEEDED) {
734 DCHECK(skip_completion_port_on_success_);
735
736 // We have finished writing successfully. Queue a notification manually.
737 io_handler_->OnPendingWriteStartedNoLock(platform_handles_written);
738 // |io_handler_| won't go away before that task is run, so it is safe to
739 // use |base::Unretained()|.
740 message_loop_for_io()->PostTask(
741 FROM_HERE,
742 base::Bind(&RawChannelIOHandler::OnIOCompleted,
743 base::Unretained(io_handler_),
744 base::Unretained(io_handler_->write_context_no_lock()),
745 static_cast<DWORD>(bytes_written), ERROR_SUCCESS));
746 return IO_PENDING;
747 }
748
749 return io_result;
750 }
751
752 void OnInit() override {
753 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
754
755 if (!handle_.is_valid()) {
756 LOG(ERROR) << "Note: RawChannelWin " << this
757 << " early exiting in OnInit because no handle";
758 return;
759 }
760
761 DCHECK(handle_.is_valid());
762 if (skip_completion_port_on_success_) {
763 // I don't know how this can fail (unless |handle_| is bad, in which case
764 // it's a bug in our code).
765 CHECK(g_vista_or_higher_functions.Get().
766 SetFileCompletionNotificationModes(
767 handle_.get().handle, FILE_SKIP_COMPLETION_PORT_ON_SUCCESS));
768 }
769
770 DCHECK(!io_handler_);
771 io_handler_ = new RawChannelIOHandler(this, handle_.Pass());
772 }
773
774 void OnShutdownNoLock(scoped_ptr<ReadBuffer> read_buffer,
775 scoped_ptr<WriteBuffer> write_buffer) override {
776 // happens on shutdown if didn't call init when doing createduplicate
777 if (message_loop_for_io()) {
778 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
779 }
780
781 if (!io_handler_) {
782 // This is hit when creating a duplicate dispatcher since we don't call
783 // Init on it.
784 DCHECK_EQ(read_buffer->num_valid_bytes(), 0U);
785 DCHECK_EQ(write_buffer->queue_size(), 0U);
786 return;
787 }
788
789 if (io_handler_->pending_read() || io_handler_->pending_write_no_lock()) {
790 // |io_handler_| will be alive until pending read/write (if any)
791 // completes. Call |CancelIoEx()| or |CancelIo()| so that resources can be
792 // freed up as soon as possible.
793 // Note: |CancelIo()| only cancels read/write requests started from this
794 // thread.
795 if (g_vista_or_higher_functions.Get().is_vista_or_higher()) {
796 g_vista_or_higher_functions.Get().CancelIoEx(io_handler_->handle(),
797 nullptr);
798 } else {
799 CancelIo(io_handler_->handle());
800 }
801 }
802
803 io_handler_->DetachFromOwnerNoLock(read_buffer.Pass(), write_buffer.Pass());
804 io_handler_ = nullptr;
805 }
806
807 // Passed to |io_handler_| during initialization.
808 embedder::ScopedPlatformHandle handle_;
809
810 RawChannelIOHandler* io_handler_;
811
812 const bool skip_completion_port_on_success_;
813
814 MOJO_DISALLOW_COPY_AND_ASSIGN(RawChannelWin);
815 };
816
817
818 } // namespace
819
820 // -----------------------------------------------------------------------------
821
822 RawChannel* RawChannel::Create(embedder::ScopedPlatformHandle handle) {
823 return new RawChannelWin(handle.Pass());
824 }
825
826 size_t RawChannel::GetSerializedPlatformHandleSize() {
827 return sizeof(DWORD) + sizeof(HANDLE);
828 }
829
830 } // namespace system
831 } // namespace mojo
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698