Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(264)

Side by Side Diff: filesystem_copier_action.cc

Issue 5603001: Restructure FilesystemCopierAction to use ping-pong buffers. (Closed) Base URL: ssh://git@gitrw.chromium.org:9222/update_engine.git@master
Patch Set: Created 10 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « filesystem_copier_action.h ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2010 The Chromium OS Authors. All rights reserved. 1 // Copyright (c) 2010 The Chromium OS Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "update_engine/filesystem_copier_action.h" 5 #include "update_engine/filesystem_copier_action.h"
6 6
7 #include <sys/stat.h> 7 #include <sys/stat.h>
8 #include <sys/types.h> 8 #include <sys/types.h>
9 #include <errno.h> 9 #include <errno.h>
10 #include <fcntl.h> 10 #include <fcntl.h>
(...skipping 14 matching lines...) Expand all
25 #include "update_engine/utils.h" 25 #include "update_engine/utils.h"
26 26
27 using std::map; 27 using std::map;
28 using std::min; 28 using std::min;
29 using std::string; 29 using std::string;
30 using std::vector; 30 using std::vector;
31 31
32 namespace chromeos_update_engine { 32 namespace chromeos_update_engine {
33 33
34 namespace { 34 namespace {
35 const off_t kCopyFileBufferSize = 2 * 1024 * 1024; 35 const off_t kCopyFileBufferSize = 512 * 1024;
36 } // namespace {} 36 } // namespace {}
37 37
38 FilesystemCopierAction::FilesystemCopierAction(
39 bool copying_kernel_install_path)
40 : copying_kernel_install_path_(copying_kernel_install_path),
41 src_stream_(NULL),
42 dst_stream_(NULL),
43 read_done_(false),
44 failed_(false),
45 cancelled_(false),
46 filesystem_size_(kint64max) {
47 for (int i = 0; i < 2; ++i) {
adlr 2010/12/02 22:12:14 s/2/arraysize(buffer_state_)/?
petkov 2010/12/02 22:39:40 Added a COMPILE_ASSERT and a comment. A lot of cod
48 buffer_state_[i] = kBufferStateEmpty;
adlr 2010/12/02 22:12:14 i suppose you can't just use the initialization li
petkov 2010/12/02 22:39:40 I didn't quite figure out if this can be done unle
49 buffer_valid_size_[i] = 0;
50 canceller_[i] = NULL;
51 }
52 }
53
38 void FilesystemCopierAction::PerformAction() { 54 void FilesystemCopierAction::PerformAction() {
39 // Will tell the ActionProcessor we've failed if we return. 55 // Will tell the ActionProcessor we've failed if we return.
40 ScopedActionCompleter abort_action_completer(processor_, this); 56 ScopedActionCompleter abort_action_completer(processor_, this);
41 57
42 if (!HasInputObject()) { 58 if (!HasInputObject()) {
43 LOG(ERROR) << "FilesystemCopierAction missing input object."; 59 LOG(ERROR) << "FilesystemCopierAction missing input object.";
44 return; 60 return;
45 } 61 }
46 install_plan_ = GetInputObject(); 62 install_plan_ = GetInputObject();
47 63
(...skipping 29 matching lines...) Expand all
77 PLOG(ERROR) << "Unable to open " << install_plan_.install_path 93 PLOG(ERROR) << "Unable to open " << install_plan_.install_path
78 << " for writing:"; 94 << " for writing:";
79 return; 95 return;
80 } 96 }
81 97
82 DetermineFilesystemSize(src_fd); 98 DetermineFilesystemSize(src_fd);
83 99
84 src_stream_ = g_unix_input_stream_new(src_fd, TRUE); 100 src_stream_ = g_unix_input_stream_new(src_fd, TRUE);
85 dst_stream_ = g_unix_output_stream_new(dst_fd, TRUE); 101 dst_stream_ = g_unix_output_stream_new(dst_fd, TRUE);
86 102
87 buffer_.resize(kCopyFileBufferSize); 103 for (int i = 0; i < 2; i++) {
104 buffer_[i].resize(kCopyFileBufferSize);
105 canceller_[i] = g_cancellable_new();
106 }
88 107
89 // Set up the first read 108 // Start the first read.
90 canceller_ = g_cancellable_new(); 109 SpawnAsyncActions();
91
92 g_input_stream_read_async(src_stream_,
93 &buffer_[0],
94 GetBytesToRead(),
95 G_PRIORITY_DEFAULT,
96 canceller_,
97 &FilesystemCopierAction::StaticAsyncReadyCallback,
98 this);
99 read_in_flight_ = true;
100 110
101 abort_action_completer.set_should_complete(false); 111 abort_action_completer.set_should_complete(false);
102 } 112 }
103 113
104 void FilesystemCopierAction::TerminateProcessing() { 114 void FilesystemCopierAction::TerminateProcessing() {
105 if (canceller_) { 115 for (int i = 0; i < 2; i++) {
106 g_cancellable_cancel(canceller_); 116 if (canceller_[i]) {
117 g_cancellable_cancel(canceller_[i]);
118 }
107 } 119 }
108 } 120 }
109 121
110 void FilesystemCopierAction::Cleanup(bool success, bool was_cancelled) { 122 void FilesystemCopierAction::Cleanup(bool success) {
123 for (int i = 0; i < 2; i++) {
124 g_object_unref(canceller_[i]);
125 canceller_[i] = NULL;
126 }
111 g_object_unref(src_stream_); 127 g_object_unref(src_stream_);
112 src_stream_ = NULL; 128 src_stream_ = NULL;
113 g_object_unref(dst_stream_); 129 g_object_unref(dst_stream_);
114 dst_stream_ = NULL; 130 dst_stream_ = NULL;
115 if (was_cancelled) 131 if (cancelled_)
116 return; 132 return;
117 if (success && HasOutputPipe()) 133 if (success && HasOutputPipe())
118 SetOutputObject(install_plan_); 134 SetOutputObject(install_plan_);
119 processor_->ActionComplete( 135 processor_->ActionComplete(
120 this, 136 this,
121 success ? kActionCodeSuccess : kActionCodeError); 137 success ? kActionCodeSuccess : kActionCodeError);
122 } 138 }
123 139
124 void FilesystemCopierAction::AsyncReadyCallback(GObject *source_object, 140 void FilesystemCopierAction::AsyncReadReadyCallback(GObject *source_object,
125 GAsyncResult *res) { 141 GAsyncResult *res) {
142 int index = buffer_state_[0] == kBufferStateReading ? 0 : 1;
143 CHECK(buffer_state_[index] == kBufferStateReading);
144
126 GError* error = NULL; 145 GError* error = NULL;
127 CHECK(canceller_); 146 CHECK(canceller_[index]);
128 bool was_cancelled = g_cancellable_is_cancelled(canceller_) == TRUE; 147 cancelled_ = g_cancellable_is_cancelled(canceller_[index]) == TRUE;
129 g_object_unref(canceller_);
130 canceller_ = NULL;
131 148
132 if (read_in_flight_) { 149 ssize_t bytes_read = g_input_stream_read_finish(src_stream_, res, &error);
133 ssize_t bytes_read = g_input_stream_read_finish(src_stream_, res, &error); 150 if (bytes_read < 0) {
134 if (bytes_read < 0) { 151 LOG(ERROR) << "Read failed: " << utils::GetGErrorMessage(error);
135 LOG(ERROR) << "Read failed:" << utils::GetGErrorMessage(error); 152 failed_ = true;
136 Cleanup(false, was_cancelled); 153 buffer_state_[index] = kBufferStateEmpty;
137 return; 154 } else if (bytes_read == 0) {
155 read_done_ = true;
156 buffer_state_[index] = kBufferStateEmpty;
157 } else {
158 buffer_valid_size_[index] = bytes_read;
159 buffer_state_[index] = kBufferStateFull;
160 }
161
162 if (bytes_read > 0) {
163 filesystem_size_ -= bytes_read;
164 }
165
166 SpawnAsyncActions();
167
168 if (bytes_read > 0) {
169 if (!hasher_.Update(buffer_[index].data(), bytes_read)) {
170 LOG(ERROR) << "Unable to update the hash.";
171 failed_ = true;
138 } 172 }
173 }
174 }
139 175
140 if (bytes_read == 0) { 176 void FilesystemCopierAction::StaticAsyncReadReadyCallback(
141 // We're done! 177 GObject *source_object,
142 if (!hasher_.Finalize()) { 178 GAsyncResult *res,
143 LOG(ERROR) << "Unable to finalize the hash."; 179 gpointer user_data) {
144 Cleanup(false, was_cancelled); 180 reinterpret_cast<FilesystemCopierAction*>(user_data)->
145 return; 181 AsyncReadReadyCallback(source_object, res);
146 } 182 }
183
184 void FilesystemCopierAction::AsyncWriteReadyCallback(GObject *source_object,
185 GAsyncResult *res) {
186 int index = buffer_state_[0] == kBufferStateWriting ? 0 : 1;
187 CHECK(buffer_state_[index] == kBufferStateWriting);
188 buffer_state_[index] = kBufferStateEmpty;
189
190 GError* error = NULL;
191 CHECK(canceller_[index]);
192 cancelled_ = g_cancellable_is_cancelled(canceller_[index]) == TRUE;
193
194 ssize_t bytes_written = g_output_stream_write_finish(dst_stream_,
195 res,
196 &error);
197 if (bytes_written < static_cast<ssize_t>(buffer_valid_size_[index])) {
198 if (bytes_written < 0) {
199 LOG(ERROR) << "Write failed: " << utils::GetGErrorMessage(error);
200 } else {
201 LOG(ERROR) << "Write was short: wrote " << bytes_written
202 << " but expected to write " << buffer_valid_size_[index];
203 }
204 failed_ = true;
205 }
206
207 SpawnAsyncActions();
208 }
209
210 void FilesystemCopierAction::StaticAsyncWriteReadyCallback(
211 GObject *source_object,
212 GAsyncResult *res,
213 gpointer user_data) {
214 reinterpret_cast<FilesystemCopierAction*>(user_data)->
215 AsyncWriteReadyCallback(source_object, res);
216 }
217
218 void FilesystemCopierAction::SpawnAsyncActions() {
219 bool reading = false;
220 bool writing = false;
221 for (int i = 0; i < 2; i++) {
222 if (buffer_state_[i] == kBufferStateReading) {
223 reading = true;
224 }
225 if (buffer_state_[i] == kBufferStateWriting) {
226 writing = true;
227 }
228 }
229 if (failed_ || cancelled_) {
230 if (!reading && !writing) {
231 Cleanup(false);
232 }
233 return;
234 }
235 for (int i = 0; i < 2; i++) {
236 if (!reading && !read_done_ && buffer_state_[i] == kBufferStateEmpty) {
237 g_input_stream_read_async(
238 src_stream_,
239 buffer_[i].data(),
240 GetBytesToRead(),
241 G_PRIORITY_DEFAULT,
242 canceller_[i],
243 &FilesystemCopierAction::StaticAsyncReadReadyCallback,
244 this);
245 reading = true;
246 buffer_state_[i] = kBufferStateReading;
247 } else if (!writing && buffer_state_[i] == kBufferStateFull) {
248 g_output_stream_write_async(
249 dst_stream_,
250 buffer_[i].data(),
251 buffer_valid_size_[i],
252 G_PRIORITY_DEFAULT,
253 canceller_[i],
254 &FilesystemCopierAction::StaticAsyncWriteReadyCallback,
255 this);
256 writing = true;
257 buffer_state_[i] = kBufferStateWriting;
258 }
259 }
260 if (!reading && !writing) {
261 // We're done!
262 if (hasher_.Finalize()) {
147 LOG(INFO) << "hash: " << hasher_.hash(); 263 LOG(INFO) << "hash: " << hasher_.hash();
148 if (copying_kernel_install_path_) { 264 if (copying_kernel_install_path_) {
149 install_plan_.current_kernel_hash = hasher_.raw_hash(); 265 install_plan_.current_kernel_hash = hasher_.raw_hash();
150 } else { 266 } else {
151 install_plan_.current_rootfs_hash = hasher_.raw_hash(); 267 install_plan_.current_rootfs_hash = hasher_.raw_hash();
152 } 268 }
153 Cleanup(true, was_cancelled); 269 Cleanup(true);
154 return; 270 } else {
271 LOG(ERROR) << "Unable to finalize the hash.";
272 Cleanup(false);
155 } 273 }
156 if (!hasher_.Update(buffer_.data(), bytes_read)) {
157 LOG(ERROR) << "Unable to update the hash.";
158 Cleanup(false, was_cancelled);
159 return;
160 }
161 filesystem_size_ -= bytes_read;
162
163 // Kick off a write
164 read_in_flight_ = false;
165 buffer_valid_size_ = bytes_read;
166 canceller_ = g_cancellable_new();
167 g_output_stream_write_async(
168 dst_stream_,
169 &buffer_[0],
170 bytes_read,
171 G_PRIORITY_DEFAULT,
172 canceller_,
173 &FilesystemCopierAction::StaticAsyncReadyCallback,
174 this);
175 return;
176 } 274 }
177
178 ssize_t bytes_written = g_output_stream_write_finish(dst_stream_,
179 res,
180 &error);
181 if (bytes_written < static_cast<ssize_t>(buffer_valid_size_)) {
182 if (bytes_written < 0) {
183 LOG(ERROR) << "Write failed:" << utils::GetGErrorMessage(error);
184 } else {
185 LOG(ERROR) << "Write was short: wrote " << bytes_written
186 << " but expected to write " << buffer_valid_size_;
187 }
188 Cleanup(false, was_cancelled);
189 return;
190 }
191
192 // Kick off a read
193 read_in_flight_ = true;
194 canceller_ = g_cancellable_new();
195 g_input_stream_read_async(
196 src_stream_,
197 &buffer_[0],
198 GetBytesToRead(),
199 G_PRIORITY_DEFAULT,
200 canceller_,
201 &FilesystemCopierAction::StaticAsyncReadyCallback,
202 this);
203 } 275 }
204 276
205 void FilesystemCopierAction::DetermineFilesystemSize(int fd) { 277 void FilesystemCopierAction::DetermineFilesystemSize(int fd) {
206 filesystem_size_ = kint64max; 278 filesystem_size_ = kint64max;
207 int block_count = 0, block_size = 0; 279 int block_count = 0, block_size = 0;
208 if (!copying_kernel_install_path_ && 280 if (!copying_kernel_install_path_ &&
209 utils::GetFilesystemSizeFromFD(fd, &block_count, &block_size)) { 281 utils::GetFilesystemSizeFromFD(fd, &block_count, &block_size)) {
210 filesystem_size_ = static_cast<int64_t>(block_count) * block_size; 282 filesystem_size_ = static_cast<int64_t>(block_count) * block_size;
211 LOG(INFO) << "Filesystem size: " << filesystem_size_ << " bytes (" 283 LOG(INFO) << "Filesystem size: " << filesystem_size_ << " bytes ("
212 << block_count << "x" << block_size << ")."; 284 << block_count << "x" << block_size << ").";
213 } 285 }
214 } 286 }
215 287
216 int64_t FilesystemCopierAction::GetBytesToRead() { 288 int64_t FilesystemCopierAction::GetBytesToRead() {
217 return std::min(static_cast<int64_t>(buffer_.size()), filesystem_size_); 289 return std::min(static_cast<int64_t>(buffer_[0].size()), filesystem_size_);
218 } 290 }
219 291
220 } // namespace chromeos_update_engine 292 } // namespace chromeos_update_engine
OLDNEW
« no previous file with comments | « filesystem_copier_action.h ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698