OLD | NEW |
---|---|
1 package aggregator | 1 package aggregator |
2 | 2 |
3 import ( | 3 import ( |
4 "bytes" | 4 "bytes" |
5 "crypto/sha1" | 5 "crypto/sha1" |
6 "fmt" | 6 "fmt" |
7 "io" | 7 "io" |
8 "io/ioutil" | 8 "io/ioutil" |
9 "os" | 9 "os" |
10 "path/filepath" | 10 "path/filepath" |
11 "sort" | 11 "sort" |
12 "strings" | 12 "strings" |
13 "sync" | 13 "sync" |
14 "sync/atomic" | 14 "sync/atomic" |
15 "time" | 15 "time" |
16 | 16 |
17 go_metrics "github.com/rcrowley/go-metrics" | |
18 "github.com/skia-dev/glog" | 17 "github.com/skia-dev/glog" |
19 "go.skia.org/infra/fuzzer/go/common" | 18 "go.skia.org/infra/fuzzer/go/common" |
20 "go.skia.org/infra/fuzzer/go/config" | 19 "go.skia.org/infra/fuzzer/go/config" |
21 "go.skia.org/infra/fuzzer/go/frontend/data" | 20 "go.skia.org/infra/fuzzer/go/frontend/data" |
22 "go.skia.org/infra/go/exec" | 21 "go.skia.org/infra/go/exec" |
23 "go.skia.org/infra/go/fileutil" | 22 "go.skia.org/infra/go/fileutil" |
23 "go.skia.org/infra/go/metrics2" | |
24 "go.skia.org/infra/go/util" | 24 "go.skia.org/infra/go/util" |
25 "golang.org/x/net/context" | 25 "golang.org/x/net/context" |
26 "google.golang.org/cloud/storage" | 26 "google.golang.org/cloud/storage" |
27 ) | 27 ) |
28 | 28 |
29 // Aggregator is a key part of the fuzzing operation | 29 // Aggregator is a key part of the fuzzing operation |
30 // (see https://skia.googlesource.com/buildbot/+/master/fuzzer/DESIGN.md). | 30 // (see https://skia.googlesource.com/buildbot/+/master/fuzzer/DESIGN.md). |
31 // It will find new bad fuzzes generated by afl-fuzz and create the metadata required for them. It | 31 // It will find new bad fuzzes generated by afl-fuzz and create the metadata required for them. It |
32 // does this by searching in the specified AflOutputPath for new crashes and moves them to a | 32 // does this by searching in the specified AflOutputPath for new crashes and moves them to a |
33 // temporary holding folder (specified by FuzzPath) for parsing, before sending them through the | 33 // temporary holding folder (specified by FuzzPath) for parsing, before sending them through the |
(...skipping 32 matching lines...) | |
66 aggregationWaitGroup *sync.WaitGroup | 66 aggregationWaitGroup *sync.WaitGroup |
67 	// These three counts are used to determine if there is any pending work. | 67 	// These three counts are used to determine if there is any pending work. |
68 	// There is no pending work if all three of these values are equal and the | 68 	// There is no pending work if all three of these values are equal and the |
69 // work queues are empty. | 69 // work queues are empty. |
70 analysisCount int64 | 70 analysisCount int64 |
71 uploadCount int64 | 71 uploadCount int64 |
72 bugReportCount int64 | 72 bugReportCount int64 |
73 } | 73 } |
74 | 74 |
75 const ( | 75 const ( |
76 » BAD_FUZZ = "bad" | 76 » BAD_FUZZ = "bad" |
77 » GREY_FUZZ = "grey" | 77 » GREY_FUZZ = "grey" |
 | 78 » HANG_THRESHOLD = 100 |
78 ) | 79 ) |
79 | 80 |
80 var ( | 81 var ( |
81 CLANG_DEBUG = common.TEST_HARNESS_NAME + "_clang_debug" | 82 CLANG_DEBUG = common.TEST_HARNESS_NAME + "_clang_debug" |
82 CLANG_RELEASE = common.TEST_HARNESS_NAME + "_clang_release" | 83 CLANG_RELEASE = common.TEST_HARNESS_NAME + "_clang_release" |
83 ASAN_DEBUG = common.TEST_HARNESS_NAME + "_asan_debug" | 84 ASAN_DEBUG = common.TEST_HARNESS_NAME + "_asan_debug" |
84 ASAN_RELEASE = common.TEST_HARNESS_NAME + "_asan_release" | 85 ASAN_RELEASE = common.TEST_HARNESS_NAME + "_asan_release" |
85 ) | 86 ) |
86 | 87 |
87 // uploadPackage is a struct containing all the pieces of a fuzz that need to be uploaded to GCS | 88 // uploadPackage is a struct containing all the pieces of a fuzz that need to be uploaded to GCS |
(...skipping 150 matching lines...) | |
238 // scanHelper runs findBadFuzzPaths, logs the output and keeps alreadyFoundFuzzes up to date. | 239 // scanHelper runs findBadFuzzPaths, logs the output and keeps alreadyFoundFuzzes up to date. |
239 func (agg *Aggregator) scanHelper(alreadyFoundFuzzes *SortedStringSlice) error { | 240 func (agg *Aggregator) scanHelper(alreadyFoundFuzzes *SortedStringSlice) error { |
240 newlyFound, err := agg.findBadFuzzPaths(alreadyFoundFuzzes) | 241 newlyFound, err := agg.findBadFuzzPaths(alreadyFoundFuzzes) |
241 if err != nil { | 242 if err != nil { |
242 return err | 243 return err |
243 } | 244 } |
244 	// AFL-fuzz does not write crashes or hangs atomically, so this workaround waits for a bit after | 245 	// AFL-fuzz does not write crashes or hangs atomically, so this workaround waits for a bit after |
245 // we have references to where the crashes will be. | 246 // we have references to where the crashes will be. |
246 	// TODO(kjlubick), switch to using flock once afl-fuzz implements that upstream. | 247 	// TODO(kjlubick), switch to using flock once afl-fuzz implements that upstream. |
247 time.Sleep(time.Second) | 248 time.Sleep(time.Second) |
248 » go_metrics.GetOrRegisterGauge("binary_newly_found_fuzzes", go_metrics.DefaultRegistry).Update(int64(len(newlyFound))) | 249 » metrics2.GetInt64Metric("newly_found_fuzzes", map[string]string{"category": agg.Category}).Update(int64(len(newlyFound))) |
borenet, 2016/02/10 15:32:32: Naming conventions: measurements and tag keys shou
kjlubick, 2016/02/10 16:14:29: Done.
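For context on this thread: the CL replaces unscoped go-metrics gauges and counters, which baked the fuzz category into the metric name (e.g. "binary_newly_found_fuzzes"), with metrics2 metrics that carry the category as a tag. Below is a minimal before/after sketch using only the calls visible in this diff; the metrics2 signatures are assumed from that usage rather than from the library itself, and the package and helper names are illustrative.

package sketch

import (
	go_metrics "github.com/rcrowley/go-metrics"

	"go.skia.org/infra/go/metrics2"
)

// reportNewlyFound mirrors the before/after shape of the metric call in scanHelper.
func reportNewlyFound(category string, newlyFound []string) {
	// Old style: one gauge per category, with the category baked into the metric name.
	go_metrics.GetOrRegisterGauge("binary_newly_found_fuzzes", go_metrics.DefaultRegistry).Update(int64(len(newlyFound)))

	// New style: a single measurement name, with the category attached as a tag.
	metrics2.GetInt64Metric("newly_found_fuzzes", map[string]string{"category": category}).Update(int64(len(newlyFound)))
}

The design point is that dashboards can filter or group on the category tag instead of pattern-matching a family of metric names.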
249 » glog.Infof("[%s] %d newly found bad binary fuzzes", agg.Category, len(newlyFound)) | 250 » glog.Infof("[%s] %d newly found bad fuzzes", agg.Category, len(newlyFound)) |
250 for _, f := range newlyFound { | 251 for _, f := range newlyFound { |
251 agg.forAnalysis <- f | 252 agg.forAnalysis <- f |
252 } | 253 } |
253 alreadyFoundFuzzes.Append(newlyFound) | 254 alreadyFoundFuzzes.Append(newlyFound) |
254 return nil | 255 return nil |
255 } | 256 } |
256 | 257 |
257 // findBadFuzzPaths looks through all the afl-fuzz directories contained in the passed in path and | 258 // findBadFuzzPaths looks through all the afl-fuzz directories contained in the passed in path and |
258 // returns the path to all files that are in a crash* folder that are not already in | 259 // returns the path to all files that are in a crash* folder that are not already in |
259 // 'alreadyFoundFuzzes'. It also sends them to the forAnalysis channel when it finds them. | 260 // 'alreadyFoundFuzzes'. It also sends them to the forAnalysis channel when it finds them. |
(...skipping 65 matching lines...) | |
325 } | 326 } |
326 return badFuzzPaths, nil | 327 return badFuzzPaths, nil |
327 } | 328 } |
328 | 329 |
329 // waitForAnalysis waits for files that need to be analyzed (from forAnalysis) and makes a copy of | 330 // waitForAnalysis waits for files that need to be analyzed (from forAnalysis) and makes a copy of |
330 // them in agg.fuzzPath with their hash as a file name. It then analyzes it using the supplied | 331 // them in agg.fuzzPath with their hash as a file name. It then analyzes it using the supplied |
331 // AnalysisPackage and then signals the results should be uploaded. If any unrecoverable errors | 332 // AnalysisPackage and then signals the results should be uploaded. If any unrecoverable errors |
332 // happen, this method terminates. | 333 // happen, this method terminates. |
333 func (agg *Aggregator) waitForAnalysis(identifier int) { | 334 func (agg *Aggregator) waitForAnalysis(identifier int) { |
334 defer agg.aggregationWaitGroup.Done() | 335 defer agg.aggregationWaitGroup.Done() |
335 » defer go_metrics.GetOrRegisterCounter("analysis_process_count", go_metrics.DefaultRegistry).Dec(int64(1)) | 336 » defer metrics2.NewCounter("analysis_process_count", map[string]string{"category": agg.Category}).Dec(int64(1)) |
borenet, 2016/02/10 15:32:32: Use dashes, ie. "analysis-process-count", here and
kjlubick, 2016/02/10 16:14:29: Done.
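For context on the counters discussed here: each analyzer and uploader goroutine decrements a per-category process counter when it exits, and monitorStatus resets and seeds that same counter when the workers are spawned. The following is a rough sketch of that lifecycle, again assuming the metrics2 calls exactly as they appear in this diff; the package and function names are illustrative, and per the comment above the measurement would move to dash-separated form (e.g. "analysis-process-count") in a later patch set.

package sketch

import "go.skia.org/infra/go/metrics2"

// trackAnalysisWorkers mirrors how monitorStatus seeds the counter and how each
// waitForAnalysis goroutine decrements it on exit.
func trackAnalysisWorkers(category string, numWorkers int, work chan string) {
	c := metrics2.NewCounter("analysis_process_count", map[string]string{"category": category})
	c.Reset()                // clear any stale value left over from a previous run
	c.Inc(int64(numWorkers)) // one increment per worker about to be spawned
	for i := 0; i < numWorkers; i++ {
		go func() {
			defer c.Dec(int64(1)) // each worker decrements the counter when it terminates
			for range work {
				// ... analyze one fuzz path ...
			}
		}()
	}
}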
336 glog.Infof("[%s] Spawning analyzer %d", agg.Category, identifier) | 337 glog.Infof("[%s] Spawning analyzer %d", agg.Category, identifier) |
337 | 338 |
338 // our own unique working folder | 339 // our own unique working folder |
339 executableDir := filepath.Join(agg.executablePath, fmt.Sprintf("analyzer %d", identifier)) | 340 executableDir := filepath.Join(agg.executablePath, fmt.Sprintf("analyzer %d", identifier)) |
340 if err := agg.setupAnalysis(executableDir); err != nil { | 341 if err := agg.setupAnalysis(executableDir); err != nil { |
341 		glog.Errorf("[%s] Analyzer %d terminated due to error: %s", agg.Category, identifier, err) | 342 		glog.Errorf("[%s] Analyzer %d terminated due to error: %s", agg.Category, identifier, err) |
342 return | 343 return |
343 } | 344 } |
344 for { | 345 for { |
345 select { | 346 select { |
(...skipping 155 matching lines...) | |
501 // Append adds all of the strings to the underlying slice and sorts it | 502 // Append adds all of the strings to the underlying slice and sorts it |
502 func (s *SortedStringSlice) Append(strs []string) { | 503 func (s *SortedStringSlice) Append(strs []string) { |
503 s.strings = append(s.strings, strs...) | 504 s.strings = append(s.strings, strs...) |
504 s.strings.Sort() | 505 s.strings.Sort() |
505 } | 506 } |
506 | 507 |
507 // waitForUploads waits for uploadPackages to be sent through the forUpload channel and then uploads | 508 // waitForUploads waits for uploadPackages to be sent through the forUpload channel and then uploads |
508 // them. If any unrecoverable errors happen, this method terminates. | 509 // them. If any unrecoverable errors happen, this method terminates. |
509 func (agg *Aggregator) waitForUploads(identifier int) { | 510 func (agg *Aggregator) waitForUploads(identifier int) { |
510 defer agg.aggregationWaitGroup.Done() | 511 defer agg.aggregationWaitGroup.Done() |
511 » defer go_metrics.GetOrRegisterCounter("upload_process_count", go_metrics.DefaultRegistry).Dec(int64(1)) | 512 » defer metrics2.NewCounter("upload_process_count", map[string]string{"category": agg.Category}).Dec(int64(1)) |
512 glog.Infof("[%s] Spawning uploader %d", agg.Category, identifier) | 513 glog.Infof("[%s] Spawning uploader %d", agg.Category, identifier) |
513 for { | 514 for { |
514 select { | 515 select { |
515 case p := <-agg.forUpload: | 516 case p := <-agg.forUpload: |
516 atomic.AddInt64(&agg.uploadCount, int64(1)) | 517 atomic.AddInt64(&agg.uploadCount, int64(1)) |
517 if !agg.UploadGreyFuzzes && p.FuzzType == GREY_FUZZ { | 518 if !agg.UploadGreyFuzzes && p.FuzzType == GREY_FUZZ { |
518 				glog.Infof("[%s] Skipping upload of grey fuzz %s", agg.Category, p.Data.Name) | 519 				glog.Infof("[%s] Skipping upload of grey fuzz %s", agg.Category, p.Data.Name) |
519 continue | 520 continue |
520 } | 521 } |
521 if err := agg.upload(p); err != nil { | 522 if err := agg.upload(p); err != nil { |
(...skipping 98 matching lines...) | |
620 if agg.MakeBugOnBadFuzz && p.IsBadFuzz { | 621 if agg.MakeBugOnBadFuzz && p.IsBadFuzz { |
621 		glog.Warningf("[%s] Should create bug for %s", agg.Category, p.FuzzName) | 622 		glog.Warningf("[%s] Should create bug for %s", agg.Category, p.FuzzName) |
622 } | 623 } |
623 return nil | 624 return nil |
624 } | 625 } |
625 | 626 |
626 // monitorStatus sets up the monitoring routine, which reports how big the work queues are and how | 627 // monitorStatus sets up the monitoring routine, which reports how big the work queues are and how |
627 // many processes are up. | 628 // many processes are up. |
628 func (agg *Aggregator) monitorStatus(numAnalysisProcesses, numUploadProcesses int) { | 629 func (agg *Aggregator) monitorStatus(numAnalysisProcesses, numUploadProcesses int) { |
629 defer agg.monitoringWaitGroup.Done() | 630 defer agg.monitoringWaitGroup.Done() |
630 » analysisProcessCount := go_metrics.GetOrRegisterCounter("analysis_process_count", go_metrics.DefaultRegistry) | 631 » analysisProcessCount := metrics2.NewCounter("analysis_process_count", map[string]string{"category": agg.Category}) |
631 » analysisProcessCount.Clear() | 632 » analysisProcessCount.Reset() |
632 analysisProcessCount.Inc(int64(numAnalysisProcesses)) | 633 analysisProcessCount.Inc(int64(numAnalysisProcesses)) |
633 » uploadProcessCount := go_metrics.GetOrRegisterCounter("upload_process_count", go_metrics.DefaultRegistry) | 634 » uploadProcessCount := metrics2.NewCounter("upload_process_count", map[string]string{"category": agg.Category}) |
634 » uploadProcessCount.Clear() | 635 » uploadProcessCount.Reset() |
635 uploadProcessCount.Inc(int64(numUploadProcesses)) | 636 uploadProcessCount.Inc(int64(numUploadProcesses)) |
636 | 637 |
637 t := time.Tick(config.Aggregator.StatusPeriod) | 638 t := time.Tick(config.Aggregator.StatusPeriod) |
638 for { | 639 for { |
639 select { | 640 select { |
640 case <-agg.monitoringShutdown: | 641 case <-agg.monitoringShutdown: |
641 			glog.Infof("[%s] aggregator monitor got signal to shut down", agg.Category) | 642 			glog.Infof("[%s] aggregator monitor got signal to shut down", agg.Category) |
642 return | 643 return |
643 case <-t: | 644 case <-t: |
644 » » » go_metrics.GetOrRegisterGauge("binary_analysis_queue_size", go_metrics.DefaultRegistry).Update(int64(len(agg.forAnalysis))) | 645 » » » metrics2.GetInt64Metric("analysis_queue_size", map[string]string{"category": agg.Category}).Update(int64(len(agg.forAnalysis))) |
645 » » » go_metrics.GetOrRegisterGauge("binary_upload_queue_size", go_metrics.DefaultRegistry).Update(int64(len(agg.forUpload))) | 646 » » » metrics2.GetInt64Metric("upload_queue_size", map[string]string{"category": agg.Category}).Update(int64(len(agg.forUpload))) |
646 » » » go_metrics.GetOrRegisterGauge("binary_bug_report_queue_size", go_metrics.DefaultRegistry).Update(int64(len(agg.forBugReporting))) | 647 » » » metrics2.GetInt64Metric("bug_report_queue_size", map[string]string{"category": agg.Category}).Update(int64(len(agg.forBugReporting))) |
647 } | 648 } |
648 } | 649 } |
649 } | 650 } |
650 | 651 |
651 // Shutdown gracefully shuts down the aggregator. Anything that was being processed will finish | 652 // Shutdown gracefully shuts down the aggregator. Anything that was being processed will finish |
652 // prior to the shutdown. | 653 // prior to the shutdown. |
653 func (agg *Aggregator) ShutDown() { | 654 func (agg *Aggregator) ShutDown() { |
654 // once for the monitoring and once for the scanning routines | 655 // once for the monitoring and once for the scanning routines |
655 agg.monitoringShutdown <- true | 656 agg.monitoringShutdown <- true |
656 agg.monitoringShutdown <- true | 657 agg.monitoringShutdown <- true |
(...skipping 20 matching lines...) | |
677 func (agg *Aggregator) WaitForEmptyQueues() { | 678 func (agg *Aggregator) WaitForEmptyQueues() { |
678 a := len(agg.forAnalysis) | 679 a := len(agg.forAnalysis) |
679 u := len(agg.forUpload) | 680 u := len(agg.forUpload) |
680 b := len(agg.forBugReporting) | 681 b := len(agg.forBugReporting) |
681 if a == 0 && u == 0 && b == 0 && agg.analysisCount == agg.uploadCount && agg.uploadCount == agg.bugReportCount { | 682 if a == 0 && u == 0 && b == 0 && agg.analysisCount == agg.uploadCount && agg.uploadCount == agg.bugReportCount { |
682 glog.Infof("[%s] Queues were already empty", agg.Category) | 683 glog.Infof("[%s] Queues were already empty", agg.Category) |
683 return | 684 return |
684 } | 685 } |
685 t := time.Tick(config.Aggregator.StatusPeriod) | 686 t := time.Tick(config.Aggregator.StatusPeriod) |
686 	glog.Infof("[%s] Waiting %s for the aggregator's queues to be empty", agg.Category, config.Aggregator.StatusPeriod) | 687 	glog.Infof("[%s] Waiting %s for the aggregator's queues to be empty", agg.Category, config.Aggregator.StatusPeriod) |
 | 688 	hangCount := 0 |
687 for _ = range t { | 689 for _ = range t { |
688 a = len(agg.forAnalysis) | 690 a = len(agg.forAnalysis) |
689 u = len(agg.forUpload) | 691 u = len(agg.forUpload) |
690 b = len(agg.forBugReporting) | 692 b = len(agg.forBugReporting) |
691 		glog.Infof("[%s] AnalysisQueue: %d, UploadQueue: %d, BugReportingQueue: %d", agg.Category, a, u, b) | 693 		glog.Infof("[%s] AnalysisQueue: %d, UploadQueue: %d, BugReportingQueue: %d", agg.Category, a, u, b) |
692 		glog.Infof("[%s] AnalysisTotal: %d, UploadTotal: %d, BugReportingTotal: %d", agg.Category, agg.analysisCount, agg.uploadCount, agg.bugReportCount) | 694 		glog.Infof("[%s] AnalysisTotal: %d, UploadTotal: %d, BugReportingTotal: %d", agg.Category, agg.analysisCount, agg.uploadCount, agg.bugReportCount) |
693 		if a == 0 && u == 0 && b == 0 && agg.analysisCount == agg.uploadCount && agg.uploadCount == agg.bugReportCount { | 695 		if a == 0 && u == 0 && b == 0 && agg.analysisCount == agg.uploadCount && agg.uploadCount == agg.bugReportCount { |
694 break | 696 break |
695 } | 697 } |
 | 698 		// This prevents waiting forever if an upload crashes, aborts or otherwise hangs. |
 | 699 		hangCount++ |
 | 700 		if hangCount >= HANG_THRESHOLD { |
 | 701 			glog.Warningf("Was waiting for %d rounds and still wasn't done. Quitting anyway.", hangCount) |
 | 702 		} |
borenet, 2016/02/10 15:32:32: Does this logic belong in this CL?
kjlubick, 2016/02/10 16:14:29: Removed.
696 		glog.Infof("[%s] Waiting %s for the aggregator's queues to be empty", agg.Category, config.Aggregator.StatusPeriod) | 703 		glog.Infof("[%s] Waiting %s for the aggregator's queues to be empty", agg.Category, config.Aggregator.StatusPeriod) |
 | 704 |
697 } | 705 } |
698 } | 706 } |
699 | 707 |
700 // ForceAnalysis directly adds the given path to the analysis queue, where it will be analyzed, | 708 // ForceAnalysis directly adds the given path to the analysis queue, where it will be analyzed, |
701 // uploaded and possibly bug reported. | 709 // uploaded and possibly bug reported. |
702 func (agg *Aggregator) ForceAnalysis(path string) { | 710 func (agg *Aggregator) ForceAnalysis(path string) { |
703 agg.forAnalysis <- path | 711 agg.forAnalysis <- path |
704 } | 712 } |