OLD | NEW |
1 // Copyright (c) 2010 The Chromium OS Authors. All rights reserved. | 1 // Copyright (c) 2010 The Chromium OS Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "update_engine/filesystem_copier_action.h" | 5 #include "update_engine/filesystem_copier_action.h" |
6 | 6 |
7 #include <sys/stat.h> | 7 #include <sys/stat.h> |
8 #include <sys/types.h> | 8 #include <sys/types.h> |
9 #include <errno.h> | 9 #include <errno.h> |
10 #include <fcntl.h> | 10 #include <fcntl.h> |
(...skipping 14 matching lines...) Expand all Loading... |
25 #include "update_engine/utils.h" | 25 #include "update_engine/utils.h" |
26 | 26 |
27 using std::map; | 27 using std::map; |
28 using std::min; | 28 using std::min; |
29 using std::string; | 29 using std::string; |
30 using std::vector; | 30 using std::vector; |
31 | 31 |
32 namespace chromeos_update_engine { | 32 namespace chromeos_update_engine { |
33 | 33 |
34 namespace { | 34 namespace { |
35 const off_t kCopyFileBufferSize = 2 * 1024 * 1024; | 35 const off_t kCopyFileBufferSize = 512 * 1024; |
36 } // namespace {} | 36 } // namespace {} |
37 | 37 |
| 38 FilesystemCopierAction::FilesystemCopierAction( |
| 39 bool copying_kernel_install_path) |
| 40 : copying_kernel_install_path_(copying_kernel_install_path), |
| 41 src_stream_(NULL), |
| 42 dst_stream_(NULL), |
| 43 read_done_(false), |
| 44 failed_(false), |
| 45 cancelled_(false), |
| 46 filesystem_size_(kint64max) { |
| 47 // A lot of code works on the implicit assumption that processing is done on |
| 48 // exactly 2 ping-pong buffers. |
| 49 COMPILE_ASSERT(arraysize(buffer_) == 2 && |
| 50 arraysize(buffer_state_) == 2 && |
| 51 arraysize(buffer_valid_size_) == 2 && |
| 52 arraysize(canceller_) == 2, |
| 53 ping_pong_buffers_not_two); |
| 54 for (int i = 0; i < 2; ++i) { |
| 55 buffer_state_[i] = kBufferStateEmpty; |
| 56 buffer_valid_size_[i] = 0; |
| 57 canceller_[i] = NULL; |
| 58 } |
| 59 } |
| 60 |
38 void FilesystemCopierAction::PerformAction() { | 61 void FilesystemCopierAction::PerformAction() { |
39 // Will tell the ActionProcessor we've failed if we return. | 62 // Will tell the ActionProcessor we've failed if we return. |
40 ScopedActionCompleter abort_action_completer(processor_, this); | 63 ScopedActionCompleter abort_action_completer(processor_, this); |
41 | 64 |
42 if (!HasInputObject()) { | 65 if (!HasInputObject()) { |
43 LOG(ERROR) << "FilesystemCopierAction missing input object."; | 66 LOG(ERROR) << "FilesystemCopierAction missing input object."; |
44 return; | 67 return; |
45 } | 68 } |
46 install_plan_ = GetInputObject(); | 69 install_plan_ = GetInputObject(); |
47 | 70 |
(...skipping 29 matching lines...) Expand all Loading... |
77 PLOG(ERROR) << "Unable to open " << install_plan_.install_path | 100 PLOG(ERROR) << "Unable to open " << install_plan_.install_path |
78 << " for writing:"; | 101 << " for writing:"; |
79 return; | 102 return; |
80 } | 103 } |
81 | 104 |
82 DetermineFilesystemSize(src_fd); | 105 DetermineFilesystemSize(src_fd); |
83 | 106 |
84 src_stream_ = g_unix_input_stream_new(src_fd, TRUE); | 107 src_stream_ = g_unix_input_stream_new(src_fd, TRUE); |
85 dst_stream_ = g_unix_output_stream_new(dst_fd, TRUE); | 108 dst_stream_ = g_unix_output_stream_new(dst_fd, TRUE); |
86 | 109 |
87 buffer_.resize(kCopyFileBufferSize); | 110 for (int i = 0; i < 2; i++) { |
| 111 buffer_[i].resize(kCopyFileBufferSize); |
| 112 canceller_[i] = g_cancellable_new(); |
| 113 } |
88 | 114 |
89 // Set up the first read | 115 // Start the first read. |
90 canceller_ = g_cancellable_new(); | 116 SpawnAsyncActions(); |
91 | |
92 g_input_stream_read_async(src_stream_, | |
93 &buffer_[0], | |
94 GetBytesToRead(), | |
95 G_PRIORITY_DEFAULT, | |
96 canceller_, | |
97 &FilesystemCopierAction::StaticAsyncReadyCallback, | |
98 this); | |
99 read_in_flight_ = true; | |
100 | 117 |
101 abort_action_completer.set_should_complete(false); | 118 abort_action_completer.set_should_complete(false); |
102 } | 119 } |
103 | 120 |
104 void FilesystemCopierAction::TerminateProcessing() { | 121 void FilesystemCopierAction::TerminateProcessing() { |
105 if (canceller_) { | 122 for (int i = 0; i < 2; i++) { |
106 g_cancellable_cancel(canceller_); | 123 if (canceller_[i]) { |
| 124 g_cancellable_cancel(canceller_[i]); |
| 125 } |
107 } | 126 } |
108 } | 127 } |
109 | 128 |
110 void FilesystemCopierAction::Cleanup(bool success, bool was_cancelled) { | 129 void FilesystemCopierAction::Cleanup(bool success) { |
| 130 for (int i = 0; i < 2; i++) { |
| 131 g_object_unref(canceller_[i]); |
| 132 canceller_[i] = NULL; |
| 133 } |
111 g_object_unref(src_stream_); | 134 g_object_unref(src_stream_); |
112 src_stream_ = NULL; | 135 src_stream_ = NULL; |
113 g_object_unref(dst_stream_); | 136 g_object_unref(dst_stream_); |
114 dst_stream_ = NULL; | 137 dst_stream_ = NULL; |
115 if (was_cancelled) | 138 if (cancelled_) |
116 return; | 139 return; |
117 if (success && HasOutputPipe()) | 140 if (success && HasOutputPipe()) |
118 SetOutputObject(install_plan_); | 141 SetOutputObject(install_plan_); |
119 processor_->ActionComplete( | 142 processor_->ActionComplete( |
120 this, | 143 this, |
121 success ? kActionCodeSuccess : kActionCodeError); | 144 success ? kActionCodeSuccess : kActionCodeError); |
122 } | 145 } |
123 | 146 |
124 void FilesystemCopierAction::AsyncReadyCallback(GObject *source_object, | 147 void FilesystemCopierAction::AsyncReadReadyCallback(GObject *source_object, |
125 GAsyncResult *res) { | 148 GAsyncResult *res) { |
| 149 int index = buffer_state_[0] == kBufferStateReading ? 0 : 1; |
| 150 CHECK(buffer_state_[index] == kBufferStateReading); |
| 151 |
126 GError* error = NULL; | 152 GError* error = NULL; |
127 CHECK(canceller_); | 153 CHECK(canceller_[index]); |
128 bool was_cancelled = g_cancellable_is_cancelled(canceller_) == TRUE; | 154 cancelled_ = g_cancellable_is_cancelled(canceller_[index]) == TRUE; |
129 g_object_unref(canceller_); | |
130 canceller_ = NULL; | |
131 | 155 |
132 if (read_in_flight_) { | 156 ssize_t bytes_read = g_input_stream_read_finish(src_stream_, res, &error); |
133 ssize_t bytes_read = g_input_stream_read_finish(src_stream_, res, &error); | 157 if (bytes_read < 0) { |
134 if (bytes_read < 0) { | 158 LOG(ERROR) << "Read failed: " << utils::GetGErrorMessage(error); |
135 LOG(ERROR) << "Read failed:" << utils::GetGErrorMessage(error); | 159 failed_ = true; |
136 Cleanup(false, was_cancelled); | 160 buffer_state_[index] = kBufferStateEmpty; |
137 return; | 161 } else if (bytes_read == 0) { |
| 162 read_done_ = true; |
| 163 buffer_state_[index] = kBufferStateEmpty; |
| 164 } else { |
| 165 buffer_valid_size_[index] = bytes_read; |
| 166 buffer_state_[index] = kBufferStateFull; |
| 167 } |
| 168 |
| 169 if (bytes_read > 0) { |
| 170 filesystem_size_ -= bytes_read; |
| 171 } |
| 172 |
| 173 SpawnAsyncActions(); |
| 174 |
| 175 if (bytes_read > 0) { |
| 176 if (!hasher_.Update(buffer_[index].data(), bytes_read)) { |
| 177 LOG(ERROR) << "Unable to update the hash."; |
| 178 failed_ = true; |
138 } | 179 } |
| 180 } |
| 181 } |
139 | 182 |
140 if (bytes_read == 0) { | 183 void FilesystemCopierAction::StaticAsyncReadReadyCallback( |
141 // We're done! | 184 GObject *source_object, |
142 if (!hasher_.Finalize()) { | 185 GAsyncResult *res, |
143 LOG(ERROR) << "Unable to finalize the hash."; | 186 gpointer user_data) { |
144 Cleanup(false, was_cancelled); | 187 reinterpret_cast<FilesystemCopierAction*>(user_data)-> |
145 return; | 188 AsyncReadReadyCallback(source_object, res); |
146 } | 189 } |
| 190 |
| 191 void FilesystemCopierAction::AsyncWriteReadyCallback(GObject *source_object, |
| 192 GAsyncResult *res) { |
| 193 int index = buffer_state_[0] == kBufferStateWriting ? 0 : 1; |
| 194 CHECK(buffer_state_[index] == kBufferStateWriting); |
| 195 buffer_state_[index] = kBufferStateEmpty; |
| 196 |
| 197 GError* error = NULL; |
| 198 CHECK(canceller_[index]); |
| 199 cancelled_ = g_cancellable_is_cancelled(canceller_[index]) == TRUE; |
| 200 |
| 201 ssize_t bytes_written = g_output_stream_write_finish(dst_stream_, |
| 202 res, |
| 203 &error); |
| 204 if (bytes_written < static_cast<ssize_t>(buffer_valid_size_[index])) { |
| 205 if (bytes_written < 0) { |
| 206 LOG(ERROR) << "Write failed: " << utils::GetGErrorMessage(error); |
| 207 } else { |
| 208 LOG(ERROR) << "Write was short: wrote " << bytes_written |
| 209 << " but expected to write " << buffer_valid_size_[index]; |
| 210 } |
| 211 failed_ = true; |
| 212 } |
| 213 |
| 214 SpawnAsyncActions(); |
| 215 } |
| 216 |
| 217 void FilesystemCopierAction::StaticAsyncWriteReadyCallback( |
| 218 GObject *source_object, |
| 219 GAsyncResult *res, |
| 220 gpointer user_data) { |
| 221 reinterpret_cast<FilesystemCopierAction*>(user_data)-> |
| 222 AsyncWriteReadyCallback(source_object, res); |
| 223 } |
| 224 |
| 225 void FilesystemCopierAction::SpawnAsyncActions() { |
| 226 bool reading = false; |
| 227 bool writing = false; |
| 228 for (int i = 0; i < 2; i++) { |
| 229 if (buffer_state_[i] == kBufferStateReading) { |
| 230 reading = true; |
| 231 } |
| 232 if (buffer_state_[i] == kBufferStateWriting) { |
| 233 writing = true; |
| 234 } |
| 235 } |
| 236 if (failed_ || cancelled_) { |
| 237 if (!reading && !writing) { |
| 238 Cleanup(false); |
| 239 } |
| 240 return; |
| 241 } |
| 242 for (int i = 0; i < 2; i++) { |
| 243 if (!reading && !read_done_ && buffer_state_[i] == kBufferStateEmpty) { |
| 244 g_input_stream_read_async( |
| 245 src_stream_, |
| 246 buffer_[i].data(), |
| 247 GetBytesToRead(), |
| 248 G_PRIORITY_DEFAULT, |
| 249 canceller_[i], |
| 250 &FilesystemCopierAction::StaticAsyncReadReadyCallback, |
| 251 this); |
| 252 reading = true; |
| 253 buffer_state_[i] = kBufferStateReading; |
| 254 } else if (!writing && buffer_state_[i] == kBufferStateFull) { |
| 255 g_output_stream_write_async( |
| 256 dst_stream_, |
| 257 buffer_[i].data(), |
| 258 buffer_valid_size_[i], |
| 259 G_PRIORITY_DEFAULT, |
| 260 canceller_[i], |
| 261 &FilesystemCopierAction::StaticAsyncWriteReadyCallback, |
| 262 this); |
| 263 writing = true; |
| 264 buffer_state_[i] = kBufferStateWriting; |
| 265 } |
| 266 } |
| 267 if (!reading && !writing) { |
| 268 // We're done! |
| 269 if (hasher_.Finalize()) { |
147 LOG(INFO) << "hash: " << hasher_.hash(); | 270 LOG(INFO) << "hash: " << hasher_.hash(); |
148 if (copying_kernel_install_path_) { | 271 if (copying_kernel_install_path_) { |
149 install_plan_.current_kernel_hash = hasher_.raw_hash(); | 272 install_plan_.current_kernel_hash = hasher_.raw_hash(); |
150 } else { | 273 } else { |
151 install_plan_.current_rootfs_hash = hasher_.raw_hash(); | 274 install_plan_.current_rootfs_hash = hasher_.raw_hash(); |
152 } | 275 } |
153 Cleanup(true, was_cancelled); | 276 Cleanup(true); |
154 return; | 277 } else { |
| 278 LOG(ERROR) << "Unable to finalize the hash."; |
| 279 Cleanup(false); |
155 } | 280 } |
156 if (!hasher_.Update(buffer_.data(), bytes_read)) { | |
157 LOG(ERROR) << "Unable to update the hash."; | |
158 Cleanup(false, was_cancelled); | |
159 return; | |
160 } | |
161 filesystem_size_ -= bytes_read; | |
162 | |
163 // Kick off a write | |
164 read_in_flight_ = false; | |
165 buffer_valid_size_ = bytes_read; | |
166 canceller_ = g_cancellable_new(); | |
167 g_output_stream_write_async( | |
168 dst_stream_, | |
169 &buffer_[0], | |
170 bytes_read, | |
171 G_PRIORITY_DEFAULT, | |
172 canceller_, | |
173 &FilesystemCopierAction::StaticAsyncReadyCallback, | |
174 this); | |
175 return; | |
176 } | 281 } |
177 | |
178 ssize_t bytes_written = g_output_stream_write_finish(dst_stream_, | |
179 res, | |
180 &error); | |
181 if (bytes_written < static_cast<ssize_t>(buffer_valid_size_)) { | |
182 if (bytes_written < 0) { | |
183 LOG(ERROR) << "Write failed:" << utils::GetGErrorMessage(error); | |
184 } else { | |
185 LOG(ERROR) << "Write was short: wrote " << bytes_written | |
186 << " but expected to write " << buffer_valid_size_; | |
187 } | |
188 Cleanup(false, was_cancelled); | |
189 return; | |
190 } | |
191 | |
192 // Kick off a read | |
193 read_in_flight_ = true; | |
194 canceller_ = g_cancellable_new(); | |
195 g_input_stream_read_async( | |
196 src_stream_, | |
197 &buffer_[0], | |
198 GetBytesToRead(), | |
199 G_PRIORITY_DEFAULT, | |
200 canceller_, | |
201 &FilesystemCopierAction::StaticAsyncReadyCallback, | |
202 this); | |
203 } | 282 } |
204 | 283 |
205 void FilesystemCopierAction::DetermineFilesystemSize(int fd) { | 284 void FilesystemCopierAction::DetermineFilesystemSize(int fd) { |
206 filesystem_size_ = kint64max; | 285 filesystem_size_ = kint64max; |
207 int block_count = 0, block_size = 0; | 286 int block_count = 0, block_size = 0; |
208 if (!copying_kernel_install_path_ && | 287 if (!copying_kernel_install_path_ && |
209 utils::GetFilesystemSizeFromFD(fd, &block_count, &block_size)) { | 288 utils::GetFilesystemSizeFromFD(fd, &block_count, &block_size)) { |
210 filesystem_size_ = static_cast<int64_t>(block_count) * block_size; | 289 filesystem_size_ = static_cast<int64_t>(block_count) * block_size; |
211 LOG(INFO) << "Filesystem size: " << filesystem_size_ << " bytes (" | 290 LOG(INFO) << "Filesystem size: " << filesystem_size_ << " bytes (" |
212 << block_count << "x" << block_size << ")."; | 291 << block_count << "x" << block_size << ")."; |
213 } | 292 } |
214 } | 293 } |
215 | 294 |
216 int64_t FilesystemCopierAction::GetBytesToRead() { | 295 int64_t FilesystemCopierAction::GetBytesToRead() { |
217 return std::min(static_cast<int64_t>(buffer_.size()), filesystem_size_); | 296 return std::min(static_cast<int64_t>(buffer_[0].size()), filesystem_size_); |
218 } | 297 } |
219 | 298 |
220 } // namespace chromeos_update_engine | 299 } // namespace chromeos_update_engine |
OLD | NEW |