| OLD | NEW |
| 1 // Copyright 2015 The LUCI Authors. | 1 // Copyright 2015 The LUCI Authors. |
| 2 // | 2 // |
| 3 // Licensed under the Apache License, Version 2.0 (the "License"); | 3 // Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 // you may not use this file except in compliance with the License. | 4 // you may not use this file except in compliance with the License. |
| 5 // You may obtain a copy of the License at | 5 // You may obtain a copy of the License at |
| 6 // | 6 // |
| 7 // http://www.apache.org/licenses/LICENSE-2.0 | 7 // http://www.apache.org/licenses/LICENSE-2.0 |
| 8 // | 8 // |
| 9 // Unless required by applicable law or agreed to in writing, software | 9 // Unless required by applicable law or agreed to in writing, software |
| 10 // distributed under the License is distributed on an "AS IS" BASIS, | 10 // distributed under the License is distributed on an "AS IS" BASIS, |
| (...skipping 151 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 162 walker := partitioningWalker{fsView: fsView} | 162 walker := partitioningWalker{fsView: fsView} |
| 163 for _, dep := range deps { | 163 for _, dep := range deps { |
| 164 // Try to walk dep. If dep is a file (or symlink), the inner fun
ction is called exactly once. | 164 // Try to walk dep. If dep is a file (or symlink), the inner fun
ction is called exactly once. |
| 165 if err := filepath.Walk(filepath.Clean(dep), walker.walkFn); err
!= nil { | 165 if err := filepath.Walk(filepath.Clean(dep), walker.walkFn); err
!= nil { |
| 166 return partitionedDeps{}, err | 166 return partitionedDeps{}, err |
| 167 } | 167 } |
| 168 } | 168 } |
| 169 return walker.parts, nil | 169 return walker.parts, nil |
| 170 } | 170 } |
| 171 | 171 |
| 172 type uploadTracker struct { | |
| 173 checker *Checker | |
| 174 uploader *Uploader | |
| 175 files map[string]isolated.File | |
| 176 } | |
| 177 | |
| 178 func newUploadTracker(checker *Checker, uploader *Uploader) *uploadTracker { | |
| 179 return &uploadTracker{ | |
| 180 checker: checker, | |
| 181 uploader: uploader, | |
| 182 files: make(map[string]isolated.File), | |
| 183 } | |
| 184 } | |
| 185 | |
| 186 func (ut *uploadTracker) Files() map[string]isolated.File { | |
| 187 return ut.files | |
| 188 } | |
| 189 | |
| 190 // populateSymlinks adds an isolated.File to files for each provided symlink | |
| 191 func (ut *uploadTracker) populateSymlinks(symlinks []*Item) error { | |
| 192 for _, item := range symlinks { | |
| 193 l, err := os.Readlink(item.Path) | |
| 194 if err != nil { | |
| 195 return fmt.Errorf("unable to resolve symlink for %q: %v"
, item.Path, err) | |
| 196 } | |
| 197 ut.files[item.RelPath] = isolated.SymLink(l) | |
| 198 } | |
| 199 return nil | |
| 200 } | |
| 201 | |
| 202 // tarAndUploadFiles creates bundles of files, uploads them, and adds each bundl
e to files. | |
| 203 func (ut *uploadTracker) tarAndUploadFiles(smallFiles []*Item) error { | |
| 204 bundles := ShardItems(smallFiles, archiveMaxSize) | |
| 205 log.Printf("\t%d TAR archives to be isolated", len(bundles)) | |
| 206 | |
| 207 for _, bundle := range bundles { | |
| 208 bundle := bundle | |
| 209 digest, tarSize, err := bundle.Digest() | |
| 210 if err != nil { | |
| 211 return err | |
| 212 } | |
| 213 | |
| 214 log.Printf("Created tar archive %q (%s)", digest, humanize.Bytes
(uint64(tarSize))) | |
| 215 log.Printf("\tcontains %d files (total %s)", len(bundle.Items),
humanize.Bytes(uint64(bundle.ItemSize))) | |
| 216 // Mint an item for this tar. | |
| 217 item := &Item{ | |
| 218 Path: fmt.Sprintf(".%s.tar", digest), | |
| 219 RelPath: fmt.Sprintf(".%s.tar", digest), | |
| 220 Size: tarSize, | |
| 221 Mode: 0644, // Read | |
| 222 Digest: digest, | |
| 223 } | |
| 224 ut.files[item.RelPath] = isolated.TarFile(item.Digest, int(item.
Mode), item.Size) | |
| 225 | |
| 226 ut.checker.AddItem(item, false, func(item *Item, ps *isolatedcli
ent.PushState) { | |
| 227 if ps == nil { | |
| 228 return | |
| 229 } | |
| 230 log.Printf("QUEUED %q for upload", item.RelPath) | |
| 231 ut.uploader.Upload(item.RelPath, bundle.Contents, ps, fu
nc() { | |
| 232 log.Printf("UPLOADED %q", item.RelPath) | |
| 233 }) | |
| 234 }) | |
| 235 } | |
| 236 return nil | |
| 237 } | |
| 238 | |
| 239 // uploadFiles uploads each file and adds it to files. | |
| 240 func (ut *uploadTracker) uploadFiles(files []*Item) error { | |
| 241 // Handle the large individually-uploaded files. | |
| 242 for _, item := range files { | |
| 243 d, err := hashFile(item.Path) | |
| 244 if err != nil { | |
| 245 return err | |
| 246 } | |
| 247 item.Digest = d | |
| 248 ut.files[item.RelPath] = isolated.BasicFile(item.Digest, int(ite
m.Mode), item.Size) | |
| 249 ut.checker.AddItem(item, false, func(item *Item, ps *isolatedcli
ent.PushState) { | |
| 250 if ps == nil { | |
| 251 return | |
| 252 } | |
| 253 log.Printf("QUEUED %q for upload", item.RelPath) | |
| 254 ut.uploader.UploadFile(item, ps, func() { | |
| 255 log.Printf("UPLOADED %q", item.RelPath) | |
| 256 }) | |
| 257 }) | |
| 258 } | |
| 259 return nil | |
| 260 } | |
| 261 | |
| 262 func (ut *uploadTracker) UploadDeps(parts partitionedDeps) error { | |
| 263 if err := ut.populateSymlinks(parts.links.items); err != nil { | |
| 264 return err | |
| 265 } | |
| 266 | |
| 267 if err := ut.tarAndUploadFiles(parts.filesToArchive.items); err != nil { | |
| 268 return err | |
| 269 } | |
| 270 | |
| 271 if err := ut.uploadFiles(parts.indivFiles.items); err != nil { | |
| 272 return err | |
| 273 } | |
| 274 return nil | |
| 275 } | |
| 276 | |
| 277 // main contains the core logic for experimental archive. | 172 // main contains the core logic for experimental archive. |
| 278 func (c *expArchiveRun) main() error { | 173 func (c *expArchiveRun) main() error { |
| 279 // TODO(djd): This func is long and has a lot of internal complexity (li
ke, | 174 // TODO(djd): This func is long and has a lot of internal complexity (li
ke, |
| 280 // such as, archiveCallback). Refactor. | 175 // such as, archiveCallback). Refactor. |
| 281 | 176 |
| 282 start := time.Now() | 177 start := time.Now() |
| 283 archiveOpts := &c.isolateFlags.ArchiveOptions | 178 archiveOpts := &c.isolateFlags.ArchiveOptions |
| 284 // Parse the incoming isolate file. | 179 // Parse the incoming isolate file. |
| 285 deps, rootDir, isol, err := isolate.ProcessIsolate(archiveOpts) | 180 deps, rootDir, isol, err := isolate.ProcessIsolate(archiveOpts) |
| 286 if err != nil { | 181 if err != nil { |
| (...skipping 23 matching lines...) Expand all Loading... |
| 310 if err != nil { | 205 if err != nil { |
| 311 return fmt.Errorf("partitioning deps: %v", err) | 206 return fmt.Errorf("partitioning deps: %v", err) |
| 312 } | 207 } |
| 313 | 208 |
| 314 numFiles := len(parts.filesToArchive.items) + len(parts.indivFiles.items
) | 209 numFiles := len(parts.filesToArchive.items) + len(parts.indivFiles.items
) |
| 315 filesSize := uint64(parts.filesToArchive.totalSize + parts.indivFiles.to
talSize) | 210 filesSize := uint64(parts.filesToArchive.totalSize + parts.indivFiles.to
talSize) |
| 316 log.Printf("Isolate expanded to %d files (total size %s) and %d symlinks
", numFiles, humanize.Bytes(filesSize), len(parts.links.items)) | 211 log.Printf("Isolate expanded to %d files (total size %s) and %d symlinks
", numFiles, humanize.Bytes(filesSize), len(parts.links.items)) |
| 317 log.Printf("\t%d files (%s) to be isolated individually", len(parts.indi
vFiles.items), humanize.Bytes(uint64(parts.indivFiles.totalSize))) | 212 log.Printf("\t%d files (%s) to be isolated individually", len(parts.indi
vFiles.items), humanize.Bytes(uint64(parts.indivFiles.totalSize))) |
| 318 log.Printf("\t%d files (%s) to be isolated in archives", len(parts.files
ToArchive.items), humanize.Bytes(uint64(parts.filesToArchive.totalSize))) | 213 log.Printf("\t%d files (%s) to be isolated in archives", len(parts.files
ToArchive.items), humanize.Bytes(uint64(parts.filesToArchive.totalSize))) |
| 319 | 214 |
| 320 » tracker := newUploadTracker(checker, uploader) | 215 » tracker := NewUploadTracker(checker, uploader) |
| 321 if err := tracker.UploadDeps(parts); err != nil { | 216 if err := tracker.UploadDeps(parts); err != nil { |
| 322 return err | 217 return err |
| 323 } | 218 } |
| 324 | 219 |
| 325 isol.Files = tracker.Files() | 220 isol.Files = tracker.Files() |
| 326 | 221 |
| 327 // Marshal the isolated file into JSON, and create an Item to describe i
t. | 222 // Marshal the isolated file into JSON, and create an Item to describe i
t. |
| 328 var isolJSON []byte | 223 var isolJSON []byte |
| 329 isolJSON, err = json.Marshal(isol) | 224 isolJSON, err = json.Marshal(isol) |
| 330 if err != nil { | 225 if err != nil { |
| (...skipping 101 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 432 } | 327 } |
| 433 | 328 |
| 434 func hashFile(path string) (isolated.HexDigest, error) { | 329 func hashFile(path string) (isolated.HexDigest, error) { |
| 435 f, err := os.Open(path) | 330 f, err := os.Open(path) |
| 436 if err != nil { | 331 if err != nil { |
| 437 return "", err | 332 return "", err |
| 438 } | 333 } |
| 439 defer f.Close() | 334 defer f.Close() |
| 440 return isolated.Hash(f) | 335 return isolated.Hash(f) |
| 441 } | 336 } |
| OLD | NEW |