Chromium Code Reviews

Index: fuzzer/go/aggregator/aggregator.go
diff --git a/fuzzer/go/aggregator/aggregator.go b/fuzzer/go/aggregator/aggregator.go
index 9dacc39a8303b74dfc23414b24de9ea191ba1167..93b844cfd04fd3f64a67d46bc89eda9048b3c05b 100644
--- a/fuzzer/go/aggregator/aggregator.go
+++ b/fuzzer/go/aggregator/aggregator.go
@@ -14,13 +14,13 @@ import (
 	"sync/atomic"
 	"time"

-	go_metrics "github.com/rcrowley/go-metrics"
 	"github.com/skia-dev/glog"
 	"go.skia.org/infra/fuzzer/go/common"
 	"go.skia.org/infra/fuzzer/go/config"
 	"go.skia.org/infra/fuzzer/go/frontend/data"
 	"go.skia.org/infra/go/exec"
 	"go.skia.org/infra/go/fileutil"
+	"go.skia.org/infra/go/metrics2"
 	"go.skia.org/infra/go/util"
 	"golang.org/x/net/context"
 	"google.golang.org/cloud/storage"
@@ -73,8 +73,9 @@ type Aggregator struct {
 }

 const (
-	BAD_FUZZ  = "bad"
-	GREY_FUZZ = "grey"
+	BAD_FUZZ       = "bad"
+	GREY_FUZZ      = "grey"
+	HANG_THRESHOLD = 100
 )

 var (
@@ -245,8 +246,8 @@ func (agg *Aggregator) scanHelper(alreadyFoundFuzzes *SortedStringSlice) error {
 	// we have references to where the crashes will be.
 	// TODO(kjlubick), switch to using flock once afl-fuzz implements that upstream.
 	time.Sleep(time.Second)
-	go_metrics.GetOrRegisterGauge("binary_newly_found_fuzzes", go_metrics.DefaultRegistry).Update(int64(len(newlyFound)))
-	glog.Infof("[%s] %d newly found bad binary fuzzes", agg.Category, len(newlyFound))
+	metrics2.GetInt64Metric("newly_found_fuzzes", map[string]string{"category": agg.Category}).Update(int64(len(newlyFound)))
borenet (2016/02/10 15:32:32):
Naming conventions: measurements and tag keys shou
kjlubick (2016/02/10 16:14:29):
Done.
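kjlubick's "Done." means the metric names were adjusted in a later patch set that is not shown here. As a minimal sketch of the adjusted gauge update, assuming the dash-separated naming borenet spells out in the next comment, and with "newly-found-fuzzes" as an illustrative (not confirmed) final name:

package aggregator

import "go.skia.org/infra/go/metrics2"

// reportNewlyFound is a sketch, not code from this CL. It shows the metrics2
// gauge update with a dash-separated measurement name ("newly-found-fuzzes" is
// an assumed final name) and the "category" tag that the patch introduces.
func reportNewlyFound(category string, newlyFound []string) {
	metrics2.GetInt64Metric("newly-found-fuzzes", map[string]string{"category": category}).Update(int64(len(newlyFound)))
}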
+	glog.Infof("[%s] %d newly found bad fuzzes", agg.Category, len(newlyFound))
 	for _, f := range newlyFound {
 		agg.forAnalysis <- f
 	}
@@ -332,7 +333,7 @@ func (agg *Aggregator) findBadFuzzPaths(alreadyFoundFuzzes *SortedStringSlice) (
 // happen, this method terminates.
 func (agg *Aggregator) waitForAnalysis(identifier int) {
 	defer agg.aggregationWaitGroup.Done()
-	defer go_metrics.GetOrRegisterCounter("analysis_process_count", go_metrics.DefaultRegistry).Dec(int64(1))
+	defer metrics2.NewCounter("analysis_process_count", map[string]string{"category": agg.Category}).Dec(int64(1))
borenet (2016/02/10 15:32:32):
Use dashes, ie. "analysis-process-count", here and
kjlubick (2016/02/10 16:14:29):
Done.
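Again the rename itself landed in a later patch set. A sketch of the deferred counter decrement with the dash-separated name borenet suggests ("analysis-process-count"); the NewCounter/Dec calls and the "category" tag are exactly the ones already used in the patch above, the rest of the function body is elided:

// Sketch only, not code from this CL: the analyzer's deferred counter
// decrement using the dash-separated name suggested in the review.
func (agg *Aggregator) waitForAnalysis(identifier int) {
	defer agg.aggregationWaitGroup.Done()
	defer metrics2.NewCounter("analysis-process-count", map[string]string{"category": agg.Category}).Dec(int64(1))
	// ... analysis loop unchanged from the patch ...
}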
 	glog.Infof("[%s] Spawning analyzer %d", agg.Category, identifier)
 	// our own unique working folder
@@ -508,7 +509,7 @@ func (s *SortedStringSlice) Append(strs []string) {
 // them. If any unrecoverable errors happen, this method terminates.
 func (agg *Aggregator) waitForUploads(identifier int) {
 	defer agg.aggregationWaitGroup.Done()
-	defer go_metrics.GetOrRegisterCounter("upload_process_count", go_metrics.DefaultRegistry).Dec(int64(1))
+	defer metrics2.NewCounter("upload_process_count", map[string]string{"category": agg.Category}).Dec(int64(1))
 	glog.Infof("[%s] Spawning uploader %d", agg.Category, identifier)
 	for {
 		select {
@@ -627,11 +628,11 @@ func (agg *Aggregator) bugReportingHelper(p bugReportingPackage) error {
 // many processes are up.
 func (agg *Aggregator) monitorStatus(numAnalysisProcesses, numUploadProcesses int) {
 	defer agg.monitoringWaitGroup.Done()
-	analysisProcessCount := go_metrics.GetOrRegisterCounter("analysis_process_count", go_metrics.DefaultRegistry)
-	analysisProcessCount.Clear()
+	analysisProcessCount := metrics2.NewCounter("analysis_process_count", map[string]string{"category": agg.Category})
+	analysisProcessCount.Reset()
 	analysisProcessCount.Inc(int64(numAnalysisProcesses))
-	uploadProcessCount := go_metrics.GetOrRegisterCounter("upload_process_count", go_metrics.DefaultRegistry)
-	uploadProcessCount.Clear()
+	uploadProcessCount := metrics2.NewCounter("upload_process_count", map[string]string{"category": agg.Category})
+	uploadProcessCount.Reset()
 	uploadProcessCount.Inc(int64(numUploadProcesses))

 	t := time.Tick(config.Aggregator.StatusPeriod)
@@ -641,9 +642,9 @@ func (agg *Aggregator) monitorStatus(numAnalysisProcesses, numUploadProcesses in
 			glog.Infof("[%s] aggregator monitor got signal to shut down", agg.Category)
 			return
 		case <-t:
-			go_metrics.GetOrRegisterGauge("binary_analysis_queue_size", go_metrics.DefaultRegistry).Update(int64(len(agg.forAnalysis)))
-			go_metrics.GetOrRegisterGauge("binary_upload_queue_size", go_metrics.DefaultRegistry).Update(int64(len(agg.forUpload)))
-			go_metrics.GetOrRegisterGauge("binary_bug_report_queue_size", go_metrics.DefaultRegistry).Update(int64(len(agg.forBugReporting)))
+			metrics2.GetInt64Metric("analysis_queue_size", map[string]string{"category": agg.Category}).Update(int64(len(agg.forAnalysis)))
+			metrics2.GetInt64Metric("upload_queue_size", map[string]string{"category": agg.Category}).Update(int64(len(agg.forUpload)))
+			metrics2.GetInt64Metric("bug_report_queue_size", map[string]string{"category": agg.Category}).Update(int64(len(agg.forBugReporting)))
 		}
 	}
 }
@@ -684,6 +685,7 @@ func (agg *Aggregator) WaitForEmptyQueues() {
 	}
 	t := time.Tick(config.Aggregator.StatusPeriod)
 	glog.Infof("[%s] Waiting %s for the aggregator's queues to be empty", agg.Category, config.Aggregator.StatusPeriod)
+	hangCount := 0
 	for _ = range t {
 		a = len(agg.forAnalysis)
 		u = len(agg.forUpload)
@@ -693,7 +695,13 @@ func (agg *Aggregator) WaitForEmptyQueues() {
 		if a == 0 && u == 0 && b == 0 && agg.analysisCount == agg.uploadCount && agg.uploadCount == agg.bugReportCount {
 			break
 		}
+		// This prevents waiting forever if an upload crashes, aborts or otherwise hangs.
+		hangCount++
+		if hangCount >= HANG_THRESHOLD {
+			glog.Warningf("Was waiting for %d rounds and still wasn't done. Quitting anyway.", hangCount)
+		}
borenet (2016/02/10 15:32:32):
Does this logic belong in this CL?
kjlubick (2016/02/10 16:14:29):
Removed.
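Two notes on the guard as uploaded in this patch set: the warning fires once hangCount reaches HANG_THRESHOLD, but nothing ever exits the loop, so despite the "Quitting anyway" message the wait still never ends; and, per the thread above, the whole guard was dropped from the CL. A minimal sketch of what a working version of the (removed) guard could look like, where the break is the missing piece and everything else mirrors the patch:

	// Sketch of the ultimately removed hang guard with the exit its log
	// message implies; t, a, u, b, and HANG_THRESHOLD are as in the patch.
	hangCount := 0
	for range t {
		// ... recompute a, u, b and take the normal break once all queues drain ...
		hangCount++
		if hangCount >= HANG_THRESHOLD {
			glog.Warningf("Was waiting for %d rounds and still wasn't done. Quitting anyway.", hangCount)
			break // without this, the warning repeats every tick and the wait never finishes
		}
	}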
 		glog.Infof("[%s] Waiting %s for the aggregator's queues to be empty", agg.Category, config.Aggregator.StatusPeriod)
+
 	}
 }