Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(261)

Side by Side Diff: filesystem_copier_action.cc

Issue 5603001: Restructure FilesystemCopierAction to use ping-pong buffers. (Closed) Base URL: ssh://git@gitrw.chromium.org:9222/update_engine.git@master
Patch Set: review comments Created 10 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « filesystem_copier_action.h ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2010 The Chromium OS Authors. All rights reserved. 1 // Copyright (c) 2010 The Chromium OS Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "update_engine/filesystem_copier_action.h" 5 #include "update_engine/filesystem_copier_action.h"
6 6
7 #include <sys/stat.h> 7 #include <sys/stat.h>
8 #include <sys/types.h> 8 #include <sys/types.h>
9 #include <errno.h> 9 #include <errno.h>
10 #include <fcntl.h> 10 #include <fcntl.h>
(...skipping 14 matching lines...) Expand all
25 #include "update_engine/utils.h" 25 #include "update_engine/utils.h"
26 26
27 using std::map; 27 using std::map;
28 using std::min; 28 using std::min;
29 using std::string; 29 using std::string;
30 using std::vector; 30 using std::vector;
31 31
32 namespace chromeos_update_engine { 32 namespace chromeos_update_engine {
33 33
34 namespace { 34 namespace {
35 const off_t kCopyFileBufferSize = 2 * 1024 * 1024; 35 const off_t kCopyFileBufferSize = 512 * 1024;
36 } // namespace {} 36 } // namespace {}
37 37
38 FilesystemCopierAction::FilesystemCopierAction(
39 bool copying_kernel_install_path)
40 : copying_kernel_install_path_(copying_kernel_install_path),
41 src_stream_(NULL),
42 dst_stream_(NULL),
43 read_done_(false),
44 failed_(false),
45 cancelled_(false),
46 filesystem_size_(kint64max) {
47 // A lot of code works on the implicit assumption that processing is done on
48 // exactly 2 ping-pong buffers.
49 COMPILE_ASSERT(arraysize(buffer_) == 2 &&
50 arraysize(buffer_state_) == 2 &&
51 arraysize(buffer_valid_size_) == 2 &&
52 arraysize(canceller_) == 2,
53 ping_pong_buffers_not_two);
54 for (int i = 0; i < 2; ++i) {
55 buffer_state_[i] = kBufferStateEmpty;
56 buffer_valid_size_[i] = 0;
57 canceller_[i] = NULL;
58 }
59 }
60
38 void FilesystemCopierAction::PerformAction() { 61 void FilesystemCopierAction::PerformAction() {
39 // Will tell the ActionProcessor we've failed if we return. 62 // Will tell the ActionProcessor we've failed if we return.
40 ScopedActionCompleter abort_action_completer(processor_, this); 63 ScopedActionCompleter abort_action_completer(processor_, this);
41 64
42 if (!HasInputObject()) { 65 if (!HasInputObject()) {
43 LOG(ERROR) << "FilesystemCopierAction missing input object."; 66 LOG(ERROR) << "FilesystemCopierAction missing input object.";
44 return; 67 return;
45 } 68 }
46 install_plan_ = GetInputObject(); 69 install_plan_ = GetInputObject();
47 70
(...skipping 29 matching lines...) Expand all
77 PLOG(ERROR) << "Unable to open " << install_plan_.install_path 100 PLOG(ERROR) << "Unable to open " << install_plan_.install_path
78 << " for writing:"; 101 << " for writing:";
79 return; 102 return;
80 } 103 }
81 104
82 DetermineFilesystemSize(src_fd); 105 DetermineFilesystemSize(src_fd);
83 106
84 src_stream_ = g_unix_input_stream_new(src_fd, TRUE); 107 src_stream_ = g_unix_input_stream_new(src_fd, TRUE);
85 dst_stream_ = g_unix_output_stream_new(dst_fd, TRUE); 108 dst_stream_ = g_unix_output_stream_new(dst_fd, TRUE);
86 109
87 buffer_.resize(kCopyFileBufferSize); 110 for (int i = 0; i < 2; i++) {
111 buffer_[i].resize(kCopyFileBufferSize);
112 canceller_[i] = g_cancellable_new();
113 }
88 114
89 // Set up the first read 115 // Start the first read.
90 canceller_ = g_cancellable_new(); 116 SpawnAsyncActions();
91
92 g_input_stream_read_async(src_stream_,
93 &buffer_[0],
94 GetBytesToRead(),
95 G_PRIORITY_DEFAULT,
96 canceller_,
97 &FilesystemCopierAction::StaticAsyncReadyCallback,
98 this);
99 read_in_flight_ = true;
100 117
101 abort_action_completer.set_should_complete(false); 118 abort_action_completer.set_should_complete(false);
102 } 119 }
103 120
104 void FilesystemCopierAction::TerminateProcessing() { 121 void FilesystemCopierAction::TerminateProcessing() {
105 if (canceller_) { 122 for (int i = 0; i < 2; i++) {
106 g_cancellable_cancel(canceller_); 123 if (canceller_[i]) {
124 g_cancellable_cancel(canceller_[i]);
125 }
107 } 126 }
108 } 127 }
109 128
110 void FilesystemCopierAction::Cleanup(bool success, bool was_cancelled) { 129 void FilesystemCopierAction::Cleanup(bool success) {
130 for (int i = 0; i < 2; i++) {
131 g_object_unref(canceller_[i]);
132 canceller_[i] = NULL;
133 }
111 g_object_unref(src_stream_); 134 g_object_unref(src_stream_);
112 src_stream_ = NULL; 135 src_stream_ = NULL;
113 g_object_unref(dst_stream_); 136 g_object_unref(dst_stream_);
114 dst_stream_ = NULL; 137 dst_stream_ = NULL;
115 if (was_cancelled) 138 if (cancelled_)
116 return; 139 return;
117 if (success && HasOutputPipe()) 140 if (success && HasOutputPipe())
118 SetOutputObject(install_plan_); 141 SetOutputObject(install_plan_);
119 processor_->ActionComplete( 142 processor_->ActionComplete(
120 this, 143 this,
121 success ? kActionCodeSuccess : kActionCodeError); 144 success ? kActionCodeSuccess : kActionCodeError);
122 } 145 }
123 146
124 void FilesystemCopierAction::AsyncReadyCallback(GObject *source_object, 147 void FilesystemCopierAction::AsyncReadReadyCallback(GObject *source_object,
125 GAsyncResult *res) { 148 GAsyncResult *res) {
149 int index = buffer_state_[0] == kBufferStateReading ? 0 : 1;
150 CHECK(buffer_state_[index] == kBufferStateReading);
151
126 GError* error = NULL; 152 GError* error = NULL;
127 CHECK(canceller_); 153 CHECK(canceller_[index]);
128 bool was_cancelled = g_cancellable_is_cancelled(canceller_) == TRUE; 154 cancelled_ = g_cancellable_is_cancelled(canceller_[index]) == TRUE;
129 g_object_unref(canceller_);
130 canceller_ = NULL;
131 155
132 if (read_in_flight_) { 156 ssize_t bytes_read = g_input_stream_read_finish(src_stream_, res, &error);
133 ssize_t bytes_read = g_input_stream_read_finish(src_stream_, res, &error); 157 if (bytes_read < 0) {
134 if (bytes_read < 0) { 158 LOG(ERROR) << "Read failed: " << utils::GetGErrorMessage(error);
135 LOG(ERROR) << "Read failed:" << utils::GetGErrorMessage(error); 159 failed_ = true;
136 Cleanup(false, was_cancelled); 160 buffer_state_[index] = kBufferStateEmpty;
137 return; 161 } else if (bytes_read == 0) {
162 read_done_ = true;
163 buffer_state_[index] = kBufferStateEmpty;
164 } else {
165 buffer_valid_size_[index] = bytes_read;
166 buffer_state_[index] = kBufferStateFull;
167 }
168
169 if (bytes_read > 0) {
170 filesystem_size_ -= bytes_read;
171 }
172
173 SpawnAsyncActions();
174
175 if (bytes_read > 0) {
176 if (!hasher_.Update(buffer_[index].data(), bytes_read)) {
177 LOG(ERROR) << "Unable to update the hash.";
178 failed_ = true;
138 } 179 }
180 }
181 }
139 182
140 if (bytes_read == 0) { 183 void FilesystemCopierAction::StaticAsyncReadReadyCallback(
141 // We're done! 184 GObject *source_object,
142 if (!hasher_.Finalize()) { 185 GAsyncResult *res,
143 LOG(ERROR) << "Unable to finalize the hash."; 186 gpointer user_data) {
144 Cleanup(false, was_cancelled); 187 reinterpret_cast<FilesystemCopierAction*>(user_data)->
145 return; 188 AsyncReadReadyCallback(source_object, res);
146 } 189 }
190
191 void FilesystemCopierAction::AsyncWriteReadyCallback(GObject *source_object,
192 GAsyncResult *res) {
193 int index = buffer_state_[0] == kBufferStateWriting ? 0 : 1;
194 CHECK(buffer_state_[index] == kBufferStateWriting);
195 buffer_state_[index] = kBufferStateEmpty;
196
197 GError* error = NULL;
198 CHECK(canceller_[index]);
199 cancelled_ = g_cancellable_is_cancelled(canceller_[index]) == TRUE;
200
201 ssize_t bytes_written = g_output_stream_write_finish(dst_stream_,
202 res,
203 &error);
204 if (bytes_written < static_cast<ssize_t>(buffer_valid_size_[index])) {
205 if (bytes_written < 0) {
206 LOG(ERROR) << "Write failed: " << utils::GetGErrorMessage(error);
207 } else {
208 LOG(ERROR) << "Write was short: wrote " << bytes_written
209 << " but expected to write " << buffer_valid_size_[index];
210 }
211 failed_ = true;
212 }
213
214 SpawnAsyncActions();
215 }
216
217 void FilesystemCopierAction::StaticAsyncWriteReadyCallback(
218 GObject *source_object,
219 GAsyncResult *res,
220 gpointer user_data) {
221 reinterpret_cast<FilesystemCopierAction*>(user_data)->
222 AsyncWriteReadyCallback(source_object, res);
223 }
224
225 void FilesystemCopierAction::SpawnAsyncActions() {
226 bool reading = false;
227 bool writing = false;
228 for (int i = 0; i < 2; i++) {
229 if (buffer_state_[i] == kBufferStateReading) {
230 reading = true;
231 }
232 if (buffer_state_[i] == kBufferStateWriting) {
233 writing = true;
234 }
235 }
236 if (failed_ || cancelled_) {
237 if (!reading && !writing) {
238 Cleanup(false);
239 }
240 return;
241 }
242 for (int i = 0; i < 2; i++) {
243 if (!reading && !read_done_ && buffer_state_[i] == kBufferStateEmpty) {
244 g_input_stream_read_async(
245 src_stream_,
246 buffer_[i].data(),
247 GetBytesToRead(),
248 G_PRIORITY_DEFAULT,
249 canceller_[i],
250 &FilesystemCopierAction::StaticAsyncReadReadyCallback,
251 this);
252 reading = true;
253 buffer_state_[i] = kBufferStateReading;
254 } else if (!writing && buffer_state_[i] == kBufferStateFull) {
255 g_output_stream_write_async(
256 dst_stream_,
257 buffer_[i].data(),
258 buffer_valid_size_[i],
259 G_PRIORITY_DEFAULT,
260 canceller_[i],
261 &FilesystemCopierAction::StaticAsyncWriteReadyCallback,
262 this);
263 writing = true;
264 buffer_state_[i] = kBufferStateWriting;
265 }
266 }
267 if (!reading && !writing) {
268 // We're done!
269 if (hasher_.Finalize()) {
147 LOG(INFO) << "hash: " << hasher_.hash(); 270 LOG(INFO) << "hash: " << hasher_.hash();
148 if (copying_kernel_install_path_) { 271 if (copying_kernel_install_path_) {
149 install_plan_.current_kernel_hash = hasher_.raw_hash(); 272 install_plan_.current_kernel_hash = hasher_.raw_hash();
150 } else { 273 } else {
151 install_plan_.current_rootfs_hash = hasher_.raw_hash(); 274 install_plan_.current_rootfs_hash = hasher_.raw_hash();
152 } 275 }
153 Cleanup(true, was_cancelled); 276 Cleanup(true);
154 return; 277 } else {
278 LOG(ERROR) << "Unable to finalize the hash.";
279 Cleanup(false);
155 } 280 }
156 if (!hasher_.Update(buffer_.data(), bytes_read)) {
157 LOG(ERROR) << "Unable to update the hash.";
158 Cleanup(false, was_cancelled);
159 return;
160 }
161 filesystem_size_ -= bytes_read;
162
163 // Kick off a write
164 read_in_flight_ = false;
165 buffer_valid_size_ = bytes_read;
166 canceller_ = g_cancellable_new();
167 g_output_stream_write_async(
168 dst_stream_,
169 &buffer_[0],
170 bytes_read,
171 G_PRIORITY_DEFAULT,
172 canceller_,
173 &FilesystemCopierAction::StaticAsyncReadyCallback,
174 this);
175 return;
176 } 281 }
177
178 ssize_t bytes_written = g_output_stream_write_finish(dst_stream_,
179 res,
180 &error);
181 if (bytes_written < static_cast<ssize_t>(buffer_valid_size_)) {
182 if (bytes_written < 0) {
183 LOG(ERROR) << "Write failed:" << utils::GetGErrorMessage(error);
184 } else {
185 LOG(ERROR) << "Write was short: wrote " << bytes_written
186 << " but expected to write " << buffer_valid_size_;
187 }
188 Cleanup(false, was_cancelled);
189 return;
190 }
191
192 // Kick off a read
193 read_in_flight_ = true;
194 canceller_ = g_cancellable_new();
195 g_input_stream_read_async(
196 src_stream_,
197 &buffer_[0],
198 GetBytesToRead(),
199 G_PRIORITY_DEFAULT,
200 canceller_,
201 &FilesystemCopierAction::StaticAsyncReadyCallback,
202 this);
203 } 282 }
204 283
205 void FilesystemCopierAction::DetermineFilesystemSize(int fd) { 284 void FilesystemCopierAction::DetermineFilesystemSize(int fd) {
206 filesystem_size_ = kint64max; 285 filesystem_size_ = kint64max;
207 int block_count = 0, block_size = 0; 286 int block_count = 0, block_size = 0;
208 if (!copying_kernel_install_path_ && 287 if (!copying_kernel_install_path_ &&
209 utils::GetFilesystemSizeFromFD(fd, &block_count, &block_size)) { 288 utils::GetFilesystemSizeFromFD(fd, &block_count, &block_size)) {
210 filesystem_size_ = static_cast<int64_t>(block_count) * block_size; 289 filesystem_size_ = static_cast<int64_t>(block_count) * block_size;
211 LOG(INFO) << "Filesystem size: " << filesystem_size_ << " bytes (" 290 LOG(INFO) << "Filesystem size: " << filesystem_size_ << " bytes ("
212 << block_count << "x" << block_size << ")."; 291 << block_count << "x" << block_size << ").";
213 } 292 }
214 } 293 }
215 294
216 int64_t FilesystemCopierAction::GetBytesToRead() { 295 int64_t FilesystemCopierAction::GetBytesToRead() {
217 return std::min(static_cast<int64_t>(buffer_.size()), filesystem_size_); 296 return std::min(static_cast<int64_t>(buffer_[0].size()), filesystem_size_);
218 } 297 }
219 298
220 } // namespace chromeos_update_engine 299 } // namespace chromeos_update_engine
OLDNEW
« no previous file with comments | « filesystem_copier_action.h ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698