Index: ct/go/worker_scripts/capture_archives/main.go
diff --git a/ct/go/worker_scripts/capture_archives/main.go b/ct/go/worker_scripts/capture_archives/main.go
index d634fdf9adae5c1c3eed0939cb34ed67038dd98e..41b310e6f903b0e6c2ce07878c0414fc74844f06 100644
--- a/ct/go/worker_scripts/capture_archives/main.go
+++ b/ct/go/worker_scripts/capture_archives/main.go
@@ -6,7 +6,10 @@ import (
 	"flag"
 	"fmt"
 	"io/ioutil"
+	"path"
 	"path/filepath"
+	"strconv"
+	"sync"
 	"time"
 
 	"github.com/skia-dev/glog"
@@ -17,9 +20,16 @@ import (
 	skutil "go.skia.org/infra/go/util"
 )
 
+const (
+	// The number of goroutines that will run in parallel to capture archives.
+	WORKER_POOL_SIZE = 1
+)
+
 var (
-	workerNum   = flag.Int("worker_num", 1, "The number of this CT worker. It will be in the {1..100} range.")
-	pagesetType = flag.String("pageset_type", util.PAGESET_TYPE_MOBILE_10k, "The type of pagesets to create from the Alexa CSV list. Eg: 10k, Mobile10k, All.")
+	startRange         = flag.Int("start_range", 1, "The index of the pageset to start capturing archives from.")
+	num                = flag.Int("num", 100, "The total number of archives to capture, starting from start_range.")
+	pagesetType        = flag.String("pageset_type", util.PAGESET_TYPE_MOBILE_10k, "The pagesets to use to capture archives. Eg: 10k, Mobile10k, All.")
+	chromeCleanerTimer = flag.Duration("cleaner_timer", 30*time.Minute, "How often all Chrome processes will be killed on this slave.")
 )
 
 func main() {
@@ -28,10 +38,6 @@ func main() {
 	defer util.TimeTrack(time.Now(), "Capturing Archives")
 	defer glog.Flush()
 
-	// Create the task file so that the master knows this worker is still busy.
-	skutil.LogErr(util.CreateTaskFile(util.ACTIVITY_CAPTURING_ARCHIVES))
-	defer util.DeleteTaskFile(util.ACTIVITY_CAPTURING_ARCHIVES)
-
 	// Reset the local chromium checkout.
 	if err := util.ResetCheckout(util.ChromiumSrcDir); err != nil {
 		glog.Errorf("Could not reset %s: %s", util.ChromiumSrcDir, err)
@@ -47,6 +53,7 @@ func main() {
 	pathToArchives := filepath.Join(util.WebArchivesDir, *pagesetType)
 	skutil.RemoveAll(pathToArchives)
 	skutil.MkdirAll(pathToArchives, 0700)
+	defer skutil.RemoveAll(pathToArchives)
 
 	// Instantiate GsUtil object.
 	gs, err := util.NewGsUtil(nil)
@@ -55,13 +62,15 @@ func main() {
 		return
 	}
 
-	// Download pagesets if they do not exist locally.
-	if err := gs.DownloadWorkerArtifacts(util.PAGESETS_DIR_NAME, *pagesetType, *workerNum); err != nil {
+	// Download pagesets.
+	pathToPagesets := filepath.Join(util.PagesetsDir, *pagesetType)
+	pagesetsToIndex, err := gs.DownloadSwarmingArtifacts(pathToPagesets, util.PAGESETS_DIR_NAME, *pagesetType, *startRange, *num)
+	if err != nil {
 		glog.Error(err)
 		return
 	}
+	defer skutil.RemoveAll(pathToPagesets)
 
-	pathToPagesets := filepath.Join(util.PagesetsDir, *pagesetType)
 	recordWprBinary := filepath.Join(util.TelemetryBinariesDir, util.BINARY_RECORD_WPR)
 	timeoutSecs := util.PagesetTypeToInfo[*pagesetType].CaptureArchivesTimeoutSecs
 	// Loop through all pagesets.
@@ -70,45 +79,101 @@ func main() {
 		glog.Errorf("Unable to read the pagesets dir %s: %s", pathToPagesets, err)
 		return
 	}
-	glog.Infof("The %s fileInfos are: %s", len(fileInfos), fileInfos)
-	for _, fileInfo := range fileInfos {
-		pagesetBaseName := filepath.Base(fileInfo.Name())
-		if pagesetBaseName == util.TIMESTAMP_FILE_NAME || filepath.Ext(pagesetBaseName) == ".pyc" {
-			// Ignore timestamp files and .pyc files.
-			continue
-		}
-
-		// Read the pageset.
-		pagesetPath := filepath.Join(pathToPagesets, fileInfo.Name())
-		decodedPageset, err := util.ReadPageset(pagesetPath)
-		if err != nil {
-			glog.Errorf("Could not read %s: %s", pagesetPath, err)
-			return
-		}
-
-		glog.Infof("===== Processing %s =====", pagesetPath)
-		args := []string{
-			util.CAPTURE_ARCHIVES_DEFAULT_CT_BENCHMARK,
-			"--extra-browser-args=--disable-setuid-sandbox",
-			"--browser=reference",
-			"--user-agent=" + decodedPageset.UserAgent,
-			"--urls-list=" + decodedPageset.UrlsList,
-			"--archive-data-file=" + decodedPageset.ArchiveDataFile,
-			"--device=desktop",
-		}
-		env := []string{
-			fmt.Sprintf("PYTHONPATH=%s:$PYTHONPATH", pathToPagesets),
-			"DISPLAY=:0",
-		}
-		skutil.LogErr(util.ExecuteCmd(recordWprBinary, args, env, time.Duration(timeoutSecs)*time.Second, nil, nil))
+
+	// Create channel that contains all pageset file names. This channel will
+	// be consumed by the worker pool.
+	pagesetRequests := util.GetClosedChannelOfPagesets(fileInfos)
+
+	var wg sync.WaitGroup
+	// Use a RWMutex for the chromeProcessesCleaner goroutine to communicate to
+	// the workers (acting as "readers") when it wants to be the "writer" and
+	// kill all zombie chrome processes.
+	var mutex sync.RWMutex
+
+	// Loop through workers in the worker pool.
+	for i := 0; i < WORKER_POOL_SIZE; i++ {
+		// Increment the WaitGroup counter.
+		wg.Add(1)
+
+		// Create and run a goroutine closure that captures archives.
+		go func() {
+			// Decrement the WaitGroup counter when the goroutine completes.
+			defer wg.Done()
+
+			for pagesetBaseName := range pagesetRequests {
+				if pagesetBaseName == util.TIMESTAMP_FILE_NAME || filepath.Ext(pagesetBaseName) == ".pyc" {
+					// Ignore timestamp files and .pyc files.
+					continue
+				}
+
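+				// Grab the read lock so that the cleaner cannot kill Chrome processes while this pageset is being captured.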
+				mutex.RLock()
+
+				// Read the pageset.
+				pagesetPath := filepath.Join(pathToPagesets, pagesetBaseName)
+				decodedPageset, err := util.ReadPageset(pagesetPath)
+				if err != nil {
+					glog.Errorf("Could not read %s: %s", pagesetPath, err)
+					// Release the read lock before returning from the worker.
+					mutex.RUnlock()
+					return
+				}
+
+				glog.Infof("===== Processing %s =====", pagesetPath)
+				index := strconv.Itoa(pagesetsToIndex[path.Join(pathToPagesets, pagesetBaseName)])
+				archiveDataFile := addIndexInDataFileLocation(decodedPageset.ArchiveDataFile, index)
+				args := []string{
+					util.CAPTURE_ARCHIVES_DEFAULT_CT_BENCHMARK,
+					"--extra-browser-args=--disable-setuid-sandbox",
+					"--browser=reference",
+					"--user-agent=" + decodedPageset.UserAgent,
+					"--urls-list=" + decodedPageset.UrlsList,
+					"--archive-data-file=" + archiveDataFile,
+					"--device=desktop",
+				}
+				env := []string{
+					fmt.Sprintf("PYTHONPATH=%s:$PYTHONPATH", pathToPagesets),
+					"DISPLAY=:0",
+				}
+				// Retry record_wpr binary 3 times if there are any errors.
+				retryAttempts := 3
+				for i := 0; ; i++ {
+					err = util.ExecuteCmd(recordWprBinary, args, env, time.Duration(timeoutSecs)*time.Second, nil, nil)
+					if err == nil {
+						break
+					}
+					if i >= (retryAttempts - 1) {
+						glog.Errorf("%s failed despite 3 retries. Last error: %s", pagesetPath, err)
+						break
+					}
+					time.Sleep(time.Second)
+					glog.Warningf("Retrying due to error: %s", err)
+				}
+				mutex.RUnlock()
+			}
+		}()
+	}
+
+	if !*worker_common.Local {
+		// Start the cleaner.
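+		// The cleaner takes the write lock on the mutex at every cleaner_timer interval, so Chrome processes are only killed while no capture is in progress.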
+		go util.ChromeProcessesCleaner(&mutex, *chromeCleanerTimer)
 	}
 
-	// Write timestamp to the webpage archives dir.
-	skutil.LogErr(util.CreateTimestampFile(pathToArchives))
+	// Wait for all spawned goroutines to complete.
+	wg.Wait()
 
-	// Upload webpage archives dir to Google Storage.
-	if err := gs.UploadWorkerArtifacts(util.WEB_ARCHIVES_DIR_NAME, *pagesetType, *workerNum); err != nil {
+	// Upload all webpage archives to Google Storage.
+	if err := gs.UploadSwarmingArtifacts(util.WEB_ARCHIVES_DIR_NAME, *pagesetType); err != nil {
 		glog.Error(err)
 		return
 	}
 }
+
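+// addIndexInDataFileLocation returns the data file location with the given
+// index inserted between its directory and its base name. Eg: /a/b/file with index 1 becomes /a/b/1/file.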
+func addIndexInDataFileLocation(originalDataFile string, index string) string {
+	fileName := filepath.Base(originalDataFile)
+	fileDir := filepath.Dir(originalDataFile)
+	return path.Join(fileDir, index, fileName)
+} |