Chromium Code Reviews| Index: ct/go/util/gs.go |
| diff --git a/ct/go/util/gs.go b/ct/go/util/gs.go |
| index 8f72e8bf74faa6d7ac101c73dea6651843bf0d59..5cc9f6555012b9f8cb83f8976a88954d885641db 100644 |
| --- a/ct/go/util/gs.go |
| +++ b/ct/go/util/gs.go |
| @@ -8,7 +8,9 @@ import ( |
| "io/ioutil" |
| "net/http" |
| "os" |
| + "path" |
| "path/filepath" |
| + "strconv" |
| "strings" |
| "sync" |
| "time" |
| @@ -228,7 +230,7 @@ func (gs *GsUtil) DownloadWorkerArtifacts(dirName, pagesetType string, workerNum |
| return gs.downloadRemoteDir(localDir, gsDir) |
| } |
| -func (gs *GsUtil) deleteRemoteDir(gsDir string) error { |
| +func (gs *GsUtil) DeleteRemoteDir(gsDir string) error { |
| // The channel where the GS filepaths to be deleted will be sent to. |
| chFilePaths := make(chan string, MAX_CHANNEL_SIZE) |
| req := gs.service.Objects.List(GSBucketName).Prefix(gsDir + "/") |
| @@ -319,11 +321,103 @@ func (gs *GsUtil) UploadWorkerArtifacts(dirName, pagesetType string, workerNum i |
| return gs.UploadDir(localDir, gsDir, true) |
| } |
| +// UploadSwarmingArtifact uploads the specified local artifacts to Google Storage. |
| +func (gs *GsUtil) UploadSwarmingArtifacts(dirName, pagesetType string) error { |
| + localDir := path.Join(StorageDir, dirName, pagesetType) |
| + gsDir := path.Join(SWARMING_DIR_NAME, dirName, pagesetType) |
| + |
| + return gs.UploadDir(localDir, gsDir, false) |
| +} |
| + |
| +// DownloadSwarmingArtifacts downloads the specified artifacts from Google Storage to a local dir. |
| +// The Google storage directory is assumed to have numerical subdirs Eg: {1..1000}. This function |
| +// downloads the contents of those directories into a local directory without the numerical |
|
dogben
2016/05/18 15:22:01
Aren't there files with the same name in multiple
rmistry
2016/05/19 11:54:10
Possible but unlikely. If they exist right now the
|
| +// subdirs. |
| +// Returns the ranking/index of the downloaded artifact. |
| +func (gs *GsUtil) DownloadSwarmingArtifacts(localDir, remoteDirName, pagesetType string, startRange, num int) (map[string]int, error) { |
| + // Empty the local dir. |
| + util.RemoveAll(localDir) |
| + // Create the local dir. |
| + util.MkdirAll(localDir, 0700) |
| + |
| + gsDir := filepath.Join(SWARMING_DIR_NAME, remoteDirName, pagesetType) |
| + endRange := num + startRange - 1 |
| + // The channel where remote files to be downloaded will be sent to. |
| + chRemoteDirs := make(chan string, num) |
| + for i := startRange; i <= endRange; i++ { |
| + chRemoteDirs <- filepath.Join(gsDir, strconv.Itoa(i)) |
| + } |
| + close(chRemoteDirs) |
| + |
| + // Dictionary of artifacts to its rank/index. |
| + artifactToIndex := map[string]int{} |
| + // Mutex to control access to the above dictionary. |
| + var mtx sync.Mutex |
| + // Kick off goroutines to download artifacts and populate the artifactToIndex dictionary. |
| + var wg sync.WaitGroup |
| + for i := 0; i < GOROUTINE_POOL_SIZE; i++ { |
| + wg.Add(1) |
| + go func(goroutineNum int) { |
| + defer wg.Done() |
| + for remoteDir := range chRemoteDirs { |
| + |
| + req := gs.service.Objects.List(GSBucketName).Prefix(remoteDir + "/") |
| + for req != nil { |
| + resp, err := req.Do() |
| + if err != nil { |
| + glog.Errorf("Error occured while listing %s: %s", gsDir, err) |
| + return |
| + } |
| + for _, result := range resp.Items { |
| + fileName := filepath.Base(result.Name) |
| + fileGsDir := filepath.Dir(result.Name) |
| + index, err := strconv.Atoi(path.Base(fileGsDir)) |
| + if err != nil { |
| + glog.Errorf("%s was not in expected format: %s", fileGsDir, err) |
| + } |
|
dogben
2016/05/18 15:22:01
Missing return?
rmistry
2016/05/19 11:54:10
Oops. Done.
|
| + respBody, err := getRespBody(result, gs.client) |
| + if err != nil { |
| + glog.Errorf("Could not fetch %s: %s", result.MediaLink, err) |
| + return |
| + } |
| + defer util.Close(respBody) |
|
dogben
2016/05/18 15:22:01
If there are many items in chRemoteDirs, deferring
rmistry
2016/05/19 11:54:10
Done.
|
| + outputFile := filepath.Join(localDir, fileName) |
| + out, err := os.Create(outputFile) |
| + if err != nil { |
| + glog.Errorf("Unable to create file %s: %s", outputFile, err) |
| + return |
| + } |
| + defer util.Close(out) |
|
dogben
2016/05/18 15:22:01
nit: util.Close will ignore errors on closing the
rmistry
2016/05/19 11:54:11
Yea, I figured just logging an error was enough he
|
| + if _, err = io.Copy(out, respBody); err != nil { |
| + glog.Error(err) |
| + return |
| + } |
| + glog.Infof("Downloaded gs://%s/%s to %s with goroutine#%d", GSBucketName, result.Name, outputFile, goroutineNum) |
| + // Sleep for a second after downloading file to avoid bombarding Cloud |
| + // storage. |
| + time.Sleep(time.Second) |
| + mtx.Lock() |
| + artifactToIndex[path.Join(localDir, fileName)] = index |
| + mtx.Unlock() |
| + } |
| + if len(resp.NextPageToken) > 0 { |
| + req.PageToken(resp.NextPageToken) |
| + } else { |
| + req = nil |
| + } |
| + } |
| + } |
| + }(i + 1) |
| + } |
| + wg.Wait() |
| + return artifactToIndex, nil |
|
dogben
2016/05/18 15:22:01
nit: return error if chRemoteDirs is not empty?
rmistry
2016/05/19 11:54:11
Done.
|
| +} |
| + |
| // UploadDir uploads the specified local dir into the specified Google Storage dir. |
| func (gs *GsUtil) UploadDir(localDir, gsDir string, cleanDir bool) error { |
| if cleanDir { |
| // Empty the remote dir before uploading to it. |
| - util.LogErr(gs.deleteRemoteDir(gsDir)) |
| + util.LogErr(gs.DeleteRemoteDir(gsDir)) |
| } |
| // Construct a dictionary of file paths to their file infos. |