OLD | NEW |
1 // Copyright 2015 The LUCI Authors. | 1 // Copyright 2015 The LUCI Authors. |
2 // | 2 // |
3 // Licensed under the Apache License, Version 2.0 (the "License"); | 3 // Licensed under the Apache License, Version 2.0 (the "License"); |
4 // you may not use this file except in compliance with the License. | 4 // you may not use this file except in compliance with the License. |
5 // You may obtain a copy of the License at | 5 // You may obtain a copy of the License at |
6 // | 6 // |
7 // http://www.apache.org/licenses/LICENSE-2.0 | 7 // http://www.apache.org/licenses/LICENSE-2.0 |
8 // | 8 // |
9 // Unless required by applicable law or agreed to in writing, software | 9 // Unless required by applicable law or agreed to in writing, software |
10 // distributed under the License is distributed on an "AS IS" BASIS, | 10 // distributed under the License is distributed on an "AS IS" BASIS, |
(...skipping 151 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
162 walker := partitioningWalker{fsView: fsView} | 162 walker := partitioningWalker{fsView: fsView} |
163 for _, dep := range deps { | 163 for _, dep := range deps { |
164 // Try to walk dep. If dep is a file (or symlink), the inner fun
ction is called exactly once. | 164 // Try to walk dep. If dep is a file (or symlink), the inner fun
ction is called exactly once. |
165 if err := filepath.Walk(filepath.Clean(dep), walker.walkFn); err
!= nil { | 165 if err := filepath.Walk(filepath.Clean(dep), walker.walkFn); err
!= nil { |
166 return partitionedDeps{}, err | 166 return partitionedDeps{}, err |
167 } | 167 } |
168 } | 168 } |
169 return walker.parts, nil | 169 return walker.parts, nil |
170 } | 170 } |
171 | 171 |
172 type uploadTracker struct { | |
173 checker *Checker | |
174 uploader *Uploader | |
175 files map[string]isolated.File | |
176 } | |
177 | |
178 func newUploadTracker(checker *Checker, uploader *Uploader) *uploadTracker { | |
179 return &uploadTracker{ | |
180 checker: checker, | |
181 uploader: uploader, | |
182 files: make(map[string]isolated.File), | |
183 } | |
184 } | |
185 | |
186 func (ut *uploadTracker) Files() map[string]isolated.File { | |
187 return ut.files | |
188 } | |
189 | |
190 // populateSymlinks adds an isolated.File to files for each provided symlink | |
191 func (ut *uploadTracker) populateSymlinks(symlinks []*Item) error { | |
192 for _, item := range symlinks { | |
193 l, err := os.Readlink(item.Path) | |
194 if err != nil { | |
195 return fmt.Errorf("unable to resolve symlink for %q: %v"
, item.Path, err) | |
196 } | |
197 ut.files[item.RelPath] = isolated.SymLink(l) | |
198 } | |
199 return nil | |
200 } | |
201 | |
202 // tarAndUploadFiles creates bundles of files, uploads them, and adds each bundl
e to files. | |
203 func (ut *uploadTracker) tarAndUploadFiles(smallFiles []*Item) error { | |
204 bundles := ShardItems(smallFiles, archiveMaxSize) | |
205 log.Printf("\t%d TAR archives to be isolated", len(bundles)) | |
206 | |
207 for _, bundle := range bundles { | |
208 bundle := bundle | |
209 digest, tarSize, err := bundle.Digest() | |
210 if err != nil { | |
211 return err | |
212 } | |
213 | |
214 log.Printf("Created tar archive %q (%s)", digest, humanize.Bytes
(uint64(tarSize))) | |
215 log.Printf("\tcontains %d files (total %s)", len(bundle.Items),
humanize.Bytes(uint64(bundle.ItemSize))) | |
216 // Mint an item for this tar. | |
217 item := &Item{ | |
218 Path: fmt.Sprintf(".%s.tar", digest), | |
219 RelPath: fmt.Sprintf(".%s.tar", digest), | |
220 Size: tarSize, | |
221 Mode: 0644, // Read | |
222 Digest: digest, | |
223 } | |
224 ut.files[item.RelPath] = isolated.TarFile(item.Digest, int(item.
Mode), item.Size) | |
225 | |
226 ut.checker.AddItem(item, false, func(item *Item, ps *isolatedcli
ent.PushState) { | |
227 if ps == nil { | |
228 return | |
229 } | |
230 log.Printf("QUEUED %q for upload", item.RelPath) | |
231 ut.uploader.Upload(item.RelPath, bundle.Contents, ps, fu
nc() { | |
232 log.Printf("UPLOADED %q", item.RelPath) | |
233 }) | |
234 }) | |
235 } | |
236 return nil | |
237 } | |
238 | |
239 // uploadFiles uploads each file and adds it to files. | |
240 func (ut *uploadTracker) uploadFiles(files []*Item) error { | |
241 // Handle the large individually-uploaded files. | |
242 for _, item := range files { | |
243 d, err := hashFile(item.Path) | |
244 if err != nil { | |
245 return err | |
246 } | |
247 item.Digest = d | |
248 ut.files[item.RelPath] = isolated.BasicFile(item.Digest, int(ite
m.Mode), item.Size) | |
249 ut.checker.AddItem(item, false, func(item *Item, ps *isolatedcli
ent.PushState) { | |
250 if ps == nil { | |
251 return | |
252 } | |
253 log.Printf("QUEUED %q for upload", item.RelPath) | |
254 ut.uploader.UploadFile(item, ps, func() { | |
255 log.Printf("UPLOADED %q", item.RelPath) | |
256 }) | |
257 }) | |
258 } | |
259 return nil | |
260 } | |
261 | |
262 func (ut *uploadTracker) UploadDeps(parts partitionedDeps) error { | |
263 if err := ut.populateSymlinks(parts.links.items); err != nil { | |
264 return err | |
265 } | |
266 | |
267 if err := ut.tarAndUploadFiles(parts.filesToArchive.items); err != nil { | |
268 return err | |
269 } | |
270 | |
271 if err := ut.uploadFiles(parts.indivFiles.items); err != nil { | |
272 return err | |
273 } | |
274 return nil | |
275 } | |
276 | |
277 // main contains the core logic for experimental archive. | 172 // main contains the core logic for experimental archive. |
278 func (c *expArchiveRun) main() error { | 173 func (c *expArchiveRun) main() error { |
279 // TODO(djd): This func is long and has a lot of internal complexity (li
ke, | 174 // TODO(djd): This func is long and has a lot of internal complexity (li
ke, |
280 // such as, archiveCallback). Refactor. | 175 // such as, archiveCallback). Refactor. |
281 | 176 |
282 start := time.Now() | 177 start := time.Now() |
283 archiveOpts := &c.isolateFlags.ArchiveOptions | 178 archiveOpts := &c.isolateFlags.ArchiveOptions |
284 // Parse the incoming isolate file. | 179 // Parse the incoming isolate file. |
285 deps, rootDir, isol, err := isolate.ProcessIsolate(archiveOpts) | 180 deps, rootDir, isol, err := isolate.ProcessIsolate(archiveOpts) |
286 if err != nil { | 181 if err != nil { |
(...skipping 23 matching lines...) Expand all Loading... |
310 if err != nil { | 205 if err != nil { |
311 return fmt.Errorf("partitioning deps: %v", err) | 206 return fmt.Errorf("partitioning deps: %v", err) |
312 } | 207 } |
313 | 208 |
314 numFiles := len(parts.filesToArchive.items) + len(parts.indivFiles.items
) | 209 numFiles := len(parts.filesToArchive.items) + len(parts.indivFiles.items
) |
315 filesSize := uint64(parts.filesToArchive.totalSize + parts.indivFiles.to
talSize) | 210 filesSize := uint64(parts.filesToArchive.totalSize + parts.indivFiles.to
talSize) |
316 log.Printf("Isolate expanded to %d files (total size %s) and %d symlinks
", numFiles, humanize.Bytes(filesSize), len(parts.links.items)) | 211 log.Printf("Isolate expanded to %d files (total size %s) and %d symlinks
", numFiles, humanize.Bytes(filesSize), len(parts.links.items)) |
317 log.Printf("\t%d files (%s) to be isolated individually", len(parts.indi
vFiles.items), humanize.Bytes(uint64(parts.indivFiles.totalSize))) | 212 log.Printf("\t%d files (%s) to be isolated individually", len(parts.indi
vFiles.items), humanize.Bytes(uint64(parts.indivFiles.totalSize))) |
318 log.Printf("\t%d files (%s) to be isolated in archives", len(parts.files
ToArchive.items), humanize.Bytes(uint64(parts.filesToArchive.totalSize))) | 213 log.Printf("\t%d files (%s) to be isolated in archives", len(parts.files
ToArchive.items), humanize.Bytes(uint64(parts.filesToArchive.totalSize))) |
319 | 214 |
320 » tracker := newUploadTracker(checker, uploader) | 215 » tracker := NewUploadTracker(checker, uploader) |
321 if err := tracker.UploadDeps(parts); err != nil { | 216 if err := tracker.UploadDeps(parts); err != nil { |
322 return err | 217 return err |
323 } | 218 } |
324 | 219 |
325 isol.Files = tracker.Files() | 220 isol.Files = tracker.Files() |
326 | 221 |
327 // Marshal the isolated file into JSON, and create an Item to describe i
t. | 222 // Marshal the isolated file into JSON, and create an Item to describe i
t. |
328 var isolJSON []byte | 223 var isolJSON []byte |
329 isolJSON, err = json.Marshal(isol) | 224 isolJSON, err = json.Marshal(isol) |
330 if err != nil { | 225 if err != nil { |
(...skipping 101 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
432 } | 327 } |
433 | 328 |
434 func hashFile(path string) (isolated.HexDigest, error) { | 329 func hashFile(path string) (isolated.HexDigest, error) { |
435 f, err := os.Open(path) | 330 f, err := os.Open(path) |
436 if err != nil { | 331 if err != nil { |
437 return "", err | 332 return "", err |
438 } | 333 } |
439 defer f.Close() | 334 defer f.Close() |
440 return isolated.Hash(f) | 335 return isolated.Hash(f) |
441 } | 336 } |
OLD | NEW |