Chromium Code Reviews| OLD | NEW |
|---|---|
| (Empty) | |
| 1 // Copyright (c) 2010 The Chromium OS Authors. All rights reserved. | |
| 2 // Use of this source code is governed by a BSD-style license that can be | |
| 3 // found in the LICENSE file. | |
| 4 | |
| 5 #include "update_engine/full_update_generator.h" | |
| 6 | |
| 7 #include <inttypes.h> | |
| 8 #include <fcntl.h> | |
| 9 | |
| 10 #include <tr1/memory> | |
| 11 | |
| 12 #include <base/string_util.h> | |
| 13 | |
| 14 #include "update_engine/bzip.h" | |
| 15 #include "update_engine/utils.h" | |
| 16 | |
| 17 using std::deque; | |
| 18 using std::min; | |
| 19 using std::max; | |
| 20 using std::string; | |
| 21 using std::tr1::shared_ptr; | |
| 22 using std::vector; | |
| 23 | |
| 24 namespace chromeos_update_engine { | |
| 25 | |
| 26 namespace { | |
| 27 | |
| 28 // This class encapsulates a full update chunk processing thread. The processor | |
| 29 // reads a chunk of data from the input file descriptor and compresses it. The | |
| 30 // processor needs to be started through Start() then waited on through Wait(). | |
| 31 class ChunkProcessor { | |
| 32 public: | |
| 33 // Read a chunk of |size| bytes from |fd| starting at offset |offset|. | |
| 34 ChunkProcessor(int fd, off_t offset, size_t size) | |
| 35 : thread_(NULL), | |
| 36 fd_(fd), | |
| 37 offset_(offset), | |
| 38 buffer_in_(size) {} | |
| 39 ~ChunkProcessor() { Wait(); } | |
| 40 | |
| 41 off_t offset() const { return offset_; } | |
| 42 const vector<char>& buffer_in() const { return buffer_in_; } | |
| 43 const vector<char>& buffer_compressed() const { return buffer_compressed_; } | |
| 44 | |
| 45 // Starts the processor. Returns true on success, false on failure. | |
| 46 bool Start(); | |
| 47 | |
| 48 // Waits for the processor to complete. Returns true on success, false on | |
| 49 // failure. | |
| 50 bool Wait(); | |
| 51 | |
| 52 bool ShouldCompress() const { | |
| 53 return buffer_compressed_.size() < buffer_in_.size(); | |
| 54 } | |
| 55 | |
| 56 private: | |
| 57 // Reads the input data into |buffer_in_| and compresses it into | |
| 58 // |buffer_compressed_|. Returns true on success, false otherwise. | |
| 59 bool ReadAndCompress(); | |
| 60 static gpointer ReadAndCompressThread(gpointer data); | |
| 61 | |
| 62 GThread* thread_; | |
| 63 int fd_; | |
| 64 off_t offset_; | |
| 65 vector<char> buffer_in_; | |
| 66 vector<char> buffer_compressed_; | |
| 67 | |
| 68 DISALLOW_COPY_AND_ASSIGN(ChunkProcessor); | |
| 69 }; | |
| 70 | |
| 71 bool ChunkProcessor::Start() { | |
| 72 thread_ = g_thread_create(ReadAndCompressThread, this, TRUE, NULL); | |
| 73 TEST_AND_RETURN_FALSE(thread_ != NULL); | |
| 74 return true; | |
| 75 } | |
| 76 | |
| 77 bool ChunkProcessor::Wait() { | |
| 78 if (!thread_) { | |
| 79 return false; | |
| 80 } | |
| 81 gpointer result = g_thread_join(thread_); | |
| 82 thread_ = NULL; | |
| 83 TEST_AND_RETURN_FALSE(result == this); | |
| 84 return true; | |
| 85 } | |
| 86 | |
| 87 gpointer ChunkProcessor::ReadAndCompressThread(gpointer data) { | |
| 88 return | |
| 89 reinterpret_cast<ChunkProcessor*>(data)->ReadAndCompress() ? data : NULL; | |
| 90 } | |
| 91 | |
| 92 bool ChunkProcessor::ReadAndCompress() { | |
| 93 ssize_t bytes_read = -1; | |
| 94 TEST_AND_RETURN_FALSE(utils::PReadAll(fd_, | |
| 95 buffer_in_.data(), | |
| 96 buffer_in_.size(), | |
| 97 offset_, | |
| 98 &bytes_read)); | |
| 99 TEST_AND_RETURN_FALSE(bytes_read == static_cast<ssize_t>(buffer_in_.size())); | |
| 100 TEST_AND_RETURN_FALSE(BzipCompress(buffer_in_, &buffer_compressed_)); | |
| 101 return true; | |
| 102 } | |
| 103 | |
| 104 } // namespace | |
| 105 | |
| 106 bool FullUpdateGenerator::Run( | |
| 107 Graph* graph, | |
| 108 const std::string& new_kernel_part, | |
| 109 const std::string& new_image, | |
| 110 off_t image_size, | |
| 111 int fd, | |
| 112 off_t* data_file_size, | |
| 113 off_t chunk_size, | |
| 114 off_t block_size, | |
| 115 vector<DeltaArchiveManifest_InstallOperation>* kernel_ops, | |
| 116 std::vector<Vertex::Index>* final_order) { | |
| 117 TEST_AND_RETURN_FALSE(chunk_size > 0); | |
| 118 TEST_AND_RETURN_FALSE((chunk_size % block_size) == 0); | |
| 119 | |
| 120 size_t max_threads = max(sysconf(_SC_NPROCESSORS_ONLN), 4L); | |
| 121 LOG(INFO) << "Max threads: " << max_threads; | |
| 122 | |
| 123 // Get the sizes early in the function, so we can fail fast if the user | |
| 124 // passed us bad paths. | |
| 125 TEST_AND_RETURN_FALSE(image_size >= 0 && | |
| 126 image_size <= utils::FileSize(new_image)); | |
| 127 const off_t kernel_size = utils::FileSize(new_kernel_part); | |
| 128 TEST_AND_RETURN_FALSE(kernel_size >= 0); | |
| 129 | |
| 130 off_t part_sizes[] = { image_size, kernel_size }; | |
| 131 string paths[] = { new_image, new_kernel_part }; | |
| 132 | |
| 133 for (int partition = 0; partition < 2; ++partition) { | |
| 134 const string& path = paths[partition]; | |
| 135 LOG(INFO) << "compressing " << path; | |
| 136 | |
| 137 int in_fd = open(path.c_str(), O_RDONLY, 0); | |
| 138 TEST_AND_RETURN_FALSE(in_fd >= 0); | |
| 139 ScopedFdCloser in_fd_closer(&in_fd); | |
| 140 | |
| 141 deque<shared_ptr<ChunkProcessor> > threads; | |
| 142 | |
| 143 off_t bytes_left = part_sizes[partition], counter = 0, offset = 0; | |
| 144 while (bytes_left > 0 || !threads.empty()) { | |
| 145 // Check and start new chunk processors if possible. | |
| 146 while (threads.size() < max_threads && bytes_left > 0) { | |
| 147 shared_ptr<ChunkProcessor> processor( | |
| 148 new ChunkProcessor(in_fd, offset, min(bytes_left, chunk_size))); | |
| 149 threads.push_back(processor); | |
| 150 TEST_AND_RETURN_FALSE(processor->Start()); | |
| 151 bytes_left -= chunk_size; | |
| 152 offset += chunk_size; | |
| 153 } | |
| 154 | |
| 155 // Need to wait for a chunk processor to complete and process its ouput | |
| 156 // before spawning new processors. | |
| 157 shared_ptr<ChunkProcessor> processor = threads.front(); | |
| 158 threads.pop_front(); | |
|
adlr
2010/11/08 21:27:08
might be faster to wait on all of the threads, but
petkov
2010/11/08 21:45:21
Maybe... We could also make the separate threads w
| |
| 159 TEST_AND_RETURN_FALSE(processor->Wait()); | |
| 160 | |
| 161 DeltaArchiveManifest_InstallOperation* op = NULL; | |
| 162 if (partition == 0) { | |
| 163 graph->resize(graph->size() + 1); | |
| 164 graph->back().file_name = | |
| 165 StringPrintf("<rootfs-operation-%" PRIi64 ">", counter++); | |
| 166 op = &graph->back().op; | |
| 167 final_order->push_back(graph->size() - 1); | |
| 168 } else { | |
| 169 kernel_ops->resize(kernel_ops->size() + 1); | |
| 170 op = &kernel_ops->back(); | |
| 171 } | |
| 172 | |
| 173 const bool compress = processor->ShouldCompress(); | |
| 174 const vector<char>& use_buf = | |
| 175 compress ? processor->buffer_compressed() : processor->buffer_in(); | |
| 176 op->set_type(compress ? | |
| 177 DeltaArchiveManifest_InstallOperation_Type_REPLACE_BZ : | |
| 178 DeltaArchiveManifest_InstallOperation_Type_REPLACE); | |
| 179 op->set_data_offset(*data_file_size); | |
| 180 TEST_AND_RETURN_FALSE(utils::WriteAll(fd, &use_buf[0], use_buf.size())); | |
| 181 *data_file_size += use_buf.size(); | |
| 182 op->set_data_length(use_buf.size()); | |
| 183 Extent* dst_extent = op->add_dst_extents(); | |
| 184 dst_extent->set_start_block(processor->offset() / block_size); | |
| 185 dst_extent->set_num_blocks(chunk_size / block_size); | |
| 186 | |
| 187 LOG(INFO) | |
| 188 << StringPrintf("%.1f", | |
| 189 processor->offset() * 100.0 / part_sizes[partition]) | |
| 190 << "% complete (output size: " << *data_file_size << ")"; | |
| 191 } | |
| 192 } | |
| 193 | |
| 194 return true; | |
| 195 } | |
| 196 | |
| 197 } // namespace chromeos_update_engine | |
| OLD | NEW |