OLD | NEW |
1 // Copyright 2015 The LUCI Authors. | 1 // Copyright 2015 The LUCI Authors. |
2 // | 2 // |
3 // Licensed under the Apache License, Version 2.0 (the "License"); | 3 // Licensed under the Apache License, Version 2.0 (the "License"); |
4 // you may not use this file except in compliance with the License. | 4 // you may not use this file except in compliance with the License. |
5 // You may obtain a copy of the License at | 5 // You may obtain a copy of the License at |
6 // | 6 // |
7 // http://www.apache.org/licenses/LICENSE-2.0 | 7 // http://www.apache.org/licenses/LICENSE-2.0 |
8 // | 8 // |
9 // Unless required by applicable law or agreed to in writing, software | 9 // Unless required by applicable law or agreed to in writing, software |
10 // distributed under the License is distributed on an "AS IS" BASIS, | 10 // distributed under the License is distributed on an "AS IS" BASIS, |
(...skipping 151 matching lines...)
162 walker := partitioningWalker{fsView: fsView} | 162 walker := partitioningWalker{fsView: fsView} |
163 for _, dep := range deps { | 163 for _, dep := range deps { |
164 // Try to walk dep. If dep is a file (or symlink), the inner function is called exactly once. | 164 // Try to walk dep. If dep is a file (or symlink), the inner function is called exactly once. |
165 if err := filepath.Walk(filepath.Clean(dep), walker.walkFn); err != nil { | 165 if err := filepath.Walk(filepath.Clean(dep), walker.walkFn); err != nil { |
166 return partitionedDeps{}, err | 166 return partitionedDeps{}, err |
167 } | 167 } |
168 } | 168 } |
169 return walker.parts, nil | 169 return walker.parts, nil |
170 } | 170 } |
171 | 171 |
172 // main contains the core logic for experimental archive. | 172 func uploadDeps(parts partitionedDeps, checker *Checker, uploader *Uploader) (map[string]isolated.File, error) { |
173 func (c *expArchiveRun) main() error { | |
174 » // TODO(djd): This func is long and has a lot of internal complexity (like, | |
175 » // such as, archiveCallback). Refactor. | |
176 | |
177 » start := time.Now() | |
178 » archiveOpts := &c.isolateFlags.ArchiveOptions | |
179 » // Parse the incoming isolate file. | |
180 » deps, rootDir, isol, err := isolate.ProcessIsolate(archiveOpts) | |
181 » if err != nil { | |
182 » » return fmt.Errorf("failed to process isolate: %v", err) | |
183 » } | |
184 » log.Printf("Isolate referenced %d deps", len(deps)) | |
185 | |
186 » // Set up a background context which is cancelled when this function returns. | |
187 » ctx, cancel := context.WithCancel(context.Background()) | |
188 » defer cancel() | |
189 | |
190 » // Create the isolated client which connects to the isolate server. | |
191 » authCl, err := c.createAuthClient() | |
192 » if err != nil { | |
193 » » return err | |
194 » } | |
195 » client := isolatedclient.New(nil, authCl, c.isolatedFlags.ServerURL, c.isolatedFlags.Namespace, nil, nil) | |
196 | |
197 » // Set up a checker and uploader. We limit the uploader to one concurrent | |
198 » // upload, since the uploads are all coming from disk (with the exception of | |
199 » // the isolated JSON itself) and we only want a single goroutine reading from | |
200 » // disk at once. | |
201 » checker := NewChecker(ctx, client) | |
202 » uploader := NewUploader(ctx, client, 1) | |
203 | |
204 » parts, err := partitionDeps(deps, rootDir, c.isolateFlags.ArchiveOptions.Blacklist) | |
205 » if err != nil { | |
206 » » return fmt.Errorf("partitioning deps: %v", err) | |
207 » } | |
208 | |
209 // Construct a map of the files that constitute the isolate. | 173 // Construct a map of the files that constitute the isolate. |
210 files := make(map[string]isolated.File) | 174 files := make(map[string]isolated.File) |
211 | 175 |
212 numFiles := len(parts.filesToArchive.items) + len(parts.indivFiles.items) | |
213 filesSize := uint64(parts.filesToArchive.totalSize + parts.indivFiles.totalSize) | |
214 log.Printf("Isolate expanded to %d files (total size %s) and %d symlinks", numFiles, humanize.Bytes(filesSize), len(parts.links.items)) | |
215 log.Printf("\t%d files (%s) to be isolated individually", len(parts.indivFiles.items), humanize.Bytes(uint64(parts.indivFiles.totalSize))) | |
216 log.Printf("\t%d files (%s) to be isolated in archives", len(parts.filesToArchive.items), humanize.Bytes(uint64(parts.filesToArchive.totalSize))) | |
217 | |
218 // Handle the symlinks. | 176 // Handle the symlinks. |
219 for _, item := range parts.links.items { | 177 for _, item := range parts.links.items { |
220 l, err := os.Readlink(item.Path) | 178 l, err := os.Readlink(item.Path) |
221 if err != nil { | 179 if err != nil { |
222 » » » return fmt.Errorf("unable to resolve symlink for %q: %v", item.Path, err) | 180 » » » return nil, fmt.Errorf("unable to resolve symlink for %q: %v", item.Path, err) |
223 } | 181 } |
224 files[item.RelPath] = isolated.SymLink(l) | 182 files[item.RelPath] = isolated.SymLink(l) |
225 } | 183 } |
226 | 184 |
227 // Handle the small to-be-archived files. | 185 // Handle the small to-be-archived files. |
228 bundles := ShardItems(parts.filesToArchive.items, archiveMaxSize) | 186 bundles := ShardItems(parts.filesToArchive.items, archiveMaxSize) |
229 log.Printf("\t%d TAR archives to be isolated", len(bundles)) | 187 log.Printf("\t%d TAR archives to be isolated", len(bundles)) |
230 | 188 |
231 for _, bundle := range bundles { | 189 for _, bundle := range bundles { |
232 bundle := bundle | 190 bundle := bundle |
233 digest, tarSize, err := bundle.Digest() | 191 digest, tarSize, err := bundle.Digest() |
234 if err != nil { | 192 if err != nil { |
235 » » » return err | 193 » » » return nil, err |
236 } | 194 } |
237 | 195 |
238 log.Printf("Created tar archive %q (%s)", digest, humanize.Bytes(uint64(tarSize))) | 196 log.Printf("Created tar archive %q (%s)", digest, humanize.Bytes(uint64(tarSize))) |
239 log.Printf("\tcontains %d files (total %s)", len(bundle.Items), humanize.Bytes(uint64(bundle.ItemSize))) | 197 log.Printf("\tcontains %d files (total %s)", len(bundle.Items), humanize.Bytes(uint64(bundle.ItemSize))) |
240 // Mint an item for this tar. | 198 // Mint an item for this tar. |
241 item := &Item{ | 199 item := &Item{ |
242 Path: fmt.Sprintf(".%s.tar", digest), | 200 Path: fmt.Sprintf(".%s.tar", digest), |
243 RelPath: fmt.Sprintf(".%s.tar", digest), | 201 RelPath: fmt.Sprintf(".%s.tar", digest), |
244 Size: tarSize, | 202 Size: tarSize, |
245 Mode: 0644, // Read | 203 Mode: 0644, // Read |
246 Digest: digest, | 204 Digest: digest, |
247 } | 205 } |
248 files[item.RelPath] = isolated.TarFile(item.Digest, int(item.Mode), item.Size) | 206 files[item.RelPath] = isolated.TarFile(item.Digest, int(item.Mode), item.Size) |
249 | 207 |
250 checker.AddItem(item, false, func(item *Item, ps *isolatedclient.PushState) { | 208 checker.AddItem(item, false, func(item *Item, ps *isolatedclient.PushState) { |
251 if ps == nil { | 209 if ps == nil { |
252 return | 210 return |
253 } | 211 } |
254 log.Printf("QUEUED %q for upload", item.RelPath) | 212 log.Printf("QUEUED %q for upload", item.RelPath) |
255 uploader.Upload(item.RelPath, bundle.Contents, ps, func() { | 213 uploader.Upload(item.RelPath, bundle.Contents, ps, func() { |
256 log.Printf("UPLOADED %q", item.RelPath) | 214 log.Printf("UPLOADED %q", item.RelPath) |
257 }) | 215 }) |
258 }) | 216 }) |
259 } | 217 } |
260 | 218 |
261 // Handle the large individually-uploaded files. | 219 // Handle the large individually-uploaded files. |
262 for _, item := range parts.indivFiles.items { | 220 for _, item := range parts.indivFiles.items { |
263 d, err := hashFile(item.Path) | 221 d, err := hashFile(item.Path) |
264 if err != nil { | 222 if err != nil { |
265 » » » return err | 223 » » » return nil, err |
266 } | 224 } |
267 item.Digest = d | 225 item.Digest = d |
268 files[item.RelPath] = isolated.BasicFile(item.Digest, int(item.Mode), item.Size) | 226 files[item.RelPath] = isolated.BasicFile(item.Digest, int(item.Mode), item.Size) |
269 checker.AddItem(item, false, func(item *Item, ps *isolatedclient.PushState) { | 227 checker.AddItem(item, false, func(item *Item, ps *isolatedclient.PushState) { |
270 if ps == nil { | 228 if ps == nil { |
271 return | 229 return |
272 } | 230 } |
273 log.Printf("QUEUED %q for upload", item.RelPath) | 231 log.Printf("QUEUED %q for upload", item.RelPath) |
274 uploader.UploadFile(item, ps, func() { | 232 uploader.UploadFile(item, ps, func() { |
275 log.Printf("UPLOADED %q", item.RelPath) | 233 log.Printf("UPLOADED %q", item.RelPath) |
276 }) | 234 }) |
277 }) | 235 }) |
278 } | 236 } |
| 237 return files, nil |
| 238 } |
| 239 |
| 240 // main contains the core logic for experimental archive. |
| 241 func (c *expArchiveRun) main() error { |
| 242 // TODO(djd): This func is long and has a lot of internal complexity (like, |
| 243 // such as, archiveCallback). Refactor. |
| 244 |
| 245 start := time.Now() |
| 246 archiveOpts := &c.isolateFlags.ArchiveOptions |
| 247 // Parse the incoming isolate file. |
| 248 deps, rootDir, isol, err := isolate.ProcessIsolate(archiveOpts) |
| 249 if err != nil { |
| 250 return fmt.Errorf("failed to process isolate: %v", err) |
| 251 } |
| 252 log.Printf("Isolate referenced %d deps", len(deps)) |
| 253 |
| 254 // Set up a background context which is cancelled when this function returns. |
| 255 ctx, cancel := context.WithCancel(context.Background()) |
| 256 defer cancel() |
| 257 |
| 258 // Create the isolated client which connects to the isolate server. |
| 259 authCl, err := c.createAuthClient() |
| 260 if err != nil { |
| 261 return err |
| 262 } |
| 263 client := isolatedclient.New(nil, authCl, c.isolatedFlags.ServerURL, c.isolatedFlags.Namespace, nil, nil) |
| 264 |
| 265 // Set up a checker and uploader. We limit the uploader to one concurrent |
| 266 // upload, since the uploads are all coming from disk (with the exception of |
| 267 // the isolated JSON itself) and we only want a single goroutine reading from |
| 268 // disk at once. |
| 269 checker := NewChecker(ctx, client) |
| 270 uploader := NewUploader(ctx, client, 1) |
| 271 |
| 272 parts, err := partitionDeps(deps, rootDir, c.isolateFlags.ArchiveOptions.Blacklist) |
| 273 if err != nil { |
| 274 return fmt.Errorf("partitioning deps: %v", err) |
| 275 } |
| 276 |
| 277 numFiles := len(parts.filesToArchive.items) + len(parts.indivFiles.items) |
| 278 filesSize := uint64(parts.filesToArchive.totalSize + parts.indivFiles.totalSize) |
| 279 log.Printf("Isolate expanded to %d files (total size %s) and %d symlinks", numFiles, humanize.Bytes(filesSize), len(parts.links.items)) |
| 280 log.Printf("\t%d files (%s) to be isolated individually", len(parts.indivFiles.items), humanize.Bytes(uint64(parts.indivFiles.totalSize))) |
| 281 log.Printf("\t%d files (%s) to be isolated in archives", len(parts.filesToArchive.items), humanize.Bytes(uint64(parts.filesToArchive.totalSize))) |
| 282 |
| 283 files, err := uploadDeps(parts, checker, uploader) |
| 284 if err != nil { |
| 285 return err |
| 286 } |
279 | 287 |
280 // Marshal the isolated file into JSON, and create an Item to describe it. | 288 // Marshal the isolated file into JSON, and create an Item to describe it. |
281 isol.Files = files | 289 isol.Files = files |
282 var isolJSON []byte | 290 var isolJSON []byte |
283 isolJSON, err = json.Marshal(isol) | 291 isolJSON, err = json.Marshal(isol) |
284 if err != nil { | 292 if err != nil { |
285 return err | 293 return err |
286 } | 294 } |
287 isolItem := &Item{ | 295 isolItem := &Item{ |
288 Path: archiveOpts.Isolated, | 296 Path: archiveOpts.Isolated, |
(...skipping 97 matching lines...)
386 } | 394 } |
387 | 395 |
388 func hashFile(path string) (isolated.HexDigest, error) { | 396 func hashFile(path string) (isolated.HexDigest, error) { |
389 f, err := os.Open(path) | 397 f, err := os.Open(path) |
390 if err != nil { | 398 if err != nil { |
391 return "", err | 399 return "", err |
392 } | 400 } |
393 defer f.Close() | 401 defer f.Close() |
394 return isolated.Hash(f) | 402 return isolated.Hash(f) |
395 } | 403 } |
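Note on the per-iteration copy retained at OLD 232 / NEW 190 (bundle := bundle): the closure passed to checker.AddItem runs after the loop has moved on, and in Go versions before 1.22 the range variable is a single variable reused across iterations, so without the copy every callback would capture the final bundle. A minimal sketch of the pitfall, using hypothetical values rather than code from this CL:

package main

import "fmt"

func main() {
	var callbacks []func()
	for _, name := range []string{"a", "b", "c"} {
		name := name // per-iteration copy; omit it (pre-Go 1.22) and every callback prints "c"
		callbacks = append(callbacks, func() { fmt.Println(name) })
	}
	for _, cb := range callbacks {
		cb() // with the copy in place: prints a, b, c
	}
}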
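Both AddItem call sites also share an implicit contract: the checker consults the isolate server and then invokes the callback, passing a nil *isolatedclient.PushState when the item is already present, which is why each callback begins with an early return on nil. A sketch of that convention as implied by this diff; pushState and checkAndAdd are hypothetical stand-ins, not the real Checker API:

package main

import "fmt"

type pushState struct{} // hypothetical stand-in for isolatedclient.PushState

// checkAndAdd mimics the behavior implied by Checker.AddItem above: the
// callback receives nil when the server already has the item.
func checkAndAdd(presentOnServer bool, cb func(ps *pushState)) {
	if presentOnServer {
		cb(nil)
		return
	}
	cb(&pushState{})
}

func main() {
	for _, present := range []bool{true, false} {
		checkAndAdd(present, func(ps *pushState) {
			if ps == nil {
				return // already on the server; nothing to upload
			}
			fmt.Println("QUEUED for upload")
		})
	}
}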