Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(239)

Unified Diff: filesystem_copier_action.cc

Issue 5603001: Restructure FilesystemCopierAction to use ping-pong buffers. (Closed) Base URL: ssh://git@gitrw.chromium.org:9222/update_engine.git@master
Patch Set: review comments Created 10 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « filesystem_copier_action.h ('k') | no next file » | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: filesystem_copier_action.cc
diff --git a/filesystem_copier_action.cc b/filesystem_copier_action.cc
index 1452b4dbffed38399e04f39d462a6b5313d139b0..f3f71bdbee4b107ddfc7ccba92e1e68cb168da91 100755
--- a/filesystem_copier_action.cc
+++ b/filesystem_copier_action.cc
@@ -32,9 +32,32 @@ using std::vector;
namespace chromeos_update_engine {
namespace {
-const off_t kCopyFileBufferSize = 2 * 1024 * 1024;
+const off_t kCopyFileBufferSize = 512 * 1024;
} // namespace {}
+FilesystemCopierAction::FilesystemCopierAction(
+ bool copying_kernel_install_path)
+ : copying_kernel_install_path_(copying_kernel_install_path),
+ src_stream_(NULL),
+ dst_stream_(NULL),
+ read_done_(false),
+ failed_(false),
+ cancelled_(false),
+ filesystem_size_(kint64max) {
+ // A lot of code works on the implicit assumption that processing is done on
+ // exactly 2 ping-pong buffers.
+ COMPILE_ASSERT(arraysize(buffer_) == 2 &&
+ arraysize(buffer_state_) == 2 &&
+ arraysize(buffer_valid_size_) == 2 &&
+ arraysize(canceller_) == 2,
+ ping_pong_buffers_not_two);
+ for (int i = 0; i < 2; ++i) {
+ buffer_state_[i] = kBufferStateEmpty;
+ buffer_valid_size_[i] = 0;
+ canceller_[i] = NULL;
+ }
+}
+
void FilesystemCopierAction::PerformAction() {
// Will tell the ActionProcessor we've failed if we return.
ScopedActionCompleter abort_action_completer(processor_, this);
@@ -84,35 +107,35 @@ void FilesystemCopierAction::PerformAction() {
src_stream_ = g_unix_input_stream_new(src_fd, TRUE);
dst_stream_ = g_unix_output_stream_new(dst_fd, TRUE);
- buffer_.resize(kCopyFileBufferSize);
-
- // Set up the first read
- canceller_ = g_cancellable_new();
+ for (int i = 0; i < 2; i++) {
+ buffer_[i].resize(kCopyFileBufferSize);
+ canceller_[i] = g_cancellable_new();
+ }
- g_input_stream_read_async(src_stream_,
- &buffer_[0],
- GetBytesToRead(),
- G_PRIORITY_DEFAULT,
- canceller_,
- &FilesystemCopierAction::StaticAsyncReadyCallback,
- this);
- read_in_flight_ = true;
+ // Start the first read.
+ SpawnAsyncActions();
abort_action_completer.set_should_complete(false);
}
void FilesystemCopierAction::TerminateProcessing() {
- if (canceller_) {
- g_cancellable_cancel(canceller_);
+ for (int i = 0; i < 2; i++) {
+ if (canceller_[i]) {
+ g_cancellable_cancel(canceller_[i]);
+ }
}
}
-void FilesystemCopierAction::Cleanup(bool success, bool was_cancelled) {
+void FilesystemCopierAction::Cleanup(bool success) {
+ for (int i = 0; i < 2; i++) {
+ g_object_unref(canceller_[i]);
+ canceller_[i] = NULL;
+ }
g_object_unref(src_stream_);
src_stream_ = NULL;
g_object_unref(dst_stream_);
dst_stream_ = NULL;
- if (was_cancelled)
+ if (cancelled_)
return;
if (success && HasOutputPipe())
SetOutputObject(install_plan_);
@@ -121,85 +144,141 @@ void FilesystemCopierAction::Cleanup(bool success, bool was_cancelled) {
success ? kActionCodeSuccess : kActionCodeError);
}
-void FilesystemCopierAction::AsyncReadyCallback(GObject *source_object,
- GAsyncResult *res) {
+void FilesystemCopierAction::AsyncReadReadyCallback(GObject *source_object,
+ GAsyncResult *res) {
+ int index = buffer_state_[0] == kBufferStateReading ? 0 : 1;
+ CHECK(buffer_state_[index] == kBufferStateReading);
+
GError* error = NULL;
- CHECK(canceller_);
- bool was_cancelled = g_cancellable_is_cancelled(canceller_) == TRUE;
- g_object_unref(canceller_);
- canceller_ = NULL;
-
- if (read_in_flight_) {
- ssize_t bytes_read = g_input_stream_read_finish(src_stream_, res, &error);
- if (bytes_read < 0) {
- LOG(ERROR) << "Read failed:" << utils::GetGErrorMessage(error);
- Cleanup(false, was_cancelled);
- return;
- }
+ CHECK(canceller_[index]);
+ cancelled_ = g_cancellable_is_cancelled(canceller_[index]) == TRUE;
- if (bytes_read == 0) {
- // We're done!
- if (!hasher_.Finalize()) {
- LOG(ERROR) << "Unable to finalize the hash.";
- Cleanup(false, was_cancelled);
- return;
- }
- LOG(INFO) << "hash: " << hasher_.hash();
- if (copying_kernel_install_path_) {
- install_plan_.current_kernel_hash = hasher_.raw_hash();
- } else {
- install_plan_.current_rootfs_hash = hasher_.raw_hash();
- }
- Cleanup(true, was_cancelled);
- return;
- }
- if (!hasher_.Update(buffer_.data(), bytes_read)) {
- LOG(ERROR) << "Unable to update the hash.";
- Cleanup(false, was_cancelled);
- return;
- }
+ ssize_t bytes_read = g_input_stream_read_finish(src_stream_, res, &error);
+ if (bytes_read < 0) {
+ LOG(ERROR) << "Read failed: " << utils::GetGErrorMessage(error);
+ failed_ = true;
+ buffer_state_[index] = kBufferStateEmpty;
+ } else if (bytes_read == 0) {
+ read_done_ = true;
+ buffer_state_[index] = kBufferStateEmpty;
+ } else {
+ buffer_valid_size_[index] = bytes_read;
+ buffer_state_[index] = kBufferStateFull;
+ }
+
+ if (bytes_read > 0) {
filesystem_size_ -= bytes_read;
+ }
- // Kick off a write
- read_in_flight_ = false;
- buffer_valid_size_ = bytes_read;
- canceller_ = g_cancellable_new();
- g_output_stream_write_async(
- dst_stream_,
- &buffer_[0],
- bytes_read,
- G_PRIORITY_DEFAULT,
- canceller_,
- &FilesystemCopierAction::StaticAsyncReadyCallback,
- this);
- return;
+ SpawnAsyncActions();
+
+ if (bytes_read > 0) {
+ if (!hasher_.Update(buffer_[index].data(), bytes_read)) {
+ LOG(ERROR) << "Unable to update the hash.";
+ failed_ = true;
+ }
}
+}
+
+void FilesystemCopierAction::StaticAsyncReadReadyCallback(
+ GObject *source_object,
+ GAsyncResult *res,
+ gpointer user_data) {
+ reinterpret_cast<FilesystemCopierAction*>(user_data)->
+ AsyncReadReadyCallback(source_object, res);
+}
+
+void FilesystemCopierAction::AsyncWriteReadyCallback(GObject *source_object,
+ GAsyncResult *res) {
+ int index = buffer_state_[0] == kBufferStateWriting ? 0 : 1;
+ CHECK(buffer_state_[index] == kBufferStateWriting);
+ buffer_state_[index] = kBufferStateEmpty;
+
+ GError* error = NULL;
+ CHECK(canceller_[index]);
+ cancelled_ = g_cancellable_is_cancelled(canceller_[index]) == TRUE;
ssize_t bytes_written = g_output_stream_write_finish(dst_stream_,
res,
&error);
- if (bytes_written < static_cast<ssize_t>(buffer_valid_size_)) {
+ if (bytes_written < static_cast<ssize_t>(buffer_valid_size_[index])) {
if (bytes_written < 0) {
- LOG(ERROR) << "Write failed:" << utils::GetGErrorMessage(error);
+ LOG(ERROR) << "Write failed: " << utils::GetGErrorMessage(error);
} else {
LOG(ERROR) << "Write was short: wrote " << bytes_written
- << " but expected to write " << buffer_valid_size_;
+ << " but expected to write " << buffer_valid_size_[index];
}
- Cleanup(false, was_cancelled);
- return;
+ failed_ = true;
}
- // Kick off a read
- read_in_flight_ = true;
- canceller_ = g_cancellable_new();
- g_input_stream_read_async(
- src_stream_,
- &buffer_[0],
- GetBytesToRead(),
- G_PRIORITY_DEFAULT,
- canceller_,
- &FilesystemCopierAction::StaticAsyncReadyCallback,
- this);
+ SpawnAsyncActions();
+}
+
+void FilesystemCopierAction::StaticAsyncWriteReadyCallback(
+ GObject *source_object,
+ GAsyncResult *res,
+ gpointer user_data) {
+ reinterpret_cast<FilesystemCopierAction*>(user_data)->
+ AsyncWriteReadyCallback(source_object, res);
+}
+
+void FilesystemCopierAction::SpawnAsyncActions() {
+ bool reading = false;
+ bool writing = false;
+ for (int i = 0; i < 2; i++) {
+ if (buffer_state_[i] == kBufferStateReading) {
+ reading = true;
+ }
+ if (buffer_state_[i] == kBufferStateWriting) {
+ writing = true;
+ }
+ }
+ if (failed_ || cancelled_) {
+ if (!reading && !writing) {
+ Cleanup(false);
+ }
+ return;
+ }
+ for (int i = 0; i < 2; i++) {
+ if (!reading && !read_done_ && buffer_state_[i] == kBufferStateEmpty) {
+ g_input_stream_read_async(
+ src_stream_,
+ buffer_[i].data(),
+ GetBytesToRead(),
+ G_PRIORITY_DEFAULT,
+ canceller_[i],
+ &FilesystemCopierAction::StaticAsyncReadReadyCallback,
+ this);
+ reading = true;
+ buffer_state_[i] = kBufferStateReading;
+ } else if (!writing && buffer_state_[i] == kBufferStateFull) {
+ g_output_stream_write_async(
+ dst_stream_,
+ buffer_[i].data(),
+ buffer_valid_size_[i],
+ G_PRIORITY_DEFAULT,
+ canceller_[i],
+ &FilesystemCopierAction::StaticAsyncWriteReadyCallback,
+ this);
+ writing = true;
+ buffer_state_[i] = kBufferStateWriting;
+ }
+ }
+ if (!reading && !writing) {
+ // We're done!
+ if (hasher_.Finalize()) {
+ LOG(INFO) << "hash: " << hasher_.hash();
+ if (copying_kernel_install_path_) {
+ install_plan_.current_kernel_hash = hasher_.raw_hash();
+ } else {
+ install_plan_.current_rootfs_hash = hasher_.raw_hash();
+ }
+ Cleanup(true);
+ } else {
+ LOG(ERROR) << "Unable to finalize the hash.";
+ Cleanup(false);
+ }
+ }
}
void FilesystemCopierAction::DetermineFilesystemSize(int fd) {
@@ -214,7 +293,7 @@ void FilesystemCopierAction::DetermineFilesystemSize(int fd) {
}
int64_t FilesystemCopierAction::GetBytesToRead() {
- return std::min(static_cast<int64_t>(buffer_.size()), filesystem_size_);
+ return std::min(static_cast<int64_t>(buffer_[0].size()), filesystem_size_);
}
} // namespace chromeos_update_engine
« no previous file with comments | « filesystem_copier_action.h ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698