Index: ct/go/master_scripts/run_lua_on_workers/main.go |
diff --git a/ct/go/master_scripts/run_lua_on_workers/main.go b/ct/go/master_scripts/run_lua_on_workers/main.go |
index c8276876005de61c8f520c7032b51f1a40b7b50d..79522b3b33ee7224ec748aed997df45a94754bca 100644 |
--- a/ct/go/master_scripts/run_lua_on_workers/main.go |
+++ b/ct/go/master_scripts/run_lua_on_workers/main.go |
@@ -4,7 +4,6 @@ |
package main |
import ( |
- "bytes" |
"database/sql" |
"flag" |
"fmt" |
@@ -12,8 +11,7 @@ import ( |
"io/ioutil" |
"os" |
"path/filepath" |
- "strings" |
- "text/template" |
+ "strconv" |
"time" |
"github.com/skia-dev/glog" |
@@ -25,6 +23,10 @@ import ( |
skutil "go.skia.org/infra/go/util" |
) |
+const ( |
+ MAX_PAGES_PER_SWARMING_BOT = 100 |
+) |
+ |
var ( |
emails = flag.String("emails", "", "The comma separated email addresses to notify when the task is picked up and completes.") |
description = flag.String("description", "", "The description of the run as entered by the requester.") |
@@ -99,10 +101,6 @@ func main() { |
// Ensure webapp is updated and email is sent even if task fails. |
defer updateWebappTask() |
defer sendEmail(emailsArr) |
- // Cleanup tmp files after the run. |
- if !*master_common.Local { |
- defer util.CleanTmpDir() |
- } |
// Finish with glog flush and how long the task took. |
defer util.TimeTrack(time.Now(), "Running Lua script on workers") |
defer glog.Flush() |
@@ -136,48 +134,37 @@ func main() { |
return |
} |
- // Run the run_lua script on all workers. |
- runLuaCmdTemplate := "DISPLAY=:0 run_lua --worker_num={{.WorkerNum}} --log_dir={{.LogDir}} --log_id={{.RunID}} --pageset_type={{.PagesetType}} --chromium_build={{.ChromiumBuild}} --run_id={{.RunID}} --local={{.Local}};" |
- runLuaTemplateParsed := template.Must(template.New("run_lua_cmd").Parse(runLuaCmdTemplate)) |
- luaCmdBytes := new(bytes.Buffer) |
- if err := runLuaTemplateParsed.Execute(luaCmdBytes, struct { |
- WorkerNum string |
- LogDir string |
- PagesetType string |
- ChromiumBuild string |
- RunID string |
- Local bool |
- }{ |
- WorkerNum: util.WORKER_NUM_KEYWORD, |
- LogDir: util.GLogDir, |
- PagesetType: *pagesetType, |
- ChromiumBuild: *chromiumBuild, |
- RunID: *runID, |
- Local: *master_common.Local, |
- }); err != nil { |
- glog.Errorf("Failed to execute template: %s", err) |
- return |
+ // Empty the remote dir before the workers upload to it. |
+ gsBaseDir := filepath.Join(util.SWARMING_DIR_NAME, filepath.Join(util.LuaRunsDir, *runID), *pagesetType) |
dogben
2016/05/19 17:02:41
I would hope that runID is unique and therefore th
rmistry
2016/05/19 17:08:19
The other CLs IIRC did not use a runID here.
But y
|
+ skutil.LogErr(gs.DeleteRemoteDir(gsBaseDir)) |
+ |
+ // Archive, trigger and collect swarming tasks. |
+ isolateExtraArgs := map[string]string{ |
+ "CHROMIUM_BUILD": *chromiumBuild, |
+ "RUN_ID": *runID, |
} |
- cmd := append(master_common.WorkerSetupCmds(), |
- // The main command that runs run_lua on all workers. |
- luaCmdBytes.String()) |
- _, err = util.SSH(strings.Join(cmd, " "), util.Slaves, util.RUN_LUA_TIMEOUT) |
- if err != nil { |
- glog.Errorf("Error while running cmd %s: %s", cmd, err) |
+ if err := util.TriggerSwarmingTask(*pagesetType, "run_lua", util.RUN_LUA_ISOLATE, 2*time.Hour, 1*time.Hour, MAX_PAGES_PER_SWARMING_BOT, isolateExtraArgs); err != nil { |
+ glog.Errorf("Error encountered when swarming tasks: %s", err) |
return |
} |
// Copy outputs from all slaves locally and combine it into one file. |
consolidatedFileName := "lua-output" |
consolidatedLuaOutput := filepath.Join(os.TempDir(), consolidatedFileName) |
+ // If the file already exists it could be that there is another lua task running on this machine. |
dogben
2016/05/19 17:02:41
Please just use ioutil.TempDir instead of os.TempD
rmistry
2016/05/19 17:08:19
Unfortunately the file has to be in /tmp/lua-outpu
dogben
2016/05/19 17:15:38
Oh, that is unfortunate.
|
+ // Wait for the file to be deleted within a deadline. |
+ if err := waitForOutputFile(consolidatedLuaOutput); err != nil { |
+ glog.Error(err) |
+ return |
+ } |
defer skutil.Remove(consolidatedLuaOutput) |
if err := ioutil.WriteFile(consolidatedLuaOutput, []byte{}, 0660); err != nil { |
glog.Errorf("Could not create %s: %s", consolidatedLuaOutput, err) |
return |
} |
- for i := 0; i < util.NumWorkers(); i++ { |
- workerNum := i + 1 |
- workerRemoteOutputPath := filepath.Join(util.LuaRunsDir, *runID, fmt.Sprintf("slave%d", workerNum), "outputs", *runID+".output") |
+ for i := 1; i <= util.PagesetTypeToInfo[*pagesetType].NumPages/MAX_PAGES_PER_SWARMING_BOT; i++ { |
+ startRange := strconv.Itoa(util.GetStartRange(i, MAX_PAGES_PER_SWARMING_BOT)) |
+ workerRemoteOutputPath := filepath.Join(util.LuaRunsDir, *runID, startRange, "outputs", *runID+".output") |
respBody, err := gs.GetRemoteFileContents(workerRemoteOutputPath) |
if err != nil { |
glog.Errorf("Could not fetch %s: %s", workerRemoteOutputPath, err) |
@@ -241,3 +228,23 @@ func main() { |
taskCompletedSuccessfully = true |
} |
+ |
+func waitForOutputFile(luaOutput string) error { |
+ // Check every 10 secs and timeout after 10 mins. |
+ ticker := time.NewTicker(10 * time.Second) |
+ deadline := 10 * time.Minute |
+ deadlineTicker := time.NewTicker(deadline) |
+ defer ticker.Stop() |
+ defer deadlineTicker.Stop() |
+ for { |
+ select { |
+ case <-ticker.C: |
+ if _, err := os.Stat(luaOutput); os.IsNotExist(err) { |
+ return nil |
+ } |
+ glog.Infof("%s still exists. Waiting for the other lua task to complete.", luaOutput) |
+ case <-deadlineTicker.C: |
+ return fmt.Errorf("%s still existed after %v secs", luaOutput, deadline.Seconds()) |
+ } |
+ } |
+} |