Index: fuzzer/go/backend/version_updater.go |
diff --git a/fuzzer/go/backend/version_updater.go b/fuzzer/go/backend/version_updater.go |
index 0493f02049c94d04cf5fa66006de5db3c76f32b7..b883f83db360281a5c31942cc1a78e64d202490f 100644 |
--- a/fuzzer/go/backend/version_updater.go |
+++ b/fuzzer/go/backend/version_updater.go |
@@ -2,20 +2,16 @@ package backend |
import ( |
"fmt" |
- "io/ioutil" |
- "os" |
"path/filepath" |
"strings" |
- "sync" |
- "sync/atomic" |
"github.com/skia-dev/glog" |
"go.skia.org/infra/fuzzer/go/aggregator" |
"go.skia.org/infra/fuzzer/go/common" |
"go.skia.org/infra/fuzzer/go/config" |
"go.skia.org/infra/fuzzer/go/generator" |
- "go.skia.org/infra/go/fileutil" |
"go.skia.org/infra/go/gs" |
+ "go.skia.org/infra/go/util" |
"go.skia.org/infra/go/vcsinfo" |
"golang.org/x/net/context" |
"google.golang.org/cloud/storage" |
@@ -80,16 +76,16 @@ func (v *VersionUpdater) UpdateToNewSkiaVersion(newHash string) (*vcsinfo.LongCo |
func (p *FuzzPipeline) reanalyzeAndRestart(storageClient *storage.Client, oldRevision string) error { |
// download all bad and grey fuzzes |
- badFuzzNames, greyFuzzNames, err := p.downloadAllBadAndGreyFuzzes(oldRevision, storageClient) |
+ badFuzzPaths, greyFuzzPaths, err := p.downloadAllBadAndGreyFuzzes(oldRevision, storageClient) |
if err != nil { |
return fmt.Errorf("Problem downloading all previous fuzzes: %s", err) |
} |
- glog.Infof("There are %d bad fuzzes and %d grey fuzzes to rescan.", len(badFuzzNames), len(greyFuzzNames)) |
+ glog.Infof("There are %d bad fuzzes and %d grey fuzzes of category %s to rescan.", len(badFuzzPaths), len(greyFuzzPaths), p.Category) |
// This is a soft shutdown, i.e. it waits for aggregator's queues to be empty |
p.Agg.ShutDown() |
if config.Common.ForceReanalysis { |
- glog.Infof("Deleting previous fuzz results") |
+ glog.Infof("Deleting previous %s fuzz results", p.Category) |
if err := gs.DeleteAllFilesInDir(storageClient, config.GS.Bucket, fmt.Sprintf("%s/%s/", p.Category, oldRevision), config.Aggregator.NumUploadProcesses); err != nil { |
return fmt.Errorf("Could not delete previous fuzzes: %s", err) |
} |
@@ -102,23 +98,34 @@ func (p *FuzzPipeline) reanalyzeAndRestart(storageClient *storage.Client, oldRev |
if err := p.Agg.RestartAnalysis(); err != nil { |
return fmt.Errorf("Had problem restarting analysis/upload chain: %s", err) |
} |
+ // If we aren't reanalyzing, we should upload the names of anything that is currently there. |
+ // If we are reanalyzing, we should re-write the names after we analyze them (see below). |
+ if !config.Common.ForceReanalysis { |
+ p.uploadFuzzNames(storageClient, oldRevision, common.ExtractFuzzNamesFromPaths(badFuzzPaths), common.ExtractFuzzNamesFromPaths(greyFuzzPaths)) |
+ } |
// Reanalyze and reupload the fuzzes, making a bug on regressions. |
glog.Infof("Reanalyzing bad fuzzes") |
p.Agg.MakeBugOnBadFuzz = false |
p.Agg.UploadGreyFuzzes = true |
- for _, name := range badFuzzNames { |
+ p.Agg.ClearUploadedFuzzNames() |
+ for _, name := range badFuzzPaths { |
p.Agg.ForceAnalysis(name) |
} |
p.Agg.WaitForEmptyQueues() |
glog.Infof("Reanalyzing grey fuzzes") |
p.Agg.MakeBugOnBadFuzz = true |
- for _, name := range greyFuzzNames { |
+ for _, name := range greyFuzzPaths { |
p.Agg.ForceAnalysis(name) |
} |
p.Agg.WaitForEmptyQueues() |
p.Agg.MakeBugOnBadFuzz = false |
p.Agg.UploadGreyFuzzes = false |
- glog.Infof("Done reanalyzing") |
+ bad, grey := p.Agg.UploadedFuzzNames() |
+ glog.Infof("Done reanalyzing %s. Uploaded %d bad and %d grey fuzzes", p.Category, len(bad), len(grey)) |
+ |
+ if config.Common.ForceReanalysis { |
+ p.uploadFuzzNames(storageClient, oldRevision, bad, grey) |
+ } |
// redownload samples (in case any are new) |
if err := p.Gen.DownloadSeedFiles(storageClient); err != nil { |
@@ -128,79 +135,17 @@ func (p *FuzzPipeline) reanalyzeAndRestart(storageClient *storage.Client, oldRev |
return p.Gen.Start() |
} |
-// completedCounter is the number of fuzzes that have been downloaded from GCS, used for logging. |
-var completedCounter int32 |
- |
// downloadAllBadAndGreyFuzzes downloads just the fuzzes from a commit in GCS. It uses multiple |
// processes to do so and puts them in config.Aggregator.FuzzPath/[category]. |
-func (p *FuzzPipeline) downloadAllBadAndGreyFuzzes(commitHash string, storageClient *storage.Client) (badFuzzNames []string, greyFuzzNames []string, err error) { |
+func (p *FuzzPipeline) downloadAllBadAndGreyFuzzes(commitHash string, storageClient *storage.Client) (badFuzzPaths []string, greyFuzzPaths []string, err error) { |
downloadPath := filepath.Join(config.Aggregator.FuzzPath, p.Category) |
- toDownload := make(chan string, 100000) |
- completedCounter = 0 |
- |
- var wg sync.WaitGroup |
- for i := 0; i < config.Generator.NumDownloadProcesses; i++ { |
- wg.Add(1) |
- go download(storageClient, toDownload, downloadPath, &wg) |
- } |
- |
- badFilter := func(item *storage.ObjectAttrs) { |
- name := item.Name |
- if strings.Contains(name, ".") { |
- return |
- } |
- fuzzHash := name[strings.LastIndex(name, "/")+1:] |
- badFuzzNames = append(badFuzzNames, filepath.Join(downloadPath, fuzzHash)) |
- toDownload <- item.Name |
- } |
- |
- greyFilter := func(item *storage.ObjectAttrs) { |
- name := item.Name |
- if strings.Contains(name, ".") { |
- return |
- } |
- fuzzHash := name[strings.LastIndex(name, "/")+1:] |
- greyFuzzNames = append(greyFuzzNames, filepath.Join(downloadPath, fuzzHash)) |
- toDownload <- item.Name |
- } |
- |
- if err := gs.AllFilesInDir(storageClient, config.GS.Bucket, fmt.Sprintf("%s/%s/bad", p.Category, commitHash), badFilter); err != nil { |
- return nil, nil, fmt.Errorf("Problem getting bad fuzzes: %s", err) |
- } |
- |
- if err := gs.AllFilesInDir(storageClient, config.GS.Bucket, fmt.Sprintf("%s/%s/grey", p.Category, commitHash), greyFilter); err != nil { |
- return nil, nil, fmt.Errorf("Problem getting grey fuzzes: %s", err) |
- } |
- |
- close(toDownload) |
- wg.Wait() |
- return badFuzzNames, greyFuzzNames, nil |
-} |
- |
-// download starts a go routine that waits for files to download from Google Storage and downloads |
-// them to downloadPath. When it is done (on error or when the channel is closed), it signals to |
-// the WaitGroup that it is done. It also logs the progress on downloading the fuzzes. |
-func download(storageClient *storage.Client, toDownload <-chan string, downloadPath string, wg *sync.WaitGroup) { |
- defer wg.Done() |
- for file := range toDownload { |
- hash := file[strings.LastIndex(file, "/")+1:] |
- onDisk := filepath.Join(downloadPath, hash) |
- if !fileutil.FileExists(onDisk) { |
- contents, err := gs.FileContentsFromGS(storageClient, config.GS.Bucket, file) |
- if err != nil { |
- glog.Warningf("Problem downloading fuzz %s, continuing anyway: %s", file, err) |
- continue |
- } |
- if err = ioutil.WriteFile(onDisk, contents, 0644); err != nil && !os.IsExist(err) { |
- glog.Warningf("Problem writing fuzz to %s, continuing anyway: %s", onDisk, err) |
- } |
- } |
- atomic.AddInt32(&completedCounter, 1) |
- if completedCounter%100 == 0 { |
- glog.Infof("%d fuzzes downloaded", completedCounter) |
- } |
+ bad, err := common.DownloadAllFuzzes(storageClient, downloadPath, p.Category, commitHash, "bad", config.Generator.NumDownloadProcesses) |
+ if err != nil { |
+ return nil, nil, err |
} |
+ grey, err := common.DownloadAllFuzzes(storageClient, downloadPath, p.Category, commitHash, "grey", config.Generator.NumDownloadProcesses) |
+ return bad, grey, err |
} |
// replaceCurrentSkiaVersionWith puts the oldHash in skia_version/old and the newHash in |
@@ -227,3 +172,25 @@ func (v *VersionUpdater) touch(file string) error { |
} |
return nil |
} |
+ |
+// uploadFuzzNames creates two files in the /category/revision/ folder that contain all of the bad fuzz names and the grey fuzz names that are in this folder |
+func (p *FuzzPipeline) uploadFuzzNames(sc *storage.Client, oldRevision string, bad, grey []string) { |
+ uploadString := func(fileName, contents string) error { |
+ name := fmt.Sprintf("%s/%s/%s", p.Category, oldRevision, fileName) |
+ w := sc.Bucket(config.GS.Bucket).Object(name).NewWriter(context.Background()) |
+ defer util.Close(w) |
+ w.ObjectAttrs.ContentEncoding = "text/plain" |
+ |
+ if n, err := w.Write([]byte(contents)); err != nil { |
+ return fmt.Errorf("There was a problem uploading %s. Only uploaded %d bytes: %s", name, n, err) |
+ } |
+ return nil |
+ } |
+ |
+ if err := uploadString("bad_fuzz_names.txt", strings.Join(bad, "|")); err != nil { |
+ glog.Errorf("Problem uploading bad fuzz names: %s", err) |
+ } |
+ if err := uploadString("grey_fuzz_names.txt", strings.Join(grey, "|")); err != nil { |
+ glog.Errorf("Problem uploading grey fuzz names: %s", err) |
+ } |
+} |