Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1646)

Unified Diff: mojo/data_pipe_utils/data_pipe_file_utils.cc

Issue 1841863002: Update monet. (Closed) Base URL: https://github.com/domokit/monet.git@master
Patch Set: Created 4 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « mojo/data_pipe_utils/data_pipe_drainer.cc ('k') | mojo/data_pipe_utils/data_pipe_utils.h » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: mojo/data_pipe_utils/data_pipe_file_utils.cc
diff --git a/mojo/data_pipe_utils/data_pipe_file_utils.cc b/mojo/data_pipe_utils/data_pipe_file_utils.cc
new file mode 100644
index 0000000000000000000000000000000000000000..91f88c9e441d00eacbbb8c805c8f5c78f428309f
--- /dev/null
+++ b/mojo/data_pipe_utils/data_pipe_file_utils.cc
@@ -0,0 +1,364 @@
+// Copyright 2014 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "mojo/data_pipe_utils/data_pipe_utils.h"
+
+#include <stdio.h>
+
+#include <limits>
+
+#include "base/files/file.h"
+#include "base/files/file_path.h"
+#include "base/files/file_util.h"
+#include "base/files/scoped_file.h"
+#include "base/location.h"
+#include "base/trace_event/trace_event.h"
+#include "mojo/data_pipe_utils/data_pipe_utils_internal.h"
+#include "mojo/public/cpp/environment/async_waiter.h"
+
+namespace mojo {
+namespace common {
+namespace {
+
+class CopyToFileHandler {
+ public:
+ CopyToFileHandler(ScopedDataPipeConsumerHandle source,
+ const base::FilePath& destination,
+ base::TaskRunner* task_runner,
+ const base::Callback<void(bool)>& callback);
+
+ private:
+ ~CopyToFileHandler();
+
+ void SendCallback(bool value);
+ void OpenFile();
+ void OnHandleReady(MojoResult result);
+ void WriteToFile();
+
+ ScopedDataPipeConsumerHandle source_;
+ const base::FilePath destination_;
+ base::TaskRunner* file_task_runner_;
+ base::Callback<void(bool)> callback_;
+ base::File file_;
+ scoped_ptr<AsyncWaiter> waiter_;
+ const void* buffer_;
+ uint32_t buffer_size_;
+ scoped_refptr<base::SingleThreadTaskRunner> main_runner_;
+
+ DISALLOW_COPY_AND_ASSIGN(CopyToFileHandler);
+};
+
+CopyToFileHandler::CopyToFileHandler(ScopedDataPipeConsumerHandle source,
+ const base::FilePath& destination,
+ base::TaskRunner* task_runner,
+ const base::Callback<void(bool)>& callback)
+ : source_(source.Pass()),
+ destination_(destination),
+ file_task_runner_(task_runner),
+ callback_(callback),
+ buffer_(nullptr),
+ buffer_size_(0u),
+ main_runner_(base::MessageLoop::current()->task_runner()) {
+ TRACE_EVENT_ASYNC_BEGIN1("data_pipe_utils", "CopyToFile", this, "destination",
+ destination.MaybeAsASCII());
+ file_task_runner_->PostTask(
+ FROM_HERE,
+ base::Bind(&CopyToFileHandler::OpenFile, base::Unretained(this)));
+}
+
+CopyToFileHandler::~CopyToFileHandler() {
+ TRACE_EVENT_ASYNC_END0("data_pipe_utils", "CopyToFile", this);
+}
+
+void CopyToFileHandler::SendCallback(bool value) {
+ DCHECK(main_runner_->RunsTasksOnCurrentThread());
+ if (file_.IsValid()) {
+ // Need to close the file before calling the callback.
+ file_task_runner_->PostTaskAndReply(
+ FROM_HERE, base::Bind(&base::File::Close, base::Unretained(&file_)),
+ base::Bind(&CopyToFileHandler::SendCallback, base::Unretained(this),
+ value));
+ return;
+ }
+ base::Callback<void(bool)> callback = callback_;
+ delete this;
+ callback.Run(value);
+}
+
+void CopyToFileHandler::OpenFile() {
+ DCHECK(file_task_runner_->RunsTasksOnCurrentThread());
+ file_.Initialize(destination_,
+ base::File::FLAG_CREATE_ALWAYS | base::File::FLAG_WRITE);
+ if (!file_.IsValid()) {
+ LOG(ERROR) << "Opening file '" << destination_.value()
+ << "' failed in CopyToFileHandler::OpenFile";
+ main_runner_->PostTask(FROM_HERE,
+ base::Bind(&CopyToFileHandler::SendCallback,
+ base::Unretained(this), false));
+ return;
+ }
+ main_runner_->PostTask(FROM_HERE,
+ base::Bind(&CopyToFileHandler::OnHandleReady,
+ base::Unretained(this), MOJO_RESULT_OK));
+}
+
+void CopyToFileHandler::OnHandleReady(MojoResult result) {
+ DCHECK(main_runner_->RunsTasksOnCurrentThread());
+ if (result == MOJO_RESULT_OK) {
+ result = BeginReadDataRaw(source_.get(), &buffer_, &buffer_size_,
+ MOJO_READ_DATA_FLAG_NONE);
+ if (result == MOJO_RESULT_OK) {
+ file_task_runner_->PostTask(
+ FROM_HERE,
+ base::Bind(&CopyToFileHandler::WriteToFile, base::Unretained(this)));
+ return;
+ }
+ }
+ if (result == MOJO_RESULT_FAILED_PRECONDITION) {
+ SendCallback(true);
+ return;
+ }
+ if (result == MOJO_RESULT_SHOULD_WAIT) {
+ waiter_.reset(new AsyncWaiter(
+ source_.get(), MOJO_HANDLE_SIGNAL_READABLE,
+ base::Bind(&CopyToFileHandler::OnHandleReady, base::Unretained(this))));
+ return;
+ }
+ SendCallback(false);
+}
+
+void CopyToFileHandler::WriteToFile() {
+ DCHECK(file_task_runner_->RunsTasksOnCurrentThread());
+ uint32_t num_bytes = buffer_size_;
+ size_t num_bytes_written =
+ file_.WriteAtCurrentPos(static_cast<const char*>(buffer_), num_bytes);
+ MojoResult result = EndReadDataRaw(source_.get(), num_bytes);
+ buffer_ = nullptr;
+ buffer_size_ = 0;
+ if (num_bytes_written != num_bytes) {
+ LOG(ERROR) << "Wrote fewer bytes (" << num_bytes_written
+ << ") than expected (" << num_bytes
+ << "), (pipe closed? out of disk space?)";
+ main_runner_->PostTask(FROM_HERE,
+ base::Bind(&CopyToFileHandler::SendCallback,
+ base::Unretained(this), false));
+ return;
+ }
+ if (result != MOJO_RESULT_OK) {
+ LOG(ERROR) << "EndReadDataRaw error (" << result << ")";
+ main_runner_->PostTask(FROM_HERE,
+ base::Bind(&CopyToFileHandler::SendCallback,
+ base::Unretained(this), false));
+ }
+ main_runner_->PostTask(FROM_HERE,
+ base::Bind(&CopyToFileHandler::OnHandleReady,
+ base::Unretained(this), result));
+}
+
+class CopyFromFileHandler {
+ public:
+ CopyFromFileHandler(const base::FilePath& source,
+ ScopedDataPipeProducerHandle destination,
+ uint32_t skip,
+ base::TaskRunner* task_runner,
+ const base::Callback<void(bool)>& callback);
+
+ private:
+ ~CopyFromFileHandler();
+
+ void SendCallback(bool value);
+ void OpenFile();
+ void OnHandleReady(MojoResult result);
+ void ReadFromFile();
+
+ const base::FilePath source_;
+ ScopedDataPipeProducerHandle destination_;
+ uint32_t skip_;
+ base::TaskRunner* file_task_runner_;
+ base::Callback<void(bool)> callback_;
+ base::File file_;
+ scoped_ptr<AsyncWaiter> waiter_;
+ void* buffer_;
+ uint32_t buffer_size_;
+ scoped_refptr<base::SingleThreadTaskRunner> main_runner_;
+
+ DISALLOW_COPY_AND_ASSIGN(CopyFromFileHandler);
+};
+
+CopyFromFileHandler::CopyFromFileHandler(
+ const base::FilePath& source,
+ ScopedDataPipeProducerHandle destination,
+ uint32_t skip,
+ base::TaskRunner* task_runner,
+ const base::Callback<void(bool)>& callback)
+ : source_(source),
+ destination_(destination.Pass()),
+ skip_(skip),
+ file_task_runner_(task_runner),
+ callback_(callback),
+ buffer_(nullptr),
+ buffer_size_(0u),
+ main_runner_(base::MessageLoop::current()->task_runner()) {
+ TRACE_EVENT_ASYNC_BEGIN1("data_pipe_utils", "CopyFromFile", this, "source",
+ source.MaybeAsASCII());
+ file_task_runner_->PostTask(
+ FROM_HERE,
+ base::Bind(&CopyFromFileHandler::OpenFile, base::Unretained(this)));
+}
+
+CopyFromFileHandler::~CopyFromFileHandler() {
+ TRACE_EVENT_ASYNC_END0("data_pipe_utils", "CopyFromFile", this);
+}
+
+void CopyFromFileHandler::SendCallback(bool value) {
+ DCHECK(main_runner_->RunsTasksOnCurrentThread());
+ if (file_.IsValid()) {
+ // Need to close the file before calling the callback.
+ file_task_runner_->PostTaskAndReply(
+ FROM_HERE, base::Bind(&base::File::Close, base::Unretained(&file_)),
+ base::Bind(&CopyFromFileHandler::SendCallback, base::Unretained(this),
+ value));
+ return;
+ }
+ base::Callback<void(bool)> callback = callback_;
+ delete this;
+ callback.Run(value);
+}
+
+void CopyFromFileHandler::OpenFile() {
+ DCHECK(file_task_runner_->RunsTasksOnCurrentThread());
+ file_.Initialize(source_, base::File::FLAG_OPEN | base::File::FLAG_READ);
+ if (!file_.IsValid()) {
+ LOG(ERROR) << "Opening file '" << source_.value()
+ << "' failed in CopyFromFileHandler::OpenFile";
+ main_runner_->PostTask(FROM_HERE,
+ base::Bind(&CopyFromFileHandler::SendCallback,
+ base::Unretained(this), false));
+ return;
+ }
+ if (file_.Seek(base::File::FROM_BEGIN, skip_) != skip_) {
+ LOG(ERROR) << "Seek of " << skip_ << " failed";
+ main_runner_->PostTask(FROM_HERE,
+ base::Bind(&CopyFromFileHandler::SendCallback,
+ base::Unretained(this), false));
+ return;
+ }
+ main_runner_->PostTask(FROM_HERE,
+ base::Bind(&CopyFromFileHandler::OnHandleReady,
+ base::Unretained(this), MOJO_RESULT_OK));
+}
+
+void CopyFromFileHandler::OnHandleReady(MojoResult result) {
+ DCHECK(main_runner_->RunsTasksOnCurrentThread());
+ if (result == MOJO_RESULT_OK) {
+ result = BeginWriteDataRaw(destination_.get(), &buffer_, &buffer_size_,
+ MOJO_READ_DATA_FLAG_NONE);
+ if (result == MOJO_RESULT_OK) {
+ file_task_runner_->PostTask(FROM_HERE,
+ base::Bind(&CopyFromFileHandler::ReadFromFile,
+ base::Unretained(this)));
+
+ return;
+ }
+ }
+ if (result == MOJO_RESULT_SHOULD_WAIT) {
+ waiter_.reset(
+ new AsyncWaiter(destination_.get(), MOJO_HANDLE_SIGNAL_WRITABLE,
+ base::Bind(&CopyFromFileHandler::OnHandleReady,
+ base::Unretained(this))));
+ return;
+ }
+ SendCallback(false);
+}
+
+void CopyFromFileHandler::ReadFromFile() {
+ DCHECK(file_task_runner_->RunsTasksOnCurrentThread());
+ DCHECK_LT(buffer_size_,
+ static_cast<uint32_t>(std::numeric_limits<int>::max()));
+ int num_bytes = buffer_size_;
+ int num_bytes_read =
+ file_.ReadAtCurrentPos(static_cast<char*>(buffer_), num_bytes);
+ MojoResult result =
+ EndWriteDataRaw(destination_.get(), std::max(0, num_bytes_read));
+ buffer_ = nullptr;
+ buffer_size_ = 0;
+ if (num_bytes_read == -1) {
+ LOG(ERROR) << "Error while reading from file.";
+ main_runner_->PostTask(FROM_HERE,
+ base::Bind(&CopyFromFileHandler::SendCallback,
+ base::Unretained(this), false));
+ return;
+ }
+ if (result != MOJO_RESULT_OK) {
+ LOG(ERROR) << "EndWriteDataRaw error (" << result << ")";
+ main_runner_->PostTask(FROM_HERE,
+ base::Bind(&CopyFromFileHandler::SendCallback,
+ base::Unretained(this), false));
+ return;
+ }
+ if (num_bytes_read != num_bytes) {
+ // Reached EOF. Stop the process.
+ main_runner_->PostTask(FROM_HERE,
+ base::Bind(&CopyFromFileHandler::SendCallback,
+ base::Unretained(this), true));
+ return;
+ }
+ main_runner_->PostTask(FROM_HERE,
+ base::Bind(&CopyFromFileHandler::OnHandleReady,
+ base::Unretained(this), result));
+}
+
+size_t CopyToFileHelper(FILE* fp, const void* buffer, uint32_t num_bytes) {
+ return fwrite(buffer, 1, num_bytes, fp);
+}
+
+} // namespace
+
+base::ScopedFILE BlockingCopyToTempFile(ScopedDataPipeConsumerHandle source) {
+ base::FilePath path;
+ base::ScopedFILE fp(CreateAndOpenTemporaryFile(&path));
+ if (!fp) {
+ LOG(ERROR) << "CreateAndOpenTemporaryFile failed in"
+ << "BlockingCopyToTempFile";
+ return nullptr;
+ }
+ if (unlink(path.value().c_str())) {
+ LOG(ERROR) << "Failed to unlink temporary file";
+ return nullptr;
+ }
+ if (!BlockingCopyHelper(source.Pass(),
+ base::Bind(&CopyToFileHelper, fp.get()))) {
+ LOG(ERROR) << "Could not copy source to temporary file";
+ return nullptr;
+ }
+ return fp;
+}
+
+bool BlockingCopyToFile(ScopedDataPipeConsumerHandle source, FILE* fp) {
+ if (!BlockingCopyHelper(source.Pass(),
+ base::Bind(&CopyToFileHelper, fp))) {
+ LOG(ERROR) << "Could not copy source to file";
+ return false;
+ }
+ return true;
+}
+
+void CopyToFile(ScopedDataPipeConsumerHandle source,
+ const base::FilePath& destination,
+ base::TaskRunner* task_runner,
+ const base::Callback<void(bool)>& callback) {
+ new CopyToFileHandler(source.Pass(), destination, task_runner, callback);
+}
+
+void CopyFromFile(const base::FilePath& source,
+ ScopedDataPipeProducerHandle destination,
+ uint32_t skip,
+ base::TaskRunner* task_runner,
+ const base::Callback<void(bool)>& callback) {
+ new CopyFromFileHandler(source, destination.Pass(), skip, task_runner,
+ callback);
+}
+
+} // namespace common
+} // namespace mojo
« no previous file with comments | « mojo/data_pipe_utils/data_pipe_drainer.cc ('k') | mojo/data_pipe_utils/data_pipe_utils.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698