Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(730)

Side by Side Diff: mojo/data_pipe_utils/data_pipe_file_utils.cc

Issue 1841863002: Update monet. (Closed) Base URL: https://github.com/domokit/monet.git@master
Patch Set: Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « mojo/data_pipe_utils/data_pipe_drainer.cc ('k') | mojo/data_pipe_utils/data_pipe_utils.h » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/data_pipe_utils/data_pipe_utils.h"
6
7 #include <stdio.h>
8
9 #include <limits>
10
11 #include "base/files/file.h"
12 #include "base/files/file_path.h"
13 #include "base/files/file_util.h"
14 #include "base/files/scoped_file.h"
15 #include "base/location.h"
16 #include "base/trace_event/trace_event.h"
17 #include "mojo/data_pipe_utils/data_pipe_utils_internal.h"
18 #include "mojo/public/cpp/environment/async_waiter.h"
19
20 namespace mojo {
21 namespace common {
22 namespace {
23
24 class CopyToFileHandler {
25 public:
26 CopyToFileHandler(ScopedDataPipeConsumerHandle source,
27 const base::FilePath& destination,
28 base::TaskRunner* task_runner,
29 const base::Callback<void(bool)>& callback);
30
31 private:
32 ~CopyToFileHandler();
33
34 void SendCallback(bool value);
35 void OpenFile();
36 void OnHandleReady(MojoResult result);
37 void WriteToFile();
38
39 ScopedDataPipeConsumerHandle source_;
40 const base::FilePath destination_;
41 base::TaskRunner* file_task_runner_;
42 base::Callback<void(bool)> callback_;
43 base::File file_;
44 scoped_ptr<AsyncWaiter> waiter_;
45 const void* buffer_;
46 uint32_t buffer_size_;
47 scoped_refptr<base::SingleThreadTaskRunner> main_runner_;
48
49 DISALLOW_COPY_AND_ASSIGN(CopyToFileHandler);
50 };
51
52 CopyToFileHandler::CopyToFileHandler(ScopedDataPipeConsumerHandle source,
53 const base::FilePath& destination,
54 base::TaskRunner* task_runner,
55 const base::Callback<void(bool)>& callback)
56 : source_(source.Pass()),
57 destination_(destination),
58 file_task_runner_(task_runner),
59 callback_(callback),
60 buffer_(nullptr),
61 buffer_size_(0u),
62 main_runner_(base::MessageLoop::current()->task_runner()) {
63 TRACE_EVENT_ASYNC_BEGIN1("data_pipe_utils", "CopyToFile", this, "destination",
64 destination.MaybeAsASCII());
65 file_task_runner_->PostTask(
66 FROM_HERE,
67 base::Bind(&CopyToFileHandler::OpenFile, base::Unretained(this)));
68 }
69
70 CopyToFileHandler::~CopyToFileHandler() {
71 TRACE_EVENT_ASYNC_END0("data_pipe_utils", "CopyToFile", this);
72 }
73
74 void CopyToFileHandler::SendCallback(bool value) {
75 DCHECK(main_runner_->RunsTasksOnCurrentThread());
76 if (file_.IsValid()) {
77 // Need to close the file before calling the callback.
78 file_task_runner_->PostTaskAndReply(
79 FROM_HERE, base::Bind(&base::File::Close, base::Unretained(&file_)),
80 base::Bind(&CopyToFileHandler::SendCallback, base::Unretained(this),
81 value));
82 return;
83 }
84 base::Callback<void(bool)> callback = callback_;
85 delete this;
86 callback.Run(value);
87 }
88
89 void CopyToFileHandler::OpenFile() {
90 DCHECK(file_task_runner_->RunsTasksOnCurrentThread());
91 file_.Initialize(destination_,
92 base::File::FLAG_CREATE_ALWAYS | base::File::FLAG_WRITE);
93 if (!file_.IsValid()) {
94 LOG(ERROR) << "Opening file '" << destination_.value()
95 << "' failed in CopyToFileHandler::OpenFile";
96 main_runner_->PostTask(FROM_HERE,
97 base::Bind(&CopyToFileHandler::SendCallback,
98 base::Unretained(this), false));
99 return;
100 }
101 main_runner_->PostTask(FROM_HERE,
102 base::Bind(&CopyToFileHandler::OnHandleReady,
103 base::Unretained(this), MOJO_RESULT_OK));
104 }
105
106 void CopyToFileHandler::OnHandleReady(MojoResult result) {
107 DCHECK(main_runner_->RunsTasksOnCurrentThread());
108 if (result == MOJO_RESULT_OK) {
109 result = BeginReadDataRaw(source_.get(), &buffer_, &buffer_size_,
110 MOJO_READ_DATA_FLAG_NONE);
111 if (result == MOJO_RESULT_OK) {
112 file_task_runner_->PostTask(
113 FROM_HERE,
114 base::Bind(&CopyToFileHandler::WriteToFile, base::Unretained(this)));
115 return;
116 }
117 }
118 if (result == MOJO_RESULT_FAILED_PRECONDITION) {
119 SendCallback(true);
120 return;
121 }
122 if (result == MOJO_RESULT_SHOULD_WAIT) {
123 waiter_.reset(new AsyncWaiter(
124 source_.get(), MOJO_HANDLE_SIGNAL_READABLE,
125 base::Bind(&CopyToFileHandler::OnHandleReady, base::Unretained(this))));
126 return;
127 }
128 SendCallback(false);
129 }
130
131 void CopyToFileHandler::WriteToFile() {
132 DCHECK(file_task_runner_->RunsTasksOnCurrentThread());
133 uint32_t num_bytes = buffer_size_;
134 size_t num_bytes_written =
135 file_.WriteAtCurrentPos(static_cast<const char*>(buffer_), num_bytes);
136 MojoResult result = EndReadDataRaw(source_.get(), num_bytes);
137 buffer_ = nullptr;
138 buffer_size_ = 0;
139 if (num_bytes_written != num_bytes) {
140 LOG(ERROR) << "Wrote fewer bytes (" << num_bytes_written
141 << ") than expected (" << num_bytes
142 << "), (pipe closed? out of disk space?)";
143 main_runner_->PostTask(FROM_HERE,
144 base::Bind(&CopyToFileHandler::SendCallback,
145 base::Unretained(this), false));
146 return;
147 }
148 if (result != MOJO_RESULT_OK) {
149 LOG(ERROR) << "EndReadDataRaw error (" << result << ")";
150 main_runner_->PostTask(FROM_HERE,
151 base::Bind(&CopyToFileHandler::SendCallback,
152 base::Unretained(this), false));
153 }
154 main_runner_->PostTask(FROM_HERE,
155 base::Bind(&CopyToFileHandler::OnHandleReady,
156 base::Unretained(this), result));
157 }
158
159 class CopyFromFileHandler {
160 public:
161 CopyFromFileHandler(const base::FilePath& source,
162 ScopedDataPipeProducerHandle destination,
163 uint32_t skip,
164 base::TaskRunner* task_runner,
165 const base::Callback<void(bool)>& callback);
166
167 private:
168 ~CopyFromFileHandler();
169
170 void SendCallback(bool value);
171 void OpenFile();
172 void OnHandleReady(MojoResult result);
173 void ReadFromFile();
174
175 const base::FilePath source_;
176 ScopedDataPipeProducerHandle destination_;
177 uint32_t skip_;
178 base::TaskRunner* file_task_runner_;
179 base::Callback<void(bool)> callback_;
180 base::File file_;
181 scoped_ptr<AsyncWaiter> waiter_;
182 void* buffer_;
183 uint32_t buffer_size_;
184 scoped_refptr<base::SingleThreadTaskRunner> main_runner_;
185
186 DISALLOW_COPY_AND_ASSIGN(CopyFromFileHandler);
187 };
188
189 CopyFromFileHandler::CopyFromFileHandler(
190 const base::FilePath& source,
191 ScopedDataPipeProducerHandle destination,
192 uint32_t skip,
193 base::TaskRunner* task_runner,
194 const base::Callback<void(bool)>& callback)
195 : source_(source),
196 destination_(destination.Pass()),
197 skip_(skip),
198 file_task_runner_(task_runner),
199 callback_(callback),
200 buffer_(nullptr),
201 buffer_size_(0u),
202 main_runner_(base::MessageLoop::current()->task_runner()) {
203 TRACE_EVENT_ASYNC_BEGIN1("data_pipe_utils", "CopyFromFile", this, "source",
204 source.MaybeAsASCII());
205 file_task_runner_->PostTask(
206 FROM_HERE,
207 base::Bind(&CopyFromFileHandler::OpenFile, base::Unretained(this)));
208 }
209
210 CopyFromFileHandler::~CopyFromFileHandler() {
211 TRACE_EVENT_ASYNC_END0("data_pipe_utils", "CopyFromFile", this);
212 }
213
214 void CopyFromFileHandler::SendCallback(bool value) {
215 DCHECK(main_runner_->RunsTasksOnCurrentThread());
216 if (file_.IsValid()) {
217 // Need to close the file before calling the callback.
218 file_task_runner_->PostTaskAndReply(
219 FROM_HERE, base::Bind(&base::File::Close, base::Unretained(&file_)),
220 base::Bind(&CopyFromFileHandler::SendCallback, base::Unretained(this),
221 value));
222 return;
223 }
224 base::Callback<void(bool)> callback = callback_;
225 delete this;
226 callback.Run(value);
227 }
228
229 void CopyFromFileHandler::OpenFile() {
230 DCHECK(file_task_runner_->RunsTasksOnCurrentThread());
231 file_.Initialize(source_, base::File::FLAG_OPEN | base::File::FLAG_READ);
232 if (!file_.IsValid()) {
233 LOG(ERROR) << "Opening file '" << source_.value()
234 << "' failed in CopyFromFileHandler::OpenFile";
235 main_runner_->PostTask(FROM_HERE,
236 base::Bind(&CopyFromFileHandler::SendCallback,
237 base::Unretained(this), false));
238 return;
239 }
240 if (file_.Seek(base::File::FROM_BEGIN, skip_) != skip_) {
241 LOG(ERROR) << "Seek of " << skip_ << " failed";
242 main_runner_->PostTask(FROM_HERE,
243 base::Bind(&CopyFromFileHandler::SendCallback,
244 base::Unretained(this), false));
245 return;
246 }
247 main_runner_->PostTask(FROM_HERE,
248 base::Bind(&CopyFromFileHandler::OnHandleReady,
249 base::Unretained(this), MOJO_RESULT_OK));
250 }
251
252 void CopyFromFileHandler::OnHandleReady(MojoResult result) {
253 DCHECK(main_runner_->RunsTasksOnCurrentThread());
254 if (result == MOJO_RESULT_OK) {
255 result = BeginWriteDataRaw(destination_.get(), &buffer_, &buffer_size_,
256 MOJO_READ_DATA_FLAG_NONE);
257 if (result == MOJO_RESULT_OK) {
258 file_task_runner_->PostTask(FROM_HERE,
259 base::Bind(&CopyFromFileHandler::ReadFromFile,
260 base::Unretained(this)));
261
262 return;
263 }
264 }
265 if (result == MOJO_RESULT_SHOULD_WAIT) {
266 waiter_.reset(
267 new AsyncWaiter(destination_.get(), MOJO_HANDLE_SIGNAL_WRITABLE,
268 base::Bind(&CopyFromFileHandler::OnHandleReady,
269 base::Unretained(this))));
270 return;
271 }
272 SendCallback(false);
273 }
274
275 void CopyFromFileHandler::ReadFromFile() {
276 DCHECK(file_task_runner_->RunsTasksOnCurrentThread());
277 DCHECK_LT(buffer_size_,
278 static_cast<uint32_t>(std::numeric_limits<int>::max()));
279 int num_bytes = buffer_size_;
280 int num_bytes_read =
281 file_.ReadAtCurrentPos(static_cast<char*>(buffer_), num_bytes);
282 MojoResult result =
283 EndWriteDataRaw(destination_.get(), std::max(0, num_bytes_read));
284 buffer_ = nullptr;
285 buffer_size_ = 0;
286 if (num_bytes_read == -1) {
287 LOG(ERROR) << "Error while reading from file.";
288 main_runner_->PostTask(FROM_HERE,
289 base::Bind(&CopyFromFileHandler::SendCallback,
290 base::Unretained(this), false));
291 return;
292 }
293 if (result != MOJO_RESULT_OK) {
294 LOG(ERROR) << "EndWriteDataRaw error (" << result << ")";
295 main_runner_->PostTask(FROM_HERE,
296 base::Bind(&CopyFromFileHandler::SendCallback,
297 base::Unretained(this), false));
298 return;
299 }
300 if (num_bytes_read != num_bytes) {
301 // Reached EOF. Stop the process.
302 main_runner_->PostTask(FROM_HERE,
303 base::Bind(&CopyFromFileHandler::SendCallback,
304 base::Unretained(this), true));
305 return;
306 }
307 main_runner_->PostTask(FROM_HERE,
308 base::Bind(&CopyFromFileHandler::OnHandleReady,
309 base::Unretained(this), result));
310 }
311
312 size_t CopyToFileHelper(FILE* fp, const void* buffer, uint32_t num_bytes) {
313 return fwrite(buffer, 1, num_bytes, fp);
314 }
315
316 } // namespace
317
318 base::ScopedFILE BlockingCopyToTempFile(ScopedDataPipeConsumerHandle source) {
319 base::FilePath path;
320 base::ScopedFILE fp(CreateAndOpenTemporaryFile(&path));
321 if (!fp) {
322 LOG(ERROR) << "CreateAndOpenTemporaryFile failed in"
323 << "BlockingCopyToTempFile";
324 return nullptr;
325 }
326 if (unlink(path.value().c_str())) {
327 LOG(ERROR) << "Failed to unlink temporary file";
328 return nullptr;
329 }
330 if (!BlockingCopyHelper(source.Pass(),
331 base::Bind(&CopyToFileHelper, fp.get()))) {
332 LOG(ERROR) << "Could not copy source to temporary file";
333 return nullptr;
334 }
335 return fp;
336 }
337
338 bool BlockingCopyToFile(ScopedDataPipeConsumerHandle source, FILE* fp) {
339 if (!BlockingCopyHelper(source.Pass(),
340 base::Bind(&CopyToFileHelper, fp))) {
341 LOG(ERROR) << "Could not copy source to file";
342 return false;
343 }
344 return true;
345 }
346
347 void CopyToFile(ScopedDataPipeConsumerHandle source,
348 const base::FilePath& destination,
349 base::TaskRunner* task_runner,
350 const base::Callback<void(bool)>& callback) {
351 new CopyToFileHandler(source.Pass(), destination, task_runner, callback);
352 }
353
354 void CopyFromFile(const base::FilePath& source,
355 ScopedDataPipeProducerHandle destination,
356 uint32_t skip,
357 base::TaskRunner* task_runner,
358 const base::Callback<void(bool)>& callback) {
359 new CopyFromFileHandler(source, destination.Pass(), skip, task_runner,
360 callback);
361 }
362
363 } // namespace common
364 } // namespace mojo
OLDNEW
« no previous file with comments | « mojo/data_pipe_utils/data_pipe_drainer.cc ('k') | mojo/data_pipe_utils/data_pipe_utils.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698