Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(51)

Side by Side Diff: full_update_generator.cc

Issue 4610001: AU: Speed up full update payload generation by running multiple threads. (Closed) Base URL: ssh://git@gitrw.chromium.org:9222/update_engine.git@master
Patch Set: doc strings Created 10 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « full_update_generator.h ('k') | full_update_generator_unittest.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright (c) 2010 The Chromium OS Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "update_engine/full_update_generator.h"
6
7 #include <inttypes.h>
8 #include <fcntl.h>
9
10 #include <tr1/memory>
11
12 #include <base/string_util.h>
13
14 #include "update_engine/bzip.h"
15 #include "update_engine/utils.h"
16
17 using std::deque;
18 using std::min;
19 using std::max;
20 using std::string;
21 using std::tr1::shared_ptr;
22 using std::vector;
23
24 namespace chromeos_update_engine {
25
26 namespace {
27
28 // This class encapsulates a full update chunk processing thread. The processor
29 // reads a chunk of data from the input file descriptor and compresses it. The
30 // processor needs to be started through Start() then waited on through Wait().
31 class ChunkProcessor {
32 public:
33 // Read a chunk of |size| bytes from |fd| starting at offset |offset|.
34 ChunkProcessor(int fd, off_t offset, size_t size)
35 : thread_(NULL),
36 fd_(fd),
37 offset_(offset),
38 buffer_in_(size) {}
39 ~ChunkProcessor() { Wait(); }
40
41 off_t offset() const { return offset_; }
42 const vector<char>& buffer_in() const { return buffer_in_; }
43 const vector<char>& buffer_compressed() const { return buffer_compressed_; }
44
45 // Starts the processor. Returns true on success, false on failure.
46 bool Start();
47
48 // Waits for the processor to complete. Returns true on success, false on
49 // failure.
50 bool Wait();
51
52 bool ShouldCompress() const {
53 return buffer_compressed_.size() < buffer_in_.size();
54 }
55
56 private:
57 // Reads the input data into |buffer_in_| and compresses it into
58 // |buffer_compressed_|. Returns true on success, false otherwise.
59 bool ReadAndCompress();
60 static gpointer ReadAndCompressThread(gpointer data);
61
62 GThread* thread_;
63 int fd_;
64 off_t offset_;
65 vector<char> buffer_in_;
66 vector<char> buffer_compressed_;
67
68 DISALLOW_COPY_AND_ASSIGN(ChunkProcessor);
69 };
70
71 bool ChunkProcessor::Start() {
72 thread_ = g_thread_create(ReadAndCompressThread, this, TRUE, NULL);
73 TEST_AND_RETURN_FALSE(thread_ != NULL);
74 return true;
75 }
76
77 bool ChunkProcessor::Wait() {
78 if (!thread_) {
79 return false;
80 }
81 gpointer result = g_thread_join(thread_);
82 thread_ = NULL;
83 TEST_AND_RETURN_FALSE(result == this);
84 return true;
85 }
86
87 gpointer ChunkProcessor::ReadAndCompressThread(gpointer data) {
88 return
89 reinterpret_cast<ChunkProcessor*>(data)->ReadAndCompress() ? data : NULL;
90 }
91
92 bool ChunkProcessor::ReadAndCompress() {
93 ssize_t bytes_read = -1;
94 TEST_AND_RETURN_FALSE(utils::PReadAll(fd_,
95 buffer_in_.data(),
96 buffer_in_.size(),
97 offset_,
98 &bytes_read));
99 TEST_AND_RETURN_FALSE(bytes_read == static_cast<ssize_t>(buffer_in_.size()));
100 TEST_AND_RETURN_FALSE(BzipCompress(buffer_in_, &buffer_compressed_));
101 return true;
102 }
103
104 } // namespace
105
106 bool FullUpdateGenerator::Run(
107 Graph* graph,
108 const std::string& new_kernel_part,
109 const std::string& new_image,
110 off_t image_size,
111 int fd,
112 off_t* data_file_size,
113 off_t chunk_size,
114 off_t block_size,
115 vector<DeltaArchiveManifest_InstallOperation>* kernel_ops,
116 std::vector<Vertex::Index>* final_order) {
117 TEST_AND_RETURN_FALSE(chunk_size > 0);
118 TEST_AND_RETURN_FALSE((chunk_size % block_size) == 0);
119
120 size_t max_threads = max(sysconf(_SC_NPROCESSORS_ONLN), 4L);
121 LOG(INFO) << "Max threads: " << max_threads;
122
123 // Get the sizes early in the function, so we can fail fast if the user
124 // passed us bad paths.
125 TEST_AND_RETURN_FALSE(image_size >= 0 &&
126 image_size <= utils::FileSize(new_image));
127 const off_t kernel_size = utils::FileSize(new_kernel_part);
128 TEST_AND_RETURN_FALSE(kernel_size >= 0);
129
130 off_t part_sizes[] = { image_size, kernel_size };
131 string paths[] = { new_image, new_kernel_part };
132
133 for (int partition = 0; partition < 2; ++partition) {
134 const string& path = paths[partition];
135 LOG(INFO) << "compressing " << path;
136
137 int in_fd = open(path.c_str(), O_RDONLY, 0);
138 TEST_AND_RETURN_FALSE(in_fd >= 0);
139 ScopedFdCloser in_fd_closer(&in_fd);
140
141 deque<shared_ptr<ChunkProcessor> > threads;
142
143 off_t bytes_left = part_sizes[partition], counter = 0, offset = 0;
144 while (bytes_left > 0 || !threads.empty()) {
145 // Check and start new chunk processors if possible.
146 while (threads.size() < max_threads && bytes_left > 0) {
147 shared_ptr<ChunkProcessor> processor(
148 new ChunkProcessor(in_fd, offset, min(bytes_left, chunk_size)));
149 threads.push_back(processor);
150 TEST_AND_RETURN_FALSE(processor->Start());
151 bytes_left -= chunk_size;
152 offset += chunk_size;
153 }
154
155 // Need to wait for a chunk processor to complete and process its ouput
156 // before spawning new processors.
157 shared_ptr<ChunkProcessor> processor = threads.front();
158 threads.pop_front();
adlr 2010/11/08 21:27:08 might be faster to wait on all of the threads, but
petkov 2010/11/08 21:45:21 Maybe... We could also make the separate threads w
159 TEST_AND_RETURN_FALSE(processor->Wait());
160
161 DeltaArchiveManifest_InstallOperation* op = NULL;
162 if (partition == 0) {
163 graph->resize(graph->size() + 1);
164 graph->back().file_name =
165 StringPrintf("<rootfs-operation-%" PRIi64 ">", counter++);
166 op = &graph->back().op;
167 final_order->push_back(graph->size() - 1);
168 } else {
169 kernel_ops->resize(kernel_ops->size() + 1);
170 op = &kernel_ops->back();
171 }
172
173 const bool compress = processor->ShouldCompress();
174 const vector<char>& use_buf =
175 compress ? processor->buffer_compressed() : processor->buffer_in();
176 op->set_type(compress ?
177 DeltaArchiveManifest_InstallOperation_Type_REPLACE_BZ :
178 DeltaArchiveManifest_InstallOperation_Type_REPLACE);
179 op->set_data_offset(*data_file_size);
180 TEST_AND_RETURN_FALSE(utils::WriteAll(fd, &use_buf[0], use_buf.size()));
181 *data_file_size += use_buf.size();
182 op->set_data_length(use_buf.size());
183 Extent* dst_extent = op->add_dst_extents();
184 dst_extent->set_start_block(processor->offset() / block_size);
185 dst_extent->set_num_blocks(chunk_size / block_size);
186
187 LOG(INFO)
188 << StringPrintf("%.1f",
189 processor->offset() * 100.0 / part_sizes[partition])
190 << "% complete (output size: " << *data_file_size << ")";
191 }
192 }
193
194 return true;
195 }
196
197 } // namespace chromeos_update_engine
OLDNEW
« no previous file with comments | « full_update_generator.h ('k') | full_update_generator_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698