Chromium Code Reviews

| OLD | NEW |
|---|---|
| 1 // Application that captures webpage archives on a CT worker and uploads it to | 1 // Application that captures webpage archives on a CT worker and uploads it to |
| 2 // Google Storage. | 2 // Google Storage. |
| 3 package main | 3 package main |
| 4 | 4 |
| 5 import ( | 5 import ( |
| 6 "flag" | 6 "flag" |
| 7 "fmt" | 7 "fmt" |
| 8 "io/ioutil" | 8 "io/ioutil" |
| 9 "path" | |
| 9 "path/filepath" | 10 "path/filepath" |
| 11 "strconv" | |
| 12 "sync" | |
| 10 "time" | 13 "time" |
| 11 | 14 |
| 12 "github.com/skia-dev/glog" | 15 "github.com/skia-dev/glog" |
| 13 | 16 |
| 14 "go.skia.org/infra/ct/go/util" | 17 "go.skia.org/infra/ct/go/util" |
| 15 "go.skia.org/infra/ct/go/worker_scripts/worker_common" | 18 "go.skia.org/infra/ct/go/worker_scripts/worker_common" |
| 16 "go.skia.org/infra/go/common" | 19 "go.skia.org/infra/go/common" |
| 17 skutil "go.skia.org/infra/go/util" | 20 skutil "go.skia.org/infra/go/util" |
| 18 ) | 21 ) |
| 19 | 22 |
| | 23 const ( |
| | 24 // The number of goroutines that will run in parallel to capture archives. |
| | 25 WORKER_POOL_SIZE = 1 |
rmistry
2016/05/18 16:14:01
Will continue to play with this number to find the
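The constant under discussion here is the CL's only concurrency knob: each worker drains a shared, already-closed channel of pagesets, so raising WORKER_POOL_SIZE adds parallel captures with no other changes. A minimal, self-contained sketch of that pattern (hypothetical names, not the actual CT helpers):

```go
package main

import (
	"fmt"
	"sync"
)

// Stand-in for WORKER_POOL_SIZE: bounds how many captures run at once.
const workerPoolSize = 2

func main() {
	// Buffered channel filled with every work item and then closed, much
	// like what util.GetClosedChannelOfPagesets is used for below: ranging
	// over it drains the remaining items and then ends each worker's loop.
	pagesets := make(chan string, 3)
	for _, name := range []string{"alexa1.py", "alexa2.py", "alexa3.py"} {
		pagesets <- name
	}
	close(pagesets)

	var wg sync.WaitGroup
	for i := 0; i < workerPoolSize; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for name := range pagesets {
				fmt.Println("capturing archive for", name) // real work goes here
			}
		}()
	}
	wg.Wait() // every pageset has been processed once the channel is drained
}
```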
| | 26 ) |
| | 27 |
| 20 var ( | 28 var ( |
| 21 » workerNum = flag.Int("worker_num", 1, "The number of this CT worker. It will be in the {1..100} range.") | 29 » startRange = flag.Int("start_range", 1, "The number this worker will capture webpage archives from.") |
| 22 » pagesetType = flag.String("pageset_type", util.PAGESET_TYPE_MOBILE_10k, "The type of pagesets to create from the Alexa CSV list. Eg: 10k, Mobile10k, All.") | 30 » num = flag.Int("num", 100, "The total number of archives to capture starting from the start_range.") |
| | 31 » pagesetType = flag.String("pageset_type", util.PAGESET_TYPE_MOBILE_10k, "The pagesets to use to capture archives. Eg: 10k, Mobile10k, All.") |
| | 32 » chromeCleanerTimer = flag.Duration("cleaner_timer", 30*time.Minute, "How often all chrome processes will be killed on this slave.") |
| 23 ) | 33 ) |
| 24 | 34 |
| 25 func main() { | 35 func main() { |
| 26 defer common.LogPanic() | 36 defer common.LogPanic() |
| 27 worker_common.Init() | 37 worker_common.Init() |
| 28 defer util.TimeTrack(time.Now(), "Capturing Archives") | 38 defer util.TimeTrack(time.Now(), "Capturing Archives") |
| 29 defer glog.Flush() | 39 defer glog.Flush() |
| 30 | 40 |
| 31 // Create the task file so that the master knows this worker is still busy. | |
| 32 skutil.LogErr(util.CreateTaskFile(util.ACTIVITY_CAPTURING_ARCHIVES)) | |
| 33 defer util.DeleteTaskFile(util.ACTIVITY_CAPTURING_ARCHIVES) | |
| 34 | |
| 35 // Reset the local chromium checkout. | 41 // Reset the local chromium checkout. |
| 36 if err := util.ResetCheckout(util.ChromiumSrcDir); err != nil { | 42 if err := util.ResetCheckout(util.ChromiumSrcDir); err != nil { |
| 37 glog.Errorf("Could not reset %s: %s", util.ChromiumSrcDir, err) | 43 glog.Errorf("Could not reset %s: %s", util.ChromiumSrcDir, err) |
| 38 return | 44 return |
| 39 } | 45 } |
| 40 // Sync the local chromium checkout. | 46 // Sync the local chromium checkout. |
| 41 if err := util.SyncDir(util.ChromiumSrcDir); err != nil { | 47 if err := util.SyncDir(util.ChromiumSrcDir); err != nil { |
| 42 glog.Errorf("Could not gclient sync %s: %s", util.ChromiumSrcDir, err) | 48 glog.Errorf("Could not gclient sync %s: %s", util.ChromiumSrcDir, err) |
| 43 return | 49 return |
| 44 } | 50 } |
| 45 | 51 |
| 46 // Delete and remake the local webpage archives directory. | 52 // Delete and remake the local webpage archives directory. |
| 47 pathToArchives := filepath.Join(util.WebArchivesDir, *pagesetType) | 53 pathToArchives := filepath.Join(util.WebArchivesDir, *pagesetType) |
| 48 skutil.RemoveAll(pathToArchives) | 54 skutil.RemoveAll(pathToArchives) |
| 49 skutil.MkdirAll(pathToArchives, 0700) | 55 skutil.MkdirAll(pathToArchives, 0700) |
| | 56 defer skutil.RemoveAll(pathToArchives) |
| 50 | 57 |
| 51 // Instantiate GsUtil object. | 58 // Instantiate GsUtil object. |
| 52 gs, err := util.NewGsUtil(nil) | 59 gs, err := util.NewGsUtil(nil) |
| 53 if err != nil { | 60 if err != nil { |
| 54 glog.Error(err) | 61 glog.Error(err) |
| 55 return | 62 return |
| 56 } | 63 } |
| 57 | 64 |
| 58 » // Download pagesets if they do not exist locally. | 65 » // Download pagesets. |
| 59 » if err := gs.DownloadWorkerArtifacts(util.PAGESETS_DIR_NAME, *pagesetType, *workerNum); err != nil { | 66 » pathToPagesets := filepath.Join(util.PagesetsDir, *pagesetType) |
| | 67 » pagesetsToIndex, err := gs.DownloadSwarmingArtifacts(pathToPagesets, util.PAGESETS_DIR_NAME, *pagesetType, *startRange, *num) |
| | 68 » if err != nil { |
| 60 glog.Error(err) | 69 glog.Error(err) |
| 61 return | 70 return |
| 62 } | 71 } |
| | 72 defer skutil.RemoveAll(pathToPagesets) |
| 63 | 73 |
| 64 pathToPagesets := filepath.Join(util.PagesetsDir, *pagesetType) | |
| 65 recordWprBinary := filepath.Join(util.TelemetryBinariesDir, util.BINARY_RECORD_WPR) | 74 recordWprBinary := filepath.Join(util.TelemetryBinariesDir, util.BINARY_RECORD_WPR) |
| 66 timeoutSecs := util.PagesetTypeToInfo[*pagesetType].CaptureArchivesTimeoutSecs | 75 timeoutSecs := util.PagesetTypeToInfo[*pagesetType].CaptureArchivesTimeoutSecs |
| 67 // Loop through all pagesets. | 76 // Loop through all pagesets. |
| 68 fileInfos, err := ioutil.ReadDir(pathToPagesets) | 77 fileInfos, err := ioutil.ReadDir(pathToPagesets) |
| 69 if err != nil { | 78 if err != nil { |
| 70 glog.Errorf("Unable to read the pagesets dir %s: %s", pathToPage sets, err) | 79 glog.Errorf("Unable to read the pagesets dir %s: %s", pathToPage sets, err) |
| 71 return | 80 return |
| 72 } | 81 } |
| 73 glog.Infof("The %s fileInfos are: %s", len(fileInfos), fileInfos) | |
| 74 for _, fileInfo := range fileInfos { | |
| 75 pagesetBaseName := filepath.Base(fileInfo.Name()) | |
| 76 if pagesetBaseName == util.TIMESTAMP_FILE_NAME || filepath.Ext(pagesetBaseName) == ".pyc" { | |
| 77 // Ignore timestamp files and .pyc files. | |
| 78 continue | |
| 79 } | |
| 80 | 82 |
| 81 » » // Read the pageset. | 83 » // Create channel that contains all pageset file names. This channel will |
| 82 » » pagesetPath := filepath.Join(pathToPagesets, fileInfo.Name()) | 84 » // be consumed by the worker pool. |
| 83 » » decodedPageset, err := util.ReadPageset(pagesetPath) | 85 » pagesetRequests := util.GetClosedChannelOfPagesets(fileInfos) |
| 84 » » if err != nil { | |
| 85 » » » glog.Errorf("Could not read %s: %s", pagesetPath, err) | |
| 86 » » » return | |
| 87 » » } | |
| 88 | 86 |
| 89 » » glog.Infof("===== Processing %s =====", pagesetPath) | 87 » var wg sync.WaitGroup |
| 90 » » args := []string{ | 88 » // Use a RWMutex for the chromeProcessesCleaner goroutine to communicate to |
| 91 » » » util.CAPTURE_ARCHIVES_DEFAULT_CT_BENCHMARK, | 89 » // the workers (acting as "readers") when it wants to be the "writer" and |
| 92 » » » "--extra-browser-args=--disable-setuid-sandbox", | 90 » // kill all zombie chrome processes. |
| 93 » » » "--browser=reference", | 91 » var mutex sync.RWMutex |
| 94 » » » "--user-agent=" + decodedPageset.UserAgent, | 92 |
| 95 » » » "--urls-list=" + decodedPageset.UrlsList, | 93 » // Loop through workers in the worker pool. |
| 96 » » » "--archive-data-file=" + decodedPageset.ArchiveDataFile, | 94 » for i := 0; i < WORKER_POOL_SIZE; i++ { |
| 97 » » » "--device=desktop", | 95 » » // Increment the WaitGroup counter. |
| 98 » » } | 96 » » wg.Add(1) |
| 99 » » env := []string{ | 97 |
| 100 » » » fmt.Sprintf("PYTHONPATH=%s:$PYTHONPATH", pathToPagesets) , | 98 » » // Create and run a goroutine closure that captures SKPs. |
| 101 » » » "DISPLAY=:0", | 99 » » go func() { |
| 102 » » } | 100 » » » // Decrement the WaitGroup counter when the goroutine completes. |
| 103 » » skutil.LogErr(util.ExecuteCmd(recordWprBinary, args, env, time.Duration(timeoutSecs)*time.Second, nil, nil)) | 101 » » » defer wg.Done() |
| | 102 |
| | 103 » » » for pagesetBaseName := range pagesetRequests { |
| | 104 » » » » if pagesetBaseName == util.TIMESTAMP_FILE_NAME || filepath.Ext(pagesetBaseName) == ".pyc" { |
| | 105 » » » » » // Ignore timestamp files and .pyc files. |
| | 106 » » » » » continue |
| | 107 » » » » } |
| | 108 |
| | 109 » » » » mutex.RLock() |
| | 110 |
| | 111 » » » » // Read the pageset. |
| | 112 » » » » pagesetPath := filepath.Join(pathToPagesets, pagesetBaseName) |
| | 113 » » » » decodedPageset, err := util.ReadPageset(pagesetPath) |
| | 114 » » » » if err != nil { |
| | 115 » » » » » glog.Errorf("Could not read %s: %s", pagesetPath, err) |
| | 116 » » » » » return |
| | 117 » » » » } |
| | 118 |
| | 119 » » » » glog.Infof("===== Processing %s =====", pagesetPath) |
| | 120 » » » » index := strconv.Itoa(pagesetsToIndex[path.Join(pathToPagesets, pagesetBaseName)]) |
| | 121 » » » » archiveDataFile := addIndexInDataFileLocation(decodedPageset.ArchiveDataFile, index) |
| | 122 » » » » args := []string{ |
| | 123 » » » » » util.CAPTURE_ARCHIVES_DEFAULT_CT_BENCHMARK, |
| | 124 » » » » » "--extra-browser-args=--disable-setuid-sandbox", |
| | 125 » » » » » "--browser=reference", |
| | 126 » » » » » "--user-agent=" + decodedPageset.UserAgent, |
| | 127 » » » » » "--urls-list=" + decodedPageset.UrlsList, |
| | 128 » » » » » "--archive-data-file=" + archiveDataFile, |
| | 129 » » » » » "--device=desktop", |
| | 130 » » » » } |
| | 131 » » » » env := []string{ |
| | 132 » » » » » fmt.Sprintf("PYTHONPATH=%s:$PYTHONPATH", pathToPagesets), |
| | 133 » » » » » "DISPLAY=:0", |
| | 134 » » » » } |
| | 135 » » » » skutil.LogErr(util.ExecuteCmd(recordWprBinary, args, env, time.Duration(timeoutSecs)*time.Second, nil, nil)) |
| | 136 » » » » mutex.RUnlock() |
| | 137 » » » } |
| | 138 » » }() |
| 104 } | 139 } |
| 105 | 140 |
| 106 » // Write timestamp to the webpage archives dir. | 141 » if !*worker_common.Local { |
| 107 » skutil.LogErr(util.CreateTimestampFile(pathToArchives)) | 142 » » // Start the cleaner. |
| | 143 » » go util.ChromeProcessesCleaner(&mutex, *chromeCleanerTimer) |
| | 144 » } |
| 108 | 145 |
| 109 » // Upload webpage archives dir to Google Storage. | 146 » // Wait for all spawned goroutines to complete. |
| 110 » if err := gs.UploadWorkerArtifacts(util.WEB_ARCHIVES_DIR_NAME, *pagesetType, *workerNum); err != nil { | 147 » wg.Wait() |
| | 148 |
| | 149 » // Upload all webpage archives to Google Storage. |
| | 150 » if err := gs.UploadSwarmingArtifacts(util.WEB_ARCHIVES_DIR_NAME, *pagesetType); err != nil { |
| 111 glog.Error(err) | 151 glog.Error(err) |
| 112 return | 152 return |
| 113 } | 153 } |
| 114 } | 154 } |
| | 155 |
| | 156 func addIndexInDataFileLocation(originalDataFile string, index string) string { |
| | 157 fileName := filepath.Base(originalDataFile) |
| | 158 fileDir := filepath.Dir(originalDataFile) |
| | 159 return path.Join(fileDir, index, fileName) |
| | 160 } |
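The RWMutex handshake described in the new comments (new lines 88-91) can be illustrated in isolation: each worker holds a read lock while it processes one pageset, and the cleaner periodically takes the write lock, which waits for all in-flight captures to finish and blocks new ones while it kills stray processes. This sketch assumes only that documented behavior; util.ChromeProcessesCleaner's body is not part of this CL, so the cleaner below is a stand-in:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// Stand-in cleaner: taking the write lock excludes every read-lock holder,
// i.e. every worker, for the duration of the cleanup.
func cleaner(m *sync.RWMutex, every time.Duration) {
	for range time.Tick(every) {
		m.Lock()
		fmt.Println("killing zombie chrome processes")
		m.Unlock()
	}
}

func main() {
	var mutex sync.RWMutex
	go cleaner(&mutex, 50*time.Millisecond)

	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			mutex.RLock()         // readers overlap freely with each other
			defer mutex.RUnlock() // defer releases the lock even on early returns
			fmt.Printf("worker %d capturing\n", id)
			time.Sleep(20 * time.Millisecond)
		}(i)
	}
	wg.Wait()
}
```

One nit the sketch sidesteps by deferring RUnlock: in the CL, the goroutine returns on a ReadPageset error while still holding the read lock, which would leave the cleaner blocked on its write lock indefinitely.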