Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1519)

Side by Side Diff: ct/go/util/gs.go

Issue 1988103002: [CT] Add ability to download to/upload from swarming GS dir with numerical subdirs (Closed) Base URL: https://skia.googlesource.com/buildbot@ct-1-chromium_builds
Patch Set: Fix vet Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « ct/go/util/constants.go ('k') | ct/go/util/gs_test.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Google Storage utility that contains methods for both CT master and worker 1 // Google Storage utility that contains methods for both CT master and worker
2 // scripts. 2 // scripts.
3 package util 3 package util
4 4
5 import ( 5 import (
6 "fmt" 6 "fmt"
7 "io" 7 "io"
8 "io/ioutil" 8 "io/ioutil"
9 "net/http" 9 "net/http"
10 "os" 10 "os"
11 "path"
11 "path/filepath" 12 "path/filepath"
13 "strconv"
12 "strings" 14 "strings"
13 "sync" 15 "sync"
14 "time" 16 "time"
15 17
16 "github.com/skia-dev/glog" 18 "github.com/skia-dev/glog"
17 "go.skia.org/infra/go/auth" 19 "go.skia.org/infra/go/auth"
18 "go.skia.org/infra/go/gs" 20 "go.skia.org/infra/go/gs"
19 "go.skia.org/infra/go/util" 21 "go.skia.org/infra/go/util"
20 googleapi "google.golang.org/api/googleapi" 22 googleapi "google.golang.org/api/googleapi"
21 storage "google.golang.org/api/storage/v1" 23 storage "google.golang.org/api/storage/v1"
(...skipping 199 matching lines...) Expand 10 before | Expand all | Expand 10 after
221 223
222 if equal, _ := gs.AreTimeStampsEqual(localDir, gsDir); equal { 224 if equal, _ := gs.AreTimeStampsEqual(localDir, gsDir); equal {
223 // No need to download artifacts they already exist locally. 225 // No need to download artifacts they already exist locally.
224 glog.Infof("Not downloading %s because TIMESTAMPS match", gsDir) 226 glog.Infof("Not downloading %s because TIMESTAMPS match", gsDir)
225 return nil 227 return nil
226 } 228 }
227 glog.Infof("Timestamps between %s and %s are different. Downloading from Google Storage", localDir, gsDir) 229 glog.Infof("Timestamps between %s and %s are different. Downloading from Google Storage", localDir, gsDir)
228 return gs.downloadRemoteDir(localDir, gsDir) 230 return gs.downloadRemoteDir(localDir, gsDir)
229 } 231 }
230 232
231 func (gs *GsUtil) deleteRemoteDir(gsDir string) error { 233 func (gs *GsUtil) DeleteRemoteDir(gsDir string) error {
232 // The channel where the GS filepaths to be deleted will be sent to. 234 // The channel where the GS filepaths to be deleted will be sent to.
233 chFilePaths := make(chan string, MAX_CHANNEL_SIZE) 235 chFilePaths := make(chan string, MAX_CHANNEL_SIZE)
234 req := gs.service.Objects.List(GSBucketName).Prefix(gsDir + "/") 236 req := gs.service.Objects.List(GSBucketName).Prefix(gsDir + "/")
235 for req != nil { 237 for req != nil {
236 resp, err := req.Do() 238 resp, err := req.Do()
237 if err != nil { 239 if err != nil {
238 return fmt.Errorf("Error occured while listing %s: %s", gsDir, err) 240 return fmt.Errorf("Error occured while listing %s: %s", gsDir, err)
239 } 241 }
240 for _, result := range resp.Items { 242 for _, result := range resp.Items {
241 chFilePaths <- result.Name 243 chFilePaths <- result.Name
(...skipping 70 matching lines...) Expand 10 before | Expand all | Expand 10 after
312 gsDir := filepath.Join(dirName, pagesetType, fmt.Sprintf("slave%d", work erNum)) 314 gsDir := filepath.Join(dirName, pagesetType, fmt.Sprintf("slave%d", work erNum))
313 315
314 if equal, _ := gs.AreTimeStampsEqual(localDir, gsDir); equal { 316 if equal, _ := gs.AreTimeStampsEqual(localDir, gsDir); equal {
315 glog.Infof("Not uploading %s because TIMESTAMPS match", localDir ) 317 glog.Infof("Not uploading %s because TIMESTAMPS match", localDir )
316 return nil 318 return nil
317 } 319 }
318 glog.Infof("Timestamps between %s and %s are different. Uploading to Goo gle Storage", localDir, gsDir) 320 glog.Infof("Timestamps between %s and %s are different. Uploading to Goo gle Storage", localDir, gsDir)
319 return gs.UploadDir(localDir, gsDir, true) 321 return gs.UploadDir(localDir, gsDir, true)
320 } 322 }
321 323
324 // UploadSwarmingArtifact uploads the specified local artifacts to Google Storag e.
325 func (gs *GsUtil) UploadSwarmingArtifacts(dirName, pagesetType string) error {
326 localDir := path.Join(StorageDir, dirName, pagesetType)
327 gsDir := path.Join(SWARMING_DIR_NAME, dirName, pagesetType)
328
329 return gs.UploadDir(localDir, gsDir, false)
330 }
331
332 // DownloadSwarmingArtifacts downloads the specified artifacts from Google Stora ge to a local dir.
333 // The Google storage directory is assumed to have numerical subdirs Eg: {1..100 0}. This function
334 // downloads the contents of those directories into a local directory without th e numerical
335 // subdirs.
336 // Returns the ranking/index of the downloaded artifact.
337 func (gs *GsUtil) DownloadSwarmingArtifacts(localDir, remoteDirName, pagesetType string, startRange, num int) (map[string]int, error) {
338 // Empty the local dir.
339 util.RemoveAll(localDir)
340 // Create the local dir.
341 util.MkdirAll(localDir, 0700)
342
343 gsDir := filepath.Join(SWARMING_DIR_NAME, remoteDirName, pagesetType)
344 endRange := num + startRange - 1
345 // The channel where remote files to be downloaded will be sent to.
346 chRemoteDirs := make(chan string, num)
347 for i := startRange; i <= endRange; i++ {
348 chRemoteDirs <- filepath.Join(gsDir, strconv.Itoa(i))
349 }
350 close(chRemoteDirs)
351
352 // Dictionary of artifacts to its rank/index.
353 artifactToIndex := map[string]int{}
354 // Mutex to control access to the above dictionary.
355 var mtx sync.Mutex
356 // Kick off goroutines to download artifacts and populate the artifactTo Index dictionary.
357 var wg sync.WaitGroup
358 for i := 0; i < GOROUTINE_POOL_SIZE; i++ {
359 wg.Add(1)
360 go func(goroutineNum int) {
361 defer wg.Done()
362 for remoteDir := range chRemoteDirs {
363 if err := gs.downloadFromSwarmingDir(remoteDir, gsDir, localDir, goroutineNum, &mtx, artifactToIndex); err != nil {
364 glog.Error(err)
365 return
366 }
367 }
368 }(i + 1)
369 }
370 wg.Wait()
371 if len(chRemoteDirs) != 0 {
372 return artifactToIndex, fmt.Errorf("The chRemoteDirs channel was expected to be empty!")
dogben 2016/05/19 14:18:20 uber-nit: Maybe just "Unable to download all artif
rmistry 2016/05/19 14:25:08 Done.
373 }
374 return artifactToIndex, nil
375 }
376
377 func (gs *GsUtil) downloadFromSwarmingDir(remoteDir, gsDir, localDir string, run ID int, mtx *sync.Mutex, artifactToIndex map[string]int) error {
378 req := gs.service.Objects.List(GSBucketName).Prefix(remoteDir + "/")
379 for req != nil {
380 resp, err := req.Do()
381 if err != nil {
382 return fmt.Errorf("Error occured while listing %s: %s", gsDir, err)
383 }
384 for _, result := range resp.Items {
385 fileName := filepath.Base(result.Name)
386 fileGsDir := filepath.Dir(result.Name)
387 index, err := strconv.Atoi(path.Base(fileGsDir))
388 if err != nil {
389 return fmt.Errorf("%s was not in expected format : %s", fileGsDir, err)
390 }
391 respBody, err := getRespBody(result, gs.client)
392 if err != nil {
393 return fmt.Errorf("Could not fetch %s: %s", resu lt.MediaLink, err)
394 }
395 defer util.Close(respBody)
396 outputFile := filepath.Join(localDir, fileName)
397 out, err := os.Create(outputFile)
398 if err != nil {
399 return fmt.Errorf("Unable to create file %s: %s" , outputFile, err)
400 }
401 defer util.Close(out)
402 if _, err = io.Copy(out, respBody); err != nil {
403 return err
404 }
405 glog.Infof("Downloaded gs://%s/%s to %s with id#%d", GSB ucketName, result.Name, outputFile, runID)
406 // Sleep for a second after downloading file to avoid bo mbarding Cloud
407 // storage.
408 time.Sleep(time.Second)
409 mtx.Lock()
410 artifactToIndex[path.Join(localDir, fileName)] = index
411 mtx.Unlock()
412 }
413 if len(resp.NextPageToken) > 0 {
414 req.PageToken(resp.NextPageToken)
415 } else {
416 req = nil
417 }
418 }
419 return nil
420 }
421
322 // UploadDir uploads the specified local dir into the specified Google Storage d ir. 422 // UploadDir uploads the specified local dir into the specified Google Storage d ir.
323 func (gs *GsUtil) UploadDir(localDir, gsDir string, cleanDir bool) error { 423 func (gs *GsUtil) UploadDir(localDir, gsDir string, cleanDir bool) error {
324 if cleanDir { 424 if cleanDir {
325 // Empty the remote dir before uploading to it. 425 // Empty the remote dir before uploading to it.
326 » » util.LogErr(gs.deleteRemoteDir(gsDir)) 426 » » util.LogErr(gs.DeleteRemoteDir(gsDir))
327 } 427 }
328 428
329 // Construct a dictionary of file paths to their file infos. 429 // Construct a dictionary of file paths to their file infos.
330 pathsToFileInfos := map[string]os.FileInfo{} 430 pathsToFileInfos := map[string]os.FileInfo{}
331 visit := func(path string, f os.FileInfo, err error) error { 431 visit := func(path string, f os.FileInfo, err error) error {
332 if f.IsDir() { 432 if f.IsDir() {
333 return nil 433 return nil
334 } 434 }
335 pathsToFileInfos[path] = f 435 pathsToFileInfos[path] = f
336 return nil 436 return nil
(...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after
369 } 469 }
370 // Sleep for a second after uploading file to av oid bombarding Cloud 470 // Sleep for a second after uploading file to av oid bombarding Cloud
371 // storage. 471 // storage.
372 time.Sleep(time.Second) 472 time.Sleep(time.Second)
373 } 473 }
374 }(i + 1) 474 }(i + 1)
375 } 475 }
376 wg.Wait() 476 wg.Wait()
377 return nil 477 return nil
378 } 478 }
OLDNEW
« no previous file with comments | « ct/go/util/constants.go ('k') | ct/go/util/gs_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698