| Index: filesystem_copier_action.cc
|
| diff --git a/filesystem_copier_action.cc b/filesystem_copier_action.cc
|
| index 1452b4dbffed38399e04f39d462a6b5313d139b0..f3f71bdbee4b107ddfc7ccba92e1e68cb168da91 100755
|
| --- a/filesystem_copier_action.cc
|
| +++ b/filesystem_copier_action.cc
|
| @@ -32,9 +32,32 @@ using std::vector;
|
| namespace chromeos_update_engine {
|
|
|
| namespace {
|
| -const off_t kCopyFileBufferSize = 2 * 1024 * 1024;
|
| +const off_t kCopyFileBufferSize = 512 * 1024;
|
| } // namespace {}
|
|
|
| +FilesystemCopierAction::FilesystemCopierAction(
|
| + bool copying_kernel_install_path)
|
| + : copying_kernel_install_path_(copying_kernel_install_path),
|
| + src_stream_(NULL),
|
| + dst_stream_(NULL),
|
| + read_done_(false),
|
| + failed_(false),
|
| + cancelled_(false),
|
| + filesystem_size_(kint64max) {
|
| + // A lot of code works on the implicit assumption that processing is done on
|
| + // exactly 2 ping-pong buffers.
|
| + COMPILE_ASSERT(arraysize(buffer_) == 2 &&
|
| + arraysize(buffer_state_) == 2 &&
|
| + arraysize(buffer_valid_size_) == 2 &&
|
| + arraysize(canceller_) == 2,
|
| + ping_pong_buffers_not_two);
|
| + for (int i = 0; i < 2; ++i) {
|
| + buffer_state_[i] = kBufferStateEmpty;
|
| + buffer_valid_size_[i] = 0;
|
| + canceller_[i] = NULL;
|
| + }
|
| +}
|
| +
|
| void FilesystemCopierAction::PerformAction() {
|
| // Will tell the ActionProcessor we've failed if we return.
|
| ScopedActionCompleter abort_action_completer(processor_, this);
|
| @@ -84,35 +107,35 @@ void FilesystemCopierAction::PerformAction() {
|
| src_stream_ = g_unix_input_stream_new(src_fd, TRUE);
|
| dst_stream_ = g_unix_output_stream_new(dst_fd, TRUE);
|
|
|
| - buffer_.resize(kCopyFileBufferSize);
|
| -
|
| - // Set up the first read
|
| - canceller_ = g_cancellable_new();
|
| + for (int i = 0; i < 2; i++) {
|
| + buffer_[i].resize(kCopyFileBufferSize);
|
| + canceller_[i] = g_cancellable_new();
|
| + }
|
|
|
| - g_input_stream_read_async(src_stream_,
|
| - &buffer_[0],
|
| - GetBytesToRead(),
|
| - G_PRIORITY_DEFAULT,
|
| - canceller_,
|
| - &FilesystemCopierAction::StaticAsyncReadyCallback,
|
| - this);
|
| - read_in_flight_ = true;
|
| + // Start the first read.
|
| + SpawnAsyncActions();
|
|
|
| abort_action_completer.set_should_complete(false);
|
| }
|
|
|
| void FilesystemCopierAction::TerminateProcessing() {
|
| - if (canceller_) {
|
| - g_cancellable_cancel(canceller_);
|
| + for (int i = 0; i < 2; i++) {
|
| + if (canceller_[i]) {
|
| + g_cancellable_cancel(canceller_[i]);
|
| + }
|
| }
|
| }
|
|
|
| -void FilesystemCopierAction::Cleanup(bool success, bool was_cancelled) {
|
| +void FilesystemCopierAction::Cleanup(bool success) {
|
| + for (int i = 0; i < 2; i++) {
|
| + g_object_unref(canceller_[i]);
|
| + canceller_[i] = NULL;
|
| + }
|
| g_object_unref(src_stream_);
|
| src_stream_ = NULL;
|
| g_object_unref(dst_stream_);
|
| dst_stream_ = NULL;
|
| - if (was_cancelled)
|
| + if (cancelled_)
|
| return;
|
| if (success && HasOutputPipe())
|
| SetOutputObject(install_plan_);
|
| @@ -121,85 +144,141 @@ void FilesystemCopierAction::Cleanup(bool success, bool was_cancelled) {
|
| success ? kActionCodeSuccess : kActionCodeError);
|
| }
|
|
|
| -void FilesystemCopierAction::AsyncReadyCallback(GObject *source_object,
|
| - GAsyncResult *res) {
|
| +void FilesystemCopierAction::AsyncReadReadyCallback(GObject *source_object,
|
| + GAsyncResult *res) {
|
| + int index = buffer_state_[0] == kBufferStateReading ? 0 : 1;
|
| + CHECK(buffer_state_[index] == kBufferStateReading);
|
| +
|
| GError* error = NULL;
|
| - CHECK(canceller_);
|
| - bool was_cancelled = g_cancellable_is_cancelled(canceller_) == TRUE;
|
| - g_object_unref(canceller_);
|
| - canceller_ = NULL;
|
| -
|
| - if (read_in_flight_) {
|
| - ssize_t bytes_read = g_input_stream_read_finish(src_stream_, res, &error);
|
| - if (bytes_read < 0) {
|
| - LOG(ERROR) << "Read failed:" << utils::GetGErrorMessage(error);
|
| - Cleanup(false, was_cancelled);
|
| - return;
|
| - }
|
| + CHECK(canceller_[index]);
|
| + cancelled_ = g_cancellable_is_cancelled(canceller_[index]) == TRUE;
|
|
|
| - if (bytes_read == 0) {
|
| - // We're done!
|
| - if (!hasher_.Finalize()) {
|
| - LOG(ERROR) << "Unable to finalize the hash.";
|
| - Cleanup(false, was_cancelled);
|
| - return;
|
| - }
|
| - LOG(INFO) << "hash: " << hasher_.hash();
|
| - if (copying_kernel_install_path_) {
|
| - install_plan_.current_kernel_hash = hasher_.raw_hash();
|
| - } else {
|
| - install_plan_.current_rootfs_hash = hasher_.raw_hash();
|
| - }
|
| - Cleanup(true, was_cancelled);
|
| - return;
|
| - }
|
| - if (!hasher_.Update(buffer_.data(), bytes_read)) {
|
| - LOG(ERROR) << "Unable to update the hash.";
|
| - Cleanup(false, was_cancelled);
|
| - return;
|
| - }
|
| + ssize_t bytes_read = g_input_stream_read_finish(src_stream_, res, &error);
|
| + if (bytes_read < 0) {
|
| + LOG(ERROR) << "Read failed: " << utils::GetGErrorMessage(error);
|
| + failed_ = true;
|
| + buffer_state_[index] = kBufferStateEmpty;
|
| + } else if (bytes_read == 0) {
|
| + read_done_ = true;
|
| + buffer_state_[index] = kBufferStateEmpty;
|
| + } else {
|
| + buffer_valid_size_[index] = bytes_read;
|
| + buffer_state_[index] = kBufferStateFull;
|
| + }
|
| +
|
| + if (bytes_read > 0) {
|
| filesystem_size_ -= bytes_read;
|
| + }
|
|
|
| - // Kick off a write
|
| - read_in_flight_ = false;
|
| - buffer_valid_size_ = bytes_read;
|
| - canceller_ = g_cancellable_new();
|
| - g_output_stream_write_async(
|
| - dst_stream_,
|
| - &buffer_[0],
|
| - bytes_read,
|
| - G_PRIORITY_DEFAULT,
|
| - canceller_,
|
| - &FilesystemCopierAction::StaticAsyncReadyCallback,
|
| - this);
|
| - return;
|
| + SpawnAsyncActions();
|
| +
|
| + if (bytes_read > 0) {
|
| + if (!hasher_.Update(buffer_[index].data(), bytes_read)) {
|
| + LOG(ERROR) << "Unable to update the hash.";
|
| + failed_ = true;
|
| + }
|
| }
|
| +}
|
| +
|
| +void FilesystemCopierAction::StaticAsyncReadReadyCallback(
|
| + GObject *source_object,
|
| + GAsyncResult *res,
|
| + gpointer user_data) {
|
| + reinterpret_cast<FilesystemCopierAction*>(user_data)->
|
| + AsyncReadReadyCallback(source_object, res);
|
| +}
|
| +
|
| +void FilesystemCopierAction::AsyncWriteReadyCallback(GObject *source_object,
|
| + GAsyncResult *res) {
|
| + int index = buffer_state_[0] == kBufferStateWriting ? 0 : 1;
|
| + CHECK(buffer_state_[index] == kBufferStateWriting);
|
| + buffer_state_[index] = kBufferStateEmpty;
|
| +
|
| + GError* error = NULL;
|
| + CHECK(canceller_[index]);
|
| + cancelled_ = g_cancellable_is_cancelled(canceller_[index]) == TRUE;
|
|
|
| ssize_t bytes_written = g_output_stream_write_finish(dst_stream_,
|
| res,
|
| &error);
|
| - if (bytes_written < static_cast<ssize_t>(buffer_valid_size_)) {
|
| + if (bytes_written < static_cast<ssize_t>(buffer_valid_size_[index])) {
|
| if (bytes_written < 0) {
|
| - LOG(ERROR) << "Write failed:" << utils::GetGErrorMessage(error);
|
| + LOG(ERROR) << "Write failed: " << utils::GetGErrorMessage(error);
|
| } else {
|
| LOG(ERROR) << "Write was short: wrote " << bytes_written
|
| - << " but expected to write " << buffer_valid_size_;
|
| + << " but expected to write " << buffer_valid_size_[index];
|
| }
|
| - Cleanup(false, was_cancelled);
|
| - return;
|
| + failed_ = true;
|
| }
|
|
|
| - // Kick off a read
|
| - read_in_flight_ = true;
|
| - canceller_ = g_cancellable_new();
|
| - g_input_stream_read_async(
|
| - src_stream_,
|
| - &buffer_[0],
|
| - GetBytesToRead(),
|
| - G_PRIORITY_DEFAULT,
|
| - canceller_,
|
| - &FilesystemCopierAction::StaticAsyncReadyCallback,
|
| - this);
|
| + SpawnAsyncActions();
|
| +}
|
| +
|
| +void FilesystemCopierAction::StaticAsyncWriteReadyCallback(
|
| + GObject *source_object,
|
| + GAsyncResult *res,
|
| + gpointer user_data) {
|
| + reinterpret_cast<FilesystemCopierAction*>(user_data)->
|
| + AsyncWriteReadyCallback(source_object, res);
|
| +}
|
| +
|
| +void FilesystemCopierAction::SpawnAsyncActions() {
|
| + bool reading = false;
|
| + bool writing = false;
|
| + for (int i = 0; i < 2; i++) {
|
| + if (buffer_state_[i] == kBufferStateReading) {
|
| + reading = true;
|
| + }
|
| + if (buffer_state_[i] == kBufferStateWriting) {
|
| + writing = true;
|
| + }
|
| + }
|
| + if (failed_ || cancelled_) {
|
| + if (!reading && !writing) {
|
| + Cleanup(false);
|
| + }
|
| + return;
|
| + }
|
| + for (int i = 0; i < 2; i++) {
|
| + if (!reading && !read_done_ && buffer_state_[i] == kBufferStateEmpty) {
|
| + g_input_stream_read_async(
|
| + src_stream_,
|
| + buffer_[i].data(),
|
| + GetBytesToRead(),
|
| + G_PRIORITY_DEFAULT,
|
| + canceller_[i],
|
| + &FilesystemCopierAction::StaticAsyncReadReadyCallback,
|
| + this);
|
| + reading = true;
|
| + buffer_state_[i] = kBufferStateReading;
|
| + } else if (!writing && buffer_state_[i] == kBufferStateFull) {
|
| + g_output_stream_write_async(
|
| + dst_stream_,
|
| + buffer_[i].data(),
|
| + buffer_valid_size_[i],
|
| + G_PRIORITY_DEFAULT,
|
| + canceller_[i],
|
| + &FilesystemCopierAction::StaticAsyncWriteReadyCallback,
|
| + this);
|
| + writing = true;
|
| + buffer_state_[i] = kBufferStateWriting;
|
| + }
|
| + }
|
| + if (!reading && !writing) {
|
| + // We're done!
|
| + if (hasher_.Finalize()) {
|
| + LOG(INFO) << "hash: " << hasher_.hash();
|
| + if (copying_kernel_install_path_) {
|
| + install_plan_.current_kernel_hash = hasher_.raw_hash();
|
| + } else {
|
| + install_plan_.current_rootfs_hash = hasher_.raw_hash();
|
| + }
|
| + Cleanup(true);
|
| + } else {
|
| + LOG(ERROR) << "Unable to finalize the hash.";
|
| + Cleanup(false);
|
| + }
|
| + }
|
| }
|
|
|
| void FilesystemCopierAction::DetermineFilesystemSize(int fd) {
|
| @@ -214,7 +293,7 @@ void FilesystemCopierAction::DetermineFilesystemSize(int fd) {
|
| }
|
|
|
| int64_t FilesystemCopierAction::GetBytesToRead() {
|
| - return std::min(static_cast<int64_t>(buffer_.size()), filesystem_size_);
|
| + return std::min(static_cast<int64_t>(buffer_[0].size()), filesystem_size_);
|
| }
|
|
|
| } // namespace chromeos_update_engine
|
|
|