Index: filesystem_copier_action.cc |
diff --git a/filesystem_copier_action.cc b/filesystem_copier_action.cc |
index 1452b4dbffed38399e04f39d462a6b5313d139b0..f3f71bdbee4b107ddfc7ccba92e1e68cb168da91 100755 |
--- a/filesystem_copier_action.cc |
+++ b/filesystem_copier_action.cc |
@@ -32,9 +32,32 @@ using std::vector; |
namespace chromeos_update_engine { |
namespace { |
-const off_t kCopyFileBufferSize = 2 * 1024 * 1024; |
+const off_t kCopyFileBufferSize = 512 * 1024; |
} // namespace {} |
+FilesystemCopierAction::FilesystemCopierAction( |
+ bool copying_kernel_install_path) |
+ : copying_kernel_install_path_(copying_kernel_install_path), |
+ src_stream_(NULL), |
+ dst_stream_(NULL), |
+ read_done_(false), |
+ failed_(false), |
+ cancelled_(false), |
+ filesystem_size_(kint64max) { |
+ // A lot of code works on the implicit assumption that processing is done on |
+ // exactly 2 ping-pong buffers. |
+ COMPILE_ASSERT(arraysize(buffer_) == 2 && |
+ arraysize(buffer_state_) == 2 && |
+ arraysize(buffer_valid_size_) == 2 && |
+ arraysize(canceller_) == 2, |
+ ping_pong_buffers_not_two); |
+ for (int i = 0; i < 2; ++i) { |
+ buffer_state_[i] = kBufferStateEmpty; |
+ buffer_valid_size_[i] = 0; |
+ canceller_[i] = NULL; |
+ } |
+} |
+ |
void FilesystemCopierAction::PerformAction() { |
// Will tell the ActionProcessor we've failed if we return. |
ScopedActionCompleter abort_action_completer(processor_, this); |
@@ -84,35 +107,35 @@ void FilesystemCopierAction::PerformAction() { |
src_stream_ = g_unix_input_stream_new(src_fd, TRUE); |
dst_stream_ = g_unix_output_stream_new(dst_fd, TRUE); |
- buffer_.resize(kCopyFileBufferSize); |
- |
- // Set up the first read |
- canceller_ = g_cancellable_new(); |
+ for (int i = 0; i < 2; i++) { |
+ buffer_[i].resize(kCopyFileBufferSize); |
+ canceller_[i] = g_cancellable_new(); |
+ } |
- g_input_stream_read_async(src_stream_, |
- &buffer_[0], |
- GetBytesToRead(), |
- G_PRIORITY_DEFAULT, |
- canceller_, |
- &FilesystemCopierAction::StaticAsyncReadyCallback, |
- this); |
- read_in_flight_ = true; |
+ // Start the first read. |
+ SpawnAsyncActions(); |
abort_action_completer.set_should_complete(false); |
} |
void FilesystemCopierAction::TerminateProcessing() { |
- if (canceller_) { |
- g_cancellable_cancel(canceller_); |
+ for (int i = 0; i < 2; i++) { |
+ if (canceller_[i]) { |
+ g_cancellable_cancel(canceller_[i]); |
+ } |
} |
} |
-void FilesystemCopierAction::Cleanup(bool success, bool was_cancelled) { |
+void FilesystemCopierAction::Cleanup(bool success) { |
+ for (int i = 0; i < 2; i++) { |
+ g_object_unref(canceller_[i]); |
+ canceller_[i] = NULL; |
+ } |
g_object_unref(src_stream_); |
src_stream_ = NULL; |
g_object_unref(dst_stream_); |
dst_stream_ = NULL; |
- if (was_cancelled) |
+ if (cancelled_) |
return; |
if (success && HasOutputPipe()) |
SetOutputObject(install_plan_); |
@@ -121,85 +144,141 @@ void FilesystemCopierAction::Cleanup(bool success, bool was_cancelled) { |
success ? kActionCodeSuccess : kActionCodeError); |
} |
-void FilesystemCopierAction::AsyncReadyCallback(GObject *source_object, |
- GAsyncResult *res) { |
+void FilesystemCopierAction::AsyncReadReadyCallback(GObject *source_object, |
+ GAsyncResult *res) { |
+ int index = buffer_state_[0] == kBufferStateReading ? 0 : 1; |
+ CHECK(buffer_state_[index] == kBufferStateReading); |
+ |
GError* error = NULL; |
- CHECK(canceller_); |
- bool was_cancelled = g_cancellable_is_cancelled(canceller_) == TRUE; |
- g_object_unref(canceller_); |
- canceller_ = NULL; |
- |
- if (read_in_flight_) { |
- ssize_t bytes_read = g_input_stream_read_finish(src_stream_, res, &error); |
- if (bytes_read < 0) { |
- LOG(ERROR) << "Read failed:" << utils::GetGErrorMessage(error); |
- Cleanup(false, was_cancelled); |
- return; |
- } |
+ CHECK(canceller_[index]); |
+ cancelled_ = g_cancellable_is_cancelled(canceller_[index]) == TRUE; |
- if (bytes_read == 0) { |
- // We're done! |
- if (!hasher_.Finalize()) { |
- LOG(ERROR) << "Unable to finalize the hash."; |
- Cleanup(false, was_cancelled); |
- return; |
- } |
- LOG(INFO) << "hash: " << hasher_.hash(); |
- if (copying_kernel_install_path_) { |
- install_plan_.current_kernel_hash = hasher_.raw_hash(); |
- } else { |
- install_plan_.current_rootfs_hash = hasher_.raw_hash(); |
- } |
- Cleanup(true, was_cancelled); |
- return; |
- } |
- if (!hasher_.Update(buffer_.data(), bytes_read)) { |
- LOG(ERROR) << "Unable to update the hash."; |
- Cleanup(false, was_cancelled); |
- return; |
- } |
+ ssize_t bytes_read = g_input_stream_read_finish(src_stream_, res, &error); |
+ if (bytes_read < 0) { |
+ LOG(ERROR) << "Read failed: " << utils::GetGErrorMessage(error); |
+ failed_ = true; |
+ buffer_state_[index] = kBufferStateEmpty; |
+ } else if (bytes_read == 0) { |
+ read_done_ = true; |
+ buffer_state_[index] = kBufferStateEmpty; |
+ } else { |
+ buffer_valid_size_[index] = bytes_read; |
+ buffer_state_[index] = kBufferStateFull; |
+ } |
+ |
+ if (bytes_read > 0) { |
filesystem_size_ -= bytes_read; |
+ } |
- // Kick off a write |
- read_in_flight_ = false; |
- buffer_valid_size_ = bytes_read; |
- canceller_ = g_cancellable_new(); |
- g_output_stream_write_async( |
- dst_stream_, |
- &buffer_[0], |
- bytes_read, |
- G_PRIORITY_DEFAULT, |
- canceller_, |
- &FilesystemCopierAction::StaticAsyncReadyCallback, |
- this); |
- return; |
+ SpawnAsyncActions(); |
+ |
+ if (bytes_read > 0) { |
+ if (!hasher_.Update(buffer_[index].data(), bytes_read)) { |
+ LOG(ERROR) << "Unable to update the hash."; |
+ failed_ = true; |
+ } |
} |
+} |
+ |
+void FilesystemCopierAction::StaticAsyncReadReadyCallback( |
+ GObject *source_object, |
+ GAsyncResult *res, |
+ gpointer user_data) { |
+ reinterpret_cast<FilesystemCopierAction*>(user_data)-> |
+ AsyncReadReadyCallback(source_object, res); |
+} |
+ |
+void FilesystemCopierAction::AsyncWriteReadyCallback(GObject *source_object, |
+ GAsyncResult *res) { |
+ int index = buffer_state_[0] == kBufferStateWriting ? 0 : 1; |
+ CHECK(buffer_state_[index] == kBufferStateWriting); |
+ buffer_state_[index] = kBufferStateEmpty; |
+ |
+ GError* error = NULL; |
+ CHECK(canceller_[index]); |
+ cancelled_ = g_cancellable_is_cancelled(canceller_[index]) == TRUE; |
ssize_t bytes_written = g_output_stream_write_finish(dst_stream_, |
res, |
&error); |
- if (bytes_written < static_cast<ssize_t>(buffer_valid_size_)) { |
+ if (bytes_written < static_cast<ssize_t>(buffer_valid_size_[index])) { |
if (bytes_written < 0) { |
- LOG(ERROR) << "Write failed:" << utils::GetGErrorMessage(error); |
+ LOG(ERROR) << "Write failed: " << utils::GetGErrorMessage(error); |
} else { |
LOG(ERROR) << "Write was short: wrote " << bytes_written |
- << " but expected to write " << buffer_valid_size_; |
+ << " but expected to write " << buffer_valid_size_[index]; |
} |
- Cleanup(false, was_cancelled); |
- return; |
+ failed_ = true; |
} |
- // Kick off a read |
- read_in_flight_ = true; |
- canceller_ = g_cancellable_new(); |
- g_input_stream_read_async( |
- src_stream_, |
- &buffer_[0], |
- GetBytesToRead(), |
- G_PRIORITY_DEFAULT, |
- canceller_, |
- &FilesystemCopierAction::StaticAsyncReadyCallback, |
- this); |
+ SpawnAsyncActions(); |
+} |
+ |
+void FilesystemCopierAction::StaticAsyncWriteReadyCallback( |
+ GObject *source_object, |
+ GAsyncResult *res, |
+ gpointer user_data) { |
+ reinterpret_cast<FilesystemCopierAction*>(user_data)-> |
+ AsyncWriteReadyCallback(source_object, res); |
+} |
+ |
+void FilesystemCopierAction::SpawnAsyncActions() { |
+ bool reading = false; |
+ bool writing = false; |
+ for (int i = 0; i < 2; i++) { |
+ if (buffer_state_[i] == kBufferStateReading) { |
+ reading = true; |
+ } |
+ if (buffer_state_[i] == kBufferStateWriting) { |
+ writing = true; |
+ } |
+ } |
+ if (failed_ || cancelled_) { |
+ if (!reading && !writing) { |
+ Cleanup(false); |
+ } |
+ return; |
+ } |
+ for (int i = 0; i < 2; i++) { |
+ if (!reading && !read_done_ && buffer_state_[i] == kBufferStateEmpty) { |
+ g_input_stream_read_async( |
+ src_stream_, |
+ buffer_[i].data(), |
+ GetBytesToRead(), |
+ G_PRIORITY_DEFAULT, |
+ canceller_[i], |
+ &FilesystemCopierAction::StaticAsyncReadReadyCallback, |
+ this); |
+ reading = true; |
+ buffer_state_[i] = kBufferStateReading; |
+ } else if (!writing && buffer_state_[i] == kBufferStateFull) { |
+ g_output_stream_write_async( |
+ dst_stream_, |
+ buffer_[i].data(), |
+ buffer_valid_size_[i], |
+ G_PRIORITY_DEFAULT, |
+ canceller_[i], |
+ &FilesystemCopierAction::StaticAsyncWriteReadyCallback, |
+ this); |
+ writing = true; |
+ buffer_state_[i] = kBufferStateWriting; |
+ } |
+ } |
+ if (!reading && !writing) { |
+ // We're done! |
+ if (hasher_.Finalize()) { |
+ LOG(INFO) << "hash: " << hasher_.hash(); |
+ if (copying_kernel_install_path_) { |
+ install_plan_.current_kernel_hash = hasher_.raw_hash(); |
+ } else { |
+ install_plan_.current_rootfs_hash = hasher_.raw_hash(); |
+ } |
+ Cleanup(true); |
+ } else { |
+ LOG(ERROR) << "Unable to finalize the hash."; |
+ Cleanup(false); |
+ } |
+ } |
} |
void FilesystemCopierAction::DetermineFilesystemSize(int fd) { |
@@ -214,7 +293,7 @@ void FilesystemCopierAction::DetermineFilesystemSize(int fd) { |
} |
int64_t FilesystemCopierAction::GetBytesToRead() { |
- return std::min(static_cast<int64_t>(buffer_.size()), filesystem_size_); |
+ return std::min(static_cast<int64_t>(buffer_[0].size()), filesystem_size_); |
} |
} // namespace chromeos_update_engine |