Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Google Storage utility that contains methods for both CT master and worker | 1 // Google Storage utility that contains methods for both CT master and worker |
| 2 // scripts. | 2 // scripts. |
| 3 package util | 3 package util |
| 4 | 4 |
| 5 import ( | 5 import ( |
| 6 "fmt" | 6 "fmt" |
| 7 "io" | 7 "io" |
| 8 "io/ioutil" | 8 "io/ioutil" |
| 9 "net/http" | 9 "net/http" |
| 10 "os" | 10 "os" |
| 11 "path" | |
| 11 "path/filepath" | 12 "path/filepath" |
| 13 "strconv" | |
| 12 "strings" | 14 "strings" |
| 13 "sync" | 15 "sync" |
| 14 "time" | 16 "time" |
| 15 | 17 |
| 16 "github.com/skia-dev/glog" | 18 "github.com/skia-dev/glog" |
| 17 "go.skia.org/infra/go/auth" | 19 "go.skia.org/infra/go/auth" |
| 18 "go.skia.org/infra/go/gs" | 20 "go.skia.org/infra/go/gs" |
| 19 "go.skia.org/infra/go/util" | 21 "go.skia.org/infra/go/util" |
| 20 googleapi "google.golang.org/api/googleapi" | 22 googleapi "google.golang.org/api/googleapi" |
| 21 storage "google.golang.org/api/storage/v1" | 23 storage "google.golang.org/api/storage/v1" |
| (...skipping 199 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 221 | 223 |
| 222 if equal, _ := gs.AreTimeStampsEqual(localDir, gsDir); equal { | 224 if equal, _ := gs.AreTimeStampsEqual(localDir, gsDir); equal { |
| 223 // No need to download artifacts they already exist locally. | 225 // No need to download artifacts they already exist locally. |
| 224 glog.Infof("Not downloading %s because TIMESTAMPS match", gsDir) | 226 glog.Infof("Not downloading %s because TIMESTAMPS match", gsDir) |
| 225 return nil | 227 return nil |
| 226 } | 228 } |
| 227 glog.Infof("Timestamps between %s and %s are different. Downloading from Google Storage", localDir, gsDir) | 229 glog.Infof("Timestamps between %s and %s are different. Downloading from Google Storage", localDir, gsDir) |
| 228 return gs.downloadRemoteDir(localDir, gsDir) | 230 return gs.downloadRemoteDir(localDir, gsDir) |
| 229 } | 231 } |
| 230 | 232 |
| 231 func (gs *GsUtil) deleteRemoteDir(gsDir string) error { | 233 func (gs *GsUtil) DeleteRemoteDir(gsDir string) error { |
| 232 // The channel where the GS filepaths to be deleted will be sent to. | 234 // The channel where the GS filepaths to be deleted will be sent to. |
| 233 chFilePaths := make(chan string, MAX_CHANNEL_SIZE) | 235 chFilePaths := make(chan string, MAX_CHANNEL_SIZE) |
| 234 req := gs.service.Objects.List(GSBucketName).Prefix(gsDir + "/") | 236 req := gs.service.Objects.List(GSBucketName).Prefix(gsDir + "/") |
| 235 for req != nil { | 237 for req != nil { |
| 236 resp, err := req.Do() | 238 resp, err := req.Do() |
| 237 if err != nil { | 239 if err != nil { |
| 238 return fmt.Errorf("Error occured while listing %s: %s", gsDir, err) | 240 return fmt.Errorf("Error occured while listing %s: %s", gsDir, err) |
| 239 } | 241 } |
| 240 for _, result := range resp.Items { | 242 for _, result := range resp.Items { |
| 241 chFilePaths <- result.Name | 243 chFilePaths <- result.Name |
| (...skipping 70 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 312 gsDir := filepath.Join(dirName, pagesetType, fmt.Sprintf("slave%d", work erNum)) | 314 gsDir := filepath.Join(dirName, pagesetType, fmt.Sprintf("slave%d", work erNum)) |
| 313 | 315 |
| 314 if equal, _ := gs.AreTimeStampsEqual(localDir, gsDir); equal { | 316 if equal, _ := gs.AreTimeStampsEqual(localDir, gsDir); equal { |
| 315 glog.Infof("Not uploading %s because TIMESTAMPS match", localDir ) | 317 glog.Infof("Not uploading %s because TIMESTAMPS match", localDir ) |
| 316 return nil | 318 return nil |
| 317 } | 319 } |
| 318 glog.Infof("Timestamps between %s and %s are different. Uploading to Goo gle Storage", localDir, gsDir) | 320 glog.Infof("Timestamps between %s and %s are different. Uploading to Goo gle Storage", localDir, gsDir) |
| 319 return gs.UploadDir(localDir, gsDir, true) | 321 return gs.UploadDir(localDir, gsDir, true) |
| 320 } | 322 } |
| 321 | 323 |
| 324 // UploadSwarmingArtifact uploads the specified local artifacts to Google Storag e. | |
| 325 func (gs *GsUtil) UploadSwarmingArtifacts(dirName, pagesetType string) error { | |
| 326 localDir := path.Join(StorageDir, dirName, pagesetType) | |
| 327 gsDir := path.Join(SWARMING_DIR_NAME, dirName, pagesetType) | |
| 328 | |
| 329 return gs.UploadDir(localDir, gsDir, false) | |
| 330 } | |
| 331 | |
| 332 // DownloadSwarmingArtifacts downloads the specified artifacts from Google Stora ge to a local dir. | |
| 333 // The Google storage directory is assumed to have numerical subdirs Eg: {1..100 0}. This function | |
| 334 // downloads the contents of those directories into a local directory without th e numerical | |
| 335 // subdirs. | |
| 336 // Returns the ranking/index of the downloaded artifact. | |
| 337 func (gs *GsUtil) DownloadSwarmingArtifacts(localDir, remoteDirName, pagesetType string, startRange, num int) (map[string]int, error) { | |
| 338 // Empty the local dir. | |
| 339 util.RemoveAll(localDir) | |
| 340 // Create the local dir. | |
| 341 util.MkdirAll(localDir, 0700) | |
| 342 | |
| 343 gsDir := filepath.Join(SWARMING_DIR_NAME, remoteDirName, pagesetType) | |
| 344 endRange := num + startRange - 1 | |
| 345 // The channel where remote files to be downloaded will be sent to. | |
| 346 chRemoteDirs := make(chan string, num) | |
| 347 for i := startRange; i <= endRange; i++ { | |
| 348 chRemoteDirs <- filepath.Join(gsDir, strconv.Itoa(i)) | |
| 349 } | |
| 350 close(chRemoteDirs) | |
| 351 | |
| 352 // Dictionary of artifacts to its rank/index. | |
| 353 artifactToIndex := map[string]int{} | |
| 354 // Mutex to control access to the above dictionary. | |
| 355 var mtx sync.Mutex | |
| 356 // Kick off goroutines to download artifacts and populate the artifactTo Index dictionary. | |
| 357 var wg sync.WaitGroup | |
| 358 for i := 0; i < GOROUTINE_POOL_SIZE; i++ { | |
| 359 wg.Add(1) | |
| 360 go func(goroutineNum int) { | |
| 361 defer wg.Done() | |
| 362 for remoteDir := range chRemoteDirs { | |
| 363 if err := gs.downloadFromSwarmingDir(remoteDir, gsDir, localDir, goroutineNum, &mtx, artifactToIndex); err != nil { | |
| 364 glog.Error(err) | |
| 365 return | |
| 366 } | |
| 367 } | |
| 368 }(i + 1) | |
| 369 } | |
| 370 wg.Wait() | |
| 371 if len(chRemoteDirs) != 0 { | |
| 372 return artifactToIndex, fmt.Errorf("The chRemoteDirs channel was expected to be empty!") | |
|
dogben
2016/05/19 14:18:20
uber-nit: Maybe just "Unable to download all artif
rmistry
2016/05/19 14:25:08
Done.
| |
| 373 } | |
| 374 return artifactToIndex, nil | |
| 375 } | |
| 376 | |
| 377 func (gs *GsUtil) downloadFromSwarmingDir(remoteDir, gsDir, localDir string, run ID int, mtx *sync.Mutex, artifactToIndex map[string]int) error { | |
| 378 req := gs.service.Objects.List(GSBucketName).Prefix(remoteDir + "/") | |
| 379 for req != nil { | |
| 380 resp, err := req.Do() | |
| 381 if err != nil { | |
| 382 return fmt.Errorf("Error occured while listing %s: %s", gsDir, err) | |
| 383 } | |
| 384 for _, result := range resp.Items { | |
| 385 fileName := filepath.Base(result.Name) | |
| 386 fileGsDir := filepath.Dir(result.Name) | |
| 387 index, err := strconv.Atoi(path.Base(fileGsDir)) | |
| 388 if err != nil { | |
| 389 return fmt.Errorf("%s was not in expected format : %s", fileGsDir, err) | |
| 390 } | |
| 391 respBody, err := getRespBody(result, gs.client) | |
| 392 if err != nil { | |
| 393 return fmt.Errorf("Could not fetch %s: %s", resu lt.MediaLink, err) | |
| 394 } | |
| 395 defer util.Close(respBody) | |
| 396 outputFile := filepath.Join(localDir, fileName) | |
| 397 out, err := os.Create(outputFile) | |
| 398 if err != nil { | |
| 399 return fmt.Errorf("Unable to create file %s: %s" , outputFile, err) | |
| 400 } | |
| 401 defer util.Close(out) | |
| 402 if _, err = io.Copy(out, respBody); err != nil { | |
| 403 return err | |
| 404 } | |
| 405 glog.Infof("Downloaded gs://%s/%s to %s with id#%d", GSB ucketName, result.Name, outputFile, runID) | |
| 406 // Sleep for a second after downloading file to avoid bo mbarding Cloud | |
| 407 // storage. | |
| 408 time.Sleep(time.Second) | |
| 409 mtx.Lock() | |
| 410 artifactToIndex[path.Join(localDir, fileName)] = index | |
| 411 mtx.Unlock() | |
| 412 } | |
| 413 if len(resp.NextPageToken) > 0 { | |
| 414 req.PageToken(resp.NextPageToken) | |
| 415 } else { | |
| 416 req = nil | |
| 417 } | |
| 418 } | |
| 419 return nil | |
| 420 } | |
| 421 | |
| 322 // UploadDir uploads the specified local dir into the specified Google Storage d ir. | 422 // UploadDir uploads the specified local dir into the specified Google Storage d ir. |
| 323 func (gs *GsUtil) UploadDir(localDir, gsDir string, cleanDir bool) error { | 423 func (gs *GsUtil) UploadDir(localDir, gsDir string, cleanDir bool) error { |
| 324 if cleanDir { | 424 if cleanDir { |
| 325 // Empty the remote dir before uploading to it. | 425 // Empty the remote dir before uploading to it. |
| 326 » » util.LogErr(gs.deleteRemoteDir(gsDir)) | 426 » » util.LogErr(gs.DeleteRemoteDir(gsDir)) |
| 327 } | 427 } |
| 328 | 428 |
| 329 // Construct a dictionary of file paths to their file infos. | 429 // Construct a dictionary of file paths to their file infos. |
| 330 pathsToFileInfos := map[string]os.FileInfo{} | 430 pathsToFileInfos := map[string]os.FileInfo{} |
| 331 visit := func(path string, f os.FileInfo, err error) error { | 431 visit := func(path string, f os.FileInfo, err error) error { |
| 332 if f.IsDir() { | 432 if f.IsDir() { |
| 333 return nil | 433 return nil |
| 334 } | 434 } |
| 335 pathsToFileInfos[path] = f | 435 pathsToFileInfos[path] = f |
| 336 return nil | 436 return nil |
| (...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 369 } | 469 } |
| 370 // Sleep for a second after uploading file to av oid bombarding Cloud | 470 // Sleep for a second after uploading file to av oid bombarding Cloud |
| 371 // storage. | 471 // storage. |
| 372 time.Sleep(time.Second) | 472 time.Sleep(time.Second) |
| 373 } | 473 } |
| 374 }(i + 1) | 474 }(i + 1) |
| 375 } | 475 } |
| 376 wg.Wait() | 476 wg.Wait() |
| 377 return nil | 477 return nil |
| 378 } | 478 } |
| OLD | NEW |