| Index: fuzzer/go/aggregator/aggregator.go
|
| diff --git a/fuzzer/go/aggregator/aggregator.go b/fuzzer/go/aggregator/aggregator.go
|
| index 9218669b5e31ccb333f412ac0e38e733220b0c0d..abb2f1f5715ddfd6b0647d26396a27024ce00132 100644
|
| --- a/fuzzer/go/aggregator/aggregator.go
|
| +++ b/fuzzer/go/aggregator/aggregator.go
|
| @@ -17,7 +17,8 @@ import (
|
| "github.com/skia-dev/glog"
|
| "go.skia.org/infra/fuzzer/go/common"
|
| "go.skia.org/infra/fuzzer/go/config"
|
| - "go.skia.org/infra/fuzzer/go/frontend/data"
|
| + "go.skia.org/infra/fuzzer/go/data"
|
| + "go.skia.org/infra/fuzzer/go/deduplicator"
|
| "go.skia.org/infra/go/exec"
|
| "go.skia.org/infra/go/fileutil"
|
| "go.skia.org/infra/go/metrics2"
|
| @@ -56,6 +57,9 @@ type Aggregator struct {
|
| forUpload chan uploadPackage
|
|
|
| forBugReporting chan bugReportingPackage
|
| +
|
| + deduplicator *deduplicator.Deduplicator
|
| +
|
| // The shutdown channels are used to signal shutdowns. There are two groups, to
|
| // allow for a softer, cleaner shutdown w/ minimal lost work.
|
| // Group A (monitoring) includes the scanning and the monitoring routine.
|
| @@ -70,11 +74,15 @@ type Aggregator struct {
|
| analysisCount int64
|
| uploadCount int64
|
| bugReportCount int64
|
| +
|
| + greyNames []string
|
| + badNames []string
|
| }
|
|
|
| const (
|
| - BAD_FUZZ = "bad"
|
| - GREY_FUZZ = "grey"
|
| + BAD_FUZZ = "bad"
|
| + GREY_FUZZ = "grey"
|
| + HANG_THRESHOLD = 100
|
| )
|
|
|
| var (
|
| @@ -102,7 +110,7 @@ type bugReportingPackage struct {
|
|
|
| // StartAggregator creates and starts a Aggregator.
|
| // If there is a problem starting up, an error is returned. Other errors will be logged.
|
| -func StartAggregator(s *storage.Client, category string) (*Aggregator, error) {
|
| +func StartAggregator(s *storage.Client, category string, startingReports <-chan data.FuzzReport) (*Aggregator, error) {
|
| b := Aggregator{
|
| Category: category,
|
| storageClient: s,
|
| @@ -113,10 +121,16 @@ func StartAggregator(s *storage.Client, category string) (*Aggregator, error) {
|
| forBugReporting: make(chan bugReportingPackage, 100),
|
| MakeBugOnBadFuzz: false,
|
| UploadGreyFuzzes: false,
|
| + deduplicator: deduplicator.New(),
|
| monitoringShutdown: make(chan bool, 2),
|
| // aggregationShutdown needs to be created with a calculated capacity in start
|
| }
|
|
|
| + // preload the duplicator
|
| + for report := range startingReports {
|
| + b.deduplicator.IsUnique(report)
|
| + }
|
| +
|
| return &b, b.start()
|
| }
|
|
|
| @@ -347,6 +361,7 @@ func (agg *Aggregator) waitForAnalysis(identifier int) {
|
| atomic.AddInt64(&agg.analysisCount, int64(1))
|
| err := agg.analysisHelper(executableDir, badFuzzPath)
|
| if err != nil {
|
| + atomic.AddInt64(&agg.analysisCount, int64(-1))
|
| glog.Errorf("[%s] Analyzer %d terminated due to error: %s", agg.Category, identifier, err)
|
| return
|
| }
|
| @@ -516,10 +531,20 @@ func (agg *Aggregator) waitForUploads(identifier int) {
|
| atomic.AddInt64(&agg.uploadCount, int64(1))
|
| if !agg.UploadGreyFuzzes && p.FuzzType == GREY_FUZZ {
|
| glog.Infof("[%s] Skipping upload of grey fuzz %s", agg.Category, p.Data.Name)
|
| + // We are skipping the bugReport, so increment the counts.
|
| + atomic.AddInt64(&agg.bugReportCount, int64(1))
|
| + continue
|
| + }
|
| + if p.FuzzType != GREY_FUZZ && !agg.deduplicator.IsUnique(data.ParseReport(p.Data)) {
|
| + glog.Infof("[%s] Skipping upload of duplicate fuzz %s", agg.Category, p.Data.Name)
|
| + // We are skipping the bugReport, so increment the counts.
|
| + atomic.AddInt64(&agg.bugReportCount, int64(1))
|
| continue
|
| }
|
| if err := agg.upload(p); err != nil {
|
| glog.Errorf("[%s] Uploader %d terminated due to error: %s", agg.Category, identifier, err)
|
| + // We are skipping the bugReport, so increment the counts.
|
| + atomic.AddInt64(&agg.bugReportCount, int64(1))
|
| return
|
| }
|
| agg.forBugReporting <- bugReportingPackage{
|
| @@ -538,6 +563,11 @@ func (agg *Aggregator) waitForUploads(identifier int) {
|
| // some helper methods.
|
| func (agg *Aggregator) upload(p uploadPackage) error {
|
| glog.Infof("[%s] uploading %s with file %s and analysis bytes %d;%d;%d|%d;%d;%d", agg.Category, p.Data.Name, p.FilePath, len(p.Data.Debug.Asan), len(p.Data.Debug.Dump), len(p.Data.Debug.StdErr), len(p.Data.Release.Asan), len(p.Data.Release.Dump), len(p.Data.Release.StdErr))
|
| + if p.FuzzType == GREY_FUZZ {
|
| + agg.greyNames = append(agg.greyNames, p.Data.Name)
|
| + } else {
|
| + agg.badNames = append(agg.badNames, p.Data.Name)
|
| + }
|
|
|
| if err := agg.uploadBinaryFromDisk(p, p.Data.Name, p.FilePath); err != nil {
|
| return err
|
| @@ -669,6 +699,7 @@ func (agg *Aggregator) ShutDown() {
|
| // RestartAnalysis restarts the shut down aggregator. Anything that is in the scanning directory
|
| // should be cleared out, lest it be rescanned/analyzed.
|
| func (agg *Aggregator) RestartAnalysis() error {
|
| + agg.deduplicator.Clear()
|
| return agg.start()
|
| }
|
|
|
| @@ -684,6 +715,7 @@ func (agg *Aggregator) WaitForEmptyQueues() {
|
| }
|
| t := time.Tick(config.Aggregator.StatusPeriod)
|
| glog.Infof("[%s] Waiting %s for the aggregator's queues to be empty", agg.Category, config.Aggregator.StatusPeriod)
|
| + hangCount := 0
|
| for _ = range t {
|
| a = len(agg.forAnalysis)
|
| u = len(agg.forUpload)
|
| @@ -693,8 +725,13 @@ func (agg *Aggregator) WaitForEmptyQueues() {
|
| if a == 0 && u == 0 && b == 0 && agg.analysisCount == agg.uploadCount && agg.uploadCount == agg.bugReportCount {
|
| break
|
| }
|
| + // This prevents waiting forever if an upload crashes, aborts or otherwise hangs.
|
| + hangCount++
|
| + if hangCount >= HANG_THRESHOLD {
|
| + glog.Warningf("Was waiting for %d rounds and still wasn't done. Quitting anyway.", hangCount)
|
| + }
|
| +
|
| glog.Infof("[%s] Waiting %s for the aggregator's queues to be empty", agg.Category, config.Aggregator.StatusPeriod)
|
| -
|
| }
|
| }
|
|
|
| @@ -703,3 +740,12 @@ func (agg *Aggregator) WaitForEmptyQueues() {
|
| func (agg *Aggregator) ForceAnalysis(path string) {
|
| agg.forAnalysis <- path
|
| }
|
| +
|
| +func (agg *Aggregator) ClearUploadedFuzzNames() {
|
| + agg.greyNames = []string{}
|
| + agg.badNames = []string{}
|
| +}
|
| +
|
| +func (agg *Aggregator) UploadedFuzzNames() (bad, grey []string) {
|
| + return agg.badNames, agg.greyNames
|
| +}
|
|
|