Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(328)

Side by Side Diff: ct/go/master_scripts/run_lua_on_workers/main.go

Issue 1991943002: Use swarming in run_lua CT task (Closed) Base URL: https://skia.googlesource.com/buildbot@ct-6-capture_skps
Patch Set: Use new method in util.go Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « no previous file | ct/go/util/constants.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // run_lua_on_workers is an application that runs the specified lua script on al l 1 // run_lua_on_workers is an application that runs the specified lua script on al l
2 // CT workers and uploads the results to Google Storage. The requester is emaile d 2 // CT workers and uploads the results to Google Storage. The requester is emaile d
3 // when the task is done. 3 // when the task is done.
4 package main 4 package main
5 5
6 import ( 6 import (
7 "bytes"
8 "database/sql" 7 "database/sql"
9 "flag" 8 "flag"
10 "fmt" 9 "fmt"
11 "io" 10 "io"
12 "io/ioutil" 11 "io/ioutil"
13 "os" 12 "os"
14 "path/filepath" 13 "path/filepath"
15 » "strings" 14 » "strconv"
16 » "text/template"
17 "time" 15 "time"
18 16
19 "github.com/skia-dev/glog" 17 "github.com/skia-dev/glog"
20 "go.skia.org/infra/ct/go/ctfe/lua_scripts" 18 "go.skia.org/infra/ct/go/ctfe/lua_scripts"
21 "go.skia.org/infra/ct/go/frontend" 19 "go.skia.org/infra/ct/go/frontend"
22 "go.skia.org/infra/ct/go/master_scripts/master_common" 20 "go.skia.org/infra/ct/go/master_scripts/master_common"
23 "go.skia.org/infra/ct/go/util" 21 "go.skia.org/infra/ct/go/util"
24 "go.skia.org/infra/go/common" 22 "go.skia.org/infra/go/common"
25 skutil "go.skia.org/infra/go/util" 23 skutil "go.skia.org/infra/go/util"
26 ) 24 )
27 25
26 const (
27 MAX_PAGES_PER_SWARMING_BOT = 100
28 )
29
28 var ( 30 var (
29 emails = flag.String("emails", "", "The comma separated email add resses to notify when the task is picked up and completes.") 31 emails = flag.String("emails", "", "The comma separated email add resses to notify when the task is picked up and completes.")
30 description = flag.String("description", "", "The description of the r un as entered by the requester.") 32 description = flag.String("description", "", "The description of the r un as entered by the requester.")
31 gaeTaskID = flag.Int64("gae_task_id", -1, "The key of the App Engine task. This task will be updated when the task is completed.") 33 gaeTaskID = flag.Int64("gae_task_id", -1, "The key of the App Engine task. This task will be updated when the task is completed.")
32 pagesetType = flag.String("pageset_type", "", "The type of pagesets to use. Eg: 10k, Mobile10k, All.") 34 pagesetType = flag.String("pageset_type", "", "The type of pagesets to use. Eg: 10k, Mobile10k, All.")
33 chromiumBuild = flag.String("chromium_build", "", "The chromium build to use for this capture_archives run.") 35 chromiumBuild = flag.String("chromium_build", "", "The chromium build to use for this capture_archives run.")
34 runID = flag.String("run_id", "", "The unique run id (typically requester + timestamp).") 36 runID = flag.String("run_id", "", "The unique run id (typically requester + timestamp).")
35 37
36 taskCompletedSuccessfully = false 38 taskCompletedSuccessfully = false
37 luaOutputRemoteLink = "" 39 luaOutputRemoteLink = ""
(...skipping 54 matching lines...) Expand 10 before | Expand all | Expand 10 after
92 emailsArr = append(emailsArr, util.CtAdmins...) 94 emailsArr = append(emailsArr, util.CtAdmins...)
93 if len(emailsArr) == 0 { 95 if len(emailsArr) == 0 {
94 glog.Error("At least one email address must be specified") 96 glog.Error("At least one email address must be specified")
95 return 97 return
96 } 98 }
97 skutil.LogErr(frontend.UpdateWebappTaskSetStarted(&lua_scripts.UpdateVar s{}, *gaeTaskID)) 99 skutil.LogErr(frontend.UpdateWebappTaskSetStarted(&lua_scripts.UpdateVar s{}, *gaeTaskID))
98 skutil.LogErr(util.SendTaskStartEmail(emailsArr, "Lua script", *runID, * description)) 100 skutil.LogErr(util.SendTaskStartEmail(emailsArr, "Lua script", *runID, * description))
99 // Ensure webapp is updated and email is sent even if task fails. 101 // Ensure webapp is updated and email is sent even if task fails.
100 defer updateWebappTask() 102 defer updateWebappTask()
101 defer sendEmail(emailsArr) 103 defer sendEmail(emailsArr)
102 // Cleanup tmp files after the run.
103 if !*master_common.Local {
104 defer util.CleanTmpDir()
105 }
106 // Finish with glog flush and how long the task took. 104 // Finish with glog flush and how long the task took.
107 defer util.TimeTrack(time.Now(), "Running Lua script on workers") 105 defer util.TimeTrack(time.Now(), "Running Lua script on workers")
108 defer glog.Flush() 106 defer glog.Flush()
109 107
110 if *pagesetType == "" { 108 if *pagesetType == "" {
111 glog.Error("Must specify --pageset_type") 109 glog.Error("Must specify --pageset_type")
112 return 110 return
113 } 111 }
114 if *chromiumBuild == "" { 112 if *chromiumBuild == "" {
115 glog.Error("Must specify --chromium_build") 113 glog.Error("Must specify --chromium_build")
(...skipping 13 matching lines...) Expand all
129 127
130 // Upload the lua script for this run to Google storage. 128 // Upload the lua script for this run to Google storage.
131 luaScriptName := *runID + ".lua" 129 luaScriptName := *runID + ".lua"
132 defer skutil.Remove(filepath.Join(os.TempDir(), luaScriptName)) 130 defer skutil.Remove(filepath.Join(os.TempDir(), luaScriptName))
133 luaScriptRemoteDir := filepath.Join(util.LuaRunsDir, *runID, "scripts") 131 luaScriptRemoteDir := filepath.Join(util.LuaRunsDir, *runID, "scripts")
134 if err := gs.UploadFile(luaScriptName, os.TempDir(), luaScriptRemoteDir) ; err != nil { 132 if err := gs.UploadFile(luaScriptName, os.TempDir(), luaScriptRemoteDir) ; err != nil {
135 glog.Errorf("Could not upload %s to %s: %s", luaScriptName, luaS criptRemoteDir, err) 133 glog.Errorf("Could not upload %s to %s: %s", luaScriptName, luaS criptRemoteDir, err)
136 return 134 return
137 } 135 }
138 136
139 » // Run the run_lua script on all workers. 137 » // Empty the remote dir before the workers upload to it.
140 » runLuaCmdTemplate := "DISPLAY=:0 run_lua --worker_num={{.WorkerNum}} --l og_dir={{.LogDir}} --log_id={{.RunID}} --pageset_type={{.PagesetType}} --chromiu m_build={{.ChromiumBuild}} --run_id={{.RunID}} --local={{.Local}};" 138 » gsBaseDir := filepath.Join(util.SWARMING_DIR_NAME, filepath.Join(util.Lu aRunsDir, *runID), *pagesetType)
dogben 2016/05/19 17:02:41 I would hope that runID is unique and therefore th
rmistry 2016/05/19 17:08:19 The other CLs IIRC did not use a runID here. But y
141 » runLuaTemplateParsed := template.Must(template.New("run_lua_cmd").Parse( runLuaCmdTemplate)) 139 » skutil.LogErr(gs.DeleteRemoteDir(gsBaseDir))
142 » luaCmdBytes := new(bytes.Buffer) 140
143 » if err := runLuaTemplateParsed.Execute(luaCmdBytes, struct { 141 » // Archive, trigger and collect swarming tasks.
144 » » WorkerNum string 142 » isolateExtraArgs := map[string]string{
145 » » LogDir string 143 » » "CHROMIUM_BUILD": *chromiumBuild,
146 » » PagesetType string 144 » » "RUN_ID": *runID,
147 » » ChromiumBuild string
148 » » RunID string
149 » » Local bool
150 » }{
151 » » WorkerNum: util.WORKER_NUM_KEYWORD,
152 » » LogDir: util.GLogDir,
153 » » PagesetType: *pagesetType,
154 » » ChromiumBuild: *chromiumBuild,
155 » » RunID: *runID,
156 » » Local: *master_common.Local,
157 » }); err != nil {
158 » » glog.Errorf("Failed to execute template: %s", err)
159 » » return
160 } 145 }
161 » cmd := append(master_common.WorkerSetupCmds(), 146 » if err := util.TriggerSwarmingTask(*pagesetType, "run_lua", util.RUN_LUA _ISOLATE, 2*time.Hour, 1*time.Hour, MAX_PAGES_PER_SWARMING_BOT, isolateExtraArgs ); err != nil {
162 » » // The main command that runs run_lua on all workers. 147 » » glog.Errorf("Error encountered when swarming tasks: %s", err)
163 » » luaCmdBytes.String())
164 » _, err = util.SSH(strings.Join(cmd, " "), util.Slaves, util.RUN_LUA_TIME OUT)
165 » if err != nil {
166 » » glog.Errorf("Error while running cmd %s: %s", cmd, err)
167 return 148 return
168 } 149 }
169 150
170 // Copy outputs from all slaves locally and combine it into one file. 151 // Copy outputs from all slaves locally and combine it into one file.
171 consolidatedFileName := "lua-output" 152 consolidatedFileName := "lua-output"
172 consolidatedLuaOutput := filepath.Join(os.TempDir(), consolidatedFileNam e) 153 consolidatedLuaOutput := filepath.Join(os.TempDir(), consolidatedFileNam e)
154 // If the file already exists it could be that there is another lua task running on this machine.
dogben 2016/05/19 17:02:41 Please just use ioutil.TempDir instead of os.TempD
rmistry 2016/05/19 17:08:19 Unfortunately the file has to be in /tmp/lua-outpu
dogben 2016/05/19 17:15:38 Oh, that is unfortunate.
155 // Wait for the file to be deleted within a deadline.
156 if err := waitForOutputFile(consolidatedLuaOutput); err != nil {
157 glog.Error(err)
158 return
159 }
173 defer skutil.Remove(consolidatedLuaOutput) 160 defer skutil.Remove(consolidatedLuaOutput)
174 if err := ioutil.WriteFile(consolidatedLuaOutput, []byte{}, 0660); err ! = nil { 161 if err := ioutil.WriteFile(consolidatedLuaOutput, []byte{}, 0660); err ! = nil {
175 glog.Errorf("Could not create %s: %s", consolidatedLuaOutput, er r) 162 glog.Errorf("Could not create %s: %s", consolidatedLuaOutput, er r)
176 return 163 return
177 } 164 }
178 » for i := 0; i < util.NumWorkers(); i++ { 165 » for i := 1; i <= util.PagesetTypeToInfo[*pagesetType].NumPages/MAX_PAGES _PER_SWARMING_BOT; i++ {
179 » » workerNum := i + 1 166 » » startRange := strconv.Itoa(util.GetStartRange(i, MAX_PAGES_PER_S WARMING_BOT))
180 » » workerRemoteOutputPath := filepath.Join(util.LuaRunsDir, *runID, fmt.Sprintf("slave%d", workerNum), "outputs", *runID+".output") 167 » » workerRemoteOutputPath := filepath.Join(util.LuaRunsDir, *runID, startRange, "outputs", *runID+".output")
181 respBody, err := gs.GetRemoteFileContents(workerRemoteOutputPath ) 168 respBody, err := gs.GetRemoteFileContents(workerRemoteOutputPath )
182 if err != nil { 169 if err != nil {
183 glog.Errorf("Could not fetch %s: %s", workerRemoteOutput Path, err) 170 glog.Errorf("Could not fetch %s: %s", workerRemoteOutput Path, err)
184 continue 171 continue
185 } 172 }
186 defer skutil.Close(respBody) 173 defer skutil.Close(respBody)
187 out, err := os.OpenFile(consolidatedLuaOutput, os.O_RDWR|os.O_AP PEND, 0660) 174 out, err := os.OpenFile(consolidatedLuaOutput, os.O_RDWR|os.O_AP PEND, 0660)
188 if err != nil { 175 if err != nil {
189 glog.Errorf("Unable to open file %s: %s", consolidatedLu aOutput, err) 176 glog.Errorf("Unable to open file %s: %s", consolidatedLu aOutput, err)
190 return 177 return
(...skipping 43 matching lines...) Expand 10 before | Expand all | Expand 10 after
234 if err := gs.UploadFile(luaAggregatorOutputFileName, os.TempDir( ), consolidatedOutputRemoteDir); err != nil { 221 if err := gs.UploadFile(luaAggregatorOutputFileName, os.TempDir( ), consolidatedOutputRemoteDir); err != nil {
235 glog.Errorf("Unable to upload %s to %s: %s", luaAggregat orOutputFileName, consolidatedOutputRemoteDir, err) 222 glog.Errorf("Unable to upload %s to %s: %s", luaAggregat orOutputFileName, consolidatedOutputRemoteDir, err)
236 return 223 return
237 } 224 }
238 } else { 225 } else {
239 glog.Info("A lua aggregator has not been specified.") 226 glog.Info("A lua aggregator has not been specified.")
240 } 227 }
241 228
242 taskCompletedSuccessfully = true 229 taskCompletedSuccessfully = true
243 } 230 }
231
232 func waitForOutputFile(luaOutput string) error {
233 // Check every 10 secs and timeout after 10 mins.
234 ticker := time.NewTicker(10 * time.Second)
235 deadline := 10 * time.Minute
236 deadlineTicker := time.NewTicker(deadline)
237 defer ticker.Stop()
238 defer deadlineTicker.Stop()
239 for {
240 select {
241 case <-ticker.C:
242 if _, err := os.Stat(luaOutput); os.IsNotExist(err) {
243 return nil
244 }
245 glog.Infof("%s still exists. Waiting for the other lua t ask to complete.", luaOutput)
246 case <-deadlineTicker.C:
247 return fmt.Errorf("%s still existed after %v secs", luaO utput, deadline.Seconds())
248 }
249 }
250 }
OLDNEW
« no previous file with comments | « no previous file | ct/go/util/constants.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698