Chromium Code Reviews| Index: ct/go/master_scripts/run_lua_on_workers/main.go |
| diff --git a/ct/go/master_scripts/run_lua_on_workers/main.go b/ct/go/master_scripts/run_lua_on_workers/main.go |
| index c8276876005de61c8f520c7032b51f1a40b7b50d..79522b3b33ee7224ec748aed997df45a94754bca 100644 |
| --- a/ct/go/master_scripts/run_lua_on_workers/main.go |
| +++ b/ct/go/master_scripts/run_lua_on_workers/main.go |
| @@ -4,7 +4,6 @@ |
| package main |
| import ( |
| - "bytes" |
| "database/sql" |
| "flag" |
| "fmt" |
| @@ -12,8 +11,7 @@ import ( |
| "io/ioutil" |
| "os" |
| "path/filepath" |
| - "strings" |
| - "text/template" |
| + "strconv" |
| "time" |
| "github.com/skia-dev/glog" |
| @@ -25,6 +23,10 @@ import ( |
| skutil "go.skia.org/infra/go/util" |
| ) |
| +const ( |
| + // MAX_PAGES_PER_SWARMING_BOT is the page count passed to |
| + // util.TriggerSwarmingTask and used by main to derive how many output |
| + // ranges to collect (NumPages / MAX_PAGES_PER_SWARMING_BOT) and where |
| + // each range starts. |
| + MAX_PAGES_PER_SWARMING_BOT = 100 |
| +) |
| + |
| var ( |
| emails = flag.String("emails", "", "The comma separated email addresses to notify when the task is picked up and completes.") |
| description = flag.String("description", "", "The description of the run as entered by the requester.") |
| @@ -99,10 +101,6 @@ func main() { |
| // Ensure webapp is updated and email is sent even if task fails. |
| defer updateWebappTask() |
| defer sendEmail(emailsArr) |
| - // Cleanup tmp files after the run. |
| - if !*master_common.Local { |
| - defer util.CleanTmpDir() |
| - } |
| // Finish with glog flush and how long the task took. |
| defer util.TimeTrack(time.Now(), "Running Lua script on workers") |
| defer glog.Flush() |
| @@ -136,48 +134,37 @@ func main() { |
| return |
| } |
| - // Run the run_lua script on all workers. |
| - runLuaCmdTemplate := "DISPLAY=:0 run_lua --worker_num={{.WorkerNum}} --log_dir={{.LogDir}} --log_id={{.RunID}} --pageset_type={{.PagesetType}} --chromium_build={{.ChromiumBuild}} --run_id={{.RunID}} --local={{.Local}};" |
| - runLuaTemplateParsed := template.Must(template.New("run_lua_cmd").Parse(runLuaCmdTemplate)) |
| - luaCmdBytes := new(bytes.Buffer) |
| - if err := runLuaTemplateParsed.Execute(luaCmdBytes, struct { |
| - WorkerNum string |
| - LogDir string |
| - PagesetType string |
| - ChromiumBuild string |
| - RunID string |
| - Local bool |
| - }{ |
| - WorkerNum: util.WORKER_NUM_KEYWORD, |
| - LogDir: util.GLogDir, |
| - PagesetType: *pagesetType, |
| - ChromiumBuild: *chromiumBuild, |
| - RunID: *runID, |
| - Local: *master_common.Local, |
| - }); err != nil { |
| - glog.Errorf("Failed to execute template: %s", err) |
| - return |
| + // Empty the remote dir before the workers upload to it. |
| + gsBaseDir := filepath.Join(util.SWARMING_DIR_NAME, filepath.Join(util.LuaRunsDir, *runID), *pagesetType) |
|
dogben
2016/05/19 17:02:41
I would hope that runID is unique and therefore th
rmistry
2016/05/19 17:08:19
The other CLs IIRC did not use a runID here.
But y
|
| + skutil.LogErr(gs.DeleteRemoteDir(gsBaseDir)) |
| + |
| + // Archive, trigger and collect swarming tasks. |
| + isolateExtraArgs := map[string]string{ |
| + "CHROMIUM_BUILD": *chromiumBuild, |
| + "RUN_ID": *runID, |
| } |
| - cmd := append(master_common.WorkerSetupCmds(), |
| - // The main command that runs run_lua on all workers. |
| - luaCmdBytes.String()) |
| - _, err = util.SSH(strings.Join(cmd, " "), util.Slaves, util.RUN_LUA_TIMEOUT) |
| - if err != nil { |
| - glog.Errorf("Error while running cmd %s: %s", cmd, err) |
| + if err := util.TriggerSwarmingTask(*pagesetType, "run_lua", util.RUN_LUA_ISOLATE, 2*time.Hour, 1*time.Hour, MAX_PAGES_PER_SWARMING_BOT, isolateExtraArgs); err != nil { |
| + glog.Errorf("Error encountered when swarming tasks: %s", err) |
| return |
| } |
| // Copy outputs from all slaves locally and combine them into one file. |
| consolidatedFileName := "lua-output" |
| consolidatedLuaOutput := filepath.Join(os.TempDir(), consolidatedFileName) |
| + // If the file already exists it could be that there is another lua task running on this machine. |
|
dogben
2016/05/19 17:02:41
Please just use ioutil.TempDir instead of os.TempD
rmistry
2016/05/19 17:08:19
Unfortunately the file has to be in /tmp/lua-outpu
dogben
2016/05/19 17:15:38
Oh, that is unfortunate.
|
| + // Wait for the file to be deleted within a deadline. |
| + if err := waitForOutputFile(consolidatedLuaOutput); err != nil { |
| + glog.Error(err) |
| + return |
| + } |
| defer skutil.Remove(consolidatedLuaOutput) |
| if err := ioutil.WriteFile(consolidatedLuaOutput, []byte{}, 0660); err != nil { |
| glog.Errorf("Could not create %s: %s", consolidatedLuaOutput, err) |
| return |
| } |
| - for i := 0; i < util.NumWorkers(); i++ { |
| - workerNum := i + 1 |
| - workerRemoteOutputPath := filepath.Join(util.LuaRunsDir, *runID, fmt.Sprintf("slave%d", workerNum), "outputs", *runID+".output") |
| + for i := 1; i <= util.PagesetTypeToInfo[*pagesetType].NumPages/MAX_PAGES_PER_SWARMING_BOT; i++ { |
| + startRange := strconv.Itoa(util.GetStartRange(i, MAX_PAGES_PER_SWARMING_BOT)) |
| + workerRemoteOutputPath := filepath.Join(util.LuaRunsDir, *runID, startRange, "outputs", *runID+".output") |
| respBody, err := gs.GetRemoteFileContents(workerRemoteOutputPath) |
| if err != nil { |
| glog.Errorf("Could not fetch %s: %s", workerRemoteOutputPath, err) |
| @@ -241,3 +228,23 @@ func main() { |
| taskCompletedSuccessfully = true |
| } |
| + |
| +func waitForOutputFile(luaOutput string) error { |
| + // Check every 10 secs and timeout after 10 mins. |
| + ticker := time.NewTicker(10 * time.Second) |
| + deadline := 10 * time.Minute |
| + deadlineTicker := time.NewTicker(deadline) |
| + defer ticker.Stop() |
| + defer deadlineTicker.Stop() |
| + for { |
| + select { |
| + case <-ticker.C: |
| + if _, err := os.Stat(luaOutput); os.IsNotExist(err) { |
| + return nil |
| + } |
| + glog.Infof("%s still exists. Waiting for the other lua task to complete.", luaOutput) |
| + case <-deadlineTicker.C: |
| + return fmt.Errorf("%s still existed after %v secs", luaOutput, deadline.Seconds()) |
| + } |
| + } |
| +} |