Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(600)

Unified Diff: fuzzer/go/aggregator/aggregator.go

Issue 1691893002: Fuzzer now deduplicates on the analysis side instead of the download side (Closed) Base URL: https://skia.googlesource.com/buildbot@metrics
Patch Set: Created 4 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « no previous file | fuzzer/go/backend/version_updater.go » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: fuzzer/go/aggregator/aggregator.go
diff --git a/fuzzer/go/aggregator/aggregator.go b/fuzzer/go/aggregator/aggregator.go
index 9218669b5e31ccb333f412ac0e38e733220b0c0d..abb2f1f5715ddfd6b0647d26396a27024ce00132 100644
--- a/fuzzer/go/aggregator/aggregator.go
+++ b/fuzzer/go/aggregator/aggregator.go
@@ -17,7 +17,8 @@ import (
"github.com/skia-dev/glog"
"go.skia.org/infra/fuzzer/go/common"
"go.skia.org/infra/fuzzer/go/config"
- "go.skia.org/infra/fuzzer/go/frontend/data"
+ "go.skia.org/infra/fuzzer/go/data"
+ "go.skia.org/infra/fuzzer/go/deduplicator"
"go.skia.org/infra/go/exec"
"go.skia.org/infra/go/fileutil"
"go.skia.org/infra/go/metrics2"
@@ -56,6 +57,9 @@ type Aggregator struct {
forUpload chan uploadPackage
forBugReporting chan bugReportingPackage
+
+ deduplicator *deduplicator.Deduplicator
+
// The shutdown channels are used to signal shutdowns. There are two groups, to
// allow for a softer, cleaner shutdown w/ minimal lost work.
// Group A (monitoring) includes the scanning and the monitoring routine.
@@ -70,11 +74,15 @@ type Aggregator struct {
analysisCount int64
uploadCount int64
bugReportCount int64
+
+ greyNames []string
+ badNames []string
}
const (
- BAD_FUZZ = "bad"
- GREY_FUZZ = "grey"
+ BAD_FUZZ = "bad"
+ GREY_FUZZ = "grey"
+ HANG_THRESHOLD = 100
)
var (
@@ -102,7 +110,7 @@ type bugReportingPackage struct {
// StartAggregator creates and starts a Aggregator.
// If there is a problem starting up, an error is returned. Other errors will be logged.
-func StartAggregator(s *storage.Client, category string) (*Aggregator, error) {
+func StartAggregator(s *storage.Client, category string, startingReports <-chan data.FuzzReport) (*Aggregator, error) {
b := Aggregator{
Category: category,
storageClient: s,
@@ -113,10 +121,16 @@ func StartAggregator(s *storage.Client, category string) (*Aggregator, error) {
forBugReporting: make(chan bugReportingPackage, 100),
MakeBugOnBadFuzz: false,
UploadGreyFuzzes: false,
+ deduplicator: deduplicator.New(),
monitoringShutdown: make(chan bool, 2),
// aggregationShutdown needs to be created with a calculated capacity in start
}
+ // preload the duplicator
+ for report := range startingReports {
+ b.deduplicator.IsUnique(report)
+ }
+
return &b, b.start()
}
@@ -347,6 +361,7 @@ func (agg *Aggregator) waitForAnalysis(identifier int) {
atomic.AddInt64(&agg.analysisCount, int64(1))
err := agg.analysisHelper(executableDir, badFuzzPath)
if err != nil {
+ atomic.AddInt64(&agg.analysisCount, int64(-1))
glog.Errorf("[%s] Analyzer %d terminated due to error: %s", agg.Category, identifier, err)
return
}
@@ -516,10 +531,20 @@ func (agg *Aggregator) waitForUploads(identifier int) {
atomic.AddInt64(&agg.uploadCount, int64(1))
if !agg.UploadGreyFuzzes && p.FuzzType == GREY_FUZZ {
glog.Infof("[%s] Skipping upload of grey fuzz %s", agg.Category, p.Data.Name)
+ // We are skipping the bugReport, so increment the counts.
+ atomic.AddInt64(&agg.bugReportCount, int64(1))
+ continue
+ }
+ if p.FuzzType != GREY_FUZZ && !agg.deduplicator.IsUnique(data.ParseReport(p.Data)) {
+ glog.Infof("[%s] Skipping upload of duplicate fuzz %s", agg.Category, p.Data.Name)
+ // We are skipping the bugReport, so increment the counts.
+ atomic.AddInt64(&agg.bugReportCount, int64(1))
continue
}
if err := agg.upload(p); err != nil {
glog.Errorf("[%s] Uploader %d terminated due to error: %s", agg.Category, identifier, err)
+ // We are skipping the bugReport, so increment the counts.
+ atomic.AddInt64(&agg.bugReportCount, int64(1))
return
}
agg.forBugReporting <- bugReportingPackage{
@@ -538,6 +563,11 @@ func (agg *Aggregator) waitForUploads(identifier int) {
// some helper methods.
func (agg *Aggregator) upload(p uploadPackage) error {
glog.Infof("[%s] uploading %s with file %s and analysis bytes %d;%d;%d|%d;%d;%d", agg.Category, p.Data.Name, p.FilePath, len(p.Data.Debug.Asan), len(p.Data.Debug.Dump), len(p.Data.Debug.StdErr), len(p.Data.Release.Asan), len(p.Data.Release.Dump), len(p.Data.Release.StdErr))
+ if p.FuzzType == GREY_FUZZ {
+ agg.greyNames = append(agg.greyNames, p.Data.Name)
+ } else {
+ agg.badNames = append(agg.badNames, p.Data.Name)
+ }
if err := agg.uploadBinaryFromDisk(p, p.Data.Name, p.FilePath); err != nil {
return err
@@ -669,6 +699,7 @@ func (agg *Aggregator) ShutDown() {
// RestartAnalysis restarts the shut down aggregator. Anything that is in the scanning directory
// should be cleared out, lest it be rescanned/analyzed.
func (agg *Aggregator) RestartAnalysis() error {
+ agg.deduplicator.Clear()
return agg.start()
}
@@ -684,6 +715,7 @@ func (agg *Aggregator) WaitForEmptyQueues() {
}
t := time.Tick(config.Aggregator.StatusPeriod)
glog.Infof("[%s] Waiting %s for the aggregator's queues to be empty", agg.Category, config.Aggregator.StatusPeriod)
+ hangCount := 0
for _ = range t {
a = len(agg.forAnalysis)
u = len(agg.forUpload)
@@ -693,8 +725,13 @@ func (agg *Aggregator) WaitForEmptyQueues() {
if a == 0 && u == 0 && b == 0 && agg.analysisCount == agg.uploadCount && agg.uploadCount == agg.bugReportCount {
break
}
+ // This prevents waiting forever if an upload crashes, aborts or otherwise hangs.
+ hangCount++
+ if hangCount >= HANG_THRESHOLD {
+ glog.Warningf("Was waiting for %d rounds and still wasn't done. Quitting anyway.", hangCount)
+ }
+
glog.Infof("[%s] Waiting %s for the aggregator's queues to be empty", agg.Category, config.Aggregator.StatusPeriod)
-
}
}
@@ -703,3 +740,12 @@ func (agg *Aggregator) WaitForEmptyQueues() {
func (agg *Aggregator) ForceAnalysis(path string) {
agg.forAnalysis <- path
}
+
+func (agg *Aggregator) ClearUploadedFuzzNames() {
+ agg.greyNames = []string{}
+ agg.badNames = []string{}
+}
+
+func (agg *Aggregator) UploadedFuzzNames() (bad, grey []string) {
+ return agg.badNames, agg.greyNames
+}
« no previous file with comments | « no previous file | fuzzer/go/backend/version_updater.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698