| Index: ct/go/util/gs.go
|
| diff --git a/ct/go/util/gs.go b/ct/go/util/gs.go
|
| index 8f72e8bf74faa6d7ac101c73dea6651843bf0d59..a8e86314d9c4369909217cce355f96d815b9f769 100644
|
| --- a/ct/go/util/gs.go
|
| +++ b/ct/go/util/gs.go
|
| @@ -8,7 +8,9 @@ import (
|
| "io/ioutil"
|
| "net/http"
|
| "os"
|
| + "path"
|
| "path/filepath"
|
| + "strconv"
|
| "strings"
|
| "sync"
|
| "time"
|
| @@ -228,7 +230,7 @@ func (gs *GsUtil) DownloadWorkerArtifacts(dirName, pagesetType string, workerNum
|
| return gs.downloadRemoteDir(localDir, gsDir)
|
| }
|
|
|
| -func (gs *GsUtil) deleteRemoteDir(gsDir string) error {
|
| +func (gs *GsUtil) DeleteRemoteDir(gsDir string) error {
|
| // The channel where the GS filepaths to be deleted will be sent to.
|
| chFilePaths := make(chan string, MAX_CHANNEL_SIZE)
|
| req := gs.service.Objects.List(GSBucketName).Prefix(gsDir + "/")
|
| @@ -319,11 +321,109 @@ func (gs *GsUtil) UploadWorkerArtifacts(dirName, pagesetType string, workerNum i
|
| return gs.UploadDir(localDir, gsDir, true)
|
| }
|
|
|
| +// UploadSwarmingArtifacts uploads the local artifacts found under
|
| +// StorageDir/dirName/pagesetType to the Google Storage directory
|
| +// SWARMING_DIR_NAME/dirName/pagesetType. The remote directory is not emptied
|
| +// before the upload (cleanDir is passed as false). Returns the error, if any,
|
| +// reported by UploadDir.
|
| +func (gs *GsUtil) UploadSwarmingArtifacts(dirName, pagesetType string) error {
|
| +	// Local filesystem paths are built with filepath; Google Storage object
|
| +	// names are always slash-separated, so they are built with path.
|
| +	localDir := filepath.Join(StorageDir, dirName, pagesetType)
|
| +	gsDir := path.Join(SWARMING_DIR_NAME, dirName, pagesetType)
|
| +
|
| +	return gs.UploadDir(localDir, gsDir, false)
|
| +}
|
| +
|
| +// DownloadSwarmingArtifacts downloads the specified artifacts from Google Storage to a local dir.
|
| +// The Google storage directory is assumed to have numerical subdirs Eg: {1..1000}. This function
|
| +// downloads the contents of the subdirs startRange..startRange+num-1 into localDir without the
|
| +// numerical subdirs. localDir is emptied and recreated (util.RemoveAll / util.MkdirAll) first.
|
| +//
|
| +// Returns a map from the local path of each downloaded artifact to its ranking/index, i.e. the
|
| +// number of the subdir it came from. If any subdir could not be downloaded a non-nil error is
|
| +// returned, together with whatever entries were recorded before the failure. When num <= 0
|
| +// nothing is downloaded and an empty map is returned.
|
| +func (gs *GsUtil) DownloadSwarmingArtifacts(localDir, remoteDirName, pagesetType string, startRange, num int) (map[string]int, error) {
|
| +	// Empty the local dir.
|
| +	util.RemoveAll(localDir)
|
| +	// Create the local dir.
|
| +	util.MkdirAll(localDir, 0700)
|
| +
|
| +	// Dictionary of artifacts to its rank/index.
|
| +	artifactToIndex := map[string]int{}
|
| +	if num <= 0 {
|
| +		// Nothing was requested. A negative num would also make the channel
|
| +		// allocation below panic.
|
| +		return artifactToIndex, nil
|
| +	}
|
| +
|
| +	// Google Storage object names are always slash-separated, so build them with path.
|
| +	gsDir := path.Join(SWARMING_DIR_NAME, remoteDirName, pagesetType)
|
| +	endRange := num + startRange - 1
|
| +	// The channel where remote dirs to be downloaded will be sent to. It is sized to hold
|
| +	// every dir so it can be filled and closed before any goroutine starts reading.
|
| +	chRemoteDirs := make(chan string, num)
|
| +	for i := startRange; i <= endRange; i++ {
|
| +		chRemoteDirs <- path.Join(gsDir, strconv.Itoa(i))
|
| +	}
|
| +	close(chRemoteDirs)
|
| +
|
| +	// Mutex to control access to artifactToIndex and firstErr.
|
| +	var mtx sync.Mutex
|
| +	// First error reported by any goroutine. The channel length alone cannot be used to
|
| +	// detect failures: when one goroutine gives up the others keep draining the channel,
|
| +	// so it usually ends up empty even though a dir was not fully downloaded.
|
| +	var firstErr error
|
| +	// Kick off goroutines to download artifacts and populate the artifactToIndex dictionary.
|
| +	var wg sync.WaitGroup
|
| +	for i := 0; i < GOROUTINE_POOL_SIZE; i++ {
|
| +		wg.Add(1)
|
| +		go func(goroutineNum int) {
|
| +			defer wg.Done()
|
| +			for remoteDir := range chRemoteDirs {
|
| +				if err := gs.downloadFromSwarmingDir(remoteDir, gsDir, localDir, goroutineNum, &mtx, artifactToIndex); err != nil {
|
| +					glog.Error(err)
|
| +					mtx.Lock()
|
| +					if firstErr == nil {
|
| +						firstErr = err
|
| +					}
|
| +					mtx.Unlock()
|
| +					return
|
| +				}
|
| +			}
|
| +		}(i + 1)
|
| +	}
|
| +	wg.Wait()
|
| +	if firstErr != nil {
|
| +		return artifactToIndex, fmt.Errorf("Unable to download all artifacts: %s", firstErr)
|
| +	}
|
| +	// Dirs left in the channel mean no goroutine was available to process them.
|
| +	if len(chRemoteDirs) != 0 {
|
| +		return artifactToIndex, fmt.Errorf("Unable to download all artifacts.")
|
| +	}
|
| +	return artifactToIndex, nil
|
| +}
|
| +
|
| +// downloadFromSwarmingDir downloads every object listed under the remoteDir prefix in Google
|
| +// Storage into localDir. Only the base name of each object is kept, so the numerical subdir
|
| +// is dropped locally. For each downloaded file it records local path -> index in
|
| +// artifactToIndex, where index is the numerical name of the object's parent dir.
|
| +//
|
| +// mtx guards writes to artifactToIndex. runID is only used in the log line. gsDir is not used
|
| +// by the body; it is kept so that the signature stays unchanged for existing callers.
|
| +// Returns an error on the first listing, parsing, fetch or write failure.
|
| +func (gs *GsUtil) downloadFromSwarmingDir(remoteDir, gsDir, localDir string, runID int, mtx *sync.Mutex, artifactToIndex map[string]int) error {
|
| +	req := gs.service.Objects.List(GSBucketName).Prefix(remoteDir + "/")
|
| +	for req != nil {
|
| +		resp, err := req.Do()
|
| +		if err != nil {
|
| +			// Report the prefix that was actually listed.
|
| +			return fmt.Errorf("Error occured while listing %s: %s", remoteDir, err)
|
| +		}
|
| +		for _, result := range resp.Items {
|
| +			// Object names are slash-separated regardless of OS, so split them with path.
|
| +			fileName := path.Base(result.Name)
|
| +			fileGsDir := path.Dir(result.Name)
|
| +			index, err := strconv.Atoi(path.Base(fileGsDir))
|
| +			if err != nil {
|
| +				return fmt.Errorf("%s was not in expected format: %s", fileGsDir, err)
|
| +			}
|
| +			outputFile := filepath.Join(localDir, fileName)
|
| +			// Run each download in its own function so the deferred closes fire as soon as
|
| +			// the file is written, instead of piling up until the whole listing is done.
|
| +			if err := func() error {
|
| +				respBody, err := getRespBody(result, gs.client)
|
| +				if err != nil {
|
| +					return fmt.Errorf("Could not fetch %s: %s", result.MediaLink, err)
|
| +				}
|
| +				defer util.Close(respBody)
|
| +				out, err := os.Create(outputFile)
|
| +				if err != nil {
|
| +					return fmt.Errorf("Unable to create file %s: %s", outputFile, err)
|
| +				}
|
| +				defer util.Close(out)
|
| +				if _, err := io.Copy(out, respBody); err != nil {
|
| +					return fmt.Errorf("Unable to write %s: %s", outputFile, err)
|
| +				}
|
| +				return nil
|
| +			}(); err != nil {
|
| +				return err
|
| +			}
|
| +			glog.Infof("Downloaded gs://%s/%s to %s with id#%d", GSBucketName, result.Name, outputFile, runID)
|
| +			// Sleep for a second after downloading file to avoid bombarding Cloud
|
| +			// storage.
|
| +			time.Sleep(time.Second)
|
| +			mtx.Lock()
|
| +			artifactToIndex[outputFile] = index
|
| +			mtx.Unlock()
|
| +		}
|
| +		// Keep requesting pages until the listing reports no further page token.
|
| +		if len(resp.NextPageToken) > 0 {
|
| +			req.PageToken(resp.NextPageToken)
|
| +		} else {
|
| +			req = nil
|
| +		}
|
| +	}
|
| +	return nil
|
| +}
|
| +
|
| // UploadDir uploads the specified local dir into the specified Google Storage dir.
|
| func (gs *GsUtil) UploadDir(localDir, gsDir string, cleanDir bool) error {
|
| if cleanDir {
|
| // Empty the remote dir before uploading to it.
|
| - util.LogErr(gs.deleteRemoteDir(gsDir))
|
| + util.LogErr(gs.DeleteRemoteDir(gsDir))
|
| }
|
|
|
| // Construct a dictionary of file paths to their file infos.
|
|
|