OLD | NEW |
---|---|
1 // Google Storage utility that contains methods for both CT master and worker | 1 // Google Storage utility that contains methods for both CT master and worker |
2 // scripts. | 2 // scripts. |
3 package util | 3 package util |
4 | 4 |
5 import ( | 5 import ( |
6 "fmt" | 6 "fmt" |
7 "io" | 7 "io" |
8 "io/ioutil" | 8 "io/ioutil" |
9 "net/http" | 9 "net/http" |
10 "os" | 10 "os" |
11 "path" | |
11 "path/filepath" | 12 "path/filepath" |
13 "strconv" | |
12 "strings" | 14 "strings" |
13 "sync" | 15 "sync" |
14 "time" | 16 "time" |
15 | 17 |
16 "github.com/skia-dev/glog" | 18 "github.com/skia-dev/glog" |
17 "go.skia.org/infra/go/auth" | 19 "go.skia.org/infra/go/auth" |
18 "go.skia.org/infra/go/gs" | 20 "go.skia.org/infra/go/gs" |
19 "go.skia.org/infra/go/util" | 21 "go.skia.org/infra/go/util" |
20 googleapi "google.golang.org/api/googleapi" | 22 googleapi "google.golang.org/api/googleapi" |
21 storage "google.golang.org/api/storage/v1" | 23 storage "google.golang.org/api/storage/v1" |
(...skipping 199 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
221 | 223 |
222 if equal, _ := gs.AreTimeStampsEqual(localDir, gsDir); equal { | 224 if equal, _ := gs.AreTimeStampsEqual(localDir, gsDir); equal { |
223 // No need to download artifacts they already exist locally. | 225 // No need to download artifacts they already exist locally. |
224 glog.Infof("Not downloading %s because TIMESTAMPS match", gsDir) | 226 glog.Infof("Not downloading %s because TIMESTAMPS match", gsDir) |
225 return nil | 227 return nil |
226 } | 228 } |
227 glog.Infof("Timestamps between %s and %s are different. Downloading from Google Storage", localDir, gsDir) | 229 glog.Infof("Timestamps between %s and %s are different. Downloading from Google Storage", localDir, gsDir) |
228 return gs.downloadRemoteDir(localDir, gsDir) | 230 return gs.downloadRemoteDir(localDir, gsDir) |
229 } | 231 } |
230 | 232 |
231 func (gs *GsUtil) deleteRemoteDir(gsDir string) error { | 233 func (gs *GsUtil) DeleteRemoteDir(gsDir string) error { |
232 // The channel where the GS filepaths to be deleted will be sent to. | 234 // The channel where the GS filepaths to be deleted will be sent to. |
233 chFilePaths := make(chan string, MAX_CHANNEL_SIZE) | 235 chFilePaths := make(chan string, MAX_CHANNEL_SIZE) |
234 req := gs.service.Objects.List(GSBucketName).Prefix(gsDir + "/") | 236 req := gs.service.Objects.List(GSBucketName).Prefix(gsDir + "/") |
235 for req != nil { | 237 for req != nil { |
236 resp, err := req.Do() | 238 resp, err := req.Do() |
237 if err != nil { | 239 if err != nil { |
238 return fmt.Errorf("Error occured while listing %s: %s", gsDir, err) | 240 return fmt.Errorf("Error occured while listing %s: %s", gsDir, err) |
239 } | 241 } |
240 for _, result := range resp.Items { | 242 for _, result := range resp.Items { |
241 chFilePaths <- result.Name | 243 chFilePaths <- result.Name |
(...skipping 70 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
312 gsDir := filepath.Join(dirName, pagesetType, fmt.Sprintf("slave%d", work erNum)) | 314 gsDir := filepath.Join(dirName, pagesetType, fmt.Sprintf("slave%d", work erNum)) |
313 | 315 |
314 if equal, _ := gs.AreTimeStampsEqual(localDir, gsDir); equal { | 316 if equal, _ := gs.AreTimeStampsEqual(localDir, gsDir); equal { |
315 glog.Infof("Not uploading %s because TIMESTAMPS match", localDir ) | 317 glog.Infof("Not uploading %s because TIMESTAMPS match", localDir ) |
316 return nil | 318 return nil |
317 } | 319 } |
318 glog.Infof("Timestamps between %s and %s are different. Uploading to Goo gle Storage", localDir, gsDir) | 320 glog.Infof("Timestamps between %s and %s are different. Uploading to Goo gle Storage", localDir, gsDir) |
319 return gs.UploadDir(localDir, gsDir, true) | 321 return gs.UploadDir(localDir, gsDir, true) |
320 } | 322 } |
321 | 323 |
324 // UploadSwarmingArtifact uploads the specified local artifacts to Google Storag e. | |
325 func (gs *GsUtil) UploadSwarmingArtifacts(dirName, pagesetType string) error { | |
326 localDir := path.Join(StorageDir, dirName, pagesetType) | |
327 gsDir := path.Join(SWARMING_DIR_NAME, dirName, pagesetType) | |
328 | |
329 return gs.UploadDir(localDir, gsDir, false) | |
330 } | |
331 | |
332 // DownloadSwarmingArtifacts downloads the specified artifacts from Google Stora ge to a local dir. | |
333 // The Google storage directory is assumed to have numerical subdirs Eg: {1..100 0}. This function | |
334 // downloads the contents of those directories into a local directory without th e numerical | |
335 // subdirs. | |
336 // Returns the ranking/index of the downloaded artifact. | |
337 func (gs *GsUtil) DownloadSwarmingArtifacts(localDir, remoteDirName, pagesetType string, startRange, num int) (map[string]int, error) { | |
338 // Empty the local dir. | |
339 util.RemoveAll(localDir) | |
340 // Create the local dir. | |
341 util.MkdirAll(localDir, 0700) | |
342 | |
343 gsDir := filepath.Join(SWARMING_DIR_NAME, remoteDirName, pagesetType) | |
344 endRange := num + startRange - 1 | |
345 // The channel where remote files to be downloaded will be sent to. | |
346 chRemoteDirs := make(chan string, num) | |
347 for i := startRange; i <= endRange; i++ { | |
348 chRemoteDirs <- filepath.Join(gsDir, strconv.Itoa(i)) | |
349 } | |
350 close(chRemoteDirs) | |
351 | |
352 // Dictionary of artifacts to its rank/index. | |
353 artifactToIndex := map[string]int{} | |
354 // Mutex to control access to the above dictionary. | |
355 var mtx sync.Mutex | |
356 // Kick off goroutines to download artifacts and populate the artifactTo Index dictionary. | |
357 var wg sync.WaitGroup | |
358 for i := 0; i < GOROUTINE_POOL_SIZE; i++ { | |
359 wg.Add(1) | |
360 go func(goroutineNum int) { | |
361 defer wg.Done() | |
362 for remoteDir := range chRemoteDirs { | |
363 if err := gs.downloadFromSwarmingDir(remoteDir, gsDir, localDir, goroutineNum, &mtx, artifactToIndex); err != nil { | |
364 glog.Error(err) | |
365 return | |
366 } | |
367 } | |
368 }(i + 1) | |
369 } | |
370 wg.Wait() | |
371 if len(chRemoteDirs) != 0 { | |
372 return artifactToIndex, fmt.Errorf("The chRemoteDirs channel was expected to be empty!") | |
dogben
2016/05/19 14:18:20
uber-nit: Maybe just "Unable to download all artif
rmistry
2016/05/19 14:25:08
Done.
| |
373 } | |
374 return artifactToIndex, nil | |
375 } | |
376 | |
377 func (gs *GsUtil) downloadFromSwarmingDir(remoteDir, gsDir, localDir string, run ID int, mtx *sync.Mutex, artifactToIndex map[string]int) error { | |
378 req := gs.service.Objects.List(GSBucketName).Prefix(remoteDir + "/") | |
379 for req != nil { | |
380 resp, err := req.Do() | |
381 if err != nil { | |
382 return fmt.Errorf("Error occured while listing %s: %s", gsDir, err) | |
383 } | |
384 for _, result := range resp.Items { | |
385 fileName := filepath.Base(result.Name) | |
386 fileGsDir := filepath.Dir(result.Name) | |
387 index, err := strconv.Atoi(path.Base(fileGsDir)) | |
388 if err != nil { | |
389 return fmt.Errorf("%s was not in expected format : %s", fileGsDir, err) | |
390 } | |
391 respBody, err := getRespBody(result, gs.client) | |
392 if err != nil { | |
393 return fmt.Errorf("Could not fetch %s: %s", resu lt.MediaLink, err) | |
394 } | |
395 defer util.Close(respBody) | |
396 outputFile := filepath.Join(localDir, fileName) | |
397 out, err := os.Create(outputFile) | |
398 if err != nil { | |
399 return fmt.Errorf("Unable to create file %s: %s" , outputFile, err) | |
400 } | |
401 defer util.Close(out) | |
402 if _, err = io.Copy(out, respBody); err != nil { | |
403 return err | |
404 } | |
405 glog.Infof("Downloaded gs://%s/%s to %s with id#%d", GSB ucketName, result.Name, outputFile, runID) | |
406 // Sleep for a second after downloading file to avoid bo mbarding Cloud | |
407 // storage. | |
408 time.Sleep(time.Second) | |
409 mtx.Lock() | |
410 artifactToIndex[path.Join(localDir, fileName)] = index | |
411 mtx.Unlock() | |
412 } | |
413 if len(resp.NextPageToken) > 0 { | |
414 req.PageToken(resp.NextPageToken) | |
415 } else { | |
416 req = nil | |
417 } | |
418 } | |
419 return nil | |
420 } | |
421 | |
322 // UploadDir uploads the specified local dir into the specified Google Storage d ir. | 422 // UploadDir uploads the specified local dir into the specified Google Storage d ir. |
323 func (gs *GsUtil) UploadDir(localDir, gsDir string, cleanDir bool) error { | 423 func (gs *GsUtil) UploadDir(localDir, gsDir string, cleanDir bool) error { |
324 if cleanDir { | 424 if cleanDir { |
325 // Empty the remote dir before uploading to it. | 425 // Empty the remote dir before uploading to it. |
326 » » util.LogErr(gs.deleteRemoteDir(gsDir)) | 426 » » util.LogErr(gs.DeleteRemoteDir(gsDir)) |
327 } | 427 } |
328 | 428 |
329 // Construct a dictionary of file paths to their file infos. | 429 // Construct a dictionary of file paths to their file infos. |
330 pathsToFileInfos := map[string]os.FileInfo{} | 430 pathsToFileInfos := map[string]os.FileInfo{} |
331 visit := func(path string, f os.FileInfo, err error) error { | 431 visit := func(path string, f os.FileInfo, err error) error { |
332 if f.IsDir() { | 432 if f.IsDir() { |
333 return nil | 433 return nil |
334 } | 434 } |
335 pathsToFileInfos[path] = f | 435 pathsToFileInfos[path] = f |
336 return nil | 436 return nil |
(...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
369 } | 469 } |
370 // Sleep for a second after uploading file to av oid bombarding Cloud | 470 // Sleep for a second after uploading file to av oid bombarding Cloud |
371 // storage. | 471 // storage. |
372 time.Sleep(time.Second) | 472 time.Sleep(time.Second) |
373 } | 473 } |
374 }(i + 1) | 474 }(i + 1) |
375 } | 475 } |
376 wg.Wait() | 476 wg.Wait() |
377 return nil | 477 return nil |
378 } | 478 } |
OLD | NEW |