Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(144)

Side by Side Diff: net/base/file_stream_win.cc

Issue 10701050: net: Implement canceling of all async operations in FileStream. (Closed) Base URL: https://src.chromium.org/chrome/trunk/src/
Patch Set: Created 8 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "net/base/file_stream.h" 5 #include "net/base/file_stream_win.h"
6 6
7 #include <windows.h> 7 #include <windows.h>
8 8
9 #include "base/file_path.h" 9 #include "base/file_path.h"
10 #include "base/logging.h" 10 #include "base/logging.h"
11 #include "base/message_loop.h" 11 #include "base/memory/ref_counted.h"
12 #include "base/metrics/histogram.h" 12 #include "base/metrics/histogram.h"
13 #include "base/synchronization/waitable_event.h"
14 #include "base/threading/thread_restrictions.h" 13 #include "base/threading/thread_restrictions.h"
15 #include "base/threading/worker_pool.h" 14 #include "base/threading/worker_pool.h"
16 #include "net/base/file_stream_metrics.h"
17 #include "net/base/file_stream_net_log_parameters.h" 15 #include "net/base/file_stream_net_log_parameters.h"
18 #include "net/base/io_buffer.h" 16 #include "net/base/io_buffer.h"
19 #include "net/base/net_errors.h" 17 #include "net/base/net_errors.h"
20 18
21 namespace net { 19 namespace net {
22 20
23 // Ensure that we can just use our Whence values directly. 21 // Ensure that we can just use our Whence values directly.
24 COMPILE_ASSERT(FROM_BEGIN == FILE_BEGIN, bad_whence_begin); 22 COMPILE_ASSERT(FROM_BEGIN == FILE_BEGIN, bad_whence_begin);
25 COMPILE_ASSERT(FROM_CURRENT == FILE_CURRENT, bad_whence_current); 23 COMPILE_ASSERT(FROM_CURRENT == FILE_CURRENT, bad_whence_current);
26 COMPILE_ASSERT(FROM_END == FILE_END, bad_whence_end); 24 COMPILE_ASSERT(FROM_END == FILE_END, bad_whence_end);
27 25
28 namespace { 26 namespace {
29 27
30 void SetOffset(OVERLAPPED* overlapped, const LARGE_INTEGER& offset) { 28 void SetOffset(OVERLAPPED* overlapped, const LARGE_INTEGER& offset) {
31 overlapped->Offset = offset.LowPart; 29 overlapped->Offset = offset.LowPart;
32 overlapped->OffsetHigh = offset.HighPart; 30 overlapped->OffsetHigh = offset.HighPart;
33 } 31 }
34 32
35 void IncrementOffset(OVERLAPPED* overlapped, DWORD count) { 33 void IncrementOffset(OVERLAPPED* overlapped, DWORD count) {
36 LARGE_INTEGER offset; 34 LARGE_INTEGER offset;
37 offset.LowPart = overlapped->Offset; 35 offset.LowPart = overlapped->Offset;
38 offset.HighPart = overlapped->OffsetHigh; 36 offset.HighPart = overlapped->OffsetHigh;
39 offset.QuadPart += static_cast<LONGLONG>(count); 37 offset.QuadPart += static_cast<LONGLONG>(count);
40 SetOffset(overlapped, offset); 38 SetOffset(overlapped, offset);
41 } 39 }
42 40
43 int RecordAndMapError(int error,
44 FileErrorSource source,
45 bool record_uma,
46 const net::BoundNetLog& bound_net_log) {
47 net::Error net_error = MapSystemError(error);
48
49 bound_net_log.AddEvent(
50 net::NetLog::TYPE_FILE_STREAM_ERROR,
51 base::Bind(&NetLogFileStreamErrorCallback,
52 source, error, net_error));
53
54 RecordFileError(error, source, record_uma);
55
56 return net_error;
57 }
58
59 // Opens a file with some network logging.
60 // The opened file and the result code are written to |file| and |result|.
61 void OpenFile(const FilePath& path,
62 int open_flags,
63 bool record_uma,
64 base::PlatformFile* file,
65 int* result,
66 const net::BoundNetLog& bound_net_log) {
67 std::string file_name = path.AsUTF8Unsafe();
68 bound_net_log.BeginEvent(
69 net::NetLog::TYPE_FILE_STREAM_OPEN,
70 NetLog::StringCallback("file_name", &file_name));
71
72 *file = base::CreatePlatformFile(path, open_flags, NULL, NULL);
73 if (*file == base::kInvalidPlatformFileValue) {
74 DWORD error = GetLastError();
75 LOG(WARNING) << "Failed to open file: " << error;
76 *result = RecordAndMapError(error,
77 FILE_ERROR_SOURCE_OPEN,
78 record_uma,
79 bound_net_log);
80 bound_net_log.EndEvent(net::NetLog::TYPE_FILE_STREAM_OPEN);
81 return;
82 }
83 }
84
85 // Closes a file with some network logging.
86 void CloseFile(base::PlatformFile file,
87 const net::BoundNetLog& bound_net_log) {
88 bound_net_log.AddEvent(net::NetLog::TYPE_FILE_STREAM_CLOSE);
89 if (file == base::kInvalidPlatformFileValue)
90 return;
91
92 CancelIo(file);
93
94 if (!base::ClosePlatformFile(file))
95 NOTREACHED();
96 bound_net_log.EndEvent(net::NetLog::TYPE_FILE_STREAM_OPEN);
97 }
98
99 // Closes a file with CloseFile() and signals the completion.
100 void CloseFileAndSignal(base::PlatformFile* file,
101 base::WaitableEvent* on_io_complete,
102 const net::BoundNetLog& bound_net_log) {
103 CloseFile(*file, bound_net_log);
104 *file = base::kInvalidPlatformFileValue;
105 on_io_complete->Signal();
106 }
107
108 // Invokes a given closure and signals the completion.
109 void InvokeAndSignal(const base::Closure& closure,
110 base::WaitableEvent* on_io_complete) {
111 closure.Run();
112 on_io_complete->Signal();
113 }
114
115 } // namespace 41 } // namespace
116 42
117 // FileStreamWin::AsyncContext ---------------------------------------------- 43 // FileStream::AsyncContext ----------------------------------------------
118 44
119 class FileStreamWin::AsyncContext : public MessageLoopForIO::IOHandler {
120 public:
121 explicit AsyncContext(const net::BoundNetLog& bound_net_log)
122 : context_(), is_closing_(false),
123 record_uma_(false), bound_net_log_(bound_net_log),
124 error_source_(FILE_ERROR_SOURCE_COUNT) {
125 context_.handler = this;
126 }
127 ~AsyncContext();
128 45
129 void IOCompletionIsPending(const CompletionCallback& callback, 46 FileStream::AsyncContext::AsyncContext(const BoundNetLog& bound_net_log)
130 IOBuffer* buf); 47 : io_context_(),
48 file_(base::kInvalidPlatformFileValue),
49 record_uma_(false),
50 async_in_progress_(false),
51 destroyed_(false),
52 bound_net_log_(bound_net_log),
53 error_source_(FILE_ERROR_SOURCE_COUNT) {
54 io_context_.handler = this;
55 }
131 56
132 OVERLAPPED* overlapped() { return &context_.overlapped; } 57 FileStream::AsyncContext::AsyncContext(base::PlatformFile file,
133 const CompletionCallback& callback() const { return callback_; } 58 const BoundNetLog& bound_net_log,
59 int open_flags)
60 : io_context_(),
61 file_(file),
62 record_uma_(false),
63 async_in_progress_(false),
64 destroyed_(false),
65 bound_net_log_(bound_net_log),
66 error_source_(FILE_ERROR_SOURCE_COUNT) {
67 io_context_.handler = this;
68 if (open_flags & base::PLATFORM_FILE_ASYNC)
69 RegisterInMessageLoop();
70 }
134 71
135 void set_error_source(FileErrorSource source) { error_source_ = source; } 72 void FileStream::AsyncContext::Destroy() {
73 destroyed_ = true;
74 CancelIo(file_);
75 if (!async_in_progress_)
76 DeleteAbandoned();
77 }
136 78
137 void EnableErrorStatistics() { 79 void FileStream::AsyncContext::OpenAsync(
138 record_uma_ = true; 80 const FilePath& path,
139 } 81 int open_flags,
82 const CompletionCallback& callback) {
83 DCHECK(!async_in_progress_);
140 84
141 private: 85 BeginOpenEvent(path);
142 virtual void OnIOCompleted(MessageLoopForIO::IOContext* context,
143 DWORD bytes_read, DWORD error) OVERRIDE;
144 86
145 MessageLoopForIO::IOContext context_; 87 int* result = new int(OK);
146 CompletionCallback callback_; 88 const bool posted = base::WorkerPool::PostTaskAndReply(
147 scoped_refptr<IOBuffer> in_flight_buf_; 89 FROM_HERE,
148 bool is_closing_; 90 base::Bind(&AsyncContext::OpenFileImpl, base::Unretained(this),
149 bool record_uma_; 91 path, open_flags, result),
150 const net::BoundNetLog bound_net_log_; 92 base::Bind(&AsyncContext::OnOpenCompleted,
151 FileErrorSource error_source_; 93 base::Unretained(this),
152 }; 94 callback, base::Owned(result)),
95 true /* task_is_slow */);
96 DCHECK(posted);
153 97
154 FileStreamWin::AsyncContext::~AsyncContext() { 98 async_in_progress_ = true;
155 is_closing_ = true; 99 }
156 bool waited = false; 100
157 base::TimeTicks start = base::TimeTicks::Now(); 101 int FileStream::AsyncContext::OpenSync(const FilePath& path,
158 while (!callback_.is_null()) { 102 int open_flags) {
159 waited = true; 103 int result = OK;
160 MessageLoopForIO::current()->WaitForIOCompletion(INFINITE, this); 104 BeginOpenEvent(path);
161 } 105 OpenFileImpl(path, open_flags, &result);
162 if (waited) { 106 CheckForOpenError(&result);
163 // We want to see if we block the message loop for too long. 107 // TODO(satorux): Remove this once all async clients are migrated to use
164 UMA_HISTOGRAM_TIMES("AsyncIO.FileStreamClose", 108 // Open(). crbug.com/114783
165 base::TimeTicks::Now() - start); 109 if (open_flags & base::PLATFORM_FILE_ASYNC)
110 RegisterInMessageLoop();
111 return result;
112 }
113
114 void FileStream::AsyncContext::CloseAsync(
115 const CompletionCallback& callback) {
116 DCHECK(!async_in_progress_);
117
118 bound_net_log_.AddEvent(net::NetLog::TYPE_FILE_STREAM_CLOSE);
119
120 // Value OK will never be changed in AsyncContext::CloseFile() and is needed
121 // here just to use the same AsyncContext::OnAsyncCompleted().
122 int* result = new int(OK);
123 if (file_ == base::kInvalidPlatformFileValue) {
124 MessageLoop::current()->PostTask(
125 FROM_HERE,
126 base::Bind(&AsyncContext::OnAsyncCompleted<int>,
127 base::Unretained(this),
128 callback, base::Owned(result)));
129 } else {
130 const bool posted = base::WorkerPool::PostTaskAndReply(
131 FROM_HERE,
132 base::Bind(&AsyncContext::CloseFileImpl, base::Unretained(this)),
133 base::Bind(&AsyncContext::OnCloseCompleted,
134 base::Unretained(this),
135 callback, base::Owned(result)),
136 true /* task_is_slow */);
137 DCHECK(posted);
138
139 async_in_progress_ = true;
166 } 140 }
167 } 141 }
168 142
169 void FileStreamWin::AsyncContext::IOCompletionIsPending( 143 void FileStream::AsyncContext::CloseSync() {
170 const CompletionCallback& callback, 144 DCHECK(!async_in_progress_);
171 IOBuffer* buf) {
172 DCHECK(callback_.is_null());
173 callback_ = callback;
174 in_flight_buf_ = buf; // Hold until the async operation ends.
175 }
176
177 void FileStreamWin::AsyncContext::OnIOCompleted(
178 MessageLoopForIO::IOContext* context, DWORD bytes_read, DWORD error) {
179 DCHECK_EQ(&context_, context);
180 DCHECK(!callback_.is_null());
181
182 if (is_closing_) {
183 callback_.Reset();
184 in_flight_buf_ = NULL;
185 return;
186 }
187
188 int result = static_cast<int>(bytes_read);
189 if (error && error != ERROR_HANDLE_EOF) {
190 result = RecordAndMapError(error, error_source_, record_uma_,
191 bound_net_log_);
192 }
193
194 if (bytes_read)
195 IncrementOffset(&context->overlapped, bytes_read);
196
197 CompletionCallback temp_callback = callback_;
198 callback_.Reset();
199 scoped_refptr<IOBuffer> temp_buf = in_flight_buf_;
200 in_flight_buf_ = NULL;
201 temp_callback.Run(result);
202 }
203
204 // FileStream ------------------------------------------------------------
205
206 FileStreamWin::FileStreamWin(net::NetLog* net_log)
207 : file_(base::kInvalidPlatformFileValue),
208 open_flags_(0),
209 auto_closed_(true),
210 record_uma_(false),
211 bound_net_log_(net::BoundNetLog::Make(net_log,
212 net::NetLog::SOURCE_FILESTREAM)),
213 weak_ptr_factory_(ALLOW_THIS_IN_INITIALIZER_LIST(this)) {
214 bound_net_log_.BeginEvent(net::NetLog::TYPE_FILE_STREAM_ALIVE);
215 }
216
217 FileStreamWin::FileStreamWin(
218 base::PlatformFile file, int flags, net::NetLog* net_log)
219 : file_(file),
220 open_flags_(flags),
221 auto_closed_(false),
222 record_uma_(false),
223 bound_net_log_(net::BoundNetLog::Make(net_log,
224 net::NetLog::SOURCE_FILESTREAM)),
225 weak_ptr_factory_(ALLOW_THIS_IN_INITIALIZER_LIST(this)) {
226 bound_net_log_.BeginEvent(net::NetLog::TYPE_FILE_STREAM_ALIVE);
227
228 // If the file handle is opened with base::PLATFORM_FILE_ASYNC, we need to
229 // make sure we will perform asynchronous File IO to it.
230 if (flags & base::PLATFORM_FILE_ASYNC) {
231 async_context_.reset(new AsyncContext(bound_net_log_));
232 MessageLoopForIO::current()->RegisterIOHandler(file_,
233 async_context_.get());
234 }
235 }
236
237 FileStreamWin::~FileStreamWin() {
238 if (open_flags_ & base::PLATFORM_FILE_ASYNC) {
239 // Block until the in-flight open/close operation is complete.
240 // TODO(satorux): Ideally we should not block. crbug.com/115067
241 WaitForIOCompletion();
242
243 // Block until the last read/write operation is complete.
244 async_context_.reset();
245 }
246
247 if (auto_closed_) {
248 if (open_flags_ & base::PLATFORM_FILE_ASYNC) {
249 // Close the file in the background.
250 if (IsOpen()) {
251 const bool posted = base::WorkerPool::PostTask(
252 FROM_HERE,
253 base::Bind(&CloseFile, file_, bound_net_log_),
254 true /* task_is_slow */);
255 DCHECK(posted);
256 }
257 } else {
258 CloseSync();
259 }
260 }
261
262 bound_net_log_.EndEvent(net::NetLog::TYPE_FILE_STREAM_ALIVE);
263 }
264
265 void FileStreamWin::Close(const CompletionCallback& callback) {
266 DCHECK(open_flags_ & base::PLATFORM_FILE_ASYNC);
267 DCHECK(!weak_ptr_factory_.HasWeakPtrs());
268 DCHECK(!on_io_complete_.get());
269 on_io_complete_.reset(new base::WaitableEvent(
270 false /* manual_reset */, false /* initially_signaled */));
271
272 // Passing &file_ to a thread pool looks unsafe but it's safe here as the
273 // destructor ensures that the close operation is complete with
274 // WaitForIOCompletion(). See also the destructor.
275 const bool posted = base::WorkerPool::PostTaskAndReply(
276 FROM_HERE,
277 base::Bind(&CloseFileAndSignal, &file_, on_io_complete_.get(),
278 bound_net_log_),
279 base::Bind(&FileStreamWin::OnClosed,
280 weak_ptr_factory_.GetWeakPtr(),
281 callback),
282 true /* task_is_slow */);
283 DCHECK(posted);
284 }
285
286 void FileStreamWin::CloseSync() {
287 // The logic here is similar to CloseFile() but async_context_.reset() is
288 // caled in this function.
289
290 // Block until the in-flight open operation is complete.
291 // TODO(satorux): Replace this with a DCHECK(open_flags & ASYNC) once this
292 // once all async clients are migrated to use Close(). crbug.com/114783
293 WaitForIOCompletion();
294
295 bound_net_log_.AddEvent(net::NetLog::TYPE_FILE_STREAM_CLOSE); 145 bound_net_log_.AddEvent(net::NetLog::TYPE_FILE_STREAM_CLOSE);
296 if (file_ != base::kInvalidPlatformFileValue)
297 CancelIo(file_);
298
299 // Block until the last read/write operation is complete.
300 async_context_.reset();
301
302 if (file_ != base::kInvalidPlatformFileValue) { 146 if (file_ != base::kInvalidPlatformFileValue) {
303 if (!base::ClosePlatformFile(file_)) 147 CloseFileImpl();
304 NOTREACHED();
305 file_ = base::kInvalidPlatformFileValue;
306
307 bound_net_log_.EndEvent(net::NetLog::TYPE_FILE_STREAM_OPEN); 148 bound_net_log_.EndEvent(net::NetLog::TYPE_FILE_STREAM_OPEN);
308 } 149 }
309 } 150 }
310 151
311 int FileStreamWin::Open(const FilePath& path, int open_flags, 152 void FileStream::AsyncContext::SeekAsync(
312 const CompletionCallback& callback) { 153 Whence whence,
313 if (IsOpen()) { 154 int64 offset,
314 DLOG(FATAL) << "File is already open!"; 155 const Int64CompletionCallback& callback) {
315 return ERR_UNEXPECTED; 156 DCHECK(!async_in_progress_);
316 }
317 157
318 open_flags_ = open_flags; 158 int64* result = new int64(-1);
319 DCHECK(open_flags_ & base::PLATFORM_FILE_ASYNC);
320 DCHECK(!weak_ptr_factory_.HasWeakPtrs());
321 DCHECK(!on_io_complete_.get());
322 on_io_complete_.reset(new base::WaitableEvent(
323 false /* manual_reset */, false /* initially_signaled */));
324
325 // Passing &file_ to a thread pool looks unsafe but it's safe here as the
326 // destructor ensures that the open operation is complete with
327 // WaitForIOCompletion(). See also the destructor.
328 int* result = new int(OK);
329 const bool posted = base::WorkerPool::PostTaskAndReply( 159 const bool posted = base::WorkerPool::PostTaskAndReply(
330 FROM_HERE, 160 FROM_HERE,
331 base::Bind(&InvokeAndSignal, 161 base::Bind(&AsyncContext::SeekFileImpl, base::Unretained(this),
332 base::Bind(&OpenFile, path, open_flags, record_uma_, &file_, 162 whence, offset, result),
333 result, bound_net_log_), 163 base::Bind(&AsyncContext::OnSeekCompleted,
334 on_io_complete_.get()), 164 base::Unretained(this),
335 base::Bind(&FileStreamWin::OnOpened,
336 weak_ptr_factory_.GetWeakPtr(),
337 callback, base::Owned(result)),
338 true /* task_is_slow */);
339 DCHECK(posted);
340 return ERR_IO_PENDING;
341 }
342
343 int FileStreamWin::OpenSync(const FilePath& path, int open_flags) {
344 if (IsOpen()) {
345 DLOG(FATAL) << "File is already open!";
346 return ERR_UNEXPECTED;
347 }
348
349 open_flags_ = open_flags;
350
351 int result = OK;
352 OpenFile(path, open_flags_, record_uma_, &file_, &result, bound_net_log_);
353 if (result != OK)
354 return result;
355
356 // TODO(satorux): Remove this once all async clients are migrated to use
357 // Open(). crbug.com/114783
358 if (open_flags_ & base::PLATFORM_FILE_ASYNC) {
359 async_context_.reset(new AsyncContext(bound_net_log_));
360 if (record_uma_)
361 async_context_->EnableErrorStatistics();
362 MessageLoopForIO::current()->RegisterIOHandler(file_,
363 async_context_.get());
364 }
365
366 return OK;
367 }
368
369 bool FileStreamWin::IsOpen() const {
370 return file_ != base::kInvalidPlatformFileValue;
371 }
372
373 int FileStreamWin::Seek(Whence whence, int64 offset,
374 const Int64CompletionCallback& callback) {
375 if (!IsOpen())
376 return ERR_UNEXPECTED;
377
378 // Make sure we're async and we have no other in-flight async operations.
379 DCHECK(open_flags_ & base::PLATFORM_FILE_ASYNC);
380 DCHECK(!weak_ptr_factory_.HasWeakPtrs());
381 DCHECK(!on_io_complete_.get());
382
383 int64* result = new int64(-1);
384 on_io_complete_.reset(new base::WaitableEvent(
385 false /* manual_reset */, false /* initially_signaled */));
386
387 const bool posted = base::WorkerPool::PostTaskAndReply(
388 FROM_HERE,
389 base::Bind(&InvokeAndSignal,
390 // Unretained should be fine as we wait for a signal on
391 // on_io_complete_ at the destructor.
392 base::Bind(&FileStreamWin::SeekFile, base::Unretained(this),
393 whence, offset, result),
394 on_io_complete_.get()),
395 base::Bind(&FileStreamWin::OnSeeked,
396 weak_ptr_factory_.GetWeakPtr(),
397 callback, base::Owned(result)), 165 callback, base::Owned(result)),
398 true /* task is slow */); 166 true /* task is slow */);
399 DCHECK(posted); 167 DCHECK(posted);
400 return ERR_IO_PENDING; 168
169 async_in_progress_ = true;
401 } 170 }
402 171
403 int64 FileStreamWin::SeekSync(Whence whence, int64 offset) { 172 int64 FileStream::AsyncContext::SeekSync(Whence whence, int64 offset) {
404 if (!IsOpen())
405 return ERR_UNEXPECTED;
406
407 DCHECK(!async_context_.get() || async_context_->callback().is_null());
408 int64 result = -1; 173 int64 result = -1;
409 SeekFile(whence, offset, &result); 174 SeekFileImpl(whence, offset, &result);
175 CheckForSeekError(&result);
410 return result; 176 return result;
411 } 177 }
412 178
413 int64 FileStreamWin::Available() { 179 int64 FileStream::AsyncContext::GetFileSize() {
414 base::ThreadRestrictions::AssertIOAllowed();
415
416 if (!IsOpen())
417 return ERR_UNEXPECTED;
418
419 int64 cur_pos = SeekSync(FROM_CURRENT, 0);
420 if (cur_pos < 0)
421 return cur_pos;
422
423 LARGE_INTEGER file_size; 180 LARGE_INTEGER file_size;
424 if (!GetFileSizeEx(file_, &file_size)) { 181 if (!GetFileSizeEx(file_, &file_size)) {
425 DWORD error = GetLastError(); 182 DWORD error = GetLastError();
426 LOG(WARNING) << "GetFileSizeEx failed: " << error; 183 LOG(WARNING) << "GetFileSizeEx failed: " << error;
427 return RecordAndMapError(error, 184 return RecordAndMapError(error, FILE_ERROR_SOURCE_GET_SIZE);
428 FILE_ERROR_SOURCE_GET_SIZE,
429 record_uma_,
430 bound_net_log_);
431 } 185 }
432 186
433 return file_size.QuadPart - cur_pos; 187 return file_size.QuadPart;
434 } 188 }
435 189
436 int FileStreamWin::Read( 190 int FileStream::AsyncContext::ReadAsync(
437 IOBuffer* buf, int buf_len, const CompletionCallback& callback) { 191 IOBuffer* buf,
438 DCHECK(async_context_.get()); 192 int buf_len,
439 193 const CompletionCallback& callback) {
440 if (!IsOpen()) 194 DCHECK(!async_in_progress_);
441 return ERR_UNEXPECTED; 195 error_source_ = FILE_ERROR_SOURCE_READ;
442
443 DCHECK(open_flags_ & base::PLATFORM_FILE_READ);
444
445 OVERLAPPED* overlapped = NULL;
446 DCHECK(!callback.is_null());
447 DCHECK(async_context_->callback().is_null());
448 overlapped = async_context_->overlapped();
449 async_context_->set_error_source(FILE_ERROR_SOURCE_READ);
450 196
451 int rv = 0; 197 int rv = 0;
452 198
453 DWORD bytes_read; 199 DWORD bytes_read;
454 if (!ReadFile(file_, buf->data(), buf_len, &bytes_read, overlapped)) { 200 if (!ReadFile(file_, buf->data(), buf_len,
201 &bytes_read, &io_context_.overlapped)) {
455 DWORD error = GetLastError(); 202 DWORD error = GetLastError();
456 if (error == ERROR_IO_PENDING) { 203 if (error == ERROR_IO_PENDING) {
457 async_context_->IOCompletionIsPending(callback, buf); 204 IOCompletionIsPending(callback, buf);
458 rv = ERR_IO_PENDING; 205 rv = ERR_IO_PENDING;
459 } else if (error == ERROR_HANDLE_EOF) { 206 } else if (error == ERROR_HANDLE_EOF) {
460 rv = 0; // Report EOF by returning 0 bytes read. 207 rv = 0; // Report EOF by returning 0 bytes read.
461 } else { 208 } else {
462 LOG(WARNING) << "ReadFile failed: " << error; 209 LOG(WARNING) << "ReadFile failed: " << error;
463 rv = RecordAndMapError(error, 210 rv = RecordAndMapError(error, FILE_ERROR_SOURCE_READ);
464 FILE_ERROR_SOURCE_READ,
465 record_uma_,
466 bound_net_log_);
467 } 211 }
468 } else if (overlapped) { 212 } else {
469 async_context_->IOCompletionIsPending(callback, buf); 213 IOCompletionIsPending(callback, buf);
470 rv = ERR_IO_PENDING; 214 rv = ERR_IO_PENDING;
471 } else {
472 rv = static_cast<int>(bytes_read);
473 } 215 }
474 return rv; 216 return rv;
475 } 217 }
476 218
477 int FileStreamWin::ReadSync(char* buf, int buf_len) { 219 int FileStream::AsyncContext::ReadSync(char* buf, int buf_len) {
478 DCHECK(!async_context_.get());
479 base::ThreadRestrictions::AssertIOAllowed(); 220 base::ThreadRestrictions::AssertIOAllowed();
480 221
481 if (!IsOpen())
482 return ERR_UNEXPECTED;
483
484 DCHECK(open_flags_ & base::PLATFORM_FILE_READ);
485
486 int rv = 0; 222 int rv = 0;
487 223
488 DWORD bytes_read; 224 DWORD bytes_read;
489 if (!ReadFile(file_, buf, buf_len, &bytes_read, NULL)) { 225 if (!ReadFile(file_, buf, buf_len, &bytes_read, NULL)) {
490 DWORD error = GetLastError(); 226 DWORD error = GetLastError();
491 if (error == ERROR_HANDLE_EOF) { 227 if (error == ERROR_HANDLE_EOF) {
492 rv = 0; // Report EOF by returning 0 bytes read. 228 rv = 0; // Report EOF by returning 0 bytes read.
493 } else { 229 } else {
494 LOG(WARNING) << "ReadFile failed: " << error; 230 LOG(WARNING) << "ReadFile failed: " << error;
495 rv = RecordAndMapError(error, 231 rv = RecordAndMapError(error, FILE_ERROR_SOURCE_READ);
496 FILE_ERROR_SOURCE_READ,
497 record_uma_,
498 bound_net_log_);
499 } 232 }
500 } else { 233 } else {
501 rv = static_cast<int>(bytes_read); 234 rv = static_cast<int>(bytes_read);
502 } 235 }
503 return rv; 236 return rv;
504 } 237 }
505 238
506 int FileStreamWin::ReadUntilComplete(char *buf, int buf_len) { 239 int FileStream::AsyncContext::WriteAsync(
507 int to_read = buf_len; 240 IOBuffer* buf,
508 int bytes_total = 0; 241 int buf_len,
509 242 const CompletionCallback& callback) {
510 do { 243 error_source_ = FILE_ERROR_SOURCE_WRITE;
511 int bytes_read = ReadSync(buf, to_read);
512 if (bytes_read <= 0) {
513 if (bytes_total == 0)
514 return bytes_read;
515
516 return bytes_total;
517 }
518
519 bytes_total += bytes_read;
520 buf += bytes_read;
521 to_read -= bytes_read;
522 } while (bytes_total < buf_len);
523
524 return bytes_total;
525 }
526
527 int FileStreamWin::Write(
528 IOBuffer* buf, int buf_len, const CompletionCallback& callback) {
529 DCHECK(async_context_.get());
530
531 if (!IsOpen())
532 return ERR_UNEXPECTED;
533
534 DCHECK(open_flags_ & base::PLATFORM_FILE_WRITE);
535
536 OVERLAPPED* overlapped = NULL;
537 DCHECK(!callback.is_null());
538 DCHECK(async_context_->callback().is_null());
539 overlapped = async_context_->overlapped();
540 async_context_->set_error_source(FILE_ERROR_SOURCE_WRITE);
541 244
542 int rv = 0; 245 int rv = 0;
543 DWORD bytes_written = 0; 246 DWORD bytes_written = 0;
544 if (!WriteFile(file_, buf->data(), buf_len, &bytes_written, overlapped)) { 247 if (!WriteFile(file_, buf->data(), buf_len,
248 &bytes_written, &io_context_.overlapped)) {
545 DWORD error = GetLastError(); 249 DWORD error = GetLastError();
546 if (error == ERROR_IO_PENDING) { 250 if (error == ERROR_IO_PENDING) {
547 async_context_->IOCompletionIsPending(callback, buf); 251 IOCompletionIsPending(callback, buf);
548 rv = ERR_IO_PENDING; 252 rv = ERR_IO_PENDING;
549 } else { 253 } else {
550 LOG(WARNING) << "WriteFile failed: " << error; 254 LOG(WARNING) << "WriteFile failed: " << error;
551 rv = RecordAndMapError(error, 255 rv = RecordAndMapError(error, FILE_ERROR_SOURCE_WRITE);
552 FILE_ERROR_SOURCE_WRITE,
553 record_uma_,
554 bound_net_log_);
555 } 256 }
556 } else if (overlapped) { 257 } else {
557 async_context_->IOCompletionIsPending(callback, buf); 258 IOCompletionIsPending(callback, buf);
558 rv = ERR_IO_PENDING; 259 rv = ERR_IO_PENDING;
559 } else {
560 rv = static_cast<int>(bytes_written);
561 } 260 }
562 return rv; 261 return rv;
563 } 262 }
564 263
565 int FileStreamWin::WriteSync( 264 int FileStream::AsyncContext::WriteSync(const char* buf, int buf_len) {
566 const char* buf, int buf_len) {
567 DCHECK(!async_context_.get());
568 base::ThreadRestrictions::AssertIOAllowed(); 265 base::ThreadRestrictions::AssertIOAllowed();
569 266
570 if (!IsOpen())
571 return ERR_UNEXPECTED;
572
573 DCHECK(open_flags_ & base::PLATFORM_FILE_WRITE);
574
575 int rv = 0; 267 int rv = 0;
576 DWORD bytes_written = 0; 268 DWORD bytes_written = 0;
577 if (!WriteFile(file_, buf, buf_len, &bytes_written, NULL)) { 269 if (!WriteFile(file_, buf, buf_len, &bytes_written, NULL)) {
578 DWORD error = GetLastError(); 270 DWORD error = GetLastError();
579 LOG(WARNING) << "WriteFile failed: " << error; 271 LOG(WARNING) << "WriteFile failed: " << error;
580 rv = RecordAndMapError(error, 272 rv = RecordAndMapError(error, FILE_ERROR_SOURCE_WRITE);
581 FILE_ERROR_SOURCE_WRITE,
582 record_uma_,
583 bound_net_log_);
584 } else { 273 } else {
585 rv = static_cast<int>(bytes_written); 274 rv = static_cast<int>(bytes_written);
586 } 275 }
587 return rv; 276 return rv;
588 } 277 }
589 278
590 int FileStreamWin::Flush() { 279 int FileStream::AsyncContext::Flush() {
280 if (FlushFileBuffers(file_))
281 return OK;
282
283 return RecordAndMapError(GetLastError(), FILE_ERROR_SOURCE_FLUSH);
284 }
285
286 int FileStream::AsyncContext::Truncate(int64 bytes) {
287 BOOL result = SetEndOfFile(file_);
288 if (result)
289 return bytes;
290
291 DWORD error = GetLastError();
292 LOG(WARNING) << "SetEndOfFile failed: " << error;
293 return RecordAndMapError(error, FILE_ERROR_SOURCE_SET_EOF);
294 }
295
296 int FileStream::AsyncContext::RecordAndMapError(int error,
297 FileErrorSource source) {
298 // The following check is against incorrect use or bug. File descriptor
299 // shouldn't ever be closed outside of FileStream while it still tries to do
300 // something with it.
301 DCHECK(error != ERROR_INVALID_HANDLE);
302 net::Error net_error = MapSystemError(error);
303
304 if (!destroyed_) {
305 bound_net_log_.AddEvent(net::NetLog::TYPE_FILE_STREAM_ERROR,
306 base::Bind(&NetLogFileStreamErrorCallback,
307 source, error, net_error));
308 }
309 RecordFileError(error, source, record_uma_);
310 return net_error;
311 }
312
313 void FileStream::AsyncContext::BeginOpenEvent(const FilePath& path) {
314 std::string file_name = path.AsUTF8Unsafe();
315 bound_net_log_.BeginEvent(net::NetLog::TYPE_FILE_STREAM_OPEN,
316 NetLog::StringCallback("file_name", &file_name));
317 }
318
319 void FileStream::AsyncContext::OpenFileImpl(const FilePath& path,
320 int open_flags,
321 int* result) {
322 if (destroyed_)
323 return;
324
325 file_ = base::CreatePlatformFile(path, open_flags, NULL, NULL);
326 if (file_ == base::kInvalidPlatformFileValue)
327 *result = GetLastError();
328 }
329
330 void FileStream::AsyncContext::CheckForOpenError(int* result) {
331 if (file_ == base::kInvalidPlatformFileValue) {
332 bound_net_log_.EndEvent(net::NetLog::TYPE_FILE_STREAM_OPEN);
333 *result = RecordAndMapError(*result, FILE_ERROR_SOURCE_OPEN);
334 }
335 }
336
337 void FileStream::AsyncContext::OnOpenCompleted(
338 const CompletionCallback& callback,
339 int* result) {
340 CheckForOpenError(result);
341 if (!destroyed_)
342 RegisterInMessageLoop();
343 OnAsyncCompleted(callback, result);
344 }
345
346 void FileStream::AsyncContext::RegisterInMessageLoop() {
347 if (file_ != base::kInvalidPlatformFileValue)
348 MessageLoopForIO::current()->RegisterIOHandler(file_, this);
349 }
350
351 void FileStream::AsyncContext::CloseFileImpl() {
352 if (!base::ClosePlatformFile(file_))
353 NOTREACHED();
354 file_ = base::kInvalidPlatformFileValue;
355 }
356
357 void FileStream::AsyncContext::OnCloseCompleted(
358 const CompletionCallback& callback,
359 int* result) {
360 if (!destroyed_)
361 bound_net_log_.EndEvent(net::NetLog::TYPE_FILE_STREAM_OPEN);
362 OnAsyncCompleted(callback, result);
363 }
364
365 void FileStream::AsyncContext::SeekFileImpl(Whence whence,
366 int64 offset,
367 int64* result) {
591 base::ThreadRestrictions::AssertIOAllowed(); 368 base::ThreadRestrictions::AssertIOAllowed();
592 369
593 if (!IsOpen()) 370 // If context has been already destroyed nobody waits for operation results.
594 return ERR_UNEXPECTED; 371 if (destroyed_)
595
596 DCHECK(open_flags_ & base::PLATFORM_FILE_WRITE);
597 if (FlushFileBuffers(file_)) {
598 return OK;
599 }
600
601 return RecordAndMapError(GetLastError(),
602 FILE_ERROR_SOURCE_FLUSH,
603 record_uma_,
604 bound_net_log_);
605 }
606
607 int64 FileStreamWin::Truncate(int64 bytes) {
608 base::ThreadRestrictions::AssertIOAllowed();
609
610 if (!IsOpen())
611 return ERR_UNEXPECTED;
612
613 // We'd better be open for writing.
614 DCHECK(open_flags_ & base::PLATFORM_FILE_WRITE);
615
616 // Seek to the position to truncate from.
617 int64 seek_position = SeekSync(FROM_BEGIN, bytes);
618 if (seek_position != bytes)
619 return ERR_UNEXPECTED;
620
621 // And truncate the file.
622 BOOL result = SetEndOfFile(file_);
623 if (!result) {
624 DWORD error = GetLastError();
625 LOG(WARNING) << "SetEndOfFile failed: " << error;
626 return RecordAndMapError(error,
627 FILE_ERROR_SOURCE_SET_EOF,
628 record_uma_,
629 bound_net_log_);
630 }
631
632 // Success.
633 return seek_position;
634 }
635
636 void FileStreamWin::EnableErrorStatistics() {
637 record_uma_ = true;
638
639 if (async_context_.get())
640 async_context_->EnableErrorStatistics();
641 }
642
643 void FileStreamWin::SetBoundNetLogSource(
644 const net::BoundNetLog& owner_bound_net_log) {
645 if ((owner_bound_net_log.source().id == net::NetLog::Source::kInvalidId) &&
646 (bound_net_log_.source().id == net::NetLog::Source::kInvalidId)) {
647 // Both |BoundNetLog|s are invalid.
648 return; 372 return;
649 } 373
650
651 // Should never connect to itself.
652 DCHECK_NE(bound_net_log_.source().id, owner_bound_net_log.source().id);
653
654 bound_net_log_.AddEvent(
655 net::NetLog::TYPE_FILE_STREAM_BOUND_TO_OWNER,
656 owner_bound_net_log.source().ToEventParametersCallback());
657
658 owner_bound_net_log.AddEvent(
659 net::NetLog::TYPE_FILE_STREAM_SOURCE,
660 bound_net_log_.source().ToEventParametersCallback());
661 }
662
663 base::PlatformFile FileStreamWin::GetPlatformFileForTesting() {
664 return file_;
665 }
666
667 void FileStreamWin::OnClosed(const CompletionCallback& callback) {
668 file_ = base::kInvalidPlatformFileValue;
669
670 // Reset this before Run() as Run() may issue a new async operation.
671 ResetOnIOComplete();
672 callback.Run(OK);
673 }
674
675 void FileStreamWin::SeekFile(Whence whence, int64 offset, int64* result) {
676 LARGE_INTEGER distance, res; 374 LARGE_INTEGER distance, res;
677 distance.QuadPart = offset; 375 distance.QuadPart = offset;
678 DWORD move_method = static_cast<DWORD>(whence); 376 DWORD move_method = static_cast<DWORD>(whence);
679 if (!SetFilePointerEx(file_, distance, &res, move_method)) { 377 if (SetFilePointerEx(file_, distance, &res, move_method)) {
680 DWORD error = GetLastError(); 378 SetOffset(&io_context_.overlapped, res);
681 LOG(WARNING) << "SetFilePointerEx failed: " << error; 379 *result = res.QuadPart;
682 *result = RecordAndMapError(error, 380 } else {
683 FILE_ERROR_SOURCE_SEEK, 381 *result = -static_cast<int>(GetLastError());
684 record_uma_, 382 }
685 bound_net_log_); 383 }
384
385 void FileStream::AsyncContext::CheckForSeekError(int64* result) {
386 if (*result < 0) {
387 *result = RecordAndMapError(static_cast<int>(-(*result)),
388 FILE_ERROR_SOURCE_SEEK);
389 }
390 }
391
392 void FileStream::AsyncContext::OnSeekCompleted(
393 const CompletionCallback64& callback,
394 int64* result) {
395 CheckForSeekError(result);
396 OnAsyncCompleted(callback, result);
397 }
398
399 void FileStream::AsyncContext::IOCompletionIsPending(
400 const CompletionCallback& callback,
401 IOBuffer* buf) {
402 DCHECK(callback_.is_null());
403 callback_ = callback;
404 in_flight_buf_ = buf; // Hold until the async operation ends.
405 async_in_progress_ = true;
406 }
407
408 void FileStream::AsyncContext::OnIOCompleted(
409 MessageLoopForIO::IOContext* context,
410 DWORD bytes_read,
411 DWORD error) {
412 DCHECK_EQ(&io_context_, context);
413 DCHECK(!callback_.is_null());
414
415 if (destroyed_) {
416 callback_.Reset();
417 in_flight_buf_ = NULL;
418 DeleteAbandoned();
686 return; 419 return;
687 } 420 }
688 if (async_context_.get()) { 421
689 async_context_->set_error_source(FILE_ERROR_SOURCE_SEEK); 422 int result = static_cast<int>(bytes_read);
690 SetOffset(async_context_->overlapped(), res); 423 if (error && error != ERROR_HANDLE_EOF)
691 } 424 result = RecordAndMapError(error, error_source_);
692 *result = res.QuadPart; 425
693 } 426 if (bytes_read)
694 427 IncrementOffset(&io_context_.overlapped, bytes_read);
695 void FileStreamWin::OnOpened(const CompletionCallback& callback, int* result) {
696 if (*result == OK) {
697 async_context_.reset(new AsyncContext(bound_net_log_));
698 if (record_uma_)
699 async_context_->EnableErrorStatistics();
700 MessageLoopForIO::current()->RegisterIOHandler(file_,
701 async_context_.get());
702 }
703 428
704 // Reset this before Run() as Run() may issue a new async operation. 429 // Reset this before Run() as Run() may issue a new async operation.
705 ResetOnIOComplete(); 430 async_in_progress_ = false;
706 callback.Run(*result); 431 CompletionCallback temp_callback = callback_;
707 } 432 callback_.Reset();
708 433 scoped_refptr<IOBuffer> temp_buf = in_flight_buf_;
709 void FileStreamWin::OnSeeked( 434 in_flight_buf_ = NULL;
710 const Int64CompletionCallback& callback, 435 temp_callback.Run(result);
711 int64* result) { 436 }
712 // Reset this before Run() as Run() may issue a new async operation. 437
713 ResetOnIOComplete(); 438 template <typename R>
714 callback.Run(*result); 439 void FileStream::AsyncContext::OnAsyncCompleted(
715 } 440 const base::Callback<void(R)>& callback,
716 441 R* result) {
717 void FileStreamWin::ResetOnIOComplete() { 442 if (destroyed_) {
718 on_io_complete_.reset(); 443 DeleteAbandoned();
719 weak_ptr_factory_.InvalidateWeakPtrs(); 444 } else {
720 } 445 // Reset this before Run() as Run() may issue a new async operation.
721 446 async_in_progress_ = false;
722 void FileStreamWin::WaitForIOCompletion() { 447 callback.Run(*result);
723 // http://crbug.com/115067 448 }
724 base::ThreadRestrictions::ScopedAllowWait allow_wait; 449 }
725 if (on_io_complete_.get()) { 450
726 on_io_complete_->Wait(); 451 void FileStream::AsyncContext::DeleteAbandoned() {
727 on_io_complete_.reset(); 452 if (file_ != base::kInvalidPlatformFileValue) {
728 } 453 const bool posted = base::WorkerPool::PostTask(
729 } 454 FROM_HERE,
730 455 // Context should be deleted after closing, thus Owned().
456 base::Bind(&AsyncContext::CloseFileImpl, base::Owned(this)),
457 true /* task_is_slow */);
458 DCHECK(posted);
459 } else {
460 delete this;
461 }
462 }
463
731 } // namespace net 464 } // namespace net
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698