| Index: fuzzer/go/frontend/gsloader/gsloader.go
|
| diff --git a/fuzzer/go/frontend/gsloader/gsloader.go b/fuzzer/go/frontend/gsloader/gsloader.go
|
| index b8393ae62916ecdfc9ef3229c66661ab19b2a203..d1a42ac97ee89c735f96700840f8ec6d0576452d 100644
|
| --- a/fuzzer/go/frontend/gsloader/gsloader.go
|
| +++ b/fuzzer/go/frontend/gsloader/gsloader.go
|
| @@ -3,16 +3,13 @@ package gsloader
|
| import (
|
| "fmt"
|
| "sort"
|
| - "sync"
|
| - "sync/atomic"
|
|
|
| "github.com/skia-dev/glog"
|
| "go.skia.org/infra/fuzzer/go/common"
|
| "go.skia.org/infra/fuzzer/go/config"
|
| - "go.skia.org/infra/fuzzer/go/deduplicator"
|
| - "go.skia.org/infra/fuzzer/go/frontend/data"
|
| + "go.skia.org/infra/fuzzer/go/data"
|
| "go.skia.org/infra/fuzzer/go/fuzzcache"
|
| - "go.skia.org/infra/go/gs"
|
| + fstorage "go.skia.org/infra/fuzzer/go/storage"
|
| "google.golang.org/cloud/storage"
|
| )
|
|
|
| @@ -28,7 +25,7 @@ func LoadFromBoltDB(cache *fuzzcache.FuzzReportCache) error {
|
| return fmt.Errorf("Problem decoding existing from bolt db: %s", err)
|
| } else {
|
| data.SetStaging(category, *staging)
|
| - glog.Infof("Successfully loaded %s fuzzes from bolt db cache", category)
|
| + glog.Infof("Successfully loaded %s fuzzes from bolt db cache with %d files", category, len(*staging))
|
| }
|
| }
|
| data.StagingToCurrent()
|
| @@ -39,10 +36,6 @@ func LoadFromBoltDB(cache *fuzzcache.FuzzReportCache) error {
|
| type GSLoader struct {
|
| storageClient *storage.Client
|
| Cache *fuzzcache.FuzzReportCache
|
| - deduplicator *deduplicator.Deduplicator
|
| -
|
| - // completedCounter is the number of fuzzes that have been downloaded from GCS, used for logging.
|
| - completedCounter int32
|
| }
|
|
|
| // New creates a GSLoader and returns it.
|
| @@ -50,7 +43,6 @@ func New(s *storage.Client, c *fuzzcache.FuzzReportCache) *GSLoader {
|
| return &GSLoader{
|
| storageClient: s,
|
| Cache: c,
|
| - deduplicator: deduplicator.New(),
|
| }
|
| }
|
|
|
| @@ -62,30 +54,24 @@ func New(s *storage.Client, c *fuzzcache.FuzzReportCache) *GSLoader {
|
| func (g *GSLoader) LoadFreshFromGoogleStorage() error {
|
| revision := config.FrontEnd.SkiaVersion.Hash
|
| data.ClearStaging()
|
| - g.deduplicator.Clear()
|
| fuzzNames := make([]string, 0, 100)
|
| +
|
| for _, cat := range common.FUZZ_CATEGORIES {
|
| badPath := fmt.Sprintf("%s/%s/bad", cat, revision)
|
| - reports, err := g.getBinaryReportsFromGS(badPath, cat, nil)
|
| + reports, err := fstorage.GetReportsFromGS(g.storageClient, badPath, cat, nil, config.FrontEnd.NumDownloadProcesses)
|
| if err != nil {
|
| return err
|
| }
|
| b := 0
|
| - d := 0
|
| for report := range reports {
|
| - // We always add the fuzzName, to avoid redownloading duplicates over and over again.
|
| fuzzNames = append(fuzzNames, report.FuzzName)
|
| - if g.deduplicator.IsUnique(report) {
|
| - data.NewFuzzFound(cat, report)
|
| - b++
|
| - } else {
|
| - d++
|
| - }
|
| -
|
| + data.NewFuzzFound(cat, report)
|
| + b++
|
| }
|
| - glog.Infof("%d bad fuzzes (%d duplicate) freshly loaded from gs://%s/%s", b, d, config.GS.Bucket, badPath)
|
| - data.StagingToCurrent()
|
| + glog.Infof("%d bad fuzzes freshly loaded from gs://%s/%s", b, config.GS.Bucket, badPath)
|
| }
|
| + // We must wait until all the fuzzes are in staging; otherwise, we'll only have a partial update.
|
| + data.StagingToCurrent()
|
|
|
| for _, category := range common.FUZZ_CATEGORIES {
|
| if err := g.Cache.StoreTree(data.StagingCopy(category), category, revision); err != nil {
|
| @@ -95,10 +81,10 @@ func (g *GSLoader) LoadFreshFromGoogleStorage() error {
|
| return g.Cache.StoreFuzzNames(fuzzNames, revision)
|
| }
|
|
|
| -// LoadBinaryFuzzesFromGoogleStorage pulls all fuzzes out of GCS that are on the given whitelist
|
| +// LoadFuzzesFromGoogleStorage pulls all fuzzes out of GCS that are on the given whitelist
|
| // and loads them into memory (as staging). After loading them, it updates the cache
|
| // and moves them from staging to the current copy.
|
| -func (g *GSLoader) LoadBinaryFuzzesFromGoogleStorage(whitelist []string) error {
|
| +func (g *GSLoader) LoadFuzzesFromGoogleStorage(whitelist []string) error {
|
| revision := config.FrontEnd.SkiaVersion.Hash
|
| data.StagingFromCurrent()
|
| sort.Strings(whitelist)
|
| @@ -106,24 +92,19 @@ func (g *GSLoader) LoadBinaryFuzzesFromGoogleStorage(whitelist []string) error {
|
| fuzzNames := make([]string, 0, 100)
|
| for _, cat := range common.FUZZ_CATEGORIES {
|
| badPath := fmt.Sprintf("%s/%s/bad", cat, revision)
|
| - reports, err := g.getBinaryReportsFromGS(badPath, cat, whitelist)
|
| + reports, err := fstorage.GetReportsFromGS(g.storageClient, badPath, cat, whitelist, config.FrontEnd.NumDownloadProcesses)
|
| if err != nil {
|
| return err
|
| }
|
| b := 0
|
| - d := 0
|
| for report := range reports {
|
| - // We always add the fuzzName, to avoid redownloading duplicates over and over again.
|
| fuzzNames = append(fuzzNames, report.FuzzName)
|
| - if g.deduplicator.IsUnique(report) {
|
| - data.NewFuzzFound(cat, report)
|
| - b++
|
| - } else {
|
| - d++
|
| - }
|
| + data.NewFuzzFound(cat, report)
|
| + b++
|
| }
|
| - glog.Infof("%d bad fuzzes (%d duplicate) incrementally loaded from gs://%s/%s", b, d, config.GS.Bucket, badPath)
|
| + glog.Infof("%d bad fuzzes incrementally loaded from gs://%s/%s", b, config.GS.Bucket, badPath)
|
| }
|
| + // We must wait until all the fuzzes are in staging; otherwise, we'll only have a partial update.
|
| data.StagingToCurrent()
|
|
|
| oldBinaryFuzzNames, err := g.Cache.LoadFuzzNames(revision)
|
| @@ -138,124 +119,3 @@ func (g *GSLoader) LoadBinaryFuzzesFromGoogleStorage(whitelist []string) error {
|
| }
|
| return g.Cache.StoreFuzzNames(append(oldBinaryFuzzNames, whitelist...), revision)
|
| }
|
| -
|
| -// A fuzzPackage contains all the information about a fuzz, mostly the paths to the files that
|
| -// need to be downloaded.
|
| -type fuzzPackage struct {
|
| - FuzzName string
|
| - FuzzCategory string
|
| - DebugASANName string
|
| - DebugDumpName string
|
| - DebugErrName string
|
| - ReleaseASANName string
|
| - ReleaseDumpName string
|
| - ReleaseErrName string
|
| -}
|
| -
|
| -// getBinaryReportsFromGS pulls all files in baseFolder from the skia-fuzzer bucket and
|
| -// groups them by fuzz. It parses these groups of files into a BinaryFuzzReport and returns
|
| -// a channel through whcih all reports generated in this way will be streamed.
|
| -// The channel will be closed when all reports are done being sent.
|
| -func (g *GSLoader) getBinaryReportsFromGS(baseFolder, category string, whitelist []string) (<-chan data.FuzzReport, error) {
|
| - reports := make(chan data.FuzzReport, 10000)
|
| -
|
| - fuzzPackages, err := g.fetchFuzzPackages(baseFolder, category)
|
| - if err != nil {
|
| - close(reports)
|
| - return reports, err
|
| - }
|
| -
|
| - toDownload := make(chan fuzzPackage, len(fuzzPackages))
|
| - g.completedCounter = 0
|
| -
|
| - var wg sync.WaitGroup
|
| - for i := 0; i < config.FrontEnd.NumDownloadProcesses; i++ {
|
| - wg.Add(1)
|
| - go g.download(toDownload, reports, &wg)
|
| - }
|
| -
|
| - for _, d := range fuzzPackages {
|
| - if whitelist != nil {
|
| - name := d.FuzzName
|
| - if i := sort.SearchStrings(whitelist, name); i < len(whitelist) && whitelist[i] == name {
|
| - // is on the whitelist
|
| - toDownload <- d
|
| - }
|
| - } else {
|
| - // no white list
|
| - toDownload <- d
|
| - }
|
| - }
|
| - close(toDownload)
|
| -
|
| - go func() {
|
| - wg.Wait()
|
| - close(reports)
|
| - }()
|
| -
|
| - return reports, nil
|
| -}
|
| -
|
| -// fetchFuzzPackages scans for all fuzzes in the given folder and returns a
|
| -// slice of all of the metadata for each fuzz, as a fuzz package. It returns
|
| -// error if it cannot access Google Storage.
|
| -func (g *GSLoader) fetchFuzzPackages(baseFolder, category string) (fuzzPackages []fuzzPackage, err error) {
|
| -
|
| - fuzzNames, err := common.GetAllFuzzNamesInFolder(g.storageClient, baseFolder)
|
| - if err != nil {
|
| - return nil, fmt.Errorf("Problem getting fuzz packages from %s: %s", baseFolder, err)
|
| - }
|
| - for _, fuzzName := range fuzzNames {
|
| - prefix := fmt.Sprintf("%s/%s/%s", baseFolder, fuzzName, fuzzName)
|
| - fuzzPackages = append(fuzzPackages, fuzzPackage{
|
| - FuzzName: fuzzName,
|
| - FuzzCategory: category,
|
| - DebugASANName: fmt.Sprintf("%s_debug.asan", prefix),
|
| - DebugDumpName: fmt.Sprintf("%s_debug.dump", prefix),
|
| - DebugErrName: fmt.Sprintf("%s_debug.err", prefix),
|
| - ReleaseASANName: fmt.Sprintf("%s_release.asan", prefix),
|
| - ReleaseDumpName: fmt.Sprintf("%s_release.dump", prefix),
|
| - ReleaseErrName: fmt.Sprintf("%s_release.err", prefix),
|
| - })
|
| - }
|
| - return fuzzPackages, nil
|
| -}
|
| -
|
| -// emptyStringOnError returns a string of the passed in bytes or empty string if err is nil.
|
| -func emptyStringOnError(b []byte, err error) string {
|
| - if err != nil {
|
| - glog.Warningf("Ignoring error when fetching file contents: %v", err)
|
| - return ""
|
| - }
|
| - return string(b)
|
| -}
|
| -
|
| -// download waits for fuzzPackages to appear on the toDownload channel and then downloads
|
| -// the four pieces of the package. It then parses them into a BinaryFuzzReport and sends
|
| -// the binary to the passed in channel. When there is no more work to be done, this function.
|
| -// returns and writes out true to the done channel.
|
| -func (g *GSLoader) download(toDownload <-chan fuzzPackage, reports chan<- data.FuzzReport, wg *sync.WaitGroup) {
|
| - defer wg.Done()
|
| - for job := range toDownload {
|
| - p := data.GCSPackage{
|
| - Name: job.FuzzName,
|
| - FuzzCategory: job.FuzzCategory,
|
| - Debug: data.OutputFiles{
|
| - Asan: emptyStringOnError(gs.FileContentsFromGS(g.storageClient, config.GS.Bucket, job.DebugASANName)),
|
| - Dump: emptyStringOnError(gs.FileContentsFromGS(g.storageClient, config.GS.Bucket, job.DebugDumpName)),
|
| - StdErr: emptyStringOnError(gs.FileContentsFromGS(g.storageClient, config.GS.Bucket, job.DebugErrName)),
|
| - },
|
| - Release: data.OutputFiles{
|
| - Asan: emptyStringOnError(gs.FileContentsFromGS(g.storageClient, config.GS.Bucket, job.ReleaseASANName)),
|
| - Dump: emptyStringOnError(gs.FileContentsFromGS(g.storageClient, config.GS.Bucket, job.ReleaseDumpName)),
|
| - StdErr: emptyStringOnError(gs.FileContentsFromGS(g.storageClient, config.GS.Bucket, job.ReleaseErrName)),
|
| - },
|
| - }
|
| -
|
| - reports <- data.ParseReport(p)
|
| - atomic.AddInt32(&g.completedCounter, 1)
|
| - if g.completedCounter%100 == 0 {
|
| - glog.Infof("%d fuzzes downloaded", g.completedCounter)
|
| - }
|
| - }
|
| -}
|
|
|