OLD | NEW |
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "net/disk_cache/file.h" | 5 #include "net/disk_cache/file.h" |
6 | 6 |
7 #include "base/bind.h" | 7 #include "base/bind.h" |
8 #include "base/lazy_instance.h" | |
9 #include "base/location.h" | 8 #include "base/location.h" |
10 #include "base/logging.h" | 9 #include "base/logging.h" |
11 #include "base/run_loop.h" | 10 #include "base/threading/worker_pool.h" |
12 #include "base/task_runner_util.h" | |
13 #include "base/threading/sequenced_worker_pool.h" | |
14 #include "net/base/net_errors.h" | 11 #include "net/base/net_errors.h" |
15 #include "net/disk_cache/disk_cache.h" | 12 #include "net/disk_cache/disk_cache.h" |
| 13 #include "net/disk_cache/in_flight_io.h" |
16 | 14 |
17 namespace { | 15 namespace { |
18 | 16 |
19 // The maximum number of threads for this pool. | 17 // This class represents a single asynchronous IO operation while it is being |
20 const int kMaxThreads = 5; | 18 // bounced between threads. |
| 19 class FileBackgroundIO : public disk_cache::BackgroundIO { |
| 20 public: |
| 21 // Other than the actual parameters for the IO operation (including the |
| 22 // |callback| that must be notified at the end), we need the controller that |
| 23 // is keeping track of all operations. When done, we notify the controller |
| 24 // (we do NOT invoke the callback), in the worker thead that completed the |
| 25 // operation. |
| 26 FileBackgroundIO(disk_cache::File* file, const void* buf, size_t buf_len, |
| 27 size_t offset, disk_cache::FileIOCallback* callback, |
| 28 disk_cache::InFlightIO* controller) |
| 29 : disk_cache::BackgroundIO(controller), callback_(callback), file_(file), |
| 30 buf_(buf), buf_len_(buf_len), offset_(offset) { |
| 31 } |
21 | 32 |
22 class FileWorkerPool : public base::SequencedWorkerPool { | 33 disk_cache::FileIOCallback* callback() { |
| 34 return callback_; |
| 35 } |
| 36 |
| 37 disk_cache::File* file() { |
| 38 return file_; |
| 39 } |
| 40 |
| 41 // Read and Write are the operations that can be performed asynchronously. |
| 42 // The actual parameters for the operation are setup in the constructor of |
| 43 // the object. Both methods should be called from a worker thread, by posting |
| 44 // a task to the WorkerPool (they are RunnableMethods). When finished, |
| 45 // controller->OnIOComplete() is called. |
| 46 void Read(); |
| 47 void Write(); |
| 48 |
| 49 private: |
| 50 virtual ~FileBackgroundIO() {} |
| 51 |
| 52 disk_cache::FileIOCallback* callback_; |
| 53 |
| 54 disk_cache::File* file_; |
| 55 const void* buf_; |
| 56 size_t buf_len_; |
| 57 size_t offset_; |
| 58 |
| 59 DISALLOW_COPY_AND_ASSIGN(FileBackgroundIO); |
| 60 }; |
| 61 |
| 62 |
| 63 // The specialized controller that keeps track of current operations. |
| 64 class FileInFlightIO : public disk_cache::InFlightIO { |
23 public: | 65 public: |
24 FileWorkerPool() : base::SequencedWorkerPool(kMaxThreads, "CachePool") {} | 66 FileInFlightIO() {} |
| 67 virtual ~FileInFlightIO() {} |
| 68 |
| 69 // These methods start an asynchronous operation. The arguments have the same |
| 70 // semantics of the File asynchronous operations, with the exception that the |
| 71 // operation never finishes synchronously. |
| 72 void PostRead(disk_cache::File* file, void* buf, size_t buf_len, |
| 73 size_t offset, disk_cache::FileIOCallback* callback); |
| 74 void PostWrite(disk_cache::File* file, const void* buf, size_t buf_len, |
| 75 size_t offset, disk_cache::FileIOCallback* callback); |
25 | 76 |
26 protected: | 77 protected: |
27 virtual ~FileWorkerPool() {} | 78 // Invokes the users' completion callback at the end of the IO operation. |
| 79 // |cancel| is true if the actual task posted to the thread is still |
| 80 // queued (because we are inside WaitForPendingIO), and false if said task is |
| 81 // the one performing the call. |
| 82 virtual void OnOperationComplete(disk_cache::BackgroundIO* operation, |
| 83 bool cancel) OVERRIDE; |
| 84 |
| 85 private: |
| 86 DISALLOW_COPY_AND_ASSIGN(FileInFlightIO); |
28 }; | 87 }; |
29 | 88 |
30 base::LazyInstance<FileWorkerPool>::Leaky s_worker_pool = | 89 // --------------------------------------------------------------------------- |
31 LAZY_INSTANCE_INITIALIZER; | 90 |
| 91 // Runs on a worker thread. |
| 92 void FileBackgroundIO::Read() { |
| 93 if (file_->Read(const_cast<void*>(buf_), buf_len_, offset_)) { |
| 94 result_ = static_cast<int>(buf_len_); |
| 95 } else { |
| 96 result_ = net::ERR_CACHE_READ_FAILURE; |
| 97 } |
| 98 NotifyController(); |
| 99 } |
| 100 |
| 101 // Runs on a worker thread. |
| 102 void FileBackgroundIO::Write() { |
| 103 bool rv = file_->Write(buf_, buf_len_, offset_); |
| 104 |
| 105 result_ = rv ? static_cast<int>(buf_len_) : net::ERR_CACHE_WRITE_FAILURE; |
| 106 NotifyController(); |
| 107 } |
| 108 |
| 109 // --------------------------------------------------------------------------- |
| 110 |
| 111 void FileInFlightIO::PostRead(disk_cache::File *file, void* buf, size_t buf_len, |
| 112 size_t offset, disk_cache::FileIOCallback *callback) { |
| 113 scoped_refptr<FileBackgroundIO> operation( |
| 114 new FileBackgroundIO(file, buf, buf_len, offset, callback, this)); |
| 115 file->AddRef(); // Balanced on OnOperationComplete() |
| 116 |
| 117 base::WorkerPool::PostTask(FROM_HERE, |
| 118 base::Bind(&FileBackgroundIO::Read, operation.get()), true); |
| 119 OnOperationPosted(operation.get()); |
| 120 } |
| 121 |
| 122 void FileInFlightIO::PostWrite(disk_cache::File* file, const void* buf, |
| 123 size_t buf_len, size_t offset, |
| 124 disk_cache::FileIOCallback* callback) { |
| 125 scoped_refptr<FileBackgroundIO> operation( |
| 126 new FileBackgroundIO(file, buf, buf_len, offset, callback, this)); |
| 127 file->AddRef(); // Balanced on OnOperationComplete() |
| 128 |
| 129 base::WorkerPool::PostTask(FROM_HERE, |
| 130 base::Bind(&FileBackgroundIO::Write, operation.get()), true); |
| 131 OnOperationPosted(operation.get()); |
| 132 } |
| 133 |
| 134 // Runs on the IO thread. |
| 135 void FileInFlightIO::OnOperationComplete(disk_cache::BackgroundIO* operation, |
| 136 bool cancel) { |
| 137 FileBackgroundIO* op = static_cast<FileBackgroundIO*>(operation); |
| 138 |
| 139 disk_cache::FileIOCallback* callback = op->callback(); |
| 140 int bytes = operation->result(); |
| 141 |
| 142 // Release the references acquired in PostRead / PostWrite. |
| 143 op->file()->Release(); |
| 144 callback->OnFileIOComplete(bytes); |
| 145 } |
| 146 |
| 147 // A static object tha will broker all async operations. |
| 148 FileInFlightIO* s_file_operations = NULL; |
| 149 |
| 150 // Returns the current FileInFlightIO. |
| 151 FileInFlightIO* GetFileInFlightIO() { |
| 152 if (!s_file_operations) { |
| 153 s_file_operations = new FileInFlightIO; |
| 154 } |
| 155 return s_file_operations; |
| 156 } |
| 157 |
| 158 // Deletes the current FileInFlightIO. |
| 159 void DeleteFileInFlightIO() { |
| 160 DCHECK(s_file_operations); |
| 161 delete s_file_operations; |
| 162 s_file_operations = NULL; |
| 163 } |
32 | 164 |
33 } // namespace | 165 } // namespace |
34 | 166 |
35 namespace disk_cache { | 167 namespace disk_cache { |
36 | 168 |
37 File::File(base::PlatformFile file) | 169 File::File(base::PlatformFile file) |
38 : init_(true), | 170 : init_(true), |
39 mixed_(true), | 171 mixed_(true), |
40 platform_file_(file), | 172 platform_file_(file), |
41 sync_platform_file_(base::kInvalidPlatformFileValue) { | 173 sync_platform_file_(base::kInvalidPlatformFileValue) { |
(...skipping 44 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
86 offset > static_cast<size_t>(kint32max)) { | 218 offset > static_cast<size_t>(kint32max)) { |
87 return false; | 219 return false; |
88 } | 220 } |
89 | 221 |
90 int ret = base::WritePlatformFile(platform_file_, offset, | 222 int ret = base::WritePlatformFile(platform_file_, offset, |
91 static_cast<const char*>(buffer), | 223 static_cast<const char*>(buffer), |
92 buffer_len); | 224 buffer_len); |
93 return (static_cast<size_t>(ret) == buffer_len); | 225 return (static_cast<size_t>(ret) == buffer_len); |
94 } | 226 } |
95 | 227 |
| 228 // We have to increase the ref counter of the file before performing the IO to |
| 229 // prevent the completion to happen with an invalid handle (if the file is |
| 230 // closed while the IO is in flight). |
96 bool File::Read(void* buffer, size_t buffer_len, size_t offset, | 231 bool File::Read(void* buffer, size_t buffer_len, size_t offset, |
97 FileIOCallback* callback, bool* completed) { | 232 FileIOCallback* callback, bool* completed) { |
98 DCHECK(init_); | 233 DCHECK(init_); |
99 if (!callback) { | 234 if (!callback) { |
100 if (completed) | 235 if (completed) |
101 *completed = true; | 236 *completed = true; |
102 return Read(buffer, buffer_len, offset); | 237 return Read(buffer, buffer_len, offset); |
103 } | 238 } |
104 | 239 |
105 if (buffer_len > static_cast<size_t>(kint32max) || | 240 if (buffer_len > ULONG_MAX || offset > ULONG_MAX) |
106 offset > static_cast<size_t>(kint32max)) { | |
107 return false; | 241 return false; |
108 } | |
109 | 242 |
110 base::PostTaskAndReplyWithResult( | 243 GetFileInFlightIO()->PostRead(this, buffer, buffer_len, offset, callback); |
111 s_worker_pool.Pointer(), FROM_HERE, | |
112 base::Bind(&File::DoRead, this, buffer, buffer_len, offset), | |
113 base::Bind(&File::OnOperationComplete, this, callback)); | |
114 | 244 |
115 *completed = false; | 245 *completed = false; |
116 return true; | 246 return true; |
117 } | 247 } |
118 | 248 |
119 bool File::Write(const void* buffer, size_t buffer_len, size_t offset, | 249 bool File::Write(const void* buffer, size_t buffer_len, size_t offset, |
120 FileIOCallback* callback, bool* completed) { | 250 FileIOCallback* callback, bool* completed) { |
121 DCHECK(init_); | 251 DCHECK(init_); |
122 if (!callback) { | 252 if (!callback) { |
123 if (completed) | 253 if (completed) |
124 *completed = true; | 254 *completed = true; |
125 return Write(buffer, buffer_len, offset); | 255 return Write(buffer, buffer_len, offset); |
126 } | 256 } |
127 | 257 |
128 if (buffer_len > static_cast<size_t>(kint32max) || | 258 return AsyncWrite(buffer, buffer_len, offset, callback, completed); |
129 offset > static_cast<size_t>(kint32max)) { | |
130 return false; | |
131 } | |
132 | |
133 base::PostTaskAndReplyWithResult( | |
134 s_worker_pool.Pointer(), FROM_HERE, | |
135 base::Bind(&File::DoWrite, this, buffer, buffer_len, offset), | |
136 base::Bind(&File::OnOperationComplete, this, callback)); | |
137 | |
138 *completed = false; | |
139 return true; | |
140 } | 259 } |
141 | 260 |
142 bool File::SetLength(size_t length) { | 261 bool File::SetLength(size_t length) { |
143 DCHECK(init_); | 262 DCHECK(init_); |
144 if (length > kuint32max) | 263 if (length > kuint32max) |
145 return false; | 264 return false; |
146 | 265 |
147 return base::TruncatePlatformFile(platform_file_, length); | 266 return base::TruncatePlatformFile(platform_file_, length); |
148 } | 267 } |
149 | 268 |
150 size_t File::GetLength() { | 269 size_t File::GetLength() { |
151 DCHECK(init_); | 270 DCHECK(init_); |
152 int64 len = base::SeekPlatformFile(platform_file_, | 271 int64 len = base::SeekPlatformFile(platform_file_, |
153 base::PLATFORM_FILE_FROM_END, 0); | 272 base::PLATFORM_FILE_FROM_END, 0); |
154 | 273 |
155 if (len > static_cast<int64>(kuint32max)) | 274 if (len > static_cast<int64>(kuint32max)) |
156 return kuint32max; | 275 return kuint32max; |
157 | 276 |
158 return static_cast<size_t>(len); | 277 return static_cast<size_t>(len); |
159 } | 278 } |
160 | 279 |
161 // Static. | 280 // Static. |
162 void File::WaitForPendingIO(int* num_pending_io) { | 281 void File::WaitForPendingIO(int* num_pending_io) { |
163 // We are running unit tests so we should wait for all callbacks. Sadly, the | 282 // We may be running unit tests so we should allow be able to reset the |
164 // worker pool only waits for tasks on the worker pool, not the "Reply" tasks | 283 // message loop. |
165 // so we have to let the current message loop to run. | 284 GetFileInFlightIO()->WaitForPendingIO(); |
166 s_worker_pool.Get().FlushForTesting(); | 285 DeleteFileInFlightIO(); |
167 base::RunLoop().RunUntilIdle(); | 286 } |
| 287 |
| 288 // Static. |
| 289 void File::DropPendingIO() { |
| 290 GetFileInFlightIO()->DropPendingIO(); |
| 291 DeleteFileInFlightIO(); |
168 } | 292 } |
169 | 293 |
170 File::~File() { | 294 File::~File() { |
171 if (IsValid()) | 295 if (IsValid()) |
172 base::ClosePlatformFile(platform_file_); | 296 base::ClosePlatformFile(platform_file_); |
173 } | 297 } |
174 | 298 |
175 // Runs on a worker thread. | 299 bool File::AsyncWrite(const void* buffer, size_t buffer_len, size_t offset, |
176 int File::DoRead(void* buffer, size_t buffer_len, size_t offset) { | 300 FileIOCallback* callback, bool* completed) { |
177 if (Read(const_cast<void*>(buffer), buffer_len, offset)) | 301 DCHECK(init_); |
178 return static_cast<int>(buffer_len); | 302 if (buffer_len > ULONG_MAX || offset > ULONG_MAX) |
| 303 return false; |
179 | 304 |
180 return net::ERR_CACHE_READ_FAILURE; | 305 GetFileInFlightIO()->PostWrite(this, buffer, buffer_len, offset, callback); |
181 } | |
182 | 306 |
183 // Runs on a worker thread. | 307 if (completed) |
184 int File::DoWrite(const void* buffer, size_t buffer_len, size_t offset) { | 308 *completed = false; |
185 if (Write(const_cast<void*>(buffer), buffer_len, offset)) | 309 return true; |
186 return static_cast<int>(buffer_len); | |
187 | |
188 return net::ERR_CACHE_WRITE_FAILURE; | |
189 } | |
190 | |
191 // This method actually makes sure that the last reference to the file doesn't | |
192 // go away on the worker pool. | |
193 void File::OnOperationComplete(FileIOCallback* callback, int result) { | |
194 callback->OnFileIOComplete(result); | |
195 } | 310 } |
196 | 311 |
197 } // namespace disk_cache | 312 } // namespace disk_cache |
OLD | NEW |