Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(148)

Side by Side Diff: mojo/edk/system/raw_channel_win.cc

Issue 1403033003: Last set of fixes to make the src/mojo/edk pass the page cycler. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: fixes now that OnError can be called multiple times Created 5 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « mojo/edk/system/raw_channel_posix.cc ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2013 The Chromium Authors. All rights reserved. 1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "mojo/edk/system/raw_channel.h" 5 #include "mojo/edk/system/raw_channel.h"
6 6
7 #include <windows.h> 7 #include <windows.h>
8 8
9 #include "base/bind.h" 9 #include "base/bind.h"
10 #include "base/lazy_instance.h" 10 #include "base/lazy_instance.h"
11 #include "base/location.h" 11 #include "base/location.h"
12 #include "base/logging.h" 12 #include "base/logging.h"
13 #include "base/memory/scoped_ptr.h" 13 #include "base/memory/scoped_ptr.h"
14 #include "base/message_loop/message_loop.h" 14 #include "base/message_loop/message_loop.h"
15 #include "base/process/process.h" 15 #include "base/process/process.h"
16 #include "base/synchronization/lock.h" 16 #include "base/synchronization/lock.h"
17 #include "base/win/object_watcher.h" 17 #include "base/win/scoped_handle.h"
18 #include "base/win/windows_version.h" 18 #include "base/win/windows_version.h"
19 #include "mojo/edk/embedder/platform_handle.h" 19 #include "mojo/edk/embedder/platform_handle.h"
20 #include "mojo/edk/system/transport_data.h" 20 #include "mojo/edk/system/transport_data.h"
21 #include "mojo/public/cpp/system/macros.h" 21 #include "mojo/public/cpp/system/macros.h"
22 22
23 #define STATUS_CANCELLED 0xC0000120 23 #define STATUS_CANCELLED 0xC0000120
24 #define STATUS_PIPE_BROKEN 0xC000014B 24 #define STATUS_PIPE_BROKEN 0xC000014B
25 25
26 // We can't use IO completion ports if we send a message pipe. The reason is
27 // that the only way to stop an existing IOCP is by closing the pipe handle.
28 // See https://msdn.microsoft.com/en-us/library/windows/hardware/ff545834(v=vs.8 5).aspx
29 bool g_use_iocp = false;
30
31 // Manual reset per
32 // Doc for overlapped I/O says use manual per
33 // https://msdn.microsoft.com/en-us/library/windows/desktop/ms684342(v=vs.85). aspx
34 // However using an auto-reset event makes the perf test 5x faster and also
35 // works since we don't wait on the event elsewhere or call GetOverlappedResult
36 // before it fires.
37 bool g_use_autoreset_event = true;
38
39 namespace mojo { 26 namespace mojo {
40 namespace edk { 27 namespace edk {
41 28
42 namespace { 29 namespace {
43 30
44 struct MOJO_ALIGNAS(8) SerializedHandle { 31 struct MOJO_ALIGNAS(8) SerializedHandle {
45 DWORD handle_pid; 32 DWORD handle_pid;
46 HANDLE handle; 33 HANDLE handle;
47 }; 34 };
48 35
49 class VistaOrHigherFunctions { 36 class VistaOrHigherFunctions {
50 public: 37 public:
51 VistaOrHigherFunctions() 38 VistaOrHigherFunctions()
52 : is_vista_or_higher_( 39 : is_vista_or_higher_(
53 base::win::GetVersion() >= base::win::VERSION_VISTA), 40 base::win::GetVersion() >= base::win::VERSION_VISTA),
54 set_file_completion_notification_modes_(nullptr),
55 cancel_io_ex_(nullptr), 41 cancel_io_ex_(nullptr),
56 get_file_information_by_handle_ex_(nullptr) { 42 get_file_information_by_handle_ex_(nullptr) {
57 if (!is_vista_or_higher_) 43 if (!is_vista_or_higher_)
58 return; 44 return;
59 45
60 HMODULE module = GetModuleHandleW(L"kernel32.dll"); 46 HMODULE module = GetModuleHandleW(L"kernel32.dll");
61 set_file_completion_notification_modes_ =
62 reinterpret_cast<SetFileCompletionNotificationModesFunc>(
63 GetProcAddress(module, "SetFileCompletionNotificationModes"));
64 DCHECK(set_file_completion_notification_modes_);
65
66 cancel_io_ex_ = 47 cancel_io_ex_ =
67 reinterpret_cast<CancelIoExFunc>(GetProcAddress(module, "CancelIoEx")); 48 reinterpret_cast<CancelIoExFunc>(GetProcAddress(module, "CancelIoEx"));
68 DCHECK(cancel_io_ex_); 49 DCHECK(cancel_io_ex_);
69 50
70 get_file_information_by_handle_ex_ = 51 get_file_information_by_handle_ex_ =
71 reinterpret_cast<GetFileInformationByHandleExFunc>( 52 reinterpret_cast<GetFileInformationByHandleExFunc>(
72 GetProcAddress(module, "GetFileInformationByHandleEx")); 53 GetProcAddress(module, "GetFileInformationByHandleEx"));
73 DCHECK(get_file_information_by_handle_ex_); 54 DCHECK(get_file_information_by_handle_ex_);
74 } 55 }
75 56
76 bool is_vista_or_higher() const { return is_vista_or_higher_; } 57 bool is_vista_or_higher() const { return is_vista_or_higher_; }
77 58
78 BOOL SetFileCompletionNotificationModes(HANDLE handle, UCHAR flags) {
79 return set_file_completion_notification_modes_(handle, flags);
80 }
81
82 BOOL CancelIoEx(HANDLE handle, LPOVERLAPPED overlapped) { 59 BOOL CancelIoEx(HANDLE handle, LPOVERLAPPED overlapped) {
83 return cancel_io_ex_(handle, overlapped); 60 return cancel_io_ex_(handle, overlapped);
84 } 61 }
85 62
86 BOOL GetFileInformationByHandleEx(HANDLE handle, 63 BOOL GetFileInformationByHandleEx(HANDLE handle,
87 FILE_INFO_BY_HANDLE_CLASS file_info_class, 64 FILE_INFO_BY_HANDLE_CLASS file_info_class,
88 LPVOID file_info, 65 LPVOID file_info,
89 DWORD buffer_size) { 66 DWORD buffer_size) {
90 return get_file_information_by_handle_ex_( 67 return get_file_information_by_handle_ex_(
91 handle, file_info_class, file_info, buffer_size); 68 handle, file_info_class, file_info, buffer_size);
92 } 69 }
93 70
94 private: 71 private:
95 using SetFileCompletionNotificationModesFunc = BOOL(WINAPI*)(HANDLE, UCHAR);
96 using CancelIoExFunc = BOOL(WINAPI*)(HANDLE, LPOVERLAPPED); 72 using CancelIoExFunc = BOOL(WINAPI*)(HANDLE, LPOVERLAPPED);
97 using GetFileInformationByHandleExFunc = BOOL(WINAPI*)( 73 using GetFileInformationByHandleExFunc = BOOL(WINAPI*)(
98 HANDLE, FILE_INFO_BY_HANDLE_CLASS, LPVOID, DWORD); 74 HANDLE, FILE_INFO_BY_HANDLE_CLASS, LPVOID, DWORD);
99 75
100 bool is_vista_or_higher_; 76 bool is_vista_or_higher_;
101 SetFileCompletionNotificationModesFunc
102 set_file_completion_notification_modes_;
103 CancelIoExFunc cancel_io_ex_; 77 CancelIoExFunc cancel_io_ex_;
104 GetFileInformationByHandleExFunc get_file_information_by_handle_ex_; 78 GetFileInformationByHandleExFunc get_file_information_by_handle_ex_;
105 }; 79 };
106 80
107 base::LazyInstance<VistaOrHigherFunctions> g_vista_or_higher_functions = 81 base::LazyInstance<VistaOrHigherFunctions> g_vista_or_higher_functions =
108 LAZY_INSTANCE_INITIALIZER; 82 LAZY_INSTANCE_INITIALIZER;
109 83
110 class RawChannelWin final : public RawChannel { 84 class RawChannelWin final : public RawChannel {
111 public: 85 public:
112 RawChannelWin(ScopedPlatformHandle handle) 86 RawChannelWin(ScopedPlatformHandle handle)
113 : handle_(handle.Pass()), 87 : handle_(handle.Pass()), io_handler_(nullptr) {
114 io_handler_(nullptr),
115 skip_completion_port_on_success_(
116 g_use_iocp &&
117 g_vista_or_higher_functions.Get().is_vista_or_higher()) {
118 DCHECK(handle_.is_valid()); 88 DCHECK(handle_.is_valid());
119 } 89 }
120 ~RawChannelWin() override { 90 ~RawChannelWin() override {
121 DCHECK(!io_handler_); 91 DCHECK(!io_handler_);
122 } 92 }
123 93
124 private: 94 private:
125 // RawChannelIOHandler receives OS notifications for I/O completion. It must 95 // RawChannelIOHandler receives OS notifications for I/O completion. Currently
126 // be created on the I/O thread. 96 // this object is only used on the IO thread, other than ReleaseHandle. But
97 // there's nothing preventing using this on other threads.
127 // 98 //
128 // It manages its own destruction. Destruction happens on the I/O thread when 99 // It manages its own destruction. Destruction happens on the I/O thread when
129 // all the following conditions are satisfied: 100 // all the following conditions are satisfied:
130 // - |DetachFromOwnerNoLock()| has been called; 101 // - |DetachFromOwnerNoLock()| has been called;
131 // - there is no pending read; 102 // - there is no pending read;
132 // - there is no pending write. 103 // - there is no pending write.
133 class RawChannelIOHandler : public base::MessageLoopForIO::IOHandler, 104 class RawChannelIOHandler {
134 public base::win::ObjectWatcher::Delegate {
135 public: 105 public:
136 RawChannelIOHandler(RawChannelWin* owner, 106 RawChannelIOHandler(RawChannelWin* owner,
137 ScopedPlatformHandle handle) 107 ScopedPlatformHandle handle)
138 : handle_(handle.Pass()), 108 : handle_(handle.Pass()),
139 owner_(owner), 109 owner_(owner),
140 suppress_self_destruct_(false), 110 suppress_self_destruct_(false),
141 pending_read_(false), 111 pending_read_(false),
142 pending_write_(false), 112 pending_write_(false),
143 platform_handles_written_(0) { 113 platform_handles_written_(0),
114 read_event_(CreateEvent(NULL, FALSE, FALSE, NULL)),
115 write_event_(CreateEvent(NULL, FALSE, FALSE, NULL)),
116 read_wait_object_(NULL),
117 write_wait_object_(NULL),
118 read_event_signalled_(false),
119 write_event_signalled_(false),
120 message_loop_for_io_(base::MessageLoop::current()->task_runner()),
121 weak_ptr_factory_(this) {
144 memset(&read_context_.overlapped, 0, sizeof(read_context_.overlapped)); 122 memset(&read_context_.overlapped, 0, sizeof(read_context_.overlapped));
145 memset(&write_context_.overlapped, 0, sizeof(write_context_.overlapped)); 123 memset(&write_context_.overlapped, 0, sizeof(write_context_.overlapped));
146 if (g_use_iocp) { 124 read_context_.overlapped.hEvent = read_event_.Get();
147 owner_->message_loop_for_io()->RegisterIOHandler( 125 write_context_.overlapped.hEvent = write_event_.Get();
148 handle_.get().handle, this);
149 read_context_.handler = this;
150 write_context_.handler = this;
151 } else {
152 read_event = CreateEvent(
153 NULL, g_use_autoreset_event ? FALSE : TRUE, FALSE, NULL);
154 write_event = CreateEvent(
155 NULL, g_use_autoreset_event ? FALSE : TRUE, FALSE, NULL);
156 read_context_.overlapped.hEvent = read_event;
157 write_context_.overlapped.hEvent = write_event;
158 126
159 127 this_weakptr_ = weak_ptr_factory_.GetWeakPtr();
160 if (g_use_autoreset_event) { 128 RegisterWaitForSingleObject(&read_wait_object_, read_event_.Get(),
161 read_watcher_.StartWatchingMultipleTimes(read_event, this); 129 ReadCompleted, this, INFINITE, WT_EXECUTEINWAITTHREAD);
162 write_watcher_.StartWatchingMultipleTimes(write_event, this); 130 RegisterWaitForSingleObject(&write_wait_object_, write_event_.Get(),
163 } 131 WriteCompleted, this, INFINITE, WT_EXECUTEINWAITTHREAD);
164 }
165 } 132 }
166 133
167 ~RawChannelIOHandler() override { 134 ~RawChannelIOHandler() {
135 if (read_wait_object_)
136 UnregisterWaitEx(read_wait_object_, INVALID_HANDLE_VALUE);
137
138 if (write_wait_object_)
139 UnregisterWaitEx(write_wait_object_, INVALID_HANDLE_VALUE);
168 DCHECK(ShouldSelfDestruct()); 140 DCHECK(ShouldSelfDestruct());
169 } 141 }
170 142
171 HANDLE handle() const { return handle_.get().handle; } 143 HANDLE handle() const { return handle_.get().handle; }
172 144
173 // The following methods are only called by the owner on the I/O thread. 145 // The following methods are only called by the owner on the I/O thread.
174 bool pending_read() const { 146 bool pending_read() const {
175 DCHECK(owner_); 147 DCHECK(owner_);
176 DCHECK_EQ(base::MessageLoop::current(), owner_->message_loop_for_io()); 148 DCHECK_EQ(base::MessageLoop::current(), owner_->message_loop_for_io());
177 return pending_read_; 149 return pending_read_;
178 } 150 }
179 151
180 base::MessageLoopForIO::IOContext* read_context() { 152 base::MessageLoopForIO::IOContext* read_context() {
181 DCHECK(owner_); 153 DCHECK(owner_);
182 DCHECK_EQ(base::MessageLoop::current(), owner_->message_loop_for_io()); 154 DCHECK_EQ(base::MessageLoop::current(), owner_->message_loop_for_io());
183 return &read_context_; 155 return &read_context_;
184 } 156 }
185 157
186 // Instructs the object to wait for an |OnIOCompleted()| notification. 158 // Instructs the object to wait for an OnObjectSignaled notification.
187 void OnPendingReadStarted() { 159 void OnPendingReadStarted() {
188 DCHECK(owner_); 160 DCHECK(owner_);
189 DCHECK_EQ(base::MessageLoop::current(), owner_->message_loop_for_io()); 161 DCHECK_EQ(base::MessageLoop::current(), owner_->message_loop_for_io());
190 DCHECK(!pending_read_); 162 DCHECK(!pending_read_);
191 pending_read_ = true; 163 pending_read_ = true;
164 read_event_signalled_ = false;
192 } 165 }
193 166
194 // The following methods are only called by the owner under 167 // The following methods are only called by the owner under
195 // |owner_->write_lock()|. 168 // |owner_->write_lock()|.
196 bool pending_write_no_lock() const { 169 bool pending_write_no_lock() const {
197 DCHECK(owner_); 170 DCHECK(owner_);
198 owner_->write_lock().AssertAcquired(); 171 owner_->write_lock().AssertAcquired();
199 return pending_write_; 172 return pending_write_;
200 } 173 }
201 174
202 base::MessageLoopForIO::IOContext* write_context_no_lock() { 175 base::MessageLoopForIO::IOContext* write_context_no_lock() {
203 DCHECK(owner_); 176 DCHECK(owner_);
204 owner_->write_lock().AssertAcquired(); 177 owner_->write_lock().AssertAcquired();
205 return &write_context_; 178 return &write_context_;
206 } 179 }
207 // Instructs the object to wait for an |OnIOCompleted()| notification. 180
181 // Instructs the object to wait for an OnObjectSignaled notification.
208 void OnPendingWriteStartedNoLock(size_t platform_handles_written) { 182 void OnPendingWriteStartedNoLock(size_t platform_handles_written) {
209 DCHECK(owner_); 183 DCHECK(owner_);
210 owner_->write_lock().AssertAcquired(); 184 owner_->write_lock().AssertAcquired();
211 DCHECK(!pending_write_); 185 DCHECK(!pending_write_);
212 pending_write_ = true; 186 pending_write_ = true;
187 write_event_signalled_ = false;
213 platform_handles_written_ = platform_handles_written; 188 platform_handles_written_ = platform_handles_written;
214 } 189 }
215 190
216 // |base::MessageLoopForIO::IOHandler| implementation: 191 // Must be called on the I/O thread under read and write locks.
217 // Must be called on the I/O thread. It could be called before or after
218 // detached from the owner.
219 void OnIOCompleted(base::MessageLoopForIO::IOContext* context,
220 DWORD bytes_transferred,
221 DWORD error) override {
222 DCHECK(!owner_ ||
223 base::MessageLoop::current() == owner_->message_loop_for_io());
224
225 // Suppress self-destruction inside |OnReadCompleted()|, etc. (in case
226 // they result in a call to |Shutdown()|).
227 bool old_suppress_self_destruct = suppress_self_destruct_;
228 suppress_self_destruct_ = true;
229
230 if (context == &read_context_)
231 OnReadCompleted(bytes_transferred, error);
232 else if (context == &write_context_)
233 OnWriteCompleted(bytes_transferred, error);
234 else
235 NOTREACHED();
236
237 // Maybe allow self-destruction again.
238 suppress_self_destruct_ = old_suppress_self_destruct;
239
240 if (ShouldSelfDestruct())
241 delete this;
242 }
243
244 // Must be called on the I/O thread under |owner_->write_lock()|.
245 // After this call, the owner must not make any further calls on this 192 // After this call, the owner must not make any further calls on this
246 // object, and therefore the object is used on the I/O thread exclusively 193 // object, and therefore the object is used on the I/O thread exclusively
247 // (if it stays alive). 194 // (if it stays alive).
248 void DetachFromOwnerNoLock(scoped_ptr<ReadBuffer> read_buffer, 195 void DetachFromOwnerNoLock(scoped_ptr<ReadBuffer> read_buffer,
249 scoped_ptr<WriteBuffer> write_buffer) { 196 scoped_ptr<WriteBuffer> write_buffer) {
250 DCHECK(owner_); 197 DCHECK(owner_);
251 DCHECK_EQ(base::MessageLoop::current(), owner_->message_loop_for_io()); 198 DCHECK_EQ(base::MessageLoop::current(), owner_->message_loop_for_io());
252 //owner_->write_lock().AssertAcquired(); 199 owner_->read_lock().AssertAcquired();
200 owner_->write_lock().AssertAcquired();
253 201
254 // If read/write is pending, we have to retain the corresponding buffer. 202 // If read/write is pending, we have to retain the corresponding buffer.
255 if (pending_read_) 203 if (pending_read_)
256 preserved_read_buffer_after_detach_ = read_buffer.Pass(); 204 preserved_read_buffer_after_detach_ = read_buffer.Pass();
257 if (pending_write_) 205 if (pending_write_)
258 preserved_write_buffer_after_detach_ = write_buffer.Pass(); 206 preserved_write_buffer_after_detach_ = write_buffer.Pass();
259 207
260 owner_ = nullptr; 208 owner_ = nullptr;
261 if (ShouldSelfDestruct()) 209 if (ShouldSelfDestruct())
262 delete this; 210 delete this;
263 } 211 }
264 212
265 ScopedPlatformHandle ReleaseHandle( 213 ScopedPlatformHandle ReleaseHandle(
266 std::vector<char>* serialized_read_buffer, 214 std::vector<char>* serialized_read_buffer,
267 std::vector<char>* serialized_write_buffer) { 215 std::vector<char>* serialized_write_buffer) {
268 // Cancel pending IO calls. 216 // Cancel pending IO calls.
269 if (g_vista_or_higher_functions.Get().is_vista_or_higher()) { 217 if (g_vista_or_higher_functions.Get().is_vista_or_higher()) {
270 g_vista_or_higher_functions.Get().CancelIoEx(handle(), nullptr); 218 g_vista_or_higher_functions.Get().CancelIoEx(handle(), nullptr);
271 } else { 219 } else {
272 CHECK(false) << "TODO(jam): handle XP"; 220 CHECK(false) << "TODO(jam): handle XP";
273 } 221 }
274 222
275 size_t additional_bytes_read = 0; 223 size_t additional_bytes_read = 0;
276 if (pending_read_) { 224 if (pending_read_) {
225 bool wait = false;
226 UnregisterWaitEx(read_wait_object_, INVALID_HANDLE_VALUE);
227 read_wait_object_ = NULL;
228 if (!read_event_signalled_)
229 wait = true;
277 DWORD bytes_read_dword = 0; 230 DWORD bytes_read_dword = 0;
278 231
279 DWORD old_bytes = read_context_.overlapped.InternalHigh;
280
281 // Since we cancelled pending IO calls above, we need to know if the 232 // Since we cancelled pending IO calls above, we need to know if the
282 // read did succeed (i.e. it completed and there's a pending task posted 233 // read did succeed (i.e. it completed and there's a pending task posted
283 // to alert us) or if it was cancelled. This important because if the 234 // to alert us) or if it was cancelled. This important because if the
284 // read completed, we don't want to serialize those bytes again. 235 // read completed, we don't want to serialize those bytes again.
285 //TODO(jam): for XP, can return TRUE here to wait. also below. 236 // TODO(jam): for XP, can return TRUE here to wait. also below.
286 BOOL rv = GetOverlappedResult( 237 BOOL rv = GetOverlappedResult(
287 handle(), &read_context_.overlapped, &bytes_read_dword, FALSE); 238 handle(), &read_context_.overlapped, &bytes_read_dword,
288 DCHECK_EQ(old_bytes, bytes_read_dword); 239 wait ? TRUE : FALSE);
289 if (rv && read_context_.overlapped.Internal != STATUS_CANCELLED) { 240 if (rv && read_context_.overlapped.Internal != STATUS_CANCELLED)
290 additional_bytes_read = 241 additional_bytes_read = bytes_read_dword;
291 static_cast<size_t>(read_context_.overlapped.InternalHigh);
292 }
293 pending_read_ = false; 242 pending_read_ = false;
294 } 243 }
295 244
296 RawChannel::WriteBuffer* write_buffer = owner_->write_buffer_no_lock(); 245 RawChannel::WriteBuffer* write_buffer = owner_->write_buffer_no_lock();
297 246
298 size_t additional_bytes_written = 0; 247 size_t additional_bytes_written = 0;
299 size_t additional_platform_handles_written = 0; 248 size_t additional_platform_handles_written = 0;
300 if (pending_write_) { 249 if (pending_write_) {
250 bool wait = false;
251 UnregisterWaitEx(write_wait_object_, INVALID_HANDLE_VALUE);
252 write_wait_object_ = NULL;
253 if (!write_event_signalled_)
254 wait = true;
255
301 DWORD bytes_written_dword = 0; 256 DWORD bytes_written_dword = 0;
302 DWORD old_bytes = write_context_.overlapped.InternalHigh;
303 257
304 // See comment above. 258 // See comment above.
305 BOOL rv = GetOverlappedResult( 259 BOOL rv = GetOverlappedResult(
306 handle(), &write_context_.overlapped, &bytes_written_dword, FALSE); 260 handle(), &write_context_.overlapped, &bytes_written_dword,
261 wait ? TRUE : FALSE);
307 262
308 DCHECK_EQ(old_bytes, bytes_written_dword);
309 if (rv && write_context_.overlapped.Internal != STATUS_CANCELLED) { 263 if (rv && write_context_.overlapped.Internal != STATUS_CANCELLED) {
310 CHECK(!write_buffer->IsEmpty()); 264 CHECK(!write_buffer->IsEmpty());
311 265
312 additional_bytes_written = static_cast<size_t>(bytes_written_dword); 266 additional_bytes_written = static_cast<size_t>(bytes_written_dword);
313 additional_platform_handles_written = platform_handles_written_; 267 additional_platform_handles_written = platform_handles_written_;
314 platform_handles_written_ = 0; 268 platform_handles_written_ = 0;
315 } 269 }
316 pending_write_ = false; 270 pending_write_ = false;
317 } 271 }
318 272
319 owner_->SerializeReadBuffer( 273 owner_->SerializeReadBuffer(
320 additional_bytes_read, serialized_read_buffer); 274 additional_bytes_read, serialized_read_buffer);
321 owner_->SerializeWriteBuffer( 275 owner_->SerializeWriteBuffer(
322 serialized_write_buffer, additional_bytes_written, 276 additional_bytes_written, additional_platform_handles_written,
323 additional_platform_handles_written); 277 serialized_write_buffer);
278
279 // There's a PostTask inside RawChannel because an error over the channel
280 // occurred. We need to propagate this, otherwise the object using this
281 // channel will never get a peer-closed signal.
282 if (owner_->pending_error()) {
283 handle_.reset();
yzshen1 2015/10/14 21:37:00 Just to make sure I understand: does this part nee
jam 2015/10/14 22:35:35 yep
284 serialized_read_buffer->clear();
285 serialized_write_buffer->clear();
286 return ScopedPlatformHandle();
287 }
324 288
325 return ScopedPlatformHandle(handle_.release()); 289 return ScopedPlatformHandle(handle_.release());
326 } 290 }
327 291
328 void OnObjectSignaled(HANDLE object) override {
329 // Since this is called on the IO thread, no locks needed for owner_.
330 bool handle_is_valid = false;
331 if (owner_)
332 owner_->read_lock().Acquire();
333 handle_is_valid = handle_.is_valid();
334 if (owner_)
335 owner_->read_lock().Release();
336 if (!handle_is_valid) {
337 if (object == read_event)
338 pending_read_ = false;
339 else
340 pending_write_ = false;
341 if (ShouldSelfDestruct())
342 delete this;
343 return;
344 }
345
346 if (object == read_event) {
347 OnIOCompleted(&read_context_, read_context_.overlapped.InternalHigh,
348 read_context_.overlapped.Internal);
349
350 } else {
351 CHECK(object == write_event);
352 OnIOCompleted(&write_context_, write_context_.overlapped.InternalHigh,
353 write_context_.overlapped.Internal);
354 }
355 }
356 HANDLE read_event, write_event;
357 base::win::ObjectWatcher read_watcher_, write_watcher_;
358
359 private: 292 private:
360 // Returns true if |owner_| has been reset and there is not pending read or 293 // Returns true if |owner_| has been reset and there is not pending read or
361 // write. 294 // write.
362 // Must be called on the I/O thread. 295 // Must be called on the I/O thread.
363 bool ShouldSelfDestruct() const { 296 bool ShouldSelfDestruct() const {
364 if (owner_ || suppress_self_destruct_) 297 if (owner_ || suppress_self_destruct_)
365 return false; 298 return false;
366 299
367 // Note: Detached, hence no lock needed for |pending_write_|. 300 // Note: Detached, hence no lock needed for |pending_write_|.
368 return !pending_read_ && !pending_write_; 301 return !pending_read_ && !pending_write_;
369 } 302 }
370 303
371 // Must be called on the I/O thread. It may be called before or after 304 // Must be called on the I/O thread. It may be called before or after
372 // detaching from the owner. 305 // detaching from the owner.
373 void OnReadCompleted(DWORD bytes_read, DWORD error) { 306 void OnReadCompleted(DWORD bytes_read, DWORD error) {
374 DCHECK(!owner_ || 307 DCHECK(!owner_ ||
375 base::MessageLoop::current() == owner_->message_loop_for_io()); 308 base::MessageLoop::current() == owner_->message_loop_for_io());
376 DCHECK(suppress_self_destruct_); 309 DCHECK(suppress_self_destruct_);
310 if (!owner_) {
311 pending_read_ = false;
312 return;
313 }
377 314
378 if (g_use_autoreset_event && !pending_read_) 315 // Must acquire the read lock before we update pending_read_, since
316 // otherwise there is a race condition in ReleaseHandle if this method
317 // sets it to false but ReleaseHandle acquired read lock. It would then
318 // think there's no pending read and miss the read bytes.
319 base::AutoLock locker(owner_->read_lock());
320
321 // This can happen if ReleaseHandle was called and it set pending_read to
322 // false. We don't want to call owner_->OnReadCompletedNoLock since the
323 // read_buffer has been freed.
324 if (!pending_read_)
379 return; 325 return;
380 326
381 CHECK(pending_read_); 327 CHECK(pending_read_);
382 pending_read_ = false; 328 pending_read_ = false;
383 if (!owner_)
384 return;
385 329
386 // Note: |OnReadCompleted()| may detach us from |owner_|. 330 // Note: |OnReadCompleted()| may detach us from |owner_|.
387 if (error == ERROR_SUCCESS || 331 if (error == ERROR_SUCCESS || error == ERROR_NO_MORE_ITEMS) {
388 (g_use_autoreset_event && error == ERROR_NO_MORE_ITEMS)) {
389 DCHECK_GT(bytes_read, 0u); 332 DCHECK_GT(bytes_read, 0u);
390 owner_->OnReadCompleted(IO_SUCCEEDED, bytes_read); 333 owner_->OnReadCompletedNoLock(IO_SUCCEEDED, bytes_read);
391 } else if (error == ERROR_BROKEN_PIPE || 334 } else if (error == ERROR_BROKEN_PIPE || error == STATUS_PIPE_BROKEN) {
yzshen1 2015/10/14 21:37:00 I vaguely remember that you mentioned that at the
392 (g_use_autoreset_event && error == STATUS_PIPE_BROKEN)) {
393 DCHECK_EQ(bytes_read, 0u); 335 DCHECK_EQ(bytes_read, 0u);
394 owner_->OnReadCompleted(IO_FAILED_SHUTDOWN, 0); 336 owner_->OnReadCompletedNoLock(IO_FAILED_SHUTDOWN, 0);
395 } else { 337 } else {
396 DCHECK_EQ(bytes_read, 0u); 338 DCHECK_EQ(bytes_read, 0u);
397 LOG(WARNING) << "ReadFile: " << logging::SystemErrorCodeToString(error); 339 LOG(WARNING) << "ReadFile: " << logging::SystemErrorCodeToString(error);
398 owner_->OnReadCompleted(IO_FAILED_UNKNOWN, 0); 340 owner_->OnReadCompletedNoLock(IO_FAILED_UNKNOWN, 0);
399 } 341 }
400 } 342 }
401 343
402 // Must be called on the I/O thread. It may be called before or after 344 // Must be called on the I/O thread. It may be called before or after
403 // detaching from the owner. 345 // detaching from the owner.
404 void OnWriteCompleted(DWORD bytes_written, DWORD error) { 346 void OnWriteCompleted(DWORD bytes_written, DWORD error) {
405 DCHECK(!owner_ || 347 DCHECK(!owner_ ||
406 base::MessageLoop::current() == owner_->message_loop_for_io()); 348 base::MessageLoop::current() == owner_->message_loop_for_io());
407 DCHECK(suppress_self_destruct_); 349 DCHECK(suppress_self_destruct_);
408 350
409 if (!owner_) { 351 if (!owner_) {
410 // No lock needed. 352 // No lock needed.
411 CHECK(pending_write_); 353 CHECK(pending_write_);
412 pending_write_ = false; 354 pending_write_ = false;
413 return; 355 return;
414 } 356 }
415 357
416 { 358 base::AutoLock locker(owner_->write_lock());
417 base::AutoLock locker(owner_->write_lock()); 359 // This can happen if ReleaseHandle was called and it set pending_write to
418 if (g_use_autoreset_event && !pending_write_) 360 // false. We don't want to call owner_->OnWriteCompletedNoLock since the
419 return; 361 // write_buffer has been freed.
362 if (!pending_write_)
363 return;
420 364
421 CHECK(pending_write_); 365 CHECK(pending_write_);
422 pending_write_ = false; 366 pending_write_ = false;
423 }
424 367
425 // Note: |OnWriteCompleted()| may detach us from |owner_|. 368 // Note: |OnWriteCompleted()| may detach us from |owner_|.
426 if (error == ERROR_SUCCESS || 369 if (error == ERROR_SUCCESS || error == ERROR_NO_MORE_ITEMS) {
427 (g_use_autoreset_event && error == ERROR_NO_MORE_ITEMS)) {
428 // Reset |platform_handles_written_| before calling |OnWriteCompleted()| 370 // Reset |platform_handles_written_| before calling |OnWriteCompleted()|
429 // since that function may call back to this class and set it again. 371 // since that function may call back to this class and set it again.
430 size_t local_platform_handles_written = platform_handles_written_; 372 size_t local_platform_handles_written = platform_handles_written_;
431 platform_handles_written_ = 0; 373 platform_handles_written_ = 0;
432 owner_->OnWriteCompleted(IO_SUCCEEDED, local_platform_handles_written, 374 owner_->OnWriteCompletedNoLock(
433 bytes_written); 375 IO_SUCCEEDED, local_platform_handles_written, bytes_written);
434 } else if (error == ERROR_BROKEN_PIPE || 376 } else if (error == ERROR_BROKEN_PIPE || error == STATUS_PIPE_BROKEN) {
435 (g_use_autoreset_event && error == STATUS_PIPE_BROKEN)) { 377 owner_->OnWriteCompletedNoLock(IO_FAILED_SHUTDOWN, 0, 0);
436 owner_->OnWriteCompleted(IO_FAILED_SHUTDOWN, 0, 0);
437 } else { 378 } else {
438 LOG(WARNING) << "WriteFile: " 379 LOG(WARNING) << "WriteFile: "
439 << logging::SystemErrorCodeToString(error); 380 << logging::SystemErrorCodeToString(error);
440 owner_->OnWriteCompleted(IO_FAILED_UNKNOWN, 0, 0); 381 owner_->OnWriteCompletedNoLock(IO_FAILED_UNKNOWN, 0, 0);
441 } 382 }
442 } 383 }
443 384
385 void OnObjectSignaled(HANDLE object) {
386 DCHECK(!owner_ ||
387 base::MessageLoop::current() == owner_->message_loop_for_io());
388 // Since this is called on the IO thread, no locks needed for owner_.
389 bool handle_is_valid = false;
390 if (owner_)
391 owner_->read_lock().Acquire();
392 handle_is_valid = handle_.is_valid();
393 if (owner_)
394 owner_->read_lock().Release();
395 if (!handle_is_valid) {
396 if (object == read_event_.Get())
397 pending_read_ = false;
398 else
399 pending_write_ = false;
400 if (ShouldSelfDestruct())
401 delete this;
402 return;
403 }
404
405 // Suppress self-destruction inside |OnReadCompleted()|, etc. (in case
406 // they result in a call to |Shutdown()|).
407 bool old_suppress_self_destruct = suppress_self_destruct_;
408 suppress_self_destruct_ = true;
409 if (object == read_event_.Get()) {
410 OnReadCompleted(read_context_.overlapped.InternalHigh,
411 read_context_.overlapped.Internal);
412 } else {
413 CHECK(object == write_event_.Get());
414 OnWriteCompleted(write_context_.overlapped.InternalHigh,
415 write_context_.overlapped.Internal);
416 }
417
418 // Maybe allow self-destruction again.
419 suppress_self_destruct_ = old_suppress_self_destruct;
420
421 if (ShouldSelfDestruct())
422 delete this;
423 }
424
425 static void CALLBACK ReadCompleted(void* param, BOOLEAN timed_out) {
426 DCHECK(!timed_out);
427 // The destructor blocks on any callbacks that are in flight, so we know
428 // that that is always a pointer to a valid RawChannelIOHandler.
429 RawChannelIOHandler* that = static_cast<RawChannelIOHandler*>(param);
430 that->read_event_signalled_ = true;
yzshen1 2015/10/14 21:37:00 Is it safe to set the value outside of lock? Relea
jam 2015/10/14 22:35:35 ReleaseHandle only checks it after calling Unregis
431 that->message_loop_for_io_->PostTask(
432 FROM_HERE,
433 base::Bind(&RawChannelIOHandler::OnObjectSignaled,
434 that->this_weakptr_, that->read_event_.Get()));
435 }
436
437 static void CALLBACK WriteCompleted(void* param, BOOLEAN timed_out) {
438 DCHECK(!timed_out);
439 // The destructor blocks on any callbacks that are in flight, so we know
440 // that that is always a pointer to a valid RawChannelIOHandler.
441 RawChannelIOHandler* that = static_cast<RawChannelIOHandler*>(param);
442 that->write_event_signalled_ = true;
yzshen1 2015/10/14 21:37:00 ditto.
443 that->message_loop_for_io_->PostTask(
444 FROM_HERE,
445 base::Bind(&RawChannelIOHandler::OnObjectSignaled,
446 that->this_weakptr_, that->write_event_.Get()));
447 }
448
444 ScopedPlatformHandle handle_; 449 ScopedPlatformHandle handle_;
445 450
446 // |owner_| is reset on the I/O thread under |owner_->write_lock()|. 451 // |owner_| is reset on the I/O thread under |owner_->write_lock()|.
447 // Therefore, it may be used on any thread under lock; or on the I/O thread 452 // Therefore, it may be used on any thread under lock; or on the I/O thread
448 // without locking. 453 // without locking.
449 RawChannelWin* owner_; 454 RawChannelWin* owner_;
450 455
451 // The following members must be used on the I/O thread. 456 // The following members must be used on the I/O thread.
452 scoped_ptr<ReadBuffer> preserved_read_buffer_after_detach_; 457 scoped_ptr<ReadBuffer> preserved_read_buffer_after_detach_;
453 scoped_ptr<WriteBuffer> preserved_write_buffer_after_detach_; 458 scoped_ptr<WriteBuffer> preserved_write_buffer_after_detach_;
454 bool suppress_self_destruct_; 459 bool suppress_self_destruct_;
455 460
456 bool pending_read_; 461 bool pending_read_;
457 base::MessageLoopForIO::IOContext read_context_; 462 base::MessageLoopForIO::IOContext read_context_;
458 463
459 // The following members must be used under |owner_->write_lock()| while the 464 // The following members must be used under |owner_->write_lock()| while the
460 // object is still attached to the owner, and only on the I/O thread 465 // object is still attached to the owner, and only on the I/O thread
461 // afterwards. 466 // afterwards.
462 bool pending_write_; 467 bool pending_write_;
463 size_t platform_handles_written_; 468 size_t platform_handles_written_;
464 base::MessageLoopForIO::IOContext write_context_; 469 base::MessageLoopForIO::IOContext write_context_;
465 470
471 base::win::ScopedHandle read_event_;
472 base::win::ScopedHandle write_event_;
473
474 HANDLE read_wait_object_;
475 HANDLE write_wait_object_;
476
477 // Since we use auto-reset event, these variables let ReleaseHandle know if
478 // UnregisterWaitEx ended up running a callback or not.
479 bool read_event_signalled_;
480 bool write_event_signalled_;
481
482 // These are used by the callbacks for the wait event watchers.
483 scoped_refptr<base::SingleThreadTaskRunner> message_loop_for_io_;
484 base::WeakPtr<RawChannelIOHandler> this_weakptr_;
485 base::WeakPtrFactory<RawChannelIOHandler> weak_ptr_factory_;
486
466 MOJO_DISALLOW_COPY_AND_ASSIGN(RawChannelIOHandler); 487 MOJO_DISALLOW_COPY_AND_ASSIGN(RawChannelIOHandler);
467 }; 488 };
468 489
469 ScopedPlatformHandle ReleaseHandleNoLock( 490 ScopedPlatformHandle ReleaseHandleNoLock(
470 std::vector<char>* serialized_read_buffer, 491 std::vector<char>* serialized_read_buffer,
471 std::vector<char>* serialized_write_buffer) override { 492 std::vector<char>* serialized_write_buffer) override {
472 if (handle_.is_valid()) { 493 if (handle_.is_valid()) {
473 // SetInitialBuffer could have been called on main thread before OnInit 494 // SetInitialBuffer could have been called on main thread before OnInit
474 // is called on Io thread. and in meantime releasehandle called. 495 // is called on Io thread. and in meantime releasehandle called.
475 SerializeReadBuffer(0u, serialized_read_buffer); 496 SerializeReadBuffer(0u, serialized_read_buffer);
476 497
477 // We could have been given messages to write before OnInit. 498 // We could have been given messages to write before OnInit.
478 SerializeWriteBuffer(serialized_write_buffer, 0u, 0u); 499 SerializeWriteBuffer(0u, 0u, serialized_write_buffer);
479 500
480 return ScopedPlatformHandle(PlatformHandle(handle_.release().handle)); 501 return ScopedPlatformHandle(PlatformHandle(handle_.release().handle));
481 } 502 }
482 503
483 return io_handler_->ReleaseHandle(serialized_read_buffer, 504 return io_handler_->ReleaseHandle(serialized_read_buffer,
484 serialized_write_buffer); 505 serialized_write_buffer);
485 } 506 }
507
486 PlatformHandle HandleForDebuggingNoLock() override { 508 PlatformHandle HandleForDebuggingNoLock() override {
487 if (handle_.is_valid()) 509 if (handle_.is_valid())
488 return handle_.get(); 510 return handle_.get();
489 511
490 if (!io_handler_) 512 if (!io_handler_)
491 return PlatformHandle(); 513 return PlatformHandle();
492 514
493 return PlatformHandle(io_handler_->handle()); 515 return PlatformHandle(io_handler_->handle());
494 } 516 }
495 517
(...skipping 12 matching lines...) Expand all
508 if (!result) { 530 if (!result) {
509 DWORD error = GetLastError(); 531 DWORD error = GetLastError();
510 if (error == ERROR_BROKEN_PIPE) 532 if (error == ERROR_BROKEN_PIPE)
511 return IO_FAILED_SHUTDOWN; 533 return IO_FAILED_SHUTDOWN;
512 if (error != ERROR_IO_PENDING) { 534 if (error != ERROR_IO_PENDING) {
513 LOG(WARNING) << "ReadFile: " << logging::SystemErrorCodeToString(error); 535 LOG(WARNING) << "ReadFile: " << logging::SystemErrorCodeToString(error);
514 return IO_FAILED_UNKNOWN; 536 return IO_FAILED_UNKNOWN;
515 } 537 }
516 } 538 }
517 539
518 if (result && skip_completion_port_on_success_) {
519 DWORD bytes_read_dword = 0;
520 BOOL get_size_result = GetOverlappedResult(
521 io_handler_->handle(), &io_handler_->read_context()->overlapped,
522 &bytes_read_dword, FALSE);
523 DPCHECK(get_size_result);
524 *bytes_read = bytes_read_dword;
525 return IO_SUCCEEDED;
526 }
527
528 if (!g_use_autoreset_event) {
529 if (!g_use_iocp) {
530 io_handler_->read_watcher_.StartWatchingOnce(
531 io_handler_->read_event, io_handler_);
532 }
533 }
534 // If the read is pending or the read has succeeded but we don't skip 540 // If the read is pending or the read has succeeded but we don't skip
535 // completion port on success, instruct |io_handler_| to wait for the 541 // completion port on success, instruct |io_handler_| to wait for the
536 // completion packet. 542 // completion packet.
537 // 543 //
538 // TODO(yzshen): It seems there isn't document saying that all error cases 544 // TODO(yzshen): It seems there isn't document saying that all error cases
539 // (other than ERROR_IO_PENDING) are guaranteed to *not* queue a completion 545 // (other than ERROR_IO_PENDING) are guaranteed to *not* queue a completion
540 // packet. If we do get one for errors, 546 // packet. If we do get one for errors, OnObjectSignaled()| will crash so we
541 // |RawChannelIOHandler::OnIOCompleted()| will crash so we will learn about 547 // will learn about it.
542 // it.
543
544 io_handler_->OnPendingReadStarted(); 548 io_handler_->OnPendingReadStarted();
545 return IO_PENDING; 549 return IO_PENDING;
546 } 550 }
547 551
548 IOResult ScheduleRead() override { 552 IOResult ScheduleRead() override {
549 if (!io_handler_) 553 if (!io_handler_)
550 return IO_PENDING; // OnInit could have earlied out. 554 return IO_PENDING; // OnInit could have earlied out.
551 555
552 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); 556 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
553 DCHECK(io_handler_); 557 DCHECK(io_handler_);
554 DCHECK(!io_handler_->pending_read()); 558 DCHECK(!io_handler_->pending_read());
555 559
556 size_t bytes_read = 0; 560 size_t bytes_read = 0;
557 IOResult io_result = Read(&bytes_read); 561 return Read(&bytes_read);
558 if (io_result == IO_SUCCEEDED) { 562 }
559 DCHECK(skip_completion_port_on_success_);
560 563
561 // We have finished reading successfully. Queue a notification manually.
562 io_handler_->OnPendingReadStarted();
563 // |io_handler_| won't go away before the task is run, so it is safe to
564 // use |base::Unretained()|.
565 message_loop_for_io()->PostTask(
566 FROM_HERE, base::Bind(&RawChannelIOHandler::OnIOCompleted,
567 base::Unretained(io_handler_),
568 base::Unretained(io_handler_->read_context()),
569 static_cast<DWORD>(bytes_read), ERROR_SUCCESS));
570 return IO_PENDING;
571 }
572
573 return io_result;
574 }
575 ScopedPlatformHandleVectorPtr GetReadPlatformHandles( 564 ScopedPlatformHandleVectorPtr GetReadPlatformHandles(
576 size_t num_platform_handles, 565 size_t num_platform_handles,
577 const void* platform_handle_table) override { 566 const void* platform_handle_table) override {
578 // TODO(jam): this code will have to be updated once it's used in a sandbox 567 // TODO(jam): this code will have to be updated once it's used in a sandbox
579 // and the receiving process doesn't have duplicate permission for the 568 // and the receiving process doesn't have duplicate permission for the
580 // receiver. Once there's a broker and we have a connection to it (possibly 569 // receiver. Once there's a broker and we have a connection to it (possibly
581 // through ConnectionManager), then we can make a sync IPC to it here to get 570 // through ConnectionManager), then we can make a sync IPC to it here to get
582 // a token for this handle, and it will duplicate the handle to is process. 571 // a token for this handle, and it will duplicate the handle to is process.
583 // Then we pass the token to the receiver, which will then make a sync call 572 // Then we pass the token to the receiver, which will then make a sync call
584 // to the broker to get a duplicated handle. This will also allow us to 573 // to the broker to get a duplicated handle. This will also allow us to
(...skipping 73 matching lines...) Expand 10 before | Expand all | Expand 10 after
658 DWORD error = GetLastError(); 647 DWORD error = GetLastError();
659 if (error == ERROR_BROKEN_PIPE) 648 if (error == ERROR_BROKEN_PIPE)
660 return IO_FAILED_SHUTDOWN; 649 return IO_FAILED_SHUTDOWN;
661 if (error != ERROR_IO_PENDING) { 650 if (error != ERROR_IO_PENDING) {
662 LOG(WARNING) << "WriteFile: " 651 LOG(WARNING) << "WriteFile: "
663 << logging::SystemErrorCodeToString(error); 652 << logging::SystemErrorCodeToString(error);
664 return IO_FAILED_UNKNOWN; 653 return IO_FAILED_UNKNOWN;
665 } 654 }
666 } 655 }
667 656
668 if (result && skip_completion_port_on_success_) {
669 *platform_handles_written = num_platform_handles;
670 *bytes_written = bytes_written_dword;
671 return IO_SUCCEEDED;
672 }
673
674 if (!g_use_autoreset_event) {
675 if (!g_use_iocp) {
676 io_handler_->write_watcher_.StartWatchingOnce(
677 io_handler_->write_event, io_handler_);
678 }
679 }
680 // If the write is pending or the write has succeeded but we don't skip 657 // If the write is pending or the write has succeeded but we don't skip
681 // completion port on success, instruct |io_handler_| to wait for the 658 // completion port on success, instruct |io_handler_| to wait for the
682 // completion packet. 659 // completion packet.
683 // 660 //
684 // TODO(yzshen): it seems there isn't document saying that all error cases 661 // TODO(yzshen): it seems there isn't document saying that all error cases
685 // (other than ERROR_IO_PENDING) are guaranteed to *not* queue a completion 662 // (other than ERROR_IO_PENDING) are guaranteed to *not* queue a completion
686 // packet. If we do get one for errors, 663 // packet. If we do get one for errors, OnObjectSignaled will crash so we
687 // |RawChannelIOHandler::OnIOCompleted()| will crash so we will learn about 664 // will learn about it.
688 // it.
689 665
690 io_handler_->OnPendingWriteStartedNoLock(num_platform_handles); 666 io_handler_->OnPendingWriteStartedNoLock(num_platform_handles);
691 return IO_PENDING; 667 return IO_PENDING;
692 } 668 }
693 669
694 IOResult ScheduleWriteNoLock() override { 670 IOResult ScheduleWriteNoLock() override {
695 write_lock().AssertAcquired(); 671 write_lock().AssertAcquired();
696 672
697 DCHECK(io_handler_); 673 DCHECK(io_handler_);
698 DCHECK(!io_handler_->pending_write_no_lock()); 674 DCHECK(!io_handler_->pending_write_no_lock());
699 675
700 size_t platform_handles_written = 0; 676 size_t platform_handles_written = 0;
701 size_t bytes_written = 0; 677 size_t bytes_written = 0;
702 IOResult io_result = WriteNoLock(&platform_handles_written, &bytes_written); 678 return WriteNoLock(&platform_handles_written, &bytes_written);
703 if (io_result == IO_SUCCEEDED) {
704 DCHECK(skip_completion_port_on_success_);
705
706 // We have finished writing successfully. Queue a notification manually.
707 io_handler_->OnPendingWriteStartedNoLock(platform_handles_written);
708 // |io_handler_| won't go away before that task is run, so it is safe to
709 // use |base::Unretained()|.
710 message_loop_for_io()->PostTask(
711 FROM_HERE,
712 base::Bind(&RawChannelIOHandler::OnIOCompleted,
713 base::Unretained(io_handler_),
714 base::Unretained(io_handler_->write_context_no_lock()),
715 static_cast<DWORD>(bytes_written), ERROR_SUCCESS));
716 return IO_PENDING;
717 }
718
719 return io_result;
720 } 679 }
721 680
722 void OnInit() override { 681 void OnInit() override {
723 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); 682 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
724 683
725 if (!handle_.is_valid()) { 684 if (!handle_.is_valid()) {
726 LOG(ERROR) << "Note: RawChannelWin " << this 685 LOG(ERROR) << "Note: RawChannelWin " << this
727 << " early exiting in OnInit because no handle"; 686 << " early exiting in OnInit because no handle";
728 return; 687 return;
729 } 688 }
730 689
731 DCHECK(handle_.is_valid()); 690 DCHECK(handle_.is_valid());
732 if (skip_completion_port_on_success_) {
733 // I don't know how this can fail (unless |handle_| is bad, in which case
734 // it's a bug in our code).
735 CHECK(g_vista_or_higher_functions.Get().
736 SetFileCompletionNotificationModes(
737 handle_.get().handle, FILE_SKIP_COMPLETION_PORT_ON_SUCCESS));
738 }
739
740 DCHECK(!io_handler_); 691 DCHECK(!io_handler_);
741 io_handler_ = new RawChannelIOHandler(this, handle_.Pass()); 692 io_handler_ = new RawChannelIOHandler(this, handle_.Pass());
742 } 693 }
743 694
744 void OnShutdownNoLock(scoped_ptr<ReadBuffer> read_buffer, 695 void OnShutdownNoLock(scoped_ptr<ReadBuffer> read_buffer,
745 scoped_ptr<WriteBuffer> write_buffer) override { 696 scoped_ptr<WriteBuffer> write_buffer) override {
746 // happens on shutdown if didn't call init when doing createduplicate 697 // happens on shutdown if didn't call init when doing createduplicate
747 if (message_loop_for_io()) { 698 if (message_loop_for_io()) {
748 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io()); 699 DCHECK_EQ(base::MessageLoop::current(), message_loop_for_io());
749 } 700 }
(...skipping 22 matching lines...) Expand all
772 723
773 io_handler_->DetachFromOwnerNoLock(read_buffer.Pass(), write_buffer.Pass()); 724 io_handler_->DetachFromOwnerNoLock(read_buffer.Pass(), write_buffer.Pass());
774 io_handler_ = nullptr; 725 io_handler_ = nullptr;
775 } 726 }
776 727
777 // Passed to |io_handler_| during initialization. 728 // Passed to |io_handler_| during initialization.
778 ScopedPlatformHandle handle_; 729 ScopedPlatformHandle handle_;
779 730
780 RawChannelIOHandler* io_handler_; 731 RawChannelIOHandler* io_handler_;
781 732
782 const bool skip_completion_port_on_success_;
783
784 MOJO_DISALLOW_COPY_AND_ASSIGN(RawChannelWin); 733 MOJO_DISALLOW_COPY_AND_ASSIGN(RawChannelWin);
785 }; 734 };
786 735
787 736
788 } // namespace 737 } // namespace
789 738
790 // ----------------------------------------------------------------------------- 739 // -----------------------------------------------------------------------------
791 740
792 RawChannel* RawChannel::Create(ScopedPlatformHandle handle) { 741 RawChannel* RawChannel::Create(ScopedPlatformHandle handle) {
793 return new RawChannelWin(handle.Pass()); 742 return new RawChannelWin(handle.Pass());
(...skipping 22 matching lines...) Expand all
816 return filepath1 == filepath2; 765 return filepath1 == filepath2;
817 } else { 766 } else {
818 // TODO: XP: see http://stackoverflow.com/questions/65170/how-to-get-name-as sociated-with-open-handle/5286888#5286888 767 // TODO: XP: see http://stackoverflow.com/questions/65170/how-to-get-name-as sociated-with-open-handle/5286888#5286888
819 CHECK(false) << "TODO(jam): handle XP"; 768 CHECK(false) << "TODO(jam): handle XP";
820 return false; 769 return false;
821 } 770 }
822 } 771 }
823 772
824 } // namespace edk 773 } // namespace edk
825 } // namespace mojo 774 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/edk/system/raw_channel_posix.cc ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698