Chromium Code Reviews| Index: filesystem_copier_action.cc |
| diff --git a/filesystem_copier_action.cc b/filesystem_copier_action.cc |
| index 1452b4dbffed38399e04f39d462a6b5313d139b0..6f83a85842ded270ae5a3b35e0c76affae5d00a0 100755 |
| --- a/filesystem_copier_action.cc |
| +++ b/filesystem_copier_action.cc |
| @@ -32,9 +32,25 @@ using std::vector; |
| namespace chromeos_update_engine { |
| namespace { |
| -const off_t kCopyFileBufferSize = 2 * 1024 * 1024; |
| +const off_t kCopyFileBufferSize = 512 * 1024; |
| } // namespace {} |
| +FilesystemCopierAction::FilesystemCopierAction( |
| + bool copying_kernel_install_path) |
| + : copying_kernel_install_path_(copying_kernel_install_path), |
| + src_stream_(NULL), |
| + dst_stream_(NULL), |
| + read_done_(false), |
| + failed_(false), |
| + cancelled_(false), |
| + filesystem_size_(kint64max) { |
| + for (int i = 0; i < 2; ++i) { |
|
adlr
2010/12/02 22:12:14
s/2/arraysize(buffer_state_)/?
petkov
2010/12/02 22:39:40
Added a COMPILE_ASSERT and a comment. A lot of cod
|
| + buffer_state_[i] = kBufferStateEmpty; |
|
adlr
2010/12/02 22:12:14
i suppose you can't just use the initialization li
petkov
2010/12/02 22:39:40
I didn't quite figure out if this can be done unle
|
| + buffer_valid_size_[i] = 0; |
| + canceller_[i] = NULL; |
| + } |
| +} |
| + |
| void FilesystemCopierAction::PerformAction() { |
| // Will tell the ActionProcessor we've failed if we return. |
| ScopedActionCompleter abort_action_completer(processor_, this); |
| @@ -84,35 +100,35 @@ void FilesystemCopierAction::PerformAction() { |
| src_stream_ = g_unix_input_stream_new(src_fd, TRUE); |
| dst_stream_ = g_unix_output_stream_new(dst_fd, TRUE); |
| - buffer_.resize(kCopyFileBufferSize); |
| - |
| - // Set up the first read |
| - canceller_ = g_cancellable_new(); |
| + for (int i = 0; i < 2; i++) { |
| + buffer_[i].resize(kCopyFileBufferSize); |
| + canceller_[i] = g_cancellable_new(); |
| + } |
| - g_input_stream_read_async(src_stream_, |
| - &buffer_[0], |
| - GetBytesToRead(), |
| - G_PRIORITY_DEFAULT, |
| - canceller_, |
| - &FilesystemCopierAction::StaticAsyncReadyCallback, |
| - this); |
| - read_in_flight_ = true; |
| + // Start the first read. |
| + SpawnAsyncActions(); |
| abort_action_completer.set_should_complete(false); |
| } |
| void FilesystemCopierAction::TerminateProcessing() { |
| - if (canceller_) { |
| - g_cancellable_cancel(canceller_); |
| + for (int i = 0; i < 2; i++) { |
| + if (canceller_[i]) { |
| + g_cancellable_cancel(canceller_[i]); |
| + } |
| } |
| } |
| -void FilesystemCopierAction::Cleanup(bool success, bool was_cancelled) { |
| +void FilesystemCopierAction::Cleanup(bool success) { |
| + for (int i = 0; i < 2; i++) { |
| + g_object_unref(canceller_[i]); |
| + canceller_[i] = NULL; |
| + } |
| g_object_unref(src_stream_); |
| src_stream_ = NULL; |
| g_object_unref(dst_stream_); |
| dst_stream_ = NULL; |
| - if (was_cancelled) |
| + if (cancelled_) |
| return; |
| if (success && HasOutputPipe()) |
| SetOutputObject(install_plan_); |
| @@ -121,85 +137,141 @@ void FilesystemCopierAction::Cleanup(bool success, bool was_cancelled) { |
| success ? kActionCodeSuccess : kActionCodeError); |
| } |
| -void FilesystemCopierAction::AsyncReadyCallback(GObject *source_object, |
| - GAsyncResult *res) { |
| +void FilesystemCopierAction::AsyncReadReadyCallback(GObject *source_object, |
| + GAsyncResult *res) { |
| + int index = buffer_state_[0] == kBufferStateReading ? 0 : 1; |
| + CHECK(buffer_state_[index] == kBufferStateReading); |
| + |
| GError* error = NULL; |
| - CHECK(canceller_); |
| - bool was_cancelled = g_cancellable_is_cancelled(canceller_) == TRUE; |
| - g_object_unref(canceller_); |
| - canceller_ = NULL; |
| - |
| - if (read_in_flight_) { |
| - ssize_t bytes_read = g_input_stream_read_finish(src_stream_, res, &error); |
| - if (bytes_read < 0) { |
| - LOG(ERROR) << "Read failed:" << utils::GetGErrorMessage(error); |
| - Cleanup(false, was_cancelled); |
| - return; |
| - } |
| + CHECK(canceller_[index]); |
| + cancelled_ = g_cancellable_is_cancelled(canceller_[index]) == TRUE; |
| - if (bytes_read == 0) { |
| - // We're done! |
| - if (!hasher_.Finalize()) { |
| - LOG(ERROR) << "Unable to finalize the hash."; |
| - Cleanup(false, was_cancelled); |
| - return; |
| - } |
| - LOG(INFO) << "hash: " << hasher_.hash(); |
| - if (copying_kernel_install_path_) { |
| - install_plan_.current_kernel_hash = hasher_.raw_hash(); |
| - } else { |
| - install_plan_.current_rootfs_hash = hasher_.raw_hash(); |
| - } |
| - Cleanup(true, was_cancelled); |
| - return; |
| - } |
| - if (!hasher_.Update(buffer_.data(), bytes_read)) { |
| - LOG(ERROR) << "Unable to update the hash."; |
| - Cleanup(false, was_cancelled); |
| - return; |
| - } |
| + ssize_t bytes_read = g_input_stream_read_finish(src_stream_, res, &error); |
| + if (bytes_read < 0) { |
| + LOG(ERROR) << "Read failed: " << utils::GetGErrorMessage(error); |
| + failed_ = true; |
| + buffer_state_[index] = kBufferStateEmpty; |
| + } else if (bytes_read == 0) { |
| + read_done_ = true; |
| + buffer_state_[index] = kBufferStateEmpty; |
| + } else { |
| + buffer_valid_size_[index] = bytes_read; |
| + buffer_state_[index] = kBufferStateFull; |
| + } |
| + |
| + if (bytes_read > 0) { |
| filesystem_size_ -= bytes_read; |
| + } |
| - // Kick off a write |
| - read_in_flight_ = false; |
| - buffer_valid_size_ = bytes_read; |
| - canceller_ = g_cancellable_new(); |
| - g_output_stream_write_async( |
| - dst_stream_, |
| - &buffer_[0], |
| - bytes_read, |
| - G_PRIORITY_DEFAULT, |
| - canceller_, |
| - &FilesystemCopierAction::StaticAsyncReadyCallback, |
| - this); |
| - return; |
| + SpawnAsyncActions(); |
| + |
| + if (bytes_read > 0) { |
| + if (!hasher_.Update(buffer_[index].data(), bytes_read)) { |
| + LOG(ERROR) << "Unable to update the hash."; |
| + failed_ = true; |
| + } |
| } |
| +} |
| + |
| +void FilesystemCopierAction::StaticAsyncReadReadyCallback( |
| + GObject *source_object, |
| + GAsyncResult *res, |
| + gpointer user_data) { |
| + reinterpret_cast<FilesystemCopierAction*>(user_data)-> |
| + AsyncReadReadyCallback(source_object, res); |
| +} |
| + |
| +void FilesystemCopierAction::AsyncWriteReadyCallback(GObject *source_object, |
| + GAsyncResult *res) { |
| + int index = buffer_state_[0] == kBufferStateWriting ? 0 : 1; |
| + CHECK(buffer_state_[index] == kBufferStateWriting); |
| + buffer_state_[index] = kBufferStateEmpty; |
| + |
| + GError* error = NULL; |
| + CHECK(canceller_[index]); |
| + cancelled_ = g_cancellable_is_cancelled(canceller_[index]) == TRUE; |
| ssize_t bytes_written = g_output_stream_write_finish(dst_stream_, |
| res, |
| &error); |
| - if (bytes_written < static_cast<ssize_t>(buffer_valid_size_)) { |
| + if (bytes_written < static_cast<ssize_t>(buffer_valid_size_[index])) { |
| if (bytes_written < 0) { |
| - LOG(ERROR) << "Write failed:" << utils::GetGErrorMessage(error); |
| + LOG(ERROR) << "Write failed: " << utils::GetGErrorMessage(error); |
| } else { |
| LOG(ERROR) << "Write was short: wrote " << bytes_written |
| - << " but expected to write " << buffer_valid_size_; |
| + << " but expected to write " << buffer_valid_size_[index]; |
| } |
| - Cleanup(false, was_cancelled); |
| - return; |
| + failed_ = true; |
| } |
| - // Kick off a read |
| - read_in_flight_ = true; |
| - canceller_ = g_cancellable_new(); |
| - g_input_stream_read_async( |
| - src_stream_, |
| - &buffer_[0], |
| - GetBytesToRead(), |
| - G_PRIORITY_DEFAULT, |
| - canceller_, |
| - &FilesystemCopierAction::StaticAsyncReadyCallback, |
| - this); |
| + SpawnAsyncActions(); |
| +} |
| + |
| +void FilesystemCopierAction::StaticAsyncWriteReadyCallback( |
| + GObject *source_object, |
| + GAsyncResult *res, |
| + gpointer user_data) { |
| + reinterpret_cast<FilesystemCopierAction*>(user_data)-> |
| + AsyncWriteReadyCallback(source_object, res); |
| +} |
| + |
| +void FilesystemCopierAction::SpawnAsyncActions() { |
| + bool reading = false; |
| + bool writing = false; |
| + for (int i = 0; i < 2; i++) { |
| + if (buffer_state_[i] == kBufferStateReading) { |
| + reading = true; |
| + } |
| + if (buffer_state_[i] == kBufferStateWriting) { |
| + writing = true; |
| + } |
| + } |
| + if (failed_ || cancelled_) { |
| + if (!reading && !writing) { |
| + Cleanup(false); |
| + } |
| + return; |
| + } |
| + for (int i = 0; i < 2; i++) { |
| + if (!reading && !read_done_ && buffer_state_[i] == kBufferStateEmpty) { |
| + g_input_stream_read_async( |
| + src_stream_, |
| + buffer_[i].data(), |
| + GetBytesToRead(), |
| + G_PRIORITY_DEFAULT, |
| + canceller_[i], |
| + &FilesystemCopierAction::StaticAsyncReadReadyCallback, |
| + this); |
| + reading = true; |
| + buffer_state_[i] = kBufferStateReading; |
| + } else if (!writing && buffer_state_[i] == kBufferStateFull) { |
| + g_output_stream_write_async( |
| + dst_stream_, |
| + buffer_[i].data(), |
| + buffer_valid_size_[i], |
| + G_PRIORITY_DEFAULT, |
| + canceller_[i], |
| + &FilesystemCopierAction::StaticAsyncWriteReadyCallback, |
| + this); |
| + writing = true; |
| + buffer_state_[i] = kBufferStateWriting; |
| + } |
| + } |
| + if (!reading && !writing) { |
| + // We're done! |
| + if (hasher_.Finalize()) { |
| + LOG(INFO) << "hash: " << hasher_.hash(); |
| + if (copying_kernel_install_path_) { |
| + install_plan_.current_kernel_hash = hasher_.raw_hash(); |
| + } else { |
| + install_plan_.current_rootfs_hash = hasher_.raw_hash(); |
| + } |
| + Cleanup(true); |
| + } else { |
| + LOG(ERROR) << "Unable to finalize the hash."; |
| + Cleanup(false); |
| + } |
| + } |
| } |
| void FilesystemCopierAction::DetermineFilesystemSize(int fd) { |
| @@ -214,7 +286,7 @@ void FilesystemCopierAction::DetermineFilesystemSize(int fd) { |
| } |
| int64_t FilesystemCopierAction::GetBytesToRead() { |
| - return std::min(static_cast<int64_t>(buffer_.size()), filesystem_size_); |
| + return std::min(static_cast<int64_t>(buffer_[0].size()), filesystem_size_); |
| } |
| } // namespace chromeos_update_engine |