Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(119)

Side by Side Diff: fuzzer/go/aggregator/aggregator.go

Issue 1682363003: Migrate fuzzer to use shiny new metrics2 package (Closed) Base URL: https://skia.googlesource.com/buildbot@master
Patch Set: Created 4 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 package aggregator 1 package aggregator
2 2
3 import ( 3 import (
4 "bytes" 4 "bytes"
5 "crypto/sha1" 5 "crypto/sha1"
6 "fmt" 6 "fmt"
7 "io" 7 "io"
8 "io/ioutil" 8 "io/ioutil"
9 "os" 9 "os"
10 "path/filepath" 10 "path/filepath"
11 "sort" 11 "sort"
12 "strings" 12 "strings"
13 "sync" 13 "sync"
14 "sync/atomic" 14 "sync/atomic"
15 "time" 15 "time"
16 16
17 go_metrics "github.com/rcrowley/go-metrics"
18 "github.com/skia-dev/glog" 17 "github.com/skia-dev/glog"
19 "go.skia.org/infra/fuzzer/go/common" 18 "go.skia.org/infra/fuzzer/go/common"
20 "go.skia.org/infra/fuzzer/go/config" 19 "go.skia.org/infra/fuzzer/go/config"
21 "go.skia.org/infra/fuzzer/go/frontend/data" 20 "go.skia.org/infra/fuzzer/go/frontend/data"
22 "go.skia.org/infra/go/exec" 21 "go.skia.org/infra/go/exec"
23 "go.skia.org/infra/go/fileutil" 22 "go.skia.org/infra/go/fileutil"
23 "go.skia.org/infra/go/metrics2"
24 "go.skia.org/infra/go/util" 24 "go.skia.org/infra/go/util"
25 "golang.org/x/net/context" 25 "golang.org/x/net/context"
26 "google.golang.org/cloud/storage" 26 "google.golang.org/cloud/storage"
27 ) 27 )
28 28
29 // Aggregator is a key part of the fuzzing operation 29 // Aggregator is a key part of the fuzzing operation
30 // (see https://skia.googlesource.com/buildbot/+/master/fuzzer/DESIGN.md). 30 // (see https://skia.googlesource.com/buildbot/+/master/fuzzer/DESIGN.md).
31 // It will find new bad fuzzes generated by afl-fuzz and create the metadata req uired for them. It 31 // It will find new bad fuzzes generated by afl-fuzz and create the metadata req uired for them. It
32 // does this by searching in the specified AflOutputPath for new crashes and mov es them to a 32 // does this by searching in the specified AflOutputPath for new crashes and mov es them to a
33 // temporary holding folder (specified by FuzzPath) for parsing, before sending them through the 33 // temporary holding folder (specified by FuzzPath) for parsing, before sending them through the
(...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after
66 aggregationWaitGroup *sync.WaitGroup 66 aggregationWaitGroup *sync.WaitGroup
67 // These three counts are used to determine if there is any pending work . 67 // These three counts are used to determine if there is any pending work .
68 // There is no pending work if all three of these values are equal and t he 68 // There is no pending work if all three of these values are equal and t he
69 // work queues are empty. 69 // work queues are empty.
70 analysisCount int64 70 analysisCount int64
71 uploadCount int64 71 uploadCount int64
72 bugReportCount int64 72 bugReportCount int64
73 } 73 }
74 74
75 const ( 75 const (
76 » BAD_FUZZ = "bad" 76 » BAD_FUZZ = "bad"
77 » GREY_FUZZ = "grey" 77 » GREY_FUZZ = "grey"
78 » HANG_THRESHOLD = 100
78 ) 79 )
79 80
80 var ( 81 var (
81 CLANG_DEBUG = common.TEST_HARNESS_NAME + "_clang_debug" 82 CLANG_DEBUG = common.TEST_HARNESS_NAME + "_clang_debug"
82 CLANG_RELEASE = common.TEST_HARNESS_NAME + "_clang_release" 83 CLANG_RELEASE = common.TEST_HARNESS_NAME + "_clang_release"
83 ASAN_DEBUG = common.TEST_HARNESS_NAME + "_asan_debug" 84 ASAN_DEBUG = common.TEST_HARNESS_NAME + "_asan_debug"
84 ASAN_RELEASE = common.TEST_HARNESS_NAME + "_asan_release" 85 ASAN_RELEASE = common.TEST_HARNESS_NAME + "_asan_release"
85 ) 86 )
86 87
87 // uploadPackage is a struct containing all the pieces of a fuzz that need to be uploaded to GCS 88 // uploadPackage is a struct containing all the pieces of a fuzz that need to be uploaded to GCS
(...skipping 150 matching lines...) Expand 10 before | Expand all | Expand 10 after
238 // scanHelper runs findBadFuzzPaths, logs the output and keeps alreadyFoundFuzze s up to date. 239 // scanHelper runs findBadFuzzPaths, logs the output and keeps alreadyFoundFuzze s up to date.
239 func (agg *Aggregator) scanHelper(alreadyFoundFuzzes *SortedStringSlice) error { 240 func (agg *Aggregator) scanHelper(alreadyFoundFuzzes *SortedStringSlice) error {
240 newlyFound, err := agg.findBadFuzzPaths(alreadyFoundFuzzes) 241 newlyFound, err := agg.findBadFuzzPaths(alreadyFoundFuzzes)
241 if err != nil { 242 if err != nil {
242 return err 243 return err
243 } 244 }
244 // AFL-fuzz does not write crashes or hangs atomically, so this workarou nd waits for a bit after 245 // AFL-fuzz does not write crashes or hangs atomically, so this workarou nd waits for a bit after
245 // we have references to where the crashes will be. 246 // we have references to where the crashes will be.
246 // TODO(kjlubick), switch to using flock once afl-fuzz implements that u pstream. 247 // TODO(kjlubick), switch to using flock once afl-fuzz implements that u pstream.
247 time.Sleep(time.Second) 248 time.Sleep(time.Second)
248 » go_metrics.GetOrRegisterGauge("binary_newly_found_fuzzes", go_metrics.De faultRegistry).Update(int64(len(newlyFound))) 249 » metrics2.GetInt64Metric("newly_found_fuzzes", map[string]string{"categor y": agg.Category}).Update(int64(len(newlyFound)))
borenet 2016/02/10 15:32:32 Naming conventions: measurements and tag keys shou
kjlubick 2016/02/10 16:14:29 Done.
249 » glog.Infof("[%s] %d newly found bad binary fuzzes", agg.Category, len(ne wlyFound)) 250 » glog.Infof("[%s] %d newly found bad fuzzes", agg.Category, len(newlyFoun d))
250 for _, f := range newlyFound { 251 for _, f := range newlyFound {
251 agg.forAnalysis <- f 252 agg.forAnalysis <- f
252 } 253 }
253 alreadyFoundFuzzes.Append(newlyFound) 254 alreadyFoundFuzzes.Append(newlyFound)
254 return nil 255 return nil
255 } 256 }
256 257
257 // findBadFuzzPaths looks through all the afl-fuzz directories contained in the passed in path and 258 // findBadFuzzPaths looks through all the afl-fuzz directories contained in the passed in path and
258 // returns the path to all files that are in a crash* folder that are not alread y in 259 // returns the path to all files that are in a crash* folder that are not alread y in
259 // 'alreadyFoundFuzzes'. It also sends them to the forAnalysis channel when it finds them. 260 // 'alreadyFoundFuzzes'. It also sends them to the forAnalysis channel when it finds them.
(...skipping 65 matching lines...) Expand 10 before | Expand all | Expand 10 after
325 } 326 }
326 return badFuzzPaths, nil 327 return badFuzzPaths, nil
327 } 328 }
328 329
329 // waitForAnalysis waits for files that need to be analyzed (from forAnalysis) a nd makes a copy of 330 // waitForAnalysis waits for files that need to be analyzed (from forAnalysis) a nd makes a copy of
330 // them in agg.fuzzPath with their hash as a file name. It then analyzes it usin g the supplied 331 // them in agg.fuzzPath with their hash as a file name. It then analyzes it usin g the supplied
331 // AnalysisPackage and then signals the results should be uploaded. If any unrec overable errors 332 // AnalysisPackage and then signals the results should be uploaded. If any unrec overable errors
332 // happen, this method terminates. 333 // happen, this method terminates.
333 func (agg *Aggregator) waitForAnalysis(identifier int) { 334 func (agg *Aggregator) waitForAnalysis(identifier int) {
334 defer agg.aggregationWaitGroup.Done() 335 defer agg.aggregationWaitGroup.Done()
335 » defer go_metrics.GetOrRegisterCounter("analysis_process_count", go_metri cs.DefaultRegistry).Dec(int64(1)) 336 » defer metrics2.NewCounter("analysis_process_count", map[string]string{"c ategory": agg.Category}).Dec(int64(1))
borenet 2016/02/10 15:32:32 Use dashes, ie. "analysis-process-count", here and
kjlubick 2016/02/10 16:14:29 Done.
336 glog.Infof("[%s] Spawning analyzer %d", agg.Category, identifier) 337 glog.Infof("[%s] Spawning analyzer %d", agg.Category, identifier)
337 338
338 // our own unique working folder 339 // our own unique working folder
339 executableDir := filepath.Join(agg.executablePath, fmt.Sprintf("analyzer %d", identifier)) 340 executableDir := filepath.Join(agg.executablePath, fmt.Sprintf("analyzer %d", identifier))
340 if err := agg.setupAnalysis(executableDir); err != nil { 341 if err := agg.setupAnalysis(executableDir); err != nil {
341 glog.Errorf("[%s] Analyzer %d terminated due to error: %s", agg. Category, identifier, err) 342 glog.Errorf("[%s] Analyzer %d terminated due to error: %s", agg. Category, identifier, err)
342 return 343 return
343 } 344 }
344 for { 345 for {
345 select { 346 select {
(...skipping 155 matching lines...) Expand 10 before | Expand all | Expand 10 after
501 // Append adds all of the strings to the underlying slice and sorts it 502 // Append adds all of the strings to the underlying slice and sorts it
502 func (s *SortedStringSlice) Append(strs []string) { 503 func (s *SortedStringSlice) Append(strs []string) {
503 s.strings = append(s.strings, strs...) 504 s.strings = append(s.strings, strs...)
504 s.strings.Sort() 505 s.strings.Sort()
505 } 506 }
506 507
507 // waitForUploads waits for uploadPackages to be sent through the forUpload chan nel and then uploads 508 // waitForUploads waits for uploadPackages to be sent through the forUpload chan nel and then uploads
508 // them. If any unrecoverable errors happen, this method terminates. 509 // them. If any unrecoverable errors happen, this method terminates.
509 func (agg *Aggregator) waitForUploads(identifier int) { 510 func (agg *Aggregator) waitForUploads(identifier int) {
510 defer agg.aggregationWaitGroup.Done() 511 defer agg.aggregationWaitGroup.Done()
511 » defer go_metrics.GetOrRegisterCounter("upload_process_count", go_metrics .DefaultRegistry).Dec(int64(1)) 512 » defer metrics2.NewCounter("upload_process_count", map[string]string{"cat egory": agg.Category}).Dec(int64(1))
512 glog.Infof("[%s] Spawning uploader %d", agg.Category, identifier) 513 glog.Infof("[%s] Spawning uploader %d", agg.Category, identifier)
513 for { 514 for {
514 select { 515 select {
515 case p := <-agg.forUpload: 516 case p := <-agg.forUpload:
516 atomic.AddInt64(&agg.uploadCount, int64(1)) 517 atomic.AddInt64(&agg.uploadCount, int64(1))
517 if !agg.UploadGreyFuzzes && p.FuzzType == GREY_FUZZ { 518 if !agg.UploadGreyFuzzes && p.FuzzType == GREY_FUZZ {
518 glog.Infof("[%s] Skipping upload of grey fuzz %s ", agg.Category, p.Data.Name) 519 glog.Infof("[%s] Skipping upload of grey fuzz %s ", agg.Category, p.Data.Name)
519 continue 520 continue
520 } 521 }
521 if err := agg.upload(p); err != nil { 522 if err := agg.upload(p); err != nil {
(...skipping 98 matching lines...) Expand 10 before | Expand all | Expand 10 after
620 if agg.MakeBugOnBadFuzz && p.IsBadFuzz { 621 if agg.MakeBugOnBadFuzz && p.IsBadFuzz {
621 glog.Warningf("[%s] Should create bug for %s", agg.Category, p.F uzzName) 622 glog.Warningf("[%s] Should create bug for %s", agg.Category, p.F uzzName)
622 } 623 }
623 return nil 624 return nil
624 } 625 }
625 626
626 // monitorStatus sets up the monitoring routine, which reports how big the work queues are and how 627 // monitorStatus sets up the monitoring routine, which reports how big the work queues are and how
627 // many processes are up. 628 // many processes are up.
628 func (agg *Aggregator) monitorStatus(numAnalysisProcesses, numUploadProcesses in t) { 629 func (agg *Aggregator) monitorStatus(numAnalysisProcesses, numUploadProcesses in t) {
629 defer agg.monitoringWaitGroup.Done() 630 defer agg.monitoringWaitGroup.Done()
630 » analysisProcessCount := go_metrics.GetOrRegisterCounter("analysis_proces s_count", go_metrics.DefaultRegistry) 631 » analysisProcessCount := metrics2.NewCounter("analysis_process_count", ma p[string]string{"category": agg.Category})
631 » analysisProcessCount.Clear() 632 » analysisProcessCount.Reset()
632 analysisProcessCount.Inc(int64(numAnalysisProcesses)) 633 analysisProcessCount.Inc(int64(numAnalysisProcesses))
633 » uploadProcessCount := go_metrics.GetOrRegisterCounter("upload_process_co unt", go_metrics.DefaultRegistry) 634 » uploadProcessCount := metrics2.NewCounter("upload_process_count", map[st ring]string{"category": agg.Category})
634 » uploadProcessCount.Clear() 635 » uploadProcessCount.Reset()
635 uploadProcessCount.Inc(int64(numUploadProcesses)) 636 uploadProcessCount.Inc(int64(numUploadProcesses))
636 637
637 t := time.Tick(config.Aggregator.StatusPeriod) 638 t := time.Tick(config.Aggregator.StatusPeriod)
638 for { 639 for {
639 select { 640 select {
640 case <-agg.monitoringShutdown: 641 case <-agg.monitoringShutdown:
641 glog.Infof("[%s] aggregator monitor got signal to shut d own", agg.Category) 642 glog.Infof("[%s] aggregator monitor got signal to shut d own", agg.Category)
642 return 643 return
643 case <-t: 644 case <-t:
644 » » » go_metrics.GetOrRegisterGauge("binary_analysis_queue_siz e", go_metrics.DefaultRegistry).Update(int64(len(agg.forAnalysis))) 645 » » » metrics2.GetInt64Metric("analysis_queue_size", map[strin g]string{"category": agg.Category}).Update(int64(len(agg.forAnalysis)))
645 » » » go_metrics.GetOrRegisterGauge("binary_upload_queue_size" , go_metrics.DefaultRegistry).Update(int64(len(agg.forUpload))) 646 » » » metrics2.GetInt64Metric("upload_queue_size", map[string] string{"category": agg.Category}).Update(int64(len(agg.forUpload)))
646 » » » go_metrics.GetOrRegisterGauge("binary_bug_report_queue_s ize", go_metrics.DefaultRegistry).Update(int64(len(agg.forBugReporting))) 647 » » » metrics2.GetInt64Metric("bug_report_queue_size", map[str ing]string{"category": agg.Category}).Update(int64(len(agg.forBugReporting)))
647 } 648 }
648 } 649 }
649 } 650 }
650 651
651 // Shutdown gracefully shuts down the aggregator. Anything that was being proces sed will finish 652 // Shutdown gracefully shuts down the aggregator. Anything that was being proces sed will finish
652 // prior to the shutdown. 653 // prior to the shutdown.
653 func (agg *Aggregator) ShutDown() { 654 func (agg *Aggregator) ShutDown() {
654 // once for the monitoring and once for the scanning routines 655 // once for the monitoring and once for the scanning routines
655 agg.monitoringShutdown <- true 656 agg.monitoringShutdown <- true
656 agg.monitoringShutdown <- true 657 agg.monitoringShutdown <- true
(...skipping 20 matching lines...) Expand all
677 func (agg *Aggregator) WaitForEmptyQueues() { 678 func (agg *Aggregator) WaitForEmptyQueues() {
678 a := len(agg.forAnalysis) 679 a := len(agg.forAnalysis)
679 u := len(agg.forUpload) 680 u := len(agg.forUpload)
680 b := len(agg.forBugReporting) 681 b := len(agg.forBugReporting)
681 if a == 0 && u == 0 && b == 0 && agg.analysisCount == agg.uploadCount && agg.uploadCount == agg.bugReportCount { 682 if a == 0 && u == 0 && b == 0 && agg.analysisCount == agg.uploadCount && agg.uploadCount == agg.bugReportCount {
682 glog.Infof("[%s] Queues were already empty", agg.Category) 683 glog.Infof("[%s] Queues were already empty", agg.Category)
683 return 684 return
684 } 685 }
685 t := time.Tick(config.Aggregator.StatusPeriod) 686 t := time.Tick(config.Aggregator.StatusPeriod)
686 glog.Infof("[%s] Waiting %s for the aggregator's queues to be empty", ag g.Category, config.Aggregator.StatusPeriod) 687 glog.Infof("[%s] Waiting %s for the aggregator's queues to be empty", ag g.Category, config.Aggregator.StatusPeriod)
688 hangCount := 0
687 for _ = range t { 689 for _ = range t {
688 a = len(agg.forAnalysis) 690 a = len(agg.forAnalysis)
689 u = len(agg.forUpload) 691 u = len(agg.forUpload)
690 b = len(agg.forBugReporting) 692 b = len(agg.forBugReporting)
691 glog.Infof("[%s] AnalysisQueue: %d, UploadQueue: %d, BugReportin gQueue: %d", agg.Category, a, u, b) 693 glog.Infof("[%s] AnalysisQueue: %d, UploadQueue: %d, BugReportin gQueue: %d", agg.Category, a, u, b)
692 glog.Infof("[%s] AnalysisTotal: %d, UploadTotal: %d, BugReportin gTotal: %d", agg.Category, agg.analysisCount, agg.uploadCount, agg.bugReportCoun t) 694 glog.Infof("[%s] AnalysisTotal: %d, UploadTotal: %d, BugReportin gTotal: %d", agg.Category, agg.analysisCount, agg.uploadCount, agg.bugReportCoun t)
693 if a == 0 && u == 0 && b == 0 && agg.analysisCount == agg.upload Count && agg.uploadCount == agg.bugReportCount { 695 if a == 0 && u == 0 && b == 0 && agg.analysisCount == agg.upload Count && agg.uploadCount == agg.bugReportCount {
694 break 696 break
695 } 697 }
698 // This prevents waiting forever if an upload crashes, aborts or otherwise hangs.
699 hangCount++
700 if hangCount >= HANG_THRESHOLD {
701 glog.Warningf("Was waiting for %d rounds and still wasn' t done. Quitting anyway.", hangCount)
702 }
borenet 2016/02/10 15:32:32 Does this logic belong in this CL?
kjlubick 2016/02/10 16:14:29 Removed.
696 glog.Infof("[%s] Waiting %s for the aggregator's queues to be em pty", agg.Category, config.Aggregator.StatusPeriod) 703 glog.Infof("[%s] Waiting %s for the aggregator's queues to be em pty", agg.Category, config.Aggregator.StatusPeriod)
704
697 } 705 }
698 } 706 }
699 707
700 // ForceAnalysis directly adds the given path to the analysis queue, where it wi ll be analyzed, 708 // ForceAnalysis directly adds the given path to the analysis queue, where it wi ll be analyzed,
701 // uploaded and possibly bug reported. 709 // uploaded and possibly bug reported.
702 func (agg *Aggregator) ForceAnalysis(path string) { 710 func (agg *Aggregator) ForceAnalysis(path string) {
703 agg.forAnalysis <- path 711 agg.forAnalysis <- path
704 } 712 }
OLDNEW
« no previous file with comments | « alertserver/alerts.cfg ('k') | fuzzer/go/config/config.go » ('j') | fuzzer/go/config/config.go » ('J')

Powered by Google App Engine
This is Rietveld 408576698