Chromium Code Reviews

Unified diff: ct/go/worker_scripts/capture_archives/main.go

Issue 1988133002: Use swarming in capture_archives CT task and capture archives in parallel (Closed)
Base URL: https://skia.googlesource.com/buildbot@ct-4-create_pagesets
Patch Set created 4 years, 7 months ago
--- a/ct/go/worker_scripts/capture_archives/main.go
+++ b/ct/go/worker_scripts/capture_archives/main.go
  // Application that captures webpage archives on a CT worker and uploads it to
  // Google Storage.
  package main

  import (
  	"flag"
  	"fmt"
  	"io/ioutil"
+ 	"path"
  	"path/filepath"
+ 	"strconv"
+ 	"sync"
  	"time"

  	"github.com/skia-dev/glog"

  	"go.skia.org/infra/ct/go/util"
  	"go.skia.org/infra/ct/go/worker_scripts/worker_common"
  	"go.skia.org/infra/go/common"
  	skutil "go.skia.org/infra/go/util"
  )

+ const (
+ 	// The number of goroutines that will run in parallel to capture archives.
+ 	WORKER_POOL_SIZE = 1
+ )
+
  var (
- 	workerNum   = flag.Int("worker_num", 1, "The number of this CT worker. It will be in the {1..100} range.")
- 	pagesetType = flag.String("pageset_type", util.PAGESET_TYPE_MOBILE_10k, "The type of pagesets to create from the Alexa CSV list. Eg: 10k, Mobile10k, All.")
+ 	startRange         = flag.Int("start_range", 1, "The number this worker will capture webpage archives from.")
+ 	num                = flag.Int("num", 100, "The total number of archives to capture starting from the start_range.")
+ 	pagesetType        = flag.String("pageset_type", util.PAGESET_TYPE_MOBILE_10k, "The pagesets to use to capture archives. Eg: 10k, Mobile10k, All.")
+ 	chromeCleanerTimer = flag.Duration("cleaner_timer", 30*time.Minute, "How often all chrome processes will be killed on this slave.")
  )

  func main() {
  	defer common.LogPanic()
  	worker_common.Init()
  	defer util.TimeTrack(time.Now(), "Capturing Archives")
  	defer glog.Flush()

- 	// Create the task file so that the master knows this worker is still busy.
- 	skutil.LogErr(util.CreateTaskFile(util.ACTIVITY_CAPTURING_ARCHIVES))
- 	defer util.DeleteTaskFile(util.ACTIVITY_CAPTURING_ARCHIVES)
-
  	// Reset the local chromium checkout.
  	if err := util.ResetCheckout(util.ChromiumSrcDir); err != nil {
  		glog.Errorf("Could not reset %s: %s", util.ChromiumSrcDir, err)
  		return
  	}
  	// Sync the local chromium checkout.
  	if err := util.SyncDir(util.ChromiumSrcDir); err != nil {
  		glog.Errorf("Could not gclient sync %s: %s", util.ChromiumSrcDir, err)
  		return
  	}

  	// Delete and remake the local webpage archives directory.
  	pathToArchives := filepath.Join(util.WebArchivesDir, *pagesetType)
  	skutil.RemoveAll(pathToArchives)
  	skutil.MkdirAll(pathToArchives, 0700)
+ 	defer skutil.RemoveAll(pathToArchives)

  	// Instantiate GsUtil object.
  	gs, err := util.NewGsUtil(nil)
  	if err != nil {
  		glog.Error(err)
  		return
  	}

- 	// Download pagesets if they do not exist locally.
- 	if err := gs.DownloadWorkerArtifacts(util.PAGESETS_DIR_NAME, *pagesetType, *workerNum); err != nil {
+ 	// Download pagesets.
+ 	pathToPagesets := filepath.Join(util.PagesetsDir, *pagesetType)
+ 	pagesetsToIndex, err := gs.DownloadSwarmingArtifacts(pathToPagesets, util.PAGESETS_DIR_NAME, *pagesetType, *startRange, *num)
+ 	if err != nil {
  		glog.Error(err)
  		return
  	}
+ 	defer skutil.RemoveAll(pathToPagesets)

- 	pathToPagesets := filepath.Join(util.PagesetsDir, *pagesetType)
  	recordWprBinary := filepath.Join(util.TelemetryBinariesDir, util.BINARY_RECORD_WPR)
  	timeoutSecs := util.PagesetTypeToInfo[*pagesetType].CaptureArchivesTimeoutSecs
  	// Loop through all pagesets.
  	fileInfos, err := ioutil.ReadDir(pathToPagesets)
  	if err != nil {
  		glog.Errorf("Unable to read the pagesets dir %s: %s", pathToPagesets, err)
  		return
  	}
- 	glog.Infof("The %s fileInfos are: %s", len(fileInfos), fileInfos)
- 	for _, fileInfo := range fileInfos {
- 		pagesetBaseName := filepath.Base(fileInfo.Name())
- 		if pagesetBaseName == util.TIMESTAMP_FILE_NAME || filepath.Ext(pagesetBaseName) == ".pyc" {
- 			// Ignore timestamp files and .pyc files.
- 			continue
- 		}
-
- 		// Read the pageset.
- 		pagesetPath := filepath.Join(pathToPagesets, fileInfo.Name())
- 		decodedPageset, err := util.ReadPageset(pagesetPath)
- 		if err != nil {
- 			glog.Errorf("Could not read %s: %s", pagesetPath, err)
- 			return
- 		}
-
- 		glog.Infof("===== Processing %s =====", pagesetPath)
- 		args := []string{
- 			util.CAPTURE_ARCHIVES_DEFAULT_CT_BENCHMARK,
- 			"--extra-browser-args=--disable-setuid-sandbox",
- 			"--browser=reference",
- 			"--user-agent=" + decodedPageset.UserAgent,
- 			"--urls-list=" + decodedPageset.UrlsList,
- 			"--archive-data-file=" + decodedPageset.ArchiveDataFile,
- 			"--device=desktop",
- 		}
- 		env := []string{
- 			fmt.Sprintf("PYTHONPATH=%s:$PYTHONPATH", pathToPagesets),
- 			"DISPLAY=:0",
- 		}
- 		skutil.LogErr(util.ExecuteCmd(recordWprBinary, args, env, time.Duration(timeoutSecs)*time.Second, nil, nil))
+
+ 	// Create channel that contains all pageset file names. This channel will
+ 	// be consumed by the worker pool.
+ 	pagesetRequests := util.GetClosedChannelOfPagesets(fileInfos)
+
+ 	var wg sync.WaitGroup
+ 	// Use an RWMutex for the chromeProcessesCleaner goroutine to communicate to
+ 	// the workers (acting as "readers") when it wants to be the "writer" and
+ 	// kill all zombie chrome processes.
+ 	var mutex sync.RWMutex
+
+ 	// Loop through workers in the worker pool.
+ 	for i := 0; i < WORKER_POOL_SIZE; i++ {
+ 		// Increment the WaitGroup counter.
+ 		wg.Add(1)
+
+ 		// Create and run a goroutine closure that captures archives.
+ 		go func() {
+ 			// Decrement the WaitGroup counter when the goroutine completes.
+ 			defer wg.Done()
+
+ 			for pagesetBaseName := range pagesetRequests {
+ 				if pagesetBaseName == util.TIMESTAMP_FILE_NAME || filepath.Ext(pagesetBaseName) == ".pyc" {
+ 					// Ignore timestamp files and .pyc files.
+ 					continue
+ 				}
+
+ 				mutex.RLock()
+
+ 				// Read the pageset.
+ 				pagesetPath := filepath.Join(pathToPagesets, pagesetBaseName)
+ 				decodedPageset, err := util.ReadPageset(pagesetPath)
+ 				if err != nil {
+ 					glog.Errorf("Could not read %s: %s", pagesetPath, err)
+ 					return
+ 				}
+
+ 				glog.Infof("===== Processing %s =====", pagesetPath)
+ 				index := strconv.Itoa(pagesetsToIndex[path.Join(pathToPagesets, pagesetBaseName)])
+ 				archiveDataFile := addIndexInDataFileLocation(decodedPageset.ArchiveDataFile, index)
+ 				args := []string{
+ 					util.CAPTURE_ARCHIVES_DEFAULT_CT_BENCHMARK,
+ 					"--extra-browser-args=--disable-setuid-sandbox",
+ 					"--browser=reference",
+ 					"--user-agent=" + decodedPageset.UserAgent,
+ 					"--urls-list=" + decodedPageset.UrlsList,
+ 					"--archive-data-file=" + archiveDataFile,
+ 					"--device=desktop",
+ 				}
+ 				env := []string{
+ 					fmt.Sprintf("PYTHONPATH=%s:$PYTHONPATH", pathToPagesets),
+ 					"DISPLAY=:0",
+ 				}
+ 				// Retry record_wpr binary 3 times if there are any errors.
+ 				retryAttempts := 3
+ 				for i := 0; ; i++ {
+ 					err = util.ExecuteCmd(recordWprBinary, args, env, time.Duration(timeoutSecs)*time.Second, nil, nil)
+ 					if err == nil {
+ 						break
+ 					}
+ 					if i >= (retryAttempts - 1) {
+ 						glog.Errorf("%s failed in spite of 3 retries. Last error: %s", pagesetPath, err)
+ 						break
+ 					}
+ 					time.Sleep(time.Second)
+ 					glog.Warningf("Retrying due to error: %s", err)
+ 				}
+ 				mutex.RUnlock()
+ 			}
+ 		}()
  	}

- 	// Write timestamp to the webpage archives dir.
- 	skutil.LogErr(util.CreateTimestampFile(pathToArchives))
+ 	if !*worker_common.Local {
+ 		// Start the cleaner.
+ 		go util.ChromeProcessesCleaner(&mutex, *chromeCleanerTimer)
+ 	}

- 	// Upload webpage archives dir to Google Storage.
- 	if err := gs.UploadWorkerArtifacts(util.WEB_ARCHIVES_DIR_NAME, *pagesetType, *workerNum); err != nil {
+ 	// Wait for all spawned goroutines to complete.
+ 	wg.Wait()
+
+ 	// Upload all webpage archives to Google Storage.
+ 	if err := gs.UploadSwarmingArtifacts(util.WEB_ARCHIVES_DIR_NAME, *pagesetType); err != nil {
  		glog.Error(err)
  		return
  	}
  }
+
+ func addIndexInDataFileLocation(originalDataFile string, index string) string {
+ 	fileName := filepath.Base(originalDataFile)
+ 	fileDir := filepath.Dir(originalDataFile)
+ 	return path.Join(fileDir, index, fileName)
+ }
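Note: addIndexInDataFileLocation splices the pageset's index into the archive path so that each parallel capture writes its .wpr file into a distinct subdirectory. For example (the path itself is illustrative), addIndexInDataFileLocation("/b/storage/webpage_archives/10k/alexa1-1.wpr", "57") returns "/b/storage/webpage_archives/10k/57/alexa1-1.wpr". It rebuilds the path with path.Join rather than filepath.Join, which assumes POSIX-style separators.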
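util.GetClosedChannelOfPagesets is not part of this diff. A minimal sketch of the pattern it presumably implements, written under that assumption: buffer one entry per pageset file name, then close the channel so each worker's range loop exits cleanly once the names are drained.

package main

import "os"

// getClosedChannelOfPagesets is a sketch of what the util helper
// presumably does; it is not the real implementation.
func getClosedChannelOfPagesets(fileInfos []os.FileInfo) chan string {
	// Buffer the channel so every send completes before any worker reads.
	pagesetRequests := make(chan string, len(fileInfos))
	for _, fileInfo := range fileInfos {
		pagesetRequests <- fileInfo.Name()
	}
	// A closed channel still yields its buffered values; ranging over it
	// terminates after the last name has been received.
	close(pagesetRequests)
	return pagesetRequests
}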
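The RWMutex handshake works because each worker holds the read lock for the full duration of one capture, while the cleaner must acquire the write lock before killing chrome. util.ChromeProcessesCleaner is also not shown in this diff; a sketch of that writer side, assuming a hypothetical killZombieChromeProcesses helper:

package main

import (
	"sync"
	"time"
)

// chromeProcessesCleaner sketches the writer side of the handshake; the
// real implementation is util.ChromeProcessesCleaner, which this diff
// does not include. Lock() blocks until no worker holds a read lock,
// so chrome processes are only ever killed between captures.
func chromeProcessesCleaner(mutex *sync.RWMutex, interval time.Duration) {
	for range time.Tick(interval) {
		mutex.Lock()
		killZombieChromeProcesses() // hypothetical helper
		mutex.Unlock()
	}
}

// killZombieChromeProcesses is a placeholder for whatever cleanup the
// real cleaner performs (e.g. killing stray chrome processes).
func killZombieChromeProcesses() {}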

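With worker_num gone, sharding is expressed entirely through flags, so the swarming trigger can hand each bot its own slice of the pagesets. A hypothetical invocation for the second shard of 100 Mobile10k pagesets (the flag names are the ones defined in the diff; the binary name and values are illustrative):

capture_archives --start_range=101 --num=100 --pageset_type=Mobile10k --cleaner_timer=30m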