| Index: ct/go/worker_scripts/capture_archives/main.go
|
| diff --git a/ct/go/worker_scripts/capture_archives/main.go b/ct/go/worker_scripts/capture_archives/main.go
|
| index d634fdf9adae5c1c3eed0939cb34ed67038dd98e..41b310e6f903b0e6c2ce07878c0414fc74844f06 100644
|
| --- a/ct/go/worker_scripts/capture_archives/main.go
|
| +++ b/ct/go/worker_scripts/capture_archives/main.go
|
| @@ -6,7 +6,10 @@ import (
|
| "flag"
|
| "fmt"
|
| "io/ioutil"
|
| + "path"
|
| "path/filepath"
|
| + "strconv"
|
| + "sync"
|
| "time"
|
|
|
| "github.com/skia-dev/glog"
|
| @@ -17,9 +20,16 @@ import (
|
| skutil "go.skia.org/infra/go/util"
|
| )
|
|
|
| +const (
|
| + // The number of goroutines that will run in parallel to capture archives.
|
| + WORKER_POOL_SIZE = 1
|
| +)
|
| +
|
| var (
|
| - workerNum = flag.Int("worker_num", 1, "The number of this CT worker. It will be in the {1..100} range.")
|
| - pagesetType = flag.String("pageset_type", util.PAGESET_TYPE_MOBILE_10k, "The type of pagesets to create from the Alexa CSV list. Eg: 10k, Mobile10k, All.")
|
| + startRange = flag.Int("start_range", 1, "The number this worker will capture webpage archives from.")
|
| + num = flag.Int("num", 100, "The total number of archives to capture starting from the start_range.")
|
| + pagesetType = flag.String("pageset_type", util.PAGESET_TYPE_MOBILE_10k, "The pagesets to use to capture archives. Eg: 10k, Mobile10k, All.")
|
| + chromeCleanerTimer = flag.Duration("cleaner_timer", 30*time.Minute, "How often all chrome processes will be killed on this slave.")
|
| )
|
|
|
| func main() {
|
| @@ -28,10 +38,6 @@ func main() {
|
| defer util.TimeTrack(time.Now(), "Capturing Archives")
|
| defer glog.Flush()
|
|
|
| - // Create the task file so that the master knows this worker is still busy.
|
| - skutil.LogErr(util.CreateTaskFile(util.ACTIVITY_CAPTURING_ARCHIVES))
|
| - defer util.DeleteTaskFile(util.ACTIVITY_CAPTURING_ARCHIVES)
|
| -
|
| // Reset the local chromium checkout.
|
| if err := util.ResetCheckout(util.ChromiumSrcDir); err != nil {
|
| glog.Errorf("Could not reset %s: %s", util.ChromiumSrcDir, err)
|
| @@ -47,6 +53,7 @@ func main() {
|
| pathToArchives := filepath.Join(util.WebArchivesDir, *pagesetType)
|
| skutil.RemoveAll(pathToArchives)
|
| skutil.MkdirAll(pathToArchives, 0700)
|
| + defer skutil.RemoveAll(pathToArchives)
|
|
|
| // Instantiate GsUtil object.
|
| gs, err := util.NewGsUtil(nil)
|
| @@ -55,13 +62,15 @@ func main() {
|
| return
|
| }
|
|
|
| - // Download pagesets if they do not exist locally.
|
| - if err := gs.DownloadWorkerArtifacts(util.PAGESETS_DIR_NAME, *pagesetType, *workerNum); err != nil {
|
| + // Download pagesets.
|
| + pathToPagesets := filepath.Join(util.PagesetsDir, *pagesetType)
|
| + pagesetsToIndex, err := gs.DownloadSwarmingArtifacts(pathToPagesets, util.PAGESETS_DIR_NAME, *pagesetType, *startRange, *num)
|
| + if err != nil {
|
| glog.Error(err)
|
| return
|
| }
|
| + defer skutil.RemoveAll(pathToPagesets)
|
|
|
| - pathToPagesets := filepath.Join(util.PagesetsDir, *pagesetType)
|
| recordWprBinary := filepath.Join(util.TelemetryBinariesDir, util.BINARY_RECORD_WPR)
|
| timeoutSecs := util.PagesetTypeToInfo[*pagesetType].CaptureArchivesTimeoutSecs
|
| // Loop through all pagesets.
|
| @@ -70,45 +79,95 @@ func main() {
|
| glog.Errorf("Unable to read the pagesets dir %s: %s", pathToPagesets, err)
|
| return
|
| }
|
| - glog.Infof("The %s fileInfos are: %s", len(fileInfos), fileInfos)
|
| - for _, fileInfo := range fileInfos {
|
| - pagesetBaseName := filepath.Base(fileInfo.Name())
|
| - if pagesetBaseName == util.TIMESTAMP_FILE_NAME || filepath.Ext(pagesetBaseName) == ".pyc" {
|
| - // Ignore timestamp files and .pyc files.
|
| - continue
|
| - }
|
| -
|
| - // Read the pageset.
|
| - pagesetPath := filepath.Join(pathToPagesets, fileInfo.Name())
|
| - decodedPageset, err := util.ReadPageset(pagesetPath)
|
| - if err != nil {
|
| - glog.Errorf("Could not read %s: %s", pagesetPath, err)
|
| - return
|
| - }
|
| -
|
| - glog.Infof("===== Processing %s =====", pagesetPath)
|
| - args := []string{
|
| - util.CAPTURE_ARCHIVES_DEFAULT_CT_BENCHMARK,
|
| - "--extra-browser-args=--disable-setuid-sandbox",
|
| - "--browser=reference",
|
| - "--user-agent=" + decodedPageset.UserAgent,
|
| - "--urls-list=" + decodedPageset.UrlsList,
|
| - "--archive-data-file=" + decodedPageset.ArchiveDataFile,
|
| - "--device=desktop",
|
| - }
|
| - env := []string{
|
| - fmt.Sprintf("PYTHONPATH=%s:$PYTHONPATH", pathToPagesets),
|
| - "DISPLAY=:0",
|
| - }
|
| - skutil.LogErr(util.ExecuteCmd(recordWprBinary, args, env, time.Duration(timeoutSecs)*time.Second, nil, nil))
|
| +
|
| + // Create channel that contains all pageset file names. This channel will
|
| + // be consumed by the worker pool.
|
| + pagesetRequests := util.GetClosedChannelOfPagesets(fileInfos)
|
| +
|
| + var wg sync.WaitGroup
|
| + // Use a RWMutex for the chromeProcessesCleaner goroutine to communicate to
|
| + // the workers (acting as "readers") when it wants to be the "writer" and
|
| + // kill all zombie chrome processes.
|
| + var mutex sync.RWMutex
|
| +
|
| + // Loop through workers in the worker pool.
|
| + for i := 0; i < WORKER_POOL_SIZE; i++ {
|
| + // Increment the WaitGroup counter.
|
| + wg.Add(1)
|
| +
|
| + // Create and run a goroutine closure that captures archives.
|
| + go func() {
|
| + // Decrement the WaitGroup counter when the goroutine completes.
|
| + defer wg.Done()
|
| +
|
| + for pagesetBaseName := range pagesetRequests {
|
| + if pagesetBaseName == util.TIMESTAMP_FILE_NAME || filepath.Ext(pagesetBaseName) == ".pyc" {
|
| + // Ignore timestamp files and .pyc files.
|
| + continue
|
| + }
|
| +
|
| + mutex.RLock()
|
| +
|
| + // Read the pageset.
|
| + pagesetPath := filepath.Join(pathToPagesets, pagesetBaseName)
|
| + decodedPageset, err := util.ReadPageset(pagesetPath)
|
| + if err != nil {
|
| + glog.Errorf("Could not read %s: %s", pagesetPath, err)
|
| + return
|
| + }
|
| +
|
| + glog.Infof("===== Processing %s =====", pagesetPath)
|
| + index := strconv.Itoa(pagesetsToIndex[path.Join(pathToPagesets, pagesetBaseName)])
|
| + archiveDataFile := addIndexInDataFileLocation(decodedPageset.ArchiveDataFile, index)
|
| + args := []string{
|
| + util.CAPTURE_ARCHIVES_DEFAULT_CT_BENCHMARK,
|
| + "--extra-browser-args=--disable-setuid-sandbox",
|
| + "--browser=reference",
|
| + "--user-agent=" + decodedPageset.UserAgent,
|
| + "--urls-list=" + decodedPageset.UrlsList,
|
| + "--archive-data-file=" + archiveDataFile,
|
| + "--device=desktop",
|
| + }
|
| + env := []string{
|
| + fmt.Sprintf("PYTHONPATH=%s:$PYTHONPATH", pathToPagesets),
|
| + "DISPLAY=:0",
|
| + }
|
| + // Retry record_wpr binary 3 times if there are any errors.
|
| + retryAttempts := 3
|
| + for i := 0; ; i++ {
|
| + err = util.ExecuteCmd(recordWprBinary, args, env, time.Duration(timeoutSecs)*time.Second, nil, nil)
|
| + if err == nil {
|
| + break
|
| + }
|
| + if i >= (retryAttempts - 1) {
|
| + glog.Errorf("%s failed in spite of 3 retries. Last error: %s", pagesetPath, err)
|
| + break
|
| + }
|
| + time.Sleep(time.Second)
|
| + glog.Warningf("Retrying due to error: %s", err)
|
| + }
|
| + mutex.RUnlock()
|
| + }
|
| + }()
|
| + }
|
| +
|
| + if !*worker_common.Local {
|
| + // Start the cleaner.
|
| + go util.ChromeProcessesCleaner(&mutex, *chromeCleanerTimer)
|
| }
|
|
|
| - // Write timestamp to the webpage archives dir.
|
| - skutil.LogErr(util.CreateTimestampFile(pathToArchives))
|
| + // Wait for all spawned goroutines to complete.
|
| + wg.Wait()
|
|
|
| - // Upload webpage archives dir to Google Storage.
|
| - if err := gs.UploadWorkerArtifacts(util.WEB_ARCHIVES_DIR_NAME, *pagesetType, *workerNum); err != nil {
|
| + // Upload all webpage archives to Google Storage.
|
| + if err := gs.UploadSwarmingArtifacts(util.WEB_ARCHIVES_DIR_NAME, *pagesetType); err != nil {
|
| glog.Error(err)
|
| return
|
| }
|
| }
|
| +
|
| +func addIndexInDataFileLocation(originalDataFile string, index string) string {
|
| + fileName := filepath.Base(originalDataFile)
|
| + fileDir := filepath.Dir(originalDataFile)
|
| + return path.Join(fileDir, index, fileName)
|
| +}
|
|
|