Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(429)

Side by Side Diff: ct/go/util/gs.go

Issue 1988103002: [CT] Add ability to download to/upload from swarming GS dir with numerical subdirs (Closed) Base URL: https://skia.googlesource.com/buildbot@ct-1-chromium_builds
Patch Set: Initial upload Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Google Storage utility that contains methods for both CT master and worker 1 // Google Storage utility that contains methods for both CT master and worker
2 // scripts. 2 // scripts.
3 package util 3 package util
4 4
5 import ( 5 import (
6 "fmt" 6 "fmt"
7 "io" 7 "io"
8 "io/ioutil" 8 "io/ioutil"
9 "net/http" 9 "net/http"
10 "os" 10 "os"
11 "path"
11 "path/filepath" 12 "path/filepath"
13 "strconv"
12 "strings" 14 "strings"
13 "sync" 15 "sync"
14 "time" 16 "time"
15 17
16 "github.com/skia-dev/glog" 18 "github.com/skia-dev/glog"
17 "go.skia.org/infra/go/auth" 19 "go.skia.org/infra/go/auth"
18 "go.skia.org/infra/go/gs" 20 "go.skia.org/infra/go/gs"
19 "go.skia.org/infra/go/util" 21 "go.skia.org/infra/go/util"
20 googleapi "google.golang.org/api/googleapi" 22 googleapi "google.golang.org/api/googleapi"
21 storage "google.golang.org/api/storage/v1" 23 storage "google.golang.org/api/storage/v1"
(...skipping 199 matching lines...) Expand 10 before | Expand all | Expand 10 after
221 223
222 if equal, _ := gs.AreTimeStampsEqual(localDir, gsDir); equal { 224 if equal, _ := gs.AreTimeStampsEqual(localDir, gsDir); equal {
223 // No need to download artifacts they already exist locally. 225 // No need to download artifacts they already exist locally.
224 glog.Infof("Not downloading %s because TIMESTAMPS match", gsDir) 226 glog.Infof("Not downloading %s because TIMESTAMPS match", gsDir)
225 return nil 227 return nil
226 } 228 }
227 glog.Infof("Timestamps between %s and %s are different. Downloading from Google Storage", localDir, gsDir) 229 glog.Infof("Timestamps between %s and %s are different. Downloading from Google Storage", localDir, gsDir)
228 return gs.downloadRemoteDir(localDir, gsDir) 230 return gs.downloadRemoteDir(localDir, gsDir)
229 } 231 }
230 232
231 func (gs *GsUtil) deleteRemoteDir(gsDir string) error { 233 func (gs *GsUtil) DeleteRemoteDir(gsDir string) error {
232 // The channel where the GS filepaths to be deleted will be sent to. 234 // The channel where the GS filepaths to be deleted will be sent to.
233 chFilePaths := make(chan string, MAX_CHANNEL_SIZE) 235 chFilePaths := make(chan string, MAX_CHANNEL_SIZE)
234 req := gs.service.Objects.List(GSBucketName).Prefix(gsDir + "/") 236 req := gs.service.Objects.List(GSBucketName).Prefix(gsDir + "/")
235 for req != nil { 237 for req != nil {
236 resp, err := req.Do() 238 resp, err := req.Do()
237 if err != nil { 239 if err != nil {
238 return fmt.Errorf("Error occured while listing %s: %s", gsDir, err) 240 return fmt.Errorf("Error occured while listing %s: %s", gsDir, err)
239 } 241 }
240 for _, result := range resp.Items { 242 for _, result := range resp.Items {
241 chFilePaths <- result.Name 243 chFilePaths <- result.Name
(...skipping 70 matching lines...) Expand 10 before | Expand all | Expand 10 after
312 gsDir := filepath.Join(dirName, pagesetType, fmt.Sprintf("slave%d", work erNum)) 314 gsDir := filepath.Join(dirName, pagesetType, fmt.Sprintf("slave%d", work erNum))
313 315
314 if equal, _ := gs.AreTimeStampsEqual(localDir, gsDir); equal { 316 if equal, _ := gs.AreTimeStampsEqual(localDir, gsDir); equal {
315 glog.Infof("Not uploading %s because TIMESTAMPS match", localDir ) 317 glog.Infof("Not uploading %s because TIMESTAMPS match", localDir )
316 return nil 318 return nil
317 } 319 }
318 glog.Infof("Timestamps between %s and %s are different. Uploading to Goo gle Storage", localDir, gsDir) 320 glog.Infof("Timestamps between %s and %s are different. Uploading to Goo gle Storage", localDir, gsDir)
319 return gs.UploadDir(localDir, gsDir, true) 321 return gs.UploadDir(localDir, gsDir, true)
320 } 322 }
321 323
324 // UploadSwarmingArtifact uploads the specified local artifacts to Google Storage.
325 func (gs *GsUtil) UploadSwarmingArtifacts(dirName, pagesetType string) error {
326 localDir := path.Join(StorageDir, dirName, pagesetType)
327 gsDir := path.Join(SWARMING_DIR_NAME, dirName, pagesetType)
328
329 return gs.UploadDir(localDir, gsDir, false)
330 }
331
332 // DownloadSwarmingArtifacts downloads the specified artifacts from Google Storage to a local dir.
333 // The Google storage directory is assumed to have numerical subdirs Eg: {1..1000}. This function
334 // downloads the contents of those directories into a local directory without the numerical
dogben 2016/05/18 15:22:01 Aren't there files with the same name in multiple
rmistry 2016/05/19 11:54:10 Possible but unlikely. If they exist right now the
335 // subdirs.
336 // Returns the ranking/index of the downloaded artifact.
337 func (gs *GsUtil) DownloadSwarmingArtifacts(localDir, remoteDirName, pagesetType string, startRange, num int) (map[string]int, error) {
338 // Empty the local dir.
339 util.RemoveAll(localDir)
340 // Create the local dir.
341 util.MkdirAll(localDir, 0700)
342
343 gsDir := filepath.Join(SWARMING_DIR_NAME, remoteDirName, pagesetType)
344 endRange := num + startRange - 1
345 // The channel where remote files to be downloaded will be sent to.
346 chRemoteDirs := make(chan string, num)
347 for i := startRange; i <= endRange; i++ {
348 chRemoteDirs <- filepath.Join(gsDir, strconv.Itoa(i))
349 }
350 close(chRemoteDirs)
351
352 // Dictionary of artifacts to its rank/index.
353 artifactToIndex := map[string]int{}
354 // Mutex to control access to the above dictionary.
355 var mtx sync.Mutex
356 // Kick off goroutines to download artifacts and populate the artifactToIndex dictionary.
357 var wg sync.WaitGroup
358 for i := 0; i < GOROUTINE_POOL_SIZE; i++ {
359 wg.Add(1)
360 go func(goroutineNum int) {
361 defer wg.Done()
362 for remoteDir := range chRemoteDirs {
363
364 req := gs.service.Objects.List(GSBucketName).Pre fix(remoteDir + "/")
365 for req != nil {
366 resp, err := req.Do()
367 if err != nil {
368 glog.Errorf("Error occured while listing %s: %s", gsDir, err)
369 return
370 }
371 for _, result := range resp.Items {
372 fileName := filepath.Base(result .Name)
373 fileGsDir := filepath.Dir(result .Name)
374 index, err := strconv.Atoi(path. Base(fileGsDir))
375 if err != nil {
376 glog.Errorf("%s was not in expected format: %s", fileGsDir, err)
377 }
dogben 2016/05/18 15:22:01 Missing return?
rmistry 2016/05/19 11:54:10 Oops. Done.
378 respBody, err := getRespBody(res ult, gs.client)
379 if err != nil {
380 glog.Errorf("Could not f etch %s: %s", result.MediaLink, err)
381 return
382 }
383 defer util.Close(respBody)
dogben 2016/05/18 15:22:01 If there are many items in chRemoteDirs, deferring
rmistry 2016/05/19 11:54:10 Done.
384 outputFile := filepath.Join(loca lDir, fileName)
385 out, err := os.Create(outputFile )
386 if err != nil {
387 glog.Errorf("Unable to c reate file %s: %s", outputFile, err)
388 return
389 }
390 defer util.Close(out)
dogben 2016/05/18 15:22:01 nit: util.Close will ignore errors on closing the
rmistry 2016/05/19 11:54:11 Yea, I figured just logging an error was enough he
391 if _, err = io.Copy(out, respBod y); err != nil {
392 glog.Error(err)
393 return
394 }
395 glog.Infof("Downloaded gs://%s/% s to %s with goroutine#%d", GSBucketName, result.Name, outputFile, goroutineNum)
396 // Sleep for a second after downloading file to avoid bombarding Cloud
397 // storage.
398 time.Sleep(time.Second)
399 mtx.Lock()
400 artifactToIndex[path.Join(localD ir, fileName)] = index
401 mtx.Unlock()
402 }
403 if len(resp.NextPageToken) > 0 {
404 req.PageToken(resp.NextPageToken )
405 } else {
406 req = nil
407 }
408 }
409 }
410 }(i + 1)
411 }
412 wg.Wait()
413 return artifactToIndex, nil
dogben 2016/05/18 15:22:01 nit: return error if chRemoteDirs is not empty?
rmistry 2016/05/19 11:54:11 Done.
414 }
415
322 // UploadDir uploads the specified local dir into the specified Google Storage dir. 416 // UploadDir uploads the specified local dir into the specified Google Storage dir.
323 func (gs *GsUtil) UploadDir(localDir, gsDir string, cleanDir bool) error { 417 func (gs *GsUtil) UploadDir(localDir, gsDir string, cleanDir bool) error {
324 if cleanDir { 418 if cleanDir {
325 // Empty the remote dir before uploading to it. 419 // Empty the remote dir before uploading to it.
326 » » util.LogErr(gs.deleteRemoteDir(gsDir)) 420 » » util.LogErr(gs.DeleteRemoteDir(gsDir))
327 } 421 }
328 422
329 // Construct a dictionary of file paths to their file infos. 423 // Construct a dictionary of file paths to their file infos.
330 pathsToFileInfos := map[string]os.FileInfo{} 424 pathsToFileInfos := map[string]os.FileInfo{}
331 visit := func(path string, f os.FileInfo, err error) error { 425 visit := func(path string, f os.FileInfo, err error) error {
332 if f.IsDir() { 426 if f.IsDir() {
333 return nil 427 return nil
334 } 428 }
335 pathsToFileInfos[path] = f 429 pathsToFileInfos[path] = f
336 return nil 430 return nil
(...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after
369 } 463 }
370 // Sleep for a second after uploading file to avoid bombarding Cloud 464 // Sleep for a second after uploading file to avoid bombarding Cloud
371 // storage. 465 // storage.
372 time.Sleep(time.Second) 466 time.Sleep(time.Second)
373 } 467 }
374 }(i + 1) 468 }(i + 1)
375 } 469 }
376 wg.Wait() 470 wg.Wait()
377 return nil 471 return nil
378 } 472 }
OLDNEW
« no previous file with comments | « ct/go/util/constants.go ('k') | ct/go/util/gs_test.go » ('j') | ct/go/util/gs_test.go » ('J')

Powered by Google App Engine
This is Rietveld 408576698