Index: ct/go/util/gs.go |
diff --git a/ct/go/util/gs.go b/ct/go/util/gs.go |
index 8f72e8bf74faa6d7ac101c73dea6651843bf0d59..a8e86314d9c4369909217cce355f96d815b9f769 100644 |
--- a/ct/go/util/gs.go |
+++ b/ct/go/util/gs.go |
@@ -8,7 +8,9 @@ import ( |
"io/ioutil" |
"net/http" |
"os" |
+ "path" |
"path/filepath" |
+ "strconv" |
"strings" |
"sync" |
"time" |
@@ -228,7 +230,7 @@ func (gs *GsUtil) DownloadWorkerArtifacts(dirName, pagesetType string, workerNum |
return gs.downloadRemoteDir(localDir, gsDir) |
} |
-func (gs *GsUtil) deleteRemoteDir(gsDir string) error { |
+func (gs *GsUtil) DeleteRemoteDir(gsDir string) error { |
// The channel where the GS filepaths to be deleted will be sent to. |
chFilePaths := make(chan string, MAX_CHANNEL_SIZE) |
req := gs.service.Objects.List(GSBucketName).Prefix(gsDir + "/") |
@@ -319,11 +321,109 @@ func (gs *GsUtil) UploadWorkerArtifacts(dirName, pagesetType string, workerNum i |
return gs.UploadDir(localDir, gsDir, true) |
} |
+// UploadSwarmingArtifacts uploads the local artifacts in StorageDir/dirName/pagesetType to the |
+// SWARMING_DIR_NAME/dirName/pagesetType directory in Google Storage. The remote directory is |
+// not emptied before the upload (UploadDir is called with cleanDir set to false). |
+// Returns whatever error UploadDir returns. |
+func (gs *GsUtil) UploadSwarmingArtifacts(dirName, pagesetType string) error { |
+	// localDir is a path on the local filesystem, so it is joined with filepath. gsDir is a |
+	// Google Storage object prefix, which is always slash separated, so it is joined with path. |
+	localDir := filepath.Join(StorageDir, dirName, pagesetType) |
+	gsDir := path.Join(SWARMING_DIR_NAME, dirName, pagesetType) |
+ |
+	return gs.UploadDir(localDir, gsDir, false) |
+} |
+ |
+// DownloadSwarmingArtifacts downloads the specified artifacts from Google Storage to a local dir. |
+// The Google storage directory is assumed to have numerical subdirs Eg: {1..1000}. This function |
+// downloads the contents of the subdirs startRange..startRange+num-1 into localDir without the |
+// numerical subdirs. |
+// |
+// localDir is emptied and recreated before anything is downloaded. The subdirs are handed out to |
+// GOROUTINE_POOL_SIZE goroutines; a goroutine logs its first failure and then stops taking work. |
+// |
+// Returns a map from the local path of each downloaded artifact to its rank/index, i.e. the number |
+// of the subdir it came from. If any subdir could not be downloaded a non-nil error is returned |
+// together with the entries that were recorded before the failure(s). |
+func (gs *GsUtil) DownloadSwarmingArtifacts(localDir, remoteDirName, pagesetType string, startRange, num int) (map[string]int, error) { |
+	// A negative num would make the channel allocation below panic; report it as an error instead. |
+	if num < 0 { |
+		return map[string]int{}, fmt.Errorf("Cannot download a negative number of artifact dirs: %d", num) |
+	} |
+ |
+	// Empty the local dir. |
+	// NOTE(review): util.RemoveAll and util.MkdirAll return nothing here, so a failure cannot be |
+	// propagated from this function; presumably the helpers log it - confirm. |
+	util.RemoveAll(localDir) |
+	// Create the local dir. |
+	util.MkdirAll(localDir, 0700) |
+ |
+	// Google Storage object names are always slash separated, so they are built with path and |
+	// not with the OS-dependent filepath. |
+	gsDir := path.Join(SWARMING_DIR_NAME, remoteDirName, pagesetType) |
+	endRange := num + startRange - 1 |
+	// The channel where remote dirs to be downloaded will be sent to. It is sized to hold every |
+	// dir so that it can be filled and closed before any worker starts. |
+	chRemoteDirs := make(chan string, num) |
+	for i := startRange; i <= endRange; i++ { |
+		chRemoteDirs <- path.Join(gsDir, strconv.Itoa(i)) |
+	} |
+	close(chRemoteDirs) |
+ |
+	// Dictionary of artifacts to its rank/index. |
+	artifactToIndex := map[string]int{} |
+	// Mutex to control access to the above dictionary and to the failed flag. |
+	var mtx sync.Mutex |
+	// Set when any worker hits an error. Checking only for leftover channel items is not enough: |
+	// the remaining workers may drain the channel after one of them has failed. |
+	failed := false |
+	// Kick off goroutines to download artifacts and populate the artifactToIndex dictionary. |
+	var wg sync.WaitGroup |
+	for i := 0; i < GOROUTINE_POOL_SIZE; i++ { |
+		wg.Add(1) |
+		go func(goroutineNum int) { |
+			defer wg.Done() |
+			for remoteDir := range chRemoteDirs { |
+				if err := gs.downloadFromSwarmingDir(remoteDir, gsDir, localDir, goroutineNum, &mtx, artifactToIndex); err != nil { |
+					glog.Error(err) |
+					mtx.Lock() |
+					failed = true |
+					mtx.Unlock() |
+					return |
+				} |
+			} |
+		}(i + 1) |
+	} |
+	wg.Wait() |
+	// Leftover items mean every worker gave up before the work ran out. |
+	if failed || len(chRemoteDirs) != 0 { |
+		return artifactToIndex, fmt.Errorf("Unable to download all artifacts.") |
+	} |
+	return artifactToIndex, nil |
+} |
+ |
+// downloadFromSwarmingDir downloads every object under the remoteDir Google Storage prefix into |
+// localDir, dropping the remote directory structure: each object is written to |
+// localDir/<base name of the object>. The directory that directly contains an object must have |
+// an integer name; that integer is stored in artifactToIndex, keyed by the local output path, |
+// while holding mtx. |
+// |
+// runID is only used to tag log messages. gsDir is not read by this function; it is kept so that |
+// the signature stays unchanged for callers. |
+// |
+// Returns an error on the first listing, name-parsing, download or write failure. Entries |
+// recorded before the failure stay in artifactToIndex. |
+func (gs *GsUtil) downloadFromSwarmingDir(remoteDir, gsDir, localDir string, runID int, mtx *sync.Mutex, artifactToIndex map[string]int) error { |
+	req := gs.service.Objects.List(GSBucketName).Prefix(remoteDir + "/") |
+	for req != nil { |
+		resp, err := req.Do() |
+		if err != nil { |
+			// Report the prefix that was actually listed. |
+			return fmt.Errorf("Error occurred while listing %s: %s", remoteDir, err) |
+		} |
+		for _, result := range resp.Items { |
+			// Object names are slash separated regardless of the local OS, so they are taken |
+			// apart with path and not with filepath. |
+			fileName := path.Base(result.Name) |
+			fileGsDir := path.Dir(result.Name) |
+			index, err := strconv.Atoi(path.Base(fileGsDir)) |
+			if err != nil { |
+				return fmt.Errorf("%s was not in expected format: %s", fileGsDir, err) |
+			} |
+			outputFile := filepath.Join(localDir, fileName) |
+			// The transfer runs in its own function so that the deferred Close calls fire as |
+			// soon as this file is done. Deferring directly in the loop would keep every |
+			// response body and output file open until the whole listing has been processed. |
+			err = func() error { |
+				respBody, err := getRespBody(result, gs.client) |
+				if err != nil { |
+					return fmt.Errorf("Could not fetch %s: %s", result.MediaLink, err) |
+				} |
+				defer util.Close(respBody) |
+				out, err := os.Create(outputFile) |
+				if err != nil { |
+					return fmt.Errorf("Unable to create file %s: %s", outputFile, err) |
+				} |
+				defer util.Close(out) |
+				if _, err := io.Copy(out, respBody); err != nil { |
+					return fmt.Errorf("Unable to write %s: %s", outputFile, err) |
+				} |
+				return nil |
+			}() |
+			if err != nil { |
+				return err |
+			} |
+			glog.Infof("Downloaded gs://%s/%s to %s with id#%d", GSBucketName, result.Name, outputFile, runID) |
+			// Sleep for a second after downloading file to avoid bombarding Cloud |
+			// storage. |
+			time.Sleep(time.Second) |
+			mtx.Lock() |
+			artifactToIndex[outputFile] = index |
+			mtx.Unlock() |
+		} |
+		// Keep requesting pages until the listing reports no further page token. |
+		if len(resp.NextPageToken) > 0 { |
+			req.PageToken(resp.NextPageToken) |
+		} else { |
+			req = nil |
+		} |
+	} |
+	return nil |
+} |
+ |
// UploadDir uploads the specified local dir into the specified Google Storage dir. |
func (gs *GsUtil) UploadDir(localDir, gsDir string, cleanDir bool) error { |
if cleanDir { |
// Empty the remote dir before uploading to it. |
- util.LogErr(gs.deleteRemoteDir(gsDir)) |
+ util.LogErr(gs.DeleteRemoteDir(gsDir)) |
} |
// Construct a dictionary of file paths to their file infos. |