OLD | NEW |
---|---|
(Empty) | |
1 // Copyright (c) 2010 The Chromium OS Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 #include "update_engine/full_update_generator.h" | |
6 | |
7 #include <inttypes.h> | |
8 #include <fcntl.h> | |
9 | |
10 #include <tr1/memory> | |
11 | |
12 #include <base/string_util.h> | |
13 | |
14 #include "update_engine/bzip.h" | |
15 #include "update_engine/utils.h" | |
16 | |
17 using std::deque; | |
18 using std::min; | |
19 using std::max; | |
20 using std::string; | |
21 using std::tr1::shared_ptr; | |
22 using std::vector; | |
23 | |
24 namespace chromeos_update_engine { | |
25 | |
26 namespace { | |
27 | |
28 // This class encapsulates a full update chunk processing thread. The processor | |
29 // reads a chunk of data from the input file descriptor and compresses it. The | |
30 // processor needs to be started through Start() then waited on through Wait(). | |
31 class ChunkProcessor { | |
32 public: | |
33 // Read a chunk of |size| bytes from |fd| starting at offset |offset|. | |
34 ChunkProcessor(int fd, off_t offset, size_t size) | |
35 : thread_(NULL), | |
36 fd_(fd), | |
37 offset_(offset), | |
38 buffer_in_(size) {} | |
39 ~ChunkProcessor() { Wait(); } | |
40 | |
41 off_t offset() const { return offset_; } | |
42 const vector<char>& buffer_in() const { return buffer_in_; } | |
43 const vector<char>& buffer_compressed() const { return buffer_compressed_; } | |
44 | |
45 // Starts the processor. Returns true on success, false on failure. | |
46 bool Start(); | |
47 | |
48 // Waits for the processor to complete. Returns true on success, false on | |
49 // failure. | |
50 bool Wait(); | |
51 | |
52 bool ShouldCompress() const { | |
53 return buffer_compressed_.size() < buffer_in_.size(); | |
54 } | |
55 | |
56 private: | |
57 // Reads the input data into |buffer_in_| and compresses it into | |
58 // |buffer_compressed_|. Returns true on success, false otherwise. | |
59 bool ReadAndCompress(); | |
60 static gpointer ReadAndCompressThread(gpointer data); | |
61 | |
62 GThread* thread_; | |
63 int fd_; | |
64 off_t offset_; | |
65 vector<char> buffer_in_; | |
66 vector<char> buffer_compressed_; | |
67 | |
68 DISALLOW_COPY_AND_ASSIGN(ChunkProcessor); | |
69 }; | |
70 | |
71 bool ChunkProcessor::Start() { | |
72 thread_ = g_thread_create(ReadAndCompressThread, this, TRUE, NULL); | |
73 TEST_AND_RETURN_FALSE(thread_ != NULL); | |
74 return true; | |
75 } | |
76 | |
77 bool ChunkProcessor::Wait() { | |
78 if (!thread_) { | |
79 return false; | |
80 } | |
81 gpointer result = g_thread_join(thread_); | |
82 thread_ = NULL; | |
83 TEST_AND_RETURN_FALSE(result == this); | |
84 return true; | |
85 } | |
86 | |
87 gpointer ChunkProcessor::ReadAndCompressThread(gpointer data) { | |
88 return | |
89 reinterpret_cast<ChunkProcessor*>(data)->ReadAndCompress() ? data : NULL; | |
90 } | |
91 | |
92 bool ChunkProcessor::ReadAndCompress() { | |
93 ssize_t bytes_read = -1; | |
94 TEST_AND_RETURN_FALSE(utils::PReadAll(fd_, | |
95 buffer_in_.data(), | |
96 buffer_in_.size(), | |
97 offset_, | |
98 &bytes_read)); | |
99 TEST_AND_RETURN_FALSE(bytes_read == static_cast<ssize_t>(buffer_in_.size())); | |
100 TEST_AND_RETURN_FALSE(BzipCompress(buffer_in_, &buffer_compressed_)); | |
101 return true; | |
102 } | |
103 | |
104 } // namespace | |
105 | |
106 bool FullUpdateGenerator::Run( | |
107 Graph* graph, | |
108 const std::string& new_kernel_part, | |
109 const std::string& new_image, | |
110 off_t image_size, | |
111 int fd, | |
112 off_t* data_file_size, | |
113 off_t chunk_size, | |
114 off_t block_size, | |
115 vector<DeltaArchiveManifest_InstallOperation>* kernel_ops, | |
116 std::vector<Vertex::Index>* final_order) { | |
117 TEST_AND_RETURN_FALSE(chunk_size > 0); | |
118 TEST_AND_RETURN_FALSE((chunk_size % block_size) == 0); | |
119 | |
120 size_t max_threads = max(sysconf(_SC_NPROCESSORS_ONLN), 4L); | |
121 LOG(INFO) << "Max threads: " << max_threads; | |
122 | |
123 // Get the sizes early in the function, so we can fail fast if the user | |
124 // passed us bad paths. | |
125 TEST_AND_RETURN_FALSE(image_size >= 0 && | |
126 image_size <= utils::FileSize(new_image)); | |
127 const off_t kernel_size = utils::FileSize(new_kernel_part); | |
128 TEST_AND_RETURN_FALSE(kernel_size >= 0); | |
129 | |
130 off_t part_sizes[] = { image_size, kernel_size }; | |
131 string paths[] = { new_image, new_kernel_part }; | |
132 | |
133 for (int partition = 0; partition < 2; ++partition) { | |
134 const string& path = paths[partition]; | |
135 LOG(INFO) << "compressing " << path; | |
136 | |
137 int in_fd = open(path.c_str(), O_RDONLY, 0); | |
138 TEST_AND_RETURN_FALSE(in_fd >= 0); | |
139 ScopedFdCloser in_fd_closer(&in_fd); | |
140 | |
141 deque<shared_ptr<ChunkProcessor> > threads; | |
142 | |
143 off_t bytes_left = part_sizes[partition], counter = 0, offset = 0; | |
144 while (bytes_left > 0 || !threads.empty()) { | |
145 // Check and start new chunk processors if possible. | |
146 while (threads.size() < max_threads && bytes_left > 0) { | |
147 shared_ptr<ChunkProcessor> processor( | |
148 new ChunkProcessor(in_fd, offset, min(bytes_left, chunk_size))); | |
149 threads.push_back(processor); | |
150 TEST_AND_RETURN_FALSE(processor->Start()); | |
151 bytes_left -= chunk_size; | |
152 offset += chunk_size; | |
153 } | |
154 | |
155 // Need to wait for a chunk processor to complete and process its output | |
156 // before spawning new processors. | |
157 shared_ptr<ChunkProcessor> processor = threads.front(); | |
158 threads.pop_front(); | |
adlr (2010/11/08 21:27:08): might be faster to wait on all of the threads, but
petkov (2010/11/08 21:45:21): Maybe... We could also make the separate threads w
| |
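As a rough sketch of the "wait on all of the threads" idea raised in the comments above (an illustration only, not code from this CL), the loop body could start a window of processors, join every one of them, and only then emit their operations; the locals used here (in_fd, bytes_left, offset, chunk_size, max_threads) are the ones already defined in Run():

// Illustration of the batched alternative discussed in the review; not part of this CL.
deque<shared_ptr<ChunkProcessor> > batch;
while (batch.size() < max_threads && bytes_left > 0) {
  shared_ptr<ChunkProcessor> p(
      new ChunkProcessor(in_fd, offset, min(bytes_left, chunk_size)));
  TEST_AND_RETURN_FALSE(p->Start());
  batch.push_back(p);
  bytes_left -= chunk_size;
  offset += chunk_size;
}
// Join every outstanding worker before consuming any output. The deque keeps
// the chunks in offset order, so the emitted operations stay deterministic.
for (size_t i = 0; i < batch.size(); ++i) {
  TEST_AND_RETURN_FALSE(batch[i]->Wait());
}
// ...then emit a REPLACE/REPLACE_BZ operation for each processor in |batch|...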
159 TEST_AND_RETURN_FALSE(processor->Wait()); | |
160 | |
161 DeltaArchiveManifest_InstallOperation* op = NULL; | |
162 if (partition == 0) { | |
163 graph->resize(graph->size() + 1); | |
164 graph->back().file_name = | |
165 StringPrintf("<rootfs-operation-%" PRIi64 ">", counter++); | |
166 op = &graph->back().op; | |
167 final_order->push_back(graph->size() - 1); | |
168 } else { | |
169 kernel_ops->resize(kernel_ops->size() + 1); | |
170 op = &kernel_ops->back(); | |
171 } | |
172 | |
173 const bool compress = processor->ShouldCompress(); | |
174 const vector<char>& use_buf = | |
175 compress ? processor->buffer_compressed() : processor->buffer_in(); | |
176 op->set_type(compress ? | |
177 DeltaArchiveManifest_InstallOperation_Type_REPLACE_BZ : | |
178 DeltaArchiveManifest_InstallOperation_Type_REPLACE); | |
179 op->set_data_offset(*data_file_size); | |
180 TEST_AND_RETURN_FALSE(utils::WriteAll(fd, &use_buf[0], use_buf.size())); | |
181 *data_file_size += use_buf.size(); | |
182 op->set_data_length(use_buf.size()); | |
183 Extent* dst_extent = op->add_dst_extents(); | |
184 dst_extent->set_start_block(processor->offset() / block_size); | |
185 dst_extent->set_num_blocks(chunk_size / block_size); | |
186 | |
187 LOG(INFO) | |
188 << StringPrintf("%.1f", | |
189 processor->offset() * 100.0 / part_sizes[partition]) | |
190 << "% complete (output size: " << *data_file_size << ")"; | |
191 } | |
192 } | |
193 | |
194 return true; | |
195 } | |
196 | |
197 } // namespace chromeos_update_engine | |
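For context, a hedged sketch of how a caller might drive FullUpdateGenerator::Run(). The helper name GenerateFullPayload, the already-open blobs_fd descriptor, the chunk/block size constants, and the assumption that full_update_generator.h pulls in Graph, Vertex, and the manifest protobuf types are illustrative, not part of this CL; only the Run() signature itself comes from the code above.

#include <vector>

#include "update_engine/full_update_generator.h"  // assumed to provide Graph,
                                                   // Vertex, and the manifest types
#include "update_engine/utils.h"

namespace chromeos_update_engine {

// Hypothetical caller, for illustration only.
bool GenerateFullPayload(const std::string& new_image,
                         const std::string& new_kernel_part,
                         int blobs_fd) {
  const off_t kBlockSize = 4096;
  const off_t kChunkSize = 256 * kBlockSize;  // must be a multiple of kBlockSize

  Graph graph;
  std::vector<DeltaArchiveManifest_InstallOperation> kernel_ops;
  std::vector<Vertex::Index> final_order;
  off_t data_file_size = 0;  // running size of the blobs written to blobs_fd

  return FullUpdateGenerator::Run(&graph,
                                  new_kernel_part,
                                  new_image,
                                  utils::FileSize(new_image),
                                  blobs_fd,
                                  &data_file_size,
                                  kChunkSize,
                                  kBlockSize,
                                  &kernel_ops,
                                  &final_order);
}

}  // namespace chromeos_update_engine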