Chromium Code Reviews| Index: ct/go/worker_scripts/capture_archives/main.go |
| diff --git a/ct/go/worker_scripts/capture_archives/main.go b/ct/go/worker_scripts/capture_archives/main.go |
| index d634fdf9adae5c1c3eed0939cb34ed67038dd98e..197dbe99c29d16a1edd5040dddd7d207234d0a30 100644 |
| --- a/ct/go/worker_scripts/capture_archives/main.go |
| +++ b/ct/go/worker_scripts/capture_archives/main.go |
| @@ -6,7 +6,10 @@ import ( |
| "flag" |
| "fmt" |
| "io/ioutil" |
| + "path" |
| "path/filepath" |
| + "strconv" |
| + "sync" |
| "time" |
| "github.com/skia-dev/glog" |
| @@ -17,9 +20,16 @@ import ( |
| skutil "go.skia.org/infra/go/util" |
| ) |
| +const ( |
| + // The number of goroutines that will run in parallel to capture archives. |
| + WORKER_POOL_SIZE = 1 |
|
rmistry
2016/05/18 16:14:01
Will continue to play with this number to find the
|
| +) |
| + |
| var ( |
| - workerNum = flag.Int("worker_num", 1, "The number of this CT worker. It will be in the {1..100} range.") |
| - pagesetType = flag.String("pageset_type", util.PAGESET_TYPE_MOBILE_10k, "The type of pagesets to create from the Alexa CSV list. Eg: 10k, Mobile10k, All.") |
| + startRange = flag.Int("start_range", 1, "The number this worker will capture webpage archives from.") |
| + num = flag.Int("num", 100, "The total number of archives to capture starting from the start_range.") |
| + pagesetType = flag.String("pageset_type", util.PAGESET_TYPE_MOBILE_10k, "The pagesets to use to capture archives. Eg: 10k, Mobile10k, All.") |
| + chromeCleanerTimer = flag.Duration("cleaner_timer", 30*time.Minute, "How often all chrome processes will be killed on this slave.") |
| ) |
| func main() { |
| @@ -28,10 +38,6 @@ func main() { |
| defer util.TimeTrack(time.Now(), "Capturing Archives") |
| defer glog.Flush() |
| - // Create the task file so that the master knows this worker is still busy. |
| - skutil.LogErr(util.CreateTaskFile(util.ACTIVITY_CAPTURING_ARCHIVES)) |
| - defer util.DeleteTaskFile(util.ACTIVITY_CAPTURING_ARCHIVES) |
| - |
| // Reset the local chromium checkout. |
| if err := util.ResetCheckout(util.ChromiumSrcDir); err != nil { |
| glog.Errorf("Could not reset %s: %s", util.ChromiumSrcDir, err) |
| @@ -47,6 +53,7 @@ func main() { |
| pathToArchives := filepath.Join(util.WebArchivesDir, *pagesetType) |
| skutil.RemoveAll(pathToArchives) |
| skutil.MkdirAll(pathToArchives, 0700) |
| + defer skutil.RemoveAll(pathToArchives) |
| // Instantiate GsUtil object. |
| gs, err := util.NewGsUtil(nil) |
| @@ -55,13 +62,15 @@ func main() { |
| return |
| } |
| - // Download pagesets if they do not exist locally. |
| - if err := gs.DownloadWorkerArtifacts(util.PAGESETS_DIR_NAME, *pagesetType, *workerNum); err != nil { |
| + // Download pagesets. |
| + pathToPagesets := filepath.Join(util.PagesetsDir, *pagesetType) |
| + pagesetsToIndex, err := gs.DownloadSwarmingArtifacts(pathToPagesets, util.PAGESETS_DIR_NAME, *pagesetType, *startRange, *num) |
| + if err != nil { |
| glog.Error(err) |
| return |
| } |
| + defer skutil.RemoveAll(pathToPagesets) |
| - pathToPagesets := filepath.Join(util.PagesetsDir, *pagesetType) |
| recordWprBinary := filepath.Join(util.TelemetryBinariesDir, util.BINARY_RECORD_WPR) |
| timeoutSecs := util.PagesetTypeToInfo[*pagesetType].CaptureArchivesTimeoutSecs |
| // Loop through all pagesets. |
| @@ -70,45 +79,82 @@ func main() { |
| glog.Errorf("Unable to read the pagesets dir %s: %s", pathToPagesets, err) |
| return |
| } |
| - glog.Infof("The %s fileInfos are: %s", len(fileInfos), fileInfos) |
| - for _, fileInfo := range fileInfos { |
| - pagesetBaseName := filepath.Base(fileInfo.Name()) |
| - if pagesetBaseName == util.TIMESTAMP_FILE_NAME || filepath.Ext(pagesetBaseName) == ".pyc" { |
| - // Ignore timestamp files and .pyc files. |
| - continue |
| - } |
| - |
| - // Read the pageset. |
| - pagesetPath := filepath.Join(pathToPagesets, fileInfo.Name()) |
| - decodedPageset, err := util.ReadPageset(pagesetPath) |
| - if err != nil { |
| - glog.Errorf("Could not read %s: %s", pagesetPath, err) |
| - return |
| - } |
| - |
| - glog.Infof("===== Processing %s =====", pagesetPath) |
| - args := []string{ |
| - util.CAPTURE_ARCHIVES_DEFAULT_CT_BENCHMARK, |
| - "--extra-browser-args=--disable-setuid-sandbox", |
| - "--browser=reference", |
| - "--user-agent=" + decodedPageset.UserAgent, |
| - "--urls-list=" + decodedPageset.UrlsList, |
| - "--archive-data-file=" + decodedPageset.ArchiveDataFile, |
| - "--device=desktop", |
| - } |
| - env := []string{ |
| - fmt.Sprintf("PYTHONPATH=%s:$PYTHONPATH", pathToPagesets), |
| - "DISPLAY=:0", |
| - } |
| - skutil.LogErr(util.ExecuteCmd(recordWprBinary, args, env, time.Duration(timeoutSecs)*time.Second, nil, nil)) |
| + |
| + // Create channel that contains all pageset file names. This channel will |
| + // be consumed by the worker pool. |
| + pagesetRequests := util.GetClosedChannelOfPagesets(fileInfos) |
| + |
| + var wg sync.WaitGroup |
| + // Use a RWMutex for the chromeProcessesCleaner goroutine to communicate to |
| + // the workers (acting as "readers") when it wants to be the "writer" and |
| + // kill all zombie chrome processes. |
| + var mutex sync.RWMutex |
| + |
| + // Loop through workers in the worker pool. |
| + for i := 0; i < WORKER_POOL_SIZE; i++ { |
| + // Increment the WaitGroup counter. |
| + wg.Add(1) |
| + |
| + // Create and run a goroutine closure that captures SKPs. |
| + go func() { |
| + // Decrement the WaitGroup counter when the goroutine completes. |
| + defer wg.Done() |
| + |
| + for pagesetBaseName := range pagesetRequests { |
| + if pagesetBaseName == util.TIMESTAMP_FILE_NAME || filepath.Ext(pagesetBaseName) == ".pyc" { |
| + // Ignore timestamp files and .pyc files. |
| + continue |
| + } |
| + |
| + mutex.RLock() |
| + |
| + // Read the pageset. |
| + pagesetPath := filepath.Join(pathToPagesets, pagesetBaseName) |
| + decodedPageset, err := util.ReadPageset(pagesetPath) |
| + if err != nil { |
| + glog.Errorf("Could not read %s: %s", pagesetPath, err) |
| + return |
| + } |
| + |
| + glog.Infof("===== Processing %s =====", pagesetPath) |
| + index := strconv.Itoa(pagesetsToIndex[path.Join(pathToPagesets, pagesetBaseName)]) |
| + archiveDataFile := addIndexInDataFileLocation(decodedPageset.ArchiveDataFile, index) |
| + args := []string{ |
| + util.CAPTURE_ARCHIVES_DEFAULT_CT_BENCHMARK, |
| + "--extra-browser-args=--disable-setuid-sandbox", |
| + "--browser=reference", |
| + "--user-agent=" + decodedPageset.UserAgent, |
| + "--urls-list=" + decodedPageset.UrlsList, |
| + "--archive-data-file=" + archiveDataFile, |
| + "--device=desktop", |
| + } |
| + env := []string{ |
| + fmt.Sprintf("PYTHONPATH=%s:$PYTHONPATH", pathToPagesets), |
| + "DISPLAY=:0", |
| + } |
| + skutil.LogErr(util.ExecuteCmd(recordWprBinary, args, env, time.Duration(timeoutSecs)*time.Second, nil, nil)) |
| + mutex.RUnlock() |
| + } |
| + }() |
| + } |
| + |
| + if !*worker_common.Local { |
| + // Start the cleaner. |
| + go util.ChromeProcessesCleaner(&mutex, *chromeCleanerTimer) |
| } |
| - // Write timestamp to the webpage archives dir. |
| - skutil.LogErr(util.CreateTimestampFile(pathToArchives)) |
| + // Wait for all spawned goroutines to complete. |
| + wg.Wait() |
| - // Upload webpage archives dir to Google Storage. |
| - if err := gs.UploadWorkerArtifacts(util.WEB_ARCHIVES_DIR_NAME, *pagesetType, *workerNum); err != nil { |
| + // Upload all webpage archives to Google Storage. |
| + if err := gs.UploadSwarmingArtifacts(util.WEB_ARCHIVES_DIR_NAME, *pagesetType); err != nil { |
| glog.Error(err) |
| return |
| } |
| } |
| + |
| +func addIndexInDataFileLocation(originalDataFile string, index string) string { |
| + fileName := filepath.Base(originalDataFile) |
| + fileDir := filepath.Dir(originalDataFile) |
| + return path.Join(fileDir, index, fileName) |
| +} |