OLD | NEW |
1 // Copyright (c) 2006-2008 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2006-2010 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "net/disk_cache/file.h" | 5 #include "net/disk_cache/in_flight_io.h" |
6 | |
7 #include <fcntl.h> | |
8 | |
9 #include <set> | |
10 | 6 |
11 #include "base/logging.h" | 7 #include "base/logging.h" |
12 #include "base/message_loop.h" | |
13 #include "base/singleton.h" | |
14 #include "base/waitable_event.h" | |
15 #include "base/worker_pool.h" | |
16 #include "net/disk_cache/disk_cache.h" | |
17 | 8 |
18 namespace { | 9 namespace disk_cache { |
19 | 10 |
20 class InFlightIO; | 11 // Runs on the IO thread. |
21 | 12 void BackgroundIO::OnIOSignalled() { |
22 // This class represents a single asynchronous IO operation while it is being | 13 if (controller_) |
23 // bounced between threads. | 14 controller_->InvokeCallback(this, false); |
24 class BackgroundIO : public base::RefCountedThreadSafe<BackgroundIO> { | |
25 public: | |
26 // Other than the actual parameters for the IO operation (including the | |
27 // |callback| that must be notified at the end), we need the controller that | |
28 // is keeping track of all operations. When done, we notify the controller | |
29 // (we do NOT invoke the callback), in the worker thead that completed the | |
30 // operation. | |
31 BackgroundIO(disk_cache::File* file, const void* buf, size_t buf_len, | |
32 size_t offset, disk_cache::FileIOCallback* callback, | |
33 InFlightIO* controller) | |
34 : io_completed_(true, false), callback_(callback), file_(file), buf_(buf), | |
35 buf_len_(buf_len), offset_(offset), controller_(controller), | |
36 bytes_(0) {} | |
37 | |
38 // Read and Write are the operations that can be performed asynchronously. | |
39 // The actual parameters for the operation are setup in the constructor of | |
40 // the object, with the exception of |delete_buffer|, that allows a write | |
41 // without a callback. Both methods should be called from a worker thread, by | |
42 // posting a task to the WorkerPool (they are RunnableMethods). When finished, | |
43 // controller->OnIOComplete() is called. | |
44 void Read(); | |
45 void Write(bool delete_buffer); | |
46 | |
47 // This method signals the controller that this operation is finished, in the | |
48 // original thread (presumably the IO-Thread). In practice, this is a | |
49 // RunableMethod that allows cancellation. | |
50 void OnIOSignalled(); | |
51 | |
52 // Allows the cancellation of the task to notify the controller (step number 7 | |
53 // in the diagram below). In practice, if the controller waits for the | |
54 // operation to finish it doesn't have to wait for the final task to be | |
55 // processed by the message loop so calling this method prevents its delivery. | |
56 // Note that this method is not intended to cancel the actual IO operation or | |
57 // to prevent the first notification to take place (OnIOComplete). | |
58 void Cancel(); | |
59 | |
60 // Retrieves the number of bytes transfered. | |
61 int Result(); | |
62 | |
63 base::WaitableEvent* io_completed() { | |
64 return &io_completed_; | |
65 } | |
66 | |
67 disk_cache::FileIOCallback* callback() { | |
68 return callback_; | |
69 } | |
70 | |
71 disk_cache::File* file() { | |
72 return file_; | |
73 } | |
74 | |
75 private: | |
76 friend class base::RefCountedThreadSafe<BackgroundIO>; | |
77 ~BackgroundIO() {} | |
78 | |
79 // An event to signal when the operation completes, and the user callback that | |
80 // has to be invoked. These members are accessed directly by the controller. | |
81 base::WaitableEvent io_completed_; | |
82 disk_cache::FileIOCallback* callback_; | |
83 | |
84 disk_cache::File* file_; | |
85 const void* buf_; | |
86 size_t buf_len_; | |
87 size_t offset_; | |
88 InFlightIO* controller_; // The controller that tracks all operations. | |
89 int bytes_; // Final operation result. | |
90 | |
91 DISALLOW_COPY_AND_ASSIGN(BackgroundIO); | |
92 }; | |
93 | |
94 // This class keeps track of every asynchronous IO operation. A single instance | |
95 // of this class is meant to be used to start an asynchronous operation (using | |
96 // PostRead/PostWrite). This class will post the operation to a worker thread, | |
97 // hanlde the notification when the operation finishes and perform the callback | |
98 // on the same thread that was used to start the operation. | |
99 // | |
100 // The regular sequence of calls is: | |
101 // Thread_1 Worker_thread | |
102 // 1. InFlightIO::PostRead() | |
103 // 2. -> PostTask -> | |
104 // 3. BackgroundIO::Read() | |
105 // 4. IO operation completes | |
106 // 5. InFlightIO::OnIOComplete() | |
107 // 6. <- PostTask <- | |
108 // 7. BackgroundIO::OnIOSignalled() | |
109 // 8. InFlightIO::InvokeCallback() | |
110 // 9. invoke callback | |
111 // | |
112 // Shutdown is a special case that is handled though WaitForPendingIO() instead | |
113 // of just waiting for step 7. | |
114 class InFlightIO { | |
115 public: | |
116 InFlightIO() : callback_thread_(MessageLoop::current()) {} | |
117 ~InFlightIO() {} | |
118 | |
119 // These methods start an asynchronous operation. The arguments have the same | |
120 // semantics of the File asynchronous operations, with the exception that the | |
121 // operation never finishes synchronously. | |
122 void PostRead(disk_cache::File* file, void* buf, size_t buf_len, | |
123 size_t offset, disk_cache::FileIOCallback* callback); | |
124 void PostWrite(disk_cache::File* file, const void* buf, size_t buf_len, | |
125 size_t offset, disk_cache::FileIOCallback* callback, | |
126 bool delete_buffer); | |
127 | |
128 // Blocks the current thread until all IO operations tracked by this object | |
129 // complete. | |
130 void WaitForPendingIO(); | |
131 | |
132 // Called on a worker thread when |operation| completes. | |
133 void OnIOComplete(BackgroundIO* operation); | |
134 | |
135 // Invokes the users' completion callback at the end of the IO operation. | |
136 // |cancel_task| is true if the actual task posted to the thread is still | |
137 // queued (because we are inside WaitForPendingIO), and false if said task is | |
138 // the one performing the call. | |
139 void InvokeCallback(BackgroundIO* operation, bool cancel_task); | |
140 | |
141 private: | |
142 typedef std::set<scoped_refptr<BackgroundIO> > IOList; | |
143 | |
144 IOList io_list_; // List of pending io operations. | |
145 MessageLoop* callback_thread_; | |
146 }; | |
147 | |
148 // --------------------------------------------------------------------------- | |
149 | |
150 // Runs on a worker thread. | |
151 void BackgroundIO::Read() { | |
152 if (file_->Read(const_cast<void*>(buf_), buf_len_, offset_)) { | |
153 bytes_ = static_cast<int>(buf_len_); | |
154 } else { | |
155 bytes_ = -1; | |
156 } | |
157 controller_->OnIOComplete(this); | |
158 } | |
159 | |
160 int BackgroundIO::Result() { | |
161 return bytes_; | |
162 } | 15 } |
163 | 16 |
164 void BackgroundIO::Cancel() { | 17 void BackgroundIO::Cancel() { |
165 DCHECK(controller_); | 18 DCHECK(controller_); |
166 controller_ = NULL; | 19 controller_ = NULL; |
167 } | 20 } |
168 | 21 |
169 // Runs on a worker thread. | 22 // Runs on the background thread. |
170 void BackgroundIO::Write(bool delete_buffer) { | 23 void BackgroundIO::NotifyController() { |
171 bool rv = file_->Write(buf_, buf_len_, offset_); | |
172 if (delete_buffer) { | |
173 // TODO(rvargas): remove or update this code. | |
174 delete[] reinterpret_cast<const char*>(buf_); | |
175 } | |
176 | |
177 bytes_ = rv ? static_cast<int>(buf_len_) : -1; | |
178 controller_->OnIOComplete(this); | 24 controller_->OnIOComplete(this); |
179 } | 25 } |
180 | 26 |
181 // Runs on the IO thread. | |
182 void BackgroundIO::OnIOSignalled() { | |
183 if (controller_) | |
184 controller_->InvokeCallback(this, false); | |
185 } | |
186 | |
187 // --------------------------------------------------------------------------- | 27 // --------------------------------------------------------------------------- |
188 | 28 |
189 void InFlightIO::PostRead(disk_cache::File *file, void* buf, size_t buf_len, | |
190 size_t offset, disk_cache::FileIOCallback *callback) { | |
191 scoped_refptr<BackgroundIO> operation = | |
192 new BackgroundIO(file, buf, buf_len, offset, callback, this); | |
193 io_list_.insert(operation.get()); | |
194 file->AddRef(); // Balanced on InvokeCallback() | |
195 | |
196 WorkerPool::PostTask(FROM_HERE, | |
197 NewRunnableMethod(operation.get(), &BackgroundIO::Read), | |
198 true); | |
199 } | |
200 | |
201 void InFlightIO::PostWrite(disk_cache::File* file, const void* buf, | |
202 size_t buf_len, size_t offset, | |
203 disk_cache::FileIOCallback* callback, | |
204 bool delete_buffer) { | |
205 scoped_refptr<BackgroundIO> operation = | |
206 new BackgroundIO(file, buf, buf_len, offset, callback, this); | |
207 io_list_.insert(operation.get()); | |
208 file->AddRef(); // Balanced on InvokeCallback() | |
209 | |
210 WorkerPool::PostTask(FROM_HERE, | |
211 NewRunnableMethod(operation.get(), &BackgroundIO::Write, | |
212 delete_buffer), | |
213 true); | |
214 } | |
215 | |
216 void InFlightIO::WaitForPendingIO() { | 29 void InFlightIO::WaitForPendingIO() { |
217 while (!io_list_.empty()) { | 30 while (!io_list_.empty()) { |
218 // Block the current thread until all pending IO completes. | 31 // Block the current thread until all pending IO completes. |
219 IOList::iterator it = io_list_.begin(); | 32 IOList::iterator it = io_list_.begin(); |
220 InvokeCallback(*it, true); | 33 InvokeCallback(*it, true); |
221 } | 34 } |
222 } | 35 } |
223 | 36 |
224 // Runs on a worker thread. | 37 // Runs on a background thread. |
225 void InFlightIO::OnIOComplete(BackgroundIO* operation) { | 38 void InFlightIO::OnIOComplete(BackgroundIO* operation) { |
226 callback_thread_->PostTask(FROM_HERE, | 39 callback_thread_->PostTask(FROM_HERE, |
227 NewRunnableMethod(operation, | 40 NewRunnableMethod(operation, |
228 &BackgroundIO::OnIOSignalled)); | 41 &BackgroundIO::OnIOSignalled)); |
229 operation->io_completed()->Signal(); | 42 operation->io_completed()->Signal(); |
230 } | 43 } |
231 | 44 |
232 // Runs on the IO thread. | 45 // Runs on the IO thread. |
233 void InFlightIO::InvokeCallback(BackgroundIO* operation, bool cancel_task) { | 46 void InFlightIO::InvokeCallback(BackgroundIO* operation, bool cancel_task) { |
234 operation->io_completed()->Wait(); | 47 operation->io_completed()->Wait(); |
235 | 48 |
236 if (cancel_task) | 49 if (cancel_task) |
237 operation->Cancel(); | 50 operation->Cancel(); |
238 | 51 |
239 disk_cache::FileIOCallback* callback = operation->callback(); | 52 // Make sure that we remove the operation from the list before invoking the |
240 int bytes = operation->Result(); | 53 // callback (so that a subsequent cancel does not invoke the callback again). |
241 | 54 DCHECK(io_list_.find(operation) != io_list_.end()); |
242 // Release the references acquired in PostRead / PostWrite. | |
243 operation->file()->Release(); | |
244 io_list_.erase(operation); | 55 io_list_.erase(operation); |
245 callback->OnFileIOComplete(bytes); | 56 OnOperationComplete(operation, cancel_task); |
246 } | 57 } |
247 | 58 |
248 } // namespace | 59 // Runs on the IO thread. |
249 | 60 void InFlightIO::OnOperationPosted(BackgroundIO* operation) { |
250 namespace disk_cache { | 61 io_list_.insert(operation); |
251 | |
252 File::File(base::PlatformFile file) | |
253 : init_(true), mixed_(true), platform_file_(file) { | |
254 } | |
255 | |
256 bool File::Init(const FilePath& name) { | |
257 if (init_) | |
258 return false; | |
259 | |
260 int flags = base::PLATFORM_FILE_OPEN | | |
261 base::PLATFORM_FILE_READ | | |
262 base::PLATFORM_FILE_WRITE; | |
263 platform_file_ = base::CreatePlatformFile(name, flags, NULL); | |
264 if (platform_file_ < 0) { | |
265 platform_file_ = 0; | |
266 return false; | |
267 } | |
268 | |
269 init_ = true; | |
270 return true; | |
271 } | |
272 | |
273 File::~File() { | |
274 if (platform_file_) | |
275 close(platform_file_); | |
276 } | |
277 | |
278 base::PlatformFile File::platform_file() const { | |
279 return platform_file_; | |
280 } | |
281 | |
282 bool File::IsValid() const { | |
283 if (!init_) | |
284 return false; | |
285 return (base::kInvalidPlatformFileValue != platform_file_); | |
286 } | |
287 | |
288 bool File::Read(void* buffer, size_t buffer_len, size_t offset) { | |
289 DCHECK(init_); | |
290 if (buffer_len > ULONG_MAX || offset > LONG_MAX) | |
291 return false; | |
292 | |
293 int ret = pread(platform_file_, buffer, buffer_len, offset); | |
294 return (static_cast<size_t>(ret) == buffer_len); | |
295 } | |
296 | |
297 bool File::Write(const void* buffer, size_t buffer_len, size_t offset) { | |
298 DCHECK(init_); | |
299 if (buffer_len > ULONG_MAX || offset > ULONG_MAX) | |
300 return false; | |
301 | |
302 int ret = pwrite(platform_file_, buffer, buffer_len, offset); | |
303 return (static_cast<size_t>(ret) == buffer_len); | |
304 } | |
305 | |
306 // We have to increase the ref counter of the file before performing the IO to | |
307 // prevent the completion to happen with an invalid handle (if the file is | |
308 // closed while the IO is in flight). | |
309 bool File::Read(void* buffer, size_t buffer_len, size_t offset, | |
310 FileIOCallback* callback, bool* completed) { | |
311 DCHECK(init_); | |
312 if (!callback) { | |
313 if (completed) | |
314 *completed = true; | |
315 return Read(buffer, buffer_len, offset); | |
316 } | |
317 | |
318 if (buffer_len > ULONG_MAX || offset > ULONG_MAX) | |
319 return false; | |
320 | |
321 InFlightIO* io_operations = Singleton<InFlightIO>::get(); | |
322 io_operations->PostRead(this, buffer, buffer_len, offset, callback); | |
323 | |
324 *completed = false; | |
325 return true; | |
326 } | |
327 | |
328 bool File::Write(const void* buffer, size_t buffer_len, size_t offset, | |
329 FileIOCallback* callback, bool* completed) { | |
330 DCHECK(init_); | |
331 if (!callback) { | |
332 if (completed) | |
333 *completed = true; | |
334 return Write(buffer, buffer_len, offset); | |
335 } | |
336 | |
337 return AsyncWrite(buffer, buffer_len, offset, true, callback, completed); | |
338 } | |
339 | |
340 bool File::PostWrite(const void* buffer, size_t buffer_len, size_t offset) { | |
341 DCHECK(init_); | |
342 return AsyncWrite(buffer, buffer_len, offset, false, NULL, NULL); | |
343 } | |
344 | |
345 bool File::AsyncWrite(const void* buffer, size_t buffer_len, size_t offset, | |
346 bool notify, FileIOCallback* callback, bool* completed) { | |
347 DCHECK(init_); | |
348 if (buffer_len > ULONG_MAX || offset > ULONG_MAX) | |
349 return false; | |
350 | |
351 InFlightIO* io_operations = Singleton<InFlightIO>::get(); | |
352 io_operations->PostWrite(this, buffer, buffer_len, offset, callback, !notify); | |
353 | |
354 if (completed) | |
355 *completed = false; | |
356 return true; | |
357 } | |
358 | |
359 bool File::SetLength(size_t length) { | |
360 DCHECK(init_); | |
361 if (length > ULONG_MAX) | |
362 return false; | |
363 | |
364 return 0 == ftruncate(platform_file_, length); | |
365 } | |
366 | |
367 size_t File::GetLength() { | |
368 DCHECK(init_); | |
369 size_t ret = lseek(platform_file_, 0, SEEK_END); | |
370 return ret; | |
371 } | |
372 | |
373 // Static. | |
374 void File::WaitForPendingIO(int* num_pending_io) { | |
375 if (*num_pending_io) | |
376 Singleton<InFlightIO>::get()->WaitForPendingIO(); | |
377 } | 62 } |
378 | 63 |
379 } // namespace disk_cache | 64 } // namespace disk_cache |
OLD | NEW |