Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(211)

Side by Side Diff: ct/go/master_scripts/capture_archives_on_workers/main.go

Issue 1988133002: Use swarming in capture_archives CT task and capture archives in parallel (Closed) Base URL: https://skia.googlesource.com/buildbot@ct-4-create_pagesets
Patch Set: Try 10 per machine and no parallelism Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « no previous file | ct/go/util/constants.go » ('j') | ct/go/worker_scripts/capture_archives/main.go » ('J')
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // capture_archives_on_workers is an application that captures archives on all C T 1 // capture_archives_on_workers is an application that captures archives on all C T
2 // workers and uploads it to Google Storage. The requester is emailed when the t ask 2 // workers and uploads it to Google Storage. The requester is emailed when the t ask
3 // is done. 3 // is done.
4 package main 4 package main
5 5
6 import ( 6 import (
7 "flag" 7 "flag"
8 "fmt" 8 "fmt"
9 » "strings" 9 » "io/ioutil"
10 » "path"
11 » "path/filepath"
12 » "runtime"
13 » "strconv"
10 "time" 14 "time"
11 15
12 "github.com/skia-dev/glog" 16 "github.com/skia-dev/glog"
13 "go.skia.org/infra/ct/go/ctfe/admin_tasks" 17 "go.skia.org/infra/ct/go/ctfe/admin_tasks"
14 "go.skia.org/infra/ct/go/frontend" 18 "go.skia.org/infra/ct/go/frontend"
15 "go.skia.org/infra/ct/go/master_scripts/master_common" 19 "go.skia.org/infra/ct/go/master_scripts/master_common"
16 "go.skia.org/infra/ct/go/util" 20 "go.skia.org/infra/ct/go/util"
17 "go.skia.org/infra/go/common" 21 "go.skia.org/infra/go/common"
22 "go.skia.org/infra/go/swarming"
18 skutil "go.skia.org/infra/go/util" 23 skutil "go.skia.org/infra/go/util"
19 ) 24 )
20 25
21 var ( 26 var (
22 emails = flag.String("emails", "", "The comma separated email addre sses to notify when the task is picked up and completes.") 27 emails = flag.String("emails", "", "The comma separated email addre sses to notify when the task is picked up and completes.")
23 gaeTaskID = flag.Int64("gae_task_id", -1, "The key of the App Engine t ask. This task will be updated when the task is completed.") 28 gaeTaskID = flag.Int64("gae_task_id", -1, "The key of the App Engine t ask. This task will be updated when the task is completed.")
24 pagesetType = flag.String("pageset_type", "", "The type of pagesets to u se. Eg: 10k, Mobile10k, All.") 29 pagesetType = flag.String("pageset_type", "", "The type of pagesets to u se. Eg: 10k, Mobile10k, All.")
25 runID = flag.String("run_id", "", "The unique run id (typically re quester + timestamp).") 30 runID = flag.String("run_id", "", "The unique run id (typically re quester + timestamp).")
26 31
27 taskCompletedSuccessfully = new(bool) 32 taskCompletedSuccessfully = new(bool)
28 ) 33 )
29 34
35 const (
36 MAX_PAGES_PER_SWARMING_BOT = 10
37 )
38
30 func sendEmail(recipients []string) { 39 func sendEmail(recipients []string) {
31 // Send completion email. 40 // Send completion email.
32 emailSubject := "Capture archives Cluster telemetry task has completed" 41 emailSubject := "Capture archives Cluster telemetry task has completed"
33 failureHtml := "" 42 failureHtml := ""
34 if !*taskCompletedSuccessfully { 43 if !*taskCompletedSuccessfully {
35 emailSubject += " with failures" 44 emailSubject += " with failures"
36 failureHtml = util.GetFailureEmailHtml(*runID) 45 failureHtml = util.GetFailureEmailHtml(*runID)
37 } 46 }
38 bodyTemplate := ` 47 bodyTemplate := `
39 The Cluster telemetry queued task to capture archives of %s pagesets has completed.<br/> 48 The Cluster telemetry queued task to capture archives of %s pagesets has completed.<br/>
(...skipping 24 matching lines...) Expand all
64 emailsArr = append(emailsArr, util.CtAdmins...) 73 emailsArr = append(emailsArr, util.CtAdmins...)
65 if len(emailsArr) == 0 { 74 if len(emailsArr) == 0 {
66 glog.Error("At least one email address must be specified") 75 glog.Error("At least one email address must be specified")
67 return 76 return
68 } 77 }
69 skutil.LogErr(frontend.UpdateWebappTaskSetStarted(&admin_tasks.RecreateW ebpageArchivesUpdateVars{}, *gaeTaskID)) 78 skutil.LogErr(frontend.UpdateWebappTaskSetStarted(&admin_tasks.RecreateW ebpageArchivesUpdateVars{}, *gaeTaskID))
70 skutil.LogErr(util.SendTaskStartEmail(emailsArr, "Capture archives", *ru nID, "")) 79 skutil.LogErr(util.SendTaskStartEmail(emailsArr, "Capture archives", *ru nID, ""))
71 // Ensure webapp is updated and completion email is sent even if task fa ils. 80 // Ensure webapp is updated and completion email is sent even if task fa ils.
72 defer updateWebappTask() 81 defer updateWebappTask()
73 defer sendEmail(emailsArr) 82 defer sendEmail(emailsArr)
74 if !*master_common.Local {
75 // Cleanup tmp files after the run.
76 defer util.CleanTmpDir()
77 }
78 // Finish with glog flush and how long the task took. 83 // Finish with glog flush and how long the task took.
79 defer util.TimeTrack(time.Now(), "Capture archives on Workers") 84 defer util.TimeTrack(time.Now(), "Capture archives on Workers")
80 defer glog.Flush() 85 defer glog.Flush()
81 86
82 if *pagesetType == "" { 87 if *pagesetType == "" {
83 glog.Error("Must specify --pageset_type") 88 glog.Error("Must specify --pageset_type")
84 return 89 return
85 } 90 }
86 91
87 » cmd := append(master_common.WorkerSetupCmds(), 92 » // Instantiate the swarming client.
dogben 2016/05/18 18:59:14 Seems capture_archives and create_pagesets are ver
rmistry 2016/05/19 13:23:18 Done.
88 » » // The main command that runs capture_archives on all workers. 93 » workDir, err := ioutil.TempDir("", "swarming_work_")
89 » » fmt.Sprintf("DISPLAY=:0 capture_archives --worker_num=%s --log_d ir=%s --log_id=%s --pageset_type=%s --local=%t;", util.WORKER_NUM_KEYWORD, util. GLogDir, *runID, *pagesetType, *master_common.Local))
90
91 » _, err := util.SSH(strings.Join(cmd, " "), util.Slaves, util.CAPTURE_ARC HIVES_TIMEOUT)
92 if err != nil { 94 if err != nil {
93 » » glog.Errorf("Error while running cmd %s: %s", cmd, err) 95 » » glog.Errorf("Could not get temp dir: %s", err)
94 return 96 return
95 } 97 }
98 s, err := swarming.NewSwarmingClient(workDir)
99 if err != nil {
100 glog.Errorf("Could not instantiate swarming client: %s", err)
101 return
102 }
103 defer s.Cleanup()
104 // Create isolated.gen.json files from tasks.
105 taskNames := []string{}
106 for i := 1; i <= util.PagesetTypeToInfo[*pagesetType].NumPages/MAX_PAGES _PER_SWARMING_BOT; i++ {
107 taskNames = append(taskNames, fmt.Sprintf("capture_archives_%d", i))
108 }
109 genJSONs := []string{}
110 // Get path to isolate files.
111 _, currentFile, _, _ := runtime.Caller(0)
112 pathToIsolates := filepath.Join(filepath.Dir((filepath.Dir(filepath.Dir( filepath.Dir(currentFile))))), "isolates")
dogben 2016/05/18 18:59:14 nit: extra parens
rmistry 2016/05/19 13:23:18 Done.
113
114 for i, taskName := range taskNames {
115 extraArgs := map[string]string{
116 "START_RANGE": strconv.Itoa(util.GetStartRange(i+1, MAX _PAGES_PER_SWARMING_BOT)),
117 "NUM": strconv.Itoa(MAX_PAGES_PER_SWARMING_BOT) ,
118 "PAGESET_TYPE": *pagesetType,
119 }
120 genJSON, err := s.CreateIsolatedGenJSON(path.Join(pathToIsolates , util.CAPTURE_ARCHIVES_ISOLATE), s.WorkDir, "linux", taskName, extraArgs, []str ing{})
121 if err != nil {
122 glog.Errorf("Could not create isolated.gen.json for task %s: %s", taskName, err)
123 return
124 }
125 genJSONs = append(genJSONs, genJSON)
126 }
127 // Empty the remote dir before the workers upload to it.
128 gs, err := util.NewGsUtil(nil)
129 if err != nil {
130 glog.Error(err)
131 return
132 }
133 gsBaseDir := filepath.Join(util.SWARMING_DIR_NAME, util.WEB_ARCHIVES_DIR _NAME, *pagesetType)
134 skutil.LogErr(gs.DeleteRemoteDir(gsBaseDir))
135 // Archive, trigger and collect swarming tasks.
136 if err := util.ArchiveTriggerCollectSwarmingTask(s, taskNames, genJSONs, 2*time.Hour, 1*time.Hour); err != nil {
137 glog.Errorf("Error encountered when swarming tasks: %s", err)
138 return
139 }
140
96 *taskCompletedSuccessfully = true 141 *taskCompletedSuccessfully = true
97 } 142 }
OLDNEW
« no previous file with comments | « no previous file | ct/go/util/constants.go » ('j') | ct/go/worker_scripts/capture_archives/main.go » ('J')

Powered by Google App Engine
This is Rietveld 408576698