Index: fuzzer/go/aggregator/aggregator.go
diff --git a/fuzzer/go/aggregator/aggregator.go b/fuzzer/go/aggregator/aggregator.go
index 9218669b5e31ccb333f412ac0e38e733220b0c0d..abb2f1f5715ddfd6b0647d26396a27024ce00132 100644
--- a/fuzzer/go/aggregator/aggregator.go
+++ b/fuzzer/go/aggregator/aggregator.go
@@ -17,7 +17,8 @@ import (
 	"github.com/skia-dev/glog"
 	"go.skia.org/infra/fuzzer/go/common"
 	"go.skia.org/infra/fuzzer/go/config"
-	"go.skia.org/infra/fuzzer/go/frontend/data"
+	"go.skia.org/infra/fuzzer/go/data"
+	"go.skia.org/infra/fuzzer/go/deduplicator"
 	"go.skia.org/infra/go/exec"
 	"go.skia.org/infra/go/fileutil"
 	"go.skia.org/infra/go/metrics2"
@@ -56,6 +57,9 @@ type Aggregator struct {
 	forUpload       chan uploadPackage
 	forBugReporting chan bugReportingPackage
+
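+	// deduplicator is used to skip uploading and bug-reporting fuzzes that duplicate ones already seen.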
+	deduplicator *deduplicator.Deduplicator
+
 	// The shutdown channels are used to signal shutdowns. There are two groups, to
 	// allow for a softer, cleaner shutdown w/ minimal lost work.
 	// Group A (monitoring) includes the scanning and the monitoring routine.
@@ -70,11 +74,15 @@
 	analysisCount  int64
 	uploadCount    int64
 	bugReportCount int64
+
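+	// Names of the grey and bad fuzzes that have been uploaded; cleared via ClearUploadedFuzzNames().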
+	greyNames []string
+	badNames  []string
 }
 
 const (
-	BAD_FUZZ  = "bad"
-	GREY_FUZZ = "grey"
+	BAD_FUZZ       = "bad"
+	GREY_FUZZ      = "grey"
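+	// HANG_THRESHOLD is the number of status periods WaitForEmptyQueues will wait before giving up.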
+	HANG_THRESHOLD = 100
 )
 
 var (
@@ -102,7 +110,7 @@ type bugReportingPackage struct {
 // StartAggregator creates and starts a Aggregator.
 // If there is a problem starting up, an error is returned. Other errors will be logged.
-func StartAggregator(s *storage.Client, category string) (*Aggregator, error) {
+func StartAggregator(s *storage.Client, category string, startingReports <-chan data.FuzzReport) (*Aggregator, error) {
 	b := Aggregator{
 		Category:           category,
 		storageClient:      s,
@@ -113,10 +121,16 @@ func StartAggregator(s *storage.Client, category string) (*Aggregator, error) {
 		forBugReporting:    make(chan bugReportingPackage, 100),
 		MakeBugOnBadFuzz:   false,
 		UploadGreyFuzzes:   false,
+		deduplicator:       deduplicator.New(),
 		monitoringShutdown: make(chan bool, 2),
 		// aggregationShutdown needs to be created with a calculated capacity in start
 	}
+	// Preload the deduplicator with the already-known reports.
+	// Note: this loops until the caller closes startingReports.
+	for report := range startingReports {
+		b.deduplicator.IsUnique(report)
+	}
+
 	return &b, b.start()
 }
@@ -347,6 +361,7 @@ func (agg *Aggregator) waitForAnalysis(identifier int) {
 		atomic.AddInt64(&agg.analysisCount, int64(1))
 		err := agg.analysisHelper(executableDir, badFuzzPath)
 		if err != nil {
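+			// The analysis failed, so this fuzz will never reach the upload or bug reporting steps;
+			// undo the count so the counters compared in WaitForEmptyQueues stay in sync.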
+			atomic.AddInt64(&agg.analysisCount, int64(-1))
 			glog.Errorf("[%s] Analyzer %d terminated due to error: %s", agg.Category, identifier, err)
 			return
 		}
@@ -516,10 +531,20 @@ func (agg *Aggregator) waitForUploads(identifier int) {
 		atomic.AddInt64(&agg.uploadCount, int64(1))
 		if !agg.UploadGreyFuzzes && p.FuzzType == GREY_FUZZ {
 			glog.Infof("[%s] Skipping upload of grey fuzz %s", agg.Category, p.Data.Name)
+			// We are skipping the bug report step, so increment its count to keep the counters in sync.
+			atomic.AddInt64(&agg.bugReportCount, int64(1))
+			continue
+		}
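+		// Deduplication applies only to bad fuzzes; grey fuzzes were handled above.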
+		if p.FuzzType != GREY_FUZZ && !agg.deduplicator.IsUnique(data.ParseReport(p.Data)) {
+			glog.Infof("[%s] Skipping upload of duplicate fuzz %s", agg.Category, p.Data.Name)
+			// We are skipping the bug report step, so increment its count to keep the counters in sync.
+			atomic.AddInt64(&agg.bugReportCount, int64(1))
 			continue
 		}
 		if err := agg.upload(p); err != nil {
 			glog.Errorf("[%s] Uploader %d terminated due to error: %s", agg.Category, identifier, err)
+			// We are skipping the bug report step, so increment its count to keep the counters in sync.
+			atomic.AddInt64(&agg.bugReportCount, int64(1))
 			return
 		}
 		agg.forBugReporting <- bugReportingPackage{
@@ -538,6 +563,11 @@
 // some helper methods.
 func (agg *Aggregator) upload(p uploadPackage) error {
 	glog.Infof("[%s] uploading %s with file %s and analysis bytes %d;%d;%d|%d;%d;%d", agg.Category, p.Data.Name, p.FilePath, len(p.Data.Debug.Asan), len(p.Data.Debug.Dump), len(p.Data.Debug.StdErr), len(p.Data.Release.Asan), len(p.Data.Release.Dump), len(p.Data.Release.StdErr))
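+	// Record the name so UploadedFuzzNames() can report what was uploaded.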
+	if p.FuzzType == GREY_FUZZ {
+		agg.greyNames = append(agg.greyNames, p.Data.Name)
+	} else {
+		agg.badNames = append(agg.badNames, p.Data.Name)
+	}
 
 	if err := agg.uploadBinaryFromDisk(p, p.Data.Name, p.FilePath); err != nil {
 		return err
@@ -669,6 +699,7 @@ func (agg *Aggregator) ShutDown() {
 // RestartAnalysis restarts the shut down aggregator. Anything that is in the scanning directory
 // should be cleared out, lest it be rescanned/analyzed.
 func (agg *Aggregator) RestartAnalysis() error {
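+	// Reset the deduplication state before the restarted analysis run.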
+	agg.deduplicator.Clear()
 	return agg.start()
 }
@@ -684,6 +715,7 @@ func (agg *Aggregator) WaitForEmptyQueues() {
 	}
 	t := time.Tick(config.Aggregator.StatusPeriod)
 	glog.Infof("[%s] Waiting %s for the aggregator's queues to be empty", agg.Category, config.Aggregator.StatusPeriod)
+	hangCount := 0
 	for _ = range t {
 		a = len(agg.forAnalysis)
 		u = len(agg.forUpload)
@@ -693,8 +725,13 @@
 		if a == 0 && u == 0 && b == 0 && agg.analysisCount == agg.uploadCount && agg.uploadCount == agg.bugReportCount {
 			break
 		}
+		// This prevents waiting forever if an upload crashes, aborts or otherwise hangs.
+		hangCount++
+		if hangCount >= HANG_THRESHOLD {
+			glog.Warningf("Was waiting for %d rounds and still wasn't done. Quitting anyway.", hangCount)
+			break
+		}
+
 		glog.Infof("[%s] Waiting %s for the aggregator's queues to be empty", agg.Category, config.Aggregator.StatusPeriod)
-
 	}
 }
@@ -703,3 +740,12 @@ func (agg *Aggregator) WaitForEmptyQueues() {
 func (agg *Aggregator) ForceAnalysis(path string) {
 	agg.forAnalysis <- path
 }
+
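+// ClearUploadedFuzzNames empties the lists of uploaded grey and bad fuzz names.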
+func (agg *Aggregator) ClearUploadedFuzzNames() {
+	agg.greyNames = []string{}
+	agg.badNames = []string{}
+}
+
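+// UploadedFuzzNames returns the names of the bad and grey fuzzes that have been
+// uploaded since the last call to ClearUploadedFuzzNames.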
+func (agg *Aggregator) UploadedFuzzNames() (bad, grey []string) {
+	return agg.badNames, agg.greyNames
+}