OLD | NEW |
1 // Application that captures webpage archives on a CT worker and uploads them to | 1 // Application that captures webpage archives on a CT worker and uploads them to |
2 // Google Storage. | 2 // Google Storage. |
3 package main | 3 package main |
4 | 4 |
5 import ( | 5 import ( |
6 "flag" | 6 "flag" |
7 "fmt" | 7 "fmt" |
8 "io/ioutil" | 8 "io/ioutil" |
| 9 "path" |
9 "path/filepath" | 10 "path/filepath" |
| 11 "strconv" |
| 12 "sync" |
10 "time" | 13 "time" |
11 | 14 |
12 "github.com/skia-dev/glog" | 15 "github.com/skia-dev/glog" |
13 | 16 |
14 "go.skia.org/infra/ct/go/util" | 17 "go.skia.org/infra/ct/go/util" |
15 "go.skia.org/infra/ct/go/worker_scripts/worker_common" | 18 "go.skia.org/infra/ct/go/worker_scripts/worker_common" |
16 "go.skia.org/infra/go/common" | 19 "go.skia.org/infra/go/common" |
17 skutil "go.skia.org/infra/go/util" | 20 skutil "go.skia.org/infra/go/util" |
18 ) | 21 ) |
19 | 22 |
| 23 const ( |
| 24 	// The number of goroutines that will run in parallel to capture archives. |
| 25 WORKER_POOL_SIZE = 1 |
| 26 ) |
| 27 |
20 var ( | 28 var ( |
21 » workerNum   = flag.Int("worker_num", 1, "The number of this CT worker. It will be in the {1..100} range.") | 29 » startRange         = flag.Int("start_range", 1, "The number this worker will capture webpage archives from.") |
22 » pagesetType = flag.String("pageset_type", util.PAGESET_TYPE_MOBILE_10k, "The type of pagesets to create from the Alexa CSV list. Eg: 10k, Mobile10k, All.") | 30 » num                = flag.Int("num", 100, "The total number of archives to capture starting from the start_range.") |
| 31 » pagesetType        = flag.String("pageset_type", util.PAGESET_TYPE_MOBILE_10k, "The pagesets to use to capture archives. Eg: 10k, Mobile10k, All.") |
| 32 » chromeCleanerTimer = flag.Duration("cleaner_timer", 30*time.Minute, "How often all chrome processes will be killed on this slave.") |
23 ) | 33 ) |
24 | 34 |
25 func main() { | 35 func main() { |
26 defer common.LogPanic() | 36 defer common.LogPanic() |
27 worker_common.Init() | 37 worker_common.Init() |
28 defer util.TimeTrack(time.Now(), "Capturing Archives") | 38 defer util.TimeTrack(time.Now(), "Capturing Archives") |
29 defer glog.Flush() | 39 defer glog.Flush() |
30 | 40 |
31 // Create the task file so that the master knows this worker is still busy. | |
32 skutil.LogErr(util.CreateTaskFile(util.ACTIVITY_CAPTURING_ARCHIVES)) | |
33 defer util.DeleteTaskFile(util.ACTIVITY_CAPTURING_ARCHIVES) | |
34 | |
35 // Reset the local chromium checkout. | 41 // Reset the local chromium checkout. |
36 if err := util.ResetCheckout(util.ChromiumSrcDir); err != nil { | 42 if err := util.ResetCheckout(util.ChromiumSrcDir); err != nil { |
37 glog.Errorf("Could not reset %s: %s", util.ChromiumSrcDir, err) | 43 glog.Errorf("Could not reset %s: %s", util.ChromiumSrcDir, err) |
38 return | 44 return |
39 } | 45 } |
40 // Sync the local chromium checkout. | 46 // Sync the local chromium checkout. |
41 if err := util.SyncDir(util.ChromiumSrcDir); err != nil { | 47 if err := util.SyncDir(util.ChromiumSrcDir); err != nil { |
42 glog.Errorf("Could not gclient sync %s: %s", util.ChromiumSrcDir, err) | 48 glog.Errorf("Could not gclient sync %s: %s", util.ChromiumSrcDir, err) |
43 return | 49 return |
44 } | 50 } |
45 | 51 |
46 // Delete and remake the local webpage archives directory. | 52 // Delete and remake the local webpage archives directory. |
47 pathToArchives := filepath.Join(util.WebArchivesDir, *pagesetType) | 53 pathToArchives := filepath.Join(util.WebArchivesDir, *pagesetType) |
48 skutil.RemoveAll(pathToArchives) | 54 skutil.RemoveAll(pathToArchives) |
49 skutil.MkdirAll(pathToArchives, 0700) | 55 skutil.MkdirAll(pathToArchives, 0700) |
| 56 defer skutil.RemoveAll(pathToArchives) |
50 | 57 |
51 // Instantiate GsUtil object. | 58 // Instantiate GsUtil object. |
52 gs, err := util.NewGsUtil(nil) | 59 gs, err := util.NewGsUtil(nil) |
53 if err != nil { | 60 if err != nil { |
54 glog.Error(err) | 61 glog.Error(err) |
55 return | 62 return |
56 } | 63 } |
57 | 64 |
58 » // Download pagesets if they do not exist locally. | 65 » // Download pagesets. |
59 » if err := gs.DownloadWorkerArtifacts(util.PAGESETS_DIR_NAME, *pagesetType, *workerNum); err != nil { | 66 » pathToPagesets := filepath.Join(util.PagesetsDir, *pagesetType) |
| 67 » pagesetsToIndex, err := gs.DownloadSwarmingArtifacts(pathToPagesets, util.PAGESETS_DIR_NAME, *pagesetType, *startRange, *num) |
| 68 » if err != nil { |
60 glog.Error(err) | 69 glog.Error(err) |
61 return | 70 return |
62 } | 71 } |
| 72 defer skutil.RemoveAll(pathToPagesets) |
63 | 73 |
64 pathToPagesets := filepath.Join(util.PagesetsDir, *pagesetType) | |
65 recordWprBinary := filepath.Join(util.TelemetryBinariesDir, util.BINARY_RECORD_WPR) | 74 recordWprBinary := filepath.Join(util.TelemetryBinariesDir, util.BINARY_RECORD_WPR) |
66 timeoutSecs := util.PagesetTypeToInfo[*pagesetType].CaptureArchivesTimeoutSecs | 75 timeoutSecs := util.PagesetTypeToInfo[*pagesetType].CaptureArchivesTimeoutSecs |
67 // Loop through all pagesets. | 76 // Loop through all pagesets. |
68 fileInfos, err := ioutil.ReadDir(pathToPagesets) | 77 fileInfos, err := ioutil.ReadDir(pathToPagesets) |
69 if err != nil { | 78 if err != nil { |
70 glog.Errorf("Unable to read the pagesets dir %s: %s", pathToPagesets, err) | 79 glog.Errorf("Unable to read the pagesets dir %s: %s", pathToPagesets, err) |
71 return | 80 return |
72 } | 81 } |
73 glog.Infof("The %s fileInfos are: %s", len(fileInfos), fileInfos) | |
74 for _, fileInfo := range fileInfos { | |
75 pagesetBaseName := filepath.Base(fileInfo.Name()) | |
76 if pagesetBaseName == util.TIMESTAMP_FILE_NAME || filepath.Ext(pagesetBaseName) == ".pyc" { | |
77 // Ignore timestamp files and .pyc files. | |
78 continue | |
79 } | |
80 | 82 |
81 » » // Read the pageset. | 83 » // Create channel that contains all pageset file names. This channel will |
82 » » pagesetPath := filepath.Join(pathToPagesets, fileInfo.Name()) | 84 » // be consumed by the worker pool. |
83 » » decodedPageset, err := util.ReadPageset(pagesetPath) | 85 » pagesetRequests := util.GetClosedChannelOfPagesets(fileInfos) |
84 » » if err != nil { | |
85 » » » glog.Errorf("Could not read %s: %s", pagesetPath, err) | |
86 » » » return | |
87 » » } | |
88 | 86 |
89 » » glog.Infof("===== Processing %s =====", pagesetPath) | 87 » var wg sync.WaitGroup |
90 » » args := []string{ | 88 » // Use a RWMutex for the chromeProcessesCleaner goroutine to communicate to |
91 » » » util.CAPTURE_ARCHIVES_DEFAULT_CT_BENCHMARK, | 89 » // the workers (acting as "readers") when it wants to be the "writer" and |
92 » » » "--extra-browser-args=--disable-setuid-sandbox", | 90 » // kill all zombie chrome processes. |
93 » » » "--browser=reference", | 91 » var mutex sync.RWMutex |
94 » » » "--user-agent=" + decodedPageset.UserAgent, | 92 |
95 » » » "--urls-list=" + decodedPageset.UrlsList, | 93 » // Loop through workers in the worker pool. |
96 » » » "--archive-data-file=" + decodedPageset.ArchiveDataFile, | 94 » for i := 0; i < WORKER_POOL_SIZE; i++ { |
97 » » » "--device=desktop", | 95 » » // Increment the WaitGroup counter. |
98 » » } | 96 » » wg.Add(1) |
99 » » env := []string{ | 97 |
100 » » » fmt.Sprintf("PYTHONPATH=%s:$PYTHONPATH", pathToPagesets), | 98 » » // Create and run a goroutine closure that captures archives. |
101 » » » "DISPLAY=:0", | 99 » » go func() { |
102 » » } | 100 » » » // Decrement the WaitGroup counter when the goroutine completes. |
103 » » skutil.LogErr(util.ExecuteCmd(recordWprBinary, args, env, time.Duration(timeoutSecs)*time.Second, nil, nil)) | 101 » » » defer wg.Done() |
| 102 |
| 103 » » » for pagesetBaseName := range pagesetRequests { |
| 104 » » » » if pagesetBaseName == util.TIMESTAMP_FILE_NAME || filepath.Ext(pagesetBaseName) == ".pyc" { |
| 105 » » » » » // Ignore timestamp files and .pyc files. |
| 106 » » » » » continue |
| 107 » » » » } |
| 108 |
| 109 » » » » // Read the pageset. |
| 110 » » » » pagesetPath := filepath.Join(pathToPagesets, pagesetBaseName) |
| 111 » » » » decodedPageset, err := util.ReadPageset(pagesetPath) |
| 112 » » » » if err != nil { |
| 113 » » » » » glog.Errorf("Could not read %s: %s", pagesetPath, err) |
| 114 » » » » » return |
| 115 » » » » } |
| 116 |
| 117 » » » » mutex.RLock() |
| 118 |
| 119 » » » » glog.Infof("===== Processing %s =====", pagesetPath) |
| 120 » » » » index := strconv.Itoa(pagesetsToIndex[path.Join(pathToPagesets, pagesetBaseName)]) |
| 121 » » » » archiveDataFile := addIndexInDataFileLocation(decodedPageset.ArchiveDataFile, index) |
| 122 » » » » args := []string{ |
| 123 » » » » » util.CAPTURE_ARCHIVES_DEFAULT_CT_BENCHMARK, |
| 124 » » » » » "--extra-browser-args=--disable-setuid-sandbox", |
| 125 » » » » » "--browser=reference", |
| 126 » » » » » "--user-agent=" + decodedPageset.UserAgent, |
| 127 » » » » » "--urls-list=" + decodedPageset.UrlsList, |
| 128 » » » » » "--archive-data-file=" + archiveDataFile, |
| 129 » » » » » "--device=desktop", |
| 130 » » » » } |
| 131 » » » » env := []string{ |
| 132 » » » » » fmt.Sprintf("PYTHONPATH=%s:$PYTHONPATH", pathToPagesets), |
| 133 » » » » » "DISPLAY=:0", |
| 134 » » » » } |
| 135 » » » » // Retry record_wpr binary 3 times if there are any errors. |
| 136 » » » » retryAttempts := 3 |
| 137 » » » » for i := 0; ; i++ { |
| 138 » » » » » err = util.ExecuteCmd(recordWprBinary, args, env, time.Duration(timeoutSecs)*time.Second, nil, nil) |
| 139 » » » » » if err == nil { |
| 140 » » » » » » break |
| 141 » » » » » } |
| 142 » » » » » if i >= (retryAttempts - 1) { |
| 143 » » » » » » glog.Errorf("%s failed in spite of 3 retries. Last error: %s", pagesetPath, err) |
| 144 » » » » » » break |
| 145 » » » » » } |
| 146 » » » » » time.Sleep(time.Second) |
| 147 » » » » » » glog.Warningf("Retrying due to error: %s", err) |
| 148 » » » » } |
| 149 » » » » mutex.RUnlock() |
| 150 » » » } |
| 151 » » }() |
104 } | 152 } |
105 | 153 |
106 » // Write timestamp to the webpage archives dir. | 154 » if !*worker_common.Local { |
107 » skutil.LogErr(util.CreateTimestampFile(pathToArchives)) | 155 » » // Start the cleaner. |
| 156 » » go util.ChromeProcessesCleaner(&mutex, *chromeCleanerTimer) |
| 157 » } |
108 | 158 |
109 » // Upload webpage archives dir to Google Storage. | 159 » // Wait for all spawned goroutines to complete. |
110 » if err := gs.UploadWorkerArtifacts(util.WEB_ARCHIVES_DIR_NAME, *pagesetType, *workerNum); err != nil { | 160 » wg.Wait() |
| 161 |
| 162 » // Upload all webpage archives to Google Storage. |
| 163 » if err := gs.UploadSwarmingArtifacts(util.WEB_ARCHIVES_DIR_NAME, *pagesetType); err != nil { |
111 glog.Error(err) | 164 glog.Error(err) |
112 return | 165 return |
113 } | 166 } |
114 } | 167 } |
| 168 |
| 169 func addIndexInDataFileLocation(originalDataFile string, index string) string { |
| 170 fileName := filepath.Base(originalDataFile) |
| 171 fileDir := filepath.Dir(originalDataFile) |
| 172 return path.Join(fileDir, index, fileName) |
| 173 } |
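
Notes on the patterns this change introduces. The sketches below are illustrative stand-ins, not code from the CT repository.

The new code fans work out through a channel that is filled with every pageset file name and then closed (NEW 83-85), so each worker's range loop drains the channel and exits on its own. A minimal sketch of that pattern, assuming util.GetClosedChannelOfPagesets behaves like the stand-in getClosedChannelOfNames below:

package main

import (
	"fmt"
	"sync"
)

// getClosedChannelOfNames is a stand-in for util.GetClosedChannelOfPagesets:
// it fills a buffered channel with every name and closes it, so the range
// loops in the workers terminate once the channel is drained.
func getClosedChannelOfNames(names []string) chan string {
	ch := make(chan string, len(names))
	for _, name := range names {
		ch <- name
	}
	close(ch)
	return ch
}

func main() {
	const workerPoolSize = 2 // analogous to WORKER_POOL_SIZE above
	requests := getClosedChannelOfNames([]string{"alexa1-1.py", "alexa2-2.py", "alexa3-3.py"})

	var wg sync.WaitGroup
	for i := 0; i < workerPoolSize; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			// Each worker pulls from the shared channel until it is empty.
			for name := range requests {
				fmt.Printf("worker %d: processing %s\n", id, name)
			}
		}(i)
	}
	wg.Wait()
}

Because the channel is closed before the workers start, no extra "done" signal is needed; channel closure is the termination signal.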
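
The RWMutex comment (NEW 88-91) describes an inverted reader/writer split: every worker holds a read lock while record_wpr runs, and the cleaner takes the write lock so it only kills chrome processes when no capture is in flight. A sketch of that coordination, with a hypothetical cleaner standing in for util.ChromeProcessesCleaner:

package main

import (
	"fmt"
	"sync"
	"time"
)

// cleaner is a hypothetical stand-in for util.ChromeProcessesCleaner. Taking
// the write lock blocks until every in-flight task (read lock) finishes, and
// no new task starts while the write lock is held.
func cleaner(mu *sync.RWMutex, every time.Duration) {
	for range time.Tick(every) {
		mu.Lock()
		fmt.Println("cleaner: no task in flight; safe to kill stray processes")
		mu.Unlock()
	}
}

func main() {
	var mu sync.RWMutex
	go cleaner(&mu, 25*time.Millisecond)

	var wg sync.WaitGroup
	for w := 0; w < 2; w++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for task := 0; task < 3; task++ {
				mu.RLock() // workers are the "readers" and may overlap each other
				fmt.Printf("worker %d: task %d\n", id, task)
				time.Sleep(10 * time.Millisecond)
				mu.RUnlock()
			}
		}(w)
	}
	wg.Wait()
}

This also shows why the worker must not return while holding the read lock: a leaked RLock would block the cleaner's Lock forever, and once a writer is waiting, new RLock calls block as well.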
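
The retry loop around record_wpr (NEW 135-148) is a bounded-retry idiom: up to retryAttempts total attempts, a short sleep between failures, and a warning per failure. Factored into a standalone helper it looks like this (runWithRetries is illustrative, not a CT utility):

package main

import (
	"errors"
	"fmt"
	"time"
)

// runWithRetries mirrors the loop above: run the command, and on failure
// sleep briefly and retry, giving up after retryAttempts total attempts.
func runWithRetries(run func() error, retryAttempts int) error {
	for i := 0; ; i++ {
		err := run()
		if err == nil {
			return nil
		}
		if i >= retryAttempts-1 {
			return fmt.Errorf("failed in spite of %d attempts, last error: %v", retryAttempts, err)
		}
		fmt.Printf("retrying due to error: %v\n", err)
		time.Sleep(time.Second)
	}
}

func main() {
	calls := 0
	err := runWithRetries(func() error {
		calls++
		if calls < 3 {
			return errors.New("transient failure")
		}
		return nil
	}, 3)
	fmt.Printf("done after %d calls, err=%v\n", calls, err)
}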
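
addIndexInDataFileLocation (NEW 169-173) rewrites an archive path so the pageset's index becomes an intermediate directory; presumably this matches the per-index layout that UploadSwarmingArtifacts expects. A usage sketch with a hypothetical path:

package main

import (
	"fmt"
	"path"
	"path/filepath"
)

// Same helper as in the diff: insert the index as a directory between the
// file's directory and its base name.
func addIndexInDataFileLocation(originalDataFile string, index string) string {
	fileName := filepath.Base(originalDataFile)
	fileDir := filepath.Dir(originalDataFile)
	return path.Join(fileDir, index, fileName)
}

func main() {
	// Hypothetical archive path; "7" is the pageset's index.
	fmt.Println(addIndexInDataFileLocation("/tmp/web_archives/alexa1-1.wpr", "7"))
	// Prints: /tmp/web_archives/7/alexa1-1.wpr
}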