OLD | NEW |
1 package aggregator | 1 package aggregator |
2 | 2 |
3 import ( | 3 import ( |
4 "bytes" | 4 "bytes" |
5 "crypto/sha1" | 5 "crypto/sha1" |
6 "fmt" | 6 "fmt" |
7 "io" | 7 "io" |
8 "io/ioutil" | 8 "io/ioutil" |
9 "os" | 9 "os" |
10 "path/filepath" | 10 "path/filepath" |
11 "sort" | 11 "sort" |
12 "strings" | 12 "strings" |
13 "sync" | 13 "sync" |
14 "sync/atomic" | 14 "sync/atomic" |
15 "time" | 15 "time" |
16 | 16 |
17 go_metrics "github.com/rcrowley/go-metrics" | |
18 "github.com/skia-dev/glog" | 17 "github.com/skia-dev/glog" |
19 "go.skia.org/infra/fuzzer/go/common" | 18 "go.skia.org/infra/fuzzer/go/common" |
20 "go.skia.org/infra/fuzzer/go/config" | 19 "go.skia.org/infra/fuzzer/go/config" |
21 "go.skia.org/infra/fuzzer/go/frontend/data" | 20 "go.skia.org/infra/fuzzer/go/frontend/data" |
22 "go.skia.org/infra/go/exec" | 21 "go.skia.org/infra/go/exec" |
23 "go.skia.org/infra/go/fileutil" | 22 "go.skia.org/infra/go/fileutil" |
| 23 "go.skia.org/infra/go/metrics2" |
24 "go.skia.org/infra/go/util" | 24 "go.skia.org/infra/go/util" |
25 "golang.org/x/net/context" | 25 "golang.org/x/net/context" |
26 "google.golang.org/cloud/storage" | 26 "google.golang.org/cloud/storage" |
27 ) | 27 ) |
28 | 28 |
29 // Aggregator is a key part of the fuzzing operation | 29 // Aggregator is a key part of the fuzzing operation |
30 // (see https://skia.googlesource.com/buildbot/+/master/fuzzer/DESIGN.md). | 30 // (see https://skia.googlesource.com/buildbot/+/master/fuzzer/DESIGN.md). |
31 // It will find new bad fuzzes generated by afl-fuzz and create the metadata req
uired for them. It | 31 // It will find new bad fuzzes generated by afl-fuzz and create the metadata req
uired for them. It |
32 // does this by searching in the specified AflOutputPath for new crashes and mov
es them to a | 32 // does this by searching in the specified AflOutputPath for new crashes and mov
es them to a |
33 // temporary holding folder (specified by FuzzPath) for parsing, before sending
them through the | 33 // temporary holding folder (specified by FuzzPath) for parsing, before sending
them through the |
(...skipping 204 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
238 // scanHelper runs findBadFuzzPaths, logs the output and keeps alreadyFoundFuzze
s up to date. | 238 // scanHelper runs findBadFuzzPaths, logs the output and keeps alreadyFoundFuzze
s up to date. |
239 func (agg *Aggregator) scanHelper(alreadyFoundFuzzes *SortedStringSlice) error { | 239 func (agg *Aggregator) scanHelper(alreadyFoundFuzzes *SortedStringSlice) error { |
240 newlyFound, err := agg.findBadFuzzPaths(alreadyFoundFuzzes) | 240 newlyFound, err := agg.findBadFuzzPaths(alreadyFoundFuzzes) |
241 if err != nil { | 241 if err != nil { |
242 return err | 242 return err |
243 } | 243 } |
244 // AFL-fuzz does not write crashes or hangs atomically, so this workarou
nd waits for a bit after | 244 // AFL-fuzz does not write crashes or hangs atomically, so this workarou
nd waits for a bit after |
245 // we have references to where the crashes will be. | 245 // we have references to where the crashes will be. |
246 // TODO(kjlubick), switch to using flock once afl-fuzz implements that u
pstream. | 246 // TODO(kjlubick), switch to using flock once afl-fuzz implements that u
pstream. |
247 time.Sleep(time.Second) | 247 time.Sleep(time.Second) |
248 » go_metrics.GetOrRegisterGauge("binary_newly_found_fuzzes", go_metrics.De
faultRegistry).Update(int64(len(newlyFound))) | 248 » metrics2.GetInt64Metric("fuzzer.fuzzes.newly-found", map[string]string{"
category": agg.Category}).Update(int64(len(newlyFound))) |
249 » glog.Infof("[%s] %d newly found bad binary fuzzes", agg.Category, len(ne
wlyFound)) | 249 » glog.Infof("[%s] %d newly found bad fuzzes", agg.Category, len(newlyFoun
d)) |
250 for _, f := range newlyFound { | 250 for _, f := range newlyFound { |
251 agg.forAnalysis <- f | 251 agg.forAnalysis <- f |
252 } | 252 } |
253 alreadyFoundFuzzes.Append(newlyFound) | 253 alreadyFoundFuzzes.Append(newlyFound) |
254 return nil | 254 return nil |
255 } | 255 } |
256 | 256 |
257 // findBadFuzzPaths looks through all the afl-fuzz directories contained in the
passed in path and | 257 // findBadFuzzPaths looks through all the afl-fuzz directories contained in the
passed in path and |
258 // returns the path to all files that are in a crash* folder that are not alread
y in | 258 // returns the path to all files that are in a crash* folder that are not alread
y in |
259 // 'alreadyFoundFuzzes'. It also sends them to the forAnalysis channel when it
finds them. | 259 // 'alreadyFoundFuzzes'. It also sends them to the forAnalysis channel when it
finds them. |
(...skipping 65 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
325 } | 325 } |
326 return badFuzzPaths, nil | 326 return badFuzzPaths, nil |
327 } | 327 } |
328 | 328 |
329 // waitForAnalysis waits for files that need to be analyzed (from forAnalysis) a
nd makes a copy of | 329 // waitForAnalysis waits for files that need to be analyzed (from forAnalysis) a
nd makes a copy of |
330 // them in agg.fuzzPath with their hash as a file name. It then analyzes it usin
g the supplied | 330 // them in agg.fuzzPath with their hash as a file name. It then analyzes it usin
g the supplied |
331 // AnalysisPackage and then signals the results should be uploaded. If any unrec
overable errors | 331 // AnalysisPackage and then signals the results should be uploaded. If any unrec
overable errors |
332 // happen, this method terminates. | 332 // happen, this method terminates. |
333 func (agg *Aggregator) waitForAnalysis(identifier int) { | 333 func (agg *Aggregator) waitForAnalysis(identifier int) { |
334 defer agg.aggregationWaitGroup.Done() | 334 defer agg.aggregationWaitGroup.Done() |
335 » defer go_metrics.GetOrRegisterCounter("analysis_process_count", go_metri
cs.DefaultRegistry).Dec(int64(1)) | 335 » defer metrics2.NewCounter("analysis-process-count", map[string]string{"c
ategory": agg.Category}).Dec(int64(1)) |
336 glog.Infof("[%s] Spawning analyzer %d", agg.Category, identifier) | 336 glog.Infof("[%s] Spawning analyzer %d", agg.Category, identifier) |
337 | 337 |
338 // our own unique working folder | 338 // our own unique working folder |
339 executableDir := filepath.Join(agg.executablePath, fmt.Sprintf("analyzer
%d", identifier)) | 339 executableDir := filepath.Join(agg.executablePath, fmt.Sprintf("analyzer
%d", identifier)) |
340 if err := agg.setupAnalysis(executableDir); err != nil { | 340 if err := agg.setupAnalysis(executableDir); err != nil { |
341 glog.Errorf("[%s] Analyzer %d terminated due to error: %s", agg.
Category, identifier, err) | 341 glog.Errorf("[%s] Analyzer %d terminated due to error: %s", agg.
Category, identifier, err) |
342 return | 342 return |
343 } | 343 } |
344 for { | 344 for { |
345 select { | 345 select { |
(...skipping 155 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
501 // Append adds all of the strings to the underlying slice and sorts it | 501 // Append adds all of the strings to the underlying slice and sorts it |
502 func (s *SortedStringSlice) Append(strs []string) { | 502 func (s *SortedStringSlice) Append(strs []string) { |
503 s.strings = append(s.strings, strs...) | 503 s.strings = append(s.strings, strs...) |
504 s.strings.Sort() | 504 s.strings.Sort() |
505 } | 505 } |
506 | 506 |
507 // waitForUploads waits for uploadPackages to be sent through the forUpload chan
nel and then uploads | 507 // waitForUploads waits for uploadPackages to be sent through the forUpload chan
nel and then uploads |
508 // them. If any unrecoverable errors happen, this method terminates. | 508 // them. If any unrecoverable errors happen, this method terminates. |
509 func (agg *Aggregator) waitForUploads(identifier int) { | 509 func (agg *Aggregator) waitForUploads(identifier int) { |
510 defer agg.aggregationWaitGroup.Done() | 510 defer agg.aggregationWaitGroup.Done() |
511 » defer go_metrics.GetOrRegisterCounter("upload_process_count", go_metrics
.DefaultRegistry).Dec(int64(1)) | 511 » defer metrics2.NewCounter("upload-process-count", map[string]string{"cat
egory": agg.Category}).Dec(int64(1)) |
512 glog.Infof("[%s] Spawning uploader %d", agg.Category, identifier) | 512 glog.Infof("[%s] Spawning uploader %d", agg.Category, identifier) |
513 for { | 513 for { |
514 select { | 514 select { |
515 case p := <-agg.forUpload: | 515 case p := <-agg.forUpload: |
516 atomic.AddInt64(&agg.uploadCount, int64(1)) | 516 atomic.AddInt64(&agg.uploadCount, int64(1)) |
517 if !agg.UploadGreyFuzzes && p.FuzzType == GREY_FUZZ { | 517 if !agg.UploadGreyFuzzes && p.FuzzType == GREY_FUZZ { |
518 glog.Infof("[%s] Skipping upload of grey fuzz %s
", agg.Category, p.Data.Name) | 518 glog.Infof("[%s] Skipping upload of grey fuzz %s
", agg.Category, p.Data.Name) |
519 continue | 519 continue |
520 } | 520 } |
521 if err := agg.upload(p); err != nil { | 521 if err := agg.upload(p); err != nil { |
(...skipping 98 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
620 if agg.MakeBugOnBadFuzz && p.IsBadFuzz { | 620 if agg.MakeBugOnBadFuzz && p.IsBadFuzz { |
621 glog.Warningf("[%s] Should create bug for %s", agg.Category, p.F
uzzName) | 621 glog.Warningf("[%s] Should create bug for %s", agg.Category, p.F
uzzName) |
622 } | 622 } |
623 return nil | 623 return nil |
624 } | 624 } |
625 | 625 |
626 // monitorStatus sets up the monitoring routine, which reports how big the work
queues are and how | 626 // monitorStatus sets up the monitoring routine, which reports how big the work
queues are and how |
627 // many processes are up. | 627 // many processes are up. |
628 func (agg *Aggregator) monitorStatus(numAnalysisProcesses, numUploadProcesses in
t) { | 628 func (agg *Aggregator) monitorStatus(numAnalysisProcesses, numUploadProcesses in
t) { |
629 defer agg.monitoringWaitGroup.Done() | 629 defer agg.monitoringWaitGroup.Done() |
630 » analysisProcessCount := go_metrics.GetOrRegisterCounter("analysis_proces
s_count", go_metrics.DefaultRegistry) | 630 » analysisProcessCount := metrics2.NewCounter("analysis-process-count", ma
p[string]string{"category": agg.Category}) |
631 » analysisProcessCount.Clear() | 631 » analysisProcessCount.Reset() |
632 analysisProcessCount.Inc(int64(numAnalysisProcesses)) | 632 analysisProcessCount.Inc(int64(numAnalysisProcesses)) |
633 » uploadProcessCount := go_metrics.GetOrRegisterCounter("upload_process_co
unt", go_metrics.DefaultRegistry) | 633 » uploadProcessCount := metrics2.NewCounter("upload-process-count", map[st
ring]string{"category": agg.Category}) |
634 » uploadProcessCount.Clear() | 634 » uploadProcessCount.Reset() |
635 uploadProcessCount.Inc(int64(numUploadProcesses)) | 635 uploadProcessCount.Inc(int64(numUploadProcesses)) |
636 | 636 |
637 t := time.Tick(config.Aggregator.StatusPeriod) | 637 t := time.Tick(config.Aggregator.StatusPeriod) |
638 for { | 638 for { |
639 select { | 639 select { |
640 case <-agg.monitoringShutdown: | 640 case <-agg.monitoringShutdown: |
641 glog.Infof("[%s] aggregator monitor got signal to shut d
own", agg.Category) | 641 glog.Infof("[%s] aggregator monitor got signal to shut d
own", agg.Category) |
642 return | 642 return |
643 case <-t: | 643 case <-t: |
644 » » » go_metrics.GetOrRegisterGauge("binary_analysis_queue_siz
e", go_metrics.DefaultRegistry).Update(int64(len(agg.forAnalysis))) | 644 » » » metrics2.GetInt64Metric("fuzzer.queue-size.analysis", ma
p[string]string{"category": agg.Category}).Update(int64(len(agg.forAnalysis))) |
645 » » » go_metrics.GetOrRegisterGauge("binary_upload_queue_size"
, go_metrics.DefaultRegistry).Update(int64(len(agg.forUpload))) | 645 » » » metrics2.GetInt64Metric("fuzzer.queue-size.upload", map[
string]string{"category": agg.Category}).Update(int64(len(agg.forUpload))) |
646 » » » go_metrics.GetOrRegisterGauge("binary_bug_report_queue_s
ize", go_metrics.DefaultRegistry).Update(int64(len(agg.forBugReporting))) | 646 » » » metrics2.GetInt64Metric("fuzzer.queue-size.bug-report",
map[string]string{"category": agg.Category}).Update(int64(len(agg.forBugReportin
g))) |
647 } | 647 } |
648 } | 648 } |
649 } | 649 } |
650 | 650 |
651 // Shutdown gracefully shuts down the aggregator. Anything that was being proces
sed will finish | 651 // Shutdown gracefully shuts down the aggregator. Anything that was being proces
sed will finish |
652 // prior to the shutdown. | 652 // prior to the shutdown. |
653 func (agg *Aggregator) ShutDown() { | 653 func (agg *Aggregator) ShutDown() { |
654 // once for the monitoring and once for the scanning routines | 654 // once for the monitoring and once for the scanning routines |
655 agg.monitoringShutdown <- true | 655 agg.monitoringShutdown <- true |
656 agg.monitoringShutdown <- true | 656 agg.monitoringShutdown <- true |
(...skipping 30 matching lines...) Expand all Loading... |
687 for _ = range t { | 687 for _ = range t { |
688 a = len(agg.forAnalysis) | 688 a = len(agg.forAnalysis) |
689 u = len(agg.forUpload) | 689 u = len(agg.forUpload) |
690 b = len(agg.forBugReporting) | 690 b = len(agg.forBugReporting) |
691 glog.Infof("[%s] AnalysisQueue: %d, UploadQueue: %d, BugReportin
gQueue: %d", agg.Category, a, u, b) | 691 glog.Infof("[%s] AnalysisQueue: %d, UploadQueue: %d, BugReportin
gQueue: %d", agg.Category, a, u, b) |
692 glog.Infof("[%s] AnalysisTotal: %d, UploadTotal: %d, BugReportin
gTotal: %d", agg.Category, agg.analysisCount, agg.uploadCount, agg.bugReportCoun
t) | 692 glog.Infof("[%s] AnalysisTotal: %d, UploadTotal: %d, BugReportin
gTotal: %d", agg.Category, agg.analysisCount, agg.uploadCount, agg.bugReportCoun
t) |
693 if a == 0 && u == 0 && b == 0 && agg.analysisCount == agg.upload
Count && agg.uploadCount == agg.bugReportCount { | 693 if a == 0 && u == 0 && b == 0 && agg.analysisCount == agg.upload
Count && agg.uploadCount == agg.bugReportCount { |
694 break | 694 break |
695 } | 695 } |
696 glog.Infof("[%s] Waiting %s for the aggregator's queues to be em
pty", agg.Category, config.Aggregator.StatusPeriod) | 696 glog.Infof("[%s] Waiting %s for the aggregator's queues to be em
pty", agg.Category, config.Aggregator.StatusPeriod) |
| 697 |
697 } | 698 } |
698 } | 699 } |
699 | 700 |
700 // ForceAnalysis directly adds the given path to the analysis queue, where it wi
ll be analyzed, | 701 // ForceAnalysis directly adds the given path to the analysis queue, where it wi
ll be analyzed, |
701 // uploaded and possibly bug reported. | 702 // uploaded and possibly bug reported. |
702 func (agg *Aggregator) ForceAnalysis(path string) { | 703 func (agg *Aggregator) ForceAnalysis(path string) { |
703 agg.forAnalysis <- path | 704 agg.forAnalysis <- path |
704 } | 705 } |
OLD | NEW |