OLD | NEW |
---|---|
(Empty) | |
1 // Copyright 2013 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 #include "mojo/edk/system/raw_channel.h" | |
6 | |
7 #include <windows.h> | |
8 | |
9 #include "base/bind.h" | |
10 #include "base/lazy_instance.h" | |
11 #include "base/location.h" | |
12 #include "base/logging.h" | |
13 #include "base/memory/scoped_ptr.h" | |
14 #include "base/message_loop/message_loop.h" | |
15 #include "base/process/process.h" | |
16 #include "base/synchronization/lock.h" | |
17 #include "base/win/object_watcher.h" | |
18 #include "base/win/windows_version.h" | |
19 #include "mojo/edk/embedder/platform_handle.h" | |
20 #include "mojo/public/cpp/system/macros.h" | |
21 | |
22 #define STATUS_CANCELLED 0xC0000120 | |
23 #define STATUS_PIPE_BROKEN 0xC000014B | |
24 | |
25 // We can't use IO completion ports if we send a message pipe. The reason is | |
26 // that the only way to stop an existing IOCP is by closing the pipe handle. | |
27 // See https://msdn.microsoft.com/en-us/library/windows/hardware/ff545834(v=vs.8 5).aspx | |
28 bool g_use_iocp = false; | |
29 | |
30 // Manual reset per | |
31 // Doc for overlapped I/O says use manual per | |
32 // https://msdn.microsoft.com/en-us/library/windows/desktop/ms684342(v=vs.85). aspx | |
33 // However using an auto-reset event makes the perf test 5x faster and also | |
34 // works since we don't wait on the event elsewhere or call GetOverlappedResult | |
35 // before it fires. | |
36 bool g_use_autoreset_event = true; | |
37 | |
38 namespace mojo { | |
39 namespace system { | |
40 | |
41 namespace { | |
42 | |
43 class VistaOrHigherFunctions { | |
44 public: | |
45 VistaOrHigherFunctions() | |
46 : is_vista_or_higher_( | |
47 base::win::GetVersion() >= base::win::VERSION_VISTA), | |
48 set_file_completion_notification_modes_(nullptr), | |
49 cancel_io_ex_(nullptr) { | |
50 if (!is_vista_or_higher_) | |
51 return; | |
52 | |
53 HMODULE module = GetModuleHandleW(L"kernel32.dll"); | |
54 set_file_completion_notification_modes_ = | |
55 reinterpret_cast<SetFileCompletionNotificationModesFunc>( | |
56 GetProcAddress(module, "SetFileCompletionNotificationModes")); | |
57 DCHECK(set_file_completion_notification_modes_); | |
58 | |
59 cancel_io_ex_ = | |
60 reinterpret_cast<CancelIoExFunc>(GetProcAddress(module, "CancelIoEx")); | |
61 DCHECK(cancel_io_ex_); | |
62 } | |
63 | |
64 bool is_vista_or_higher() const { return is_vista_or_higher_; } | |
65 | |
66 BOOL SetFileCompletionNotificationModes(HANDLE handle, UCHAR flags) { | |
67 return set_file_completion_notification_modes_(handle, flags); | |
68 } | |
69 | |
70 BOOL CancelIoEx(HANDLE handle, LPOVERLAPPED overlapped) { | |
71 return cancel_io_ex_(handle, overlapped); | |
72 } | |
73 | |
74 private: | |
75 using SetFileCompletionNotificationModesFunc = BOOL(WINAPI*)(HANDLE, UCHAR); | |
76 using CancelIoExFunc = BOOL(WINAPI*)(HANDLE, LPOVERLAPPED); | |
77 | |
78 bool is_vista_or_higher_; | |
79 SetFileCompletionNotificationModesFunc | |
80 set_file_completion_notification_modes_; | |
81 CancelIoExFunc cancel_io_ex_; | |
82 }; | |
83 | |
84 base::LazyInstance<VistaOrHigherFunctions> g_vista_or_higher_functions = | |
85 LAZY_INSTANCE_INITIALIZER; | |
86 | |
87 class RawChannelWin final : public RawChannel { | |
88 public: | |
89 RawChannelWin(embedder::ScopedPlatformHandle handle) | |
90 : handle_(handle.Pass()), | |
91 io_handler_(nullptr), | |
92 skip_completion_port_on_success_( | |
93 g_use_iocp && | |
94 g_vista_or_higher_functions.Get().is_vista_or_higher()) { | |
95 DCHECK(handle_.is_valid()); | |
96 } | |
97 ~RawChannelWin() override { | |
98 DCHECK(!io_handler_); | |
99 } | |
100 | |
101 private: | |
102 // RawChannelIOHandler receives OS notifications for I/O completion. It must | |
103 // be created on the I/O thread. | |
104 // | |
105 // It manages its own destruction. Destruction happens on the I/O thread when | |
106 // all the following conditions are satisfied: | |
107 // - |DetachFromOwnerNoLock()| has been called; | |
108 // - there is no pending read; | |
109 // - there is no pending write. | |
110 class RawChannelIOHandler : public base::MessageLoopForIO::IOHandler, | |
111 public base::win::ObjectWatcher::Delegate { | |
112 public: | |
113 RawChannelIOHandler(RawChannelWin* owner, | |
114 embedder::ScopedPlatformHandle handle) | |
115 : handle_(handle.Pass()), | |
116 owner_(owner), | |
117 suppress_self_destruct_(false), | |
118 pending_read_(false), | |
119 pending_write_(false), | |
120 platform_handles_written_(0) { | |
121 memset(&read_context_.overlapped, 0, sizeof(read_context_.overlapped)); | |
122 memset(&write_context_.overlapped, 0, sizeof(write_context_.overlapped)); | |
123 if (g_use_iocp) { | |
124 owner_->message_loop_for_io()->RegisterIOHandler( | |
125 handle_.get().handle, this); | |
126 read_context_.handler = this; | |
127 write_context_.handler = this; | |
128 } else { | |
129 read_event = CreateEvent( | |
130 NULL, g_use_autoreset_event ? FALSE : TRUE, FALSE, NULL); | |
131 write_event = CreateEvent( | |
132 NULL, g_use_autoreset_event ? FALSE : TRUE, FALSE, NULL); | |
133 read_context_.overlapped.hEvent = read_event; | |
134 write_context_.overlapped.hEvent = write_event; | |
135 | |
136 | |
137 if (g_use_autoreset_event) { | |
138 read_watcher_.StartWatching(read_event, this, true); | |
139 write_watcher_.StartWatching(write_event, this, true); | |
140 } | |
141 } | |
142 } | |
143 | |
144 ~RawChannelIOHandler() override { | |
145 DCHECK(ShouldSelfDestruct()); | |
146 } | |
147 | |
148 HANDLE handle() const { return handle_.get().handle; } | |
149 | |
150 // The following methods are only called by the owner on the I/O thread. | |
151 bool pending_read() const { | |
152 DCHECK(owner_); | |
153 DCHECK_EQ(base::MessageLoop::current(), owner_->message_loop_for_io()); | |
154 return pending_read_; | |
155 } | |
156 | |
157 base::MessageLoopForIO::IOContext* read_context() { | |
158 DCHECK(owner_); | |
159 DCHECK_EQ(base::MessageLoop::current(), owner_->message_loop_for_io()); | |
160 return &read_context_; | |
161 } | |
162 | |
163 // Instructs the object to wait for an |OnIOCompleted()| notification. | |
164 void OnPendingReadStarted() { | |
165 DCHECK(owner_); | |
166 DCHECK_EQ(base::MessageLoop::current(), owner_->message_loop_for_io()); | |
167 DCHECK(!pending_read_); | |
168 pending_read_ = true; | |
169 } | |
170 | |
171 // The following methods are only called by the owner under | |
172 // |owner_->write_lock()|. | |
173 bool pending_write_no_lock() const { | |
174 DCHECK(owner_); | |
175 owner_->write_lock().AssertAcquired(); | |
176 return pending_write_; | |
177 } | |
178 | |
179 base::MessageLoopForIO::IOContext* write_context_no_lock() { | |
180 DCHECK(owner_); | |
181 owner_->write_lock().AssertAcquired(); | |
182 return &write_context_; | |
183 } | |
184 // Instructs the object to wait for an |OnIOCompleted()| notification. | |
185 void OnPendingWriteStartedNoLock(size_t platform_handles_written) { | |
186 DCHECK(owner_); | |
187 owner_->write_lock().AssertAcquired(); | |
188 DCHECK(!pending_write_); | |
189 pending_write_ = true; | |
190 platform_handles_written_ = platform_handles_written; | |
191 } | |
192 | |
193 // |base::MessageLoopForIO::IOHandler| implementation: | |
194 // Must be called on the I/O thread. It could be called before or after | |
195 // detached from the owner. | |
196 void OnIOCompleted(base::MessageLoopForIO::IOContext* context, | |
197 DWORD bytes_transferred, | |
198 DWORD error) override { | |
199 DCHECK(!owner_ || | |
200 base::MessageLoop::current() == owner_->message_loop_for_io()); | |
201 | |
202 // Suppress self-destruction inside |OnReadCompleted()|, etc. (in case | |
203 // they result in a call to |Shutdown()|). | |
204 bool old_suppress_self_destruct = suppress_self_destruct_; | |
205 suppress_self_destruct_ = true; | |
206 | |
207 if (context == &read_context_) | |
208 OnReadCompleted(bytes_transferred, error); | |
209 else if (context == &write_context_) | |
210 OnWriteCompleted(bytes_transferred, error); | |
211 else | |
212 NOTREACHED(); | |
213 | |
214 // Maybe allow self-destruction again. | |
215 suppress_self_destruct_ = old_suppress_self_destruct; | |
216 | |
217 if (ShouldSelfDestruct()) | |
218 delete this; | |
219 } | |
220 | |
221 // Must be called on the I/O thread under |owner_->write_lock()|. | |
222 // After this call, the owner must not make any further calls on this | |
223 // object, and therefore the object is used on the I/O thread exclusively | |
224 // (if it stays alive). | |
225 void DetachFromOwnerNoLock(scoped_ptr<ReadBuffer> read_buffer, | |
226 scoped_ptr<WriteBuffer> write_buffer) { | |
227 DCHECK(owner_); | |
228 DCHECK_EQ(base::MessageLoop::current(), owner_->message_loop_for_io()); | |
229 //owner_->write_lock().AssertAcquired(); | |
230 | |
231 // If read/write is pending, we have to retain the corresponding buffer. | |
232 if (pending_read_) | |
233 preserved_read_buffer_after_detach_ = read_buffer.Pass(); | |
234 if (pending_write_) | |
235 preserved_write_buffer_after_detach_ = write_buffer.Pass(); | |
236 | |
237 owner_ = nullptr; | |
238 if (ShouldSelfDestruct()) | |
239 delete this; | |
240 } | |
241 | |
242 embedder::ScopedPlatformHandle ReleaseHandle( | |
243 std::vector<char>* read_buffer) { | |
244 // TODO(jam): handle XP | |
245 CancelIoEx(handle(), NULL); | |
246 // NOTE!!!! | |
247 // The above call will cancel pending IO calls. | |
248 // HOWEVER, some could have already finished and posted task to IO thread | |
249 // that will execute | |
250 | |
251 | |
252 size_t read_buffer_byte_size = owner_->read_buffer()->num_valid_bytes(); | |
253 | |
254 if (pending_read_) { | |
255 DWORD bytes_read_dword = 0; | |
256 | |
257 DWORD old_bytes = read_context_.overlapped.InternalHigh; | |
258 | |
259 //TODO(jam): for XP, can return TRUE here to wait. also below. | |
260 BOOL rv = GetOverlappedResult( | |
yzshen1
2015/09/23 22:47:09
Please add a comment about why the if (pending_rea
| |
261 handle(), &read_context_.overlapped, &bytes_read_dword, FALSE); | |
262 DCHECK_EQ(old_bytes, bytes_read_dword); | |
263 if (rv) { | |
264 if (read_context_.overlapped.Internal != STATUS_CANCELLED) { | |
265 read_buffer_byte_size += read_context_.overlapped.InternalHigh; | |
266 } | |
267 } | |
268 pending_read_ = false; | |
269 } | |
270 | |
271 RawChannel::WriteBuffer* write_buffer = owner_->write_buffer_no_lock(); | |
272 | |
273 if (pending_write_) { | |
274 DWORD bytes_written_dword = 0; | |
275 DWORD old_bytes = write_context_.overlapped.InternalHigh; | |
276 | |
277 | |
278 BOOL rv = GetOverlappedResult( | |
279 handle(), &write_context_.overlapped, &bytes_written_dword, FALSE); | |
280 | |
281 if (old_bytes != bytes_written_dword) { | |
282 NOTREACHED(); | |
283 } | |
284 | |
285 if (rv) { | |
286 if (write_context_.overlapped.Internal != STATUS_CANCELLED) { | |
287 CHECK(write_buffer->queue_size() != 0); | |
288 | |
289 // TODO(jam) | |
290 DCHECK(!write_buffer->HavePlatformHandlesToSend()); | |
291 | |
292 write_buffer->data_offset_ += bytes_written_dword; | |
293 | |
294 // TODO(jam): copied from OnWriteCompletedNoLock | |
295 MessageInTransit* message = | |
296 write_buffer->message_queue()->PeekMessage(); | |
297 if (write_buffer->data_offset_ >= message->total_size()) { | |
298 // Complete write. | |
299 CHECK_EQ(write_buffer->data_offset_, message->total_size()); | |
300 write_buffer->message_queue_.DiscardMessage(); | |
301 write_buffer->platform_handles_offset_ = 0; | |
302 write_buffer->data_offset_ = 0; | |
303 } | |
304 | |
305 | |
306 //TODO(jam): handle more write msgs | |
307 DCHECK(write_buffer->message_queue_.IsEmpty()); | |
308 } | |
309 } | |
310 pending_write_ = false; | |
311 } | |
312 | |
313 if (read_buffer_byte_size) { | |
314 read_buffer->resize(read_buffer_byte_size); | |
315 memcpy(&(*read_buffer)[0], owner_->read_buffer()->buffer(), | |
316 read_buffer_byte_size); | |
317 owner_->read_buffer()->Reset(); | |
318 } | |
319 | |
320 return embedder::ScopedPlatformHandle(handle_.release()); | |
321 } | |
322 | |
323 void OnObjectSignaled(HANDLE object) override { | |
324 | |
325 // Since this is called on the IO thread, no locks needed for owner_. | |
326 bool handle_is_valid = false; | |
327 if (owner_) | |
328 owner_->read_lock().Acquire(); | |
329 handle_is_valid = handle_.is_valid(); | |
330 if (owner_) | |
331 owner_->read_lock().Release(); | |
332 if (!handle_is_valid) { | |
333 if (object == read_event) | |
334 pending_read_ = false; | |
335 else | |
336 pending_write_ = false; | |
337 if (ShouldSelfDestruct()) | |
338 delete this; | |
339 return; | |
340 } | |
341 | |
342 if (object == read_event) { | |
343 OnIOCompleted(&read_context_, read_context_.overlapped.InternalHigh, | |
344 read_context_.overlapped.Internal); | |
345 | |
346 } else { | |
347 CHECK(object == write_event); | |
348 OnIOCompleted(&write_context_, write_context_.overlapped.InternalHigh, | |
349 write_context_.overlapped.Internal); | |
350 } | |
351 } | |
352 HANDLE read_event, write_event; | |
353 base::win::ObjectWatcher read_watcher_, write_watcher_; | |
354 | |
355 private: | |
356 // Returns true if |owner_| has been reset and there is not pending read or | |
357 // write. | |
358 // Must be called on the I/O thread. | |
359 bool ShouldSelfDestruct() const { | |
360 if (owner_ || suppress_self_destruct_) | |
361 return false; | |
362 | |
363 // Note: Detached, hence no lock needed for |pending_write_|. | |
364 return !pending_read_ && !pending_write_; | |
365 } | |
366 | |
367 // Must be called on the I/O thread. It may be called before or after | |
368 // detaching from the owner. | |
369 void OnReadCompleted(DWORD bytes_read, DWORD error) { | |
370 DCHECK(!owner_ || | |
371 base::MessageLoop::current() == owner_->message_loop_for_io()); | |
372 DCHECK(suppress_self_destruct_); | |
373 | |
374 if (g_use_autoreset_event && !pending_read_) | |
375 return; | |
376 | |
377 CHECK(pending_read_); | |
378 pending_read_ = false; | |
379 if (!owner_) | |
380 return; | |
381 | |
382 // Note: |OnReadCompleted()| may detach us from |owner_|. | |
383 if (error == ERROR_SUCCESS) { | |
384 DCHECK_GT(bytes_read, 0u); | |
385 owner_->OnReadCompleted(IO_SUCCEEDED, bytes_read); | |
386 } else if (error == ERROR_BROKEN_PIPE || | |
387 (g_use_autoreset_event && error == STATUS_PIPE_BROKEN)) { | |
388 DCHECK_EQ(bytes_read, 0u); | |
389 owner_->OnReadCompleted(IO_FAILED_SHUTDOWN, 0); | |
390 } else if (error == ERROR_NO_MORE_ITEMS && g_use_autoreset_event) { | |
391 return owner_->OnReadCompleted(IO_SUCCEEDED, bytes_read); | |
392 } else { | |
393 DCHECK_EQ(bytes_read, 0u); | |
394 LOG(WARNING) << "ReadFile: " << logging::SystemErrorCodeToString(error); | |
395 owner_->OnReadCompleted(IO_FAILED_UNKNOWN, 0); | |
396 } | |
397 } | |
398 | |
399 // Must be called on the I/O thread. It may be called before or after | |
400 // detaching from the owner. | |
401 void OnWriteCompleted(DWORD bytes_written, DWORD error) { | |
402 DCHECK(!owner_ || | |
403 base::MessageLoop::current() == owner_->message_loop_for_io()); | |
404 DCHECK(suppress_self_destruct_); | |
405 | |
406 if (!owner_) { | |
407 // No lock needed. | |
408 CHECK(pending_write_); | |
409 pending_write_ = false; | |
410 return; | |
411 } | |
412 | |
413 { | |
414 base::AutoLock locker(owner_->write_lock()); | |
415 if (g_use_autoreset_event && !pending_write_) | |
416 return; | |
417 | |
418 CHECK(pending_write_); | |
419 pending_write_ = false; | |
420 } | |
421 | |
422 // Note: |OnWriteCompleted()| may detach us from |owner_|. | |
423 if (error == ERROR_SUCCESS) { | |
424 // Reset |platform_handles_written_| before calling |OnWriteCompleted()| | |
425 // since that function may call back to this class and set it again. | |
426 size_t local_platform_handles_written_ = platform_handles_written_; | |
427 platform_handles_written_ = 0; | |
428 owner_->OnWriteCompleted(IO_SUCCEEDED, local_platform_handles_written_, | |
429 bytes_written); | |
430 } else if (error == ERROR_BROKEN_PIPE || | |
431 (g_use_autoreset_event && error == STATUS_PIPE_BROKEN)) { | |
432 owner_->OnWriteCompleted(IO_FAILED_SHUTDOWN, 0, 0); | |
433 } else if (error == ERROR_NO_MORE_ITEMS && g_use_autoreset_event) { | |
434 size_t local_platform_handles_written_ = platform_handles_written_; | |
435 platform_handles_written_ = 0; | |
436 owner_->OnWriteCompleted(IO_SUCCEEDED, local_platform_handles_written_, | |
437 bytes_written); | |
438 } else { | |
439 LOG(WARNING) << "WriteFile: " | |
440 << logging::SystemErrorCodeToString(error); | |
441 owner_->OnWriteCompleted(IO_FAILED_UNKNOWN, 0, 0); | |
442 } | |
443 } | |
444 | |
445 embedder::ScopedPlatformHandle handle_; | |
446 | |
447 // |owner_| is reset on the I/O thread under |owner_->write_lock()|. | |
448 // Therefore, it may be used on any thread under lock; or on the I/O thread | |
449 // without locking. | |
450 RawChannelWin* owner_; | |
451 | |
452 // The following members must be used on the I/O thread. | |
453 scoped_ptr<ReadBuffer> preserved_read_buffer_after_detach_; | |
454 scoped_ptr<WriteBuffer> preserved_write_buffer_after_detach_; | |
455 bool suppress_self_destruct_; | |
456 | |
457 bool pending_read_; | |
458 base::MessageLoopForIO::IOContext read_context_; | |
459 | |
460 // The following members must be used under |owner_->write_lock()| while the | |
461 // object is still attached to the owner, and only on the I/O thread | |
462 // afterwards. | |
463 bool pending_write_; | |
464 size_t platform_handles_written_; | |
465 base::MessageLoopForIO::IOContext write_context_; | |
466 | |
467 MOJO_DISALLOW_COPY_AND_ASSIGN(RawChannelIOHandler); | |
468 }; | |
469 | |
470 embedder::ScopedPlatformHandle ReleaseHandleNoLock( | |
471 std::vector<char>* read_buffer_out) override { | |
472 std::vector<WriteBuffer::Buffer> buffers; | |
473 write_buffer_no_lock()->GetBuffers(&buffers); | |
474 if (!buffers.empty()) { | |
475 // TODO(jam): copy code in OnShutdownNoLock | |
476 NOTREACHED() << "releasing handle with pending write buffer"; | |
477 } | |
478 | |
479 | |
480 if( handle_.is_valid()) { | |
481 // SetInitialBuffer could have been called on main thread before OnInit | |
482 // is called on Io thread. and in meantime releasehandle called. | |
483 //DCHECK(read_buffer()->num_valid_bytes() == 0); | |
484 if (read_buffer()->num_valid_bytes()) { | |
485 read_buffer_out->resize(read_buffer()->num_valid_bytes()); | |
486 memcpy(&(*read_buffer_out)[0], read_buffer()->buffer(), | |
487 read_buffer()->num_valid_bytes()); | |
488 read_buffer()->Reset(); | |
yzshen1
2015/09/23 22:47:09
Before we cancel IO (which happens in io_handler_-
| |
489 } | |
490 DCHECK(write_buffer_no_lock()->queue_size() == 0); | |
491 return embedder::ScopedPlatformHandle( | |
492 embedder::PlatformHandle(handle_.release().handle)); | |
493 } | |
494 | |
495 return io_handler_->ReleaseHandle(read_buffer_out); | |
496 } | |
497 embedder::PlatformHandle HandleForDebuggingNoLock() override { | |
498 if (handle_.is_valid()) | |
499 return handle_.get(); | |
500 | |
501 if (!io_handler_) | |
502 return embedder::PlatformHandle(); | |
503 | |
504 return embedder::PlatformHandle(io_handler_->handle()); | |
505 } | |
506 | |
507 IOResult Read(size_t* bytes_read) override { | |
508 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); | |
509 | |
510 char* buffer = nullptr; | |
511 size_t bytes_to_read = 0; | |
512 read_buffer()->GetBuffer(&buffer, &bytes_to_read); | |
513 | |
514 DCHECK(io_handler_); | |
515 DCHECK(!io_handler_->pending_read()); | |
516 BOOL result = ReadFile( | |
517 io_handler_->handle(), buffer, static_cast<DWORD>(bytes_to_read), | |
518 nullptr, &io_handler_->read_context()->overlapped); | |
519 if (!result) { | |
520 DWORD error = GetLastError(); | |
521 if (error == ERROR_BROKEN_PIPE) | |
522 return IO_FAILED_SHUTDOWN; | |
523 if (error != ERROR_IO_PENDING) { | |
524 LOG(WARNING) << "ReadFile: " << logging::SystemErrorCodeToString(error); | |
525 return IO_FAILED_UNKNOWN; | |
526 } | |
527 } | |
528 | |
529 if (result && skip_completion_port_on_success_) { | |
530 DWORD bytes_read_dword = 0; | |
531 BOOL get_size_result = GetOverlappedResult( | |
532 io_handler_->handle(), &io_handler_->read_context()->overlapped, | |
533 &bytes_read_dword, FALSE); | |
534 DPCHECK(get_size_result); | |
535 *bytes_read = bytes_read_dword; | |
536 return IO_SUCCEEDED; | |
537 } | |
538 | |
539 if (!g_use_autoreset_event) { | |
540 if (!g_use_iocp) { | |
541 io_handler_->read_watcher_.StartWatching( | |
542 io_handler_->read_event, io_handler_, false); | |
543 } | |
544 } | |
545 // If the read is pending or the read has succeeded but we don't skip | |
546 // completion port on success, instruct |io_handler_| to wait for the | |
547 // completion packet. | |
548 // | |
549 // TODO(yzshen): It seems there isn't document saying that all error cases | |
550 // (other than ERROR_IO_PENDING) are guaranteed to *not* queue a completion | |
551 // packet. If we do get one for errors, | |
552 // |RawChannelIOHandler::OnIOCompleted()| will crash so we will learn about | |
553 // it. | |
554 | |
555 io_handler_->OnPendingReadStarted(); | |
556 return IO_PENDING; | |
557 } | |
558 | |
559 IOResult ScheduleRead() override { | |
560 if (!io_handler_) | |
561 return IO_PENDING; // OnInit could have earlied out. | |
562 | |
563 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); | |
564 DCHECK(io_handler_); | |
565 DCHECK(!io_handler_->pending_read()); | |
566 | |
567 size_t bytes_read = 0; | |
568 IOResult io_result = Read(&bytes_read); | |
569 if (io_result == IO_SUCCEEDED) { | |
570 DCHECK(skip_completion_port_on_success_); | |
571 | |
572 // We have finished reading successfully. Queue a notification manually. | |
573 io_handler_->OnPendingReadStarted(); | |
574 // |io_handler_| won't go away before the task is run, so it is safe to | |
575 // use |base::Unretained()|. | |
576 message_loop_for_io()->PostTask( | |
577 FROM_HERE, base::Bind(&RawChannelIOHandler::OnIOCompleted, | |
578 base::Unretained(io_handler_), | |
579 base::Unretained(io_handler_->read_context()), | |
580 static_cast<DWORD>(bytes_read), ERROR_SUCCESS)); | |
581 return IO_PENDING; | |
582 } | |
583 | |
584 return io_result; | |
585 } | |
586 embedder::ScopedPlatformHandleVectorPtr GetReadPlatformHandles( | |
587 size_t num_platform_handles, | |
588 const void* platform_handle_table) override { | |
589 // TODO(jam): this code will have to be updated once it's used in a sandbox | |
590 // and the receiving process doesn't have duplicate permission for the | |
591 // receiver. Once there's a broker and we have a connection to it (possibly | |
592 // through ConnectionManager), then we can make a sync IPC to it here to get | |
593 // a token for this handle, and it will duplicate the handle to is process. | |
594 // Then we pass the token to the receiver, which will then make a sync call | |
595 // to the broker to get a duplicated handle. This will also allow us to | |
596 // avoid leaks of the handle if the receiver dies, since the broker can | |
597 // notice that. | |
598 DCHECK_GT(num_platform_handles, 0u); | |
599 embedder::ScopedPlatformHandleVectorPtr rv( | |
600 new embedder::PlatformHandleVector()); | |
601 | |
602 const char* serialization_data = | |
603 static_cast<const char*>(platform_handle_table); | |
604 for (size_t i = 0; i < num_platform_handles; i++) { | |
605 DWORD pid = *reinterpret_cast<const DWORD*>(serialization_data); | |
606 serialization_data += sizeof(DWORD); | |
607 HANDLE source_handle = | |
608 *reinterpret_cast<const HANDLE*>(serialization_data); | |
609 serialization_data += sizeof(HANDLE); | |
610 base::Process sender = | |
611 base::Process::OpenWithAccess(pid, PROCESS_DUP_HANDLE); | |
612 DCHECK(sender.IsValid()); | |
613 HANDLE target_handle = NULL; | |
614 BOOL dup_result = DuplicateHandle( | |
615 sender.Handle(), source_handle, | |
616 base::GetCurrentProcessHandle(), &target_handle, 0, | |
617 FALSE, DUPLICATE_SAME_ACCESS | DUPLICATE_CLOSE_SOURCE); | |
618 DCHECK(dup_result); | |
619 rv->push_back(embedder::PlatformHandle(target_handle)); | |
620 } | |
621 return rv.Pass(); | |
622 } | |
623 | |
624 IOResult WriteNoLock(size_t* platform_handles_written, | |
625 size_t* bytes_written) override { | |
626 write_lock().AssertAcquired(); | |
627 | |
628 DCHECK(io_handler_); | |
629 DCHECK(!io_handler_->pending_write_no_lock()); | |
630 | |
631 size_t num_platform_handles = 0; | |
632 if (write_buffer_no_lock()->HavePlatformHandlesToSend()) { | |
633 // Since we're not sure which process might ultimately deserialize this | |
634 // message, we can't duplicate the handle now. Instead, write the process | |
635 // ID and handle now and let the receiver duplicate it. | |
636 embedder::PlatformHandle* platform_handles; | |
637 void* serialization_data_temp; | |
638 write_buffer_no_lock()->GetPlatformHandlesToSend( | |
639 &num_platform_handles, &platform_handles, &serialization_data_temp); | |
640 char* serialization_data = static_cast<char*>(serialization_data_temp); | |
641 DCHECK_GT(num_platform_handles, 0u); | |
642 DCHECK(platform_handles); | |
643 | |
644 DWORD current_process_id = base::GetCurrentProcId(); | |
645 for (size_t i = 0; i < num_platform_handles; i++) { | |
646 *reinterpret_cast<DWORD*>(serialization_data) = current_process_id; | |
647 serialization_data += sizeof(DWORD); | |
648 *reinterpret_cast<HANDLE*>(serialization_data) = | |
649 platform_handles[i].handle; | |
650 serialization_data += sizeof(HANDLE); | |
651 platform_handles[i] = embedder::PlatformHandle(); | |
652 } | |
653 } | |
654 | |
655 std::vector<WriteBuffer::Buffer> buffers; | |
656 write_buffer_no_lock()->GetBuffers(&buffers); | |
657 DCHECK(!buffers.empty()); | |
658 | |
659 // TODO(yzshen): Handle multi-segment writes more efficiently. | |
660 DWORD bytes_written_dword = 0; | |
661 | |
662 | |
663 | |
664 | |
665 // TODO(jam): right now we get in bad situation where we might first write | |
666 // the main buffer and then the MP gets sent before we write the transport | |
667 // buffer. We can fix this by sending information about partially written | |
668 // messages, or by teaching transport buffer how to grow the main buffer and | |
669 // write its data there. | |
670 // Until that's done, for now make another copy. | |
671 | |
672 size_t total_size = buffers[0].size; | |
673 if (buffers.size() > 1) | |
674 total_size+=buffers[1].size; | |
675 char* buf = new char[total_size]; | |
676 memcpy(buf, buffers[0].addr, buffers[0].size); | |
677 if (buffers.size() > 1) | |
678 memcpy(buf + buffers[0].size, buffers[1].addr, buffers[1].size); | |
679 | |
680 BOOL result = WriteFile( | |
681 io_handler_->handle(), buf, | |
682 static_cast<DWORD>(total_size), | |
683 &bytes_written_dword, | |
684 &io_handler_->write_context_no_lock()->overlapped); | |
685 delete [] buf; | |
686 | |
687 if (!result) { | |
688 DWORD error = GetLastError(); | |
689 if (error == ERROR_BROKEN_PIPE) | |
690 return IO_FAILED_SHUTDOWN; | |
691 if (error != ERROR_IO_PENDING) { | |
692 LOG(WARNING) << "WriteFile: " | |
693 << logging::SystemErrorCodeToString(error); | |
694 return IO_FAILED_UNKNOWN; | |
695 } | |
696 } | |
697 | |
698 if (result && skip_completion_port_on_success_) { | |
699 *platform_handles_written = num_platform_handles; | |
700 *bytes_written = bytes_written_dword; | |
701 return IO_SUCCEEDED; | |
702 } | |
703 | |
704 if (!g_use_autoreset_event) { | |
705 if (!g_use_iocp) { | |
706 io_handler_->write_watcher_.StartWatching( | |
707 io_handler_->write_event, io_handler_, false); | |
708 } | |
709 } | |
710 // If the write is pending or the write has succeeded but we don't skip | |
711 // completion port on success, instruct |io_handler_| to wait for the | |
712 // completion packet. | |
713 // | |
714 // TODO(yzshen): it seems there isn't document saying that all error cases | |
715 // (other than ERROR_IO_PENDING) are guaranteed to *not* queue a completion | |
716 // packet. If we do get one for errors, | |
717 // |RawChannelIOHandler::OnIOCompleted()| will crash so we will learn about | |
718 // it. | |
719 | |
720 io_handler_->OnPendingWriteStartedNoLock(num_platform_handles); | |
721 return IO_PENDING; | |
722 } | |
723 | |
724 IOResult ScheduleWriteNoLock() override { | |
725 write_lock().AssertAcquired(); | |
726 | |
727 DCHECK(io_handler_); | |
728 DCHECK(!io_handler_->pending_write_no_lock()); | |
729 | |
730 size_t platform_handles_written = 0; | |
731 size_t bytes_written = 0; | |
732 IOResult io_result = WriteNoLock(&platform_handles_written, &bytes_written); | |
733 if (io_result == IO_SUCCEEDED) { | |
734 DCHECK(skip_completion_port_on_success_); | |
735 | |
736 // We have finished writing successfully. Queue a notification manually. | |
737 io_handler_->OnPendingWriteStartedNoLock(platform_handles_written); | |
738 // |io_handler_| won't go away before that task is run, so it is safe to | |
739 // use |base::Unretained()|. | |
740 message_loop_for_io()->PostTask( | |
741 FROM_HERE, | |
742 base::Bind(&RawChannelIOHandler::OnIOCompleted, | |
743 base::Unretained(io_handler_), | |
744 base::Unretained(io_handler_->write_context_no_lock()), | |
745 static_cast<DWORD>(bytes_written), ERROR_SUCCESS)); | |
746 return IO_PENDING; | |
747 } | |
748 | |
749 return io_result; | |
750 } | |
751 | |
752 void OnInit() override { | |
753 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); | |
754 | |
755 if (!handle_.is_valid()) { | |
756 LOG(ERROR) << "Note: RawChannelWin " << this | |
757 << " early exiting in OnInit because no handle"; | |
758 return; | |
759 } | |
760 | |
761 DCHECK(handle_.is_valid()); | |
762 if (skip_completion_port_on_success_) { | |
763 // I don't know how this can fail (unless |handle_| is bad, in which case | |
764 // it's a bug in our code). | |
765 CHECK(g_vista_or_higher_functions.Get(). | |
766 SetFileCompletionNotificationModes( | |
767 handle_.get().handle, FILE_SKIP_COMPLETION_PORT_ON_SUCCESS)); | |
768 } | |
769 | |
770 DCHECK(!io_handler_); | |
771 io_handler_ = new RawChannelIOHandler(this, handle_.Pass()); | |
772 } | |
773 | |
774 void OnShutdownNoLock(scoped_ptr<ReadBuffer> read_buffer, | |
775 scoped_ptr<WriteBuffer> write_buffer) override { | |
776 // happens on shutdown if didn't call init when doing createduplicate | |
777 if (message_loop_for_io()) { | |
778 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); | |
779 } | |
780 | |
781 if (!io_handler_) { | |
782 // This is hit when creating a duplicate dispatcher since we don't call | |
783 // Init on it. | |
784 DCHECK_EQ(read_buffer->num_valid_bytes(), 0U); | |
785 DCHECK_EQ(write_buffer->queue_size(), 0U); | |
786 return; | |
787 } | |
788 | |
789 if (io_handler_->pending_read() || io_handler_->pending_write_no_lock()) { | |
790 // |io_handler_| will be alive until pending read/write (if any) | |
791 // completes. Call |CancelIoEx()| or |CancelIo()| so that resources can be | |
792 // freed up as soon as possible. | |
793 // Note: |CancelIo()| only cancels read/write requests started from this | |
794 // thread. | |
795 if (g_vista_or_higher_functions.Get().is_vista_or_higher()) { | |
796 g_vista_or_higher_functions.Get().CancelIoEx(io_handler_->handle(), | |
797 nullptr); | |
798 } else { | |
799 CancelIo(io_handler_->handle()); | |
800 } | |
801 } | |
802 | |
803 io_handler_->DetachFromOwnerNoLock(read_buffer.Pass(), write_buffer.Pass()); | |
804 io_handler_ = nullptr; | |
805 } | |
806 | |
807 // Passed to |io_handler_| during initialization. | |
808 embedder::ScopedPlatformHandle handle_; | |
809 | |
810 RawChannelIOHandler* io_handler_; | |
811 | |
812 const bool skip_completion_port_on_success_; | |
813 | |
814 MOJO_DISALLOW_COPY_AND_ASSIGN(RawChannelWin); | |
815 }; | |
816 | |
817 | |
818 } // namespace | |
819 | |
820 // ----------------------------------------------------------------------------- | |
821 | |
822 RawChannel* RawChannel::Create(embedder::ScopedPlatformHandle handle) { | |
823 return new RawChannelWin(handle.Pass()); | |
824 } | |
825 | |
826 size_t RawChannel::GetSerializedPlatformHandleSize() { | |
827 return sizeof(DWORD) + sizeof(HANDLE); | |
828 } | |
829 | |
830 } // namespace system | |
831 } // namespace mojo | |
OLD | NEW |