OLD | NEW |
---|---|
(Empty) | |
1 // Copyright 2013 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 #include "mojo/edk/system/raw_channel.h" | |
6 | |
7 #include <windows.h> | |
8 | |
9 #include "base/bind.h" | |
10 #include "base/lazy_instance.h" | |
11 #include "base/location.h" | |
12 #include "base/logging.h" | |
13 #include "base/memory/scoped_ptr.h" | |
14 #include "base/message_loop/message_loop.h" | |
15 #include "base/process/process.h" | |
16 #include "base/synchronization/lock.h" | |
17 #include "base/win/object_watcher.h" | |
18 #include "base/win/windows_version.h" | |
19 #include "mojo/edk/embedder/platform_handle.h" | |
20 #include "mojo/public/cpp/system/macros.h" | |
21 | |
22 #define STATUS_CANCELLED 0xC0000120 | |
23 #define STATUS_PIPE_BROKEN 0xC000014B | |
24 | |
25 // We can't use IO completion ports if we send a message pipe. The reason is | |
26 // that the only way to stop an existing IOCP is by closing the pipe handle. | |
27 // See https://msdn.microsoft.com/en-us/library/windows/hardware/ff545834(v=vs.8 5).aspx | |
28 bool g_use_iocp = false; | |
29 | |
30 // Manual reset per | |
31 // Doc for overlapped I/O says use manual per | |
32 // https://msdn.microsoft.com/en-us/library/windows/desktop/ms684342(v=vs.85). aspx | |
33 // However using an auto-reset event makes the perf test 5x faster and also | |
34 // works since we don't wait on the event elsewhere or call GetOverlappedResult | |
35 // before it fires. | |
36 bool g_use_autoreset_event = true; | |
37 | |
38 namespace mojo { | |
39 namespace system { | |
40 | |
41 namespace { | |
42 | |
43 class VistaOrHigherFunctions { | |
44 public: | |
45 VistaOrHigherFunctions() | |
46 : is_vista_or_higher_( | |
47 base::win::GetVersion() >= base::win::VERSION_VISTA), | |
48 set_file_completion_notification_modes_(nullptr), | |
49 cancel_io_ex_(nullptr) { | |
50 if (!is_vista_or_higher_) | |
51 return; | |
52 | |
53 HMODULE module = GetModuleHandleW(L"kernel32.dll"); | |
54 set_file_completion_notification_modes_ = | |
55 reinterpret_cast<SetFileCompletionNotificationModesFunc>( | |
56 GetProcAddress(module, "SetFileCompletionNotificationModes")); | |
57 DCHECK(set_file_completion_notification_modes_); | |
58 | |
59 cancel_io_ex_ = | |
60 reinterpret_cast<CancelIoExFunc>(GetProcAddress(module, "CancelIoEx")); | |
61 DCHECK(cancel_io_ex_); | |
62 } | |
63 | |
64 bool is_vista_or_higher() const { return is_vista_or_higher_; } | |
65 | |
66 BOOL SetFileCompletionNotificationModes(HANDLE handle, UCHAR flags) { | |
67 return set_file_completion_notification_modes_(handle, flags); | |
68 } | |
69 | |
70 BOOL CancelIoEx(HANDLE handle, LPOVERLAPPED overlapped) { | |
71 return cancel_io_ex_(handle, overlapped); | |
72 } | |
73 | |
74 private: | |
75 using SetFileCompletionNotificationModesFunc = BOOL(WINAPI*)(HANDLE, UCHAR); | |
76 using CancelIoExFunc = BOOL(WINAPI*)(HANDLE, LPOVERLAPPED); | |
77 | |
78 bool is_vista_or_higher_; | |
79 SetFileCompletionNotificationModesFunc | |
80 set_file_completion_notification_modes_; | |
81 CancelIoExFunc cancel_io_ex_; | |
82 }; | |
83 | |
84 base::LazyInstance<VistaOrHigherFunctions> g_vista_or_higher_functions = | |
85 LAZY_INSTANCE_INITIALIZER; | |
86 | |
87 class RawChannelWin final : public RawChannel { | |
88 public: | |
89 RawChannelWin(embedder::ScopedPlatformHandle handle) | |
90 : handle_(handle.Pass()), | |
91 io_handler_(nullptr), | |
92 skip_completion_port_on_success_( | |
93 g_use_iocp && | |
94 g_vista_or_higher_functions.Get().is_vista_or_higher()) { | |
95 DCHECK(handle_.is_valid()); | |
96 } | |
97 ~RawChannelWin() override { | |
98 DCHECK(!io_handler_); | |
99 } | |
100 | |
101 private: | |
102 // RawChannelIOHandler receives OS notifications for I/O completion. It must | |
103 // be created on the I/O thread. | |
104 // | |
105 // It manages its own destruction. Destruction happens on the I/O thread when | |
106 // all the following conditions are satisfied: | |
107 // - |DetachFromOwnerNoLock()| has been called; | |
108 // - there is no pending read; | |
109 // - there is no pending write. | |
110 class RawChannelIOHandler : public base::MessageLoopForIO::IOHandler, | |
111 public base::win::ObjectWatcher::Delegate { | |
112 public: | |
113 RawChannelIOHandler(RawChannelWin* owner, | |
114 embedder::ScopedPlatformHandle handle) | |
115 : handle_(handle.Pass()), | |
116 owner_(owner), | |
117 suppress_self_destruct_(false), | |
118 pending_read_(false), | |
119 pending_write_(false), | |
120 platform_handles_written_(0), | |
121 pipe_disconnected_(false) { | |
122 memset(&read_context_.overlapped, 0, sizeof(read_context_.overlapped)); | |
123 memset(&write_context_.overlapped, 0, sizeof(write_context_.overlapped)); | |
124 if (g_use_iocp) { | |
125 owner_->message_loop_for_io()->RegisterIOHandler( | |
126 handle_.get().handle, this); | |
127 read_context_.handler = this; | |
128 write_context_.handler = this; | |
129 } else { | |
130 read_event = CreateEvent( | |
131 NULL, g_use_autoreset_event ? FALSE : TRUE, FALSE, NULL); | |
132 write_event = CreateEvent( | |
133 NULL, g_use_autoreset_event ? FALSE : TRUE, FALSE, NULL); | |
134 read_context_.overlapped.hEvent = read_event; | |
135 write_context_.overlapped.hEvent = write_event; | |
136 | |
137 | |
138 if (g_use_autoreset_event) { | |
139 read_watcher_.StartWatching(read_event, this, true); | |
140 write_watcher_.StartWatching(write_event, this, true); | |
141 } | |
142 } | |
143 } | |
144 | |
145 ~RawChannelIOHandler() override { | |
146 DCHECK(ShouldSelfDestruct()); | |
147 } | |
148 | |
149 HANDLE handle() const { return handle_.get().handle; } | |
150 | |
151 // The following methods are only called by the owner on the I/O thread. | |
152 bool pending_read() const { | |
153 DCHECK(owner_); | |
154 DCHECK_EQ(base::MessageLoop::current(), owner_->message_loop_for_io()); | |
155 return pending_read_; | |
156 } | |
157 | |
158 base::MessageLoopForIO::IOContext* read_context() { | |
159 DCHECK(owner_); | |
160 DCHECK_EQ(base::MessageLoop::current(), owner_->message_loop_for_io()); | |
161 return &read_context_; | |
162 } | |
163 | |
164 // Instructs the object to wait for an |OnIOCompleted()| notification. | |
165 void OnPendingReadStarted() { | |
166 DCHECK(owner_); | |
167 DCHECK_EQ(base::MessageLoop::current(), owner_->message_loop_for_io()); | |
168 DCHECK(!pending_read_); | |
169 pending_read_ = true; | |
170 } | |
171 | |
172 // The following methods are only called by the owner under | |
173 // |owner_->write_lock()|. | |
174 bool pending_write_no_lock() const { | |
175 DCHECK(owner_); | |
176 owner_->write_lock().AssertAcquired(); | |
177 return pending_write_; | |
178 } | |
179 | |
180 base::MessageLoopForIO::IOContext* write_context_no_lock() { | |
181 DCHECK(owner_); | |
182 owner_->write_lock().AssertAcquired(); | |
183 return &write_context_; | |
184 } | |
185 // Instructs the object to wait for an |OnIOCompleted()| notification. | |
186 void OnPendingWriteStartedNoLock(size_t platform_handles_written) { | |
187 DCHECK(owner_); | |
188 owner_->write_lock().AssertAcquired(); | |
189 DCHECK(!pending_write_); | |
190 pending_write_ = true; | |
191 platform_handles_written_ = platform_handles_written; | |
192 } | |
193 | |
194 // |base::MessageLoopForIO::IOHandler| implementation: | |
195 // Must be called on the I/O thread. It could be called before or after | |
196 // detached from the owner. | |
197 void OnIOCompleted(base::MessageLoopForIO::IOContext* context, | |
198 DWORD bytes_transferred, | |
199 DWORD error) override { | |
200 DCHECK(owner_); | |
201 DCHECK(base::MessageLoop::current() == owner_->message_loop_for_io()); | |
202 | |
203 // Suppress self-destruction inside |OnReadCompleted()|, etc. (in case | |
204 // they result in a call to |Shutdown()|). | |
205 bool old_suppress_self_destruct = suppress_self_destruct_; | |
206 suppress_self_destruct_ = true; | |
207 | |
208 if (context == &read_context_) | |
209 OnReadCompleted(bytes_transferred, error); | |
210 else if (context == &write_context_) | |
211 OnWriteCompleted(bytes_transferred, error); | |
212 else | |
213 NOTREACHED(); | |
214 | |
215 // Maybe allow self-destruction again. | |
216 suppress_self_destruct_ = old_suppress_self_destruct; | |
217 | |
218 if (ShouldSelfDestruct()) | |
219 delete this; | |
220 } | |
221 | |
222 // Must be called on the I/O thread under |owner_->write_lock()|. | |
223 // After this call, the owner must not make any further calls on this | |
224 // object, and therefore the object is used on the I/O thread exclusively | |
225 // (if it stays alive). | |
226 void DetachFromOwnerNoLock(scoped_ptr<ReadBuffer> read_buffer, | |
227 scoped_ptr<WriteBuffer> write_buffer) { | |
228 DCHECK(owner_); | |
229 DCHECK_EQ(base::MessageLoop::current(), owner_->message_loop_for_io()); | |
230 //owner_->write_lock().AssertAcquired(); | |
231 | |
232 // If read/write is pending, we have to retain the corresponding buffer. | |
233 if (pending_read_) | |
234 preserved_read_buffer_after_detach_ = read_buffer.Pass(); | |
235 if (pending_write_) | |
236 preserved_write_buffer_after_detach_ = write_buffer.Pass(); | |
237 | |
238 owner_ = nullptr; | |
239 read_watcher_.StopWatching(); | |
240 write_watcher_.StopWatching(); | |
241 bool read_signalled = | |
242 WaitForSingleObjectEx(read_event, 0, true) != WAIT_TIMEOUT; | |
243 bool write_signalled = | |
244 WaitForSingleObjectEx(write_event, 0, true) != WAIT_TIMEOUT; | |
245 | |
246 if (read_signalled) { | |
247 if (read_context_.overlapped.Internal == STATUS_PIPE_BROKEN) { | |
248 pipe_disconnected_ = true; | |
249 } else if (read_context_.overlapped.Internal == STATUS_CANCELLED) { | |
250 // fine condition | |
251 } else { | |
252 NOTREACHED() << "TODO(jam)"; | |
253 } | |
254 } | |
255 | |
256 // Otherwise we will never quit since both endpoints of the channel could | |
257 // be waiting for their Read calls. | |
258 pending_read_ = false; | |
259 | |
260 if (write_signalled) { | |
261 NOTREACHED() << "TODO(jam)"; | |
262 } | |
263 | |
264 if (ShouldSelfDestruct()) | |
265 delete this; | |
266 } | |
267 | |
268 embedder::ScopedPlatformHandle ReleaseHandle( | |
269 std::vector<char>* read_buffer) { | |
270 // TODO(jam): handle XP | |
271 CancelIoEx(handle(), NULL); | |
272 // NOTE!!!! | |
273 // The above call will cancel pending IO calls. | |
274 // HOWEVER, some could have already finished and posted task to IO thread | |
275 // that will execute | |
276 | |
277 | |
278 size_t read_buffer_byte_size = owner_->read_buffer()->num_valid_bytes(); | |
279 | |
280 if (pending_read_) { | |
281 DWORD bytes_read_dword = 0; | |
282 | |
283 DWORD old_bytes = read_context_.overlapped.InternalHigh; | |
284 | |
285 //TODO(jam): for XP, can return TRUE here to wait. also below. | |
286 BOOL rv = GetOverlappedResult( | |
287 handle(), &read_context_.overlapped, &bytes_read_dword, FALSE); | |
288 DCHECK_EQ(old_bytes, bytes_read_dword); | |
289 if (rv) { | |
290 if (read_context_.overlapped.Internal != STATUS_CANCELLED) { | |
291 read_buffer_byte_size += read_context_.overlapped.InternalHigh; | |
292 } | |
293 } | |
294 pending_read_ = false; | |
295 } | |
296 | |
297 RawChannel::WriteBuffer* write_buffer = owner_->write_buffer_no_lock(); | |
298 | |
299 if (pending_write_) { | |
300 DWORD bytes_written_dword = 0; | |
301 DWORD old_bytes = write_context_.overlapped.InternalHigh; | |
302 | |
303 | |
304 BOOL rv = GetOverlappedResult( | |
305 handle(), &write_context_.overlapped, &bytes_written_dword, FALSE); | |
306 | |
307 if (old_bytes != bytes_written_dword) { | |
308 NOTREACHED(); | |
309 } | |
310 | |
311 if (rv) { | |
312 if (write_context_.overlapped.Internal != STATUS_CANCELLED) { | |
313 CHECK(write_buffer->queue_size() != 0); | |
314 | |
315 // TODO(jam) | |
316 DCHECK(!write_buffer->HavePlatformHandlesToSend()); | |
317 | |
318 write_buffer->data_offset_ += bytes_written_dword; | |
319 | |
320 // TODO(jam): copied from OnWriteCompletedNoLock | |
321 MessageInTransit* message = | |
322 write_buffer->message_queue()->PeekMessage(); | |
323 if (write_buffer->data_offset_ >= message->total_size()) { | |
324 // Complete write. | |
325 CHECK_EQ(write_buffer->data_offset_, message->total_size()); | |
326 write_buffer->message_queue_.DiscardMessage(); | |
327 write_buffer->platform_handles_offset_ = 0; | |
328 write_buffer->data_offset_ = 0; | |
329 } | |
330 | |
331 | |
332 //TODO(jam): handle more write msgs | |
333 DCHECK(write_buffer->message_queue_.IsEmpty()); | |
334 } | |
335 } | |
336 pending_write_ = false; | |
337 } | |
338 | |
339 if (read_buffer_byte_size) { | |
340 read_buffer->resize(read_buffer_byte_size); | |
341 memcpy(&(*read_buffer)[0], owner_->read_buffer()->buffer(), | |
342 read_buffer_byte_size); | |
343 owner_->read_buffer()->Reset(); | |
344 } | |
345 | |
346 return embedder::ScopedPlatformHandle(handle_.release()); | |
347 } | |
348 | |
349 void OnObjectSignaled(HANDLE object) override { | |
350 // TODO(jam): need to call out to owner? | |
351 | |
352 | |
353 //not needed per doc, since reset by readfile & writefile | |
354 //ResetEvent(object); | |
355 if (object == read_event) { | |
356 if (read_context_.overlapped.Internal == STATUS_CANCELLED) | |
357 return; | |
358 | |
359 { | |
360 // TODO(jam): improve this locking | |
361 base::AutoLock locker(owner_->read_lock()); | |
362 if (!handle_.is_valid()) { | |
363 | |
364 | |
365 | |
366 | |
367 // TODO(jam): have way to cancel objectwatcher posttask so that we | |
368 // can have a dcheck here and catch any erronous conditions. right | |
369 // now this fires for non-problems. | |
370 return; | |
371 | |
372 | |
373 | |
374 } | |
375 // this isn't covered with above lock | |
376 // DCHECK(!owner_->debug_started_sending()); | |
377 DCHECK(handle_.is_valid()); | |
378 } | |
379 OnIOCompleted(&read_context_, read_context_.overlapped.InternalHigh, | |
380 read_context_.overlapped.Internal); | |
381 | |
382 } else { | |
383 if (write_context_.overlapped.Internal == STATUS_CANCELLED) | |
384 return; | |
385 DCHECK(!owner_->debug_started_sending()); | |
386 DCHECK(handle_.is_valid()); | |
387 CHECK(object == write_event); | |
388 OnIOCompleted(&write_context_, write_context_.overlapped.InternalHigh, | |
yzshen1
2015/09/23 22:47:09
Since write could happen on any thread, and we sta
| |
389 write_context_.overlapped.Internal); | |
390 } | |
391 } | |
392 HANDLE read_event, write_event; | |
393 base::win::ObjectWatcher read_watcher_, write_watcher_; | |
394 | |
395 bool pipe_disconnected() { return pipe_disconnected_; } | |
396 | |
397 void QuitNow() { | |
398 pipe_disconnected_ = true; | |
399 owner_ = nullptr; | |
400 base::MessageLoop::current()->DeleteSoon(FROM_HERE, this); | |
401 } | |
402 | |
403 private: | |
404 // Returns true if |owner_| has been reset and there is not pending read or | |
405 // write. | |
406 // Must be called on the I/O thread. | |
407 bool ShouldSelfDestruct() const { | |
408 if (owner_ || suppress_self_destruct_) | |
409 return false; | |
410 | |
411 if (pipe_disconnected_) | |
412 return true; | |
413 | |
414 // Note: Detached, hence no lock needed for |pending_write_|. | |
415 return !pending_read_ && !pending_write_; | |
416 } | |
417 | |
418 // Must be called on the I/O thread. It may be called before or after | |
419 // detaching from the owner. | |
420 void OnReadCompleted(DWORD bytes_read, DWORD error) { | |
421 DCHECK(owner_); | |
422 DCHECK(base::MessageLoop::current() == owner_->message_loop_for_io()); | |
423 DCHECK(suppress_self_destruct_); | |
424 | |
425 if (g_use_autoreset_event && !pending_read_) | |
426 return; | |
427 | |
428 CHECK(pending_read_); | |
429 pending_read_ = false; | |
430 if (!owner_) | |
431 return; | |
432 | |
433 // Note: |OnReadCompleted()| may detach us from |owner_|. | |
434 if (error == ERROR_SUCCESS) { | |
435 DCHECK_GT(bytes_read, 0u); | |
436 owner_->OnReadCompleted(IO_SUCCEEDED, bytes_read); | |
437 } else if (error == ERROR_BROKEN_PIPE || | |
438 (g_use_autoreset_event && error == STATUS_PIPE_BROKEN)) { | |
439 DCHECK_EQ(bytes_read, 0u); | |
440 pipe_disconnected_ = true; | |
441 owner_->OnReadCompleted(IO_FAILED_SHUTDOWN, 0); | |
442 } else if (error == ERROR_NO_MORE_ITEMS && g_use_autoreset_event) { | |
443 return owner_->OnReadCompleted(IO_SUCCEEDED, bytes_read); | |
444 } else { | |
445 DCHECK_EQ(bytes_read, 0u); | |
446 LOG(WARNING) << "ReadFile: " << logging::SystemErrorCodeToString(error); | |
447 owner_->OnReadCompleted(IO_FAILED_UNKNOWN, 0); | |
448 } | |
449 } | |
450 | |
451 // Must be called on the I/O thread. It may be called before or after | |
452 // detaching from the owner. | |
453 void OnWriteCompleted(DWORD bytes_written, DWORD error) { | |
454 DCHECK(base::MessageLoop::current() == owner_->message_loop_for_io()); | |
455 DCHECK(suppress_self_destruct_); | |
456 | |
457 if (!owner_) { | |
458 // No lock needed. | |
459 CHECK(pending_write_); | |
460 pending_write_ = false; | |
461 return; | |
462 } | |
463 | |
464 { | |
465 base::AutoLock locker(owner_->write_lock()); | |
466 if (g_use_autoreset_event && !pending_write_) | |
467 return; | |
468 | |
469 CHECK(pending_write_); | |
470 pending_write_ = false; | |
471 } | |
472 | |
473 // Note: |OnWriteCompleted()| may detach us from |owner_|. | |
474 if (error == ERROR_SUCCESS) { | |
475 // Reset |platform_handles_written_| before calling |OnWriteCompleted()| | |
476 // since that function may call back to this class and set it again. | |
477 size_t local_platform_handles_written_ = platform_handles_written_; | |
478 platform_handles_written_ = 0; | |
479 owner_->OnWriteCompleted(IO_SUCCEEDED, local_platform_handles_written_, | |
480 bytes_written); | |
481 } else if (error == ERROR_BROKEN_PIPE || | |
482 (g_use_autoreset_event && error ==STATUS_PIPE_BROKEN)) { | |
yzshen1
2015/09/23 22:47:09
It seems nice to unify the error code used.
| |
483 pipe_disconnected_ = true; | |
484 owner_->OnWriteCompleted(IO_FAILED_SHUTDOWN, 0, 0); | |
485 } else if (error == ERROR_NO_MORE_ITEMS && g_use_autoreset_event) { | |
486 size_t local_platform_handles_written_ = platform_handles_written_; | |
487 platform_handles_written_ = 0; | |
488 owner_->OnWriteCompleted(IO_SUCCEEDED, local_platform_handles_written_, | |
489 bytes_written); | |
490 } else { | |
491 LOG(WARNING) << "WriteFile: " | |
492 << logging::SystemErrorCodeToString(error); | |
493 owner_->OnWriteCompleted(IO_FAILED_UNKNOWN, 0, 0); | |
494 } | |
495 } | |
496 | |
497 embedder::ScopedPlatformHandle handle_; | |
498 | |
499 // |owner_| is reset on the I/O thread under |owner_->write_lock()|. | |
500 // Therefore, it may be used on any thread under lock; or on the I/O thread | |
501 // without locking. | |
502 RawChannelWin* owner_; | |
503 | |
504 // The following members must be used on the I/O thread. | |
505 scoped_ptr<ReadBuffer> preserved_read_buffer_after_detach_; | |
506 scoped_ptr<WriteBuffer> preserved_write_buffer_after_detach_; | |
507 bool suppress_self_destruct_; | |
508 | |
509 bool pending_read_; | |
510 base::MessageLoopForIO::IOContext read_context_; | |
511 | |
512 // The following members must be used under |owner_->write_lock()| while the | |
513 // object is still attached to the owner, and only on the I/O thread | |
514 // afterwards. | |
515 bool pending_write_; | |
516 size_t platform_handles_written_; | |
517 base::MessageLoopForIO::IOContext write_context_; | |
518 | |
519 bool pipe_disconnected_; | |
520 | |
521 MOJO_DISALLOW_COPY_AND_ASSIGN(RawChannelIOHandler); | |
522 }; | |
523 | |
524 embedder::ScopedPlatformHandle ReleaseHandleNoLock( | |
525 std::vector<char>* read_buffer_out) override { | |
526 std::vector<WriteBuffer::Buffer> buffers; | |
527 write_buffer_no_lock()->GetBuffers(&buffers); | |
528 if (!buffers.empty()) { | |
529 // TODO(jam): copy code in OnShutdownNoLock | |
530 NOTREACHED() << "releasing handle with pending write buffer"; | |
531 } | |
532 | |
533 | |
534 if( handle_.is_valid()) { | |
535 // SetInitialBuffer could have been called on main thread before OnInit | |
536 // is called on Io thread. and in meantime releasehandle called. | |
537 //DCHECK(read_buffer()->num_valid_bytes() == 0); | |
538 if (read_buffer()->num_valid_bytes()) { | |
539 read_buffer_out->resize(read_buffer()->num_valid_bytes()); | |
540 memcpy(&(*read_buffer_out)[0], read_buffer()->buffer(), | |
541 read_buffer()->num_valid_bytes()); | |
542 read_buffer()->Reset(); | |
543 } | |
544 DCHECK(write_buffer_no_lock()->queue_size() == 0); | |
545 return embedder::ScopedPlatformHandle( | |
546 embedder::PlatformHandle(handle_.release().handle)); | |
547 } | |
548 | |
549 return io_handler_->ReleaseHandle(read_buffer_out); | |
550 } | |
551 embedder::PlatformHandle HandleForDebuggingNoLock() override { | |
552 if (handle_.is_valid()) | |
553 return handle_.get(); | |
554 | |
555 if (!io_handler_) | |
556 return embedder::PlatformHandle(); | |
557 | |
558 return embedder::PlatformHandle(io_handler_->handle()); | |
559 } | |
560 | |
561 IOResult Read(size_t* bytes_read) override { | |
562 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); | |
563 | |
564 char* buffer = nullptr; | |
565 size_t bytes_to_read = 0; | |
566 read_buffer()->GetBuffer(&buffer, &bytes_to_read); | |
567 | |
568 DCHECK(io_handler_); | |
569 DCHECK(!io_handler_->pending_read()); | |
570 BOOL result = ReadFile( | |
571 io_handler_->handle(), buffer, static_cast<DWORD>(bytes_to_read), | |
572 nullptr, &io_handler_->read_context()->overlapped); | |
573 if (!result) { | |
574 DWORD error = GetLastError(); | |
575 if (error == ERROR_BROKEN_PIPE) | |
576 return IO_FAILED_SHUTDOWN; | |
577 if (error != ERROR_IO_PENDING) { | |
578 LOG(WARNING) << "ReadFile: " << logging::SystemErrorCodeToString(error); | |
579 return IO_FAILED_UNKNOWN; | |
580 } | |
581 } | |
582 | |
583 if (result && skip_completion_port_on_success_) { | |
584 DWORD bytes_read_dword = 0; | |
585 BOOL get_size_result = GetOverlappedResult( | |
586 io_handler_->handle(), &io_handler_->read_context()->overlapped, | |
587 &bytes_read_dword, FALSE); | |
588 DPCHECK(get_size_result); | |
589 *bytes_read = bytes_read_dword; | |
590 return IO_SUCCEEDED; | |
591 } | |
592 | |
593 if (!g_use_autoreset_event) { | |
594 if (!g_use_iocp) { | |
595 io_handler_->read_watcher_.StartWatching( | |
596 io_handler_->read_event, io_handler_, false); | |
597 } | |
598 } | |
599 // If the read is pending or the read has succeeded but we don't skip | |
600 // completion port on success, instruct |io_handler_| to wait for the | |
601 // completion packet. | |
602 // | |
603 // TODO(yzshen): It seems there isn't document saying that all error cases | |
604 // (other than ERROR_IO_PENDING) are guaranteed to *not* queue a completion | |
605 // packet. If we do get one for errors, | |
606 // |RawChannelIOHandler::OnIOCompleted()| will crash so we will learn about | |
607 // it. | |
608 | |
609 io_handler_->OnPendingReadStarted(); | |
610 return IO_PENDING; | |
611 } | |
612 | |
613 IOResult ScheduleRead() override { | |
614 if (!io_handler_) | |
615 return IO_PENDING; // OnInit could have earlied out. | |
616 | |
617 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); | |
618 DCHECK(io_handler_); | |
619 DCHECK(!io_handler_->pending_read()); | |
620 | |
621 size_t bytes_read = 0; | |
622 IOResult io_result = Read(&bytes_read); | |
623 if (io_result == IO_SUCCEEDED) { | |
624 DCHECK(skip_completion_port_on_success_); | |
625 | |
626 // We have finished reading successfully. Queue a notification manually. | |
627 io_handler_->OnPendingReadStarted(); | |
628 // |io_handler_| won't go away before the task is run, so it is safe to | |
629 // use |base::Unretained()|. | |
630 message_loop_for_io()->PostTask( | |
631 FROM_HERE, base::Bind(&RawChannelIOHandler::OnIOCompleted, | |
632 base::Unretained(io_handler_), | |
633 base::Unretained(io_handler_->read_context()), | |
634 static_cast<DWORD>(bytes_read), ERROR_SUCCESS)); | |
635 return IO_PENDING; | |
636 } | |
637 | |
638 return io_result; | |
639 } | |
640 embedder::ScopedPlatformHandleVectorPtr GetReadPlatformHandles( | |
641 size_t num_platform_handles, | |
642 const void* platform_handle_table) override { | |
643 // TODO(jam): this code will have to be updated once it's used in a sandbox | |
644 // and the receiving process doesn't have duplicate permission for the | |
645 // receiver. Once there's a broker and we have a connection to it (possibly | |
646 // through ConnectionManager), then we can make a sync IPC to it here to get | |
647 // a token for this handle, and it will duplicate the handle to is process. | |
648 // Then we pass the token to the receiver, which will then make a sync call | |
649 // to the broker to get a duplicated handle. This will also allow us to | |
650 // avoid leaks of the handle if the receiver dies, since the broker can | |
651 // notice that. | |
652 DCHECK_GT(num_platform_handles, 0u); | |
653 embedder::ScopedPlatformHandleVectorPtr rv( | |
654 new embedder::PlatformHandleVector()); | |
655 | |
656 const char* serialization_data = | |
657 static_cast<const char*>(platform_handle_table); | |
658 for (size_t i = 0; i < num_platform_handles; i++) { | |
659 DWORD pid = *reinterpret_cast<const DWORD*>(serialization_data); | |
660 serialization_data += sizeof(DWORD); | |
661 HANDLE source_handle = | |
662 *reinterpret_cast<const HANDLE*>(serialization_data); | |
663 serialization_data += sizeof(HANDLE); | |
664 base::Process sender = | |
665 base::Process::OpenWithAccess(pid, PROCESS_DUP_HANDLE); | |
666 DCHECK(sender.IsValid()); | |
667 HANDLE target_handle = NULL; | |
668 BOOL dup_result = DuplicateHandle( | |
669 sender.Handle(), source_handle, | |
670 base::GetCurrentProcessHandle(), &target_handle, 0, | |
671 FALSE, DUPLICATE_SAME_ACCESS | DUPLICATE_CLOSE_SOURCE); | |
672 DCHECK(dup_result); | |
673 rv->push_back(embedder::PlatformHandle(target_handle)); | |
674 } | |
675 return rv.Pass(); | |
676 } | |
677 | |
678 IOResult WriteNoLock(size_t* platform_handles_written, | |
679 size_t* bytes_written) override { | |
680 write_lock().AssertAcquired(); | |
681 | |
682 DCHECK(io_handler_); | |
683 DCHECK(!io_handler_->pending_write_no_lock()); | |
684 | |
685 size_t num_platform_handles = 0; | |
686 if (write_buffer_no_lock()->HavePlatformHandlesToSend()) { | |
687 // Since we're not sure which process might ultimately deserialize this | |
688 // message, we can't duplicate the handle now. Instead, write the process | |
689 // ID and handle now and let the receiver duplicate it. | |
690 embedder::PlatformHandle* platform_handles; | |
691 void* serialization_data_temp; | |
692 write_buffer_no_lock()->GetPlatformHandlesToSend( | |
693 &num_platform_handles, &platform_handles, &serialization_data_temp); | |
694 char* serialization_data = static_cast<char*>(serialization_data_temp); | |
695 DCHECK_GT(num_platform_handles, 0u); | |
696 DCHECK(platform_handles); | |
697 | |
698 DWORD current_process_id = base::GetCurrentProcId(); | |
699 for (size_t i = 0; i < num_platform_handles; i++) { | |
700 *reinterpret_cast<DWORD*>(serialization_data) = current_process_id; | |
701 serialization_data += sizeof(DWORD); | |
702 *reinterpret_cast<HANDLE*>(serialization_data) = | |
703 platform_handles[i].handle; | |
704 serialization_data += sizeof(HANDLE); | |
705 platform_handles[i] = embedder::PlatformHandle(); | |
706 } | |
707 } | |
708 | |
709 std::vector<WriteBuffer::Buffer> buffers; | |
710 write_buffer_no_lock()->GetBuffers(&buffers); | |
711 DCHECK(!buffers.empty()); | |
712 | |
713 // TODO(yzshen): Handle multi-segment writes more efficiently. | |
714 DWORD bytes_written_dword = 0; | |
715 | |
716 | |
717 | |
718 | |
719 // TODO(jam): right now we get in bad situation where we might first write | |
yzshen1
2015/09/23 22:47:09
I haven't understood: I thought we synchronously f
| |
720 // the main buffer and then the MP gets sent before we write the transport | |
721 // buffer. We can fix this by sending information about partially written | |
722 // messages, or by teaching transport buffer how to grow the main buffer and | |
723 // write its data there. | |
724 // Until that's done, for now make another copy. | |
725 | |
726 size_t total_size = buffers[0].size; | |
727 if (buffers.size() > 1) | |
728 total_size+=buffers[1].size; | |
729 char* buf = new char[total_size]; | |
730 memcpy(buf, buffers[0].addr, buffers[0].size); | |
731 if (buffers.size() > 1) | |
732 memcpy(buf + buffers[0].size, buffers[1].addr, buffers[1].size); | |
733 | |
734 BOOL result = WriteFile( | |
735 io_handler_->handle(), buf, | |
736 static_cast<DWORD>(total_size), | |
737 &bytes_written_dword, | |
738 &io_handler_->write_context_no_lock()->overlapped); | |
739 delete [] buf; | |
yzshen1
2015/09/23 22:47:09
I think we need to keep the buffer valid until the
| |
740 | |
741 if (!result) { | |
742 DWORD error = GetLastError(); | |
743 if (error == ERROR_BROKEN_PIPE) | |
744 return IO_FAILED_SHUTDOWN; | |
745 if (error != ERROR_IO_PENDING) { | |
746 LOG(WARNING) << "WriteFile: " | |
747 << logging::SystemErrorCodeToString(error); | |
748 return IO_FAILED_UNKNOWN; | |
749 } | |
750 } | |
751 | |
752 if (result && skip_completion_port_on_success_) { | |
753 *platform_handles_written = num_platform_handles; | |
754 *bytes_written = bytes_written_dword; | |
755 return IO_SUCCEEDED; | |
756 } | |
757 | |
758 if (!g_use_autoreset_event) { | |
759 if (!g_use_iocp) { | |
760 io_handler_->write_watcher_.StartWatching( | |
761 io_handler_->write_event, io_handler_, false); | |
762 } | |
763 } | |
764 // If the write is pending or the write has succeeded but we don't skip | |
765 // completion port on success, instruct |io_handler_| to wait for the | |
766 // completion packet. | |
767 // | |
768 // TODO(yzshen): it seems there isn't document saying that all error cases | |
769 // (other than ERROR_IO_PENDING) are guaranteed to *not* queue a completion | |
770 // packet. If we do get one for errors, | |
771 // |RawChannelIOHandler::OnIOCompleted()| will crash so we will learn about | |
772 // it. | |
773 | |
774 io_handler_->OnPendingWriteStartedNoLock(num_platform_handles); | |
775 return IO_PENDING; | |
776 } | |
777 | |
778 IOResult ScheduleWriteNoLock() override { | |
779 write_lock().AssertAcquired(); | |
780 | |
781 DCHECK(io_handler_); | |
782 DCHECK(!io_handler_->pending_write_no_lock()); | |
783 | |
784 size_t platform_handles_written = 0; | |
785 size_t bytes_written = 0; | |
786 IOResult io_result = WriteNoLock(&platform_handles_written, &bytes_written); | |
787 if (io_result == IO_SUCCEEDED) { | |
788 DCHECK(skip_completion_port_on_success_); | |
789 | |
790 // We have finished writing successfully. Queue a notification manually. | |
791 io_handler_->OnPendingWriteStartedNoLock(platform_handles_written); | |
792 // |io_handler_| won't go away before that task is run, so it is safe to | |
793 // use |base::Unretained()|. | |
794 message_loop_for_io()->PostTask( | |
795 FROM_HERE, | |
796 base::Bind(&RawChannelIOHandler::OnIOCompleted, | |
797 base::Unretained(io_handler_), | |
798 base::Unretained(io_handler_->write_context_no_lock()), | |
799 static_cast<DWORD>(bytes_written), ERROR_SUCCESS)); | |
800 return IO_PENDING; | |
801 } | |
802 | |
803 return io_result; | |
804 } | |
805 | |
806 void OnInit() override { | |
807 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); | |
808 | |
809 if (!handle_.is_valid()) { | |
810 LOG(ERROR) << "Note: RawChannelWin " << this | |
811 << " early exiting in OnInit because no handle"; | |
812 return; | |
813 } | |
814 | |
815 DCHECK(handle_.is_valid()); | |
816 if (skip_completion_port_on_success_) { | |
817 // I don't know how this can fail (unless |handle_| is bad, in which case | |
818 // it's a bug in our code). | |
819 CHECK(g_vista_or_higher_functions.Get(). | |
820 SetFileCompletionNotificationModes( | |
821 handle_.get().handle, FILE_SKIP_COMPLETION_PORT_ON_SUCCESS)); | |
822 } | |
823 | |
824 DCHECK(!io_handler_); | |
825 io_handler_ = new RawChannelIOHandler(this, handle_.Pass()); | |
826 } | |
827 | |
828 void OnShutdownNoLock(scoped_ptr<ReadBuffer> read_buffer, | |
829 scoped_ptr<WriteBuffer> write_buffer) override { | |
830 // happens on shutdown if didn't call init when doing createduplicate | |
831 if (message_loop_for_io()) { | |
832 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); | |
833 } | |
834 | |
835 if (!io_handler_) { | |
836 // This is hit when creating a duplicate dispatcher since we don't call | |
yzshen1
2015/09/23 22:47:09
Before this CL, Init() is restricted to be called
| |
837 // Init on it. | |
838 DCHECK_EQ(read_buffer->num_valid_bytes(), 0U); | |
839 DCHECK_EQ(write_buffer->queue_size(), 0U); | |
840 return; | |
841 } | |
842 | |
843 if (io_handler_->pending_read() || io_handler_->pending_write_no_lock()) { | |
844 // |io_handler_| will be alive until pending read/write (if any) | |
845 // completes. Call |CancelIoEx()| or |CancelIo()| so that resources can be | |
846 // freed up as soon as possible. | |
847 // Note: |CancelIo()| only cancels read/write requests started from this | |
848 // thread. | |
849 if (g_vista_or_higher_functions.Get().is_vista_or_higher()) { | |
850 g_vista_or_higher_functions.Get().CancelIoEx(io_handler_->handle(), | |
851 nullptr); | |
852 } else { | |
853 CancelIo(io_handler_->handle()); | |
854 } | |
855 } | |
856 | |
857 io_handler_->DetachFromOwnerNoLock(read_buffer.Pass(), write_buffer.Pass()); | |
858 io_handler_ = nullptr; | |
859 } | |
860 | |
861 // Passed to |io_handler_| during initialization. | |
862 embedder::ScopedPlatformHandle handle_; | |
863 | |
864 RawChannelIOHandler* io_handler_; | |
865 | |
866 const bool skip_completion_port_on_success_; | |
867 | |
868 MOJO_DISALLOW_COPY_AND_ASSIGN(RawChannelWin); | |
869 }; | |
870 | |
871 | |
872 } // namespace | |
873 | |
874 // ----------------------------------------------------------------------------- | |
875 | |
876 RawChannel* RawChannel::Create(embedder::ScopedPlatformHandle handle) { | |
877 return new RawChannelWin(handle.Pass()); | |
878 } | |
879 | |
880 size_t RawChannel::GetSerializedPlatformHandleSize() { | |
881 return sizeof(DWORD) + sizeof(HANDLE); | |
882 } | |
883 | |
884 } // namespace system | |
885 } // namespace mojo | |
OLD | NEW |