Index: client/cmd/isolate/upload_tracker.go |
diff --git a/client/cmd/isolate/upload_tracker.go b/client/cmd/isolate/upload_tracker.go |
new file mode 100644 |
index 0000000000000000000000000000000000000000..75f37b8cecc572ae216329874e1b0d7ada2972ed |
--- /dev/null |
+++ b/client/cmd/isolate/upload_tracker.go |
@@ -0,0 +1,136 @@ |
+// Copyright 2017 The LUCI Authors. |
+// |
+// Licensed under the Apache License, Version 2.0 (the "License"); |
+// you may not use this file except in compliance with the License. |
+// You may obtain a copy of the License at |
+// |
+// http://www.apache.org/licenses/LICENSE-2.0 |
+// |
+// Unless required by applicable law or agreed to in writing, software |
+// distributed under the License is distributed on an "AS IS" BASIS, |
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
+// See the License for the specific language governing permissions and |
+// limitations under the License. |
+ |
+package main |
+ |
+import ( |
+ "fmt" |
+ "log" |
+ "os" |
+ |
+ humanize "github.com/dustin/go-humanize" |
+ "github.com/luci/luci-go/common/isolated" |
+ "github.com/luci/luci-go/common/isolatedclient" |
+) |
+ |
+// UploadTracker uploads and keeps track of files. |
+type UploadTracker struct { |
+ checker *Checker |
+ uploader *Uploader |
+ files map[string]isolated.File |
+} |
+ |
+// NewUploadTracker constructs an UploadTracker. |
+func NewUploadTracker(checker *Checker, uploader *Uploader) *UploadTracker { |
+ return &UploadTracker{ |
+ checker: checker, |
+ uploader: uploader, |
+ files: make(map[string]isolated.File), |
+ } |
+} |
+ |
+// UploadDeps uploads all of the items in parts. |
+func (ut *UploadTracker) UploadDeps(parts partitionedDeps) error { |
+ if err := ut.populateSymlinks(parts.links.items); err != nil { |
+ return err |
+ } |
+ |
+ if err := ut.tarAndUploadFiles(parts.filesToArchive.items); err != nil { |
+ return err |
+ } |
+ |
+ if err := ut.uploadFiles(parts.indivFiles.items); err != nil { |
+ return err |
+ } |
+ return nil |
+} |
+ |
+// Files returns the files which have been uploaded. |
+// Note: files may not have completed uploading until the tracker's Checker and |
+// Uploader have been closed. |
+func (ut *UploadTracker) Files() map[string]isolated.File { |
+ return ut.files |
+} |
+ |
+// populateSymlinks adds an isolated.File to files for each provided symlink |
+func (ut *UploadTracker) populateSymlinks(symlinks []*Item) error { |
+ for _, item := range symlinks { |
+ l, err := os.Readlink(item.Path) |
+ if err != nil { |
+ return fmt.Errorf("unable to resolve symlink for %q: %v", item.Path, err) |
+ } |
+ ut.files[item.RelPath] = isolated.SymLink(l) |
+ } |
+ return nil |
+} |
+ |
+// tarAndUploadFiles creates bundles of files, uploads them, and adds each bundle to files. |
+func (ut *UploadTracker) tarAndUploadFiles(smallFiles []*Item) error { |
+ bundles := ShardItems(smallFiles, archiveMaxSize) |
+ log.Printf("\t%d TAR archives to be isolated", len(bundles)) |
+ |
+ for _, bundle := range bundles { |
+ bundle := bundle |
+ digest, tarSize, err := bundle.Digest() |
+ if err != nil { |
+ return err |
+ } |
+ |
+ log.Printf("Created tar archive %q (%s)", digest, humanize.Bytes(uint64(tarSize))) |
+ log.Printf("\tcontains %d files (total %s)", len(bundle.Items), humanize.Bytes(uint64(bundle.ItemSize))) |
+ // Mint an item for this tar. |
+ item := &Item{ |
+ Path: fmt.Sprintf(".%s.tar", digest), |
+ RelPath: fmt.Sprintf(".%s.tar", digest), |
+ Size: tarSize, |
+ Mode: 0644, // Read |
+ Digest: digest, |
+ } |
+ ut.files[item.RelPath] = isolated.TarFile(item.Digest, int(item.Mode), item.Size) |
+ |
+ ut.checker.AddItem(item, false, func(item *Item, ps *isolatedclient.PushState) { |
+ if ps == nil { |
+ return |
+ } |
+ log.Printf("QUEUED %q for upload", item.RelPath) |
+ ut.uploader.Upload(item.RelPath, bundle.Contents, ps, func() { |
+ log.Printf("UPLOADED %q", item.RelPath) |
+ }) |
+ }) |
+ } |
+ return nil |
+} |
+ |
+// uploadFiles uploads each file and adds it to files. |
+func (ut *UploadTracker) uploadFiles(files []*Item) error { |
+ // Handle the large individually-uploaded files. |
+ for _, item := range files { |
+ d, err := hashFile(item.Path) |
+ if err != nil { |
+ return err |
+ } |
+ item.Digest = d |
+ ut.files[item.RelPath] = isolated.BasicFile(item.Digest, int(item.Mode), item.Size) |
+ ut.checker.AddItem(item, false, func(item *Item, ps *isolatedclient.PushState) { |
+ if ps == nil { |
+ return |
+ } |
+ log.Printf("QUEUED %q for upload", item.RelPath) |
+ ut.uploader.UploadFile(item, ps, func() { |
+ log.Printf("UPLOADED %q", item.RelPath) |
+ }) |
+ }) |
+ } |
+ return nil |
+} |