Index: ct/go/master_scripts/capture_archives_on_workers/main.go |
diff --git a/ct/go/master_scripts/capture_archives_on_workers/main.go b/ct/go/master_scripts/capture_archives_on_workers/main.go |
index e08e5f592cb0b4b96d7da36fd3c140694d0cdbfd..766f96e3e001156dab7174cd7412cfef14cb466e 100644 |
--- a/ct/go/master_scripts/capture_archives_on_workers/main.go |
+++ b/ct/go/master_scripts/capture_archives_on_workers/main.go |
@@ -6,7 +6,11 @@ package main |
import ( |
"flag" |
"fmt" |
- "strings" |
+ "io/ioutil" |
+ "path" |
+ "path/filepath" |
+ "runtime" |
+ "strconv" |
"time" |
"github.com/skia-dev/glog" |
@@ -15,6 +19,7 @@ import ( |
"go.skia.org/infra/ct/go/master_scripts/master_common" |
"go.skia.org/infra/ct/go/util" |
"go.skia.org/infra/go/common" |
+ "go.skia.org/infra/go/swarming" |
skutil "go.skia.org/infra/go/util" |
) |
@@ -27,6 +32,10 @@ var ( |
taskCompletedSuccessfully = new(bool) |
) |
+const ( |
+ MAX_PAGES_PER_SWARMING_BOT = 10 |
+) |
+ |
func sendEmail(recipients []string) { |
// Send completion email. |
emailSubject := "Capture archives Cluster telemetry task has completed" |
@@ -71,10 +80,6 @@ func main() { |
// Ensure webapp is updated and completion email is sent even if task fails. |
defer updateWebappTask() |
defer sendEmail(emailsArr) |
- if !*master_common.Local { |
- // Cleanup tmp files after the run. |
- defer util.CleanTmpDir() |
- } |
// Finish with glog flush and how long the task took. |
defer util.TimeTrack(time.Now(), "Capture archives on Workers") |
defer glog.Flush() |
@@ -84,14 +89,54 @@ func main() { |
return |
} |
- cmd := append(master_common.WorkerSetupCmds(), |
- // The main command that runs capture_archives on all workers. |
- fmt.Sprintf("DISPLAY=:0 capture_archives --worker_num=%s --log_dir=%s --log_id=%s --pageset_type=%s --local=%t;", util.WORKER_NUM_KEYWORD, util.GLogDir, *runID, *pagesetType, *master_common.Local)) |
+ // Instantiate the swarming client. |
[review comment — dogben, 2016/05/18 18:59:14]: Seems capture_archives and create_pagesets are ver… (comment truncated during extraction; presumably suggests the two nearly identical master scripts share code — verify against the original review thread) |
[review comment — rmistry, 2016/05/19 13:23:18]: Done. |
 |
+ workDir, err := ioutil.TempDir("", "swarming_work_") |
+ if err != nil { |
+ glog.Errorf("Could not get temp dir: %s", err) |
+ return |
+ } |
+ s, err := swarming.NewSwarmingClient(workDir) |
+ if err != nil { |
+ glog.Errorf("Could not instantiate swarming client: %s", err) |
+ return |
+ } |
+ defer s.Cleanup() |
+ // Create isolated.gen.json files from tasks. |
+ taskNames := []string{} |
+ for i := 1; i <= util.PagesetTypeToInfo[*pagesetType].NumPages/MAX_PAGES_PER_SWARMING_BOT; i++ { |
+ taskNames = append(taskNames, fmt.Sprintf("capture_archives_%d", i)) |
+ } |
+ genJSONs := []string{} |
+ // Get path to isolate files. |
+ _, currentFile, _, _ := runtime.Caller(0) |
+ pathToIsolates := filepath.Join(filepath.Dir((filepath.Dir(filepath.Dir(filepath.Dir(currentFile))))), "isolates") |
[review comment — dogben, 2016/05/18 18:59:14]: nit: extra parens (refers to the redundant parentheses in the nested filepath.Dir calls on the preceding line) |
[review comment — rmistry, 2016/05/19 13:23:18]: Done. |
 |
- _, err := util.SSH(strings.Join(cmd, " "), util.Slaves, util.CAPTURE_ARCHIVES_TIMEOUT) |
+ for i, taskName := range taskNames { |
+ extraArgs := map[string]string{ |
+ "START_RANGE": strconv.Itoa(util.GetStartRange(i+1, MAX_PAGES_PER_SWARMING_BOT)), |
+ "NUM": strconv.Itoa(MAX_PAGES_PER_SWARMING_BOT), |
+ "PAGESET_TYPE": *pagesetType, |
+ } |
+ genJSON, err := s.CreateIsolatedGenJSON(path.Join(pathToIsolates, util.CAPTURE_ARCHIVES_ISOLATE), s.WorkDir, "linux", taskName, extraArgs, []string{}) |
+ if err != nil { |
+ glog.Errorf("Could not create isolated.gen.json for task %s: %s", taskName, err) |
+ return |
+ } |
+ genJSONs = append(genJSONs, genJSON) |
+ } |
+ // Empty the remote dir before the workers upload to it. |
+ gs, err := util.NewGsUtil(nil) |
if err != nil { |
- glog.Errorf("Error while running cmd %s: %s", cmd, err) |
+ glog.Error(err) |
return |
} |
+ gsBaseDir := filepath.Join(util.SWARMING_DIR_NAME, util.WEB_ARCHIVES_DIR_NAME, *pagesetType) |
+ skutil.LogErr(gs.DeleteRemoteDir(gsBaseDir)) |
+ // Archive, trigger and collect swarming tasks. |
+ if err := util.ArchiveTriggerCollectSwarmingTask(s, taskNames, genJSONs, 2*time.Hour, 1*time.Hour); err != nil { |
+ glog.Errorf("Error encountered when swarming tasks: %s", err) |
+ return |
+ } |
+ |
*taskCompletedSuccessfully = true |
} |