OLD | NEW |
1 // Google Storage utility that contains methods for both CT master and worker | 1 // Google Storage utility that contains methods for both CT master and worker |
2 // scripts. | 2 // scripts. |
3 package util | 3 package util |
4 | 4 |
5 import ( | 5 import ( |
6 "fmt" | 6 "fmt" |
7 "io" | 7 "io" |
8 "io/ioutil" | 8 "io/ioutil" |
9 "net/http" | 9 "net/http" |
10 "os" | 10 "os" |
| 11 "path" |
11 "path/filepath" | 12 "path/filepath" |
| 13 "strconv" |
12 "strings" | 14 "strings" |
13 "sync" | 15 "sync" |
14 "time" | 16 "time" |
15 | 17 |
16 "github.com/skia-dev/glog" | 18 "github.com/skia-dev/glog" |
17 "go.skia.org/infra/go/auth" | 19 "go.skia.org/infra/go/auth" |
18 "go.skia.org/infra/go/gs" | 20 "go.skia.org/infra/go/gs" |
19 "go.skia.org/infra/go/util" | 21 "go.skia.org/infra/go/util" |
20 googleapi "google.golang.org/api/googleapi" | 22 googleapi "google.golang.org/api/googleapi" |
21 storage "google.golang.org/api/storage/v1" | 23 storage "google.golang.org/api/storage/v1" |
(...skipping 199 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
221 | 223 |
222 if equal, _ := gs.AreTimeStampsEqual(localDir, gsDir); equal { | 224 if equal, _ := gs.AreTimeStampsEqual(localDir, gsDir); equal { |
223 // No need to download artifacts they already exist locally. | 225 // No need to download artifacts they already exist locally. |
224 glog.Infof("Not downloading %s because TIMESTAMPS match", gsDir) | 226 glog.Infof("Not downloading %s because TIMESTAMPS match", gsDir) |
225 return nil | 227 return nil |
226 } | 228 } |
227 glog.Infof("Timestamps between %s and %s are different. Downloading from
Google Storage", localDir, gsDir) | 229 glog.Infof("Timestamps between %s and %s are different. Downloading from
Google Storage", localDir, gsDir) |
228 return gs.downloadRemoteDir(localDir, gsDir) | 230 return gs.downloadRemoteDir(localDir, gsDir) |
229 } | 231 } |
230 | 232 |
231 func (gs *GsUtil) deleteRemoteDir(gsDir string) error { | 233 func (gs *GsUtil) DeleteRemoteDir(gsDir string) error { |
232 // The channel where the GS filepaths to be deleted will be sent to. | 234 // The channel where the GS filepaths to be deleted will be sent to. |
233 chFilePaths := make(chan string, MAX_CHANNEL_SIZE) | 235 chFilePaths := make(chan string, MAX_CHANNEL_SIZE) |
234 req := gs.service.Objects.List(GSBucketName).Prefix(gsDir + "/") | 236 req := gs.service.Objects.List(GSBucketName).Prefix(gsDir + "/") |
235 for req != nil { | 237 for req != nil { |
236 resp, err := req.Do() | 238 resp, err := req.Do() |
237 if err != nil { | 239 if err != nil { |
238 return fmt.Errorf("Error occured while listing %s: %s",
gsDir, err) | 240 return fmt.Errorf("Error occured while listing %s: %s",
gsDir, err) |
239 } | 241 } |
240 for _, result := range resp.Items { | 242 for _, result := range resp.Items { |
241 chFilePaths <- result.Name | 243 chFilePaths <- result.Name |
(...skipping 70 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
312 gsDir := filepath.Join(dirName, pagesetType, fmt.Sprintf("slave%d", work
erNum)) | 314 gsDir := filepath.Join(dirName, pagesetType, fmt.Sprintf("slave%d", work
erNum)) |
313 | 315 |
314 if equal, _ := gs.AreTimeStampsEqual(localDir, gsDir); equal { | 316 if equal, _ := gs.AreTimeStampsEqual(localDir, gsDir); equal { |
315 glog.Infof("Not uploading %s because TIMESTAMPS match", localDir
) | 317 glog.Infof("Not uploading %s because TIMESTAMPS match", localDir
) |
316 return nil | 318 return nil |
317 } | 319 } |
318 glog.Infof("Timestamps between %s and %s are different. Uploading to Goo
gle Storage", localDir, gsDir) | 320 glog.Infof("Timestamps between %s and %s are different. Uploading to Goo
gle Storage", localDir, gsDir) |
319 return gs.UploadDir(localDir, gsDir, true) | 321 return gs.UploadDir(localDir, gsDir, true) |
320 } | 322 } |
321 | 323 |
| 324 // UploadSwarmingArtifact uploads the specified local artifacts to Google Storag
e. |
| 325 func (gs *GsUtil) UploadSwarmingArtifacts(dirName, pagesetType string) error { |
| 326 localDir := path.Join(StorageDir, dirName, pagesetType) |
| 327 gsDir := path.Join(SWARMING_DIR_NAME, dirName, pagesetType) |
| 328 |
| 329 return gs.UploadDir(localDir, gsDir, false) |
| 330 } |
| 331 |
| 332 // DownloadSwarmingArtifacts downloads the specified artifacts from Google Stora
ge to a local dir. |
| 333 // The Google storage directory is assumed to have numerical subdirs Eg: {1..100
0}. This function |
| 334 // downloads the contents of those directories into a local directory without th
e numerical |
| 335 // subdirs. |
| 336 // Returns the ranking/index of the downloaded artifact. |
| 337 func (gs *GsUtil) DownloadSwarmingArtifacts(localDir, remoteDirName, pagesetType
string, startRange, num int) (map[string]int, error) { |
| 338 // Empty the local dir. |
| 339 util.RemoveAll(localDir) |
| 340 // Create the local dir. |
| 341 util.MkdirAll(localDir, 0700) |
| 342 |
| 343 gsDir := filepath.Join(SWARMING_DIR_NAME, remoteDirName, pagesetType) |
| 344 endRange := num + startRange - 1 |
| 345 // The channel where remote files to be downloaded will be sent to. |
| 346 chRemoteDirs := make(chan string, num) |
| 347 for i := startRange; i <= endRange; i++ { |
| 348 chRemoteDirs <- filepath.Join(gsDir, strconv.Itoa(i)) |
| 349 } |
| 350 close(chRemoteDirs) |
| 351 |
| 352 // Dictionary of artifacts to its rank/index. |
| 353 artifactToIndex := map[string]int{} |
| 354 // Mutex to control access to the above dictionary. |
| 355 var mtx sync.Mutex |
| 356 // Kick off goroutines to download artifacts and populate the artifactTo
Index dictionary. |
| 357 var wg sync.WaitGroup |
| 358 for i := 0; i < GOROUTINE_POOL_SIZE; i++ { |
| 359 wg.Add(1) |
| 360 go func(goroutineNum int) { |
| 361 defer wg.Done() |
| 362 for remoteDir := range chRemoteDirs { |
| 363 if err := gs.downloadFromSwarmingDir(remoteDir,
gsDir, localDir, goroutineNum, &mtx, artifactToIndex); err != nil { |
| 364 glog.Error(err) |
| 365 return |
| 366 } |
| 367 } |
| 368 }(i + 1) |
| 369 } |
| 370 wg.Wait() |
| 371 if len(chRemoteDirs) != 0 { |
| 372 return artifactToIndex, fmt.Errorf("Unable to download all artif
acts.") |
| 373 } |
| 374 return artifactToIndex, nil |
| 375 } |
| 376 |
| 377 func (gs *GsUtil) downloadFromSwarmingDir(remoteDir, gsDir, localDir string, run
ID int, mtx *sync.Mutex, artifactToIndex map[string]int) error { |
| 378 req := gs.service.Objects.List(GSBucketName).Prefix(remoteDir + "/") |
| 379 for req != nil { |
| 380 resp, err := req.Do() |
| 381 if err != nil { |
| 382 return fmt.Errorf("Error occured while listing %s: %s",
gsDir, err) |
| 383 } |
| 384 for _, result := range resp.Items { |
| 385 fileName := filepath.Base(result.Name) |
| 386 fileGsDir := filepath.Dir(result.Name) |
| 387 index, err := strconv.Atoi(path.Base(fileGsDir)) |
| 388 if err != nil { |
| 389 return fmt.Errorf("%s was not in expected format
: %s", fileGsDir, err) |
| 390 } |
| 391 respBody, err := getRespBody(result, gs.client) |
| 392 if err != nil { |
| 393 return fmt.Errorf("Could not fetch %s: %s", resu
lt.MediaLink, err) |
| 394 } |
| 395 defer util.Close(respBody) |
| 396 outputFile := filepath.Join(localDir, fileName) |
| 397 out, err := os.Create(outputFile) |
| 398 if err != nil { |
| 399 return fmt.Errorf("Unable to create file %s: %s"
, outputFile, err) |
| 400 } |
| 401 defer util.Close(out) |
| 402 if _, err = io.Copy(out, respBody); err != nil { |
| 403 return err |
| 404 } |
| 405 glog.Infof("Downloaded gs://%s/%s to %s with id#%d", GSB
ucketName, result.Name, outputFile, runID) |
| 406 // Sleep for a second after downloading file to avoid bo
mbarding Cloud |
| 407 // storage. |
| 408 time.Sleep(time.Second) |
| 409 mtx.Lock() |
| 410 artifactToIndex[path.Join(localDir, fileName)] = index |
| 411 mtx.Unlock() |
| 412 } |
| 413 if len(resp.NextPageToken) > 0 { |
| 414 req.PageToken(resp.NextPageToken) |
| 415 } else { |
| 416 req = nil |
| 417 } |
| 418 } |
| 419 return nil |
| 420 } |
| 421 |
322 // UploadDir uploads the specified local dir into the specified Google Storage d
ir. | 422 // UploadDir uploads the specified local dir into the specified Google Storage d
ir. |
323 func (gs *GsUtil) UploadDir(localDir, gsDir string, cleanDir bool) error { | 423 func (gs *GsUtil) UploadDir(localDir, gsDir string, cleanDir bool) error { |
324 if cleanDir { | 424 if cleanDir { |
325 // Empty the remote dir before uploading to it. | 425 // Empty the remote dir before uploading to it. |
326 » » util.LogErr(gs.deleteRemoteDir(gsDir)) | 426 » » util.LogErr(gs.DeleteRemoteDir(gsDir)) |
327 } | 427 } |
328 | 428 |
329 // Construct a dictionary of file paths to their file infos. | 429 // Construct a dictionary of file paths to their file infos. |
330 pathsToFileInfos := map[string]os.FileInfo{} | 430 pathsToFileInfos := map[string]os.FileInfo{} |
331 visit := func(path string, f os.FileInfo, err error) error { | 431 visit := func(path string, f os.FileInfo, err error) error { |
332 if f.IsDir() { | 432 if f.IsDir() { |
333 return nil | 433 return nil |
334 } | 434 } |
335 pathsToFileInfos[path] = f | 435 pathsToFileInfos[path] = f |
336 return nil | 436 return nil |
(...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
369 } | 469 } |
370 // Sleep for a second after uploading file to av
oid bombarding Cloud | 470 // Sleep for a second after uploading file to av
oid bombarding Cloud |
371 // storage. | 471 // storage. |
372 time.Sleep(time.Second) | 472 time.Sleep(time.Second) |
373 } | 473 } |
374 }(i + 1) | 474 }(i + 1) |
375 } | 475 } |
376 wg.Wait() | 476 wg.Wait() |
377 return nil | 477 return nil |
378 } | 478 } |
OLD | NEW |