OLD | NEW |
---|---|
1 // capture_archives_on_workers is an application that captures archives on all CT | 1 // capture_archives_on_workers is an application that captures archives on all CT |
2 // workers and uploads it to Google Storage. The requester is emailed when the task | 2 // workers and uploads it to Google Storage. The requester is emailed when the task |
3 // is done. | 3 // is done. |
4 package main | 4 package main |
5 | 5 |
6 import ( | 6 import ( |
7 "flag" | 7 "flag" |
8 "fmt" | 8 "fmt" |
9 » "strings" | 9 » "io/ioutil" |
10 » "path" | |
11 » "path/filepath" | |
12 » "runtime" | |
13 » "strconv" | |
10 "time" | 14 "time" |
11 | 15 |
12 "github.com/skia-dev/glog" | 16 "github.com/skia-dev/glog" |
13 "go.skia.org/infra/ct/go/ctfe/admin_tasks" | 17 "go.skia.org/infra/ct/go/ctfe/admin_tasks" |
14 "go.skia.org/infra/ct/go/frontend" | 18 "go.skia.org/infra/ct/go/frontend" |
15 "go.skia.org/infra/ct/go/master_scripts/master_common" | 19 "go.skia.org/infra/ct/go/master_scripts/master_common" |
16 "go.skia.org/infra/ct/go/util" | 20 "go.skia.org/infra/ct/go/util" |
17 "go.skia.org/infra/go/common" | 21 "go.skia.org/infra/go/common" |
22 "go.skia.org/infra/go/swarming" | |
18 skutil "go.skia.org/infra/go/util" | 23 skutil "go.skia.org/infra/go/util" |
19 ) | 24 ) |
20 | 25 |
21 var ( | 26 var ( |
22 emails = flag.String("emails", "", "The comma separated email addresses to notify when the task is picked up and completes.") | 27 emails = flag.String("emails", "", "The comma separated email addresses to notify when the task is picked up and completes.") |
23 gaeTaskID = flag.Int64("gae_task_id", -1, "The key of the App Engine task. This task will be updated when the task is completed.") | 28 gaeTaskID = flag.Int64("gae_task_id", -1, "The key of the App Engine task. This task will be updated when the task is completed.") |
24 pagesetType = flag.String("pageset_type", "", "The type of pagesets to use. Eg: 10k, Mobile10k, All.") | 29 pagesetType = flag.String("pageset_type", "", "The type of pagesets to use. Eg: 10k, Mobile10k, All.") |
25 runID = flag.String("run_id", "", "The unique run id (typically requester + timestamp).") | 30 runID = flag.String("run_id", "", "The unique run id (typically requester + timestamp).") |
26 | 31 |
27 taskCompletedSuccessfully = new(bool) | 32 taskCompletedSuccessfully = new(bool) |
28 ) | 33 ) |
29 | 34 |
35 const ( | |
36 MAX_PAGES_PER_SWARMING_BOT = 10 | |
37 ) | |
38 | |
30 func sendEmail(recipients []string) { | 39 func sendEmail(recipients []string) { |
31 // Send completion email. | 40 // Send completion email. |
32 emailSubject := "Capture archives Cluster telemetry task has completed" | 41 emailSubject := "Capture archives Cluster telemetry task has completed" |
33 failureHtml := "" | 42 failureHtml := "" |
34 if !*taskCompletedSuccessfully { | 43 if !*taskCompletedSuccessfully { |
35 emailSubject += " with failures" | 44 emailSubject += " with failures" |
36 failureHtml = util.GetFailureEmailHtml(*runID) | 45 failureHtml = util.GetFailureEmailHtml(*runID) |
37 } | 46 } |
38 bodyTemplate := ` | 47 bodyTemplate := ` |
39 The Cluster telemetry queued task to capture archives of %s pagesets has completed.<br/> | 48 The Cluster telemetry queued task to capture archives of %s pagesets has completed.<br/> |
(...skipping 24 matching lines...) Expand all Loading... | |
64 emailsArr = append(emailsArr, util.CtAdmins...) | 73 emailsArr = append(emailsArr, util.CtAdmins...) |
65 if len(emailsArr) == 0 { | 74 if len(emailsArr) == 0 { |
66 glog.Error("At least one email address must be specified") | 75 glog.Error("At least one email address must be specified") |
67 return | 76 return |
68 } | 77 } |
69 skutil.LogErr(frontend.UpdateWebappTaskSetStarted(&admin_tasks.RecreateWebpageArchivesUpdateVars{}, *gaeTaskID)) | 78 skutil.LogErr(frontend.UpdateWebappTaskSetStarted(&admin_tasks.RecreateWebpageArchivesUpdateVars{}, *gaeTaskID)) |
70 skutil.LogErr(util.SendTaskStartEmail(emailsArr, "Capture archives", *runID, "")) | 79 skutil.LogErr(util.SendTaskStartEmail(emailsArr, "Capture archives", *runID, "")) |
71 // Ensure webapp is updated and completion email is sent even if task fails. | 80 // Ensure webapp is updated and completion email is sent even if task fails. |
72 defer updateWebappTask() | 81 defer updateWebappTask() |
73 defer sendEmail(emailsArr) | 82 defer sendEmail(emailsArr) |
74 if !*master_common.Local { | |
75 // Cleanup tmp files after the run. | |
76 defer util.CleanTmpDir() | |
77 } | |
78 // Finish with glog flush and how long the task took. | 83 // Finish with glog flush and how long the task took. |
79 defer util.TimeTrack(time.Now(), "Capture archives on Workers") | 84 defer util.TimeTrack(time.Now(), "Capture archives on Workers") |
80 defer glog.Flush() | 85 defer glog.Flush() |
81 | 86 |
82 if *pagesetType == "" { | 87 if *pagesetType == "" { |
83 glog.Error("Must specify --pageset_type") | 88 glog.Error("Must specify --pageset_type") |
84 return | 89 return |
85 } | 90 } |
86 | 91 |
87 » cmd := append(master_common.WorkerSetupCmds(), | 92 » // Instantiate the swarming client. |
dogben
2016/05/18 18:59:14
Seems capture_archives and create_pagesets are ver
rmistry
2016/05/19 13:23:18
Done.
| |
88 » » // The main command that runs capture_archives on all workers. | 93 » workDir, err := ioutil.TempDir("", "swarming_work_") |
89 » » fmt.Sprintf("DISPLAY=:0 capture_archives --worker_num=%s --log_dir=%s --log_id=%s --pageset_type=%s --local=%t;", util.WORKER_NUM_KEYWORD, util.GLogDir, *runID, *pagesetType, *master_common.Local)) | |
90 | |
91 » _, err := util.SSH(strings.Join(cmd, " "), util.Slaves, util.CAPTURE_ARCHIVES_TIMEOUT) | |
92 if err != nil { | 94 if err != nil { |
93 » » glog.Errorf("Error while running cmd %s: %s", cmd, err) | 95 » » glog.Errorf("Could not get temp dir: %s", err) |
94 return | 96 return |
95 } | 97 } |
98 s, err := swarming.NewSwarmingClient(workDir) | |
99 if err != nil { | |
100 glog.Errorf("Could not instantiate swarming client: %s", err) | |
101 return | |
102 } | |
103 defer s.Cleanup() | |
104 // Create isolated.gen.json files from tasks. | |
105 taskNames := []string{} | |
106 for i := 1; i <= util.PagesetTypeToInfo[*pagesetType].NumPages/MAX_PAGES_PER_SWARMING_BOT; i++ { | |
107 taskNames = append(taskNames, fmt.Sprintf("capture_archives_%d", i)) | |
108 } | |
109 genJSONs := []string{} | |
110 // Get path to isolate files. | |
111 _, currentFile, _, _ := runtime.Caller(0) | |
112 pathToIsolates := filepath.Join(filepath.Dir((filepath.Dir(filepath.Dir(filepath.Dir(currentFile))))), "isolates") | |
dogben
2016/05/18 18:59:14
nit: extra parens
rmistry
2016/05/19 13:23:18
Done.
| |
113 | |
114 for i, taskName := range taskNames { | |
115 extraArgs := map[string]string{ | |
116 "START_RANGE": strconv.Itoa(util.GetStartRange(i+1, MAX _PAGES_PER_SWARMING_BOT)), | |
117 "NUM": strconv.Itoa(MAX_PAGES_PER_SWARMING_BOT) , | |
118 "PAGESET_TYPE": *pagesetType, | |
119 } | |
120 genJSON, err := s.CreateIsolatedGenJSON(path.Join(pathToIsolates, util.CAPTURE_ARCHIVES_ISOLATE), s.WorkDir, "linux", taskName, extraArgs, []string{}) | |
121 if err != nil { | |
122 glog.Errorf("Could not create isolated.gen.json for task %s: %s", taskName, err) | |
123 return | |
124 } | |
125 genJSONs = append(genJSONs, genJSON) | |
126 } | |
127 // Empty the remote dir before the workers upload to it. | |
128 gs, err := util.NewGsUtil(nil) | |
129 if err != nil { | |
130 glog.Error(err) | |
131 return | |
132 } | |
133 gsBaseDir := filepath.Join(util.SWARMING_DIR_NAME, util.WEB_ARCHIVES_DIR_NAME, *pagesetType) | |
134 skutil.LogErr(gs.DeleteRemoteDir(gsBaseDir)) | |
135 // Archive, trigger and collect swarming tasks. | |
136 if err := util.ArchiveTriggerCollectSwarmingTask(s, taskNames, genJSONs, 2*time.Hour, 1*time.Hour); err != nil { | |
137 glog.Errorf("Error encountered when swarming tasks: %s", err) | |
138 return | |
139 } | |
140 | |
96 *taskCompletedSuccessfully = true | 141 *taskCompletedSuccessfully = true |
97 } | 142 } |
OLD | NEW |