OLD | NEW |
1 // Utility that contains methods for both CT master and worker scripts. | 1 // Utility that contains methods for both CT master and worker scripts. |
2 package util | 2 package util |
3 | 3 |
4 import ( | 4 import ( |
5 "bufio" | 5 "bufio" |
| 6 "encoding/csv" |
6 "encoding/json" | 7 "encoding/json" |
7 "fmt" | 8 "fmt" |
8 "io" | 9 "io" |
9 "io/ioutil" | 10 "io/ioutil" |
10 "os" | 11 "os" |
11 "path" | 12 "path" |
12 "path/filepath" | 13 "path/filepath" |
13 "runtime" | 14 "runtime" |
14 "strconv" | 15 "strconv" |
| 16 "strings" |
15 "sync" | 17 "sync" |
16 "time" | 18 "time" |
17 | 19 |
18 "go.skia.org/infra/go/exec" | 20 "go.skia.org/infra/go/exec" |
19 "go.skia.org/infra/go/swarming" | 21 "go.skia.org/infra/go/swarming" |
20 "go.skia.org/infra/go/util" | 22 "go.skia.org/infra/go/util" |
21 | 23 |
22 "github.com/skia-dev/glog" | 24 "github.com/skia-dev/glog" |
23 ) | 25 ) |
24 | 26 |
(...skipping 435 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
460 | 462 |
// GetPathToPyFiles returns the location of CT's python scripts.
//
// On Swarming the py directory is located relative to the running executable
// (two levels up from os.Args[0]); locally it is located relative to this
// source file via runtime.Caller.
func GetPathToPyFiles(runOnSwarming bool) string {
	if runOnSwarming {
		return filepath.Join(filepath.Dir(filepath.Dir(os.Args[0])), "src", "go.skia.org", "infra", "ct", "py")
	}
	// Not on Swarming: derive the checkout location from this file's path.
	_, currentFile, _, _ := runtime.Caller(0)
	return filepath.Join(filepath.Dir(filepath.Dir(filepath.Dir(currentFile))), "py")
}
| 472 |
| 473 func MergeUploadCSVFiles(runID, pathToPyFiles string, gs *GsUtil, totalPages, nu
mPerWorker int) ([]string, error) { |
| 474 localOutputDir := filepath.Join(StorageDir, BenchmarkRunsDir, runID) |
| 475 util.MkdirAll(localOutputDir, 0700) |
| 476 noOutputSlaves := []string{} |
| 477 // Copy outputs from all slaves locally. |
| 478 for i := 0; i < totalPages/numPerWorker; i++ { |
| 479 startRange := (i * numPerWorker) + 1 |
| 480 workerLocalOutputPath := filepath.Join(localOutputDir, strconv.I
toa(startRange)+".csv") |
| 481 workerRemoteOutputPath := filepath.Join(BenchmarkRunsDir, runID,
strconv.Itoa(startRange), "outputs", runID+".output") |
| 482 respBody, err := gs.GetRemoteFileContents(workerRemoteOutputPath
) |
| 483 if err != nil { |
| 484 glog.Errorf("Could not fetch %s: %s", workerRemoteOutput
Path, err) |
| 485 noOutputSlaves = append(noOutputSlaves, strconv.Itoa(i+1
)) |
| 486 continue |
| 487 } |
| 488 defer util.Close(respBody) |
| 489 out, err := os.Create(workerLocalOutputPath) |
| 490 if err != nil { |
| 491 return noOutputSlaves, fmt.Errorf("Unable to create file
%s: %s", workerLocalOutputPath, err) |
| 492 } |
| 493 defer util.Close(out) |
| 494 defer util.Remove(workerLocalOutputPath) |
| 495 if _, err = io.Copy(out, respBody); err != nil { |
| 496 return noOutputSlaves, fmt.Errorf("Unable to copy to fil
e %s: %s", workerLocalOutputPath, err) |
| 497 } |
| 498 // If an output is less than 20 bytes that means something went
wrong on the slave. |
| 499 outputInfo, err := out.Stat() |
| 500 if err != nil { |
| 501 return noOutputSlaves, fmt.Errorf("Unable to stat file %
s: %s", workerLocalOutputPath, err) |
| 502 } |
| 503 if outputInfo.Size() <= 20 { |
| 504 glog.Errorf("Output file was less than 20 bytes %s: %s",
workerLocalOutputPath, err) |
| 505 noOutputSlaves = append(noOutputSlaves, strconv.Itoa(i+1
)) |
| 506 continue |
| 507 } |
| 508 } |
| 509 // Call csv_merger.py to merge all results into a single results CSV. |
| 510 pathToCsvMerger := filepath.Join(pathToPyFiles, "csv_merger.py") |
| 511 outputFileName := runID + ".output" |
| 512 args := []string{ |
| 513 pathToCsvMerger, |
| 514 "--csv_dir=" + localOutputDir, |
| 515 "--output_csv_name=" + filepath.Join(localOutputDir, outputFileN
ame), |
| 516 } |
| 517 err := ExecuteCmd("python", args, []string{}, CSV_MERGER_TIMEOUT, nil, n
il) |
| 518 if err != nil { |
| 519 return noOutputSlaves, fmt.Errorf("Error running csv_merger.py:
%s", err) |
| 520 } |
| 521 // Copy the output file to Google Storage. |
| 522 remoteOutputDir := filepath.Join(BenchmarkRunsDir, runID, "consolidated_
outputs") |
| 523 if err := gs.UploadFile(outputFileName, localOutputDir, remoteOutputDir)
; err != nil { |
| 524 return noOutputSlaves, fmt.Errorf("Unable to upload %s to %s: %s
", outputFileName, remoteOutputDir, err) |
| 525 } |
| 526 return noOutputSlaves, nil |
| 527 } |
| 528 |
// RunBenchmark runs one telemetry benchmark invocation for a single pageset.
//
// fileInfoName is the pageset file's name within pathToPagesets. Timestamp
// files and .pyc files are silently skipped. The benchmark's output is
// written under localOutputDir/<pageset name>. On Android the Chrome APK for
// chromiumBuildName is installed first; on desktop chromiumBinary is used as
// the exact browser executable. A non-empty benchmarkExtraArgs is split on
// whitespace and appended verbatim; browserExtraArgs is passed through
// --extra-browser-args. repeatBenchmark > 0 adds --page-repeat.
//
// Returns an error for setup failures (unreadable pageset, APK install);
// a failure of the benchmark command itself is only logged (see note below).
func RunBenchmark(fileInfoName, pathToPagesets, pathToPyFiles, localOutputDir, chromiumBuildName, chromiumBinary, runID, browserExtraArgs, benchmarkName, targetPlatform, benchmarkExtraArgs, pagesetType string, repeatBenchmark int) error {
	pagesetBaseName := filepath.Base(fileInfoName)
	if pagesetBaseName == TIMESTAMP_FILE_NAME || filepath.Ext(pagesetBaseName) == ".pyc" {
		// Ignore timestamp files and .pyc files.
		return nil
	}
	// Read the pageset.
	pagesetName := strings.TrimSuffix(pagesetBaseName, filepath.Ext(pagesetBaseName))
	pagesetPath := filepath.Join(pathToPagesets, fileInfoName)
	decodedPageset, err := ReadPageset(pagesetPath)
	if err != nil {
		return fmt.Errorf("Could not read %s: %s", pagesetPath, err)
	}
	glog.Infof("===== Processing %s for %s =====", pagesetPath, runID)
	// Map the CT benchmark name to its telemetry name, if it has one.
	benchmark, present := BenchmarksToTelemetryName[benchmarkName]
	if !present {
		// If it is custom benchmark use the entered benchmark name.
		benchmark = benchmarkName
	}
	args := []string{
		filepath.Join(TelemetryBinariesDir, BINARY_RUN_BENCHMARK),
		benchmark,
		"--also-run-disabled-tests",
		"--user-agent=" + decodedPageset.UserAgent,
		"--urls-list=" + decodedPageset.UrlsList,
		"--archive-data-file=" + decodedPageset.ArchiveDataFile,
	}
	// Need to capture output for all benchmarks.
	outputDirArgValue := filepath.Join(localOutputDir, pagesetName)
	args = append(args, "--output-dir="+outputDirArgValue)
	// Figure out which browser and device should be used.
	if targetPlatform == PLATFORM_ANDROID {
		if err := InstallChromeAPK(chromiumBuildName); err != nil {
			return fmt.Errorf("Error while installing APK: %s", err)
		}
		args = append(args, "--browser=android-chromium")
	} else {
		args = append(args, "--browser=exact", "--browser-executable="+chromiumBinary)
		args = append(args, "--device=desktop")
	}
	// Split benchmark args if not empty and append to args.
	if benchmarkExtraArgs != "" {
		args = append(args, strings.Fields(benchmarkExtraArgs)...)
	}
	if repeatBenchmark > 0 {
		// Add the number of times to repeat.
		args = append(args, fmt.Sprintf("--page-repeat=%d", repeatBenchmark))
	}
	// Add browserArgs if not empty to args.
	if browserExtraArgs != "" {
		args = append(args, "--extra-browser-args="+browserExtraArgs)
	}
	// Set the PYTHONPATH to the pagesets and the telemetry dirs.
	// NOTE(review): "$PYTHONPATH" is embedded literally here; plain os/exec
	// environments do not expand shell variables — confirm ExecuteCmd (or the
	// eventual shell) performs the expansion, otherwise the trailing
	// ":$PYTHONPATH" is passed through verbatim.
	env := []string{
		fmt.Sprintf("PYTHONPATH=%s:%s:%s:%s:$PYTHONPATH", pathToPagesets, TelemetryBinariesDir, TelemetrySrcDir, CatapultSrcDir),
		"DISPLAY=:0",
	}
	timeoutSecs := PagesetTypeToInfo[pagesetType].RunChromiumPerfTimeoutSecs
	if err := ExecuteCmd("python", args, env, time.Duration(timeoutSecs)*time.Second, nil, nil); err != nil {
		// A failed benchmark run is logged but deliberately not returned —
		// presumably so one bad pageset does not abort the whole worker run;
		// confirm before changing.
		glog.Errorf("Run benchmark command failed with: %s", err)
	}
	return nil
}
| 592 |
| 593 func MergeUploadCSVFilesOnWorkers(localOutputDir, pathToPyFiles, runID, remoteDi
r string, gs *GsUtil, startRange int) error { |
| 594 // Move all results into a single directory. |
| 595 fileInfos, err := ioutil.ReadDir(localOutputDir) |
| 596 if err != nil { |
| 597 return fmt.Errorf("Unable to read %s: %s", localOutputDir, err) |
| 598 } |
| 599 for _, fileInfo := range fileInfos { |
| 600 if !fileInfo.IsDir() { |
| 601 continue |
| 602 } |
| 603 outputFile := filepath.Join(localOutputDir, fileInfo.Name(), "re
sults-pivot-table.csv") |
| 604 newFile := filepath.Join(localOutputDir, fmt.Sprintf("%s.csv", f
ileInfo.Name())) |
| 605 if err := os.Rename(outputFile, newFile); err != nil { |
| 606 glog.Errorf("Could not rename %s to %s: %s", outputFile,
newFile, err) |
| 607 continue |
| 608 } |
| 609 // Add the rank of the page to the CSV file. |
| 610 headers, values, err := getRowsFromCSV(newFile) |
| 611 if err != nil { |
| 612 glog.Errorf("Could not read %s: %s", newFile, err) |
| 613 continue |
| 614 } |
| 615 pageRank := fileInfo.Name() |
| 616 for i := range headers { |
| 617 for j := range values { |
| 618 if headers[i] == "page" { |
| 619 values[j][i] = fmt.Sprintf("%s (#%s)", v
alues[j][i], pageRank) |
| 620 } |
| 621 } |
| 622 } |
| 623 if err := writeRowsToCSV(newFile, headers, values); err != nil { |
| 624 glog.Errorf("Could not write to %s: %s", newFile, err) |
| 625 continue |
| 626 } |
| 627 } |
| 628 // Call csv_pivot_table_merger.py to merge all results into a single res
ults CSV. |
| 629 pathToCsvMerger := filepath.Join(pathToPyFiles, "csv_pivot_table_merger.
py") |
| 630 outputFileName := runID + ".output" |
| 631 args := []string{ |
| 632 pathToCsvMerger, |
| 633 "--csv_dir=" + localOutputDir, |
| 634 "--output_csv_name=" + filepath.Join(localOutputDir, outputFileN
ame), |
| 635 } |
| 636 err = ExecuteCmd("python", args, []string{}, CSV_PIVOT_TABLE_MERGER_TIME
OUT, nil, |
| 637 nil) |
| 638 if err != nil { |
| 639 return fmt.Errorf("Error running csv_pivot_table_merger.py: %s",
err) |
| 640 } |
| 641 // Copy the output file to Google Storage. |
| 642 remoteOutputDir := filepath.Join(remoteDir, strconv.Itoa(startRange), "o
utputs") |
| 643 if err := gs.UploadFile(outputFileName, localOutputDir, remoteOutputDir)
; err != nil { |
| 644 return fmt.Errorf("Unable to upload %s to %s: %s", outputFileNam
e, remoteOutputDir, err) |
| 645 } |
| 646 return nil |
| 647 } |
| 648 |
| 649 func getRowsFromCSV(csvPath string) ([]string, [][]string, error) { |
| 650 csvFile, err := os.Open(csvPath) |
| 651 defer util.Close(csvFile) |
| 652 if err != nil { |
| 653 return nil, nil, fmt.Errorf("Could not open %s: %s", csvPath, er
r) |
| 654 } |
| 655 reader := csv.NewReader(csvFile) |
| 656 reader.FieldsPerRecord = -1 |
| 657 rawCSVdata, err := reader.ReadAll() |
| 658 if err != nil { |
| 659 return nil, nil, fmt.Errorf("Could not read %s: %s", csvPath, er
r) |
| 660 } |
| 661 if len(rawCSVdata) < 2 { |
| 662 return nil, nil, fmt.Errorf("No data in %s", csvPath) |
| 663 } |
| 664 return rawCSVdata[0], rawCSVdata[1:], nil |
| 665 } |
| 666 |
| 667 func writeRowsToCSV(csvPath string, headers []string, values [][]string) error { |
| 668 csvFile, err := os.OpenFile(csvPath, os.O_WRONLY, 666) |
| 669 defer util.Close(csvFile) |
| 670 if err != nil { |
| 671 return fmt.Errorf("Could not open %s: %s", csvPath, err) |
| 672 } |
| 673 writer := csv.NewWriter(csvFile) |
| 674 defer writer.Flush() |
| 675 // Write the headers. |
| 676 if err := writer.Write(headers); err != nil { |
| 677 return fmt.Errorf("Could not write to %s: %s", csvPath, err) |
| 678 } |
| 679 // Write all values. |
| 680 for _, row := range values { |
| 681 if err := writer.Write(row); err != nil { |
| 682 return fmt.Errorf("Could not write to %s: %s", csvPath,
err) |
| 683 } |
| 684 } |
| 685 return nil |
| 686 } |
OLD | NEW |