Chromium Code Reviews| Index: ct/go/master_scripts/create_pagesets_on_workers/main.go |
| diff --git a/ct/go/master_scripts/create_pagesets_on_workers/main.go b/ct/go/master_scripts/create_pagesets_on_workers/main.go |
| index b48cbe4b4bba2da6cd6890dc3b44155911767ccd..1f5e6631f04a0a3fdd50fc19ee6ccc609644e898 100644 |
| --- a/ct/go/master_scripts/create_pagesets_on_workers/main.go |
| +++ b/ct/go/master_scripts/create_pagesets_on_workers/main.go |
| @@ -6,7 +6,11 @@ package main |
| import ( |
| "flag" |
| "fmt" |
| - "strings" |
| + "io/ioutil" |
| + "path" |
| + "path/filepath" |
| + "runtime" |
| + "strconv" |
| "time" |
| "github.com/skia-dev/glog" |
| @@ -15,9 +19,14 @@ import ( |
| "go.skia.org/infra/ct/go/master_scripts/master_common" |
| "go.skia.org/infra/ct/go/util" |
| "go.skia.org/infra/go/common" |
| + "go.skia.org/infra/go/swarming" |
| skutil "go.skia.org/infra/go/util" |
| ) |
| +const ( |
| + MAX_PAGES_PER_SWARMING_BOT = 1000 |
| +) |
| + |
| var ( |
| emails = flag.String("emails", "", "The comma separated email addresses to notify when the task is picked up and completes.") |
| gaeTaskID = flag.Int64("gae_task_id", -1, "The key of the App Engine task. This task will be updated when the task is completed.") |
| @@ -71,10 +80,6 @@ func main() { |
| // Ensure webapp is updated and completion email is sent even if task fails. |
| defer updateWebappTask() |
| defer sendEmail(emailsArr) |
| - if !*master_common.Local { |
| - // Cleanup tmp files after the run. |
| - defer util.CleanTmpDir() |
| - } |
| // Finish with glog flush and how long the task took. |
| defer util.TimeTrack(time.Now(), "Creating Pagesets on Workers") |
| defer glog.Flush() |
| @@ -84,16 +89,54 @@ func main() { |
| return |
| } |
| - cmd := append(master_common.WorkerSetupCmds(), |
| - // The main command that runs create_pagesets on all workers. |
| - fmt.Sprintf( |
| - "create_pagesets --worker_num=%s --log_dir=%s --log_id=%s --pageset_type=%s --local=%t;", |
| - util.WORKER_NUM_KEYWORD, util.GLogDir, *runID, *pagesetType, *master_common.Local)) |
| + // Instantiate the swarming client. |
| + workDir, err := ioutil.TempDir("", "swarming_work_") |
| + if err != nil { |
| + glog.Errorf("Could not get temp dir: %s", err) |
| + return |
| + } |
| + s, err := swarming.NewSwarmingClient(workDir) |
| + if err != nil { |
| + glog.Errorf("Could not instantiate swarming client: %s", err) |
| + return |
| + } |
| + defer s.Cleanup() |
| + // Create isolated.gen.json files from tasks. |
| + taskNames := []string{} |
| + for i := 1; i <= util.PagesetTypeToInfo[*pagesetType].NumPages/MAX_PAGES_PER_SWARMING_BOT; i++ { |
| + taskNames = append(taskNames, fmt.Sprintf("create_pagesets_%d", i)) |
| + } |
| + genJSONs := []string{} |
| + // Get path to isolate files. |
| + _, currentFile, _, _ := runtime.Caller(0) |
| + pathToIsolates := filepath.Join(filepath.Dir((filepath.Dir(filepath.Dir(filepath.Dir(currentFile))))), "isolates") |
|
dogben
2016/05/18 17:38:25
nit: extra parentheses
rmistry
2016/05/19 12:59:50
Done.
|
| - _, err := util.SSH(strings.Join(cmd, " "), util.Slaves, util.CREATE_PAGESETS_TIMEOUT) |
| + for i, taskName := range taskNames { |
| + extraArgs := map[string]string{ |
| + "START_RANGE": strconv.Itoa(util.GetStartRange(i+1, MAX_PAGES_PER_SWARMING_BOT)), |
| + "NUM": strconv.Itoa(MAX_PAGES_PER_SWARMING_BOT), |
| + "PAGESET_TYPE": *pagesetType, |
| + } |
| + genJSON, err := s.CreateIsolatedGenJSON(path.Join(pathToIsolates, util.CREATE_PAGESETS_ISOLATE), s.WorkDir, "linux", taskName, extraArgs, []string{}) |
| + if err != nil { |
| + glog.Errorf("Could not create isolated.gen.json for task %s: %s", taskName, err) |
| + return |
| + } |
| + genJSONs = append(genJSONs, genJSON) |
| + } |
| + // Empty the remote dir before the workers upload to it. |
| + gs, err := util.NewGsUtil(nil) |
| if err != nil { |
| - glog.Errorf("Error while running cmd %s: %s", cmd, err) |
| + glog.Error(err) |
| return |
| } |
| + gsBaseDir := filepath.Join(util.SWARMING_DIR_NAME, util.PAGESETS_DIR_NAME, *pagesetType) |
| + skutil.LogErr(gs.DeleteRemoteDir(gsBaseDir)) |
| + // Archive, trigger and collect swarming tasks. |
| + if err := util.ArchiveTriggerCollectSwarmingTask(s, taskNames, genJSONs, swarming.RECOMMENDED_HARD_TIMEOUT, swarming.RECOMMENDED_IO_TIMEOUT); err != nil { |
| + glog.Errorf("Error encountered when swarming tasks: %s", err) |
| + return |
| + } |
| + |
| *taskCompletedSuccessfully = true |
| } |