Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(501)

Side by Side Diff: mojo/edk/system/raw_channel_win.cc

Issue 1350023003: Add a Mojo EDK for Chrome that uses one OS pipe per message pipe. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: more cleanup Created 5 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/edk/system/raw_channel.h"
6
7 #include <windows.h>
8
9 #include "base/bind.h"
10 #include "base/lazy_instance.h"
11 #include "base/location.h"
12 #include "base/logging.h"
13 #include "base/memory/scoped_ptr.h"
14 #include "base/message_loop/message_loop.h"
15 #include "base/process/process.h"
16 #include "base/synchronization/lock.h"
17 #include "base/win/object_watcher.h"
18 #include "base/win/windows_version.h"
19 #include "mojo/edk/embedder/platform_handle.h"
20 #include "mojo/public/cpp/system/macros.h"
21
22 #define STATUS_CANCELLED 0xC0000120
23 #define STATUS_PIPE_BROKEN 0xC000014B
24
25 // We can't use IO completion ports if we send a message pipe. The reason is
26 // that the only way to stop an existing IOCP is by closing the pipe handle.
27 // See https://msdn.microsoft.com/en-us/library/windows/hardware/ff545834(v=vs.8 5).aspx
28 bool g_use_iocp = false;
29
30 // Manual reset per
31 // Doc for overlapped I/O says use manual per
32 // https://msdn.microsoft.com/en-us/library/windows/desktop/ms684342(v=vs.85). aspx
33 // However using an auto-reset event makes the perf test 5x faster and also
34 // works since we don't wait on the event elsewhere or call GetOverlappedResult
35 // before it fires.
36 bool g_use_autoreset_event = true;
37
38 namespace mojo {
39 namespace edk {
40
41 namespace {
42
43 struct MOJO_ALIGNAS(8) SerializedHandle {
44 DWORD handle_pid;
45 HANDLE handle;
46 };
47
48 class VistaOrHigherFunctions {
49 public:
50 VistaOrHigherFunctions()
51 : is_vista_or_higher_(
52 base::win::GetVersion() >= base::win::VERSION_VISTA),
53 set_file_completion_notification_modes_(nullptr),
54 cancel_io_ex_(nullptr) {
55 if (!is_vista_or_higher_)
56 return;
57
58 HMODULE module = GetModuleHandleW(L"kernel32.dll");
59 set_file_completion_notification_modes_ =
60 reinterpret_cast<SetFileCompletionNotificationModesFunc>(
61 GetProcAddress(module, "SetFileCompletionNotificationModes"));
62 DCHECK(set_file_completion_notification_modes_);
63
64 cancel_io_ex_ =
65 reinterpret_cast<CancelIoExFunc>(GetProcAddress(module, "CancelIoEx"));
66 DCHECK(cancel_io_ex_);
67 }
68
69 bool is_vista_or_higher() const { return is_vista_or_higher_; }
70
71 BOOL SetFileCompletionNotificationModes(HANDLE handle, UCHAR flags) {
72 return set_file_completion_notification_modes_(handle, flags);
73 }
74
75 BOOL CancelIoEx(HANDLE handle, LPOVERLAPPED overlapped) {
76 return cancel_io_ex_(handle, overlapped);
77 }
78
79 private:
80 using SetFileCompletionNotificationModesFunc = BOOL(WINAPI*)(HANDLE, UCHAR);
81 using CancelIoExFunc = BOOL(WINAPI*)(HANDLE, LPOVERLAPPED);
82
83 bool is_vista_or_higher_;
84 SetFileCompletionNotificationModesFunc
85 set_file_completion_notification_modes_;
86 CancelIoExFunc cancel_io_ex_;
87 };
88
89 base::LazyInstance<VistaOrHigherFunctions> g_vista_or_higher_functions =
90 LAZY_INSTANCE_INITIALIZER;
91
92 class RawChannelWin final : public RawChannel {
93 public:
94 RawChannelWin(ScopedPlatformHandle handle)
95 : handle_(handle.Pass()),
96 io_handler_(nullptr),
97 skip_completion_port_on_success_(
98 g_use_iocp &&
99 g_vista_or_higher_functions.Get().is_vista_or_higher()) {
100 DCHECK(handle_.is_valid());
101 }
102 ~RawChannelWin() override {
103 DCHECK(!io_handler_);
104 }
105
106 private:
107 // RawChannelIOHandler receives OS notifications for I/O completion. It must
108 // be created on the I/O thread.
109 //
110 // It manages its own destruction. Destruction happens on the I/O thread when
111 // all the following conditions are satisfied:
112 // - |DetachFromOwnerNoLock()| has been called;
113 // - there is no pending read;
114 // - there is no pending write.
115 class RawChannelIOHandler : public base::MessageLoopForIO::IOHandler,
116 public base::win::ObjectWatcher::Delegate {
117 public:
118 RawChannelIOHandler(RawChannelWin* owner,
119 ScopedPlatformHandle handle)
120 : handle_(handle.Pass()),
121 owner_(owner),
122 suppress_self_destruct_(false),
123 pending_read_(false),
124 pending_write_(false),
125 platform_handles_written_(0) {
126 memset(&read_context_.overlapped, 0, sizeof(read_context_.overlapped));
127 memset(&write_context_.overlapped, 0, sizeof(write_context_.overlapped));
128 if (g_use_iocp) {
129 owner_->message_loop_for_io()->RegisterIOHandler(
130 handle_.get().handle, this);
131 read_context_.handler = this;
132 write_context_.handler = this;
133 } else {
134 read_event = CreateEvent(
135 NULL, g_use_autoreset_event ? FALSE : TRUE, FALSE, NULL);
136 write_event = CreateEvent(
137 NULL, g_use_autoreset_event ? FALSE : TRUE, FALSE, NULL);
138 read_context_.overlapped.hEvent = read_event;
139 write_context_.overlapped.hEvent = write_event;
140
141
142 if (g_use_autoreset_event) {
143 read_watcher_.StartWatchingMultipleTimes(read_event, this);
144 write_watcher_.StartWatchingMultipleTimes(write_event, this);
145 }
146 }
147 }
148
149 ~RawChannelIOHandler() override {
150 DCHECK(ShouldSelfDestruct());
151 }
152
153 HANDLE handle() const { return handle_.get().handle; }
154
155 // The following methods are only called by the owner on the I/O thread.
156 bool pending_read() const {
157 DCHECK(owner_);
158 DCHECK_EQ(base::MessageLoop::current(), owner_->message_loop_for_io());
159 return pending_read_;
160 }
161
162 base::MessageLoopForIO::IOContext* read_context() {
163 DCHECK(owner_);
164 DCHECK_EQ(base::MessageLoop::current(), owner_->message_loop_for_io());
165 return &read_context_;
166 }
167
168 // Instructs the object to wait for an |OnIOCompleted()| notification.
169 void OnPendingReadStarted() {
170 DCHECK(owner_);
171 DCHECK_EQ(base::MessageLoop::current(), owner_->message_loop_for_io());
172 DCHECK(!pending_read_);
173 pending_read_ = true;
174 }
175
176 // The following methods are only called by the owner under
177 // |owner_->write_lock()|.
178 bool pending_write_no_lock() const {
179 DCHECK(owner_);
180 owner_->write_lock().AssertAcquired();
181 return pending_write_;
182 }
183
184 base::MessageLoopForIO::IOContext* write_context_no_lock() {
185 DCHECK(owner_);
186 owner_->write_lock().AssertAcquired();
187 return &write_context_;
188 }
189 // Instructs the object to wait for an |OnIOCompleted()| notification.
190 void OnPendingWriteStartedNoLock(size_t platform_handles_written) {
191 DCHECK(owner_);
192 owner_->write_lock().AssertAcquired();
193 DCHECK(!pending_write_);
194 pending_write_ = true;
195 platform_handles_written_ = platform_handles_written;
196 }
197
198 // |base::MessageLoopForIO::IOHandler| implementation:
199 // Must be called on the I/O thread. It could be called before or after
200 // detached from the owner.
201 void OnIOCompleted(base::MessageLoopForIO::IOContext* context,
202 DWORD bytes_transferred,
203 DWORD error) override {
204 DCHECK(!owner_ ||
205 base::MessageLoop::current() == owner_->message_loop_for_io());
206
207 // Suppress self-destruction inside |OnReadCompleted()|, etc. (in case
208 // they result in a call to |Shutdown()|).
209 bool old_suppress_self_destruct = suppress_self_destruct_;
210 suppress_self_destruct_ = true;
211
212 if (context == &read_context_)
213 OnReadCompleted(bytes_transferred, error);
214 else if (context == &write_context_)
215 OnWriteCompleted(bytes_transferred, error);
216 else
217 NOTREACHED();
218
219 // Maybe allow self-destruction again.
220 suppress_self_destruct_ = old_suppress_self_destruct;
221
222 if (ShouldSelfDestruct())
223 delete this;
224 }
225
226 // Must be called on the I/O thread under |owner_->write_lock()|.
227 // After this call, the owner must not make any further calls on this
228 // object, and therefore the object is used on the I/O thread exclusively
229 // (if it stays alive).
230 void DetachFromOwnerNoLock(scoped_ptr<ReadBuffer> read_buffer,
231 scoped_ptr<WriteBuffer> write_buffer) {
232 DCHECK(owner_);
233 DCHECK_EQ(base::MessageLoop::current(), owner_->message_loop_for_io());
234 //owner_->write_lock().AssertAcquired();
235
236 // If read/write is pending, we have to retain the corresponding buffer.
237 if (pending_read_)
238 preserved_read_buffer_after_detach_ = read_buffer.Pass();
239 if (pending_write_)
240 preserved_write_buffer_after_detach_ = write_buffer.Pass();
241
242 owner_ = nullptr;
243 if (ShouldSelfDestruct())
244 delete this;
245 }
246
247 ScopedPlatformHandle ReleaseHandle(std::vector<char>* read_buffer) {
248 // TODO(jam): handle XP
249 CancelIoEx(handle(), NULL);
250 // NOTE: The above call will cancel pending IO calls.
251 size_t read_buffer_byte_size = owner_->read_buffer()->num_valid_bytes();
252
253 if (pending_read_) {
254 DWORD bytes_read_dword = 0;
255
256 DWORD old_bytes = read_context_.overlapped.InternalHigh;
257
258 // Since we cancelled pending IO calls above, we need to know if the
259 // read did succeed (i.e. it completed and there's a pending task posted
260 // to alert us) or if it was cancelled. This important because if the
261 // read completed, we don't want to serialize those bytes again.
262 //TODO(jam): for XP, can return TRUE here to wait. also below.
263 BOOL rv = GetOverlappedResult(
264 handle(), &read_context_.overlapped, &bytes_read_dword, FALSE);
265 DCHECK_EQ(old_bytes, bytes_read_dword);
266 if (rv) {
267 if (read_context_.overlapped.Internal != STATUS_CANCELLED) {
268 read_buffer_byte_size += read_context_.overlapped.InternalHigh;
269 }
270 }
271 pending_read_ = false;
272 }
273
274 RawChannel::WriteBuffer* write_buffer = owner_->write_buffer_no_lock();
275
276 if (pending_write_) {
277 DWORD bytes_written_dword = 0;
278 DWORD old_bytes = write_context_.overlapped.InternalHigh;
279
280 // See comment above.
281 BOOL rv = GetOverlappedResult(
282 handle(), &write_context_.overlapped, &bytes_written_dword, FALSE);
283
284 if (old_bytes != bytes_written_dword) {
285 NOTREACHED() << "TODO(jam): this shouldn't be reached";
286 }
287
288 if (rv) {
289 if (write_context_.overlapped.Internal != STATUS_CANCELLED) {
290 CHECK(write_buffer->queue_size() != 0);
291
292 // TODO(jam)
293 DCHECK(!write_buffer->HavePlatformHandlesToSend());
294
295 write_buffer->data_offset_ += bytes_written_dword;
296
297 // TODO(jam): copied from OnWriteCompletedNoLock
298 MessageInTransit* message =
299 write_buffer->message_queue()->PeekMessage();
300 if (write_buffer->data_offset_ >= message->total_size()) {
301 // Complete write.
302 CHECK_EQ(write_buffer->data_offset_, message->total_size());
303 write_buffer->message_queue_.DiscardMessage();
304 write_buffer->platform_handles_offset_ = 0;
305 write_buffer->data_offset_ = 0;
306 }
307
308
309 //TODO(jam): handle more write msgs
310 DCHECK(write_buffer->message_queue_.IsEmpty());
311 }
312 }
313 pending_write_ = false;
314 }
315
316 if (read_buffer_byte_size) {
317 read_buffer->resize(read_buffer_byte_size);
318 memcpy(&(*read_buffer)[0], owner_->read_buffer()->buffer(),
319 read_buffer_byte_size);
320 owner_->read_buffer()->Reset();
321 }
322
323 return ScopedPlatformHandle(handle_.release());
324 }
325
326 void OnObjectSignaled(HANDLE object) override {
327 // Since this is called on the IO thread, no locks needed for owner_.
328 bool handle_is_valid = false;
329 if (owner_)
330 owner_->read_lock().Acquire();
331 handle_is_valid = handle_.is_valid();
332 if (owner_)
333 owner_->read_lock().Release();
334 if (!handle_is_valid) {
335 if (object == read_event)
336 pending_read_ = false;
337 else
338 pending_write_ = false;
339 if (ShouldSelfDestruct())
340 delete this;
341 return;
342 }
343
344 if (object == read_event) {
345 OnIOCompleted(&read_context_, read_context_.overlapped.InternalHigh,
346 read_context_.overlapped.Internal);
347
348 } else {
349 CHECK(object == write_event);
350 OnIOCompleted(&write_context_, write_context_.overlapped.InternalHigh,
351 write_context_.overlapped.Internal);
352 }
353 }
354 HANDLE read_event, write_event;
355 base::win::ObjectWatcher read_watcher_, write_watcher_;
356
357 private:
358 // Returns true if |owner_| has been reset and there is not pending read or
359 // write.
360 // Must be called on the I/O thread.
361 bool ShouldSelfDestruct() const {
362 if (owner_ || suppress_self_destruct_)
363 return false;
364
365 // Note: Detached, hence no lock needed for |pending_write_|.
366 return !pending_read_ && !pending_write_;
367 }
368
369 // Must be called on the I/O thread. It may be called before or after
370 // detaching from the owner.
371 void OnReadCompleted(DWORD bytes_read, DWORD error) {
372 DCHECK(!owner_ ||
373 base::MessageLoop::current() == owner_->message_loop_for_io());
374 DCHECK(suppress_self_destruct_);
375
376 if (g_use_autoreset_event && !pending_read_)
377 return;
378
379 CHECK(pending_read_);
380 pending_read_ = false;
381 if (!owner_)
382 return;
383
384 // Note: |OnReadCompleted()| may detach us from |owner_|.
385 if (error == ERROR_SUCCESS) {
386 DCHECK_GT(bytes_read, 0u);
387 owner_->OnReadCompleted(IO_SUCCEEDED, bytes_read);
388 } else if (error == ERROR_BROKEN_PIPE ||
389 (g_use_autoreset_event && error == STATUS_PIPE_BROKEN)) {
390 DCHECK_EQ(bytes_read, 0u);
391 owner_->OnReadCompleted(IO_FAILED_SHUTDOWN, 0);
392 } else if (error == ERROR_NO_MORE_ITEMS && g_use_autoreset_event) {
393 return owner_->OnReadCompleted(IO_SUCCEEDED, bytes_read);
394 } else {
395 DCHECK_EQ(bytes_read, 0u);
396 LOG(WARNING) << "ReadFile: " << logging::SystemErrorCodeToString(error);
397 owner_->OnReadCompleted(IO_FAILED_UNKNOWN, 0);
398 }
399 }
400
401 // Must be called on the I/O thread. It may be called before or after
402 // detaching from the owner.
403 void OnWriteCompleted(DWORD bytes_written, DWORD error) {
404 DCHECK(!owner_ ||
405 base::MessageLoop::current() == owner_->message_loop_for_io());
406 DCHECK(suppress_self_destruct_);
407
408 if (!owner_) {
409 // No lock needed.
410 CHECK(pending_write_);
411 pending_write_ = false;
412 return;
413 }
414
415 {
416 base::AutoLock locker(owner_->write_lock());
417 if (g_use_autoreset_event && !pending_write_)
418 return;
419
420 CHECK(pending_write_);
421 pending_write_ = false;
422 }
423
424 // Note: |OnWriteCompleted()| may detach us from |owner_|.
425 if (error == ERROR_SUCCESS) {
426 // Reset |platform_handles_written_| before calling |OnWriteCompleted()|
427 // since that function may call back to this class and set it again.
428 size_t local_platform_handles_written_ = platform_handles_written_;
429 platform_handles_written_ = 0;
430 owner_->OnWriteCompleted(IO_SUCCEEDED, local_platform_handles_written_,
431 bytes_written);
432 } else if (error == ERROR_BROKEN_PIPE ||
433 (g_use_autoreset_event && error == STATUS_PIPE_BROKEN)) {
434 owner_->OnWriteCompleted(IO_FAILED_SHUTDOWN, 0, 0);
435 } else if (error == ERROR_NO_MORE_ITEMS && g_use_autoreset_event) {
436 size_t local_platform_handles_written_ = platform_handles_written_;
437 platform_handles_written_ = 0;
438 owner_->OnWriteCompleted(IO_SUCCEEDED, local_platform_handles_written_,
439 bytes_written);
440 } else {
441 LOG(WARNING) << "WriteFile: "
442 << logging::SystemErrorCodeToString(error);
443 owner_->OnWriteCompleted(IO_FAILED_UNKNOWN, 0, 0);
444 }
445 }
446
447 ScopedPlatformHandle handle_;
448
449 // |owner_| is reset on the I/O thread under |owner_->write_lock()|.
450 // Therefore, it may be used on any thread under lock; or on the I/O thread
451 // without locking.
452 RawChannelWin* owner_;
453
454 // The following members must be used on the I/O thread.
455 scoped_ptr<ReadBuffer> preserved_read_buffer_after_detach_;
456 scoped_ptr<WriteBuffer> preserved_write_buffer_after_detach_;
457 bool suppress_self_destruct_;
458
459 bool pending_read_;
460 base::MessageLoopForIO::IOContext read_context_;
461
462 // The following members must be used under |owner_->write_lock()| while the
463 // object is still attached to the owner, and only on the I/O thread
464 // afterwards.
465 bool pending_write_;
466 size_t platform_handles_written_;
467 base::MessageLoopForIO::IOContext write_context_;
468
469 MOJO_DISALLOW_COPY_AND_ASSIGN(RawChannelIOHandler);
470 };
471
472 ScopedPlatformHandle ReleaseHandleNoLock(
473 std::vector<char>* read_buffer_out) override {
474 std::vector<WriteBuffer::Buffer> buffers;
475 write_buffer_no_lock()->GetBuffers(&buffers);
476 if (!buffers.empty()) {
477 // TODO(jam): copy code in OnShutdownNoLock
478 NOTREACHED() << "releasing handle with pending write buffer";
479 }
480
481
482 if (handle_.is_valid()) {
483 // SetInitialBuffer could have been called on main thread before OnInit
484 // is called on Io thread. and in meantime releasehandle called.
485 //DCHECK(read_buffer()->num_valid_bytes() == 0);
486 if (read_buffer()->num_valid_bytes()) {
487 read_buffer_out->resize(read_buffer()->num_valid_bytes());
488 memcpy(&(*read_buffer_out)[0], read_buffer()->buffer(),
489 read_buffer()->num_valid_bytes());
490 read_buffer()->Reset();
491 }
492 DCHECK(write_buffer_no_lock()->queue_size() == 0);
493 return ScopedPlatformHandle(PlatformHandle(handle_.release().handle));
494 }
495
496 return io_handler_->ReleaseHandle(read_buffer_out);
497 }
498 PlatformHandle HandleForDebuggingNoLock() override {
499 if (handle_.is_valid())
500 return handle_.get();
501
502 if (!io_handler_)
503 return PlatformHandle();
504
505 return PlatformHandle(io_handler_->handle());
506 }
507
508 IOResult Read(size_t* bytes_read) override {
509 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
510
511 char* buffer = nullptr;
512 size_t bytes_to_read = 0;
513 read_buffer()->GetBuffer(&buffer, &bytes_to_read);
514
515 DCHECK(io_handler_);
516 DCHECK(!io_handler_->pending_read());
517 BOOL result = ReadFile(
518 io_handler_->handle(), buffer, static_cast<DWORD>(bytes_to_read),
519 nullptr, &io_handler_->read_context()->overlapped);
520 if (!result) {
521 DWORD error = GetLastError();
522 if (error == ERROR_BROKEN_PIPE)
523 return IO_FAILED_SHUTDOWN;
524 if (error != ERROR_IO_PENDING) {
525 LOG(WARNING) << "ReadFile: " << logging::SystemErrorCodeToString(error);
526 return IO_FAILED_UNKNOWN;
527 }
528 }
529
530 if (result && skip_completion_port_on_success_) {
531 DWORD bytes_read_dword = 0;
532 BOOL get_size_result = GetOverlappedResult(
533 io_handler_->handle(), &io_handler_->read_context()->overlapped,
534 &bytes_read_dword, FALSE);
535 DPCHECK(get_size_result);
536 *bytes_read = bytes_read_dword;
537 return IO_SUCCEEDED;
538 }
539
540 if (!g_use_autoreset_event) {
541 if (!g_use_iocp) {
542 io_handler_->read_watcher_.StartWatchingOnce(
543 io_handler_->read_event, io_handler_);
544 }
545 }
546 // If the read is pending or the read has succeeded but we don't skip
547 // completion port on success, instruct |io_handler_| to wait for the
548 // completion packet.
549 //
550 // TODO(yzshen): It seems there isn't document saying that all error cases
551 // (other than ERROR_IO_PENDING) are guaranteed to *not* queue a completion
552 // packet. If we do get one for errors,
553 // |RawChannelIOHandler::OnIOCompleted()| will crash so we will learn about
554 // it.
555
556 io_handler_->OnPendingReadStarted();
557 return IO_PENDING;
558 }
559
560 IOResult ScheduleRead() override {
561 if (!io_handler_)
562 return IO_PENDING; // OnInit could have earlied out.
563
564 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
565 DCHECK(io_handler_);
566 DCHECK(!io_handler_->pending_read());
567
568 size_t bytes_read = 0;
569 IOResult io_result = Read(&bytes_read);
570 if (io_result == IO_SUCCEEDED) {
571 DCHECK(skip_completion_port_on_success_);
572
573 // We have finished reading successfully. Queue a notification manually.
574 io_handler_->OnPendingReadStarted();
575 // |io_handler_| won't go away before the task is run, so it is safe to
576 // use |base::Unretained()|.
577 message_loop_for_io()->PostTask(
578 FROM_HERE, base::Bind(&RawChannelIOHandler::OnIOCompleted,
579 base::Unretained(io_handler_),
580 base::Unretained(io_handler_->read_context()),
581 static_cast<DWORD>(bytes_read), ERROR_SUCCESS));
582 return IO_PENDING;
583 }
584
585 return io_result;
586 }
587 ScopedPlatformHandleVectorPtr GetReadPlatformHandles(
588 size_t num_platform_handles,
589 const void* platform_handle_table) override {
590 // TODO(jam): this code will have to be updated once it's used in a sandbox
591 // and the receiving process doesn't have duplicate permission for the
592 // receiver. Once there's a broker and we have a connection to it (possibly
593 // through ConnectionManager), then we can make a sync IPC to it here to get
594 // a token for this handle, and it will duplicate the handle to is process.
595 // Then we pass the token to the receiver, which will then make a sync call
596 // to the broker to get a duplicated handle. This will also allow us to
597 // avoid leaks of the handle if the receiver dies, since the broker can
598 // notice that.
599 DCHECK_GT(num_platform_handles, 0u);
600 ScopedPlatformHandleVectorPtr rv(new PlatformHandleVector());
601
602 const SerializedHandle* serialization_data =
603 static_cast<const SerializedHandle*>(platform_handle_table);
604 for (size_t i = 0; i < num_platform_handles; i++) {
605 DWORD pid = serialization_data->handle_pid;
606 HANDLE source_handle = serialization_data->handle;
607 serialization_data ++;
608 base::Process sender =
609 base::Process::OpenWithAccess(pid, PROCESS_DUP_HANDLE);
610 DCHECK(sender.IsValid());
611 HANDLE target_handle = NULL;
612 BOOL dup_result = DuplicateHandle(
613 sender.Handle(), source_handle,
614 base::GetCurrentProcessHandle(), &target_handle, 0,
615 FALSE, DUPLICATE_SAME_ACCESS | DUPLICATE_CLOSE_SOURCE);
616 DCHECK(dup_result);
617 rv->push_back(PlatformHandle(target_handle));
618 }
619 return rv.Pass();
620 }
621
622 IOResult WriteNoLock(size_t* platform_handles_written,
623 size_t* bytes_written) override {
624 write_lock().AssertAcquired();
625
626 DCHECK(io_handler_);
627 DCHECK(!io_handler_->pending_write_no_lock());
628
629 size_t num_platform_handles = 0;
630 if (write_buffer_no_lock()->HavePlatformHandlesToSend()) {
631 // Since we're not sure which process might ultimately deserialize this
632 // message, we can't duplicate the handle now. Instead, write the process
633 // ID and handle now and let the receiver duplicate it.
634 PlatformHandle* platform_handles;
635 void* serialization_data_temp;
636 write_buffer_no_lock()->GetPlatformHandlesToSend(
637 &num_platform_handles, &platform_handles, &serialization_data_temp);
638 SerializedHandle* serialization_data =
639 static_cast<SerializedHandle*>(serialization_data_temp);
640 DCHECK_GT(num_platform_handles, 0u);
641 DCHECK(platform_handles);
642
643 DWORD current_process_id = base::GetCurrentProcId();
644 for (size_t i = 0; i < num_platform_handles; i++) {
645 serialization_data->handle_pid = current_process_id;
646 serialization_data->handle = platform_handles[i].handle;
647 serialization_data++;
648 platform_handles[i] = PlatformHandle();
649 }
650 }
651
652 std::vector<WriteBuffer::Buffer> buffers;
653 write_buffer_no_lock()->GetBuffers(&buffers);
654 DCHECK(!buffers.empty());
655
656 // TODO(yzshen): Handle multi-segment writes more efficiently.
657 DWORD bytes_written_dword = 0;
658
659
660 /* jam: comment below is commented out because with the latest code I don't
661 see this. Note that this code is incorrect for two reasons:
662 1) the buffer would need to be saved until the write is finished
663 2) this still doesn't fix the problem of there being more data or messages
664 to be sent. the proper fix is to serialize pending messages to shared
665 memory.
666
667 // TODO(jam): right now we get in bad situation where we might first write
668 // the main buffer and then the MP gets sent before we write the transport
669 // buffer. We can fix this by sending information about partially written
670 // messages, or by teaching transport buffer how to grow the main buffer and
671 // write its data there.
672 // Until that's done, for now make another copy.
673
674 size_t total_size = buffers[0].size;
675 if (buffers.size() > 1)
676 total_size+=buffers[1].size;
677 char* buf = new char[total_size];
678 memcpy(buf, buffers[0].addr, buffers[0].size);
679 if (buffers.size() > 1)
680 memcpy(buf + buffers[0].size, buffers[1].addr, buffers[1].size);
681
682 BOOL result = WriteFile(
683 io_handler_->handle(), buf,
684 static_cast<DWORD>(total_size),
685 &bytes_written_dword,
686 &io_handler_->write_context_no_lock()->overlapped);
687 delete [] buf;
688
689 if (!result) {
690 DWORD error = GetLastError();
691 if (error == ERROR_BROKEN_PIPE)
692 return IO_FAILED_SHUTDOWN;
693 if (error != ERROR_IO_PENDING) {
694 LOG(WARNING) << "WriteFile: "
695 << logging::SystemErrorCodeToString(error);
696 return IO_FAILED_UNKNOWN;
697 }
698 }
699 */
700
701 BOOL result =
702 WriteFile(io_handler_->handle(), buffers[0].addr,
703 static_cast<DWORD>(buffers[0].size), &bytes_written_dword,
704 &io_handler_->write_context_no_lock()->overlapped);
705 if (!result) {
706 DWORD error = GetLastError();
707 if (error == ERROR_BROKEN_PIPE)
708 return IO_FAILED_SHUTDOWN;
709 if (error != ERROR_IO_PENDING) {
710 LOG(WARNING) << "WriteFile: " << logging::SystemErrorCodeToString(error) ;
711 return IO_FAILED_UNKNOWN;
712 }
713 }
714
715 if (result && skip_completion_port_on_success_) {
716 *platform_handles_written = num_platform_handles;
717 *bytes_written = bytes_written_dword;
718 return IO_SUCCEEDED;
719 }
720
721 if (!g_use_autoreset_event) {
722 if (!g_use_iocp) {
723 io_handler_->write_watcher_.StartWatchingOnce(
724 io_handler_->write_event, io_handler_);
725 }
726 }
727 // If the write is pending or the write has succeeded but we don't skip
728 // completion port on success, instruct |io_handler_| to wait for the
729 // completion packet.
730 //
731 // TODO(yzshen): it seems there isn't document saying that all error cases
732 // (other than ERROR_IO_PENDING) are guaranteed to *not* queue a completion
733 // packet. If we do get one for errors,
734 // |RawChannelIOHandler::OnIOCompleted()| will crash so we will learn about
735 // it.
736
737 io_handler_->OnPendingWriteStartedNoLock(num_platform_handles);
738 return IO_PENDING;
739 }
740
741 IOResult ScheduleWriteNoLock() override {
742 write_lock().AssertAcquired();
743
744 DCHECK(io_handler_);
745 DCHECK(!io_handler_->pending_write_no_lock());
746
747 size_t platform_handles_written = 0;
748 size_t bytes_written = 0;
749 IOResult io_result = WriteNoLock(&platform_handles_written, &bytes_written);
750 if (io_result == IO_SUCCEEDED) {
751 DCHECK(skip_completion_port_on_success_);
752
753 // We have finished writing successfully. Queue a notification manually.
754 io_handler_->OnPendingWriteStartedNoLock(platform_handles_written);
755 // |io_handler_| won't go away before that task is run, so it is safe to
756 // use |base::Unretained()|.
757 message_loop_for_io()->PostTask(
758 FROM_HERE,
759 base::Bind(&RawChannelIOHandler::OnIOCompleted,
760 base::Unretained(io_handler_),
761 base::Unretained(io_handler_->write_context_no_lock()),
762 static_cast<DWORD>(bytes_written), ERROR_SUCCESS));
763 return IO_PENDING;
764 }
765
766 return io_result;
767 }
768
769 void OnInit() override {
770 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
771
772 if (!handle_.is_valid()) {
773 LOG(ERROR) << "Note: RawChannelWin " << this
774 << " early exiting in OnInit because no handle";
775 return;
776 }
777
778 DCHECK(handle_.is_valid());
779 if (skip_completion_port_on_success_) {
780 // I don't know how this can fail (unless |handle_| is bad, in which case
781 // it's a bug in our code).
782 CHECK(g_vista_or_higher_functions.Get().
783 SetFileCompletionNotificationModes(
784 handle_.get().handle, FILE_SKIP_COMPLETION_PORT_ON_SUCCESS));
785 }
786
787 DCHECK(!io_handler_);
788 io_handler_ = new RawChannelIOHandler(this, handle_.Pass());
789 }
790
791 void OnShutdownNoLock(scoped_ptr<ReadBuffer> read_buffer,
792 scoped_ptr<WriteBuffer> write_buffer) override {
793 // happens on shutdown if didn't call init when doing createduplicate
794 if (message_loop_for_io()) {
795 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
796 }
797
798 if (!io_handler_) {
799 // This is hit when creating a duplicate dispatcher since we don't call
800 // Init on it.
801 DCHECK_EQ(read_buffer->num_valid_bytes(), 0U);
802 DCHECK_EQ(write_buffer->queue_size(), 0U);
803 return;
804 }
805
806 if (io_handler_->pending_read() || io_handler_->pending_write_no_lock()) {
807 // |io_handler_| will be alive until pending read/write (if any)
808 // completes. Call |CancelIoEx()| or |CancelIo()| so that resources can be
809 // freed up as soon as possible.
810 // Note: |CancelIo()| only cancels read/write requests started from this
811 // thread.
812 if (g_vista_or_higher_functions.Get().is_vista_or_higher()) {
813 g_vista_or_higher_functions.Get().CancelIoEx(io_handler_->handle(),
814 nullptr);
815 } else {
816 CancelIo(io_handler_->handle());
817 }
818 }
819
820 io_handler_->DetachFromOwnerNoLock(read_buffer.Pass(), write_buffer.Pass());
821 io_handler_ = nullptr;
822 }
823
824 // Passed to |io_handler_| during initialization.
825 ScopedPlatformHandle handle_;
826
827 RawChannelIOHandler* io_handler_;
828
829 const bool skip_completion_port_on_success_;
830
831 MOJO_DISALLOW_COPY_AND_ASSIGN(RawChannelWin);
832 };
833
834
835 } // namespace
836
837 // -----------------------------------------------------------------------------
838
839 RawChannel* RawChannel::Create(ScopedPlatformHandle handle) {
840 return new RawChannelWin(handle.Pass());
841 }
842
843 size_t RawChannel::GetSerializedPlatformHandleSize() {
844 return sizeof(SerializedHandle);
845 }
846
847 bool RawChannel::IsOtherEndOf(RawChannel* other) {
848 PlatformHandle this_handle = HandleForDebuggingNoLock();
849 PlatformHandle other_handle = other->HandleForDebuggingNoLock();
850
851 // TODO: XP: see http://stackoverflow.com/questions/65170/how-to-get-name-asso ciated-with-open-handle/5286888#5286888
852 WCHAR data1[_MAX_PATH + sizeof(FILE_NAME_INFO)];
853 WCHAR data2[_MAX_PATH + sizeof(FILE_NAME_INFO)];
854 FILE_NAME_INFO* fileinfo1 = reinterpret_cast<FILE_NAME_INFO *>(data1);
855 FILE_NAME_INFO* fileinfo2 = reinterpret_cast<FILE_NAME_INFO *>(data2);
856 CHECK(GetFileInformationByHandleEx(
857 this_handle.handle, FileNameInfo, fileinfo1, arraysize(data1)));
858 CHECK(GetFileInformationByHandleEx(
859 other_handle.handle, FileNameInfo, fileinfo2, arraysize(data2)));
860 std::wstring filepath1(fileinfo1->FileName, fileinfo1->FileNameLength / 2);
861 std::wstring filepath2(fileinfo2->FileName, fileinfo2->FileNameLength / 2);
862 return filepath1 == filepath2;
863 }
864
865 } // namespace edk
866 } // namespace mojo
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698