Index: fuzzer/go/frontend/gsloader/gsloader.go |
diff --git a/fuzzer/go/frontend/gsloader/gsloader.go b/fuzzer/go/frontend/gsloader/gsloader.go |
index b8393ae62916ecdfc9ef3229c66661ab19b2a203..d1a42ac97ee89c735f96700840f8ec6d0576452d 100644 |
--- a/fuzzer/go/frontend/gsloader/gsloader.go |
+++ b/fuzzer/go/frontend/gsloader/gsloader.go |
@@ -3,16 +3,13 @@ package gsloader |
import ( |
"fmt" |
"sort" |
- "sync" |
- "sync/atomic" |
"github.com/skia-dev/glog" |
"go.skia.org/infra/fuzzer/go/common" |
"go.skia.org/infra/fuzzer/go/config" |
- "go.skia.org/infra/fuzzer/go/deduplicator" |
- "go.skia.org/infra/fuzzer/go/frontend/data" |
+ "go.skia.org/infra/fuzzer/go/data" |
"go.skia.org/infra/fuzzer/go/fuzzcache" |
- "go.skia.org/infra/go/gs" |
+ fstorage "go.skia.org/infra/fuzzer/go/storage" |
"google.golang.org/cloud/storage" |
) |
@@ -28,7 +25,7 @@ func LoadFromBoltDB(cache *fuzzcache.FuzzReportCache) error { |
return fmt.Errorf("Problem decoding existing from bolt db: %s", err) |
} else { |
data.SetStaging(category, *staging) |
- glog.Infof("Successfully loaded %s fuzzes from bolt db cache", category) |
+ glog.Infof("Successfully loaded %s fuzzes from bolt db cache with %d files", category, len(*staging)) |
} |
} |
data.StagingToCurrent() |
@@ -39,10 +36,6 @@ func LoadFromBoltDB(cache *fuzzcache.FuzzReportCache) error { |
type GSLoader struct { |
storageClient *storage.Client |
Cache *fuzzcache.FuzzReportCache |
- deduplicator *deduplicator.Deduplicator |
- |
- // completedCounter is the number of fuzzes that have been downloaded from GCS, used for logging. |
- completedCounter int32 |
} |
// New creates a GSLoader and returns it. |
@@ -50,7 +43,6 @@ func New(s *storage.Client, c *fuzzcache.FuzzReportCache) *GSLoader { |
return &GSLoader{ |
storageClient: s, |
Cache: c, |
- deduplicator: deduplicator.New(), |
} |
} |
@@ -62,30 +54,24 @@ func New(s *storage.Client, c *fuzzcache.FuzzReportCache) *GSLoader { |
func (g *GSLoader) LoadFreshFromGoogleStorage() error { |
revision := config.FrontEnd.SkiaVersion.Hash |
data.ClearStaging() |
- g.deduplicator.Clear() |
fuzzNames := make([]string, 0, 100) |
+ |
for _, cat := range common.FUZZ_CATEGORIES { |
badPath := fmt.Sprintf("%s/%s/bad", cat, revision) |
- reports, err := g.getBinaryReportsFromGS(badPath, cat, nil) |
+ reports, err := fstorage.GetReportsFromGS(g.storageClient, badPath, cat, nil, config.FrontEnd.NumDownloadProcesses) |
if err != nil { |
return err |
} |
b := 0 |
- d := 0 |
for report := range reports { |
- // We always add the fuzzName, to avoid redownloading duplicates over and over again. |
fuzzNames = append(fuzzNames, report.FuzzName) |
- if g.deduplicator.IsUnique(report) { |
- data.NewFuzzFound(cat, report) |
- b++ |
- } else { |
- d++ |
- } |
- |
+ data.NewFuzzFound(cat, report) |
+ b++ |
} |
- glog.Infof("%d bad fuzzes (%d duplicate) freshly loaded from gs://%s/%s", b, d, config.GS.Bucket, badPath) |
- data.StagingToCurrent() |
+ glog.Infof("%d bad fuzzes freshly loaded from gs://%s/%s", b, config.GS.Bucket, badPath) |
} |
+	// We must wait until after all the fuzzes are in staging; otherwise we'll only have a partial update. |
+ data.StagingToCurrent() |
for _, category := range common.FUZZ_CATEGORIES { |
if err := g.Cache.StoreTree(data.StagingCopy(category), category, revision); err != nil { |
@@ -95,10 +81,10 @@ func (g *GSLoader) LoadFreshFromGoogleStorage() error { |
return g.Cache.StoreFuzzNames(fuzzNames, revision) |
} |
-// LoadBinaryFuzzesFromGoogleStorage pulls all fuzzes out of GCS that are on the given whitelist |
+// LoadFuzzesFromGoogleStorage pulls all fuzzes out of GCS that are on the given whitelist |
// and loads them into memory (as staging). After loading them, it updates the cache |
// and moves them from staging to the current copy. |
-func (g *GSLoader) LoadBinaryFuzzesFromGoogleStorage(whitelist []string) error { |
+func (g *GSLoader) LoadFuzzesFromGoogleStorage(whitelist []string) error { |
revision := config.FrontEnd.SkiaVersion.Hash |
data.StagingFromCurrent() |
sort.Strings(whitelist) |
@@ -106,24 +92,19 @@ func (g *GSLoader) LoadBinaryFuzzesFromGoogleStorage(whitelist []string) error { |
fuzzNames := make([]string, 0, 100) |
for _, cat := range common.FUZZ_CATEGORIES { |
badPath := fmt.Sprintf("%s/%s/bad", cat, revision) |
- reports, err := g.getBinaryReportsFromGS(badPath, cat, whitelist) |
+ reports, err := fstorage.GetReportsFromGS(g.storageClient, badPath, cat, whitelist, config.FrontEnd.NumDownloadProcesses) |
if err != nil { |
return err |
} |
b := 0 |
- d := 0 |
for report := range reports { |
- // We always add the fuzzName, to avoid redownloading duplicates over and over again. |
fuzzNames = append(fuzzNames, report.FuzzName) |
- if g.deduplicator.IsUnique(report) { |
- data.NewFuzzFound(cat, report) |
- b++ |
- } else { |
- d++ |
- } |
+ data.NewFuzzFound(cat, report) |
+ b++ |
} |
- glog.Infof("%d bad fuzzes (%d duplicate) incrementally loaded from gs://%s/%s", b, d, config.GS.Bucket, badPath) |
+ glog.Infof("%d bad fuzzes incrementally loaded from gs://%s/%s", b, config.GS.Bucket, badPath) |
} |
+	// We must wait until after all the fuzzes are in staging; otherwise we'll only have a partial update. |
data.StagingToCurrent() |
oldBinaryFuzzNames, err := g.Cache.LoadFuzzNames(revision) |
@@ -138,124 +119,3 @@ func (g *GSLoader) LoadBinaryFuzzesFromGoogleStorage(whitelist []string) error { |
} |
return g.Cache.StoreFuzzNames(append(oldBinaryFuzzNames, whitelist...), revision) |
} |
- |
-// A fuzzPackage contains all the information about a fuzz, mostly the paths to the files that |
-// need to be downloaded. |
-type fuzzPackage struct { |
- FuzzName string |
- FuzzCategory string |
- DebugASANName string |
- DebugDumpName string |
- DebugErrName string |
- ReleaseASANName string |
- ReleaseDumpName string |
- ReleaseErrName string |
-} |
- |
-// getBinaryReportsFromGS pulls all files in baseFolder from the skia-fuzzer bucket and |
-// groups them by fuzz. It parses these groups of files into a BinaryFuzzReport and returns |
-// a channel through whcih all reports generated in this way will be streamed. |
-// The channel will be closed when all reports are done being sent. |
-func (g *GSLoader) getBinaryReportsFromGS(baseFolder, category string, whitelist []string) (<-chan data.FuzzReport, error) { |
- reports := make(chan data.FuzzReport, 10000) |
- |
- fuzzPackages, err := g.fetchFuzzPackages(baseFolder, category) |
- if err != nil { |
- close(reports) |
- return reports, err |
- } |
- |
- toDownload := make(chan fuzzPackage, len(fuzzPackages)) |
- g.completedCounter = 0 |
- |
- var wg sync.WaitGroup |
- for i := 0; i < config.FrontEnd.NumDownloadProcesses; i++ { |
- wg.Add(1) |
- go g.download(toDownload, reports, &wg) |
- } |
- |
- for _, d := range fuzzPackages { |
- if whitelist != nil { |
- name := d.FuzzName |
- if i := sort.SearchStrings(whitelist, name); i < len(whitelist) && whitelist[i] == name { |
- // is on the whitelist |
- toDownload <- d |
- } |
- } else { |
- // no white list |
- toDownload <- d |
- } |
- } |
- close(toDownload) |
- |
- go func() { |
- wg.Wait() |
- close(reports) |
- }() |
- |
- return reports, nil |
-} |
- |
-// fetchFuzzPackages scans for all fuzzes in the given folder and returns a |
-// slice of all of the metadata for each fuzz, as a fuzz package. It returns |
-// error if it cannot access Google Storage. |
-func (g *GSLoader) fetchFuzzPackages(baseFolder, category string) (fuzzPackages []fuzzPackage, err error) { |
- |
- fuzzNames, err := common.GetAllFuzzNamesInFolder(g.storageClient, baseFolder) |
- if err != nil { |
- return nil, fmt.Errorf("Problem getting fuzz packages from %s: %s", baseFolder, err) |
- } |
- for _, fuzzName := range fuzzNames { |
- prefix := fmt.Sprintf("%s/%s/%s", baseFolder, fuzzName, fuzzName) |
- fuzzPackages = append(fuzzPackages, fuzzPackage{ |
- FuzzName: fuzzName, |
- FuzzCategory: category, |
- DebugASANName: fmt.Sprintf("%s_debug.asan", prefix), |
- DebugDumpName: fmt.Sprintf("%s_debug.dump", prefix), |
- DebugErrName: fmt.Sprintf("%s_debug.err", prefix), |
- ReleaseASANName: fmt.Sprintf("%s_release.asan", prefix), |
- ReleaseDumpName: fmt.Sprintf("%s_release.dump", prefix), |
- ReleaseErrName: fmt.Sprintf("%s_release.err", prefix), |
- }) |
- } |
- return fuzzPackages, nil |
-} |
- |
-// emptyStringOnError returns a string of the passed in bytes or empty string if err is nil. |
-func emptyStringOnError(b []byte, err error) string { |
- if err != nil { |
- glog.Warningf("Ignoring error when fetching file contents: %v", err) |
- return "" |
- } |
- return string(b) |
-} |
- |
-// download waits for fuzzPackages to appear on the toDownload channel and then downloads |
-// the four pieces of the package. It then parses them into a BinaryFuzzReport and sends |
-// the binary to the passed in channel. When there is no more work to be done, this function. |
-// returns and writes out true to the done channel. |
-func (g *GSLoader) download(toDownload <-chan fuzzPackage, reports chan<- data.FuzzReport, wg *sync.WaitGroup) { |
- defer wg.Done() |
- for job := range toDownload { |
- p := data.GCSPackage{ |
- Name: job.FuzzName, |
- FuzzCategory: job.FuzzCategory, |
- Debug: data.OutputFiles{ |
- Asan: emptyStringOnError(gs.FileContentsFromGS(g.storageClient, config.GS.Bucket, job.DebugASANName)), |
- Dump: emptyStringOnError(gs.FileContentsFromGS(g.storageClient, config.GS.Bucket, job.DebugDumpName)), |
- StdErr: emptyStringOnError(gs.FileContentsFromGS(g.storageClient, config.GS.Bucket, job.DebugErrName)), |
- }, |
- Release: data.OutputFiles{ |
- Asan: emptyStringOnError(gs.FileContentsFromGS(g.storageClient, config.GS.Bucket, job.ReleaseASANName)), |
- Dump: emptyStringOnError(gs.FileContentsFromGS(g.storageClient, config.GS.Bucket, job.ReleaseDumpName)), |
- StdErr: emptyStringOnError(gs.FileContentsFromGS(g.storageClient, config.GS.Bucket, job.ReleaseErrName)), |
- }, |
- } |
- |
- reports <- data.ParseReport(p) |
- atomic.AddInt32(&g.completedCounter, 1) |
- if g.completedCounter%100 == 0 { |
- glog.Infof("%d fuzzes downloaded", g.completedCounter) |
- } |
- } |
-} |