Chromium Code Reviews

Unified Diff: ct/go/worker_scripts/capture_archives/main.go

Issue 1988133002: Use swarming in capture_archives CT task and capture archives in parallel (Closed) Base URL: https://skia.googlesource.com/buildbot@ct-4-create_pagesets
Patch Set: Added retries to capture archives (created 4 years, 7 months ago)
Index: ct/go/worker_scripts/capture_archives/main.go
diff --git a/ct/go/worker_scripts/capture_archives/main.go b/ct/go/worker_scripts/capture_archives/main.go
index d634fdf9adae5c1c3eed0939cb34ed67038dd98e..c92976edaa7e38a6c9571fef90f0087459931373 100644
--- a/ct/go/worker_scripts/capture_archives/main.go
+++ b/ct/go/worker_scripts/capture_archives/main.go
@@ -6,7 +6,10 @@ import (
"flag"
"fmt"
"io/ioutil"
+ "path"
"path/filepath"
+ "strconv"
+ "sync"
"time"
"github.com/skia-dev/glog"
@@ -17,9 +20,16 @@ import (
skutil "go.skia.org/infra/go/util"
)
+const (
+ // The number of goroutines that will run in parallel to capture archives.
+ WORKER_POOL_SIZE = 1
+)
+
var (
- workerNum = flag.Int("worker_num", 1, "The number of this CT worker. It will be in the {1..100} range.")
- pagesetType = flag.String("pageset_type", util.PAGESET_TYPE_MOBILE_10k, "The type of pagesets to create from the Alexa CSV list. Eg: 10k, Mobile10k, All.")
+ startRange = flag.Int("start_range", 1, "The pageset number this worker will start capturing webpage archives from.")
+ num = flag.Int("num", 100, "The total number of archives to capture starting from the start_range.")
+ pagesetType = flag.String("pageset_type", util.PAGESET_TYPE_MOBILE_10k, "The pagesets to use to capture archives. Eg: 10k, Mobile10k, All.")
+ chromeCleanerTimer = flag.Duration("cleaner_timer", 30*time.Minute, "How often all chrome processes will be killed on this slave.")
)
func main() {
@@ -28,10 +38,6 @@ func main() {
defer util.TimeTrack(time.Now(), "Capturing Archives")
defer glog.Flush()
- // Create the task file so that the master knows this worker is still busy.
- skutil.LogErr(util.CreateTaskFile(util.ACTIVITY_CAPTURING_ARCHIVES))
- defer util.DeleteTaskFile(util.ACTIVITY_CAPTURING_ARCHIVES)
-
// Reset the local chromium checkout.
if err := util.ResetCheckout(util.ChromiumSrcDir); err != nil {
glog.Errorf("Could not reset %s: %s", util.ChromiumSrcDir, err)
@@ -47,6 +53,7 @@ func main() {
pathToArchives := filepath.Join(util.WebArchivesDir, *pagesetType)
skutil.RemoveAll(pathToArchives)
skutil.MkdirAll(pathToArchives, 0700)
+ defer skutil.RemoveAll(pathToArchives)
// Instantiate GsUtil object.
gs, err := util.NewGsUtil(nil)
@@ -55,13 +62,15 @@ func main() {
return
}
- // Download pagesets if they do not exist locally.
- if err := gs.DownloadWorkerArtifacts(util.PAGESETS_DIR_NAME, *pagesetType, *workerNum); err != nil {
+ // Download pagesets.
+ pathToPagesets := filepath.Join(util.PagesetsDir, *pagesetType)
+ pagesetsToIndex, err := gs.DownloadSwarmingArtifacts(pathToPagesets, util.PAGESETS_DIR_NAME, *pagesetType, *startRange, *num)
+ if err != nil {
glog.Error(err)
return
}
+ defer skutil.RemoveAll(pathToPagesets)
- pathToPagesets := filepath.Join(util.PagesetsDir, *pagesetType)
recordWprBinary := filepath.Join(util.TelemetryBinariesDir, util.BINARY_RECORD_WPR)
timeoutSecs := util.PagesetTypeToInfo[*pagesetType].CaptureArchivesTimeoutSecs
// Loop through all pagesets.
@@ -70,45 +79,95 @@ func main() {
glog.Errorf("Unable to read the pagesets dir %s: %s", pathToPagesets, err)
return
}
- glog.Infof("The %s fileInfos are: %s", len(fileInfos), fileInfos)
- for _, fileInfo := range fileInfos {
- pagesetBaseName := filepath.Base(fileInfo.Name())
- if pagesetBaseName == util.TIMESTAMP_FILE_NAME || filepath.Ext(pagesetBaseName) == ".pyc" {
- // Ignore timestamp files and .pyc files.
- continue
- }
-
- // Read the pageset.
- pagesetPath := filepath.Join(pathToPagesets, fileInfo.Name())
- decodedPageset, err := util.ReadPageset(pagesetPath)
- if err != nil {
- glog.Errorf("Could not read %s: %s", pagesetPath, err)
- return
- }
-
- glog.Infof("===== Processing %s =====", pagesetPath)
- args := []string{
- util.CAPTURE_ARCHIVES_DEFAULT_CT_BENCHMARK,
- "--extra-browser-args=--disable-setuid-sandbox",
- "--browser=reference",
- "--user-agent=" + decodedPageset.UserAgent,
- "--urls-list=" + decodedPageset.UrlsList,
- "--archive-data-file=" + decodedPageset.ArchiveDataFile,
- "--device=desktop",
- }
- env := []string{
- fmt.Sprintf("PYTHONPATH=%s:$PYTHONPATH", pathToPagesets),
- "DISPLAY=:0",
- }
- skutil.LogErr(util.ExecuteCmd(recordWprBinary, args, env, time.Duration(timeoutSecs)*time.Second, nil, nil))
+
+ // Create a channel that contains all pageset file names. This channel
+ // will be consumed by the worker pool.
+ pagesetRequests := util.GetClosedChannelOfPagesets(fileInfos)
+
+ var wg sync.WaitGroup
+ // Use an RWMutex so the ChromeProcessesCleaner goroutine can signal the
+ // workers (acting as "readers") when it wants to be the "writer" and
+ // kill all zombie chrome processes.
+ var mutex sync.RWMutex
+
+ // Loop through workers in the worker pool.
+ for i := 0; i < WORKER_POOL_SIZE; i++ {
+ // Increment the WaitGroup counter.
+ wg.Add(1)
+
+ // Create and run a goroutine closure that captures webpage archives.
+ go func() {
+ // Decrement the WaitGroup counter when the goroutine completes.
+ defer wg.Done()
+
+ for pagesetBaseName := range pagesetRequests {
+ if pagesetBaseName == util.TIMESTAMP_FILE_NAME || filepath.Ext(pagesetBaseName) == ".pyc" {
+ // Ignore timestamp files and .pyc files.
+ continue
+ }
+
+ mutex.RLock()
+
+ // Read the pageset.
+ pagesetPath := filepath.Join(pathToPagesets, pagesetBaseName)
+ decodedPageset, err := util.ReadPageset(pagesetPath)
+ if err != nil {
+ glog.Errorf("Could not read %s: %s", pagesetPath, err)
+ return
+ }
+
+ glog.Infof("===== Processing %s =====", pagesetPath)
+ index := strconv.Itoa(pagesetsToIndex[path.Join(pathToPagesets, pagesetBaseName)])
+ archiveDataFile := addIndexInDataFileLocation(decodedPageset.ArchiveDataFile, index)
+ args := []string{
+ util.CAPTURE_ARCHIVES_DEFAULT_CT_BENCHMARK,
+ "--extra-browser-args=--disable-setuid-sandbox",
+ "--browser=reference",
+ "--user-agent=" + decodedPageset.UserAgent,
+ "--urls-list=" + decodedPageset.UrlsList,
+ "--archive-data-file=" + archiveDataFile,
+ "--device=desktop",
+ }
+ env := []string{
+ fmt.Sprintf("PYTHONPATH=%s:$PYTHONPATH", pathToPagesets),
+ "DISPLAY=:0",
+ }
+ // Run the record_wpr binary up to 3 times, retrying on errors.
+ retryAttempts := 3
+ for i := 0; ; i++ {
+ err = util.ExecuteCmd(recordWprBinary, args, env, time.Duration(timeoutSecs)*time.Second, nil, nil)
+ if err == nil {
+ break
+ }
+ if i >= (retryAttempts - 1) {
+ glog.Errorf("%s failed after 3 attempts. Last error: %s", pagesetPath, err)
+ break
+ }
+ time.Sleep(time.Second)
+ glog.Warning("Encountered an error. Retrying...")
dogben 2016/05/19 14:29:38 nit: include error in log?
rmistry 2016/05/19 14:47:03 Done.
+ }
+ mutex.RUnlock()
+ }
+ }()
+ }
+
+ if !*worker_common.Local {
+ // Start the cleaner.
+ go util.ChromeProcessesCleaner(&mutex, *chromeCleanerTimer)
}
- // Write timestamp to the webpage archives dir.
- skutil.LogErr(util.CreateTimestampFile(pathToArchives))
+ // Wait for all spawned goroutines to complete.
+ wg.Wait()
- // Upload webpage archives dir to Google Storage.
- if err := gs.UploadWorkerArtifacts(util.WEB_ARCHIVES_DIR_NAME, *pagesetType, *workerNum); err != nil {
+ // Upload all webpage archives to Google Storage.
+ if err := gs.UploadSwarmingArtifacts(util.WEB_ARCHIVES_DIR_NAME, *pagesetType); err != nil {
glog.Error(err)
return
}
}
+
+func addIndexInDataFileLocation(originalDataFile string, index string) string {
+ fileName := filepath.Base(originalDataFile)
+ fileDir := filepath.Dir(originalDataFile)
+ return path.Join(fileDir, index, fileName)
+}
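For reference, a small standalone example of the new addIndexInDataFileLocation helper. The archive path and index below are hypothetical, but the helper body is copied verbatim from this patch:

package main

import (
	"fmt"
	"path"
	"path/filepath"
)

// addIndexInDataFileLocation inserts the pageset index as a directory
// between the archive file's directory and its base name.
func addIndexInDataFileLocation(originalDataFile string, index string) string {
	fileName := filepath.Base(originalDataFile)
	fileDir := filepath.Dir(originalDataFile)
	return path.Join(fileDir, index, fileName)
}

func main() {
	// Hypothetical path; real paths come from the decoded pageset's
	// ArchiveDataFile field.
	fmt.Println(addIndexInDataFileLocation("/b/storage/webpage_archives/10k/alexa1.json", "57"))
	// Prints: /b/storage/webpage_archives/10k/57/alexa1.json
}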
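The worker-pool shape in this CL depends on util.GetClosedChannelOfPagesets, whose implementation is not part of this file. Below is a minimal sketch of the pattern, assuming the helper simply buffers every file name into a channel and then closes it; the lowercase function name, the pool size of 2, and the use of the current directory are illustrative stand-ins, not the real util code:

package main

import (
	"fmt"
	"io/ioutil"
	"os"
	"sync"
)

// getClosedChannelOfPagesets buffers all file names into a channel and
// closes it, so each worker's range loop exits once every pageset has
// been handed out. Sketch only; the real helper lives in ct/go/util.
func getClosedChannelOfPagesets(fileInfos []os.FileInfo) chan string {
	pagesetRequests := make(chan string, len(fileInfos))
	for _, fileInfo := range fileInfos {
		pagesetRequests <- fileInfo.Name()
	}
	close(pagesetRequests)
	return pagesetRequests
}

func main() {
	fileInfos, err := ioutil.ReadDir(".") // stand-in for the pagesets dir
	if err != nil {
		panic(err)
	}
	pagesetRequests := getClosedChannelOfPagesets(fileInfos)

	// Same fan-out shape as the CL: a fixed pool of workers draining the
	// closed channel, joined with a WaitGroup.
	var wg sync.WaitGroup
	for i := 0; i < 2; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for name := range pagesetRequests {
				fmt.Println("processing", name)
			}
		}()
	}
	wg.Wait()
}

Because the channel is closed up front, wg.Wait() returns as soon as the workers drain it; the CL additionally layers an RWMutex on top so ChromeProcessesCleaner can pause all workers before killing stray chrome processes.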