Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(248)

Side by Side Diff: mojo/edk/system/raw_channel_win.cc

Issue 1350023003: Add a Mojo EDK for Chrome that uses one OS pipe per message pipe. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: move to mojo::edk namespace in preparation for runtim flag Created 5 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/edk/system/raw_channel.h"
6
7 #include <windows.h>
8
9 #include "base/bind.h"
10 #include "base/lazy_instance.h"
11 #include "base/location.h"
12 #include "base/logging.h"
13 #include "base/memory/scoped_ptr.h"
14 #include "base/message_loop/message_loop.h"
15 #include "base/process/process.h"
16 #include "base/synchronization/lock.h"
17 #include "base/win/object_watcher.h"
18 #include "base/win/windows_version.h"
19 #include "mojo/edk/embedder/platform_handle.h"
20 #include "mojo/public/cpp/system/macros.h"
21
22 #define STATUS_CANCELLED 0xC0000120
23 #define STATUS_PIPE_BROKEN 0xC000014B
24
25 // We can't use IO completion ports if we send a message pipe. The reason is
26 // that the only way to stop an existing IOCP is by closing the pipe handle.
27 // See https://msdn.microsoft.com/en-us/library/windows/hardware/ff545834(v=vs.8 5).aspx
28 bool g_use_iocp = false;
29
30 // Manual reset per
31 // Doc for overlapped I/O says use manual per
32 // https://msdn.microsoft.com/en-us/library/windows/desktop/ms684342(v=vs.85). aspx
33 // However using an auto-reset event makes the perf test 5x faster and also
34 // works since we don't wait on the event elsewhere or call GetOverlappedResult
35 // before it fires.
36 bool g_use_autoreset_event = true;
37
38 namespace mojo {
39 namespace edk {
40
41 namespace {
42
43 struct MOJO_ALIGNAS(8) SerializedHandle {
44 DWORD handle_pid;
45 HANDLE handle;
46 };
47
48 class VistaOrHigherFunctions {
49 public:
50 VistaOrHigherFunctions()
51 : is_vista_or_higher_(
52 base::win::GetVersion() >= base::win::VERSION_VISTA),
53 set_file_completion_notification_modes_(nullptr),
54 cancel_io_ex_(nullptr) {
55 if (!is_vista_or_higher_)
56 return;
57
58 HMODULE module = GetModuleHandleW(L"kernel32.dll");
59 set_file_completion_notification_modes_ =
60 reinterpret_cast<SetFileCompletionNotificationModesFunc>(
61 GetProcAddress(module, "SetFileCompletionNotificationModes"));
62 DCHECK(set_file_completion_notification_modes_);
63
64 cancel_io_ex_ =
65 reinterpret_cast<CancelIoExFunc>(GetProcAddress(module, "CancelIoEx"));
66 DCHECK(cancel_io_ex_);
67 }
68
69 bool is_vista_or_higher() const { return is_vista_or_higher_; }
70
71 BOOL SetFileCompletionNotificationModes(HANDLE handle, UCHAR flags) {
72 return set_file_completion_notification_modes_(handle, flags);
73 }
74
75 BOOL CancelIoEx(HANDLE handle, LPOVERLAPPED overlapped) {
76 return cancel_io_ex_(handle, overlapped);
77 }
78
79 private:
80 using SetFileCompletionNotificationModesFunc = BOOL(WINAPI*)(HANDLE, UCHAR);
81 using CancelIoExFunc = BOOL(WINAPI*)(HANDLE, LPOVERLAPPED);
82
83 bool is_vista_or_higher_;
84 SetFileCompletionNotificationModesFunc
85 set_file_completion_notification_modes_;
86 CancelIoExFunc cancel_io_ex_;
87 };
88
89 base::LazyInstance<VistaOrHigherFunctions> g_vista_or_higher_functions =
90 LAZY_INSTANCE_INITIALIZER;
91
92 class RawChannelWin final : public RawChannel {
93 public:
94 RawChannelWin(ScopedPlatformHandle handle)
95 : handle_(handle.Pass()),
96 io_handler_(nullptr),
97 skip_completion_port_on_success_(
98 g_use_iocp &&
99 g_vista_or_higher_functions.Get().is_vista_or_higher()) {
100 DCHECK(handle_.is_valid());
101 }
102 ~RawChannelWin() override {
103 DCHECK(!io_handler_);
104 }
105
106 private:
107 // RawChannelIOHandler receives OS notifications for I/O completion. It must
108 // be created on the I/O thread.
109 //
110 // It manages its own destruction. Destruction happens on the I/O thread when
111 // all the following conditions are satisfied:
112 // - |DetachFromOwnerNoLock()| has been called;
113 // - there is no pending read;
114 // - there is no pending write.
115 class RawChannelIOHandler : public base::MessageLoopForIO::IOHandler,
116 public base::win::ObjectWatcher::Delegate {
117 public:
118 RawChannelIOHandler(RawChannelWin* owner,
119 ScopedPlatformHandle handle)
120 : handle_(handle.Pass()),
121 owner_(owner),
122 suppress_self_destruct_(false),
123 pending_read_(false),
124 pending_write_(false),
125 platform_handles_written_(0) {
126 memset(&read_context_.overlapped, 0, sizeof(read_context_.overlapped));
127 memset(&write_context_.overlapped, 0, sizeof(write_context_.overlapped));
128 if (g_use_iocp) {
129 owner_->message_loop_for_io()->RegisterIOHandler(
130 handle_.get().handle, this);
131 read_context_.handler = this;
132 write_context_.handler = this;
133 } else {
134 read_event = CreateEvent(
135 NULL, g_use_autoreset_event ? FALSE : TRUE, FALSE, NULL);
136 write_event = CreateEvent(
137 NULL, g_use_autoreset_event ? FALSE : TRUE, FALSE, NULL);
138 read_context_.overlapped.hEvent = read_event;
139 write_context_.overlapped.hEvent = write_event;
140
141
142 if (g_use_autoreset_event) {
143 read_watcher_.StartWatching(read_event, this, true);
144 write_watcher_.StartWatching(write_event, this, true);
145 }
146 }
147 }
148
149 ~RawChannelIOHandler() override {
150 DCHECK(ShouldSelfDestruct());
151 }
152
153 HANDLE handle() const { return handle_.get().handle; }
154
155 // The following methods are only called by the owner on the I/O thread.
156 bool pending_read() const {
157 DCHECK(owner_);
158 DCHECK_EQ(base::MessageLoop::current(), owner_->message_loop_for_io());
159 return pending_read_;
160 }
161
162 base::MessageLoopForIO::IOContext* read_context() {
163 DCHECK(owner_);
164 DCHECK_EQ(base::MessageLoop::current(), owner_->message_loop_for_io());
165 return &read_context_;
166 }
167
168 // Instructs the object to wait for an |OnIOCompleted()| notification.
169 void OnPendingReadStarted() {
170 DCHECK(owner_);
171 DCHECK_EQ(base::MessageLoop::current(), owner_->message_loop_for_io());
172 DCHECK(!pending_read_);
173 pending_read_ = true;
174 }
175
176 // The following methods are only called by the owner under
177 // |owner_->write_lock()|.
178 bool pending_write_no_lock() const {
179 DCHECK(owner_);
180 owner_->write_lock().AssertAcquired();
181 return pending_write_;
182 }
183
184 base::MessageLoopForIO::IOContext* write_context_no_lock() {
185 DCHECK(owner_);
186 owner_->write_lock().AssertAcquired();
187 return &write_context_;
188 }
189 // Instructs the object to wait for an |OnIOCompleted()| notification.
190 void OnPendingWriteStartedNoLock(size_t platform_handles_written) {
191 DCHECK(owner_);
192 owner_->write_lock().AssertAcquired();
193 DCHECK(!pending_write_);
194 pending_write_ = true;
195 platform_handles_written_ = platform_handles_written;
196 }
197
198 // |base::MessageLoopForIO::IOHandler| implementation:
199 // Must be called on the I/O thread. It could be called before or after
200 // detached from the owner.
201 void OnIOCompleted(base::MessageLoopForIO::IOContext* context,
202 DWORD bytes_transferred,
203 DWORD error) override {
204 DCHECK(!owner_ ||
205 base::MessageLoop::current() == owner_->message_loop_for_io());
206
207 // Suppress self-destruction inside |OnReadCompleted()|, etc. (in case
208 // they result in a call to |Shutdown()|).
209 bool old_suppress_self_destruct = suppress_self_destruct_;
210 suppress_self_destruct_ = true;
211
212 if (context == &read_context_)
213 OnReadCompleted(bytes_transferred, error);
214 else if (context == &write_context_)
215 OnWriteCompleted(bytes_transferred, error);
216 else
217 NOTREACHED();
218
219 // Maybe allow self-destruction again.
220 suppress_self_destruct_ = old_suppress_self_destruct;
221
222 if (ShouldSelfDestruct())
223 delete this;
224 }
225
226 // Must be called on the I/O thread under |owner_->write_lock()|.
227 // After this call, the owner must not make any further calls on this
228 // object, and therefore the object is used on the I/O thread exclusively
229 // (if it stays alive).
230 void DetachFromOwnerNoLock(scoped_ptr<ReadBuffer> read_buffer,
231 scoped_ptr<WriteBuffer> write_buffer) {
232 DCHECK(owner_);
233 DCHECK_EQ(base::MessageLoop::current(), owner_->message_loop_for_io());
234 //owner_->write_lock().AssertAcquired();
235
236 // If read/write is pending, we have to retain the corresponding buffer.
237 if (pending_read_)
238 preserved_read_buffer_after_detach_ = read_buffer.Pass();
239 if (pending_write_)
240 preserved_write_buffer_after_detach_ = write_buffer.Pass();
241
242 owner_ = nullptr;
243 if (ShouldSelfDestruct())
244 delete this;
245 }
246
247 ScopedPlatformHandle ReleaseHandle(std::vector<char>* read_buffer) {
248 // TODO(jam): handle XP
249 CancelIoEx(handle(), NULL);
250 // NOTE!!!!
251 // The above call will cancel pending IO calls.
252 // HOWEVER, some could have already finished and posted task to IO thread
253 // that will execute
254
255
256 size_t read_buffer_byte_size = owner_->read_buffer()->num_valid_bytes();
257
258 if (pending_read_) {
259 DWORD bytes_read_dword = 0;
260
261 DWORD old_bytes = read_context_.overlapped.InternalHigh;
262
263 //TODO(jam): for XP, can return TRUE here to wait. also below.
264 BOOL rv = GetOverlappedResult(
265 handle(), &read_context_.overlapped, &bytes_read_dword, FALSE);
266 DCHECK_EQ(old_bytes, bytes_read_dword);
267 if (rv) {
268 if (read_context_.overlapped.Internal != STATUS_CANCELLED) {
269 read_buffer_byte_size += read_context_.overlapped.InternalHigh;
270 }
271 }
272 pending_read_ = false;
273 }
274
275 RawChannel::WriteBuffer* write_buffer = owner_->write_buffer_no_lock();
276
277 if (pending_write_) {
278 DWORD bytes_written_dword = 0;
279 DWORD old_bytes = write_context_.overlapped.InternalHigh;
280
281
282 BOOL rv = GetOverlappedResult(
283 handle(), &write_context_.overlapped, &bytes_written_dword, FALSE);
284
285 if (old_bytes != bytes_written_dword) {
286 NOTREACHED();
287 }
288
289 if (rv) {
290 if (write_context_.overlapped.Internal != STATUS_CANCELLED) {
291 CHECK(write_buffer->queue_size() != 0);
292
293 // TODO(jam)
294 DCHECK(!write_buffer->HavePlatformHandlesToSend());
295
296 write_buffer->data_offset_ += bytes_written_dword;
297
298 // TODO(jam): copied from OnWriteCompletedNoLock
299 MessageInTransit* message =
300 write_buffer->message_queue()->PeekMessage();
301 if (write_buffer->data_offset_ >= message->total_size()) {
302 // Complete write.
303 CHECK_EQ(write_buffer->data_offset_, message->total_size());
304 write_buffer->message_queue_.DiscardMessage();
305 write_buffer->platform_handles_offset_ = 0;
306 write_buffer->data_offset_ = 0;
307 }
308
309
310 //TODO(jam): handle more write msgs
311 DCHECK(write_buffer->message_queue_.IsEmpty());
312 }
313 }
314 pending_write_ = false;
315 }
316
317 if (read_buffer_byte_size) {
318 read_buffer->resize(read_buffer_byte_size);
319 memcpy(&(*read_buffer)[0], owner_->read_buffer()->buffer(),
320 read_buffer_byte_size);
321 owner_->read_buffer()->Reset();
322 }
323
324 return ScopedPlatformHandle(handle_.release());
325 }
326
327 void OnObjectSignaled(HANDLE object) override {
328 // Since this is called on the IO thread, no locks needed for owner_.
yzshen1 2015/09/23 22:47:09 I think this is not necessarily called from the IO
329 bool handle_is_valid = false;
330 if (owner_)
331 owner_->read_lock().Acquire();
332 handle_is_valid = handle_.is_valid();
333 if (owner_)
334 owner_->read_lock().Release();
335 if (!handle_is_valid) {
336 if (object == read_event)
337 pending_read_ = false;
338 else
339 pending_write_ = false;
340 if (ShouldSelfDestruct())
341 delete this;
342 return;
343 }
344
345 if (object == read_event) {
346 OnIOCompleted(&read_context_, read_context_.overlapped.InternalHigh,
347 read_context_.overlapped.Internal);
348
349 } else {
350 CHECK(object == write_event);
351 OnIOCompleted(&write_context_, write_context_.overlapped.InternalHigh,
352 write_context_.overlapped.Internal);
353 }
354 }
355 HANDLE read_event, write_event;
356 base::win::ObjectWatcher read_watcher_, write_watcher_;
357
358 private:
359 // Returns true if |owner_| has been reset and there is not pending read or
360 // write.
361 // Must be called on the I/O thread.
362 bool ShouldSelfDestruct() const {
363 if (owner_ || suppress_self_destruct_)
364 return false;
365
366 // Note: Detached, hence no lock needed for |pending_write_|.
367 return !pending_read_ && !pending_write_;
368 }
369
370 // Must be called on the I/O thread. It may be called before or after
371 // detaching from the owner.
372 void OnReadCompleted(DWORD bytes_read, DWORD error) {
373 DCHECK(!owner_ ||
374 base::MessageLoop::current() == owner_->message_loop_for_io());
375 DCHECK(suppress_self_destruct_);
376
377 if (g_use_autoreset_event && !pending_read_)
378 return;
379
380 CHECK(pending_read_);
381 pending_read_ = false;
382 if (!owner_)
383 return;
384
385 // Note: |OnReadCompleted()| may detach us from |owner_|.
386 if (error == ERROR_SUCCESS) {
387 DCHECK_GT(bytes_read, 0u);
388 owner_->OnReadCompleted(IO_SUCCEEDED, bytes_read);
389 } else if (error == ERROR_BROKEN_PIPE ||
390 (g_use_autoreset_event && error == STATUS_PIPE_BROKEN)) {
391 DCHECK_EQ(bytes_read, 0u);
392 owner_->OnReadCompleted(IO_FAILED_SHUTDOWN, 0);
393 } else if (error == ERROR_NO_MORE_ITEMS && g_use_autoreset_event) {
394 return owner_->OnReadCompleted(IO_SUCCEEDED, bytes_read);
395 } else {
396 DCHECK_EQ(bytes_read, 0u);
397 LOG(WARNING) << "ReadFile: " << logging::SystemErrorCodeToString(error);
398 owner_->OnReadCompleted(IO_FAILED_UNKNOWN, 0);
399 }
400 }
401
402 // Must be called on the I/O thread. It may be called before or after
403 // detaching from the owner.
404 void OnWriteCompleted(DWORD bytes_written, DWORD error) {
405 DCHECK(!owner_ ||
406 base::MessageLoop::current() == owner_->message_loop_for_io());
407 DCHECK(suppress_self_destruct_);
408
409 if (!owner_) {
410 // No lock needed.
411 CHECK(pending_write_);
412 pending_write_ = false;
413 return;
414 }
415
416 {
417 base::AutoLock locker(owner_->write_lock());
418 if (g_use_autoreset_event && !pending_write_)
419 return;
420
421 CHECK(pending_write_);
422 pending_write_ = false;
423 }
424
425 // Note: |OnWriteCompleted()| may detach us from |owner_|.
426 if (error == ERROR_SUCCESS) {
427 // Reset |platform_handles_written_| before calling |OnWriteCompleted()|
428 // since that function may call back to this class and set it again.
429 size_t local_platform_handles_written_ = platform_handles_written_;
430 platform_handles_written_ = 0;
431 owner_->OnWriteCompleted(IO_SUCCEEDED, local_platform_handles_written_,
432 bytes_written);
433 } else if (error == ERROR_BROKEN_PIPE ||
434 (g_use_autoreset_event && error == STATUS_PIPE_BROKEN)) {
435 owner_->OnWriteCompleted(IO_FAILED_SHUTDOWN, 0, 0);
436 } else if (error == ERROR_NO_MORE_ITEMS && g_use_autoreset_event) {
437 size_t local_platform_handles_written_ = platform_handles_written_;
438 platform_handles_written_ = 0;
439 owner_->OnWriteCompleted(IO_SUCCEEDED, local_platform_handles_written_,
440 bytes_written);
441 } else {
442 LOG(WARNING) << "WriteFile: "
443 << logging::SystemErrorCodeToString(error);
444 owner_->OnWriteCompleted(IO_FAILED_UNKNOWN, 0, 0);
445 }
446 }
447
448 ScopedPlatformHandle handle_;
449
450 // |owner_| is reset on the I/O thread under |owner_->write_lock()|.
451 // Therefore, it may be used on any thread under lock; or on the I/O thread
452 // without locking.
453 RawChannelWin* owner_;
454
455 // The following members must be used on the I/O thread.
456 scoped_ptr<ReadBuffer> preserved_read_buffer_after_detach_;
457 scoped_ptr<WriteBuffer> preserved_write_buffer_after_detach_;
458 bool suppress_self_destruct_;
459
460 bool pending_read_;
461 base::MessageLoopForIO::IOContext read_context_;
462
463 // The following members must be used under |owner_->write_lock()| while the
464 // object is still attached to the owner, and only on the I/O thread
465 // afterwards.
466 bool pending_write_;
467 size_t platform_handles_written_;
468 base::MessageLoopForIO::IOContext write_context_;
469
470 MOJO_DISALLOW_COPY_AND_ASSIGN(RawChannelIOHandler);
471 };
472
473 ScopedPlatformHandle ReleaseHandleNoLock(
474 std::vector<char>* read_buffer_out) override {
475 std::vector<WriteBuffer::Buffer> buffers;
476 write_buffer_no_lock()->GetBuffers(&buffers);
477 if (!buffers.empty()) {
478 // TODO(jam): copy code in OnShutdownNoLock
479 NOTREACHED() << "releasing handle with pending write buffer";
480 }
481
482
483 if( handle_.is_valid()) {
484 // SetInitialBuffer could have been called on main thread before OnInit
485 // is called on Io thread. and in meantime releasehandle called.
486 //DCHECK(read_buffer()->num_valid_bytes() == 0);
487 if (read_buffer()->num_valid_bytes()) {
488 read_buffer_out->resize(read_buffer()->num_valid_bytes());
489 memcpy(&(*read_buffer_out)[0], read_buffer()->buffer(),
490 read_buffer()->num_valid_bytes());
491 read_buffer()->Reset();
492 }
493 DCHECK(write_buffer_no_lock()->queue_size() == 0);
494 return ScopedPlatformHandle(PlatformHandle(handle_.release().handle));
495 }
496
497 return io_handler_->ReleaseHandle(read_buffer_out);
498 }
499 PlatformHandle HandleForDebuggingNoLock() override {
500 if (handle_.is_valid())
501 return handle_.get();
502
503 if (!io_handler_)
504 return PlatformHandle();
505
506 return PlatformHandle(io_handler_->handle());
507 }
508
509 IOResult Read(size_t* bytes_read) override {
510 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
511
512 char* buffer = nullptr;
513 size_t bytes_to_read = 0;
514 read_buffer()->GetBuffer(&buffer, &bytes_to_read);
515
516 DCHECK(io_handler_);
517 DCHECK(!io_handler_->pending_read());
518 BOOL result = ReadFile(
519 io_handler_->handle(), buffer, static_cast<DWORD>(bytes_to_read),
520 nullptr, &io_handler_->read_context()->overlapped);
521 if (!result) {
522 DWORD error = GetLastError();
523 if (error == ERROR_BROKEN_PIPE)
524 return IO_FAILED_SHUTDOWN;
525 if (error != ERROR_IO_PENDING) {
526 LOG(WARNING) << "ReadFile: " << logging::SystemErrorCodeToString(error);
527 return IO_FAILED_UNKNOWN;
528 }
529 }
530
531 if (result && skip_completion_port_on_success_) {
532 DWORD bytes_read_dword = 0;
533 BOOL get_size_result = GetOverlappedResult(
534 io_handler_->handle(), &io_handler_->read_context()->overlapped,
535 &bytes_read_dword, FALSE);
536 DPCHECK(get_size_result);
537 *bytes_read = bytes_read_dword;
538 return IO_SUCCEEDED;
539 }
540
541 if (!g_use_autoreset_event) {
542 if (!g_use_iocp) {
543 io_handler_->read_watcher_.StartWatching(
544 io_handler_->read_event, io_handler_, false);
545 }
546 }
547 // If the read is pending or the read has succeeded but we don't skip
548 // completion port on success, instruct |io_handler_| to wait for the
549 // completion packet.
550 //
551 // TODO(yzshen): It seems there isn't document saying that all error cases
552 // (other than ERROR_IO_PENDING) are guaranteed to *not* queue a completion
553 // packet. If we do get one for errors,
554 // |RawChannelIOHandler::OnIOCompleted()| will crash so we will learn about
555 // it.
556
557 io_handler_->OnPendingReadStarted();
558 return IO_PENDING;
559 }
560
561 IOResult ScheduleRead() override {
562 if (!io_handler_)
563 return IO_PENDING; // OnInit could have earlied out.
564
565 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
566 DCHECK(io_handler_);
567 DCHECK(!io_handler_->pending_read());
568
569 size_t bytes_read = 0;
570 IOResult io_result = Read(&bytes_read);
571 if (io_result == IO_SUCCEEDED) {
572 DCHECK(skip_completion_port_on_success_);
573
574 // We have finished reading successfully. Queue a notification manually.
575 io_handler_->OnPendingReadStarted();
576 // |io_handler_| won't go away before the task is run, so it is safe to
577 // use |base::Unretained()|.
578 message_loop_for_io()->PostTask(
579 FROM_HERE, base::Bind(&RawChannelIOHandler::OnIOCompleted,
580 base::Unretained(io_handler_),
581 base::Unretained(io_handler_->read_context()),
582 static_cast<DWORD>(bytes_read), ERROR_SUCCESS));
583 return IO_PENDING;
584 }
585
586 return io_result;
587 }
588 ScopedPlatformHandleVectorPtr GetReadPlatformHandles(
589 size_t num_platform_handles,
590 const void* platform_handle_table) override {
591 // TODO(jam): this code will have to be updated once it's used in a sandbox
592 // and the receiving process doesn't have duplicate permission for the
593 // receiver. Once there's a broker and we have a connection to it (possibly
594 // through ConnectionManager), then we can make a sync IPC to it here to get
595 // a token for this handle, and it will duplicate the handle to is process.
596 // Then we pass the token to the receiver, which will then make a sync call
597 // to the broker to get a duplicated handle. This will also allow us to
598 // avoid leaks of the handle if the receiver dies, since the broker can
599 // notice that.
600 DCHECK_GT(num_platform_handles, 0u);
601 ScopedPlatformHandleVectorPtr rv(new PlatformHandleVector());
602
603 const SerializedHandle* serialization_data =
604 static_cast<const SerializedHandle*>(platform_handle_table);
605 for (size_t i = 0; i < num_platform_handles; i++) {
606 DWORD pid = serialization_data->handle_pid;
607 HANDLE source_handle = serialization_data->handle;
608 serialization_data ++;
609 base::Process sender =
610 base::Process::OpenWithAccess(pid, PROCESS_DUP_HANDLE);
611 DCHECK(sender.IsValid());
612 HANDLE target_handle = NULL;
613 BOOL dup_result = DuplicateHandle(
614 sender.Handle(), source_handle,
615 base::GetCurrentProcessHandle(), &target_handle, 0,
616 FALSE, DUPLICATE_SAME_ACCESS | DUPLICATE_CLOSE_SOURCE);
617 DCHECK(dup_result);
618 rv->push_back(PlatformHandle(target_handle));
619 }
620 return rv.Pass();
621 }
622
623 IOResult WriteNoLock(size_t* platform_handles_written,
624 size_t* bytes_written) override {
625 write_lock().AssertAcquired();
626
627 DCHECK(io_handler_);
628 DCHECK(!io_handler_->pending_write_no_lock());
629
630 size_t num_platform_handles = 0;
631 if (write_buffer_no_lock()->HavePlatformHandlesToSend()) {
632 // Since we're not sure which process might ultimately deserialize this
633 // message, we can't duplicate the handle now. Instead, write the process
634 // ID and handle now and let the receiver duplicate it.
635 PlatformHandle* platform_handles;
636 void* serialization_data_temp;
637 write_buffer_no_lock()->GetPlatformHandlesToSend(
638 &num_platform_handles, &platform_handles, &serialization_data_temp);
639 SerializedHandle* serialization_data =
640 static_cast<SerializedHandle*>(serialization_data_temp);
641 DCHECK_GT(num_platform_handles, 0u);
642 DCHECK(platform_handles);
643
644 DWORD current_process_id = base::GetCurrentProcId();
645 for (size_t i = 0; i < num_platform_handles; i++) {
646 serialization_data->handle_pid = current_process_id;
647 serialization_data->handle = platform_handles[i].handle;
648 serialization_data++;
649 platform_handles[i] = PlatformHandle();
650 }
651 }
652
653 std::vector<WriteBuffer::Buffer> buffers;
654 write_buffer_no_lock()->GetBuffers(&buffers);
655 DCHECK(!buffers.empty());
656
657 // TODO(yzshen): Handle multi-segment writes more efficiently.
658 DWORD bytes_written_dword = 0;
659
660
661
662
663 // TODO(jam): right now we get in bad situation where we might first write
664 // the main buffer and then the MP gets sent before we write the transport
665 // buffer. We can fix this by sending information about partially written
666 // messages, or by teaching transport buffer how to grow the main buffer and
667 // write its data there.
668 // Until that's done, for now make another copy.
669
670 size_t total_size = buffers[0].size;
671 if (buffers.size() > 1)
672 total_size+=buffers[1].size;
673 char* buf = new char[total_size];
674 memcpy(buf, buffers[0].addr, buffers[0].size);
675 if (buffers.size() > 1)
676 memcpy(buf + buffers[0].size, buffers[1].addr, buffers[1].size);
677
678 BOOL result = WriteFile(
679 io_handler_->handle(), buf,
680 static_cast<DWORD>(total_size),
681 &bytes_written_dword,
682 &io_handler_->write_context_no_lock()->overlapped);
683 delete [] buf;
684
685 if (!result) {
686 DWORD error = GetLastError();
687 if (error == ERROR_BROKEN_PIPE)
688 return IO_FAILED_SHUTDOWN;
689 if (error != ERROR_IO_PENDING) {
690 LOG(WARNING) << "WriteFile: "
691 << logging::SystemErrorCodeToString(error);
692 return IO_FAILED_UNKNOWN;
693 }
694 }
695
696 if (result && skip_completion_port_on_success_) {
697 *platform_handles_written = num_platform_handles;
698 *bytes_written = bytes_written_dword;
699 return IO_SUCCEEDED;
700 }
701
702 if (!g_use_autoreset_event) {
703 if (!g_use_iocp) {
704 io_handler_->write_watcher_.StartWatching(
705 io_handler_->write_event, io_handler_, false);
706 }
707 }
708 // If the write is pending or the write has succeeded but we don't skip
709 // completion port on success, instruct |io_handler_| to wait for the
710 // completion packet.
711 //
712 // TODO(yzshen): it seems there isn't document saying that all error cases
713 // (other than ERROR_IO_PENDING) are guaranteed to *not* queue a completion
714 // packet. If we do get one for errors,
715 // |RawChannelIOHandler::OnIOCompleted()| will crash so we will learn about
716 // it.
717
718 io_handler_->OnPendingWriteStartedNoLock(num_platform_handles);
719 return IO_PENDING;
720 }
721
722 IOResult ScheduleWriteNoLock() override {
723 write_lock().AssertAcquired();
724
725 DCHECK(io_handler_);
726 DCHECK(!io_handler_->pending_write_no_lock());
727
728 size_t platform_handles_written = 0;
729 size_t bytes_written = 0;
730 IOResult io_result = WriteNoLock(&platform_handles_written, &bytes_written);
731 if (io_result == IO_SUCCEEDED) {
732 DCHECK(skip_completion_port_on_success_);
733
734 // We have finished writing successfully. Queue a notification manually.
735 io_handler_->OnPendingWriteStartedNoLock(platform_handles_written);
736 // |io_handler_| won't go away before that task is run, so it is safe to
737 // use |base::Unretained()|.
738 message_loop_for_io()->PostTask(
739 FROM_HERE,
740 base::Bind(&RawChannelIOHandler::OnIOCompleted,
741 base::Unretained(io_handler_),
742 base::Unretained(io_handler_->write_context_no_lock()),
743 static_cast<DWORD>(bytes_written), ERROR_SUCCESS));
744 return IO_PENDING;
745 }
746
747 return io_result;
748 }
749
750 void OnInit() override {
751 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
752
753 if (!handle_.is_valid()) {
754 LOG(ERROR) << "Note: RawChannelWin " << this
755 << " early exiting in OnInit because no handle";
756 return;
757 }
758
759 DCHECK(handle_.is_valid());
760 if (skip_completion_port_on_success_) {
761 // I don't know how this can fail (unless |handle_| is bad, in which case
762 // it's a bug in our code).
763 CHECK(g_vista_or_higher_functions.Get().
764 SetFileCompletionNotificationModes(
765 handle_.get().handle, FILE_SKIP_COMPLETION_PORT_ON_SUCCESS));
766 }
767
768 DCHECK(!io_handler_);
769 io_handler_ = new RawChannelIOHandler(this, handle_.Pass());
770 }
771
772 void OnShutdownNoLock(scoped_ptr<ReadBuffer> read_buffer,
773 scoped_ptr<WriteBuffer> write_buffer) override {
774 // happens on shutdown if didn't call init when doing createduplicate
775 if (message_loop_for_io()) {
776 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
777 }
778
779 if (!io_handler_) {
780 // This is hit when creating a duplicate dispatcher since we don't call
781 // Init on it.
782 DCHECK_EQ(read_buffer->num_valid_bytes(), 0U);
783 DCHECK_EQ(write_buffer->queue_size(), 0U);
784 return;
785 }
786
787 if (io_handler_->pending_read() || io_handler_->pending_write_no_lock()) {
788 // |io_handler_| will be alive until pending read/write (if any)
789 // completes. Call |CancelIoEx()| or |CancelIo()| so that resources can be
790 // freed up as soon as possible.
791 // Note: |CancelIo()| only cancels read/write requests started from this
792 // thread.
793 if (g_vista_or_higher_functions.Get().is_vista_or_higher()) {
794 g_vista_or_higher_functions.Get().CancelIoEx(io_handler_->handle(),
795 nullptr);
796 } else {
797 CancelIo(io_handler_->handle());
798 }
799 }
800
801 io_handler_->DetachFromOwnerNoLock(read_buffer.Pass(), write_buffer.Pass());
802 io_handler_ = nullptr;
803 }
804
805 // Passed to |io_handler_| during initialization.
806 ScopedPlatformHandle handle_;
807
808 RawChannelIOHandler* io_handler_;
809
810 const bool skip_completion_port_on_success_;
811
812 MOJO_DISALLOW_COPY_AND_ASSIGN(RawChannelWin);
813 };
814
815
816 } // namespace
817
818 // -----------------------------------------------------------------------------
819
820 RawChannel* RawChannel::Create(ScopedPlatformHandle handle) {
821 return new RawChannelWin(handle.Pass());
822 }
823
824 size_t RawChannel::GetSerializedPlatformHandleSize() {
825 return sizeof(SerializedHandle);
826 }
827
828 bool RawChannel::IsOtherEndOf(RawChannel* other) {
829 PlatformHandle this_handle = HandleForDebuggingNoLock();
830 PlatformHandle other_handle = other->HandleForDebuggingNoLock();
831
832 // TODO: XP: see http://stackoverflow.com/questions/65170/how-to-get-name-asso ciated-with-open-handle/5286888#5286888
833 WCHAR data1[_MAX_PATH + sizeof(FILE_NAME_INFO)];
834 WCHAR data2[_MAX_PATH + sizeof(FILE_NAME_INFO)];
835 FILE_NAME_INFO* fileinfo1 = reinterpret_cast<FILE_NAME_INFO *>(data1);
836 FILE_NAME_INFO* fileinfo2 = reinterpret_cast<FILE_NAME_INFO *>(data2);
837 CHECK(GetFileInformationByHandleEx(
838 this_handle.handle, FileNameInfo, fileinfo1, arraysize(data1)));
839 CHECK(GetFileInformationByHandleEx(
840 other_handle.handle, FileNameInfo, fileinfo2, arraysize(data2)));
841 std::wstring filepath1(fileinfo1->FileName, fileinfo1->FileNameLength / 2);
842 std::wstring filepath2(fileinfo2->FileName, fileinfo2->FileNameLength / 2);
843 return filepath1 == filepath2;
844 }
845
846 } // namespace edk
847 } // namespace mojo
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698