Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 package aggregator | 1 package aggregator |
| 2 | 2 |
| 3 import ( | 3 import ( |
| 4 "bytes" | 4 "bytes" |
| 5 "crypto/sha1" | 5 "crypto/sha1" |
| 6 "fmt" | 6 "fmt" |
| 7 "io" | 7 "io" |
| 8 "io/ioutil" | 8 "io/ioutil" |
| 9 "os" | 9 "os" |
| 10 "path/filepath" | 10 "path/filepath" |
| 11 "sort" | 11 "sort" |
| 12 "strings" | 12 "strings" |
| 13 "sync" | 13 "sync" |
| 14 "sync/atomic" | 14 "sync/atomic" |
| 15 "time" | 15 "time" |
| 16 | 16 |
| 17 go_metrics "github.com/rcrowley/go-metrics" | |
| 18 "github.com/skia-dev/glog" | 17 "github.com/skia-dev/glog" |
| 19 "go.skia.org/infra/fuzzer/go/common" | 18 "go.skia.org/infra/fuzzer/go/common" |
| 20 "go.skia.org/infra/fuzzer/go/config" | 19 "go.skia.org/infra/fuzzer/go/config" |
| 21 "go.skia.org/infra/fuzzer/go/frontend/data" | 20 "go.skia.org/infra/fuzzer/go/frontend/data" |
| 22 "go.skia.org/infra/go/exec" | 21 "go.skia.org/infra/go/exec" |
| 23 "go.skia.org/infra/go/fileutil" | 22 "go.skia.org/infra/go/fileutil" |
| 23 "go.skia.org/infra/go/metrics2" | |
| 24 "go.skia.org/infra/go/util" | 24 "go.skia.org/infra/go/util" |
| 25 "golang.org/x/net/context" | 25 "golang.org/x/net/context" |
| 26 "google.golang.org/cloud/storage" | 26 "google.golang.org/cloud/storage" |
| 27 ) | 27 ) |
| 28 | 28 |
| 29 // Aggregator is a key part of the fuzzing operation | 29 // Aggregator is a key part of the fuzzing operation |
| 30 // (see https://skia.googlesource.com/buildbot/+/master/fuzzer/DESIGN.md). | 30 // (see https://skia.googlesource.com/buildbot/+/master/fuzzer/DESIGN.md). |
| 31 // It will find new bad fuzzes generated by afl-fuzz and create the metadata req uired for them. It | 31 // It will find new bad fuzzes generated by afl-fuzz and create the metadata req uired for them. It |
| 32 // does this by searching in the specified AflOutputPath for new crashes and mov es them to a | 32 // does this by searching in the specified AflOutputPath for new crashes and mov es them to a |
| 33 // temporary holding folder (specified by FuzzPath) for parsing, before sending them through the | 33 // temporary holding folder (specified by FuzzPath) for parsing, before sending them through the |
| (...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 66 aggregationWaitGroup *sync.WaitGroup | 66 aggregationWaitGroup *sync.WaitGroup |
| 67 // These three counts are used to determine if there is any pending work . | 67 // These three counts are used to determine if there is any pending work . |
| 68 // There is no pending work if all three of these values are equal and t he | 68 // There is no pending work if all three of these values are equal and t he |
| 69 // work queues are empty. | 69 // work queues are empty. |
| 70 analysisCount int64 | 70 analysisCount int64 |
| 71 uploadCount int64 | 71 uploadCount int64 |
| 72 bugReportCount int64 | 72 bugReportCount int64 |
| 73 } | 73 } |
| 74 | 74 |
| 75 const ( | 75 const ( |
| 76 » BAD_FUZZ = "bad" | 76 » BAD_FUZZ = "bad" |
| 77 » GREY_FUZZ = "grey" | 77 » GREY_FUZZ = "grey" |
| 78 » HANG_THRESHOLD = 100 | |
| 78 ) | 79 ) |
| 79 | 80 |
| 80 var ( | 81 var ( |
| 81 CLANG_DEBUG = common.TEST_HARNESS_NAME + "_clang_debug" | 82 CLANG_DEBUG = common.TEST_HARNESS_NAME + "_clang_debug" |
| 82 CLANG_RELEASE = common.TEST_HARNESS_NAME + "_clang_release" | 83 CLANG_RELEASE = common.TEST_HARNESS_NAME + "_clang_release" |
| 83 ASAN_DEBUG = common.TEST_HARNESS_NAME + "_asan_debug" | 84 ASAN_DEBUG = common.TEST_HARNESS_NAME + "_asan_debug" |
| 84 ASAN_RELEASE = common.TEST_HARNESS_NAME + "_asan_release" | 85 ASAN_RELEASE = common.TEST_HARNESS_NAME + "_asan_release" |
| 85 ) | 86 ) |
| 86 | 87 |
| 87 // uploadPackage is a struct containing all the pieces of a fuzz that need to be uploaded to GCS | 88 // uploadPackage is a struct containing all the pieces of a fuzz that need to be uploaded to GCS |
| (...skipping 150 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 238 // scanHelper runs findBadFuzzPaths, logs the output and keeps alreadyFoundFuzze s up to date. | 239 // scanHelper runs findBadFuzzPaths, logs the output and keeps alreadyFoundFuzze s up to date. |
| 239 func (agg *Aggregator) scanHelper(alreadyFoundFuzzes *SortedStringSlice) error { | 240 func (agg *Aggregator) scanHelper(alreadyFoundFuzzes *SortedStringSlice) error { |
| 240 newlyFound, err := agg.findBadFuzzPaths(alreadyFoundFuzzes) | 241 newlyFound, err := agg.findBadFuzzPaths(alreadyFoundFuzzes) |
| 241 if err != nil { | 242 if err != nil { |
| 242 return err | 243 return err |
| 243 } | 244 } |
| 244 // AFL-fuzz does not write crashes or hangs atomically, so this workarou nd waits for a bit after | 245 // AFL-fuzz does not write crashes or hangs atomically, so this workarou nd waits for a bit after |
| 245 // we have references to where the crashes will be. | 246 // we have references to where the crashes will be. |
| 246 // TODO(kjlubick), switch to using flock once afl-fuzz implements that u pstream. | 247 // TODO(kjlubick), switch to using flock once afl-fuzz implements that u pstream. |
| 247 time.Sleep(time.Second) | 248 time.Sleep(time.Second) |
| 248 » go_metrics.GetOrRegisterGauge("binary_newly_found_fuzzes", go_metrics.De faultRegistry).Update(int64(len(newlyFound))) | 249 » metrics2.GetInt64Metric("newly_found_fuzzes", map[string]string{"categor y": agg.Category}).Update(int64(len(newlyFound))) |
|
borenet
2016/02/10 15:32:32
Naming conventions: measurements and tag keys shou
kjlubick
2016/02/10 16:14:29
Done.
| |
| 249 » glog.Infof("[%s] %d newly found bad binary fuzzes", agg.Category, len(ne wlyFound)) | 250 » glog.Infof("[%s] %d newly found bad fuzzes", agg.Category, len(newlyFoun d)) |
| 250 for _, f := range newlyFound { | 251 for _, f := range newlyFound { |
| 251 agg.forAnalysis <- f | 252 agg.forAnalysis <- f |
| 252 } | 253 } |
| 253 alreadyFoundFuzzes.Append(newlyFound) | 254 alreadyFoundFuzzes.Append(newlyFound) |
| 254 return nil | 255 return nil |
| 255 } | 256 } |
| 256 | 257 |
| 257 // findBadFuzzPaths looks through all the afl-fuzz directories contained in the passed in path and | 258 // findBadFuzzPaths looks through all the afl-fuzz directories contained in the passed in path and |
| 258 // returns the path to all files that are in a crash* folder that are not alread y in | 259 // returns the path to all files that are in a crash* folder that are not alread y in |
| 259 // 'alreadyFoundFuzzes'. It also sends them to the forAnalysis channel when it finds them. | 260 // 'alreadyFoundFuzzes'. It also sends them to the forAnalysis channel when it finds them. |
| (...skipping 65 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 325 } | 326 } |
| 326 return badFuzzPaths, nil | 327 return badFuzzPaths, nil |
| 327 } | 328 } |
| 328 | 329 |
| 329 // waitForAnalysis waits for files that need to be analyzed (from forAnalysis) a nd makes a copy of | 330 // waitForAnalysis waits for files that need to be analyzed (from forAnalysis) a nd makes a copy of |
| 330 // them in agg.fuzzPath with their hash as a file name. It then analyzes it usin g the supplied | 331 // them in agg.fuzzPath with their hash as a file name. It then analyzes it usin g the supplied |
| 331 // AnalysisPackage and then signals the results should be uploaded. If any unrec overable errors | 332 // AnalysisPackage and then signals the results should be uploaded. If any unrec overable errors |
| 332 // happen, this method terminates. | 333 // happen, this method terminates. |
| 333 func (agg *Aggregator) waitForAnalysis(identifier int) { | 334 func (agg *Aggregator) waitForAnalysis(identifier int) { |
| 334 defer agg.aggregationWaitGroup.Done() | 335 defer agg.aggregationWaitGroup.Done() |
| 335 » defer go_metrics.GetOrRegisterCounter("analysis_process_count", go_metri cs.DefaultRegistry).Dec(int64(1)) | 336 » defer metrics2.NewCounter("analysis_process_count", map[string]string{"c ategory": agg.Category}).Dec(int64(1)) |
|
borenet
2016/02/10 15:32:32
Use dashes, ie. "analysis-process-count", here and
kjlubick
2016/02/10 16:14:29
Done.
| |
| 336 glog.Infof("[%s] Spawning analyzer %d", agg.Category, identifier) | 337 glog.Infof("[%s] Spawning analyzer %d", agg.Category, identifier) |
| 337 | 338 |
| 338 // our own unique working folder | 339 // our own unique working folder |
| 339 executableDir := filepath.Join(agg.executablePath, fmt.Sprintf("analyzer %d", identifier)) | 340 executableDir := filepath.Join(agg.executablePath, fmt.Sprintf("analyzer %d", identifier)) |
| 340 if err := agg.setupAnalysis(executableDir); err != nil { | 341 if err := agg.setupAnalysis(executableDir); err != nil { |
| 341 glog.Errorf("[%s] Analyzer %d terminated due to error: %s", agg. Category, identifier, err) | 342 glog.Errorf("[%s] Analyzer %d terminated due to error: %s", agg. Category, identifier, err) |
| 342 return | 343 return |
| 343 } | 344 } |
| 344 for { | 345 for { |
| 345 select { | 346 select { |
| (...skipping 155 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 501 // Append adds all of the strings to the underlying slice and sorts it | 502 // Append adds all of the strings to the underlying slice and sorts it |
| 502 func (s *SortedStringSlice) Append(strs []string) { | 503 func (s *SortedStringSlice) Append(strs []string) { |
| 503 s.strings = append(s.strings, strs...) | 504 s.strings = append(s.strings, strs...) |
| 504 s.strings.Sort() | 505 s.strings.Sort() |
| 505 } | 506 } |
| 506 | 507 |
| 507 // waitForUploads waits for uploadPackages to be sent through the forUpload chan nel and then uploads | 508 // waitForUploads waits for uploadPackages to be sent through the forUpload chan nel and then uploads |
| 508 // them. If any unrecoverable errors happen, this method terminates. | 509 // them. If any unrecoverable errors happen, this method terminates. |
| 509 func (agg *Aggregator) waitForUploads(identifier int) { | 510 func (agg *Aggregator) waitForUploads(identifier int) { |
| 510 defer agg.aggregationWaitGroup.Done() | 511 defer agg.aggregationWaitGroup.Done() |
| 511 » defer go_metrics.GetOrRegisterCounter("upload_process_count", go_metrics .DefaultRegistry).Dec(int64(1)) | 512 » defer metrics2.NewCounter("upload_process_count", map[string]string{"cat egory": agg.Category}).Dec(int64(1)) |
| 512 glog.Infof("[%s] Spawning uploader %d", agg.Category, identifier) | 513 glog.Infof("[%s] Spawning uploader %d", agg.Category, identifier) |
| 513 for { | 514 for { |
| 514 select { | 515 select { |
| 515 case p := <-agg.forUpload: | 516 case p := <-agg.forUpload: |
| 516 atomic.AddInt64(&agg.uploadCount, int64(1)) | 517 atomic.AddInt64(&agg.uploadCount, int64(1)) |
| 517 if !agg.UploadGreyFuzzes && p.FuzzType == GREY_FUZZ { | 518 if !agg.UploadGreyFuzzes && p.FuzzType == GREY_FUZZ { |
| 518 glog.Infof("[%s] Skipping upload of grey fuzz %s ", agg.Category, p.Data.Name) | 519 glog.Infof("[%s] Skipping upload of grey fuzz %s ", agg.Category, p.Data.Name) |
| 519 continue | 520 continue |
| 520 } | 521 } |
| 521 if err := agg.upload(p); err != nil { | 522 if err := agg.upload(p); err != nil { |
| (...skipping 98 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 620 if agg.MakeBugOnBadFuzz && p.IsBadFuzz { | 621 if agg.MakeBugOnBadFuzz && p.IsBadFuzz { |
| 621 glog.Warningf("[%s] Should create bug for %s", agg.Category, p.F uzzName) | 622 glog.Warningf("[%s] Should create bug for %s", agg.Category, p.F uzzName) |
| 622 } | 623 } |
| 623 return nil | 624 return nil |
| 624 } | 625 } |
| 625 | 626 |
| 626 // monitorStatus sets up the monitoring routine, which reports how big the work queues are and how | 627 // monitorStatus sets up the monitoring routine, which reports how big the work queues are and how |
| 627 // many processes are up. | 628 // many processes are up. |
| 628 func (agg *Aggregator) monitorStatus(numAnalysisProcesses, numUploadProcesses in t) { | 629 func (agg *Aggregator) monitorStatus(numAnalysisProcesses, numUploadProcesses in t) { |
| 629 defer agg.monitoringWaitGroup.Done() | 630 defer agg.monitoringWaitGroup.Done() |
| 630 » analysisProcessCount := go_metrics.GetOrRegisterCounter("analysis_proces s_count", go_metrics.DefaultRegistry) | 631 » analysisProcessCount := metrics2.NewCounter("analysis_process_count", ma p[string]string{"category": agg.Category}) |
| 631 » analysisProcessCount.Clear() | 632 » analysisProcessCount.Reset() |
| 632 analysisProcessCount.Inc(int64(numAnalysisProcesses)) | 633 analysisProcessCount.Inc(int64(numAnalysisProcesses)) |
| 633 » uploadProcessCount := go_metrics.GetOrRegisterCounter("upload_process_co unt", go_metrics.DefaultRegistry) | 634 » uploadProcessCount := metrics2.NewCounter("upload_process_count", map[st ring]string{"category": agg.Category}) |
| 634 » uploadProcessCount.Clear() | 635 » uploadProcessCount.Reset() |
| 635 uploadProcessCount.Inc(int64(numUploadProcesses)) | 636 uploadProcessCount.Inc(int64(numUploadProcesses)) |
| 636 | 637 |
| 637 t := time.Tick(config.Aggregator.StatusPeriod) | 638 t := time.Tick(config.Aggregator.StatusPeriod) |
| 638 for { | 639 for { |
| 639 select { | 640 select { |
| 640 case <-agg.monitoringShutdown: | 641 case <-agg.monitoringShutdown: |
| 641 glog.Infof("[%s] aggregator monitor got signal to shut d own", agg.Category) | 642 glog.Infof("[%s] aggregator monitor got signal to shut d own", agg.Category) |
| 642 return | 643 return |
| 643 case <-t: | 644 case <-t: |
| 644 » » » go_metrics.GetOrRegisterGauge("binary_analysis_queue_siz e", go_metrics.DefaultRegistry).Update(int64(len(agg.forAnalysis))) | 645 » » » metrics2.GetInt64Metric("analysis_queue_size", map[strin g]string{"category": agg.Category}).Update(int64(len(agg.forAnalysis))) |
| 645 » » » go_metrics.GetOrRegisterGauge("binary_upload_queue_size" , go_metrics.DefaultRegistry).Update(int64(len(agg.forUpload))) | 646 » » » metrics2.GetInt64Metric("upload_queue_size", map[string] string{"category": agg.Category}).Update(int64(len(agg.forUpload))) |
| 646 » » » go_metrics.GetOrRegisterGauge("binary_bug_report_queue_s ize", go_metrics.DefaultRegistry).Update(int64(len(agg.forBugReporting))) | 647 » » » metrics2.GetInt64Metric("bug_report_queue_size", map[str ing]string{"category": agg.Category}).Update(int64(len(agg.forBugReporting))) |
| 647 } | 648 } |
| 648 } | 649 } |
| 649 } | 650 } |
| 650 | 651 |
| 651 // Shutdown gracefully shuts down the aggregator. Anything that was being proces sed will finish | 652 // Shutdown gracefully shuts down the aggregator. Anything that was being proces sed will finish |
| 652 // prior to the shutdown. | 653 // prior to the shutdown. |
| 653 func (agg *Aggregator) ShutDown() { | 654 func (agg *Aggregator) ShutDown() { |
| 654 // once for the monitoring and once for the scanning routines | 655 // once for the monitoring and once for the scanning routines |
| 655 agg.monitoringShutdown <- true | 656 agg.monitoringShutdown <- true |
| 656 agg.monitoringShutdown <- true | 657 agg.monitoringShutdown <- true |
| (...skipping 20 matching lines...) Expand all Loading... | |
| 677 func (agg *Aggregator) WaitForEmptyQueues() { | 678 func (agg *Aggregator) WaitForEmptyQueues() { |
| 678 a := len(agg.forAnalysis) | 679 a := len(agg.forAnalysis) |
| 679 u := len(agg.forUpload) | 680 u := len(agg.forUpload) |
| 680 b := len(agg.forBugReporting) | 681 b := len(agg.forBugReporting) |
| 681 if a == 0 && u == 0 && b == 0 && agg.analysisCount == agg.uploadCount && agg.uploadCount == agg.bugReportCount { | 682 if a == 0 && u == 0 && b == 0 && agg.analysisCount == agg.uploadCount && agg.uploadCount == agg.bugReportCount { |
| 682 glog.Infof("[%s] Queues were already empty", agg.Category) | 683 glog.Infof("[%s] Queues were already empty", agg.Category) |
| 683 return | 684 return |
| 684 } | 685 } |
| 685 t := time.Tick(config.Aggregator.StatusPeriod) | 686 t := time.Tick(config.Aggregator.StatusPeriod) |
| 686 glog.Infof("[%s] Waiting %s for the aggregator's queues to be empty", ag g.Category, config.Aggregator.StatusPeriod) | 687 glog.Infof("[%s] Waiting %s for the aggregator's queues to be empty", ag g.Category, config.Aggregator.StatusPeriod) |
| 688 hangCount := 0 | |
| 687 for _ = range t { | 689 for _ = range t { |
| 688 a = len(agg.forAnalysis) | 690 a = len(agg.forAnalysis) |
| 689 u = len(agg.forUpload) | 691 u = len(agg.forUpload) |
| 690 b = len(agg.forBugReporting) | 692 b = len(agg.forBugReporting) |
| 691 glog.Infof("[%s] AnalysisQueue: %d, UploadQueue: %d, BugReportin gQueue: %d", agg.Category, a, u, b) | 693 glog.Infof("[%s] AnalysisQueue: %d, UploadQueue: %d, BugReportin gQueue: %d", agg.Category, a, u, b) |
| 692 glog.Infof("[%s] AnalysisTotal: %d, UploadTotal: %d, BugReportin gTotal: %d", agg.Category, agg.analysisCount, agg.uploadCount, agg.bugReportCoun t) | 694 glog.Infof("[%s] AnalysisTotal: %d, UploadTotal: %d, BugReportin gTotal: %d", agg.Category, agg.analysisCount, agg.uploadCount, agg.bugReportCoun t) |
| 693 if a == 0 && u == 0 && b == 0 && agg.analysisCount == agg.upload Count && agg.uploadCount == agg.bugReportCount { | 695 if a == 0 && u == 0 && b == 0 && agg.analysisCount == agg.upload Count && agg.uploadCount == agg.bugReportCount { |
| 694 break | 696 break |
| 695 } | 697 } |
| 698 // This prevents waiting forever if an upload crashes, aborts or otherwise hangs. | |
| 699 hangCount++ | |
| 700 if hangCount >= HANG_THRESHOLD { | |
| 701 glog.Warningf("Was waiting for %d rounds and still wasn' t done. Quitting anyway.", hangCount) | |
| 702 } | |
|
borenet
2016/02/10 15:32:32
Does this logic belong in this CL?
kjlubick
2016/02/10 16:14:29
Removed.
| |
| 696 glog.Infof("[%s] Waiting %s for the aggregator's queues to be em pty", agg.Category, config.Aggregator.StatusPeriod) | 703 glog.Infof("[%s] Waiting %s for the aggregator's queues to be em pty", agg.Category, config.Aggregator.StatusPeriod) |
| 704 | |
| 697 } | 705 } |
| 698 } | 706 } |
| 699 | 707 |
| 700 // ForceAnalysis directly adds the given path to the analysis queue, where it wi ll be analyzed, | 708 // ForceAnalysis directly adds the given path to the analysis queue, where it wi ll be analyzed, |
| 701 // uploaded and possibly bug reported. | 709 // uploaded and possibly bug reported. |
| 702 func (agg *Aggregator) ForceAnalysis(path string) { | 710 func (agg *Aggregator) ForceAnalysis(path string) { |
| 703 agg.forAnalysis <- path | 711 agg.forAnalysis <- path |
| 704 } | 712 } |
| OLD | NEW |