Index: ct/go/util/util.go
diff --git a/ct/go/util/util.go b/ct/go/util/util.go
index 95d6043d347a3efd4ae6ee1b01c1ba6f39b3c58c..e4291fbed729b83df942f967d0187b4538772688 100644
--- a/ct/go/util/util.go
+++ b/ct/go/util/util.go
@@ -3,6 +3,7 @@ package util
 
 import (
 	"bufio"
+	"encoding/csv"
 	"encoding/json"
 	"fmt"
 	"io"
@@ -12,6 +13,7 @@ import (
 	"path/filepath"
 	"runtime"
 	"strconv"
+	"strings"
 	"sync"
 	"time"
 
@@ -467,3 +469,218 @@ func GetPathToPyFiles(runOnSwarming bool) string {
 		return filepath.Join(filepath.Dir(filepath.Dir(filepath.Dir(currentFile))), "py")
 	}
 }
+
+func MergeUploadCSVFiles(runID, pathToPyFiles string, gs *GsUtil, totalPages, numPerWorker int) ([]string, error) {
+	localOutputDir := filepath.Join(StorageDir, BenchmarkRunsDir, runID)
+	util.MkdirAll(localOutputDir, 0700)
+	noOutputSlaves := []string{}
+	// Copy outputs from all slaves locally.
+	for i := 0; i < totalPages/numPerWorker; i++ {
+		startRange := (i * numPerWorker) + 1
+		workerLocalOutputPath := filepath.Join(localOutputDir, strconv.Itoa(startRange)+".csv")
+		workerRemoteOutputPath := filepath.Join(BenchmarkRunsDir, runID, strconv.Itoa(startRange), "outputs", runID+".output")
+		respBody, err := gs.GetRemoteFileContents(workerRemoteOutputPath)
+		if err != nil {
+			glog.Errorf("Could not fetch %s: %s", workerRemoteOutputPath, err)
+			noOutputSlaves = append(noOutputSlaves, strconv.Itoa(i+1))
+			continue
+		}
+		defer util.Close(respBody)
+		out, err := os.Create(workerLocalOutputPath)
+		if err != nil {
+			return noOutputSlaves, fmt.Errorf("Unable to create file %s: %s", workerLocalOutputPath, err)
+		}
+		defer util.Close(out)
+		defer util.Remove(workerLocalOutputPath)
+		if _, err = io.Copy(out, respBody); err != nil {
+			return noOutputSlaves, fmt.Errorf("Unable to copy to file %s: %s", workerLocalOutputPath, err)
+		}
+		// If an output is 20 bytes or smaller then something went wrong on the slave.
+		outputInfo, err := out.Stat()
+		if err != nil {
+			return noOutputSlaves, fmt.Errorf("Unable to stat file %s: %s", workerLocalOutputPath, err)
+		}
+		if outputInfo.Size() <= 20 {
+			glog.Errorf("Output file %s was 20 bytes or smaller", workerLocalOutputPath)
+			noOutputSlaves = append(noOutputSlaves, strconv.Itoa(i+1))
+			continue
+		}
+	}
+	// Call csv_merger.py to merge all results into a single results CSV.
+	pathToCsvMerger := filepath.Join(pathToPyFiles, "csv_merger.py")
+	outputFileName := runID + ".output"
+	args := []string{
+		pathToCsvMerger,
+		"--csv_dir=" + localOutputDir,
+		"--output_csv_name=" + filepath.Join(localOutputDir, outputFileName),
+	}
+	err := ExecuteCmd("python", args, []string{}, CSV_MERGER_TIMEOUT, nil, nil)
+	if err != nil {
+		return noOutputSlaves, fmt.Errorf("Error running csv_merger.py: %s", err)
+	}
+	// Copy the output file to Google Storage.
+	remoteOutputDir := filepath.Join(BenchmarkRunsDir, runID, "consolidated_outputs")
+	if err := gs.UploadFile(outputFileName, localOutputDir, remoteOutputDir); err != nil {
+		return noOutputSlaves, fmt.Errorf("Unable to upload %s to %s: %s", outputFileName, remoteOutputDir, err)
+	}
+	return noOutputSlaves, nil
+}
+
+func RunBenchmark(fileInfoName, pathToPagesets, pathToPyFiles, localOutputDir, chromiumBuildName, chromiumBinary, runID, browserExtraArgs, benchmarkName, targetPlatform, benchmarkExtraArgs, pagesetType string, repeatBenchmark int) error {
+	pagesetBaseName := filepath.Base(fileInfoName)
+	if pagesetBaseName == TIMESTAMP_FILE_NAME || filepath.Ext(pagesetBaseName) == ".pyc" {
+		// Ignore timestamp files and .pyc files.
+		return nil
+	}
+	// Read the pageset.
+	pagesetName := strings.TrimSuffix(pagesetBaseName, filepath.Ext(pagesetBaseName))
+	pagesetPath := filepath.Join(pathToPagesets, fileInfoName)
+	decodedPageset, err := ReadPageset(pagesetPath)
+	if err != nil {
+		return fmt.Errorf("Could not read %s: %s", pagesetPath, err)
+	}
+	glog.Infof("===== Processing %s for %s =====", pagesetPath, runID)
+	benchmark, present := BenchmarksToTelemetryName[benchmarkName]
+	if !present {
+		// If it is a custom benchmark, use the entered benchmark name.
+		benchmark = benchmarkName
+	}
+	args := []string{
+		filepath.Join(TelemetryBinariesDir, BINARY_RUN_BENCHMARK),
+		benchmark,
+		"--also-run-disabled-tests",
+		"--user-agent=" + decodedPageset.UserAgent,
+		"--urls-list=" + decodedPageset.UrlsList,
+		"--archive-data-file=" + decodedPageset.ArchiveDataFile,
+	}
+	// Need to capture output for all benchmarks.
+	outputDirArgValue := filepath.Join(localOutputDir, pagesetName)
+	args = append(args, "--output-dir="+outputDirArgValue)
+	// Figure out which browser and device should be used.
+	if targetPlatform == PLATFORM_ANDROID {
+		if err := InstallChromeAPK(chromiumBuildName); err != nil {
+			return fmt.Errorf("Error while installing APK: %s", err)
+		}
+		args = append(args, "--browser=android-chromium")
+	} else {
+		args = append(args, "--browser=exact", "--browser-executable="+chromiumBinary)
+		args = append(args, "--device=desktop")
+	}
+	// Split benchmark args, if not empty, and append to args.
+	if benchmarkExtraArgs != "" {
+		args = append(args, strings.Fields(benchmarkExtraArgs)...)
+	}
+	if repeatBenchmark > 0 {
+		// Add the number of times to repeat.
+		args = append(args, fmt.Sprintf("--page-repeat=%d", repeatBenchmark))
+	}
+	// Add browser args, if not empty, to args.
+	if browserExtraArgs != "" {
+		args = append(args, "--extra-browser-args="+browserExtraArgs)
+	}
+	// Set the PYTHONPATH to the pagesets and the telemetry dirs.
+	env := []string{
+		fmt.Sprintf("PYTHONPATH=%s:%s:%s:%s:$PYTHONPATH", pathToPagesets, TelemetryBinariesDir, TelemetrySrcDir, CatapultSrcDir),
+		"DISPLAY=:0",
+	}
+	timeoutSecs := PagesetTypeToInfo[pagesetType].RunChromiumPerfTimeoutSecs
+	// Log benchmark failures but do not fail the whole run for one pageset.
+	if err := ExecuteCmd("python", args, env, time.Duration(timeoutSecs)*time.Second, nil, nil); err != nil {
+		glog.Errorf("Run benchmark command failed with: %s", err)
+	}
+	return nil
+}
+
+func MergeUploadCSVFilesOnWorkers(localOutputDir, pathToPyFiles, runID, remoteDir string, gs *GsUtil, startRange int) error {
+	// Move all results into a single directory.
+	fileInfos, err := ioutil.ReadDir(localOutputDir)
+	if err != nil {
+		return fmt.Errorf("Unable to read %s: %s", localOutputDir, err)
+	}
+	for _, fileInfo := range fileInfos {
+		if !fileInfo.IsDir() {
+			continue
+		}
+		outputFile := filepath.Join(localOutputDir, fileInfo.Name(), "results-pivot-table.csv")
+		newFile := filepath.Join(localOutputDir, fmt.Sprintf("%s.csv", fileInfo.Name()))
+		if err := os.Rename(outputFile, newFile); err != nil {
+			glog.Errorf("Could not rename %s to %s: %s", outputFile, newFile, err)
+			continue
+		}
+		// Add the rank of the page to the CSV file.
+		headers, values, err := getRowsFromCSV(newFile)
+		if err != nil {
+			glog.Errorf("Could not read %s: %s", newFile, err)
+			continue
+		}
+		pageRank := fileInfo.Name()
+		for i := range headers {
+			for j := range values {
+				if headers[i] == "page" {
+					values[j][i] = fmt.Sprintf("%s (#%s)", values[j][i], pageRank)
+				}
+			}
+		}
+		if err := writeRowsToCSV(newFile, headers, values); err != nil {
+			glog.Errorf("Could not write to %s: %s", newFile, err)
+			continue
+		}
+	}
+	// Call csv_pivot_table_merger.py to merge all results into a single results CSV.
+	pathToCsvMerger := filepath.Join(pathToPyFiles, "csv_pivot_table_merger.py")
+	outputFileName := runID + ".output"
+	args := []string{
+		pathToCsvMerger,
+		"--csv_dir=" + localOutputDir,
+		"--output_csv_name=" + filepath.Join(localOutputDir, outputFileName),
+	}
+	err = ExecuteCmd("python", args, []string{}, CSV_PIVOT_TABLE_MERGER_TIMEOUT, nil, nil)
+	if err != nil {
+		return fmt.Errorf("Error running csv_pivot_table_merger.py: %s", err)
+	}
+	// Copy the output file to Google Storage.
+	remoteOutputDir := filepath.Join(remoteDir, strconv.Itoa(startRange), "outputs")
+	if err := gs.UploadFile(outputFileName, localOutputDir, remoteOutputDir); err != nil {
+		return fmt.Errorf("Unable to upload %s to %s: %s", outputFileName, remoteOutputDir, err)
+	}
+	return nil
+}
+
+func getRowsFromCSV(csvPath string) ([]string, [][]string, error) {
+	csvFile, err := os.Open(csvPath)
+	if err != nil {
+		return nil, nil, fmt.Errorf("Could not open %s: %s", csvPath, err)
+	}
+	defer util.Close(csvFile)
+	reader := csv.NewReader(csvFile)
+	reader.FieldsPerRecord = -1
+	rawCSVdata, err := reader.ReadAll()
+	if err != nil {
+		return nil, nil, fmt.Errorf("Could not read %s: %s", csvPath, err)
+	}
+	if len(rawCSVdata) < 2 {
+		return nil, nil, fmt.Errorf("No data in %s", csvPath)
+	}
+	return rawCSVdata[0], rawCSVdata[1:], nil
+}
+
+func writeRowsToCSV(csvPath string, headers []string, values [][]string) error {
+	csvFile, err := os.OpenFile(csvPath, os.O_WRONLY|os.O_TRUNC, 0666)
+	if err != nil {
+		return fmt.Errorf("Could not open %s: %s", csvPath, err)
+	}
+	defer util.Close(csvFile)
+	writer := csv.NewWriter(csvFile)
+	defer writer.Flush()
+	// Write the headers.
+	if err := writer.Write(headers); err != nil {
+		return fmt.Errorf("Could not write to %s: %s", csvPath, err)
+	}
+	// Write all values.
+	for _, row := range values {
+		if err := writer.Write(row); err != nil {
+			return fmt.Errorf("Could not write to %s: %s", csvPath, err)
+		}
+	}
+	return nil
+}
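
For reference, the fetch loop in MergeUploadCSVFiles assumes each worker uploaded its merged CSV under the 1-based start page of its range (startRange = i*numPerWorker + 1). A minimal standalone sketch of that path arithmetic; the directory and run names below are invented for illustration:

package main

import (
	"fmt"
	"path"
)

// workerOutputPath mirrors the path arithmetic in MergeUploadCSVFiles: worker
// i (0-based) handles numPerWorker pages starting at page i*numPerWorker+1,
// and uploads its CSV under that 1-based start page.
func workerOutputPath(benchmarkRunsDir, runID string, i, numPerWorker int) string {
	startRange := (i * numPerWorker) + 1
	return path.Join(benchmarkRunsDir, runID, fmt.Sprintf("%d", startRange), "outputs", runID+".output")
}

func main() {
	// 100 total pages split 10 per worker -> outputs under 1, 11, ..., 91.
	totalPages, numPerWorker := 100, 10
	for i := 0; i < totalPages/numPerWorker; i++ {
		fmt.Println(workerOutputPath("benchmark_runs", "testrun-123", i, numPerWorker))
	}
}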
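
getRowsFromCSV and writeRowsToCSV together implement a read-annotate-write round trip over each worker's results-pivot-table.csv. A self-contained sketch of the same pattern using only encoding/csv; the file name and rank value here are made up for illustration:

package main

import (
	"encoding/csv"
	"fmt"
	"log"
	"os"
)

func main() {
	const pageRank = "42" // the real code takes the rank from the per-page directory name

	in, err := os.Open("results-pivot-table.csv")
	if err != nil {
		log.Fatal(err)
	}
	reader := csv.NewReader(in)
	reader.FieldsPerRecord = -1 // rows may have differing lengths
	rows, err := reader.ReadAll()
	in.Close()
	if err != nil {
		log.Fatal(err)
	}
	if len(rows) < 2 {
		log.Fatal("no data rows")
	}
	headers, values := rows[0], rows[1:]

	// Tag every value in the "page" column with the page's rank.
	for i, h := range headers {
		if h != "page" {
			continue
		}
		for _, row := range values {
			if i < len(row) {
				row[i] = fmt.Sprintf("%s (#%s)", row[i], pageRank)
			}
		}
	}

	// Rewrite the file: headers first, then all annotated rows.
	out, err := os.Create("results-pivot-table.csv")
	if err != nil {
		log.Fatal(err)
	}
	defer out.Close()
	writer := csv.NewWriter(out)
	if err := writer.Write(headers); err != nil {
		log.Fatal(err)
	}
	if err := writer.WriteAll(values); err != nil {
		log.Fatal(err)
	}
}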
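
RunBenchmark assembles the telemetry run_benchmark command line incrementally: per-pageset flags, platform-specific browser flags, whitespace-split extra benchmark args, and an optional repeat count. A rough standalone sketch of that assembly; every value below is invented for illustration:

package main

import (
	"fmt"
	"strings"
)

func main() {
	// Base invocation: binary, benchmark name, and per-pageset flags.
	args := []string{
		"tools/perf/run_benchmark",
		"rasterize_and_record_micro_ct",
		"--also-run-disabled-tests",
		"--user-agent=Mozilla/5.0",
		"--urls-list=http://www.example.com",
		"--archive-data-file=/tmp/archives/example.json",
		"--output-dir=/tmp/output/example",
	}
	// Desktop platform: point telemetry at an exact browser binary.
	args = append(args, "--browser=exact", "--browser-executable=/tmp/build/chrome", "--device=desktop")
	// Extra benchmark args arrive as a single string and are split on whitespace.
	if extra := "--output-format=csv-pivot-table"; extra != "" {
		args = append(args, strings.Fields(extra)...)
	}
	// The repeat flag is only added when the count is positive.
	if repeat := 3; repeat > 0 {
		args = append(args, fmt.Sprintf("--page-repeat=%d", repeat))
	}
	fmt.Println("python " + strings.Join(args, " "))
}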