Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Google Storage utility that contains methods for both CT master and worker | 1 // Google Storage utility that contains methods for both CT master and worker |
| 2 // scripts. | 2 // scripts. |
| 3 package util | 3 package util |
| 4 | 4 |
| 5 import ( | 5 import ( |
| 6 "fmt" | 6 "fmt" |
| 7 "io" | 7 "io" |
| 8 "io/ioutil" | 8 "io/ioutil" |
| 9 "net/http" | 9 "net/http" |
| 10 "os" | 10 "os" |
| 11 "path" | |
| 11 "path/filepath" | 12 "path/filepath" |
| 13 "strconv" | |
| 12 "strings" | 14 "strings" |
| 13 "sync" | 15 "sync" |
| 14 "time" | 16 "time" |
| 15 | 17 |
| 16 "github.com/skia-dev/glog" | 18 "github.com/skia-dev/glog" |
| 17 "go.skia.org/infra/go/auth" | 19 "go.skia.org/infra/go/auth" |
| 18 "go.skia.org/infra/go/gs" | 20 "go.skia.org/infra/go/gs" |
| 19 "go.skia.org/infra/go/util" | 21 "go.skia.org/infra/go/util" |
| 20 googleapi "google.golang.org/api/googleapi" | 22 googleapi "google.golang.org/api/googleapi" |
| 21 storage "google.golang.org/api/storage/v1" | 23 storage "google.golang.org/api/storage/v1" |
| (...skipping 199 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 221 | 223 |
| 222 if equal, _ := gs.AreTimeStampsEqual(localDir, gsDir); equal { | 224 if equal, _ := gs.AreTimeStampsEqual(localDir, gsDir); equal { |
| 223 // No need to download artifacts they already exist locally. | 225 // No need to download artifacts they already exist locally. |
| 224 glog.Infof("Not downloading %s because TIMESTAMPS match", gsDir) | 226 glog.Infof("Not downloading %s because TIMESTAMPS match", gsDir) |
| 225 return nil | 227 return nil |
| 226 } | 228 } |
| 227 glog.Infof("Timestamps between %s and %s are different. Downloading from Google Storage", localDir, gsDir) | 229 glog.Infof("Timestamps between %s and %s are different. Downloading from Google Storage", localDir, gsDir) |
| 228 return gs.downloadRemoteDir(localDir, gsDir) | 230 return gs.downloadRemoteDir(localDir, gsDir) |
| 229 } | 231 } |
| 230 | 232 |
| 231 func (gs *GsUtil) deleteRemoteDir(gsDir string) error { | 233 func (gs *GsUtil) DeleteRemoteDir(gsDir string) error { |
| 232 // The channel where the GS filepaths to be deleted will be sent to. | 234 // The channel where the GS filepaths to be deleted will be sent to. |
| 233 chFilePaths := make(chan string, MAX_CHANNEL_SIZE) | 235 chFilePaths := make(chan string, MAX_CHANNEL_SIZE) |
| 234 req := gs.service.Objects.List(GSBucketName).Prefix(gsDir + "/") | 236 req := gs.service.Objects.List(GSBucketName).Prefix(gsDir + "/") |
| 235 for req != nil { | 237 for req != nil { |
| 236 resp, err := req.Do() | 238 resp, err := req.Do() |
| 237 if err != nil { | 239 if err != nil { |
| 238 return fmt.Errorf("Error occured while listing %s: %s", gsDir, err) | 240 return fmt.Errorf("Error occured while listing %s: %s", gsDir, err) |
| 239 } | 241 } |
| 240 for _, result := range resp.Items { | 242 for _, result := range resp.Items { |
| 241 chFilePaths <- result.Name | 243 chFilePaths <- result.Name |
| (...skipping 70 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 312 gsDir := filepath.Join(dirName, pagesetType, fmt.Sprintf("slave%d", work erNum)) | 314 gsDir := filepath.Join(dirName, pagesetType, fmt.Sprintf("slave%d", work erNum)) |
| 313 | 315 |
| 314 if equal, _ := gs.AreTimeStampsEqual(localDir, gsDir); equal { | 316 if equal, _ := gs.AreTimeStampsEqual(localDir, gsDir); equal { |
| 315 glog.Infof("Not uploading %s because TIMESTAMPS match", localDir ) | 317 glog.Infof("Not uploading %s because TIMESTAMPS match", localDir ) |
| 316 return nil | 318 return nil |
| 317 } | 319 } |
| 318 glog.Infof("Timestamps between %s and %s are different. Uploading to Goo gle Storage", localDir, gsDir) | 320 glog.Infof("Timestamps between %s and %s are different. Uploading to Goo gle Storage", localDir, gsDir) |
| 319 return gs.UploadDir(localDir, gsDir, true) | 321 return gs.UploadDir(localDir, gsDir, true) |
| 320 } | 322 } |
| 321 | 323 |
| 324 // UploadSwarmingArtifact uploads the specified local artifacts to Google Storag e. | |
| 325 func (gs *GsUtil) UploadSwarmingArtifacts(dirName, pagesetType string) error { | |
| 326 localDir := path.Join(StorageDir, dirName, pagesetType) | |
| 327 gsDir := path.Join(SWARMING_DIR_NAME, dirName, pagesetType) | |
| 328 | |
| 329 return gs.UploadDir(localDir, gsDir, false) | |
| 330 } | |
| 331 | |
| 332 // DownloadSwarmingArtifacts downloads the specified artifacts from Google Stora ge to a local dir. | |
| 333 // The Google storage directory is assumed to have numerical subdirs Eg: {1..100 0}. This function | |
| 334 // downloads the contents of those directories into a local directory without th e numerical | |
|
dogben
2016/05/18 15:22:01
Aren't there files with the same name in multiple
rmistry
2016/05/19 11:54:10
Possible but unlikely. If they exist right now the
| |
| 335 // subdirs. | |
| 336 // Returns the ranking/index of the downloaded artifact. | |
| 337 func (gs *GsUtil) DownloadSwarmingArtifacts(localDir, remoteDirName, pagesetType string, startRange, num int) (map[string]int, error) { | |
| 338 // Empty the local dir. | |
| 339 util.RemoveAll(localDir) | |
| 340 // Create the local dir. | |
| 341 util.MkdirAll(localDir, 0700) | |
| 342 | |
| 343 gsDir := filepath.Join(SWARMING_DIR_NAME, remoteDirName, pagesetType) | |
| 344 endRange := num + startRange - 1 | |
| 345 // The channel where remote files to be downloaded will be sent to. | |
| 346 chRemoteDirs := make(chan string, num) | |
| 347 for i := startRange; i <= endRange; i++ { | |
| 348 chRemoteDirs <- filepath.Join(gsDir, strconv.Itoa(i)) | |
| 349 } | |
| 350 close(chRemoteDirs) | |
| 351 | |
| 352 // Dictionary of artifacts to its rank/index. | |
| 353 artifactToIndex := map[string]int{} | |
| 354 // Mutex to control access to the above dictionary. | |
| 355 var mtx sync.Mutex | |
| 356 // Kick off goroutines to download artifacts and populate the artifactTo Index dictionary. | |
| 357 var wg sync.WaitGroup | |
| 358 for i := 0; i < GOROUTINE_POOL_SIZE; i++ { | |
| 359 wg.Add(1) | |
| 360 go func(goroutineNum int) { | |
| 361 defer wg.Done() | |
| 362 for remoteDir := range chRemoteDirs { | |
| 363 | |
| 364 req := gs.service.Objects.List(GSBucketName).Pre fix(remoteDir + "/") | |
| 365 for req != nil { | |
| 366 resp, err := req.Do() | |
| 367 if err != nil { | |
| 368 glog.Errorf("Error occured while listing %s: %s", gsDir, err) | |
| 369 return | |
| 370 } | |
| 371 for _, result := range resp.Items { | |
| 372 fileName := filepath.Base(result .Name) | |
| 373 fileGsDir := filepath.Dir(result .Name) | |
| 374 index, err := strconv.Atoi(path. Base(fileGsDir)) | |
| 375 if err != nil { | |
| 376 glog.Errorf("%s was not in expected format: %s", fileGsDir, err) | |
| 377 } | |
|
dogben
2016/05/18 15:22:01
Missing return?
rmistry
2016/05/19 11:54:10
Oops. Done.
| |
| 378 respBody, err := getRespBody(res ult, gs.client) | |
| 379 if err != nil { | |
| 380 glog.Errorf("Could not f etch %s: %s", result.MediaLink, err) | |
| 381 return | |
| 382 } | |
| 383 defer util.Close(respBody) | |
|
dogben
2016/05/18 15:22:01
If there are many items in chRemoteDirs, deferring
rmistry
2016/05/19 11:54:10
Done.
| |
| 384 outputFile := filepath.Join(loca lDir, fileName) | |
| 385 out, err := os.Create(outputFile ) | |
| 386 if err != nil { | |
| 387 glog.Errorf("Unable to c reate file %s: %s", outputFile, err) | |
| 388 return | |
| 389 } | |
| 390 defer util.Close(out) | |
|
dogben
2016/05/18 15:22:01
nit: util.Close will ignore errors on closing the
rmistry
2016/05/19 11:54:11
Yea, I figured just logging an error was enough he
| |
| 391 if _, err = io.Copy(out, respBod y); err != nil { | |
| 392 glog.Error(err) | |
| 393 return | |
| 394 } | |
| 395 glog.Infof("Downloaded gs://%s/% s to %s with goroutine#%d", GSBucketName, result.Name, outputFile, goroutineNum) | |
| 396 // Sleep for a second after down loading file to avoid bombarding Cloud | |
| 397 // storage. | |
| 398 time.Sleep(time.Second) | |
| 399 mtx.Lock() | |
| 400 artifactToIndex[path.Join(localD ir, fileName)] = index | |
| 401 mtx.Unlock() | |
| 402 } | |
| 403 if len(resp.NextPageToken) > 0 { | |
| 404 req.PageToken(resp.NextPageToken ) | |
| 405 } else { | |
| 406 req = nil | |
| 407 } | |
| 408 } | |
| 409 } | |
| 410 }(i + 1) | |
| 411 } | |
| 412 wg.Wait() | |
| 413 return artifactToIndex, nil | |
|
dogben
2016/05/18 15:22:01
nit: return error if chRemoteDirs is not empty?
rmistry
2016/05/19 11:54:11
Done.
| |
| 414 } | |
| 415 | |
| 322 // UploadDir uploads the specified local dir into the specified Google Storage d ir. | 416 // UploadDir uploads the specified local dir into the specified Google Storage d ir. |
| 323 func (gs *GsUtil) UploadDir(localDir, gsDir string, cleanDir bool) error { | 417 func (gs *GsUtil) UploadDir(localDir, gsDir string, cleanDir bool) error { |
| 324 if cleanDir { | 418 if cleanDir { |
| 325 // Empty the remote dir before uploading to it. | 419 // Empty the remote dir before uploading to it. |
| 326 » » util.LogErr(gs.deleteRemoteDir(gsDir)) | 420 » » util.LogErr(gs.DeleteRemoteDir(gsDir)) |
| 327 } | 421 } |
| 328 | 422 |
| 329 // Construct a dictionary of file paths to their file infos. | 423 // Construct a dictionary of file paths to their file infos. |
| 330 pathsToFileInfos := map[string]os.FileInfo{} | 424 pathsToFileInfos := map[string]os.FileInfo{} |
| 331 visit := func(path string, f os.FileInfo, err error) error { | 425 visit := func(path string, f os.FileInfo, err error) error { |
| 332 if f.IsDir() { | 426 if f.IsDir() { |
| 333 return nil | 427 return nil |
| 334 } | 428 } |
| 335 pathsToFileInfos[path] = f | 429 pathsToFileInfos[path] = f |
| 336 return nil | 430 return nil |
| (...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 369 } | 463 } |
| 370 // Sleep for a second after uploading file to av oid bombarding Cloud | 464 // Sleep for a second after uploading file to av oid bombarding Cloud |
| 371 // storage. | 465 // storage. |
| 372 time.Sleep(time.Second) | 466 time.Sleep(time.Second) |
| 373 } | 467 } |
| 374 }(i + 1) | 468 }(i + 1) |
| 375 } | 469 } |
| 376 wg.Wait() | 470 wg.Wait() |
| 377 return nil | 471 return nil |
| 378 } | 472 } |
| OLD | NEW |