Index: ct/go/master_scripts/create_pagesets_on_workers/main.go
diff --git a/ct/go/master_scripts/create_pagesets_on_workers/main.go b/ct/go/master_scripts/create_pagesets_on_workers/main.go
index b48cbe4b4bba2da6cd6890dc3b44155911767ccd..2ff449072f55f46343b8ff9f3bf65f787470cc66 100644
--- a/ct/go/master_scripts/create_pagesets_on_workers/main.go
+++ b/ct/go/master_scripts/create_pagesets_on_workers/main.go
@@ -6,7 +6,11 @@ package main
 import (
 	"flag"
 	"fmt"
-	"strings"
+	"io/ioutil"
+	"path"
+	"path/filepath"
+	"runtime"
+	"strconv"
 	"time"
 
 	"github.com/skia-dev/glog"
@@ -15,9 +19,14 @@ import (
 	"go.skia.org/infra/ct/go/master_scripts/master_common"
 	"go.skia.org/infra/ct/go/util"
 	"go.skia.org/infra/go/common"
+	"go.skia.org/infra/go/swarming"
 	skutil "go.skia.org/infra/go/util"
 )
 
+const (
+	MAX_PAGES_PER_SWARMING_BOT = 1000
+)
+
 var (
 	emails = flag.String("emails", "", "The comma separated email addresses to notify when the task is picked up and completes.")
 	gaeTaskID = flag.Int64("gae_task_id", -1, "The key of the App Engine task. This task will be updated when the task is completed.")
@@ -71,10 +80,6 @@ func main() {
 	// Ensure webapp is updated and completion email is sent even if task fails.
 	defer updateWebappTask()
 	defer sendEmail(emailsArr)
-	if !*master_common.Local {
-		// Cleanup tmp files after the run.
-		defer util.CleanTmpDir()
-	}
 	// Finish with glog flush and how long the task took.
 	defer util.TimeTrack(time.Now(), "Creating Pagesets on Workers")
 	defer glog.Flush()
@@ -84,16 +89,56 @@
 		return
 	}
 
-	cmd := append(master_common.WorkerSetupCmds(),
-		// The main command that runs create_pagesets on all workers.
-		fmt.Sprintf(
-			"create_pagesets --worker_num=%s --log_dir=%s --log_id=%s --pageset_type=%s --local=%t;",
-			util.WORKER_NUM_KEYWORD, util.GLogDir, *runID, *pagesetType, *master_common.Local))
+	// Instantiate the swarming client.
+	workDir, err := ioutil.TempDir("", "swarming_work_")
+	if err != nil {
+		glog.Errorf("Could not get temp dir: %s", err)
+		return
+	}
+	s, err := swarming.NewSwarmingClient(workDir)
+	if err != nil {
+		glog.Errorf("Could not instantiate swarming client: %s", err)
+		return
+	}
+	defer s.Cleanup()
+	// Create isolated.gen.json files from tasks.
+	taskNames := []string{}
+	for i := 1; i <= util.PagesetTypeToInfo[*pagesetType].NumPages/MAX_PAGES_PER_SWARMING_BOT; i++ {
+		taskNames = append(taskNames, fmt.Sprintf("create_pagesets_%d", i))
+	}
+	// TODO(rmistry): REMOVE
+	taskNames = []string{"create_pagesets_1", "create_pagesets_2", "create_pagesets_3"}
+	genJSONs := []string{}
+	// Get path to isolate files.
+	_, currentFile, _, _ := runtime.Caller(0)
+	pathToIsolates := filepath.Join(filepath.Dir((filepath.Dir(filepath.Dir(filepath.Dir(currentFile))))), "isolates")
 
-	_, err := util.SSH(strings.Join(cmd, " "), util.Slaves, util.CREATE_PAGESETS_TIMEOUT)
+	for i, taskName := range taskNames {
+		extraArgs := map[string]string{
+			"START_RANGE": strconv.Itoa(util.GetStartRange(i+1, MAX_PAGES_PER_SWARMING_BOT)),
+			"NUM": strconv.Itoa(MAX_PAGES_PER_SWARMING_BOT),
+			"PAGESET_TYPE": *pagesetType,
+		}
+		genJSON, err := s.CreateIsolatedGenJSON(path.Join(pathToIsolates, util.CREATE_PAGESETS_ISOLATE), s.WorkDir, "linux", taskName, extraArgs, []string{})
+		if err != nil {
+			glog.Errorf("Could not create isolated.gen.json for task %s: %s", taskName, err)
+			return
+		}
+		genJSONs = append(genJSONs, genJSON)
+	}
+	// Empty the remote dir before the workers upload to it.
+	gs, err := util.NewGsUtil(nil)
 	if err != nil {
-		glog.Errorf("Error while running cmd %s: %s", cmd, err)
+		glog.Error(err)
 		return
 	}
+	gsBaseDir := filepath.Join(util.SWARMING_DIR_NAME, util.PAGESETS_DIR_NAME, *pagesetType)
+	skutil.LogErr(gs.DeleteRemoteDir(gsBaseDir))
+	// Archive, trigger and collect swarming tasks.
+	if err := util.ArchiveTriggerCollectSwarmingTask(s, taskNames, genJSONs, swarming.RECOMMENDED_HARD_TIMEOUT, swarming.RECOMMENDED_IO_TIMEOUT); err != nil {
+		glog.Errorf("Error encountered when swarming tasks: %s", err)
+		return
+	}
+
 	*taskCompletedSuccessfully = true
 }
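
Note on the sharding the new code performs: each swarming task is handed a contiguous block of MAX_PAGES_PER_SWARMING_BOT pages through the START_RANGE and NUM isolate variables. Below is a minimal, self-contained sketch of that arithmetic, not the repository's code: it assumes util.GetStartRange(n, size) evaluates to ((n-1)*size)+1 (re-implemented locally here as getStartRange) and uses a hypothetical 3000-page pageset in place of util.PagesetTypeToInfo[*pagesetType].NumPages.

// Sketch only: prints the START_RANGE/NUM values each create_pagesets task
// would receive. getStartRange is a local stand-in for util.GetStartRange and
// assumes the formula ((taskNum-1)*rangeSize)+1; verify against ct/go/util.
package main

import "fmt"

const maxPagesPerSwarmingBot = 1000 // mirrors MAX_PAGES_PER_SWARMING_BOT in the diff

func getStartRange(taskNum, rangeSize int) int {
	return (taskNum-1)*rangeSize + 1
}

func main() {
	numPages := 3000 // hypothetical; the real value comes from PagesetTypeToInfo
	for i := 1; i <= numPages/maxPagesPerSwarmingBot; i++ {
		fmt.Printf("create_pagesets_%d: START_RANGE=%d NUM=%d\n",
			i, getStartRange(i, maxPagesPerSwarmingBot), maxPagesPerSwarmingBot)
	}
	// Prints:
	// create_pagesets_1: START_RANGE=1 NUM=1000
	// create_pagesets_2: START_RANGE=1001 NUM=1000
	// create_pagesets_3: START_RANGE=2001 NUM=1000
}

Because the task count is NumPages divided by MAX_PAGES_PER_SWARMING_BOT using integer division, only pageset types with at least 1000 pages produce tasks; the hard-coded three-task list marked TODO(rmistry) in the diff overrides this computation for now.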
|