Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(887)

Side by Side Diff: fuzzer/go/aggregator/aggregator.go

Issue 1691893002: Fuzzer now deduplicates on the analysis side instead of the download side (Closed) Base URL: https://skia.googlesource.com/buildbot@metrics
Patch Set: Created 4 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « no previous file | fuzzer/go/backend/version_updater.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 package aggregator 1 package aggregator
2 2
3 import ( 3 import (
4 "bytes" 4 "bytes"
5 "crypto/sha1" 5 "crypto/sha1"
6 "fmt" 6 "fmt"
7 "io" 7 "io"
8 "io/ioutil" 8 "io/ioutil"
9 "os" 9 "os"
10 "path/filepath" 10 "path/filepath"
11 "sort" 11 "sort"
12 "strings" 12 "strings"
13 "sync" 13 "sync"
14 "sync/atomic" 14 "sync/atomic"
15 "time" 15 "time"
16 16
17 "github.com/skia-dev/glog" 17 "github.com/skia-dev/glog"
18 "go.skia.org/infra/fuzzer/go/common" 18 "go.skia.org/infra/fuzzer/go/common"
19 "go.skia.org/infra/fuzzer/go/config" 19 "go.skia.org/infra/fuzzer/go/config"
20 » "go.skia.org/infra/fuzzer/go/frontend/data" 20 » "go.skia.org/infra/fuzzer/go/data"
21 » "go.skia.org/infra/fuzzer/go/deduplicator"
21 "go.skia.org/infra/go/exec" 22 "go.skia.org/infra/go/exec"
22 "go.skia.org/infra/go/fileutil" 23 "go.skia.org/infra/go/fileutil"
23 "go.skia.org/infra/go/metrics2" 24 "go.skia.org/infra/go/metrics2"
24 "go.skia.org/infra/go/util" 25 "go.skia.org/infra/go/util"
25 "golang.org/x/net/context" 26 "golang.org/x/net/context"
26 "google.golang.org/cloud/storage" 27 "google.golang.org/cloud/storage"
27 ) 28 )
28 29
29 // Aggregator is a key part of the fuzzing operation 30 // Aggregator is a key part of the fuzzing operation
30 // (see https://skia.googlesource.com/buildbot/+/master/fuzzer/DESIGN.md). 31 // (see https://skia.googlesource.com/buildbot/+/master/fuzzer/DESIGN.md).
(...skipping 18 matching lines...) Expand all
49 storageClient *storage.Client 50 storageClient *storage.Client
50 fuzzPath string 51 fuzzPath string
51 executablePath string 52 executablePath string
52 // For passing the paths of new binaries that should be scanned. 53 // For passing the paths of new binaries that should be scanned.
53 forAnalysis chan string 54 forAnalysis chan string
54 // For passing the file names of analyzed fuzzes that should be uploaded from where they rest on 55 // For passing the file names of analyzed fuzzes that should be uploaded from where they rest on
55 // disk in `fuzzPath` 56 // disk in `fuzzPath`
56 forUpload chan uploadPackage 57 forUpload chan uploadPackage
57 58
58 forBugReporting chan bugReportingPackage 59 forBugReporting chan bugReportingPackage
60
61 deduplicator *deduplicator.Deduplicator
62
59 // The shutdown channels are used to signal shutdowns. There are two gr oups, to 63 // The shutdown channels are used to signal shutdowns. There are two gr oups, to
60 // allow for a softer, cleaner shutdown w/ minimal lost work. 64 // allow for a softer, cleaner shutdown w/ minimal lost work.
61 // Group A (monitoring) includes the scanning and the monitoring routine . 65 // Group A (monitoring) includes the scanning and the monitoring routine .
62 // Group B (aggregation) include the analysis and upload routines and th e bug reporting routine. 66 // Group B (aggregation) include the analysis and upload routines and th e bug reporting routine.
63 monitoringShutdown chan bool 67 monitoringShutdown chan bool
64 monitoringWaitGroup *sync.WaitGroup 68 monitoringWaitGroup *sync.WaitGroup
65 aggregationShutdown chan bool 69 aggregationShutdown chan bool
66 aggregationWaitGroup *sync.WaitGroup 70 aggregationWaitGroup *sync.WaitGroup
67 // These three counts are used to determine if there is any pending work . 71 // These three counts are used to determine if there is any pending work .
68 // There is no pending work if all three of these values are equal and t he 72 // There is no pending work if all three of these values are equal and t he
69 // work queues are empty. 73 // work queues are empty.
70 analysisCount int64 74 analysisCount int64
71 uploadCount int64 75 uploadCount int64
72 bugReportCount int64 76 bugReportCount int64
77
78 greyNames []string
79 badNames []string
73 } 80 }
74 81
const (
	// FuzzType values for uploadPackage: a "bad" fuzz crashed or otherwise
	// misbehaved; a "grey" fuzz did not.
	BAD_FUZZ  = "bad"
	GREY_FUZZ = "grey"
	// HANG_THRESHOLD is the number of StatusPeriod rounds WaitForEmptyQueues
	// will wait before concluding the pipeline is hung and giving up.
	HANG_THRESHOLD = 100
)
79 87
80 var ( 88 var (
81 CLANG_DEBUG = common.TEST_HARNESS_NAME + "_clang_debug" 89 CLANG_DEBUG = common.TEST_HARNESS_NAME + "_clang_debug"
82 CLANG_RELEASE = common.TEST_HARNESS_NAME + "_clang_release" 90 CLANG_RELEASE = common.TEST_HARNESS_NAME + "_clang_release"
83 ASAN_DEBUG = common.TEST_HARNESS_NAME + "_asan_debug" 91 ASAN_DEBUG = common.TEST_HARNESS_NAME + "_asan_debug"
84 ASAN_RELEASE = common.TEST_HARNESS_NAME + "_asan_release" 92 ASAN_RELEASE = common.TEST_HARNESS_NAME + "_asan_release"
85 ) 93 )
86 94
87 // uploadPackage is a struct containing all the pieces of a fuzz that need to be uploaded to GCS 95 // uploadPackage is a struct containing all the pieces of a fuzz that need to be uploaded to GCS
88 type uploadPackage struct { 96 type uploadPackage struct {
89 Data data.GCSPackage 97 Data data.GCSPackage
90 FilePath string 98 FilePath string
91 // Must be BAD_FUZZ or GREY_FUZZ 99 // Must be BAD_FUZZ or GREY_FUZZ
92 FuzzType string 100 FuzzType string
93 } 101 }
94 102
// bugReportingPackage is a struct containing the pieces of a fuzz that may need
// to have a bug filed or updated.
type bugReportingPackage struct {
	// FuzzName identifies the fuzz the report is about.
	FuzzName string
	// CommitHash is the Skia revision the fuzz was run against.
	CommitHash string
	// IsBadFuzz is true when the fuzz was a BAD_FUZZ (crash/misbehavior).
	IsBadFuzz bool
}
102 110
103 // StartAggregator creates and starts a Aggregator. 111 // StartAggregator creates and starts a Aggregator.
104 // If there is a problem starting up, an error is returned. Other errors will b e logged. 112 // If there is a problem starting up, an error is returned. Other errors will b e logged.
105 func StartAggregator(s *storage.Client, category string) (*Aggregator, error) { 113 func StartAggregator(s *storage.Client, category string, startingReports <-chan data.FuzzReport) (*Aggregator, error) {
106 b := Aggregator{ 114 b := Aggregator{
107 Category: category, 115 Category: category,
108 storageClient: s, 116 storageClient: s,
109 fuzzPath: filepath.Join(config.Aggregator.FuzzPath, ca tegory), 117 fuzzPath: filepath.Join(config.Aggregator.FuzzPath, ca tegory),
110 executablePath: filepath.Join(config.Aggregator.ExecutablePa th, category), 118 executablePath: filepath.Join(config.Aggregator.ExecutablePa th, category),
111 forAnalysis: make(chan string, 10000), 119 forAnalysis: make(chan string, 10000),
112 forUpload: make(chan uploadPackage, 100), 120 forUpload: make(chan uploadPackage, 100),
113 forBugReporting: make(chan bugReportingPackage, 100), 121 forBugReporting: make(chan bugReportingPackage, 100),
114 MakeBugOnBadFuzz: false, 122 MakeBugOnBadFuzz: false,
115 UploadGreyFuzzes: false, 123 UploadGreyFuzzes: false,
124 deduplicator: deduplicator.New(),
116 monitoringShutdown: make(chan bool, 2), 125 monitoringShutdown: make(chan bool, 2),
117 // aggregationShutdown needs to be created with a calculated cap acity in start 126 // aggregationShutdown needs to be created with a calculated cap acity in start
118 } 127 }
119 128
129 // preload the duplicator
130 for report := range startingReports {
131 b.deduplicator.IsUnique(report)
132 }
133
120 return &b, b.start() 134 return &b, b.start()
121 } 135 }
122 136
123 // start starts up the Aggregator. It refreshes all status it needs and builds a debug and a 137 // start starts up the Aggregator. It refreshes all status it needs and builds a debug and a
124 // release version of Skia for use in analysis. It then spawns the aggregation pipeline and a 138 // release version of Skia for use in analysis. It then spawns the aggregation pipeline and a
125 // monitoring thread. 139 // monitoring thread.
126 func (agg *Aggregator) start() error { 140 func (agg *Aggregator) start() error {
127 // Set the wait groups to fresh 141 // Set the wait groups to fresh
128 agg.monitoringWaitGroup = &sync.WaitGroup{} 142 agg.monitoringWaitGroup = &sync.WaitGroup{}
129 agg.aggregationWaitGroup = &sync.WaitGroup{} 143 agg.aggregationWaitGroup = &sync.WaitGroup{}
(...skipping 210 matching lines...) Expand 10 before | Expand all | Expand 10 after
340 if err := agg.setupAnalysis(executableDir); err != nil { 354 if err := agg.setupAnalysis(executableDir); err != nil {
341 glog.Errorf("[%s] Analyzer %d terminated due to error: %s", agg. Category, identifier, err) 355 glog.Errorf("[%s] Analyzer %d terminated due to error: %s", agg. Category, identifier, err)
342 return 356 return
343 } 357 }
344 for { 358 for {
345 select { 359 select {
346 case badFuzzPath := <-agg.forAnalysis: 360 case badFuzzPath := <-agg.forAnalysis:
347 atomic.AddInt64(&agg.analysisCount, int64(1)) 361 atomic.AddInt64(&agg.analysisCount, int64(1))
348 err := agg.analysisHelper(executableDir, badFuzzPath) 362 err := agg.analysisHelper(executableDir, badFuzzPath)
349 if err != nil { 363 if err != nil {
364 atomic.AddInt64(&agg.analysisCount, int64(-1))
350 glog.Errorf("[%s] Analyzer %d terminated due to error: %s", agg.Category, identifier, err) 365 glog.Errorf("[%s] Analyzer %d terminated due to error: %s", agg.Category, identifier, err)
351 return 366 return
352 } 367 }
353 case <-agg.aggregationShutdown: 368 case <-agg.aggregationShutdown:
354 glog.Infof("[%s] Analyzer %d recieved shutdown signal", agg.Category, identifier) 369 glog.Infof("[%s] Analyzer %d recieved shutdown signal", agg.Category, identifier)
355 return 370 return
356 } 371 }
357 } 372 }
358 } 373 }
359 374
(...skipping 149 matching lines...) Expand 10 before | Expand all | Expand 10 after
509 func (agg *Aggregator) waitForUploads(identifier int) { 524 func (agg *Aggregator) waitForUploads(identifier int) {
510 defer agg.aggregationWaitGroup.Done() 525 defer agg.aggregationWaitGroup.Done()
511 defer metrics2.NewCounter("upload-process-count", map[string]string{"cat egory": agg.Category}).Dec(int64(1)) 526 defer metrics2.NewCounter("upload-process-count", map[string]string{"cat egory": agg.Category}).Dec(int64(1))
512 glog.Infof("[%s] Spawning uploader %d", agg.Category, identifier) 527 glog.Infof("[%s] Spawning uploader %d", agg.Category, identifier)
513 for { 528 for {
514 select { 529 select {
515 case p := <-agg.forUpload: 530 case p := <-agg.forUpload:
516 atomic.AddInt64(&agg.uploadCount, int64(1)) 531 atomic.AddInt64(&agg.uploadCount, int64(1))
517 if !agg.UploadGreyFuzzes && p.FuzzType == GREY_FUZZ { 532 if !agg.UploadGreyFuzzes && p.FuzzType == GREY_FUZZ {
518 glog.Infof("[%s] Skipping upload of grey fuzz %s ", agg.Category, p.Data.Name) 533 glog.Infof("[%s] Skipping upload of grey fuzz %s ", agg.Category, p.Data.Name)
534 // We are skipping the bugReport, so increment t he counts.
535 atomic.AddInt64(&agg.bugReportCount, int64(1))
536 continue
537 }
538 if p.FuzzType != GREY_FUZZ && !agg.deduplicator.IsUnique (data.ParseReport(p.Data)) {
539 glog.Infof("[%s] Skipping upload of duplicate fu zz %s", agg.Category, p.Data.Name)
540 // We are skipping the bugReport, so increment t he counts.
541 atomic.AddInt64(&agg.bugReportCount, int64(1))
519 continue 542 continue
520 } 543 }
521 if err := agg.upload(p); err != nil { 544 if err := agg.upload(p); err != nil {
522 glog.Errorf("[%s] Uploader %d terminated due to error: %s", agg.Category, identifier, err) 545 glog.Errorf("[%s] Uploader %d terminated due to error: %s", agg.Category, identifier, err)
546 // We are skipping the bugReport, so increment t he counts.
547 atomic.AddInt64(&agg.bugReportCount, int64(1))
523 return 548 return
524 } 549 }
525 agg.forBugReporting <- bugReportingPackage{ 550 agg.forBugReporting <- bugReportingPackage{
526 FuzzName: p.Data.Name, 551 FuzzName: p.Data.Name,
527 CommitHash: config.Generator.SkiaVersion.Hash, 552 CommitHash: config.Generator.SkiaVersion.Hash,
528 IsBadFuzz: p.FuzzType == BAD_FUZZ, 553 IsBadFuzz: p.FuzzType == BAD_FUZZ,
529 } 554 }
530 case <-agg.aggregationShutdown: 555 case <-agg.aggregationShutdown:
531 glog.Infof("[%s] Uploader %d recieved shutdown signal", agg.Category, identifier) 556 glog.Infof("[%s] Uploader %d recieved shutdown signal", agg.Category, identifier)
532 return 557 return
533 } 558 }
534 } 559 }
535 } 560 }
536 561
537 // upload breaks apart the uploadPackage into its constituant parts and uploads them to GCS using 562 // upload breaks apart the uploadPackage into its constituant parts and uploads them to GCS using
538 // some helper methods. 563 // some helper methods.
539 func (agg *Aggregator) upload(p uploadPackage) error { 564 func (agg *Aggregator) upload(p uploadPackage) error {
540 glog.Infof("[%s] uploading %s with file %s and analysis bytes %d;%d;%d|% d;%d;%d", agg.Category, p.Data.Name, p.FilePath, len(p.Data.Debug.Asan), len(p.D ata.Debug.Dump), len(p.Data.Debug.StdErr), len(p.Data.Release.Asan), len(p.Data. Release.Dump), len(p.Data.Release.StdErr)) 565 glog.Infof("[%s] uploading %s with file %s and analysis bytes %d;%d;%d|% d;%d;%d", agg.Category, p.Data.Name, p.FilePath, len(p.Data.Debug.Asan), len(p.D ata.Debug.Dump), len(p.Data.Debug.StdErr), len(p.Data.Release.Asan), len(p.Data. Release.Dump), len(p.Data.Release.StdErr))
566 if p.FuzzType == GREY_FUZZ {
567 agg.greyNames = append(agg.greyNames, p.Data.Name)
568 } else {
569 agg.badNames = append(agg.badNames, p.Data.Name)
570 }
541 571
542 if err := agg.uploadBinaryFromDisk(p, p.Data.Name, p.FilePath); err != n il { 572 if err := agg.uploadBinaryFromDisk(p, p.Data.Name, p.FilePath); err != n il {
543 return err 573 return err
544 } 574 }
545 if err := agg.uploadString(p, p.Data.Name+"_debug.asan", p.Data.Debug.As an); err != nil { 575 if err := agg.uploadString(p, p.Data.Name+"_debug.asan", p.Data.Debug.As an); err != nil {
546 return err 576 return err
547 } 577 }
548 if err := agg.uploadString(p, p.Data.Name+"_debug.dump", p.Data.Debug.Du mp); err != nil { 578 if err := agg.uploadString(p, p.Data.Name+"_debug.dump", p.Data.Debug.Du mp); err != nil {
549 return err 579 return err
550 } 580 }
(...skipping 111 matching lines...) Expand 10 before | Expand all | Expand 10 after
662 // aggregationShutdown channel. 692 // aggregationShutdown channel.
663 for i := len(agg.aggregationShutdown); i < cap(agg.aggregationShutdown); i++ { 693 for i := len(agg.aggregationShutdown); i < cap(agg.aggregationShutdown); i++ {
664 agg.aggregationShutdown <- true 694 agg.aggregationShutdown <- true
665 } 695 }
666 agg.aggregationWaitGroup.Wait() 696 agg.aggregationWaitGroup.Wait()
667 } 697 }
668 698
669 // RestartAnalysis restarts the shut down aggregator. Anything that is in the s canning directory 699 // RestartAnalysis restarts the shut down aggregator. Anything that is in the s canning directory
670 // should be cleared out, lest it be rescanned/analyzed. 700 // should be cleared out, lest it be rescanned/analyzed.
671 func (agg *Aggregator) RestartAnalysis() error { 701 func (agg *Aggregator) RestartAnalysis() error {
702 agg.deduplicator.Clear()
672 return agg.start() 703 return agg.start()
673 } 704 }
674 705
675 // WaitForEmptyQueues will return once there is nothing more in the analysis-upl oad-report 706 // WaitForEmptyQueues will return once there is nothing more in the analysis-upl oad-report
676 // pipeline, waiting in increments of config.Aggregator.StatusPeriod until it is done. 707 // pipeline, waiting in increments of config.Aggregator.StatusPeriod until it is done.
677 func (agg *Aggregator) WaitForEmptyQueues() { 708 func (agg *Aggregator) WaitForEmptyQueues() {
678 a := len(agg.forAnalysis) 709 a := len(agg.forAnalysis)
679 u := len(agg.forUpload) 710 u := len(agg.forUpload)
680 b := len(agg.forBugReporting) 711 b := len(agg.forBugReporting)
681 if a == 0 && u == 0 && b == 0 && agg.analysisCount == agg.uploadCount && agg.uploadCount == agg.bugReportCount { 712 if a == 0 && u == 0 && b == 0 && agg.analysisCount == agg.uploadCount && agg.uploadCount == agg.bugReportCount {
682 glog.Infof("[%s] Queues were already empty", agg.Category) 713 glog.Infof("[%s] Queues were already empty", agg.Category)
683 return 714 return
684 } 715 }
685 t := time.Tick(config.Aggregator.StatusPeriod) 716 t := time.Tick(config.Aggregator.StatusPeriod)
686 glog.Infof("[%s] Waiting %s for the aggregator's queues to be empty", ag g.Category, config.Aggregator.StatusPeriod) 717 glog.Infof("[%s] Waiting %s for the aggregator's queues to be empty", ag g.Category, config.Aggregator.StatusPeriod)
718 hangCount := 0
687 for _ = range t { 719 for _ = range t {
688 a = len(agg.forAnalysis) 720 a = len(agg.forAnalysis)
689 u = len(agg.forUpload) 721 u = len(agg.forUpload)
690 b = len(agg.forBugReporting) 722 b = len(agg.forBugReporting)
691 glog.Infof("[%s] AnalysisQueue: %d, UploadQueue: %d, BugReportin gQueue: %d", agg.Category, a, u, b) 723 glog.Infof("[%s] AnalysisQueue: %d, UploadQueue: %d, BugReportin gQueue: %d", agg.Category, a, u, b)
692 glog.Infof("[%s] AnalysisTotal: %d, UploadTotal: %d, BugReportin gTotal: %d", agg.Category, agg.analysisCount, agg.uploadCount, agg.bugReportCoun t) 724 glog.Infof("[%s] AnalysisTotal: %d, UploadTotal: %d, BugReportin gTotal: %d", agg.Category, agg.analysisCount, agg.uploadCount, agg.bugReportCoun t)
693 if a == 0 && u == 0 && b == 0 && agg.analysisCount == agg.upload Count && agg.uploadCount == agg.bugReportCount { 725 if a == 0 && u == 0 && b == 0 && agg.analysisCount == agg.upload Count && agg.uploadCount == agg.bugReportCount {
694 break 726 break
695 } 727 }
728 // This prevents waiting forever if an upload crashes, aborts or otherwise hangs.
729 hangCount++
730 if hangCount >= HANG_THRESHOLD {
731 glog.Warningf("Was waiting for %d rounds and still wasn' t done. Quitting anyway.", hangCount)
732 }
733
696 glog.Infof("[%s] Waiting %s for the aggregator's queues to be em pty", agg.Category, config.Aggregator.StatusPeriod) 734 glog.Infof("[%s] Waiting %s for the aggregator's queues to be em pty", agg.Category, config.Aggregator.StatusPeriod)
697
698 } 735 }
699 } 736 }
700 737
701 // ForceAnalysis directly adds the given path to the analysis queue, where it wi ll be analyzed, 738 // ForceAnalysis directly adds the given path to the analysis queue, where it wi ll be analyzed,
702 // uploaded and possibly bug reported. 739 // uploaded and possibly bug reported.
703 func (agg *Aggregator) ForceAnalysis(path string) { 740 func (agg *Aggregator) ForceAnalysis(path string) {
704 agg.forAnalysis <- path 741 agg.forAnalysis <- path
705 } 742 }
743
744 func (agg *Aggregator) ClearUploadedFuzzNames() {
745 agg.greyNames = []string{}
746 agg.badNames = []string{}
747 }
748
749 func (agg *Aggregator) UploadedFuzzNames() (bad, grey []string) {
750 return agg.badNames, agg.greyNames
751 }
OLDNEW
« no previous file with comments | « no previous file | fuzzer/go/backend/version_updater.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698