OLD | NEW |
---|---|
1 // create_pagesets_on_workers is an application that creates pagesets on all CT | 1 // create_pagesets_on_workers is an application that creates pagesets on all CT |
2 // workers and uploads it to Google Storage. The requester is emailed when the task | 2 // workers and uploads it to Google Storage. The requester is emailed when the task |
3 // is done. | 3 // is done. |
4 package main | 4 package main |
5 | 5 |
6 import ( | 6 import ( |
7 "flag" | 7 "flag" |
8 "fmt" | 8 "fmt" |
9 » "strings" | 9 » "io/ioutil" |
10 » "path" | |
11 » "path/filepath" | |
12 » "runtime" | |
13 » "strconv" | |
10 "time" | 14 "time" |
11 | 15 |
12 "github.com/skia-dev/glog" | 16 "github.com/skia-dev/glog" |
13 "go.skia.org/infra/ct/go/ctfe/admin_tasks" | 17 "go.skia.org/infra/ct/go/ctfe/admin_tasks" |
14 "go.skia.org/infra/ct/go/frontend" | 18 "go.skia.org/infra/ct/go/frontend" |
15 "go.skia.org/infra/ct/go/master_scripts/master_common" | 19 "go.skia.org/infra/ct/go/master_scripts/master_common" |
16 "go.skia.org/infra/ct/go/util" | 20 "go.skia.org/infra/ct/go/util" |
17 "go.skia.org/infra/go/common" | 21 "go.skia.org/infra/go/common" |
22 "go.skia.org/infra/go/swarming" | |
18 skutil "go.skia.org/infra/go/util" | 23 skutil "go.skia.org/infra/go/util" |
19 ) | 24 ) |
20 | 25 |
26 const ( | |
27 MAX_PAGES_PER_SWARMING_BOT = 1000 | |
28 ) | |
29 | |
21 var ( | 30 var ( |
22 emails = flag.String("emails", "", "The comma separated email addresses to notify when the task is picked up and completes.") | 31 emails = flag.String("emails", "", "The comma separated email addresses to notify when the task is picked up and completes.") |
23 gaeTaskID = flag.Int64("gae_task_id", -1, "The key of the App Engine task. This task will be updated when the task is completed.") | 32 gaeTaskID = flag.Int64("gae_task_id", -1, "The key of the App Engine task. This task will be updated when the task is completed.") |
24 pagesetType = flag.String("pageset_type", "", "The type of pagesets to create from the Alexa CSV list. Eg: 10k, Mobile10k, All.") | 33 pagesetType = flag.String("pageset_type", "", "The type of pagesets to create from the Alexa CSV list. Eg: 10k, Mobile10k, All.") |
25 runID = flag.String("run_id", "", "The unique run id (typically requester + timestamp).") | 34 runID = flag.String("run_id", "", "The unique run id (typically requester + timestamp).") |
26 | 35 |
27 taskCompletedSuccessfully = new(bool) | 36 taskCompletedSuccessfully = new(bool) |
28 ) | 37 ) |
29 | 38 |
30 func sendEmail(recipients []string) { | 39 func sendEmail(recipients []string) { |
(...skipping 33 matching lines...) | |
64 emailsArr = append(emailsArr, util.CtAdmins...) | 73 emailsArr = append(emailsArr, util.CtAdmins...) |
65 if len(emailsArr) == 0 { | 74 if len(emailsArr) == 0 { |
66 glog.Error("At least one email address must be specified") | 75 glog.Error("At least one email address must be specified") |
67 return | 76 return |
68 } | 77 } |
69 skutil.LogErr(frontend.UpdateWebappTaskSetStarted(&admin_tasks.RecreatePageSetsUpdateVars{}, *gaeTaskID)) | 78 skutil.LogErr(frontend.UpdateWebappTaskSetStarted(&admin_tasks.RecreatePageSetsUpdateVars{}, *gaeTaskID)) |
70 skutil.LogErr(util.SendTaskStartEmail(emailsArr, "Creating pagesets", *runID, "")) | 79 skutil.LogErr(util.SendTaskStartEmail(emailsArr, "Creating pagesets", *runID, "")) |
71 // Ensure webapp is updated and completion email is sent even if task fails. | 80 // Ensure webapp is updated and completion email is sent even if task fails. |
72 defer updateWebappTask() | 81 defer updateWebappTask() |
73 defer sendEmail(emailsArr) | 82 defer sendEmail(emailsArr) |
74 if !*master_common.Local { | |
75 // Cleanup tmp files after the run. | |
76 defer util.CleanTmpDir() | |
77 } | |
78 // Finish with glog flush and how long the task took. | 83 // Finish with glog flush and how long the task took. |
79 defer util.TimeTrack(time.Now(), "Creating Pagesets on Workers") | 84 defer util.TimeTrack(time.Now(), "Creating Pagesets on Workers") |
80 defer glog.Flush() | 85 defer glog.Flush() |
81 | 86 |
82 if *pagesetType == "" { | 87 if *pagesetType == "" { |
83 glog.Error("Must specify --pageset_type") | 88 glog.Error("Must specify --pageset_type") |
84 return | 89 return |
85 } | 90 } |
86 | 91 |
87 » cmd := append(master_common.WorkerSetupCmds(), | 92 » // Instantiate the swarming client. |
88 » » // The main command that runs create_pagesets on all workers. | 93 » workDir, err := ioutil.TempDir("", "swarming_work_") |
89 » » fmt.Sprintf( | |
90 » » » "create_pagesets --worker_num=%s --log_dir=%s --log_id=%s --pageset_type=%s --local=%t;", | |
91 » » » util.WORKER_NUM_KEYWORD, util.GLogDir, *runID, *pagesetType, *master_common.Local)) | |
92 | |
93 » _, err := util.SSH(strings.Join(cmd, " "), util.Slaves, util.CREATE_PAGESETS_TIMEOUT) | |
94 if err != nil { | 94 if err != nil { |
95 » » glog.Errorf("Error while running cmd %s: %s", cmd, err) | 95 » » glog.Errorf("Could not get temp dir: %s", err) |
96 return | 96 return |
97 } | 97 } |
98 s, err := swarming.NewSwarmingClient(workDir) | |
99 if err != nil { | |
100 glog.Errorf("Could not instantiate swarming client: %s", err) | |
101 return | |
102 } | |
103 defer s.Cleanup() | |
104 // Create isolated.gen.json files from tasks. | |
105 taskNames := []string{} | |
106 for i := 1; i <= util.PagesetTypeToInfo[*pagesetType].NumPages/MAX_PAGES_PER_SWARMING_BOT; i++ { | |
107 taskNames = append(taskNames, fmt.Sprintf("create_pagesets_%d", i)) | |
108 } | |
109 genJSONs := []string{} | |
110 // Get path to isolate files. | |
111 _, currentFile, _, _ := runtime.Caller(0) | |
112 pathToIsolates := filepath.Join(filepath.Dir((filepath.Dir(filepath.Dir(filepath.Dir(currentFile))))), "isolates") | |
dogben 2016/05/18 17:38:25: nit: extra parentheses
rmistry 2016/05/19 12:59:50: Done.
113 | |
114 for i, taskName := range taskNames { | |
115 extraArgs := map[string]string{ | |
116 "START_RANGE": strconv.Itoa(util.GetStartRange(i+1, MAX_PAGES_PER_SWARMING_BOT)), | |
117 "NUM": strconv.Itoa(MAX_PAGES_PER_SWARMING_BOT), | |
118 "PAGESET_TYPE": *pagesetType, | |
119 } | |
120 genJSON, err := s.CreateIsolatedGenJSON(path.Join(pathToIsolates, util.CREATE_PAGESETS_ISOLATE), s.WorkDir, "linux", taskName, extraArgs, []string{}) | |
121 if err != nil { | |
122 glog.Errorf("Could not create isolated.gen.json for task %s: %s", taskName, err) | |
123 return | |
124 } | |
125 genJSONs = append(genJSONs, genJSON) | |
126 } | |
127 // Empty the remote dir before the workers upload to it. | |
128 gs, err := util.NewGsUtil(nil) | |
129 if err != nil { | |
130 glog.Error(err) | |
131 return | |
132 } | |
133 gsBaseDir := filepath.Join(util.SWARMING_DIR_NAME, util.PAGESETS_DIR_NAME, *pagesetType) | |
134 skutil.LogErr(gs.DeleteRemoteDir(gsBaseDir)) | |
135 // Archive, trigger and collect swarming tasks. | |
136 if err := util.ArchiveTriggerCollectSwarmingTask(s, taskNames, genJSONs, swarming.RECOMMENDED_HARD_TIMEOUT, swarming.RECOMMENDED_IO_TIMEOUT); err != nil { | |
137 glog.Errorf("Error encountered when swarming tasks: %s", err) | |
138 return | |
139 } | |
140 | |
98 *taskCompletedSuccessfully = true | 141 *taskCompletedSuccessfully = true |
99 } | 142 } |
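For reference, the new code path shards pageset creation across swarming bots: the number of tasks is `NumPages / MAX_PAGES_PER_SWARMING_BOT`, and each task receives a `START_RANGE` and `NUM` through `extraArgs`. The standalone sketch below illustrates the assumed chunking arithmetic. `getStartRange` is a hypothetical stand-in for `util.GetStartRange` (assumed to return the 1-based index of the first page for a given task number), and the 10000-page count is only an example, not taken from `util.PagesetTypeToInfo`.

```go
package main

import "fmt"

// Mirrors MAX_PAGES_PER_SWARMING_BOT in the CL above.
const maxPagesPerSwarmingBot = 1000

// getStartRange is a hypothetical stand-in for util.GetStartRange: assumed to
// return the 1-based index of the first page that the given task processes.
func getStartRange(taskNum, pagesPerBot int) int {
	return (taskNum-1)*pagesPerBot + 1
}

func main() {
	// Example only: a pageset type with 10000 pages splits into 10 swarming tasks.
	numPages := 10000
	for i := 1; i <= numPages/maxPagesPerSwarmingBot; i++ {
		fmt.Printf("create_pagesets_%d: START_RANGE=%d NUM=%d\n",
			i, getStartRange(i, maxPagesPerSwarmingBot), maxPagesPerSwarmingBot)
	}
}
```

Note that with the integer division used in the CL, a page count that is not an exact multiple of MAX_PAGES_PER_SWARMING_BOT would leave a remainder unassigned, so the pageset types are presumably sized in multiples of 1000.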