OLD | NEW |
---|---|
1 // run_lua_on_workers is an application that runs the specified lua script on al l | 1 // run_lua_on_workers is an application that runs the specified lua script on al l |
2 // CT workers and uploads the results to Google Storage. The requester is emaile d | 2 // CT workers and uploads the results to Google Storage. The requester is emaile d |
3 // when the task is done. | 3 // when the task is done. |
4 package main | 4 package main |
5 | 5 |
6 import ( | 6 import ( |
7 "bytes" | |
8 "database/sql" | 7 "database/sql" |
9 "flag" | 8 "flag" |
10 "fmt" | 9 "fmt" |
11 "io" | 10 "io" |
12 "io/ioutil" | 11 "io/ioutil" |
13 "os" | 12 "os" |
14 "path/filepath" | 13 "path/filepath" |
15 » "strings" | 14 » "strconv" |
16 » "text/template" | |
17 "time" | 15 "time" |
18 | 16 |
19 "github.com/skia-dev/glog" | 17 "github.com/skia-dev/glog" |
20 "go.skia.org/infra/ct/go/ctfe/lua_scripts" | 18 "go.skia.org/infra/ct/go/ctfe/lua_scripts" |
21 "go.skia.org/infra/ct/go/frontend" | 19 "go.skia.org/infra/ct/go/frontend" |
22 "go.skia.org/infra/ct/go/master_scripts/master_common" | 20 "go.skia.org/infra/ct/go/master_scripts/master_common" |
23 "go.skia.org/infra/ct/go/util" | 21 "go.skia.org/infra/ct/go/util" |
24 "go.skia.org/infra/go/common" | 22 "go.skia.org/infra/go/common" |
25 skutil "go.skia.org/infra/go/util" | 23 skutil "go.skia.org/infra/go/util" |
26 ) | 24 ) |
27 | 25 |
26 const ( | |
27 MAX_PAGES_PER_SWARMING_BOT = 100 | |
28 ) | |
29 | |
28 var ( | 30 var ( |
29 emails = flag.String("emails", "", "The comma separated email add resses to notify when the task is picked up and completes.") | 31 emails = flag.String("emails", "", "The comma separated email add resses to notify when the task is picked up and completes.") |
30 description = flag.String("description", "", "The description of the r un as entered by the requester.") | 32 description = flag.String("description", "", "The description of the r un as entered by the requester.") |
31 gaeTaskID = flag.Int64("gae_task_id", -1, "The key of the App Engine task. This task will be updated when the task is completed.") | 33 gaeTaskID = flag.Int64("gae_task_id", -1, "The key of the App Engine task. This task will be updated when the task is completed.") |
32 pagesetType = flag.String("pageset_type", "", "The type of pagesets to use. Eg: 10k, Mobile10k, All.") | 34 pagesetType = flag.String("pageset_type", "", "The type of pagesets to use. Eg: 10k, Mobile10k, All.") |
33 chromiumBuild = flag.String("chromium_build", "", "The chromium build to use for this capture_archives run.") | 35 chromiumBuild = flag.String("chromium_build", "", "The chromium build to use for this capture_archives run.") |
34 runID = flag.String("run_id", "", "The unique run id (typically requester + timestamp).") | 36 runID = flag.String("run_id", "", "The unique run id (typically requester + timestamp).") |
35 | 37 |
36 taskCompletedSuccessfully = false | 38 taskCompletedSuccessfully = false |
37 luaOutputRemoteLink = "" | 39 luaOutputRemoteLink = "" |
(...skipping 54 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
92 emailsArr = append(emailsArr, util.CtAdmins...) | 94 emailsArr = append(emailsArr, util.CtAdmins...) |
93 if len(emailsArr) == 0 { | 95 if len(emailsArr) == 0 { |
94 glog.Error("At least one email address must be specified") | 96 glog.Error("At least one email address must be specified") |
95 return | 97 return |
96 } | 98 } |
97 skutil.LogErr(frontend.UpdateWebappTaskSetStarted(&lua_scripts.UpdateVar s{}, *gaeTaskID)) | 99 skutil.LogErr(frontend.UpdateWebappTaskSetStarted(&lua_scripts.UpdateVar s{}, *gaeTaskID)) |
98 skutil.LogErr(util.SendTaskStartEmail(emailsArr, "Lua script", *runID, * description)) | 100 skutil.LogErr(util.SendTaskStartEmail(emailsArr, "Lua script", *runID, * description)) |
99 // Ensure webapp is updated and email is sent even if task fails. | 101 // Ensure webapp is updated and email is sent even if task fails. |
100 defer updateWebappTask() | 102 defer updateWebappTask() |
101 defer sendEmail(emailsArr) | 103 defer sendEmail(emailsArr) |
102 // Cleanup tmp files after the run. | |
103 if !*master_common.Local { | |
104 defer util.CleanTmpDir() | |
105 } | |
106 // Finish with glog flush and how long the task took. | 104 // Finish with glog flush and how long the task took. |
107 defer util.TimeTrack(time.Now(), "Running Lua script on workers") | 105 defer util.TimeTrack(time.Now(), "Running Lua script on workers") |
108 defer glog.Flush() | 106 defer glog.Flush() |
109 | 107 |
110 if *pagesetType == "" { | 108 if *pagesetType == "" { |
111 glog.Error("Must specify --pageset_type") | 109 glog.Error("Must specify --pageset_type") |
112 return | 110 return |
113 } | 111 } |
114 if *chromiumBuild == "" { | 112 if *chromiumBuild == "" { |
115 glog.Error("Must specify --chromium_build") | 113 glog.Error("Must specify --chromium_build") |
(...skipping 13 matching lines...) Expand all Loading... | |
129 | 127 |
130 // Upload the lua script for this run to Google storage. | 128 // Upload the lua script for this run to Google storage. |
131 luaScriptName := *runID + ".lua" | 129 luaScriptName := *runID + ".lua" |
132 defer skutil.Remove(filepath.Join(os.TempDir(), luaScriptName)) | 130 defer skutil.Remove(filepath.Join(os.TempDir(), luaScriptName)) |
133 luaScriptRemoteDir := filepath.Join(util.LuaRunsDir, *runID, "scripts") | 131 luaScriptRemoteDir := filepath.Join(util.LuaRunsDir, *runID, "scripts") |
134 if err := gs.UploadFile(luaScriptName, os.TempDir(), luaScriptRemoteDir) ; err != nil { | 132 if err := gs.UploadFile(luaScriptName, os.TempDir(), luaScriptRemoteDir) ; err != nil { |
135 glog.Errorf("Could not upload %s to %s: %s", luaScriptName, luaS criptRemoteDir, err) | 133 glog.Errorf("Could not upload %s to %s: %s", luaScriptName, luaS criptRemoteDir, err) |
136 return | 134 return |
137 } | 135 } |
138 | 136 |
139 » // Run the run_lua script on all workers. | 137 » // Empty the remote dir before the workers upload to it. |
140 » runLuaCmdTemplate := "DISPLAY=:0 run_lua --worker_num={{.WorkerNum}} --l og_dir={{.LogDir}} --log_id={{.RunID}} --pageset_type={{.PagesetType}} --chromiu m_build={{.ChromiumBuild}} --run_id={{.RunID}} --local={{.Local}};" | 138 » gsBaseDir := filepath.Join(util.SWARMING_DIR_NAME, filepath.Join(util.Lu aRunsDir, *runID), *pagesetType) |
dogben
2016/05/19 17:02:41
I would hope that runID is unique and therefore th
rmistry
2016/05/19 17:08:19
The other CLs IIRC did not use a runID here.
But y
| |
141 » runLuaTemplateParsed := template.Must(template.New("run_lua_cmd").Parse( runLuaCmdTemplate)) | 139 » skutil.LogErr(gs.DeleteRemoteDir(gsBaseDir)) |
142 » luaCmdBytes := new(bytes.Buffer) | 140 |
143 » if err := runLuaTemplateParsed.Execute(luaCmdBytes, struct { | 141 » // Archive, trigger and collect swarming tasks. |
144 » » WorkerNum string | 142 » isolateExtraArgs := map[string]string{ |
145 » » LogDir string | 143 » » "CHROMIUM_BUILD": *chromiumBuild, |
146 » » PagesetType string | 144 » » "RUN_ID": *runID, |
147 » » ChromiumBuild string | |
148 » » RunID string | |
149 » » Local bool | |
150 » }{ | |
151 » » WorkerNum: util.WORKER_NUM_KEYWORD, | |
152 » » LogDir: util.GLogDir, | |
153 » » PagesetType: *pagesetType, | |
154 » » ChromiumBuild: *chromiumBuild, | |
155 » » RunID: *runID, | |
156 » » Local: *master_common.Local, | |
157 » }); err != nil { | |
158 » » glog.Errorf("Failed to execute template: %s", err) | |
159 » » return | |
160 } | 145 } |
161 » cmd := append(master_common.WorkerSetupCmds(), | 146 » if err := util.TriggerSwarmingTask(*pagesetType, "run_lua", util.RUN_LUA _ISOLATE, 2*time.Hour, 1*time.Hour, MAX_PAGES_PER_SWARMING_BOT, isolateExtraArgs ); err != nil { |
162 » » // The main command that runs run_lua on all workers. | 147 » » glog.Errorf("Error encountered when swarming tasks: %s", err) |
163 » » luaCmdBytes.String()) | |
164 » _, err = util.SSH(strings.Join(cmd, " "), util.Slaves, util.RUN_LUA_TIME OUT) | |
165 » if err != nil { | |
166 » » glog.Errorf("Error while running cmd %s: %s", cmd, err) | |
167 return | 148 return |
168 } | 149 } |
169 | 150 |
170 // Copy outputs from all slaves locally and combine it into one file. | 151 // Copy outputs from all slaves locally and combine it into one file. |
171 consolidatedFileName := "lua-output" | 152 consolidatedFileName := "lua-output" |
172 consolidatedLuaOutput := filepath.Join(os.TempDir(), consolidatedFileNam e) | 153 consolidatedLuaOutput := filepath.Join(os.TempDir(), consolidatedFileNam e) |
154 // If the file already exists it could be that there is another lua task running on this machine. | |
dogben
2016/05/19 17:02:41
Please just use ioutil.TempDir instead of os.TempD
rmistry
2016/05/19 17:08:19
Unfortunately the file has to be in /tmp/lua-outpu
dogben
2016/05/19 17:15:38
Oh, that is unfortunate.
| |
155 // Wait for the file to be deleted within a deadline. | |
156 if err := waitForOutputFile(consolidatedLuaOutput); err != nil { | |
157 glog.Error(err) | |
158 return | |
159 } | |
173 defer skutil.Remove(consolidatedLuaOutput) | 160 defer skutil.Remove(consolidatedLuaOutput) |
174 if err := ioutil.WriteFile(consolidatedLuaOutput, []byte{}, 0660); err ! = nil { | 161 if err := ioutil.WriteFile(consolidatedLuaOutput, []byte{}, 0660); err ! = nil { |
175 glog.Errorf("Could not create %s: %s", consolidatedLuaOutput, er r) | 162 glog.Errorf("Could not create %s: %s", consolidatedLuaOutput, er r) |
176 return | 163 return |
177 } | 164 } |
178 » for i := 0; i < util.NumWorkers(); i++ { | 165 » for i := 1; i <= util.PagesetTypeToInfo[*pagesetType].NumPages/MAX_PAGES _PER_SWARMING_BOT; i++ { |
179 » » workerNum := i + 1 | 166 » » startRange := strconv.Itoa(util.GetStartRange(i, MAX_PAGES_PER_S WARMING_BOT)) |
180 » » workerRemoteOutputPath := filepath.Join(util.LuaRunsDir, *runID, fmt.Sprintf("slave%d", workerNum), "outputs", *runID+".output") | 167 » » workerRemoteOutputPath := filepath.Join(util.LuaRunsDir, *runID, startRange, "outputs", *runID+".output") |
181 respBody, err := gs.GetRemoteFileContents(workerRemoteOutputPath ) | 168 respBody, err := gs.GetRemoteFileContents(workerRemoteOutputPath ) |
182 if err != nil { | 169 if err != nil { |
183 glog.Errorf("Could not fetch %s: %s", workerRemoteOutput Path, err) | 170 glog.Errorf("Could not fetch %s: %s", workerRemoteOutput Path, err) |
184 continue | 171 continue |
185 } | 172 } |
186 defer skutil.Close(respBody) | 173 defer skutil.Close(respBody) |
187 out, err := os.OpenFile(consolidatedLuaOutput, os.O_RDWR|os.O_AP PEND, 0660) | 174 out, err := os.OpenFile(consolidatedLuaOutput, os.O_RDWR|os.O_AP PEND, 0660) |
188 if err != nil { | 175 if err != nil { |
189 glog.Errorf("Unable to open file %s: %s", consolidatedLu aOutput, err) | 176 glog.Errorf("Unable to open file %s: %s", consolidatedLu aOutput, err) |
190 return | 177 return |
(...skipping 43 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
234 if err := gs.UploadFile(luaAggregatorOutputFileName, os.TempDir( ), consolidatedOutputRemoteDir); err != nil { | 221 if err := gs.UploadFile(luaAggregatorOutputFileName, os.TempDir( ), consolidatedOutputRemoteDir); err != nil { |
235 glog.Errorf("Unable to upload %s to %s: %s", luaAggregat orOutputFileName, consolidatedOutputRemoteDir, err) | 222 glog.Errorf("Unable to upload %s to %s: %s", luaAggregat orOutputFileName, consolidatedOutputRemoteDir, err) |
236 return | 223 return |
237 } | 224 } |
238 } else { | 225 } else { |
239 glog.Info("A lua aggregator has not been specified.") | 226 glog.Info("A lua aggregator has not been specified.") |
240 } | 227 } |
241 | 228 |
242 taskCompletedSuccessfully = true | 229 taskCompletedSuccessfully = true |
243 } | 230 } |
231 | |
232 func waitForOutputFile(luaOutput string) error { | |
233 // Check every 10 secs and timeout after 10 mins. | |
234 ticker := time.NewTicker(10 * time.Second) | |
235 deadline := 10 * time.Minute | |
236 deadlineTicker := time.NewTicker(deadline) | |
237 defer ticker.Stop() | |
238 defer deadlineTicker.Stop() | |
239 for { | |
240 select { | |
241 case <-ticker.C: | |
242 if _, err := os.Stat(luaOutput); os.IsNotExist(err) { | |
243 return nil | |
244 } | |
245 glog.Infof("%s still exists. Waiting for the other lua t ask to complete.", luaOutput) | |
246 case <-deadlineTicker.C: | |
247 return fmt.Errorf("%s still existed after %v secs", luaO utput, deadline.Seconds()) | |
248 } | |
249 } | |
250 } | |
OLD | NEW |