| OLD | NEW |
| 1 // Copyright 2017 The LUCI Authors. | 1 // Copyright 2017 The LUCI Authors. |
| 2 // | 2 // |
| 3 // Licensed under the Apache License, Version 2.0 (the "License"); | 3 // Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 // you may not use this file except in compliance with the License. | 4 // you may not use this file except in compliance with the License. |
| 5 // You may obtain a copy of the License at | 5 // You may obtain a copy of the License at |
| 6 // | 6 // |
| 7 // http://www.apache.org/licenses/LICENSE-2.0 | 7 // http://www.apache.org/licenses/LICENSE-2.0 |
| 8 // | 8 // |
| 9 // Unless required by applicable law or agreed to in writing, software | 9 // Unless required by applicable law or agreed to in writing, software |
| 10 // distributed under the License is distributed on an "AS IS" BASIS, | 10 // distributed under the License is distributed on an "AS IS" BASIS, |
| 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 // See the License for the specific language governing permissions and | 12 // See the License for the specific language governing permissions and |
| 13 // limitations under the License. | 13 // limitations under the License. |
| 14 | 14 |
| 15 package main | 15 package main |
| 16 | 16 |
| 17 import ( | 17 import ( |
| 18 "encoding/json" |
| 18 "fmt" | 19 "fmt" |
| 19 "log" | 20 "log" |
| 20 "os" | 21 "os" |
| 22 "path/filepath" |
| 21 | 23 |
| 22 humanize "github.com/dustin/go-humanize" | 24 humanize "github.com/dustin/go-humanize" |
| 23 "github.com/luci/luci-go/common/isolated" | 25 "github.com/luci/luci-go/common/isolated" |
| 24 "github.com/luci/luci-go/common/isolatedclient" | 26 "github.com/luci/luci-go/common/isolatedclient" |
| 25 ) | 27 ) |
| 26 | 28 |
| 27 // UploadTracker uploads and keeps track of files. | 29 // UploadTracker uploads and keeps track of files. |
| 28 type UploadTracker struct { | 30 type UploadTracker struct { |
| 29 checker *Checker | 31 checker *Checker |
| 30 uploader *Uploader | 32 uploader *Uploader |
| 31 » files map[string]isolated.File | 33 » isol *isolated.Isolated |
| 32 } | 34 } |
| 33 | 35 |
| 34 // NewUploadTracker constructs an UploadTracker. | 36 // NewUploadTracker constructs an UploadTracker. It tracks uploaded files in is
ol.Files. |
| 35 func NewUploadTracker(checker *Checker, uploader *Uploader) *UploadTracker { | 37 func NewUploadTracker(checker *Checker, uploader *Uploader, isol *isolated.Isola
ted) *UploadTracker { |
| 38 » isol.Files = make(map[string]isolated.File) |
| 36 return &UploadTracker{ | 39 return &UploadTracker{ |
| 37 checker: checker, | 40 checker: checker, |
| 38 uploader: uploader, | 41 uploader: uploader, |
| 39 » » files: make(map[string]isolated.File), | 42 » » isol: isol, |
| 40 } | 43 } |
| 41 } | 44 } |
| 42 | 45 |
| 43 // UploadDeps uploads all of the items in parts. | 46 // UploadDeps uploads all of the items in parts. |
| 44 func (ut *UploadTracker) UploadDeps(parts partitionedDeps) error { | 47 func (ut *UploadTracker) UploadDeps(parts partitionedDeps) error { |
| 45 if err := ut.populateSymlinks(parts.links.items); err != nil { | 48 if err := ut.populateSymlinks(parts.links.items); err != nil { |
| 46 return err | 49 return err |
| 47 } | 50 } |
| 48 | 51 |
| 49 if err := ut.tarAndUploadFiles(parts.filesToArchive.items); err != nil { | 52 if err := ut.tarAndUploadFiles(parts.filesToArchive.items); err != nil { |
| 50 return err | 53 return err |
| 51 } | 54 } |
| 52 | 55 |
| 53 if err := ut.uploadFiles(parts.indivFiles.items); err != nil { | 56 if err := ut.uploadFiles(parts.indivFiles.items); err != nil { |
| 54 return err | 57 return err |
| 55 } | 58 } |
| 56 return nil | 59 return nil |
| 57 } | 60 } |
| 58 | 61 |
| 59 // Files returns the files which have been uploaded. | 62 // Files returns the files which have been uploaded. |
| 60 // Note: files may not have completed uploading until the tracker's Checker and | 63 // Note: files may not have completed uploading until the tracker's Checker and |
| 61 // Uploader have been closed. | 64 // Uploader have been closed. |
| 62 func (ut *UploadTracker) Files() map[string]isolated.File { | 65 func (ut *UploadTracker) Files() map[string]isolated.File { |
| 63 » return ut.files | 66 » return ut.isol.Files |
| 64 } | 67 } |
| 65 | 68 |
| 66 // populateSymlinks adds an isolated.File to files for each provided symlink | 69 // populateSymlinks adds an isolated.File to files for each provided symlink |
| 67 func (ut *UploadTracker) populateSymlinks(symlinks []*Item) error { | 70 func (ut *UploadTracker) populateSymlinks(symlinks []*Item) error { |
| 68 for _, item := range symlinks { | 71 for _, item := range symlinks { |
| 69 l, err := os.Readlink(item.Path) | 72 l, err := os.Readlink(item.Path) |
| 70 if err != nil { | 73 if err != nil { |
| 71 return fmt.Errorf("unable to resolve symlink for %q: %v"
, item.Path, err) | 74 return fmt.Errorf("unable to resolve symlink for %q: %v"
, item.Path, err) |
| 72 } | 75 } |
| 73 » » ut.files[item.RelPath] = isolated.SymLink(l) | 76 » » ut.isol.Files[item.RelPath] = isolated.SymLink(l) |
| 74 } | 77 } |
| 75 return nil | 78 return nil |
| 76 } | 79 } |
| 77 | 80 |
| 78 // tarAndUploadFiles creates bundles of files, uploads them, and adds each bundl
e to files. | 81 // tarAndUploadFiles creates bundles of files, uploads them, and adds each bundl
e to files. |
| 79 func (ut *UploadTracker) tarAndUploadFiles(smallFiles []*Item) error { | 82 func (ut *UploadTracker) tarAndUploadFiles(smallFiles []*Item) error { |
| 80 bundles := ShardItems(smallFiles, archiveMaxSize) | 83 bundles := ShardItems(smallFiles, archiveMaxSize) |
| 81 log.Printf("\t%d TAR archives to be isolated", len(bundles)) | 84 log.Printf("\t%d TAR archives to be isolated", len(bundles)) |
| 82 | 85 |
| 83 for _, bundle := range bundles { | 86 for _, bundle := range bundles { |
| 84 bundle := bundle | 87 bundle := bundle |
| 85 digest, tarSize, err := bundle.Digest() | 88 digest, tarSize, err := bundle.Digest() |
| 86 if err != nil { | 89 if err != nil { |
| 87 return err | 90 return err |
| 88 } | 91 } |
| 89 | 92 |
| 90 log.Printf("Created tar archive %q (%s)", digest, humanize.Bytes
(uint64(tarSize))) | 93 log.Printf("Created tar archive %q (%s)", digest, humanize.Bytes
(uint64(tarSize))) |
| 91 log.Printf("\tcontains %d files (total %s)", len(bundle.Items),
humanize.Bytes(uint64(bundle.ItemSize))) | 94 log.Printf("\tcontains %d files (total %s)", len(bundle.Items),
humanize.Bytes(uint64(bundle.ItemSize))) |
| 92 // Mint an item for this tar. | 95 // Mint an item for this tar. |
| 93 item := &Item{ | 96 item := &Item{ |
| 94 Path: fmt.Sprintf(".%s.tar", digest), | 97 Path: fmt.Sprintf(".%s.tar", digest), |
| 95 RelPath: fmt.Sprintf(".%s.tar", digest), | 98 RelPath: fmt.Sprintf(".%s.tar", digest), |
| 96 Size: tarSize, | 99 Size: tarSize, |
| 97 Mode: 0644, // Read | 100 Mode: 0644, // Read |
| 98 Digest: digest, | 101 Digest: digest, |
| 99 } | 102 } |
| 100 » » ut.files[item.RelPath] = isolated.TarFile(item.Digest, int(item.
Mode), item.Size) | 103 » » ut.isol.Files[item.RelPath] = isolated.TarFile(item.Digest, int(
item.Mode), item.Size) |
| 101 | 104 |
| 102 ut.checker.AddItem(item, false, func(item *Item, ps *isolatedcli
ent.PushState) { | 105 ut.checker.AddItem(item, false, func(item *Item, ps *isolatedcli
ent.PushState) { |
| 103 if ps == nil { | 106 if ps == nil { |
| 104 return | 107 return |
| 105 } | 108 } |
| 106 log.Printf("QUEUED %q for upload", item.RelPath) | 109 log.Printf("QUEUED %q for upload", item.RelPath) |
| 107 ut.uploader.Upload(item.RelPath, bundle.Contents, ps, fu
nc() { | 110 ut.uploader.Upload(item.RelPath, bundle.Contents, ps, fu
nc() { |
| 108 log.Printf("UPLOADED %q", item.RelPath) | 111 log.Printf("UPLOADED %q", item.RelPath) |
| 109 }) | 112 }) |
| 110 }) | 113 }) |
| 111 } | 114 } |
| 112 return nil | 115 return nil |
| 113 } | 116 } |
| 114 | 117 |
| 115 // uploadFiles uploads each file and adds it to files. | 118 // uploadFiles uploads each file and adds it to files. |
| 116 func (ut *UploadTracker) uploadFiles(files []*Item) error { | 119 func (ut *UploadTracker) uploadFiles(files []*Item) error { |
| 117 // Handle the large individually-uploaded files. | 120 // Handle the large individually-uploaded files. |
| 118 for _, item := range files { | 121 for _, item := range files { |
| 119 d, err := hashFile(item.Path) | 122 d, err := hashFile(item.Path) |
| 120 if err != nil { | 123 if err != nil { |
| 121 return err | 124 return err |
| 122 } | 125 } |
| 123 item.Digest = d | 126 item.Digest = d |
| 124 » » ut.files[item.RelPath] = isolated.BasicFile(item.Digest, int(ite
m.Mode), item.Size) | 127 » » ut.isol.Files[item.RelPath] = isolated.BasicFile(item.Digest, in
t(item.Mode), item.Size) |
| 125 ut.checker.AddItem(item, false, func(item *Item, ps *isolatedcli
ent.PushState) { | 128 ut.checker.AddItem(item, false, func(item *Item, ps *isolatedcli
ent.PushState) { |
| 126 if ps == nil { | 129 if ps == nil { |
| 127 return | 130 return |
| 128 } | 131 } |
| 129 log.Printf("QUEUED %q for upload", item.RelPath) | 132 log.Printf("QUEUED %q for upload", item.RelPath) |
| 130 ut.uploader.UploadFile(item, ps, func() { | 133 ut.uploader.UploadFile(item, ps, func() { |
| 131 log.Printf("UPLOADED %q", item.RelPath) | 134 log.Printf("UPLOADED %q", item.RelPath) |
| 132 }) | 135 }) |
| 133 }) | 136 }) |
| 134 } | 137 } |
| 135 return nil | 138 return nil |
| 136 } | 139 } |
| 140 |
| 141 // Finalize creates and uploads the isolate JSON at the isolatePath. |
| 142 // It returns the corresponding Item and its contents. |
| 143 // Finalize should only be called after UploadDeps. |
| 144 func (ut *UploadTracker) Finalize(isolatedPath string) (*Item, []byte, error) { |
| 145 // Marshal the isolated file into JSON, and create an Item to describe i
t. |
| 146 isolJSON, err := json.Marshal(ut.isol) |
| 147 if err != nil { |
| 148 return nil, []byte{}, err |
| 149 } |
| 150 isolItem := &Item{ |
| 151 Path: isolatedPath, |
| 152 RelPath: filepath.Base(isolatedPath), |
| 153 Digest: isolated.HashBytes(isolJSON), |
| 154 Size: int64(len(isolJSON)), |
| 155 } |
| 156 |
| 157 // Check and upload isolate JSON. |
| 158 ut.checker.AddItem(isolItem, true, func(item *Item, ps *isolatedclient.P
ushState) { |
| 159 if ps == nil { |
| 160 return |
| 161 } |
| 162 log.Printf("QUEUED %q for upload", item.RelPath) |
| 163 ut.uploader.UploadBytes(item.RelPath, isolJSON, ps, func() { |
| 164 log.Printf("UPLOADED %q", item.RelPath) |
| 165 }) |
| 166 }) |
| 167 |
| 168 return isolItem, isolJSON, nil |
| 169 } |
| OLD | NEW |