Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // create_pagesets_on_workers is an application that creates pagesets on all CT | 1 // create_pagesets_on_workers is an application that creates pagesets on all CT |
| 2 // workers and uploads it to Google Storage. The requester is emailed when the t ask | 2 // workers and uploads it to Google Storage. The requester is emailed when the t ask |
| 3 // is done. | 3 // is done. |
| 4 package main | 4 package main |
| 5 | 5 |
| 6 import ( | 6 import ( |
| 7 "flag" | 7 "flag" |
| 8 "fmt" | 8 "fmt" |
| 9 » "strings" | 9 » "io/ioutil" |
| 10 » "path" | |
| 11 » "path/filepath" | |
| 12 » "runtime" | |
| 13 » "strconv" | |
| 10 "time" | 14 "time" |
| 11 | 15 |
| 12 "github.com/skia-dev/glog" | 16 "github.com/skia-dev/glog" |
| 13 "go.skia.org/infra/ct/go/ctfe/admin_tasks" | 17 "go.skia.org/infra/ct/go/ctfe/admin_tasks" |
| 14 "go.skia.org/infra/ct/go/frontend" | 18 "go.skia.org/infra/ct/go/frontend" |
| 15 "go.skia.org/infra/ct/go/master_scripts/master_common" | 19 "go.skia.org/infra/ct/go/master_scripts/master_common" |
| 16 "go.skia.org/infra/ct/go/util" | 20 "go.skia.org/infra/ct/go/util" |
| 17 "go.skia.org/infra/go/common" | 21 "go.skia.org/infra/go/common" |
| 22 "go.skia.org/infra/go/swarming" | |
| 18 skutil "go.skia.org/infra/go/util" | 23 skutil "go.skia.org/infra/go/util" |
| 19 ) | 24 ) |
| 20 | 25 |
| 26 const ( | |
| 27 MAX_PAGES_PER_SWARMING_BOT = 1000 | |
| 28 ) | |
| 29 | |
| 21 var ( | 30 var ( |
| 22 emails = flag.String("emails", "", "The comma separated email addre sses to notify when the task is picked up and completes.") | 31 emails = flag.String("emails", "", "The comma separated email addre sses to notify when the task is picked up and completes.") |
| 23 gaeTaskID = flag.Int64("gae_task_id", -1, "The key of the App Engine t ask. This task will be updated when the task is completed.") | 32 gaeTaskID = flag.Int64("gae_task_id", -1, "The key of the App Engine t ask. This task will be updated when the task is completed.") |
| 24 pagesetType = flag.String("pageset_type", "", "The type of pagesets to c reate from the Alexa CSV list. Eg: 10k, Mobile10k, All.") | 33 pagesetType = flag.String("pageset_type", "", "The type of pagesets to c reate from the Alexa CSV list. Eg: 10k, Mobile10k, All.") |
| 25 runID = flag.String("run_id", "", "The unique run id (typically re quester + timestamp).") | 34 runID = flag.String("run_id", "", "The unique run id (typically re quester + timestamp).") |
| 26 | 35 |
| 27 taskCompletedSuccessfully = new(bool) | 36 taskCompletedSuccessfully = new(bool) |
| 28 ) | 37 ) |
| 29 | 38 |
| 30 func sendEmail(recipients []string) { | 39 func sendEmail(recipients []string) { |
| (...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 64 emailsArr = append(emailsArr, util.CtAdmins...) | 73 emailsArr = append(emailsArr, util.CtAdmins...) |
| 65 if len(emailsArr) == 0 { | 74 if len(emailsArr) == 0 { |
| 66 glog.Error("At least one email address must be specified") | 75 glog.Error("At least one email address must be specified") |
| 67 return | 76 return |
| 68 } | 77 } |
| 69 skutil.LogErr(frontend.UpdateWebappTaskSetStarted(&admin_tasks.RecreateP ageSetsUpdateVars{}, *gaeTaskID)) | 78 skutil.LogErr(frontend.UpdateWebappTaskSetStarted(&admin_tasks.RecreateP ageSetsUpdateVars{}, *gaeTaskID)) |
| 70 skutil.LogErr(util.SendTaskStartEmail(emailsArr, "Creating pagesets", *r unID, "")) | 79 skutil.LogErr(util.SendTaskStartEmail(emailsArr, "Creating pagesets", *r unID, "")) |
| 71 // Ensure webapp is updated and completion email is sent even if task fa ils. | 80 // Ensure webapp is updated and completion email is sent even if task fa ils. |
| 72 defer updateWebappTask() | 81 defer updateWebappTask() |
| 73 defer sendEmail(emailsArr) | 82 defer sendEmail(emailsArr) |
| 74 if !*master_common.Local { | |
| 75 // Cleanup tmp files after the run. | |
| 76 defer util.CleanTmpDir() | |
| 77 } | |
| 78 // Finish with glog flush and how long the task took. | 83 // Finish with glog flush and how long the task took. |
| 79 defer util.TimeTrack(time.Now(), "Creating Pagesets on Workers") | 84 defer util.TimeTrack(time.Now(), "Creating Pagesets on Workers") |
| 80 defer glog.Flush() | 85 defer glog.Flush() |
| 81 | 86 |
| 82 if *pagesetType == "" { | 87 if *pagesetType == "" { |
| 83 glog.Error("Must specify --pageset_type") | 88 glog.Error("Must specify --pageset_type") |
| 84 return | 89 return |
| 85 } | 90 } |
| 86 | 91 |
| 87 » cmd := append(master_common.WorkerSetupCmds(), | 92 » // Instantiate the swarming client. |
| 88 » » // The main command that runs create_pagesets on all workers. | 93 » workDir, err := ioutil.TempDir("", "swarming_work_") |
| 89 » » fmt.Sprintf( | |
| 90 » » » "create_pagesets --worker_num=%s --log_dir=%s --log_id=% s --pageset_type=%s --local=%t;", | |
| 91 » » » util.WORKER_NUM_KEYWORD, util.GLogDir, *runID, *pagesetT ype, *master_common.Local)) | |
| 92 | |
| 93 » _, err := util.SSH(strings.Join(cmd, " "), util.Slaves, util.CREATE_PAGE SETS_TIMEOUT) | |
| 94 if err != nil { | 94 if err != nil { |
| 95 » » glog.Errorf("Error while running cmd %s: %s", cmd, err) | 95 » » glog.Errorf("Could not get temp dir: %s", err) |
| 96 return | 96 return |
| 97 } | 97 } |
| 98 s, err := swarming.NewSwarmingClient(workDir) | |
| 99 if err != nil { | |
| 100 glog.Errorf("Could not instantiate swarming client: %s", err) | |
| 101 return | |
| 102 } | |
| 103 defer s.Cleanup() | |
| 104 // Create isolated.gen.json files from tasks. | |
| 105 taskNames := []string{} | |
| 106 for i := 1; i <= util.PagesetTypeToInfo[*pagesetType].NumPages/MAX_PAGES _PER_SWARMING_BOT; i++ { | |
| 107 taskNames = append(taskNames, fmt.Sprintf("create_pagesets_%d", i)) | |
| 108 } | |
| 109 genJSONs := []string{} | |
| 110 // Get path to isolate files. | |
| 111 _, currentFile, _, _ := runtime.Caller(0) | |
| 112 pathToIsolates := filepath.Join(filepath.Dir((filepath.Dir(filepath.Dir( filepath.Dir(currentFile))))), "isolates") | |
|
dogben
2016/05/18 17:38:25
nit: extra parentheses
rmistry
2016/05/19 12:59:50
Done.
| |
| 113 | |
| 114 for i, taskName := range taskNames { | |
| 115 extraArgs := map[string]string{ | |
| 116 "START_RANGE": strconv.Itoa(util.GetStartRange(i+1, MAX _PAGES_PER_SWARMING_BOT)), | |
| 117 "NUM": strconv.Itoa(MAX_PAGES_PER_SWARMING_BOT) , | |
| 118 "PAGESET_TYPE": *pagesetType, | |
| 119 } | |
| 120 genJSON, err := s.CreateIsolatedGenJSON(path.Join(pathToIsolates , util.CREATE_PAGESETS_ISOLATE), s.WorkDir, "linux", taskName, extraArgs, []stri ng{}) | |
| 121 if err != nil { | |
| 122 glog.Errorf("Could not create isolated.gen.json for task %s: %s", taskName, err) | |
| 123 return | |
| 124 } | |
| 125 genJSONs = append(genJSONs, genJSON) | |
| 126 } | |
| 127 // Empty the remote dir before the workers upload to it. | |
| 128 gs, err := util.NewGsUtil(nil) | |
| 129 if err != nil { | |
| 130 glog.Error(err) | |
| 131 return | |
| 132 } | |
| 133 gsBaseDir := filepath.Join(util.SWARMING_DIR_NAME, util.PAGESETS_DIR_NAM E, *pagesetType) | |
| 134 skutil.LogErr(gs.DeleteRemoteDir(gsBaseDir)) | |
| 135 // Archive, trigger and collect swarming tasks. | |
| 136 if err := util.ArchiveTriggerCollectSwarmingTask(s, taskNames, genJSONs, swarming.RECOMMENDED_HARD_TIMEOUT, swarming.RECOMMENDED_IO_TIMEOUT); err != nil { | |
| 137 glog.Errorf("Error encountered when swarming tasks: %s", err) | |
| 138 return | |
| 139 } | |
| 140 | |
| 98 *taskCompletedSuccessfully = true | 141 *taskCompletedSuccessfully = true |
| 99 } | 142 } |
| OLD | NEW |