OLD | NEW |
---|---|
1 // Google Storage utility that contains methods for both CT master and worker | 1 // Google Storage utility that contains methods for both CT master and worker |
2 // scripts. | 2 // scripts. |
3 package util | 3 package util |
4 | 4 |
5 import ( | 5 import ( |
6 "fmt" | 6 "fmt" |
7 "io" | 7 "io" |
8 "io/ioutil" | 8 "io/ioutil" |
9 "net/http" | 9 "net/http" |
10 "os" | 10 "os" |
11 "path" | |
11 "path/filepath" | 12 "path/filepath" |
13 "strconv" | |
12 "strings" | 14 "strings" |
13 "sync" | 15 "sync" |
14 "time" | 16 "time" |
15 | 17 |
16 "github.com/skia-dev/glog" | 18 "github.com/skia-dev/glog" |
17 "go.skia.org/infra/go/auth" | 19 "go.skia.org/infra/go/auth" |
18 "go.skia.org/infra/go/gs" | 20 "go.skia.org/infra/go/gs" |
19 "go.skia.org/infra/go/util" | 21 "go.skia.org/infra/go/util" |
20 googleapi "google.golang.org/api/googleapi" | 22 googleapi "google.golang.org/api/googleapi" |
21 storage "google.golang.org/api/storage/v1" | 23 storage "google.golang.org/api/storage/v1" |
(...skipping 199 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
221 | 223 |
222 if equal, _ := gs.AreTimeStampsEqual(localDir, gsDir); equal { | 224 if equal, _ := gs.AreTimeStampsEqual(localDir, gsDir); equal { |
223 // No need to download artifacts they already exist locally. | 225 // No need to download artifacts they already exist locally. |
224 glog.Infof("Not downloading %s because TIMESTAMPS match", gsDir) | 226 glog.Infof("Not downloading %s because TIMESTAMPS match", gsDir) |
225 return nil | 227 return nil |
226 } | 228 } |
227 glog.Infof("Timestamps between %s and %s are different. Downloading from Google Storage", localDir, gsDir) | 229 glog.Infof("Timestamps between %s and %s are different. Downloading from Google Storage", localDir, gsDir) |
228 return gs.downloadRemoteDir(localDir, gsDir) | 230 return gs.downloadRemoteDir(localDir, gsDir) |
229 } | 231 } |
230 | 232 |
231 func (gs *GsUtil) deleteRemoteDir(gsDir string) error { | 233 func (gs *GsUtil) DeleteRemoteDir(gsDir string) error { |
232 // The channel where the GS filepaths to be deleted will be sent to. | 234 // The channel where the GS filepaths to be deleted will be sent to. |
233 chFilePaths := make(chan string, MAX_CHANNEL_SIZE) | 235 chFilePaths := make(chan string, MAX_CHANNEL_SIZE) |
234 req := gs.service.Objects.List(GSBucketName).Prefix(gsDir + "/") | 236 req := gs.service.Objects.List(GSBucketName).Prefix(gsDir + "/") |
235 for req != nil { | 237 for req != nil { |
236 resp, err := req.Do() | 238 resp, err := req.Do() |
237 if err != nil { | 239 if err != nil { |
238 return fmt.Errorf("Error occured while listing %s: %s", gsDir, err) | 240 return fmt.Errorf("Error occured while listing %s: %s", gsDir, err) |
239 } | 241 } |
240 for _, result := range resp.Items { | 242 for _, result := range resp.Items { |
241 chFilePaths <- result.Name | 243 chFilePaths <- result.Name |
(...skipping 70 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
312 gsDir := filepath.Join(dirName, pagesetType, fmt.Sprintf("slave%d", work erNum)) | 314 gsDir := filepath.Join(dirName, pagesetType, fmt.Sprintf("slave%d", work erNum)) |
313 | 315 |
314 if equal, _ := gs.AreTimeStampsEqual(localDir, gsDir); equal { | 316 if equal, _ := gs.AreTimeStampsEqual(localDir, gsDir); equal { |
315 glog.Infof("Not uploading %s because TIMESTAMPS match", localDir ) | 317 glog.Infof("Not uploading %s because TIMESTAMPS match", localDir ) |
316 return nil | 318 return nil |
317 } | 319 } |
318 glog.Infof("Timestamps between %s and %s are different. Uploading to Goo gle Storage", localDir, gsDir) | 320 glog.Infof("Timestamps between %s and %s are different. Uploading to Goo gle Storage", localDir, gsDir) |
319 return gs.UploadDir(localDir, gsDir, true) | 321 return gs.UploadDir(localDir, gsDir, true) |
320 } | 322 } |
321 | 323 |
324 // UploadSwarmingArtifacts uploads the specified local artifacts to Google Storage. | |
325 func (gs *GsUtil) UploadSwarmingArtifacts(dirName, pagesetType string) error { | |
326 localDir := path.Join(StorageDir, dirName, pagesetType) | |
327 gsDir := path.Join(SWARMING_DIR_NAME, dirName, pagesetType) | |
328 | |
329 return gs.UploadDir(localDir, gsDir, false) | |
330 } | |
331 | |
332 // DownloadSwarmingArtifacts downloads the specified artifacts from Google Storage to a local dir. | |
333 // The Google Storage directory is assumed to have numerical subdirs, e.g. {1..1000}. This function | |
334 // downloads the contents of those directories into a local directory without the numerical | |
dogben
2016/05/18 15:22:01
Aren't there files with the same name in multiple
rmistry
2016/05/19 11:54:10
Possible but unlikely. If they exist right now the
| |
335 // subdirs. | |
336 // Returns a map from each downloaded artifact's local path to its rank/index. | |
337 func (gs *GsUtil) DownloadSwarmingArtifacts(localDir, remoteDirName, pagesetType string, startRange, num int) (map[string]int, error) { | |
338 // Empty the local dir. | |
339 util.RemoveAll(localDir) | |
340 // Create the local dir. | |
341 util.MkdirAll(localDir, 0700) | |
342 | |
343 gsDir := filepath.Join(SWARMING_DIR_NAME, remoteDirName, pagesetType) | |
344 endRange := num + startRange - 1 | |
345 // The channel where remote files to be downloaded will be sent to. | |
346 chRemoteDirs := make(chan string, num) | |
347 for i := startRange; i <= endRange; i++ { | |
348 chRemoteDirs <- filepath.Join(gsDir, strconv.Itoa(i)) | |
349 } | |
350 close(chRemoteDirs) | |
351 | |
352 // Dictionary of artifacts to its rank/index. | |
353 artifactToIndex := map[string]int{} | |
354 // Mutex to control access to the above dictionary. | |
355 var mtx sync.Mutex | |
356 // Kick off goroutines to download artifacts and populate the artifactToIndex dictionary. | |
357 var wg sync.WaitGroup | |
358 for i := 0; i < GOROUTINE_POOL_SIZE; i++ { | |
359 wg.Add(1) | |
360 go func(goroutineNum int) { | |
361 defer wg.Done() | |
362 for remoteDir := range chRemoteDirs { | |
363 | |
364 req := gs.service.Objects.List(GSBucketName).Pre fix(remoteDir + "/") | |
365 for req != nil { | |
366 resp, err := req.Do() | |
367 if err != nil { | |
368 glog.Errorf("Error occured while listing %s: %s", gsDir, err) | |
369 return | |
370 } | |
371 for _, result := range resp.Items { | |
372 fileName := filepath.Base(result .Name) | |
373 fileGsDir := filepath.Dir(result .Name) | |
374 index, err := strconv.Atoi(path. Base(fileGsDir)) | |
375 if err != nil { | |
376 glog.Errorf("%s was not in expected format: %s", fileGsDir, err) | |
377 } | |
dogben
2016/05/18 15:22:01
Missing return?
rmistry
2016/05/19 11:54:10
Oops. Done.
| |
378 respBody, err := getRespBody(res ult, gs.client) | |
379 if err != nil { | |
380 glog.Errorf("Could not f etch %s: %s", result.MediaLink, err) | |
381 return | |
382 } | |
383 defer util.Close(respBody) | |
dogben
2016/05/18 15:22:01
If there are many items in chRemoteDirs, deferring
rmistry
2016/05/19 11:54:10
Done.
| |
384 outputFile := filepath.Join(loca lDir, fileName) | |
385 out, err := os.Create(outputFile ) | |
386 if err != nil { | |
387 glog.Errorf("Unable to c reate file %s: %s", outputFile, err) | |
388 return | |
389 } | |
390 defer util.Close(out) | |
dogben
2016/05/18 15:22:01
nit: util.Close will ignore errors on closing the
rmistry
2016/05/19 11:54:11
Yea, I figured just logging an error was enough he
| |
391 if _, err = io.Copy(out, respBod y); err != nil { | |
392 glog.Error(err) | |
393 return | |
394 } | |
395 glog.Infof("Downloaded gs://%s/% s to %s with goroutine#%d", GSBucketName, result.Name, outputFile, goroutineNum) | |
396 // Sleep for a second after downloading file to avoid bombarding Cloud | |
397 // storage. | |
398 time.Sleep(time.Second) | |
399 mtx.Lock() | |
400 artifactToIndex[path.Join(localD ir, fileName)] = index | |
401 mtx.Unlock() | |
402 } | |
403 if len(resp.NextPageToken) > 0 { | |
404 req.PageToken(resp.NextPageToken ) | |
405 } else { | |
406 req = nil | |
407 } | |
408 } | |
409 } | |
410 }(i + 1) | |
411 } | |
412 wg.Wait() | |
413 return artifactToIndex, nil | |
dogben
2016/05/18 15:22:01
nit: return error if chRemoteDirs is not empty?
rmistry
2016/05/19 11:54:11
Done.
| |
414 } | |
415 | |
322 // UploadDir uploads the specified local dir into the specified Google Storage dir. | 416 // UploadDir uploads the specified local dir into the specified Google Storage dir. |
323 func (gs *GsUtil) UploadDir(localDir, gsDir string, cleanDir bool) error { | 417 func (gs *GsUtil) UploadDir(localDir, gsDir string, cleanDir bool) error { |
324 if cleanDir { | 418 if cleanDir { |
325 // Empty the remote dir before uploading to it. | 419 // Empty the remote dir before uploading to it. |
326 » » util.LogErr(gs.deleteRemoteDir(gsDir)) | 420 » » util.LogErr(gs.DeleteRemoteDir(gsDir)) |
327 } | 421 } |
328 | 422 |
329 // Construct a dictionary of file paths to their file infos. | 423 // Construct a dictionary of file paths to their file infos. |
330 pathsToFileInfos := map[string]os.FileInfo{} | 424 pathsToFileInfos := map[string]os.FileInfo{} |
331 visit := func(path string, f os.FileInfo, err error) error { | 425 visit := func(path string, f os.FileInfo, err error) error { |
332 if f.IsDir() { | 426 if f.IsDir() { |
333 return nil | 427 return nil |
334 } | 428 } |
335 pathsToFileInfos[path] = f | 429 pathsToFileInfos[path] = f |
336 return nil | 430 return nil |
(...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
369 } | 463 } |
370 // Sleep for a second after uploading file to avoid bombarding Cloud | 464 // Sleep for a second after uploading file to avoid bombarding Cloud |
371 // storage. | 465 // storage. |
372 time.Sleep(time.Second) | 466 time.Sleep(time.Second) |
373 } | 467 } |
374 }(i + 1) | 468 }(i + 1) |
375 } | 469 } |
376 wg.Wait() | 470 wg.Wait() |
377 return nil | 471 return nil |
378 } | 472 } |
OLD | NEW |