Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(405)

Unified Diff: ct/go/util/gs.go

Issue 1988103002: [CT] Add ability to download to/upload from swarming GS dir with numerical subdirs (Closed) Base URL: https://skia.googlesource.com/buildbot@ct-1-chromium_builds
Patch Set: Comment Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « ct/go/util/constants.go ('k') | ct/go/util/gs_test.go » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: ct/go/util/gs.go
diff --git a/ct/go/util/gs.go b/ct/go/util/gs.go
index 8f72e8bf74faa6d7ac101c73dea6651843bf0d59..a8e86314d9c4369909217cce355f96d815b9f769 100644
--- a/ct/go/util/gs.go
+++ b/ct/go/util/gs.go
@@ -8,7 +8,9 @@ import (
"io/ioutil"
"net/http"
"os"
+ "path"
"path/filepath"
+ "strconv"
"strings"
"sync"
"time"
@@ -228,7 +230,7 @@ func (gs *GsUtil) DownloadWorkerArtifacts(dirName, pagesetType string, workerNum
return gs.downloadRemoteDir(localDir, gsDir)
}
-func (gs *GsUtil) deleteRemoteDir(gsDir string) error {
+func (gs *GsUtil) DeleteRemoteDir(gsDir string) error {
// The channel where the GS filepaths to be deleted will be sent to.
chFilePaths := make(chan string, MAX_CHANNEL_SIZE)
req := gs.service.Objects.List(GSBucketName).Prefix(gsDir + "/")
@@ -319,11 +321,109 @@ func (gs *GsUtil) UploadWorkerArtifacts(dirName, pagesetType string, workerNum i
return gs.UploadDir(localDir, gsDir, true)
}
+// UploadSwarmingArtifact uploads the specified local artifacts to Google Storage.
+func (gs *GsUtil) UploadSwarmingArtifacts(dirName, pagesetType string) error {
+ localDir := path.Join(StorageDir, dirName, pagesetType)
+ gsDir := path.Join(SWARMING_DIR_NAME, dirName, pagesetType)
+
+ return gs.UploadDir(localDir, gsDir, false)
+}
+
+// DownloadSwarmingArtifacts downloads the specified artifacts from Google Storage to a local dir.
+// The Google storage directory is assumed to have numerical subdirs Eg: {1..1000}. This function
+// downloads the contents of those directories into a local directory without the numerical
+// subdirs.
+// Returns the ranking/index of the downloaded artifact.
+func (gs *GsUtil) DownloadSwarmingArtifacts(localDir, remoteDirName, pagesetType string, startRange, num int) (map[string]int, error) {
+ // Empty the local dir.
+ util.RemoveAll(localDir)
+ // Create the local dir.
+ util.MkdirAll(localDir, 0700)
+
+ gsDir := filepath.Join(SWARMING_DIR_NAME, remoteDirName, pagesetType)
+ endRange := num + startRange - 1
+ // The channel where remote files to be downloaded will be sent to.
+ chRemoteDirs := make(chan string, num)
+ for i := startRange; i <= endRange; i++ {
+ chRemoteDirs <- filepath.Join(gsDir, strconv.Itoa(i))
+ }
+ close(chRemoteDirs)
+
+ // Dictionary of artifacts to its rank/index.
+ artifactToIndex := map[string]int{}
+ // Mutex to control access to the above dictionary.
+ var mtx sync.Mutex
+ // Kick off goroutines to download artifacts and populate the artifactToIndex dictionary.
+ var wg sync.WaitGroup
+ for i := 0; i < GOROUTINE_POOL_SIZE; i++ {
+ wg.Add(1)
+ go func(goroutineNum int) {
+ defer wg.Done()
+ for remoteDir := range chRemoteDirs {
+ if err := gs.downloadFromSwarmingDir(remoteDir, gsDir, localDir, goroutineNum, &mtx, artifactToIndex); err != nil {
+ glog.Error(err)
+ return
+ }
+ }
+ }(i + 1)
+ }
+ wg.Wait()
+ if len(chRemoteDirs) != 0 {
+ return artifactToIndex, fmt.Errorf("Unable to download all artifacts.")
+ }
+ return artifactToIndex, nil
+}
+
+func (gs *GsUtil) downloadFromSwarmingDir(remoteDir, gsDir, localDir string, runID int, mtx *sync.Mutex, artifactToIndex map[string]int) error {
+ req := gs.service.Objects.List(GSBucketName).Prefix(remoteDir + "/")
+ for req != nil {
+ resp, err := req.Do()
+ if err != nil {
+ return fmt.Errorf("Error occured while listing %s: %s", gsDir, err)
+ }
+ for _, result := range resp.Items {
+ fileName := filepath.Base(result.Name)
+ fileGsDir := filepath.Dir(result.Name)
+ index, err := strconv.Atoi(path.Base(fileGsDir))
+ if err != nil {
+ return fmt.Errorf("%s was not in expected format: %s", fileGsDir, err)
+ }
+ respBody, err := getRespBody(result, gs.client)
+ if err != nil {
+ return fmt.Errorf("Could not fetch %s: %s", result.MediaLink, err)
+ }
+ defer util.Close(respBody)
+ outputFile := filepath.Join(localDir, fileName)
+ out, err := os.Create(outputFile)
+ if err != nil {
+ return fmt.Errorf("Unable to create file %s: %s", outputFile, err)
+ }
+ defer util.Close(out)
+ if _, err = io.Copy(out, respBody); err != nil {
+ return err
+ }
+ glog.Infof("Downloaded gs://%s/%s to %s with id#%d", GSBucketName, result.Name, outputFile, runID)
+ // Sleep for a second after downloading file to avoid bombarding Cloud
+ // storage.
+ time.Sleep(time.Second)
+ mtx.Lock()
+ artifactToIndex[path.Join(localDir, fileName)] = index
+ mtx.Unlock()
+ }
+ if len(resp.NextPageToken) > 0 {
+ req.PageToken(resp.NextPageToken)
+ } else {
+ req = nil
+ }
+ }
+ return nil
+}
+
// UploadDir uploads the specified local dir into the specified Google Storage dir.
func (gs *GsUtil) UploadDir(localDir, gsDir string, cleanDir bool) error {
if cleanDir {
// Empty the remote dir before uploading to it.
- util.LogErr(gs.deleteRemoteDir(gsDir))
+ util.LogErr(gs.DeleteRemoteDir(gsDir))
}
// Construct a dictionary of file paths to their file infos.
« no previous file with comments | « ct/go/util/constants.go ('k') | ct/go/util/gs_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698