Index: ct/go/util/util.go
diff --git a/ct/go/util/util.go b/ct/go/util/util.go
index 95d6043d347a3efd4ae6ee1b01c1ba6f39b3c58c..e4291fbed729b83df942f967d0187b4538772688 100644
--- a/ct/go/util/util.go
+++ b/ct/go/util/util.go
@@ -3,6 +3,7 @@ package util
 import (
 	"bufio"
+	"encoding/csv"
 	"encoding/json"
 	"fmt"
 	"io"
@@ -12,6 +13,7 @@ import (
 	"path/filepath"
 	"runtime"
 	"strconv"
+	"strings"
 	"sync"
 	"time"
@@ -467,3 +469,218 @@ func GetPathToPyFiles(runOnSwarming bool) string {
 		return filepath.Join(filepath.Dir(filepath.Dir(filepath.Dir(currentFile))), "py")
 	}
 }
+
+func MergeUploadCSVFiles(runID, pathToPyFiles string, gs *GsUtil, totalPages, numPerWorker int) ([]string, error) {
+	localOutputDir := filepath.Join(StorageDir, BenchmarkRunsDir, runID)
+	util.MkdirAll(localOutputDir, 0700)
+	noOutputSlaves := []string{}
+	// Copy outputs from all slaves locally.
+	for i := 0; i < totalPages/numPerWorker; i++ {
+		startRange := (i * numPerWorker) + 1
+		workerLocalOutputPath := filepath.Join(localOutputDir, strconv.Itoa(startRange)+".csv")
+		workerRemoteOutputPath := filepath.Join(BenchmarkRunsDir, runID, strconv.Itoa(startRange), "outputs", runID+".output")
+		respBody, err := gs.GetRemoteFileContents(workerRemoteOutputPath)
+		if err != nil {
+			glog.Errorf("Could not fetch %s: %s", workerRemoteOutputPath, err)
+			noOutputSlaves = append(noOutputSlaves, strconv.Itoa(i+1))
+			continue
+		}
+		defer util.Close(respBody)
+		out, err := os.Create(workerLocalOutputPath)
+		if err != nil {
+			return noOutputSlaves, fmt.Errorf("Unable to create file %s: %s", workerLocalOutputPath, err)
+		}
+		defer util.Close(out)
+		defer util.Remove(workerLocalOutputPath)
+		if _, err = io.Copy(out, respBody); err != nil {
+			return noOutputSlaves, fmt.Errorf("Unable to copy to file %s: %s", workerLocalOutputPath, err)
+		}
+		// If an output is 20 bytes or smaller, something went wrong on the slave.
+		outputInfo, err := out.Stat()
+		if err != nil {
+			return noOutputSlaves, fmt.Errorf("Unable to stat file %s: %s", workerLocalOutputPath, err)
+		}
+		if outputInfo.Size() <= 20 {
+			glog.Errorf("Output file %s was 20 bytes or smaller", workerLocalOutputPath)
+			noOutputSlaves = append(noOutputSlaves, strconv.Itoa(i+1))
+			continue
+		}
+	}
+	// Call csv_merger.py to merge all results into a single results CSV.
+	pathToCsvMerger := filepath.Join(pathToPyFiles, "csv_merger.py")
+	outputFileName := runID + ".output"
+	args := []string{
+		pathToCsvMerger,
+		"--csv_dir=" + localOutputDir,
+		"--output_csv_name=" + filepath.Join(localOutputDir, outputFileName),
+	}
+	err := ExecuteCmd("python", args, []string{}, CSV_MERGER_TIMEOUT, nil, nil)
+	if err != nil {
+		return noOutputSlaves, fmt.Errorf("Error running csv_merger.py: %s", err)
+	}
+	// Copy the output file to Google Storage.
+	remoteOutputDir := filepath.Join(BenchmarkRunsDir, runID, "consolidated_outputs")
+	if err := gs.UploadFile(outputFileName, localOutputDir, remoteOutputDir); err != nil {
+		return noOutputSlaves, fmt.Errorf("Unable to upload %s to %s: %s", outputFileName, remoteOutputDir, err)
+	}
+	return noOutputSlaves, nil
+}
+
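The master calls this after all workers report in: it pulls each worker's merged CSV down from Google Storage, feeds the directory to `csv_merger.py`, and uploads the consolidated result. A minimal calling sketch, assuming it runs inside this package with a `*GsUtil` named `gs` already constructed; the `runID` format, page count, and worker size below are illustrative values, not part of this change:

```go
// Hypothetical values for the sketch: 10000 pages split 100 per worker.
runID := "someuser-20160101000000" // assumed runID format
pathToPyFiles := GetPathToPyFiles(false /* runOnSwarming */)
noOutputSlaves, err := MergeUploadCSVFiles(runID, pathToPyFiles, gs, 10000, 100)
if err != nil {
	glog.Errorf("Could not merge and upload CSV files: %s", err)
}
if len(noOutputSlaves) > 0 {
	// The returned slice names the slaves whose output was missing or truncated.
	glog.Warningf("Slaves with no output: %s", strings.Join(noOutputSlaves, ", "))
}
```

Note that `noOutputSlaves` is populated even when a non-nil error is returned, so callers can still report partial progress.
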
+func RunBenchmark(fileInfoName, pathToPagesets, pathToPyFiles, localOutputDir, chromiumBuildName, chromiumBinary, runID, browserExtraArgs, benchmarkName, targetPlatform, benchmarkExtraArgs, pagesetType string, repeatBenchmark int) error {
+	pagesetBaseName := filepath.Base(fileInfoName)
+	if pagesetBaseName == TIMESTAMP_FILE_NAME || filepath.Ext(pagesetBaseName) == ".pyc" {
+		// Ignore timestamp files and .pyc files.
+		return nil
+	}
+	// Read the pageset.
+	pagesetName := strings.TrimSuffix(pagesetBaseName, filepath.Ext(pagesetBaseName))
+	pagesetPath := filepath.Join(pathToPagesets, fileInfoName)
+	decodedPageset, err := ReadPageset(pagesetPath)
+	if err != nil {
+		return fmt.Errorf("Could not read %s: %s", pagesetPath, err)
+	}
+	glog.Infof("===== Processing %s for %s =====", pagesetPath, runID)
+	benchmark, present := BenchmarksToTelemetryName[benchmarkName]
+	if !present {
+		// If it is a custom benchmark, use the name as entered.
+		benchmark = benchmarkName
+	}
+	args := []string{
+		filepath.Join(TelemetryBinariesDir, BINARY_RUN_BENCHMARK),
+		benchmark,
+		"--also-run-disabled-tests",
+		"--user-agent=" + decodedPageset.UserAgent,
+		"--urls-list=" + decodedPageset.UrlsList,
+		"--archive-data-file=" + decodedPageset.ArchiveDataFile,
+	}
+	// Need to capture output for all benchmarks.
+	outputDirArgValue := filepath.Join(localOutputDir, pagesetName)
+	args = append(args, "--output-dir="+outputDirArgValue)
+	// Figure out which browser and device should be used.
+	if targetPlatform == PLATFORM_ANDROID {
+		if err := InstallChromeAPK(chromiumBuildName); err != nil {
+			return fmt.Errorf("Error while installing APK: %s", err)
+		}
+		args = append(args, "--browser=android-chromium")
+	} else {
+		args = append(args, "--browser=exact", "--browser-executable="+chromiumBinary)
+		args = append(args, "--device=desktop")
+	}
+	// Split the benchmark args, if any, and append them.
+	if benchmarkExtraArgs != "" {
+		args = append(args, strings.Fields(benchmarkExtraArgs)...)
+	}
+	if repeatBenchmark > 0 {
+		// Add the number of times to repeat each page.
+		args = append(args, fmt.Sprintf("--page-repeat=%d", repeatBenchmark))
+	}
+	// Append the extra browser args, if any.
+	if browserExtraArgs != "" {
+		args = append(args, "--extra-browser-args="+browserExtraArgs)
+	}
+	// Set the PYTHONPATH to the pagesets and the telemetry dirs.
+	env := []string{
+		fmt.Sprintf("PYTHONPATH=%s:%s:%s:%s:$PYTHONPATH", pathToPagesets, TelemetryBinariesDir, TelemetrySrcDir, CatapultSrcDir),
+		"DISPLAY=:0",
+	}
+	timeoutSecs := PagesetTypeToInfo[pagesetType].RunChromiumPerfTimeoutSecs
+	if err := ExecuteCmd("python", args, env, time.Duration(timeoutSecs)*time.Second, nil, nil); err != nil {
+		// Log benchmark failures but do not abort the run for a single pageset.
+		glog.Errorf("Run benchmark command failed with: %s", err)
+	}
+	return nil
+}
+
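On a worker, this function is typically mapped over every file in the pagesets directory; its own early-returns skip the timestamp and `.pyc` entries. A sketch under that assumption, with `pathToPagesets`, `pathToPyFiles`, `localOutputDir`, `chromiumBuildName`, `chromiumBinary`, and `runID` in scope; the benchmark name, pageset type, and `PLATFORM_LINUX` constant are illustrative assumptions, not confirmed by this change:

```go
// Hypothetical driver loop over a local pagesets checkout.
fileInfos, err := ioutil.ReadDir(pathToPagesets)
if err != nil {
	glog.Fatalf("Unable to read %s: %s", pathToPagesets, err)
}
for _, fileInfo := range fileInfos {
	if err := RunBenchmark(fileInfo.Name(), pathToPagesets, pathToPyFiles, localOutputDir,
		chromiumBuildName, chromiumBinary, runID, "" /* browserExtraArgs */,
		"rasterize_and_record_micro" /* benchmarkName */, PLATFORM_LINUX,
		"" /* benchmarkExtraArgs */, "10k" /* pagesetType */, 1 /* repeatBenchmark */); err != nil {
		glog.Errorf("Error running benchmark on %s: %s", fileInfo.Name(), err)
	}
}
```

A design point carried through from the body above: a benchmark failure on one pageset is logged and swallowed, so the loop keeps going.
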
+func MergeUploadCSVFilesOnWorkers(localOutputDir, pathToPyFiles, runID, remoteDir string, gs *GsUtil, startRange int) error {
+	// Move all results into a single directory.
+	fileInfos, err := ioutil.ReadDir(localOutputDir)
+	if err != nil {
+		return fmt.Errorf("Unable to read %s: %s", localOutputDir, err)
+	}
+	for _, fileInfo := range fileInfos {
+		if !fileInfo.IsDir() {
+			continue
+		}
+		outputFile := filepath.Join(localOutputDir, fileInfo.Name(), "results-pivot-table.csv")
+		newFile := filepath.Join(localOutputDir, fmt.Sprintf("%s.csv", fileInfo.Name()))
+		if err := os.Rename(outputFile, newFile); err != nil {
+			glog.Errorf("Could not rename %s to %s: %s", outputFile, newFile, err)
+			continue
+		}
+		// Add the rank of the page to the CSV file.
+		headers, values, err := getRowsFromCSV(newFile)
+		if err != nil {
+			glog.Errorf("Could not read %s: %s", newFile, err)
+			continue
+		}
+		pageRank := fileInfo.Name()
+		for i := range headers {
+			if headers[i] == "page" {
+				for j := range values {
+					values[j][i] = fmt.Sprintf("%s (#%s)", values[j][i], pageRank)
+				}
+			}
+		}
+		if err := writeRowsToCSV(newFile, headers, values); err != nil {
+			glog.Errorf("Could not write to %s: %s", newFile, err)
+			continue
+		}
+	}
+	// Call csv_pivot_table_merger.py to merge all results into a single results CSV.
+	pathToCsvMerger := filepath.Join(pathToPyFiles, "csv_pivot_table_merger.py")
+	outputFileName := runID + ".output"
+	args := []string{
+		pathToCsvMerger,
+		"--csv_dir=" + localOutputDir,
+		"--output_csv_name=" + filepath.Join(localOutputDir, outputFileName),
+	}
+	err = ExecuteCmd("python", args, []string{}, CSV_PIVOT_TABLE_MERGER_TIMEOUT, nil, nil)
+	if err != nil {
+		return fmt.Errorf("Error running csv_pivot_table_merger.py: %s", err)
+	}
+	// Copy the output file to Google Storage.
+	remoteOutputDir := filepath.Join(remoteDir, strconv.Itoa(startRange), "outputs")
+	if err := gs.UploadFile(outputFileName, localOutputDir, remoteOutputDir); err != nil {
+		return fmt.Errorf("Unable to upload %s to %s: %s", outputFileName, remoteOutputDir, err)
+	}
+	return nil
+}
+
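The rank annotation relies on each per-page output directory being named after the page's rank, so for a directory named `57` a `page` cell of `http://www.example.com` becomes `http://www.example.com (#57)`. A worker-side calling sketch, assuming the same package scope and a `gs` handle; the `startRange` value is illustrative:

```go
// Hypothetical: the worker responsible for pages 101..200 merges and uploads its CSVs.
localOutputDir := filepath.Join(StorageDir, BenchmarkRunsDir, runID)
remoteDir := filepath.Join(BenchmarkRunsDir, runID)
if err := MergeUploadCSVFilesOnWorkers(localOutputDir, pathToPyFiles, runID, remoteDir, gs, 101); err != nil {
	glog.Errorf("Error merging and uploading CSV files: %s", err)
}
```

The resulting file lands at `<remoteDir>/101/outputs/<runID>.output`, which is exactly the per-worker path that `MergeUploadCSVFiles` reads back on the master.
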
+func getRowsFromCSV(csvPath string) ([]string, [][]string, error) {
+	csvFile, err := os.Open(csvPath)
+	if err != nil {
+		return nil, nil, fmt.Errorf("Could not open %s: %s", csvPath, err)
+	}
+	defer util.Close(csvFile)
+	reader := csv.NewReader(csvFile)
+	reader.FieldsPerRecord = -1
+	rawCSVdata, err := reader.ReadAll()
+	if err != nil {
+		return nil, nil, fmt.Errorf("Could not read %s: %s", csvPath, err)
+	}
+	if len(rawCSVdata) < 2 {
+		return nil, nil, fmt.Errorf("No data in %s", csvPath)
+	}
+	return rawCSVdata[0], rawCSVdata[1:], nil
+}
+
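The helper returns the header row separately from the data rows and tolerates ragged rows via `FieldsPerRecord = -1`. A small sketch of how a caller might locate the `page` column (the path is a placeholder):

```go
// Hypothetical: inspect a merged results CSV.
headers, values, err := getRowsFromCSV("/tmp/results-pivot-table.csv")
if err != nil {
	glog.Fatalf("Could not read CSV: %s", err)
}
pageCol := -1
for i, h := range headers {
	if h == "page" {
		pageCol = i
		break
	}
}
glog.Infof("Read %d data rows; page column index: %d", len(values), pageCol)
```
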
+func writeRowsToCSV(csvPath string, headers []string, values [][]string) error {
+	// Truncate on open so a shorter rewrite cannot leave stale trailing bytes.
+	csvFile, err := os.OpenFile(csvPath, os.O_WRONLY|os.O_TRUNC, 0666)
+	if err != nil {
+		return fmt.Errorf("Could not open %s: %s", csvPath, err)
+	}
+	defer util.Close(csvFile)
+	writer := csv.NewWriter(csvFile)
+	defer writer.Flush()
+	// Write the headers.
+	if err := writer.Write(headers); err != nil {
+		return fmt.Errorf("Could not write to %s: %s", csvPath, err)
+	}
+	// Write all values.
+	for _, row := range values {
+		if err := writer.Write(row); err != nil {
+			return fmt.Errorf("Could not write to %s: %s", csvPath, err)
+		}
+	}
+	return nil
+}
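Together the two helpers form the read-modify-write cycle used in `MergeUploadCSVFilesOnWorkers`. Note that `writeRowsToCSV` omits `os.O_CREATE`, so it only rewrites files that already exist, matching the rename-then-annotate flow above. A round-trip sketch with a hypothetical transformation and placeholder path:

```go
// Hypothetical: append a suffix to every "page" cell and write the file back.
headers, values, err := getRowsFromCSV("/tmp/results.csv")
if err != nil {
	glog.Fatal(err)
}
for i := range headers {
	if headers[i] != "page" {
		continue
	}
	for j := range values {
		values[j][i] = values[j][i] + " (annotated)"
	}
}
if err := writeRowsToCSV("/tmp/results.csv", headers, values); err != nil {
	glog.Fatal(err)
}
```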