OLD | NEW |
(Empty) | |
| 1 // Copyright 2017 The LUCI Authors. |
| 2 // |
| 3 // Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 // you may not use this file except in compliance with the License. |
| 5 // You may obtain a copy of the License at |
| 6 // |
| 7 // http://www.apache.org/licenses/LICENSE-2.0 |
| 8 // |
| 9 // Unless required by applicable law or agreed to in writing, software |
| 10 // distributed under the License is distributed on an "AS IS" BASIS, |
| 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 // See the License for the specific language governing permissions and |
| 13 // limitations under the License. |
| 14 |
| 15 package main |
| 16 |
| 17 import ( |
| 18 "fmt" |
| 19 "log" |
| 20 "os" |
| 21 |
| 22 humanize "github.com/dustin/go-humanize" |
| 23 "github.com/luci/luci-go/common/isolated" |
| 24 "github.com/luci/luci-go/common/isolatedclient" |
| 25 ) |
| 26 |
| 27 // UploadTracker uploads and keeps track of files. |
| 28 type UploadTracker struct { |
| 29 checker *Checker |
| 30 uploader *Uploader |
| 31 files map[string]isolated.File |
| 32 } |
| 33 |
| 34 // NewUploadTracker constructs an UploadTracker. |
| 35 func NewUploadTracker(checker *Checker, uploader *Uploader) *UploadTracker { |
| 36 return &UploadTracker{ |
| 37 checker: checker, |
| 38 uploader: uploader, |
| 39 files: make(map[string]isolated.File), |
| 40 } |
| 41 } |
| 42 |
| 43 // UploadDeps uploads all of the items in parts. |
| 44 func (ut *UploadTracker) UploadDeps(parts partitionedDeps) error { |
| 45 if err := ut.populateSymlinks(parts.links.items); err != nil { |
| 46 return err |
| 47 } |
| 48 |
| 49 if err := ut.tarAndUploadFiles(parts.filesToArchive.items); err != nil { |
| 50 return err |
| 51 } |
| 52 |
| 53 if err := ut.uploadFiles(parts.indivFiles.items); err != nil { |
| 54 return err |
| 55 } |
| 56 return nil |
| 57 } |
| 58 |
| 59 // Files returns the files which have been uploaded. |
| 60 // Note: files may not have completed uploading until the tracker's Checker and |
| 61 // Uploader have been closed. |
| 62 func (ut *UploadTracker) Files() map[string]isolated.File { |
| 63 return ut.files |
| 64 } |
| 65 |
| 66 // populateSymlinks adds an isolated.File to files for each provided symlink |
| 67 func (ut *UploadTracker) populateSymlinks(symlinks []*Item) error { |
| 68 for _, item := range symlinks { |
| 69 l, err := os.Readlink(item.Path) |
| 70 if err != nil { |
| 71 return fmt.Errorf("unable to resolve symlink for %q: %v"
, item.Path, err) |
| 72 } |
| 73 ut.files[item.RelPath] = isolated.SymLink(l) |
| 74 } |
| 75 return nil |
| 76 } |
| 77 |
| 78 // tarAndUploadFiles creates bundles of files, uploads them, and adds each bundl
e to files. |
| 79 func (ut *UploadTracker) tarAndUploadFiles(smallFiles []*Item) error { |
| 80 bundles := ShardItems(smallFiles, archiveMaxSize) |
| 81 log.Printf("\t%d TAR archives to be isolated", len(bundles)) |
| 82 |
| 83 for _, bundle := range bundles { |
| 84 bundle := bundle |
| 85 digest, tarSize, err := bundle.Digest() |
| 86 if err != nil { |
| 87 return err |
| 88 } |
| 89 |
| 90 log.Printf("Created tar archive %q (%s)", digest, humanize.Bytes
(uint64(tarSize))) |
| 91 log.Printf("\tcontains %d files (total %s)", len(bundle.Items),
humanize.Bytes(uint64(bundle.ItemSize))) |
| 92 // Mint an item for this tar. |
| 93 item := &Item{ |
| 94 Path: fmt.Sprintf(".%s.tar", digest), |
| 95 RelPath: fmt.Sprintf(".%s.tar", digest), |
| 96 Size: tarSize, |
| 97 Mode: 0644, // Read |
| 98 Digest: digest, |
| 99 } |
| 100 ut.files[item.RelPath] = isolated.TarFile(item.Digest, int(item.
Mode), item.Size) |
| 101 |
| 102 ut.checker.AddItem(item, false, func(item *Item, ps *isolatedcli
ent.PushState) { |
| 103 if ps == nil { |
| 104 return |
| 105 } |
| 106 log.Printf("QUEUED %q for upload", item.RelPath) |
| 107 ut.uploader.Upload(item.RelPath, bundle.Contents, ps, fu
nc() { |
| 108 log.Printf("UPLOADED %q", item.RelPath) |
| 109 }) |
| 110 }) |
| 111 } |
| 112 return nil |
| 113 } |
| 114 |
| 115 // uploadFiles uploads each file and adds it to files. |
| 116 func (ut *UploadTracker) uploadFiles(files []*Item) error { |
| 117 // Handle the large individually-uploaded files. |
| 118 for _, item := range files { |
| 119 d, err := hashFile(item.Path) |
| 120 if err != nil { |
| 121 return err |
| 122 } |
| 123 item.Digest = d |
| 124 ut.files[item.RelPath] = isolated.BasicFile(item.Digest, int(ite
m.Mode), item.Size) |
| 125 ut.checker.AddItem(item, false, func(item *Item, ps *isolatedcli
ent.PushState) { |
| 126 if ps == nil { |
| 127 return |
| 128 } |
| 129 log.Printf("QUEUED %q for upload", item.RelPath) |
| 130 ut.uploader.UploadFile(item, ps, func() { |
| 131 log.Printf("UPLOADED %q", item.RelPath) |
| 132 }) |
| 133 }) |
| 134 } |
| 135 return nil |
| 136 } |
OLD | NEW |