| Index: ct/go/worker_scripts/capture_archives/main.go | 
| diff --git a/ct/go/worker_scripts/capture_archives/main.go b/ct/go/worker_scripts/capture_archives/main.go | 
| index d634fdf9adae5c1c3eed0939cb34ed67038dd98e..41b310e6f903b0e6c2ce07878c0414fc74844f06 100644 | 
| --- a/ct/go/worker_scripts/capture_archives/main.go | 
| +++ b/ct/go/worker_scripts/capture_archives/main.go | 
| @@ -6,7 +6,10 @@ import ( | 
| "flag" | 
| "fmt" | 
| "io/ioutil" | 
| +	"path" | 
| "path/filepath" | 
| +	"strconv" | 
| +	"sync" | 
| "time" | 
|  | 
| "github.com/skia-dev/glog" | 
| @@ -17,9 +20,16 @@ import ( | 
| skutil "go.skia.org/infra/go/util" | 
| ) | 
|  | 
| +const ( | 
| +	// The number of goroutines that will run in parallel to capture archives. | 
| +	// main starts this many workers, all draining the same channel of pageset | 
| +	// file names; with a value of 1 the pagesets are processed one at a time. | 
| +	WORKER_POOL_SIZE = 1 | 
| +) | 
| + | 
| +// Command-line flags. startRange and num are forwarded to | 
| +// DownloadSwarmingArtifacts to select which pagesets are fetched, and | 
| +// chromeCleanerTimer is the interval handed to ChromeProcessesCleaner. | 
| var ( | 
| -	workerNum   = flag.Int("worker_num", 1, "The number of this CT worker. It will be in the {1..100} range.") | 
| -	pagesetType = flag.String("pageset_type", util.PAGESET_TYPE_MOBILE_10k, "The type of pagesets to create from the Alexa CSV list. Eg: 10k, Mobile10k, All.") | 
| +	startRange         = flag.Int("start_range", 1, "The number this worker will capture webpage archives from.") | 
| +	num                = flag.Int("num", 100, "The total number of archives to capture starting from the start_range.") | 
| +	pagesetType        = flag.String("pageset_type", util.PAGESET_TYPE_MOBILE_10k, "The pagesets to use to capture archives. Eg: 10k, Mobile10k, All.") | 
| +	chromeCleanerTimer = flag.Duration("cleaner_timer", 30*time.Minute, "How often all chrome processes will be killed on this slave.") | 
| ) | 
|  | 
| func main() { | 
| @@ -28,10 +38,6 @@ func main() { | 
| defer util.TimeTrack(time.Now(), "Capturing Archives") | 
| defer glog.Flush() | 
|  | 
| -	// Create the task file so that the master knows this worker is still busy. | 
| -	skutil.LogErr(util.CreateTaskFile(util.ACTIVITY_CAPTURING_ARCHIVES)) | 
| -	defer util.DeleteTaskFile(util.ACTIVITY_CAPTURING_ARCHIVES) | 
| - | 
| // Reset the local chromium checkout. | 
| if err := util.ResetCheckout(util.ChromiumSrcDir); err != nil { | 
| glog.Errorf("Could not reset %s: %s", util.ChromiumSrcDir, err) | 
| @@ -47,6 +53,7 @@ func main() { | 
| pathToArchives := filepath.Join(util.WebArchivesDir, *pagesetType) | 
| skutil.RemoveAll(pathToArchives) | 
| skutil.MkdirAll(pathToArchives, 0700) | 
| +	defer skutil.RemoveAll(pathToArchives) | 
|  | 
| // Instantiate GsUtil object. | 
| gs, err := util.NewGsUtil(nil) | 
| @@ -55,13 +62,15 @@ func main() { | 
| return | 
| } | 
|  | 
| -	// Download pagesets if they do not exist locally. | 
| -	if err := gs.DownloadWorkerArtifacts(util.PAGESETS_DIR_NAME, *pagesetType, *workerNum); err != nil { | 
| +	// Download pagesets. | 
| +	pathToPagesets := filepath.Join(util.PagesetsDir, *pagesetType) | 
| +	pagesetsToIndex, err := gs.DownloadSwarmingArtifacts(pathToPagesets, util.PAGESETS_DIR_NAME, *pagesetType, *startRange, *num) | 
| +	if err != nil { | 
| glog.Error(err) | 
| return | 
| } | 
| +	defer skutil.RemoveAll(pathToPagesets) | 
|  | 
| -	pathToPagesets := filepath.Join(util.PagesetsDir, *pagesetType) | 
| recordWprBinary := filepath.Join(util.TelemetryBinariesDir, util.BINARY_RECORD_WPR) | 
| timeoutSecs := util.PagesetTypeToInfo[*pagesetType].CaptureArchivesTimeoutSecs | 
| // Loop through all pagesets. | 
| @@ -70,45 +79,95 @@ func main() { | 
| glog.Errorf("Unable to read the pagesets dir %s: %s", pathToPagesets, err) | 
| return | 
| } | 
| -	glog.Infof("The %s fileInfos are: %s", len(fileInfos), fileInfos) | 
| -	for _, fileInfo := range fileInfos { | 
| -		pagesetBaseName := filepath.Base(fileInfo.Name()) | 
| -		if pagesetBaseName == util.TIMESTAMP_FILE_NAME || filepath.Ext(pagesetBaseName) == ".pyc" { | 
| -			// Ignore timestamp files and .pyc files. | 
| -			continue | 
| -		} | 
| - | 
| -		// Read the pageset. | 
| -		pagesetPath := filepath.Join(pathToPagesets, fileInfo.Name()) | 
| -		decodedPageset, err := util.ReadPageset(pagesetPath) | 
| -		if err != nil { | 
| -			glog.Errorf("Could not read %s: %s", pagesetPath, err) | 
| -			return | 
| -		} | 
| - | 
| -		glog.Infof("===== Processing %s =====", pagesetPath) | 
| -		args := []string{ | 
| -			util.CAPTURE_ARCHIVES_DEFAULT_CT_BENCHMARK, | 
| -			"--extra-browser-args=--disable-setuid-sandbox", | 
| -			"--browser=reference", | 
| -			"--user-agent=" + decodedPageset.UserAgent, | 
| -			"--urls-list=" + decodedPageset.UrlsList, | 
| -			"--archive-data-file=" + decodedPageset.ArchiveDataFile, | 
| -			"--device=desktop", | 
| -		} | 
| -		env := []string{ | 
| -			fmt.Sprintf("PYTHONPATH=%s:$PYTHONPATH", pathToPagesets), | 
| -			"DISPLAY=:0", | 
| -		} | 
| -		skutil.LogErr(util.ExecuteCmd(recordWprBinary, args, env, time.Duration(timeoutSecs)*time.Second, nil, nil)) | 
| + | 
| +	// Create channel that contains all pageset file names. This channel will | 
| +	// be consumed by the worker pool. | 
| +	pagesetRequests := util.GetClosedChannelOfPagesets(fileInfos) | 
| + | 
| +	var wg sync.WaitGroup | 
| +	// Use a RWMutex for the chromeProcessesCleaner goroutine to communicate to | 
| +	// the workers (acting as "readers") when it wants to be the "writer" and | 
| +	// kill all zombie chrome processes. | 
| +	var mutex sync.RWMutex | 
| + | 
| +	// Loop through workers in the worker pool. | 
| +	for i := 0; i < WORKER_POOL_SIZE; i++ { | 
| +		// Increment the WaitGroup counter. | 
| +		wg.Add(1) | 
| + | 
| +		// Create and run a goroutine closure that captures archives. | 
| +		go func() { | 
| +			// Decrement the WaitGroup counter when the goroutine completes. | 
| +			defer wg.Done() | 
| + | 
| +			for pagesetBaseName := range pagesetRequests { | 
| +				if pagesetBaseName == util.TIMESTAMP_FILE_NAME || filepath.Ext(pagesetBaseName) == ".pyc" { | 
| +					// Ignore timestamp files and .pyc files. | 
| +					continue | 
| +				} | 
| + | 
| +				mutex.RLock() | 
| + | 
| +				// Read the pageset. | 
| +				pagesetPath := filepath.Join(pathToPagesets, pagesetBaseName) | 
| +				decodedPageset, err := util.ReadPageset(pagesetPath) | 
| +				if err != nil { | 
| +					glog.Errorf("Could not read %s: %s", pagesetPath, err) | 
| +					return | 
| +				} | 
| + | 
| +				glog.Infof("===== Processing %s =====", pagesetPath) | 
| +				index := strconv.Itoa(pagesetsToIndex[path.Join(pathToPagesets, pagesetBaseName)]) | 
| +				archiveDataFile := addIndexInDataFileLocation(decodedPageset.ArchiveDataFile, index) | 
| +				args := []string{ | 
| +					util.CAPTURE_ARCHIVES_DEFAULT_CT_BENCHMARK, | 
| +					"--extra-browser-args=--disable-setuid-sandbox", | 
| +					"--browser=reference", | 
| +					"--user-agent=" + decodedPageset.UserAgent, | 
| +					"--urls-list=" + decodedPageset.UrlsList, | 
| +					"--archive-data-file=" + archiveDataFile, | 
| +					"--device=desktop", | 
| +				} | 
| +				env := []string{ | 
| +					fmt.Sprintf("PYTHONPATH=%s:$PYTHONPATH", pathToPagesets), | 
| +					"DISPLAY=:0", | 
| +				} | 
| +				// Retry record_wpr binary 3 times if there are any errors. | 
| +				retryAttempts := 3 | 
| +				for i := 0; ; i++ { | 
| +					err = util.ExecuteCmd(recordWprBinary, args, env, time.Duration(timeoutSecs)*time.Second, nil, nil) | 
| +					if err == nil { | 
| +						break | 
| +					} | 
| +					if i >= (retryAttempts - 1) { | 
| +						glog.Errorf("%s failed inspite of 3 retries. Last error: %s", pagesetPath, err) | 
| +						break | 
| +					} | 
| +					time.Sleep(time.Second) | 
| +					glog.Warningf("Retrying due to error: %s", err) | 
| +				} | 
| +				mutex.RUnlock() | 
| +			} | 
| +		}() | 
| +	} | 
| + | 
| +	if !*worker_common.Local { | 
| +		// Start the cleaner. | 
| +		go util.ChromeProcessesCleaner(&mutex, *chromeCleanerTimer) | 
| } | 
|  | 
| -	// Write timestamp to the webpage archives dir. | 
| -	skutil.LogErr(util.CreateTimestampFile(pathToArchives)) | 
| +	// Wait for all spawned goroutines to complete. | 
| +	wg.Wait() | 
|  | 
| -	// Upload webpage archives dir to Google Storage. | 
| -	if err := gs.UploadWorkerArtifacts(util.WEB_ARCHIVES_DIR_NAME, *pagesetType, *workerNum); err != nil { | 
| +	// Upload all webpage archives to Google Storage. | 
| +	if err := gs.UploadSwarmingArtifacts(util.WEB_ARCHIVES_DIR_NAME, *pagesetType); err != nil { | 
| glog.Error(err) | 
| return | 
| } | 
| } | 
| + | 
| +// addIndexInDataFileLocation returns originalDataFile with index inserted as | 
| +// an extra directory level directly above the file name, for example | 
| +// ("a/b/c.json", "5") yields "a/b/5/c.json". The result is cleaned by Join. | 
| +func addIndexInDataFileLocation(originalDataFile string, index string) string { | 
| +	fileName := filepath.Base(originalDataFile) | 
| +	fileDir := filepath.Dir(originalDataFile) | 
| +	// Join with filepath (not path) so the separator matches the one that | 
| +	// filepath.Dir and filepath.Base used to split the location. | 
| +	return filepath.Join(fileDir, index, fileName) | 
| +} | 
|  |