Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(351)

Side by Side Diff: net/disk_cache/file_posix.cc

Issue 23752005: Disk Cache: Replace the worker pool with a sequenced worker pool (posix). (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Created 7 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "net/disk_cache/file.h" 5 #include "net/disk_cache/file.h"
6 6
7 #include <fcntl.h>
8
9 #include "base/bind.h" 7 #include "base/bind.h"
8 #include "base/lazy_instance.h"
10 #include "base/location.h" 9 #include "base/location.h"
11 #include "base/logging.h" 10 #include "base/logging.h"
12 #include "base/threading/worker_pool.h" 11 #include "base/run_loop.h"
12 #include "base/task_runner_util.h"
13 #include "base/threading/sequenced_worker_pool.h"
13 #include "net/base/net_errors.h" 14 #include "net/base/net_errors.h"
14 #include "net/disk_cache/disk_cache.h" 15 #include "net/disk_cache/disk_cache.h"
15 #include "net/disk_cache/in_flight_io.h"
16 16
17 namespace { 17 namespace {
18 18
19 // This class represents a single asynchronous IO operation while it is being 19 // The maximum number of threads for this pool.
20 // bounced between threads. 20 const int kMaxThreads = 5;
21 class FileBackgroundIO : public disk_cache::BackgroundIO { 21
22 class FileWorkerPool : public base::SequencedWorkerPool {
22 public: 23 public:
23 // Other than the actual parameters for the IO operation (including the 24 FileWorkerPool() : base::SequencedWorkerPool(kMaxThreads, "CachePool") {}
24 // |callback| that must be notified at the end), we need the controller that
25 // is keeping track of all operations. When done, we notify the controller
26 // (we do NOT invoke the callback), in the worker thead that completed the
27 // operation.
28 FileBackgroundIO(disk_cache::File* file, const void* buf, size_t buf_len,
29 size_t offset, disk_cache::FileIOCallback* callback,
30 disk_cache::InFlightIO* controller)
31 : disk_cache::BackgroundIO(controller), callback_(callback), file_(file),
32 buf_(buf), buf_len_(buf_len), offset_(offset) {
33 }
34
35 disk_cache::FileIOCallback* callback() {
36 return callback_;
37 }
38
39 disk_cache::File* file() {
40 return file_;
41 }
42
43 // Read and Write are the operations that can be performed asynchronously.
44 // The actual parameters for the operation are setup in the constructor of
45 // the object. Both methods should be called from a worker thread, by posting
46 // a task to the WorkerPool (they are RunnableMethods). When finished,
47 // controller->OnIOComplete() is called.
48 void Read();
49 void Write();
50
51 private:
52 virtual ~FileBackgroundIO() {}
53
54 disk_cache::FileIOCallback* callback_;
55
56 disk_cache::File* file_;
57 const void* buf_;
58 size_t buf_len_;
59 size_t offset_;
60
61 DISALLOW_COPY_AND_ASSIGN(FileBackgroundIO);
62 }; 25 };
63 26
64 27 base::LazyInstance<FileWorkerPool>::Leaky s_worker_pool =
65 // The specialized controller that keeps track of current operations. 28 LAZY_INSTANCE_INITIALIZER;
66 class FileInFlightIO : public disk_cache::InFlightIO {
67 public:
68 FileInFlightIO() {}
69 virtual ~FileInFlightIO() {}
70
71 // These methods start an asynchronous operation. The arguments have the same
72 // semantics of the File asynchronous operations, with the exception that the
73 // operation never finishes synchronously.
74 void PostRead(disk_cache::File* file, void* buf, size_t buf_len,
75 size_t offset, disk_cache::FileIOCallback* callback);
76 void PostWrite(disk_cache::File* file, const void* buf, size_t buf_len,
77 size_t offset, disk_cache::FileIOCallback* callback);
78
79 protected:
80 // Invokes the users' completion callback at the end of the IO operation.
81 // |cancel| is true if the actual task posted to the thread is still
82 // queued (because we are inside WaitForPendingIO), and false if said task is
83 // the one performing the call.
84 virtual void OnOperationComplete(disk_cache::BackgroundIO* operation,
85 bool cancel) OVERRIDE;
86
87 private:
88 DISALLOW_COPY_AND_ASSIGN(FileInFlightIO);
89 };
90
91 // ---------------------------------------------------------------------------
92
93 // Runs on a worker thread.
94 void FileBackgroundIO::Read() {
95 if (file_->Read(const_cast<void*>(buf_), buf_len_, offset_)) {
96 result_ = static_cast<int>(buf_len_);
97 } else {
98 result_ = net::ERR_CACHE_READ_FAILURE;
99 }
100 NotifyController();
101 }
102
103 // Runs on a worker thread.
104 void FileBackgroundIO::Write() {
105 bool rv = file_->Write(buf_, buf_len_, offset_);
106
107 result_ = rv ? static_cast<int>(buf_len_) : net::ERR_CACHE_WRITE_FAILURE;
108 NotifyController();
109 }
110
111 // ---------------------------------------------------------------------------
112
113 void FileInFlightIO::PostRead(disk_cache::File *file, void* buf, size_t buf_len,
114 size_t offset, disk_cache::FileIOCallback *callback) {
115 scoped_refptr<FileBackgroundIO> operation(
116 new FileBackgroundIO(file, buf, buf_len, offset, callback, this));
117 file->AddRef(); // Balanced on OnOperationComplete()
118
119 base::WorkerPool::PostTask(FROM_HERE,
120 base::Bind(&FileBackgroundIO::Read, operation.get()), true);
121 OnOperationPosted(operation.get());
122 }
123
124 void FileInFlightIO::PostWrite(disk_cache::File* file, const void* buf,
125 size_t buf_len, size_t offset,
126 disk_cache::FileIOCallback* callback) {
127 scoped_refptr<FileBackgroundIO> operation(
128 new FileBackgroundIO(file, buf, buf_len, offset, callback, this));
129 file->AddRef(); // Balanced on OnOperationComplete()
130
131 base::WorkerPool::PostTask(FROM_HERE,
132 base::Bind(&FileBackgroundIO::Write, operation.get()), true);
133 OnOperationPosted(operation.get());
134 }
135
136 // Runs on the IO thread.
137 void FileInFlightIO::OnOperationComplete(disk_cache::BackgroundIO* operation,
138 bool cancel) {
139 FileBackgroundIO* op = static_cast<FileBackgroundIO*>(operation);
140
141 disk_cache::FileIOCallback* callback = op->callback();
142 int bytes = operation->result();
143
144 // Release the references acquired in PostRead / PostWrite.
145 op->file()->Release();
146 callback->OnFileIOComplete(bytes);
147 }
148
149 // A static object tha will broker all async operations.
150 FileInFlightIO* s_file_operations = NULL;
151
152 // Returns the current FileInFlightIO.
153 FileInFlightIO* GetFileInFlightIO() {
154 if (!s_file_operations) {
155 s_file_operations = new FileInFlightIO;
156 }
157 return s_file_operations;
158 }
159
160 // Deletes the current FileInFlightIO.
161 void DeleteFileInFlightIO() {
162 DCHECK(s_file_operations);
163 delete s_file_operations;
164 s_file_operations = NULL;
165 }
166 29
167 } // namespace 30 } // namespace
168 31
169 namespace disk_cache { 32 namespace disk_cache {
170 33
171 File::File(base::PlatformFile file) 34 File::File(base::PlatformFile file)
172 : init_(true), 35 : init_(true),
173 mixed_(true), 36 mixed_(true),
174 platform_file_(file), 37 platform_file_(file),
175 sync_platform_file_(base::kInvalidPlatformFileValue) { 38 sync_platform_file_(base::kInvalidPlatformFileValue) {
(...skipping 19 matching lines...) Expand all
195 base::PlatformFile File::platform_file() const { 58 base::PlatformFile File::platform_file() const {
196 return platform_file_; 59 return platform_file_;
197 } 60 }
198 61
199 bool File::IsValid() const { 62 bool File::IsValid() const {
200 if (!init_) 63 if (!init_)
201 return false; 64 return false;
202 return (base::kInvalidPlatformFileValue != platform_file_); 65 return (base::kInvalidPlatformFileValue != platform_file_);
203 } 66 }
204 67
205 bool File::Read(void* buffer, size_t buffer_len, size_t offset) { 68 bool File::Read(void* buffer, size_t buffer_len, size_t offset) {
gavinp 2013/09/09 21:18:16 The references to init_ and platform_file_ from th
rvargas (doing something else) 2013/09/09 22:26:57 I don't think init_ is a problem as it is guarding
gavinp 2013/09/10 17:50:15 I agree.
206 DCHECK(init_); 69 DCHECK(init_);
207 if (buffer_len > static_cast<size_t>(kint32max) || 70 if (buffer_len > static_cast<size_t>(kint32max) ||
208 offset > static_cast<size_t>(kint32max)) 71 offset > static_cast<size_t>(kint32max)) {
209 return false; 72 return false;
73 }
210 74
211 int ret = base::ReadPlatformFile(platform_file_, offset, 75 int ret = base::ReadPlatformFile(platform_file_, offset,
212 static_cast<char*>(buffer), buffer_len); 76 static_cast<char*>(buffer), buffer_len);
213 return (static_cast<size_t>(ret) == buffer_len); 77 return (static_cast<size_t>(ret) == buffer_len);
214 } 78 }
215 79
216 bool File::Write(const void* buffer, size_t buffer_len, size_t offset) { 80 bool File::Write(const void* buffer, size_t buffer_len, size_t offset) {
217 DCHECK(init_); 81 DCHECK(init_);
218 if (buffer_len > static_cast<size_t>(kint32max) || 82 if (buffer_len > static_cast<size_t>(kint32max) ||
219 offset > static_cast<size_t>(kint32max)) 83 offset > static_cast<size_t>(kint32max)) {
220 return false; 84 return false;
85 }
221 86
222 int ret = base::WritePlatformFile(platform_file_, offset, 87 int ret = base::WritePlatformFile(platform_file_, offset,
223 static_cast<const char*>(buffer), 88 static_cast<const char*>(buffer),
224 buffer_len); 89 buffer_len);
225 return (static_cast<size_t>(ret) == buffer_len); 90 return (static_cast<size_t>(ret) == buffer_len);
226 } 91 }
227 92
228 // We have to increase the ref counter of the file before performing the IO to
229 // prevent the completion to happen with an invalid handle (if the file is
230 // closed while the IO is in flight).
231 bool File::Read(void* buffer, size_t buffer_len, size_t offset, 93 bool File::Read(void* buffer, size_t buffer_len, size_t offset,
232 FileIOCallback* callback, bool* completed) { 94 FileIOCallback* callback, bool* completed) {
233 DCHECK(init_); 95 DCHECK(init_);
234 if (!callback) { 96 if (!callback) {
235 if (completed) 97 if (completed)
236 *completed = true; 98 *completed = true;
237 return Read(buffer, buffer_len, offset); 99 return Read(buffer, buffer_len, offset);
238 } 100 }
239 101
240 if (buffer_len > ULONG_MAX || offset > ULONG_MAX) 102 if (buffer_len > static_cast<size_t>(kint32max) ||
103 offset > static_cast<size_t>(kint32max)) {
241 return false; 104 return false;
105 }
242 106
243 GetFileInFlightIO()->PostRead(this, buffer, buffer_len, offset, callback); 107 base::PostTaskAndReplyWithResult(
108 s_worker_pool.Pointer(), FROM_HERE,
109 base::Bind(&File::DoRead, this, buffer, buffer_len, offset),
110 base::Bind(&File::OnOperationComplete, this, callback));
244 111
245 *completed = false; 112 *completed = false;
246 return true; 113 return true;
247 } 114 }
248 115
249 bool File::Write(const void* buffer, size_t buffer_len, size_t offset, 116 bool File::Write(const void* buffer, size_t buffer_len, size_t offset,
250 FileIOCallback* callback, bool* completed) { 117 FileIOCallback* callback, bool* completed) {
251 DCHECK(init_); 118 DCHECK(init_);
252 if (!callback) { 119 if (!callback) {
253 if (completed) 120 if (completed)
254 *completed = true; 121 *completed = true;
255 return Write(buffer, buffer_len, offset); 122 return Write(buffer, buffer_len, offset);
256 } 123 }
257 124
258 return AsyncWrite(buffer, buffer_len, offset, callback, completed); 125 if (buffer_len > static_cast<size_t>(kint32max) ||
gavinp 2013/09/09 21:18:16 I'd have written this as std::numeric_limits<int32
rvargas (doing something else) 2013/09/09 22:26:57 Given that both are valid today I have a strong pr
gavinp 2013/09/10 17:50:15 SGTM. I would probably use numeric_limits<>(), but
126 offset > static_cast<size_t>(kint32max)) {
127 return false;
128 }
129
130 base::PostTaskAndReplyWithResult(
131 s_worker_pool.Pointer(), FROM_HERE,
132 base::Bind(&File::DoWrite, this, buffer, buffer_len, offset),
133 base::Bind(&File::OnOperationComplete, this, callback));
134
135 *completed = false;
136 return true;
259 } 137 }
260 138
261 bool File::SetLength(size_t length) { 139 bool File::SetLength(size_t length) {
262 DCHECK(init_); 140 DCHECK(init_);
263 if (length > ULONG_MAX) 141 if (length > kuint32max)
264 return false; 142 return false;
265 143
266 return base::TruncatePlatformFile(platform_file_, length); 144 return base::TruncatePlatformFile(platform_file_, length);
267 } 145 }
268 146
269 size_t File::GetLength() { 147 size_t File::GetLength() {
270 DCHECK(init_); 148 DCHECK(init_);
271 off_t ret = lseek(platform_file_, 0, SEEK_END); 149 int64 len = base::SeekPlatformFile(platform_file_,
272 if (ret < 0) 150 base::PLATFORM_FILE_FROM_END, 0);
273 return 0; 151
274 return ret; 152 if (len > static_cast<int64>(kuint32max))
153 return kuint32max;
154
155 return static_cast<size_t>(len);
275 } 156 }
276 157
277 // Static. 158 // Static.
278 void File::WaitForPendingIO(int* num_pending_io) { 159 void File::WaitForPendingIO(int* num_pending_io) {
279 // We may be running unit tests so we should allow be able to reset the 160 // We are running unit tests so we should wait for all callbacks. Sadly, the
280 // message loop. 161 // worker pool only waits for tasks on the worker pool, not the "Reply" tasks
281 GetFileInFlightIO()->WaitForPendingIO(); 162 // so we have to let the current message loop to run.
282 DeleteFileInFlightIO(); 163 s_worker_pool.Get().FlushForTesting();
283 } 164 base::RunLoop().RunUntilIdle();
284
285 // Static.
286 void File::DropPendingIO() {
287 GetFileInFlightIO()->DropPendingIO();
288 DeleteFileInFlightIO();
289 } 165 }
290 166
291 File::~File() { 167 File::~File() {
292 if (IsValid()) 168 if (IsValid())
293 base::ClosePlatformFile(platform_file_); 169 base::ClosePlatformFile(platform_file_);
294 } 170 }
295 171
296 bool File::AsyncWrite(const void* buffer, size_t buffer_len, size_t offset, 172 // Runs on a worker thread.
297 FileIOCallback* callback, bool* completed) { 173 int File::DoRead(void* buffer, size_t buffer_len, size_t offset) {
298 DCHECK(init_); 174 if (Read(const_cast<void*>(buffer), buffer_len, offset))
299 if (buffer_len > ULONG_MAX || offset > ULONG_MAX) 175 return static_cast<int>(buffer_len);
300 return false;
301 176
302 GetFileInFlightIO()->PostWrite(this, buffer, buffer_len, offset, callback); 177 return net::ERR_CACHE_READ_FAILURE;
178 }
303 179
304 if (completed) 180 // Runs on a worker thread.
305 *completed = false; 181 int File::DoWrite(const void* buffer, size_t buffer_len, size_t offset) {
306 return true; 182 if (Write(const_cast<void*>(buffer), buffer_len, offset))
183 return static_cast<int>(buffer_len);
184
185 return net::ERR_CACHE_WRITE_FAILURE;
186 }
187
188 // This method actually makes sure that the last reference to the file doesn't
189 // go away on the worker pool.
190 void File::OnOperationComplete(FileIOCallback* callback, int result) {
191 callback->OnFileIOComplete(result);
307 } 192 }
308 193
309 } // namespace disk_cache 194 } // namespace disk_cache
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698