OLD | NEW |
1 package aggregator | 1 package aggregator |
2 | 2 |
3 import ( | 3 import ( |
4 "bytes" | 4 "bytes" |
5 "crypto/sha1" | 5 "crypto/sha1" |
6 "fmt" | 6 "fmt" |
7 "io" | 7 "io" |
8 "io/ioutil" | 8 "io/ioutil" |
9 "os" | 9 "os" |
10 "path/filepath" | 10 "path/filepath" |
11 "sort" | 11 "sort" |
12 "strings" | 12 "strings" |
13 "sync" | 13 "sync" |
14 "sync/atomic" | 14 "sync/atomic" |
15 "time" | 15 "time" |
16 | 16 |
17 "github.com/skia-dev/glog" | 17 "github.com/skia-dev/glog" |
18 "go.skia.org/infra/fuzzer/go/common" | 18 "go.skia.org/infra/fuzzer/go/common" |
19 "go.skia.org/infra/fuzzer/go/config" | 19 "go.skia.org/infra/fuzzer/go/config" |
20 » "go.skia.org/infra/fuzzer/go/frontend/data" | 20 » "go.skia.org/infra/fuzzer/go/data" |
| 21 » "go.skia.org/infra/fuzzer/go/deduplicator" |
21 "go.skia.org/infra/go/exec" | 22 "go.skia.org/infra/go/exec" |
22 "go.skia.org/infra/go/fileutil" | 23 "go.skia.org/infra/go/fileutil" |
23 "go.skia.org/infra/go/metrics2" | 24 "go.skia.org/infra/go/metrics2" |
24 "go.skia.org/infra/go/util" | 25 "go.skia.org/infra/go/util" |
25 "golang.org/x/net/context" | 26 "golang.org/x/net/context" |
26 "google.golang.org/cloud/storage" | 27 "google.golang.org/cloud/storage" |
27 ) | 28 ) |
28 | 29 |
29 // Aggregator is a key part of the fuzzing operation | 30 // Aggregator is a key part of the fuzzing operation |
30 // (see https://skia.googlesource.com/buildbot/+/master/fuzzer/DESIGN.md). | 31 // (see https://skia.googlesource.com/buildbot/+/master/fuzzer/DESIGN.md). |
(...skipping 18 matching lines...) Expand all Loading... |
49 storageClient *storage.Client | 50 storageClient *storage.Client |
50 fuzzPath string | 51 fuzzPath string |
51 executablePath string | 52 executablePath string |
52 // For passing the paths of new binaries that should be scanned. | 53 // For passing the paths of new binaries that should be scanned. |
53 forAnalysis chan string | 54 forAnalysis chan string |
54 // For passing the file names of analyzed fuzzes that should be uploaded
from where they rest on | 55 // For passing the file names of analyzed fuzzes that should be uploaded
from where they rest on |
55 // disk in `fuzzPath` | 56 // disk in `fuzzPath` |
56 forUpload chan uploadPackage | 57 forUpload chan uploadPackage |
57 | 58 |
58 forBugReporting chan bugReportingPackage | 59 forBugReporting chan bugReportingPackage |
| 60 |
| 61 deduplicator *deduplicator.Deduplicator |
| 62 |
59 // The shutdown channels are used to signal shutdowns. There are two gr
oups, to | 63 // The shutdown channels are used to signal shutdowns. There are two gr
oups, to |
60 // allow for a softer, cleaner shutdown w/ minimal lost work. | 64 // allow for a softer, cleaner shutdown w/ minimal lost work. |
61 // Group A (monitoring) includes the scanning and the monitoring routine
. | 65 // Group A (monitoring) includes the scanning and the monitoring routine
. |
62 // Group B (aggregation) include the analysis and upload routines and th
e bug reporting routine. | 66 // Group B (aggregation) include the analysis and upload routines and th
e bug reporting routine. |
63 monitoringShutdown chan bool | 67 monitoringShutdown chan bool |
64 monitoringWaitGroup *sync.WaitGroup | 68 monitoringWaitGroup *sync.WaitGroup |
65 aggregationShutdown chan bool | 69 aggregationShutdown chan bool |
66 aggregationWaitGroup *sync.WaitGroup | 70 aggregationWaitGroup *sync.WaitGroup |
67 // These three counts are used to determine if there is any pending work
. | 71 // These three counts are used to determine if there is any pending work
. |
68 // There is no pending work if all three of these values are equal and t
he | 72 // There is no pending work if all three of these values are equal and t
he |
69 // work queues are empty. | 73 // work queues are empty. |
70 analysisCount int64 | 74 analysisCount int64 |
71 uploadCount int64 | 75 uploadCount int64 |
72 bugReportCount int64 | 76 bugReportCount int64 |
| 77 |
| 78 greyNames []string |
| 79 badNames []string |
73 } | 80 } |
74 | 81 |
75 const ( | 82 const ( |
76 » BAD_FUZZ = "bad" | 83 » BAD_FUZZ = "bad" |
77 » GREY_FUZZ = "grey" | 84 » GREY_FUZZ = "grey" |
| 85 » HANG_THRESHOLD = 100 |
78 ) | 86 ) |
79 | 87 |
80 var ( | 88 var ( |
81 CLANG_DEBUG = common.TEST_HARNESS_NAME + "_clang_debug" | 89 CLANG_DEBUG = common.TEST_HARNESS_NAME + "_clang_debug" |
82 CLANG_RELEASE = common.TEST_HARNESS_NAME + "_clang_release" | 90 CLANG_RELEASE = common.TEST_HARNESS_NAME + "_clang_release" |
83 ASAN_DEBUG = common.TEST_HARNESS_NAME + "_asan_debug" | 91 ASAN_DEBUG = common.TEST_HARNESS_NAME + "_asan_debug" |
84 ASAN_RELEASE = common.TEST_HARNESS_NAME + "_asan_release" | 92 ASAN_RELEASE = common.TEST_HARNESS_NAME + "_asan_release" |
85 ) | 93 ) |
86 | 94 |
87 // uploadPackage is a struct containing all the pieces of a fuzz that need to be
uploaded to GCS | 95 // uploadPackage is a struct containing all the pieces of a fuzz that need to be
uploaded to GCS |
88 type uploadPackage struct { | 96 type uploadPackage struct { |
89 Data data.GCSPackage | 97 Data data.GCSPackage |
90 FilePath string | 98 FilePath string |
91 // Must be BAD_FUZZ or GREY_FUZZ | 99 // Must be BAD_FUZZ or GREY_FUZZ |
92 FuzzType string | 100 FuzzType string |
93 } | 101 } |
94 | 102 |
95 // bugReportingPackage is a struct containing the pieces of a fuzz that may need
to have | 103 // bugReportingPackage is a struct containing the pieces of a fuzz that may need
to have |
96 // a bug filed or updated. | 104 // a bug filed or updated. |
97 type bugReportingPackage struct { | 105 type bugReportingPackage struct { |
98 FuzzName string | 106 FuzzName string |
99 CommitHash string | 107 CommitHash string |
100 IsBadFuzz bool | 108 IsBadFuzz bool |
101 } | 109 } |
102 | 110 |
103 // StartAggregator creates and starts a Aggregator. | 111 // StartAggregator creates and starts a Aggregator. |
104 // If there is a problem starting up, an error is returned. Other errors will b
e logged. | 112 // If there is a problem starting up, an error is returned. Other errors will b
e logged. |
105 func StartAggregator(s *storage.Client, category string) (*Aggregator, error) { | 113 func StartAggregator(s *storage.Client, category string, startingReports <-chan
data.FuzzReport) (*Aggregator, error) { |
106 b := Aggregator{ | 114 b := Aggregator{ |
107 Category: category, | 115 Category: category, |
108 storageClient: s, | 116 storageClient: s, |
109 fuzzPath: filepath.Join(config.Aggregator.FuzzPath, ca
tegory), | 117 fuzzPath: filepath.Join(config.Aggregator.FuzzPath, ca
tegory), |
110 executablePath: filepath.Join(config.Aggregator.ExecutablePa
th, category), | 118 executablePath: filepath.Join(config.Aggregator.ExecutablePa
th, category), |
111 forAnalysis: make(chan string, 10000), | 119 forAnalysis: make(chan string, 10000), |
112 forUpload: make(chan uploadPackage, 100), | 120 forUpload: make(chan uploadPackage, 100), |
113 forBugReporting: make(chan bugReportingPackage, 100), | 121 forBugReporting: make(chan bugReportingPackage, 100), |
114 MakeBugOnBadFuzz: false, | 122 MakeBugOnBadFuzz: false, |
115 UploadGreyFuzzes: false, | 123 UploadGreyFuzzes: false, |
| 124 deduplicator: deduplicator.New(), |
116 monitoringShutdown: make(chan bool, 2), | 125 monitoringShutdown: make(chan bool, 2), |
117 // aggregationShutdown needs to be created with a calculated cap
acity in start | 126 // aggregationShutdown needs to be created with a calculated cap
acity in start |
118 } | 127 } |
119 | 128 |
| 129 // preload the duplicator |
| 130 for report := range startingReports { |
| 131 b.deduplicator.IsUnique(report) |
| 132 } |
| 133 |
120 return &b, b.start() | 134 return &b, b.start() |
121 } | 135 } |
122 | 136 |
123 // start starts up the Aggregator. It refreshes all status it needs and builds
a debug and a | 137 // start starts up the Aggregator. It refreshes all status it needs and builds
a debug and a |
124 // release version of Skia for use in analysis. It then spawns the aggregation
pipeline and a | 138 // release version of Skia for use in analysis. It then spawns the aggregation
pipeline and a |
125 // monitoring thread. | 139 // monitoring thread. |
126 func (agg *Aggregator) start() error { | 140 func (agg *Aggregator) start() error { |
127 // Set the wait groups to fresh | 141 // Set the wait groups to fresh |
128 agg.monitoringWaitGroup = &sync.WaitGroup{} | 142 agg.monitoringWaitGroup = &sync.WaitGroup{} |
129 agg.aggregationWaitGroup = &sync.WaitGroup{} | 143 agg.aggregationWaitGroup = &sync.WaitGroup{} |
(...skipping 210 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
340 if err := agg.setupAnalysis(executableDir); err != nil { | 354 if err := agg.setupAnalysis(executableDir); err != nil { |
341 glog.Errorf("[%s] Analyzer %d terminated due to error: %s", agg.
Category, identifier, err) | 355 glog.Errorf("[%s] Analyzer %d terminated due to error: %s", agg.
Category, identifier, err) |
342 return | 356 return |
343 } | 357 } |
344 for { | 358 for { |
345 select { | 359 select { |
346 case badFuzzPath := <-agg.forAnalysis: | 360 case badFuzzPath := <-agg.forAnalysis: |
347 atomic.AddInt64(&agg.analysisCount, int64(1)) | 361 atomic.AddInt64(&agg.analysisCount, int64(1)) |
348 err := agg.analysisHelper(executableDir, badFuzzPath) | 362 err := agg.analysisHelper(executableDir, badFuzzPath) |
349 if err != nil { | 363 if err != nil { |
| 364 atomic.AddInt64(&agg.analysisCount, int64(-1)) |
350 glog.Errorf("[%s] Analyzer %d terminated due to
error: %s", agg.Category, identifier, err) | 365 glog.Errorf("[%s] Analyzer %d terminated due to
error: %s", agg.Category, identifier, err) |
351 return | 366 return |
352 } | 367 } |
353 case <-agg.aggregationShutdown: | 368 case <-agg.aggregationShutdown: |
354 glog.Infof("[%s] Analyzer %d recieved shutdown signal",
agg.Category, identifier) | 369 glog.Infof("[%s] Analyzer %d recieved shutdown signal",
agg.Category, identifier) |
355 return | 370 return |
356 } | 371 } |
357 } | 372 } |
358 } | 373 } |
359 | 374 |
(...skipping 149 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
509 func (agg *Aggregator) waitForUploads(identifier int) { | 524 func (agg *Aggregator) waitForUploads(identifier int) { |
510 defer agg.aggregationWaitGroup.Done() | 525 defer agg.aggregationWaitGroup.Done() |
511 defer metrics2.NewCounter("upload-process-count", map[string]string{"cat
egory": agg.Category}).Dec(int64(1)) | 526 defer metrics2.NewCounter("upload-process-count", map[string]string{"cat
egory": agg.Category}).Dec(int64(1)) |
512 glog.Infof("[%s] Spawning uploader %d", agg.Category, identifier) | 527 glog.Infof("[%s] Spawning uploader %d", agg.Category, identifier) |
513 for { | 528 for { |
514 select { | 529 select { |
515 case p := <-agg.forUpload: | 530 case p := <-agg.forUpload: |
516 atomic.AddInt64(&agg.uploadCount, int64(1)) | 531 atomic.AddInt64(&agg.uploadCount, int64(1)) |
517 if !agg.UploadGreyFuzzes && p.FuzzType == GREY_FUZZ { | 532 if !agg.UploadGreyFuzzes && p.FuzzType == GREY_FUZZ { |
518 glog.Infof("[%s] Skipping upload of grey fuzz %s
", agg.Category, p.Data.Name) | 533 glog.Infof("[%s] Skipping upload of grey fuzz %s
", agg.Category, p.Data.Name) |
| 534 // We are skipping the bugReport, so increment t
he counts. |
| 535 atomic.AddInt64(&agg.bugReportCount, int64(1)) |
| 536 continue |
| 537 } |
| 538 if p.FuzzType != GREY_FUZZ && !agg.deduplicator.IsUnique
(data.ParseReport(p.Data)) { |
| 539 glog.Infof("[%s] Skipping upload of duplicate fu
zz %s", agg.Category, p.Data.Name) |
| 540 // We are skipping the bugReport, so increment t
he counts. |
| 541 atomic.AddInt64(&agg.bugReportCount, int64(1)) |
519 continue | 542 continue |
520 } | 543 } |
521 if err := agg.upload(p); err != nil { | 544 if err := agg.upload(p); err != nil { |
522 glog.Errorf("[%s] Uploader %d terminated due to
error: %s", agg.Category, identifier, err) | 545 glog.Errorf("[%s] Uploader %d terminated due to
error: %s", agg.Category, identifier, err) |
| 546 // We are skipping the bugReport, so increment t
he counts. |
| 547 atomic.AddInt64(&agg.bugReportCount, int64(1)) |
523 return | 548 return |
524 } | 549 } |
525 agg.forBugReporting <- bugReportingPackage{ | 550 agg.forBugReporting <- bugReportingPackage{ |
526 FuzzName: p.Data.Name, | 551 FuzzName: p.Data.Name, |
527 CommitHash: config.Generator.SkiaVersion.Hash, | 552 CommitHash: config.Generator.SkiaVersion.Hash, |
528 IsBadFuzz: p.FuzzType == BAD_FUZZ, | 553 IsBadFuzz: p.FuzzType == BAD_FUZZ, |
529 } | 554 } |
530 case <-agg.aggregationShutdown: | 555 case <-agg.aggregationShutdown: |
531 glog.Infof("[%s] Uploader %d recieved shutdown signal",
agg.Category, identifier) | 556 glog.Infof("[%s] Uploader %d recieved shutdown signal",
agg.Category, identifier) |
532 return | 557 return |
533 } | 558 } |
534 } | 559 } |
535 } | 560 } |
536 | 561 |
537 // upload breaks apart the uploadPackage into its constituant parts and uploads
them to GCS using | 562 // upload breaks apart the uploadPackage into its constituant parts and uploads
them to GCS using |
538 // some helper methods. | 563 // some helper methods. |
539 func (agg *Aggregator) upload(p uploadPackage) error { | 564 func (agg *Aggregator) upload(p uploadPackage) error { |
540 glog.Infof("[%s] uploading %s with file %s and analysis bytes %d;%d;%d|%
d;%d;%d", agg.Category, p.Data.Name, p.FilePath, len(p.Data.Debug.Asan), len(p.D
ata.Debug.Dump), len(p.Data.Debug.StdErr), len(p.Data.Release.Asan), len(p.Data.
Release.Dump), len(p.Data.Release.StdErr)) | 565 glog.Infof("[%s] uploading %s with file %s and analysis bytes %d;%d;%d|%
d;%d;%d", agg.Category, p.Data.Name, p.FilePath, len(p.Data.Debug.Asan), len(p.D
ata.Debug.Dump), len(p.Data.Debug.StdErr), len(p.Data.Release.Asan), len(p.Data.
Release.Dump), len(p.Data.Release.StdErr)) |
| 566 if p.FuzzType == GREY_FUZZ { |
| 567 agg.greyNames = append(agg.greyNames, p.Data.Name) |
| 568 } else { |
| 569 agg.badNames = append(agg.badNames, p.Data.Name) |
| 570 } |
541 | 571 |
542 if err := agg.uploadBinaryFromDisk(p, p.Data.Name, p.FilePath); err != n
il { | 572 if err := agg.uploadBinaryFromDisk(p, p.Data.Name, p.FilePath); err != n
il { |
543 return err | 573 return err |
544 } | 574 } |
545 if err := agg.uploadString(p, p.Data.Name+"_debug.asan", p.Data.Debug.As
an); err != nil { | 575 if err := agg.uploadString(p, p.Data.Name+"_debug.asan", p.Data.Debug.As
an); err != nil { |
546 return err | 576 return err |
547 } | 577 } |
548 if err := agg.uploadString(p, p.Data.Name+"_debug.dump", p.Data.Debug.Du
mp); err != nil { | 578 if err := agg.uploadString(p, p.Data.Name+"_debug.dump", p.Data.Debug.Du
mp); err != nil { |
549 return err | 579 return err |
550 } | 580 } |
(...skipping 111 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
662 // aggregationShutdown channel. | 692 // aggregationShutdown channel. |
663 for i := len(agg.aggregationShutdown); i < cap(agg.aggregationShutdown);
i++ { | 693 for i := len(agg.aggregationShutdown); i < cap(agg.aggregationShutdown);
i++ { |
664 agg.aggregationShutdown <- true | 694 agg.aggregationShutdown <- true |
665 } | 695 } |
666 agg.aggregationWaitGroup.Wait() | 696 agg.aggregationWaitGroup.Wait() |
667 } | 697 } |
668 | 698 |
669 // RestartAnalysis restarts the shut down aggregator. Anything that is in the s
canning directory | 699 // RestartAnalysis restarts the shut down aggregator. Anything that is in the s
canning directory |
670 // should be cleared out, lest it be rescanned/analyzed. | 700 // should be cleared out, lest it be rescanned/analyzed. |
671 func (agg *Aggregator) RestartAnalysis() error { | 701 func (agg *Aggregator) RestartAnalysis() error { |
| 702 agg.deduplicator.Clear() |
672 return agg.start() | 703 return agg.start() |
673 } | 704 } |
674 | 705 |
675 // WaitForEmptyQueues will return once there is nothing more in the analysis-upl
oad-report | 706 // WaitForEmptyQueues will return once there is nothing more in the analysis-upl
oad-report |
676 // pipeline, waiting in increments of config.Aggregator.StatusPeriod until it is
done. | 707 // pipeline, waiting in increments of config.Aggregator.StatusPeriod until it is
done. |
677 func (agg *Aggregator) WaitForEmptyQueues() { | 708 func (agg *Aggregator) WaitForEmptyQueues() { |
678 a := len(agg.forAnalysis) | 709 a := len(agg.forAnalysis) |
679 u := len(agg.forUpload) | 710 u := len(agg.forUpload) |
680 b := len(agg.forBugReporting) | 711 b := len(agg.forBugReporting) |
681 if a == 0 && u == 0 && b == 0 && agg.analysisCount == agg.uploadCount &&
agg.uploadCount == agg.bugReportCount { | 712 if a == 0 && u == 0 && b == 0 && agg.analysisCount == agg.uploadCount &&
agg.uploadCount == agg.bugReportCount { |
682 glog.Infof("[%s] Queues were already empty", agg.Category) | 713 glog.Infof("[%s] Queues were already empty", agg.Category) |
683 return | 714 return |
684 } | 715 } |
685 t := time.Tick(config.Aggregator.StatusPeriod) | 716 t := time.Tick(config.Aggregator.StatusPeriod) |
686 glog.Infof("[%s] Waiting %s for the aggregator's queues to be empty", ag
g.Category, config.Aggregator.StatusPeriod) | 717 glog.Infof("[%s] Waiting %s for the aggregator's queues to be empty", ag
g.Category, config.Aggregator.StatusPeriod) |
| 718 hangCount := 0 |
687 for _ = range t { | 719 for _ = range t { |
688 a = len(agg.forAnalysis) | 720 a = len(agg.forAnalysis) |
689 u = len(agg.forUpload) | 721 u = len(agg.forUpload) |
690 b = len(agg.forBugReporting) | 722 b = len(agg.forBugReporting) |
691 glog.Infof("[%s] AnalysisQueue: %d, UploadQueue: %d, BugReportin
gQueue: %d", agg.Category, a, u, b) | 723 glog.Infof("[%s] AnalysisQueue: %d, UploadQueue: %d, BugReportin
gQueue: %d", agg.Category, a, u, b) |
692 glog.Infof("[%s] AnalysisTotal: %d, UploadTotal: %d, BugReportin
gTotal: %d", agg.Category, agg.analysisCount, agg.uploadCount, agg.bugReportCoun
t) | 724 glog.Infof("[%s] AnalysisTotal: %d, UploadTotal: %d, BugReportin
gTotal: %d", agg.Category, agg.analysisCount, agg.uploadCount, agg.bugReportCoun
t) |
693 if a == 0 && u == 0 && b == 0 && agg.analysisCount == agg.upload
Count && agg.uploadCount == agg.bugReportCount { | 725 if a == 0 && u == 0 && b == 0 && agg.analysisCount == agg.upload
Count && agg.uploadCount == agg.bugReportCount { |
694 break | 726 break |
695 } | 727 } |
| 728 // This prevents waiting forever if an upload crashes, aborts or
otherwise hangs. |
| 729 hangCount++ |
| 730 if hangCount >= HANG_THRESHOLD { |
| 731 glog.Warningf("Was waiting for %d rounds and still wasn'
t done. Quitting anyway.", hangCount) |
| 732 } |
| 733 |
696 glog.Infof("[%s] Waiting %s for the aggregator's queues to be em
pty", agg.Category, config.Aggregator.StatusPeriod) | 734 glog.Infof("[%s] Waiting %s for the aggregator's queues to be em
pty", agg.Category, config.Aggregator.StatusPeriod) |
697 | |
698 } | 735 } |
699 } | 736 } |
700 | 737 |
701 // ForceAnalysis directly adds the given path to the analysis queue, where it wi
ll be analyzed, | 738 // ForceAnalysis directly adds the given path to the analysis queue, where it wi
ll be analyzed, |
702 // uploaded and possibly bug reported. | 739 // uploaded and possibly bug reported. |
703 func (agg *Aggregator) ForceAnalysis(path string) { | 740 func (agg *Aggregator) ForceAnalysis(path string) { |
704 agg.forAnalysis <- path | 741 agg.forAnalysis <- path |
705 } | 742 } |
| 743 |
| 744 func (agg *Aggregator) ClearUploadedFuzzNames() { |
| 745 agg.greyNames = []string{} |
| 746 agg.badNames = []string{} |
| 747 } |
| 748 |
| 749 func (agg *Aggregator) UploadedFuzzNames() (bad, grey []string) { |
| 750 return agg.badNames, agg.greyNames |
| 751 } |
OLD | NEW |