Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(95)

Side by Side Diff: fuzzer/go/aggregator/aggregator.go

Issue 1682363003: Migrate fuzzer to use shiny new metrics2 package (Closed) Base URL: https://skia.googlesource.com/buildbot@master
Patch Set: Rename metrics Created 4 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 package aggregator 1 package aggregator
2 2
3 import ( 3 import (
4 "bytes" 4 "bytes"
5 "crypto/sha1" 5 "crypto/sha1"
6 "fmt" 6 "fmt"
7 "io" 7 "io"
8 "io/ioutil" 8 "io/ioutil"
9 "os" 9 "os"
10 "path/filepath" 10 "path/filepath"
11 "sort" 11 "sort"
12 "strings" 12 "strings"
13 "sync" 13 "sync"
14 "sync/atomic" 14 "sync/atomic"
15 "time" 15 "time"
16 16
17 go_metrics "github.com/rcrowley/go-metrics"
18 "github.com/skia-dev/glog" 17 "github.com/skia-dev/glog"
19 "go.skia.org/infra/fuzzer/go/common" 18 "go.skia.org/infra/fuzzer/go/common"
20 "go.skia.org/infra/fuzzer/go/config" 19 "go.skia.org/infra/fuzzer/go/config"
21 "go.skia.org/infra/fuzzer/go/frontend/data" 20 "go.skia.org/infra/fuzzer/go/frontend/data"
22 "go.skia.org/infra/go/exec" 21 "go.skia.org/infra/go/exec"
23 "go.skia.org/infra/go/fileutil" 22 "go.skia.org/infra/go/fileutil"
23 "go.skia.org/infra/go/metrics2"
24 "go.skia.org/infra/go/util" 24 "go.skia.org/infra/go/util"
25 "golang.org/x/net/context" 25 "golang.org/x/net/context"
26 "google.golang.org/cloud/storage" 26 "google.golang.org/cloud/storage"
27 ) 27 )
28 28
29 // Aggregator is a key part of the fuzzing operation 29 // Aggregator is a key part of the fuzzing operation
30 // (see https://skia.googlesource.com/buildbot/+/master/fuzzer/DESIGN.md). 30 // (see https://skia.googlesource.com/buildbot/+/master/fuzzer/DESIGN.md).
31 // It will find new bad fuzzes generated by afl-fuzz and create the metadata required for them. It 31 // It will find new bad fuzzes generated by afl-fuzz and create the metadata required for them. It
32 // does this by searching in the specified AflOutputPath for new crashes and moves them to a 32 // does this by searching in the specified AflOutputPath for new crashes and moves them to a
33 // temporary holding folder (specified by FuzzPath) for parsing, before sending them through the 33 // temporary holding folder (specified by FuzzPath) for parsing, before sending them through the
(...skipping 204 matching lines...) Expand 10 before | Expand all | Expand 10 after
238 // scanHelper runs findBadFuzzPaths, logs the output and keeps alreadyFoundFuzzes up to date. 238 // scanHelper runs findBadFuzzPaths, logs the output and keeps alreadyFoundFuzzes up to date.
239 func (agg *Aggregator) scanHelper(alreadyFoundFuzzes *SortedStringSlice) error { 239 func (agg *Aggregator) scanHelper(alreadyFoundFuzzes *SortedStringSlice) error {
240 newlyFound, err := agg.findBadFuzzPaths(alreadyFoundFuzzes) 240 newlyFound, err := agg.findBadFuzzPaths(alreadyFoundFuzzes)
241 if err != nil { 241 if err != nil {
242 return err 242 return err
243 } 243 }
244 // AFL-fuzz does not write crashes or hangs atomically, so this workaround waits for a bit after 244 // AFL-fuzz does not write crashes or hangs atomically, so this workaround waits for a bit after
245 // we have references to where the crashes will be. 245 // we have references to where the crashes will be.
246 // TODO(kjlubick), switch to using flock once afl-fuzz implements that upstream. 246 // TODO(kjlubick), switch to using flock once afl-fuzz implements that upstream.
247 time.Sleep(time.Second) 247 time.Sleep(time.Second)
248 » go_metrics.GetOrRegisterGauge("binary_newly_found_fuzzes", go_metrics.DefaultRegistry).Update(int64(len(newlyFound))) 248 » metrics2.GetInt64Metric("fuzzer.fuzzes.newly-found", map[string]string{"category": agg.Category}).Update(int64(len(newlyFound)))
249 » glog.Infof("[%s] %d newly found bad binary fuzzes", agg.Category, len(newlyFound)) 249 » glog.Infof("[%s] %d newly found bad fuzzes", agg.Category, len(newlyFound))
250 for _, f := range newlyFound { 250 for _, f := range newlyFound {
251 agg.forAnalysis <- f 251 agg.forAnalysis <- f
252 } 252 }
253 alreadyFoundFuzzes.Append(newlyFound) 253 alreadyFoundFuzzes.Append(newlyFound)
254 return nil 254 return nil
255 } 255 }
256 256
257 // findBadFuzzPaths looks through all the afl-fuzz directories contained in the passed in path and 257 // findBadFuzzPaths looks through all the afl-fuzz directories contained in the passed in path and
258 // returns the path to all files that are in a crash* folder that are not already in 258 // returns the path to all files that are in a crash* folder that are not already in
259 // 'alreadyFoundFuzzes'. It also sends them to the forAnalysis channel when it finds them. 259 // 'alreadyFoundFuzzes'. It also sends them to the forAnalysis channel when it finds them.
(...skipping 65 matching lines...) Expand 10 before | Expand all | Expand 10 after
325 } 325 }
326 return badFuzzPaths, nil 326 return badFuzzPaths, nil
327 } 327 }
328 328
329 // waitForAnalysis waits for files that need to be analyzed (from forAnalysis) and makes a copy of 329 // waitForAnalysis waits for files that need to be analyzed (from forAnalysis) and makes a copy of
330 // them in agg.fuzzPath with their hash as a file name. It then analyzes it using the supplied 330 // them in agg.fuzzPath with their hash as a file name. It then analyzes it using the supplied
331 // AnalysisPackage and then signals the results should be uploaded. If any unrecoverable errors 331 // AnalysisPackage and then signals the results should be uploaded. If any unrecoverable errors
332 // happen, this method terminates. 332 // happen, this method terminates.
333 func (agg *Aggregator) waitForAnalysis(identifier int) { 333 func (agg *Aggregator) waitForAnalysis(identifier int) {
334 defer agg.aggregationWaitGroup.Done() 334 defer agg.aggregationWaitGroup.Done()
335 » defer go_metrics.GetOrRegisterCounter("analysis_process_count", go_metrics.DefaultRegistry).Dec(int64(1)) 335 » defer metrics2.NewCounter("analysis-process-count", map[string]string{"category": agg.Category}).Dec(int64(1))
336 glog.Infof("[%s] Spawning analyzer %d", agg.Category, identifier) 336 glog.Infof("[%s] Spawning analyzer %d", agg.Category, identifier)
337 337
338 // our own unique working folder 338 // our own unique working folder
339 executableDir := filepath.Join(agg.executablePath, fmt.Sprintf("analyzer %d", identifier)) 339 executableDir := filepath.Join(agg.executablePath, fmt.Sprintf("analyzer %d", identifier))
340 if err := agg.setupAnalysis(executableDir); err != nil { 340 if err := agg.setupAnalysis(executableDir); err != nil {
341 glog.Errorf("[%s] Analyzer %d terminated due to error: %s", agg. Category, identifier, err) 341 glog.Errorf("[%s] Analyzer %d terminated due to error: %s", agg. Category, identifier, err)
342 return 342 return
343 } 343 }
344 for { 344 for {
345 select { 345 select {
(...skipping 155 matching lines...) Expand 10 before | Expand all | Expand 10 after
501 // Append adds all of the strings to the underlying slice and sorts it 501 // Append adds all of the strings to the underlying slice and sorts it
502 func (s *SortedStringSlice) Append(strs []string) { 502 func (s *SortedStringSlice) Append(strs []string) {
503 s.strings = append(s.strings, strs...) 503 s.strings = append(s.strings, strs...)
504 s.strings.Sort() 504 s.strings.Sort()
505 } 505 }
506 506
507 // waitForUploads waits for uploadPackages to be sent through the forUpload channel and then uploads 507 // waitForUploads waits for uploadPackages to be sent through the forUpload channel and then uploads
508 // them. If any unrecoverable errors happen, this method terminates. 508 // them. If any unrecoverable errors happen, this method terminates.
509 func (agg *Aggregator) waitForUploads(identifier int) { 509 func (agg *Aggregator) waitForUploads(identifier int) {
510 defer agg.aggregationWaitGroup.Done() 510 defer agg.aggregationWaitGroup.Done()
511 » defer go_metrics.GetOrRegisterCounter("upload_process_count", go_metrics.DefaultRegistry).Dec(int64(1)) 511 » defer metrics2.NewCounter("upload-process-count", map[string]string{"category": agg.Category}).Dec(int64(1))
512 glog.Infof("[%s] Spawning uploader %d", agg.Category, identifier) 512 glog.Infof("[%s] Spawning uploader %d", agg.Category, identifier)
513 for { 513 for {
514 select { 514 select {
515 case p := <-agg.forUpload: 515 case p := <-agg.forUpload:
516 atomic.AddInt64(&agg.uploadCount, int64(1)) 516 atomic.AddInt64(&agg.uploadCount, int64(1))
517 if !agg.UploadGreyFuzzes && p.FuzzType == GREY_FUZZ { 517 if !agg.UploadGreyFuzzes && p.FuzzType == GREY_FUZZ {
518 glog.Infof("[%s] Skipping upload of grey fuzz %s ", agg.Category, p.Data.Name) 518 glog.Infof("[%s] Skipping upload of grey fuzz %s ", agg.Category, p.Data.Name)
519 continue 519 continue
520 } 520 }
521 if err := agg.upload(p); err != nil { 521 if err := agg.upload(p); err != nil {
(...skipping 98 matching lines...) Expand 10 before | Expand all | Expand 10 after
620 if agg.MakeBugOnBadFuzz && p.IsBadFuzz { 620 if agg.MakeBugOnBadFuzz && p.IsBadFuzz {
621 glog.Warningf("[%s] Should create bug for %s", agg.Category, p.FuzzName) 621 glog.Warningf("[%s] Should create bug for %s", agg.Category, p.FuzzName)
622 } 622 }
623 return nil 623 return nil
624 } 624 }
625 625
626 // monitorStatus sets up the monitoring routine, which reports how big the work queues are and how 626 // monitorStatus sets up the monitoring routine, which reports how big the work queues are and how
627 // many processes are up. 627 // many processes are up.
628 func (agg *Aggregator) monitorStatus(numAnalysisProcesses, numUploadProcesses int) { 628 func (agg *Aggregator) monitorStatus(numAnalysisProcesses, numUploadProcesses int) {
629 defer agg.monitoringWaitGroup.Done() 629 defer agg.monitoringWaitGroup.Done()
630 » analysisProcessCount := go_metrics.GetOrRegisterCounter("analysis_process_count", go_metrics.DefaultRegistry) 630 » analysisProcessCount := metrics2.NewCounter("analysis-process-count", map[string]string{"category": agg.Category})
631 » analysisProcessCount.Clear() 631 » analysisProcessCount.Reset()
632 analysisProcessCount.Inc(int64(numAnalysisProcesses)) 632 analysisProcessCount.Inc(int64(numAnalysisProcesses))
633 » uploadProcessCount := go_metrics.GetOrRegisterCounter("upload_process_count", go_metrics.DefaultRegistry) 633 » uploadProcessCount := metrics2.NewCounter("upload-process-count", map[string]string{"category": agg.Category})
634 » uploadProcessCount.Clear() 634 » uploadProcessCount.Reset()
635 uploadProcessCount.Inc(int64(numUploadProcesses)) 635 uploadProcessCount.Inc(int64(numUploadProcesses))
636 636
637 t := time.Tick(config.Aggregator.StatusPeriod) 637 t := time.Tick(config.Aggregator.StatusPeriod)
638 for { 638 for {
639 select { 639 select {
640 case <-agg.monitoringShutdown: 640 case <-agg.monitoringShutdown:
641 glog.Infof("[%s] aggregator monitor got signal to shut down", agg.Category) 641 glog.Infof("[%s] aggregator monitor got signal to shut down", agg.Category)
642 return 642 return
643 case <-t: 643 case <-t:
644 » » » go_metrics.GetOrRegisterGauge("binary_analysis_queue_size", go_metrics.DefaultRegistry).Update(int64(len(agg.forAnalysis))) 644 » » » metrics2.GetInt64Metric("fuzzer.queue-size.analysis", map[string]string{"category": agg.Category}).Update(int64(len(agg.forAnalysis)))
645 » » » go_metrics.GetOrRegisterGauge("binary_upload_queue_size", go_metrics.DefaultRegistry).Update(int64(len(agg.forUpload))) 645 » » » metrics2.GetInt64Metric("fuzzer.queue-size.upload", map[string]string{"category": agg.Category}).Update(int64(len(agg.forUpload)))
646 » » » go_metrics.GetOrRegisterGauge("binary_bug_report_queue_size", go_metrics.DefaultRegistry).Update(int64(len(agg.forBugReporting))) 646 » » » metrics2.GetInt64Metric("fuzzer.queue-size.bug_report", map[string]string{"category": agg.Category}).Update(int64(len(agg.forBugReporting)))
borenet 2016/02/11 12:23:28 Nit: bug_report -> bug-report
kjlubick 2016/02/11 17:54:41 Done.
647 } 647 }
648 } 648 }
649 } 649 }
650 650
651 // Shutdown gracefully shuts down the aggregator. Anything that was being processed will finish 651 // Shutdown gracefully shuts down the aggregator. Anything that was being processed will finish
652 // prior to the shutdown. 652 // prior to the shutdown.
653 func (agg *Aggregator) ShutDown() { 653 func (agg *Aggregator) ShutDown() {
654 // once for the monitoring and once for the scanning routines 654 // once for the monitoring and once for the scanning routines
655 agg.monitoringShutdown <- true 655 agg.monitoringShutdown <- true
656 agg.monitoringShutdown <- true 656 agg.monitoringShutdown <- true
(...skipping 30 matching lines...) Expand all
687 for _ = range t { 687 for _ = range t {
688 a = len(agg.forAnalysis) 688 a = len(agg.forAnalysis)
689 u = len(agg.forUpload) 689 u = len(agg.forUpload)
690 b = len(agg.forBugReporting) 690 b = len(agg.forBugReporting)
691 glog.Infof("[%s] AnalysisQueue: %d, UploadQueue: %d, BugReportingQueue: %d", agg.Category, a, u, b) 691 glog.Infof("[%s] AnalysisQueue: %d, UploadQueue: %d, BugReportingQueue: %d", agg.Category, a, u, b)
692 glog.Infof("[%s] AnalysisTotal: %d, UploadTotal: %d, BugReportingTotal: %d", agg.Category, agg.analysisCount, agg.uploadCount, agg.bugReportCount) 692 glog.Infof("[%s] AnalysisTotal: %d, UploadTotal: %d, BugReportingTotal: %d", agg.Category, agg.analysisCount, agg.uploadCount, agg.bugReportCount)
693 if a == 0 && u == 0 && b == 0 && agg.analysisCount == agg.uploadCount && agg.uploadCount == agg.bugReportCount { 693 if a == 0 && u == 0 && b == 0 && agg.analysisCount == agg.uploadCount && agg.uploadCount == agg.bugReportCount {
694 break 694 break
695 } 695 }
696 glog.Infof("[%s] Waiting %s for the aggregator's queues to be empty", agg.Category, config.Aggregator.StatusPeriod) 696 glog.Infof("[%s] Waiting %s for the aggregator's queues to be empty", agg.Category, config.Aggregator.StatusPeriod)
697
697 } 698 }
698 } 699 }
699 700
700 // ForceAnalysis directly adds the given path to the analysis queue, where it will be analyzed, 701 // ForceAnalysis directly adds the given path to the analysis queue, where it will be analyzed,
701 // uploaded and possibly bug reported. 702 // uploaded and possibly bug reported.
702 func (agg *Aggregator) ForceAnalysis(path string) { 703 func (agg *Aggregator) ForceAnalysis(path string) {
703 agg.forAnalysis <- path 704 agg.forAnalysis <- path
704 } 705 }
OLDNEW
« no previous file with comments | « alertserver/alerts.cfg ('k') | fuzzer/go/fuzzer-be/main.go » ('j') | fuzzer/go/fuzzer-be/main.go » ('J')

Powered by Google App Engine
This is Rietveld 408576698