Index: fuzzer/go/aggregator/aggregator.go |
diff --git a/fuzzer/go/aggregator/aggregator.go b/fuzzer/go/aggregator/aggregator.go |
index 9dacc39a8303b74dfc23414b24de9ea191ba1167..93b844cfd04fd3f64a67d46bc89eda9048b3c05b 100644 |
--- a/fuzzer/go/aggregator/aggregator.go |
+++ b/fuzzer/go/aggregator/aggregator.go |
@@ -14,13 +14,13 @@ import ( |
"sync/atomic" |
"time" |
- go_metrics "github.com/rcrowley/go-metrics" |
"github.com/skia-dev/glog" |
"go.skia.org/infra/fuzzer/go/common" |
"go.skia.org/infra/fuzzer/go/config" |
"go.skia.org/infra/fuzzer/go/frontend/data" |
"go.skia.org/infra/go/exec" |
"go.skia.org/infra/go/fileutil" |
+ "go.skia.org/infra/go/metrics2" |
"go.skia.org/infra/go/util" |
"golang.org/x/net/context" |
"google.golang.org/cloud/storage" |
@@ -73,8 +73,9 @@ type Aggregator struct { |
} |
const ( |
- BAD_FUZZ = "bad" |
- GREY_FUZZ = "grey" |
+ BAD_FUZZ = "bad" |
+ GREY_FUZZ = "grey" |
+ HANG_THRESHOLD = 100 |
) |
var ( |
@@ -245,8 +246,8 @@ func (agg *Aggregator) scanHelper(alreadyFoundFuzzes *SortedStringSlice) error { |
// we have references to where the crashes will be. |
// TODO(kjlubick), switch to using flock once afl-fuzz implements that upstream. |
time.Sleep(time.Second) |
- go_metrics.GetOrRegisterGauge("binary_newly_found_fuzzes", go_metrics.DefaultRegistry).Update(int64(len(newlyFound))) |
- glog.Infof("[%s] %d newly found bad binary fuzzes", agg.Category, len(newlyFound)) |
+ metrics2.GetInt64Metric("newly_found_fuzzes", map[string]string{"category": agg.Category}).Update(int64(len(newlyFound))) |
borenet
2016/02/10 15:32:32
Naming conventions: measurements and tag keys should use dashes, not underscores (truncated in export; completion inferred from the follow-up comment below).
kjlubick
2016/02/10 16:14:29
Done.
|
+ glog.Infof("[%s] %d newly found bad fuzzes", agg.Category, len(newlyFound)) |
for _, f := range newlyFound { |
agg.forAnalysis <- f |
} |
@@ -332,7 +333,7 @@ func (agg *Aggregator) findBadFuzzPaths(alreadyFoundFuzzes *SortedStringSlice) ( |
// happen, this method terminates. |
func (agg *Aggregator) waitForAnalysis(identifier int) { |
defer agg.aggregationWaitGroup.Done() |
- defer go_metrics.GetOrRegisterCounter("analysis_process_count", go_metrics.DefaultRegistry).Dec(int64(1)) |
+ defer metrics2.NewCounter("analysis_process_count", map[string]string{"category": agg.Category}).Dec(int64(1)) |
borenet
2016/02/10 15:32:32
Use dashes, i.e. "analysis-process-count", here and elsewhere (truncated in export).
kjlubick
2016/02/10 16:14:29
Done.
|
glog.Infof("[%s] Spawning analyzer %d", agg.Category, identifier) |
// our own unique working folder |
@@ -508,7 +509,7 @@ func (s *SortedStringSlice) Append(strs []string) { |
// them. If any unrecoverable errors happen, this method terminates. |
func (agg *Aggregator) waitForUploads(identifier int) { |
defer agg.aggregationWaitGroup.Done() |
- defer go_metrics.GetOrRegisterCounter("upload_process_count", go_metrics.DefaultRegistry).Dec(int64(1)) |
+ defer metrics2.NewCounter("upload_process_count", map[string]string{"category": agg.Category}).Dec(int64(1)) |
glog.Infof("[%s] Spawning uploader %d", agg.Category, identifier) |
for { |
select { |
@@ -627,11 +628,11 @@ func (agg *Aggregator) bugReportingHelper(p bugReportingPackage) error { |
// many processes are up. |
func (agg *Aggregator) monitorStatus(numAnalysisProcesses, numUploadProcesses int) { |
defer agg.monitoringWaitGroup.Done() |
- analysisProcessCount := go_metrics.GetOrRegisterCounter("analysis_process_count", go_metrics.DefaultRegistry) |
- analysisProcessCount.Clear() |
+ analysisProcessCount := metrics2.NewCounter("analysis_process_count", map[string]string{"category": agg.Category}) |
+ analysisProcessCount.Reset() |
analysisProcessCount.Inc(int64(numAnalysisProcesses)) |
- uploadProcessCount := go_metrics.GetOrRegisterCounter("upload_process_count", go_metrics.DefaultRegistry) |
- uploadProcessCount.Clear() |
+ uploadProcessCount := metrics2.NewCounter("upload_process_count", map[string]string{"category": agg.Category}) |
+ uploadProcessCount.Reset() |
uploadProcessCount.Inc(int64(numUploadProcesses)) |
t := time.Tick(config.Aggregator.StatusPeriod) |
@@ -641,9 +642,9 @@ func (agg *Aggregator) monitorStatus(numAnalysisProcesses, numUploadProcesses in |
glog.Infof("[%s] aggregator monitor got signal to shut down", agg.Category) |
return |
case <-t: |
- go_metrics.GetOrRegisterGauge("binary_analysis_queue_size", go_metrics.DefaultRegistry).Update(int64(len(agg.forAnalysis))) |
- go_metrics.GetOrRegisterGauge("binary_upload_queue_size", go_metrics.DefaultRegistry).Update(int64(len(agg.forUpload))) |
- go_metrics.GetOrRegisterGauge("binary_bug_report_queue_size", go_metrics.DefaultRegistry).Update(int64(len(agg.forBugReporting))) |
+ metrics2.GetInt64Metric("analysis_queue_size", map[string]string{"category": agg.Category}).Update(int64(len(agg.forAnalysis))) |
+ metrics2.GetInt64Metric("upload_queue_size", map[string]string{"category": agg.Category}).Update(int64(len(agg.forUpload))) |
+ metrics2.GetInt64Metric("bug_report_queue_size", map[string]string{"category": agg.Category}).Update(int64(len(agg.forBugReporting))) |
} |
} |
} |
@@ -684,6 +685,7 @@ func (agg *Aggregator) WaitForEmptyQueues() { |
} |
t := time.Tick(config.Aggregator.StatusPeriod) |
glog.Infof("[%s] Waiting %s for the aggregator's queues to be empty", agg.Category, config.Aggregator.StatusPeriod) |
+ hangCount := 0 |
for _ = range t { |
a = len(agg.forAnalysis) |
u = len(agg.forUpload) |
@@ -693,7 +695,13 @@ func (agg *Aggregator) WaitForEmptyQueues() { |
if a == 0 && u == 0 && b == 0 && agg.analysisCount == agg.uploadCount && agg.uploadCount == agg.bugReportCount { |
break |
} |
+ // This prevents waiting forever if an upload crashes, aborts or otherwise hangs. |
+ hangCount++ |
+ if hangCount >= HANG_THRESHOLD { |
+ glog.Warningf("Was waiting for %d rounds and still wasn't done. Quitting anyway.", hangCount) |
+ } |
borenet
2016/02/10 15:32:32
Does this logic belong in this CL?
kjlubick
2016/02/10 16:14:29
Removed.
|
glog.Infof("[%s] Waiting %s for the aggregator's queues to be empty", agg.Category, config.Aggregator.StatusPeriod) |
+ |
} |
} |