| OLD | NEW |
| 1 package aggregator | 1 package aggregator |
| 2 | 2 |
| 3 import ( | 3 import ( |
| 4 "bytes" | 4 "bytes" |
| 5 "crypto/sha1" | 5 "crypto/sha1" |
| 6 "fmt" | 6 "fmt" |
| 7 "io" | 7 "io" |
| 8 "io/ioutil" | 8 "io/ioutil" |
| 9 "os" | 9 "os" |
| 10 "path/filepath" | 10 "path/filepath" |
| 11 "sort" | 11 "sort" |
| 12 "strings" | 12 "strings" |
| 13 "sync" | 13 "sync" |
| 14 "sync/atomic" | 14 "sync/atomic" |
| 15 "time" | 15 "time" |
| 16 | 16 |
| 17 "github.com/skia-dev/glog" | 17 "github.com/skia-dev/glog" |
| 18 "go.skia.org/infra/fuzzer/go/common" | 18 "go.skia.org/infra/fuzzer/go/common" |
| 19 "go.skia.org/infra/fuzzer/go/config" | 19 "go.skia.org/infra/fuzzer/go/config" |
| 20 » "go.skia.org/infra/fuzzer/go/frontend/data" | 20 » "go.skia.org/infra/fuzzer/go/data" |
| 21 » "go.skia.org/infra/fuzzer/go/deduplicator" |
| 21 "go.skia.org/infra/go/exec" | 22 "go.skia.org/infra/go/exec" |
| 22 "go.skia.org/infra/go/fileutil" | 23 "go.skia.org/infra/go/fileutil" |
| 23 "go.skia.org/infra/go/metrics2" | 24 "go.skia.org/infra/go/metrics2" |
| 24 "go.skia.org/infra/go/util" | 25 "go.skia.org/infra/go/util" |
| 25 "golang.org/x/net/context" | 26 "golang.org/x/net/context" |
| 26 "google.golang.org/cloud/storage" | 27 "google.golang.org/cloud/storage" |
| 27 ) | 28 ) |
| 28 | 29 |
| 29 // Aggregator is a key part of the fuzzing operation | 30 // Aggregator is a key part of the fuzzing operation |
| 30 // (see https://skia.googlesource.com/buildbot/+/master/fuzzer/DESIGN.md). | 31 // (see https://skia.googlesource.com/buildbot/+/master/fuzzer/DESIGN.md). |
| (...skipping 18 matching lines...) Expand all Loading... |
| 49 storageClient *storage.Client | 50 storageClient *storage.Client |
| 50 fuzzPath string | 51 fuzzPath string |
| 51 executablePath string | 52 executablePath string |
| 52 // For passing the paths of new binaries that should be scanned. | 53 // For passing the paths of new binaries that should be scanned. |
| 53 forAnalysis chan string | 54 forAnalysis chan string |
| 54 // For passing the file names of analyzed fuzzes that should be uploaded
from where they rest on | 55 // For passing the file names of analyzed fuzzes that should be uploaded
from where they rest on |
| 55 // disk in `fuzzPath` | 56 // disk in `fuzzPath` |
| 56 forUpload chan uploadPackage | 57 forUpload chan uploadPackage |
| 57 | 58 |
| 58 forBugReporting chan bugReportingPackage | 59 forBugReporting chan bugReportingPackage |
| 60 |
| 61 deduplicator *deduplicator.Deduplicator |
| 62 |
| 59 // The shutdown channels are used to signal shutdowns. There are two gr
oups, to | 63 // The shutdown channels are used to signal shutdowns. There are two gr
oups, to |
| 60 // allow for a softer, cleaner shutdown w/ minimal lost work. | 64 // allow for a softer, cleaner shutdown w/ minimal lost work. |
| 61 // Group A (monitoring) includes the scanning and the monitoring routine
. | 65 // Group A (monitoring) includes the scanning and the monitoring routine
. |
| 62 // Group B (aggregation) include the analysis and upload routines and th
e bug reporting routine. | 66 // Group B (aggregation) include the analysis and upload routines and th
e bug reporting routine. |
| 63 monitoringShutdown chan bool | 67 monitoringShutdown chan bool |
| 64 monitoringWaitGroup *sync.WaitGroup | 68 monitoringWaitGroup *sync.WaitGroup |
| 65 aggregationShutdown chan bool | 69 aggregationShutdown chan bool |
| 66 aggregationWaitGroup *sync.WaitGroup | 70 aggregationWaitGroup *sync.WaitGroup |
| 67 // These three counts are used to determine if there is any pending work
. | 71 // These three counts are used to determine if there is any pending work
. |
| 68 // There is no pending work if all three of these values are equal and t
he | 72 // There is no pending work if all three of these values are equal and t
he |
| 69 // work queues are empty. | 73 // work queues are empty. |
| 70 analysisCount int64 | 74 analysisCount int64 |
| 71 uploadCount int64 | 75 uploadCount int64 |
| 72 bugReportCount int64 | 76 bugReportCount int64 |
| 77 |
| 78 greyNames []string |
| 79 badNames []string |
| 73 } | 80 } |
| 74 | 81 |
const (
	// BAD_FUZZ and GREY_FUZZ are the two values an uploadPackage's FuzzType may take.
	BAD_FUZZ  = "bad"
	GREY_FUZZ = "grey"
	// HANG_THRESHOLD is the number of status-period polling rounds WaitForEmptyQueues
	// counts before it warns that the pipeline still has not drained.
	HANG_THRESHOLD = 100
)
| 79 | 87 |
// Names derived from the test harness name plus a compiler/build-mode suffix.
// NOTE(review): presumably these are the file names of the harness executables built
// for analysis (Clang and ASAN, debug and release) - confirm against setupAnalysis.
var (
	CLANG_DEBUG   = common.TEST_HARNESS_NAME + "_clang_debug"
	CLANG_RELEASE = common.TEST_HARNESS_NAME + "_clang_release"
	ASAN_DEBUG    = common.TEST_HARNESS_NAME + "_asan_debug"
	ASAN_RELEASE  = common.TEST_HARNESS_NAME + "_asan_release"
)
| 86 | 94 |
// uploadPackage is a struct containing all the pieces of a fuzz that need to be uploaded to GCS.
// It is the element type of the Aggregator's forUpload channel.
type uploadPackage struct {
	// Data carries the fuzz name and the debug/release analysis output (Asan, Dump, StdErr).
	Data data.GCSPackage
	// FilePath is the on-disk location of the fuzz binary that gets uploaded.
	FilePath string
	// Must be BAD_FUZZ or GREY_FUZZ
	FuzzType string
}
| 94 | 102 |
// bugReportingPackage is a struct containing the pieces of a fuzz that may need to have
// a bug filed or updated. It is the element type of the Aggregator's forBugReporting channel.
type bugReportingPackage struct {
	// FuzzName is the name of the uploaded fuzz (the uploadPackage's Data.Name).
	FuzzName string
	// CommitHash is the Skia version hash the fuzz was generated against.
	CommitHash string
	// IsBadFuzz is true when the uploaded fuzz had FuzzType BAD_FUZZ.
	IsBadFuzz bool
}
| 102 | 110 |
| 103 // StartAggregator creates and starts a Aggregator. | 111 // StartAggregator creates and starts a Aggregator. |
| 104 // If there is a problem starting up, an error is returned. Other errors will b
e logged. | 112 // If there is a problem starting up, an error is returned. Other errors will b
e logged. |
| 105 func StartAggregator(s *storage.Client, category string) (*Aggregator, error) { | 113 func StartAggregator(s *storage.Client, category string, startingReports <-chan
data.FuzzReport) (*Aggregator, error) { |
| 106 b := Aggregator{ | 114 b := Aggregator{ |
| 107 Category: category, | 115 Category: category, |
| 108 storageClient: s, | 116 storageClient: s, |
| 109 fuzzPath: filepath.Join(config.Aggregator.FuzzPath, ca
tegory), | 117 fuzzPath: filepath.Join(config.Aggregator.FuzzPath, ca
tegory), |
| 110 executablePath: filepath.Join(config.Aggregator.ExecutablePa
th, category), | 118 executablePath: filepath.Join(config.Aggregator.ExecutablePa
th, category), |
| 111 forAnalysis: make(chan string, 10000), | 119 forAnalysis: make(chan string, 10000), |
| 112 forUpload: make(chan uploadPackage, 100), | 120 forUpload: make(chan uploadPackage, 100), |
| 113 forBugReporting: make(chan bugReportingPackage, 100), | 121 forBugReporting: make(chan bugReportingPackage, 100), |
| 114 MakeBugOnBadFuzz: false, | 122 MakeBugOnBadFuzz: false, |
| 115 UploadGreyFuzzes: false, | 123 UploadGreyFuzzes: false, |
| 124 deduplicator: deduplicator.New(), |
| 116 monitoringShutdown: make(chan bool, 2), | 125 monitoringShutdown: make(chan bool, 2), |
| 117 // aggregationShutdown needs to be created with a calculated cap
acity in start | 126 // aggregationShutdown needs to be created with a calculated cap
acity in start |
| 118 } | 127 } |
| 119 | 128 |
| 129 // preload the duplicator |
| 130 for report := range startingReports { |
| 131 b.deduplicator.IsUnique(report) |
| 132 } |
| 133 |
| 120 return &b, b.start() | 134 return &b, b.start() |
| 121 } | 135 } |
| 122 | 136 |
| 123 // start starts up the Aggregator. It refreshes all status it needs and builds
a debug and a | 137 // start starts up the Aggregator. It refreshes all status it needs and builds
a debug and a |
| 124 // release version of Skia for use in analysis. It then spawns the aggregation
pipeline and a | 138 // release version of Skia for use in analysis. It then spawns the aggregation
pipeline and a |
| 125 // monitoring thread. | 139 // monitoring thread. |
| 126 func (agg *Aggregator) start() error { | 140 func (agg *Aggregator) start() error { |
| 127 // Set the wait groups to fresh | 141 // Set the wait groups to fresh |
| 128 agg.monitoringWaitGroup = &sync.WaitGroup{} | 142 agg.monitoringWaitGroup = &sync.WaitGroup{} |
| 129 agg.aggregationWaitGroup = &sync.WaitGroup{} | 143 agg.aggregationWaitGroup = &sync.WaitGroup{} |
| (...skipping 210 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 340 if err := agg.setupAnalysis(executableDir); err != nil { | 354 if err := agg.setupAnalysis(executableDir); err != nil { |
| 341 glog.Errorf("[%s] Analyzer %d terminated due to error: %s", agg.
Category, identifier, err) | 355 glog.Errorf("[%s] Analyzer %d terminated due to error: %s", agg.
Category, identifier, err) |
| 342 return | 356 return |
| 343 } | 357 } |
| 344 for { | 358 for { |
| 345 select { | 359 select { |
| 346 case badFuzzPath := <-agg.forAnalysis: | 360 case badFuzzPath := <-agg.forAnalysis: |
| 347 atomic.AddInt64(&agg.analysisCount, int64(1)) | 361 atomic.AddInt64(&agg.analysisCount, int64(1)) |
| 348 err := agg.analysisHelper(executableDir, badFuzzPath) | 362 err := agg.analysisHelper(executableDir, badFuzzPath) |
| 349 if err != nil { | 363 if err != nil { |
| 364 atomic.AddInt64(&agg.analysisCount, int64(-1)) |
| 350 glog.Errorf("[%s] Analyzer %d terminated due to
error: %s", agg.Category, identifier, err) | 365 glog.Errorf("[%s] Analyzer %d terminated due to
error: %s", agg.Category, identifier, err) |
| 351 return | 366 return |
| 352 } | 367 } |
| 353 case <-agg.aggregationShutdown: | 368 case <-agg.aggregationShutdown: |
| 354 glog.Infof("[%s] Analyzer %d recieved shutdown signal",
agg.Category, identifier) | 369 glog.Infof("[%s] Analyzer %d recieved shutdown signal",
agg.Category, identifier) |
| 355 return | 370 return |
| 356 } | 371 } |
| 357 } | 372 } |
| 358 } | 373 } |
| 359 | 374 |
| (...skipping 149 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 509 func (agg *Aggregator) waitForUploads(identifier int) { | 524 func (agg *Aggregator) waitForUploads(identifier int) { |
| 510 defer agg.aggregationWaitGroup.Done() | 525 defer agg.aggregationWaitGroup.Done() |
| 511 defer metrics2.NewCounter("upload-process-count", map[string]string{"cat
egory": agg.Category}).Dec(int64(1)) | 526 defer metrics2.NewCounter("upload-process-count", map[string]string{"cat
egory": agg.Category}).Dec(int64(1)) |
| 512 glog.Infof("[%s] Spawning uploader %d", agg.Category, identifier) | 527 glog.Infof("[%s] Spawning uploader %d", agg.Category, identifier) |
| 513 for { | 528 for { |
| 514 select { | 529 select { |
| 515 case p := <-agg.forUpload: | 530 case p := <-agg.forUpload: |
| 516 atomic.AddInt64(&agg.uploadCount, int64(1)) | 531 atomic.AddInt64(&agg.uploadCount, int64(1)) |
| 517 if !agg.UploadGreyFuzzes && p.FuzzType == GREY_FUZZ { | 532 if !agg.UploadGreyFuzzes && p.FuzzType == GREY_FUZZ { |
| 518 glog.Infof("[%s] Skipping upload of grey fuzz %s
", agg.Category, p.Data.Name) | 533 glog.Infof("[%s] Skipping upload of grey fuzz %s
", agg.Category, p.Data.Name) |
| 534 // We are skipping the bugReport, so increment t
he counts. |
| 535 atomic.AddInt64(&agg.bugReportCount, int64(1)) |
| 536 continue |
| 537 } |
| 538 if p.FuzzType != GREY_FUZZ && !agg.deduplicator.IsUnique
(data.ParseReport(p.Data)) { |
| 539 glog.Infof("[%s] Skipping upload of duplicate fu
zz %s", agg.Category, p.Data.Name) |
| 540 // We are skipping the bugReport, so increment t
he counts. |
| 541 atomic.AddInt64(&agg.bugReportCount, int64(1)) |
| 519 continue | 542 continue |
| 520 } | 543 } |
| 521 if err := agg.upload(p); err != nil { | 544 if err := agg.upload(p); err != nil { |
| 522 glog.Errorf("[%s] Uploader %d terminated due to
error: %s", agg.Category, identifier, err) | 545 glog.Errorf("[%s] Uploader %d terminated due to
error: %s", agg.Category, identifier, err) |
| 546 // We are skipping the bugReport, so increment t
he counts. |
| 547 atomic.AddInt64(&agg.bugReportCount, int64(1)) |
| 523 return | 548 return |
| 524 } | 549 } |
| 525 agg.forBugReporting <- bugReportingPackage{ | 550 agg.forBugReporting <- bugReportingPackage{ |
| 526 FuzzName: p.Data.Name, | 551 FuzzName: p.Data.Name, |
| 527 CommitHash: config.Generator.SkiaVersion.Hash, | 552 CommitHash: config.Generator.SkiaVersion.Hash, |
| 528 IsBadFuzz: p.FuzzType == BAD_FUZZ, | 553 IsBadFuzz: p.FuzzType == BAD_FUZZ, |
| 529 } | 554 } |
| 530 case <-agg.aggregationShutdown: | 555 case <-agg.aggregationShutdown: |
| 531 glog.Infof("[%s] Uploader %d recieved shutdown signal",
agg.Category, identifier) | 556 glog.Infof("[%s] Uploader %d recieved shutdown signal",
agg.Category, identifier) |
| 532 return | 557 return |
| 533 } | 558 } |
| 534 } | 559 } |
| 535 } | 560 } |
| 536 | 561 |
| 537 // upload breaks apart the uploadPackage into its constituent parts and uploads
them to GCS using | 562 // upload breaks apart the uploadPackage into its constituent parts and uploads
them to GCS using |
| 538 // some helper methods. | 563 // some helper methods. |
| 539 func (agg *Aggregator) upload(p uploadPackage) error { | 564 func (agg *Aggregator) upload(p uploadPackage) error { |
| 540 glog.Infof("[%s] uploading %s with file %s and analysis bytes %d;%d;%d|%
d;%d;%d", agg.Category, p.Data.Name, p.FilePath, len(p.Data.Debug.Asan), len(p.D
ata.Debug.Dump), len(p.Data.Debug.StdErr), len(p.Data.Release.Asan), len(p.Data.
Release.Dump), len(p.Data.Release.StdErr)) | 565 glog.Infof("[%s] uploading %s with file %s and analysis bytes %d;%d;%d|%
d;%d;%d", agg.Category, p.Data.Name, p.FilePath, len(p.Data.Debug.Asan), len(p.D
ata.Debug.Dump), len(p.Data.Debug.StdErr), len(p.Data.Release.Asan), len(p.Data.
Release.Dump), len(p.Data.Release.StdErr)) |
| 566 if p.FuzzType == GREY_FUZZ { |
| 567 agg.greyNames = append(agg.greyNames, p.Data.Name) |
| 568 } else { |
| 569 agg.badNames = append(agg.badNames, p.Data.Name) |
| 570 } |
| 541 | 571 |
| 542 if err := agg.uploadBinaryFromDisk(p, p.Data.Name, p.FilePath); err != n
il { | 572 if err := agg.uploadBinaryFromDisk(p, p.Data.Name, p.FilePath); err != n
il { |
| 543 return err | 573 return err |
| 544 } | 574 } |
| 545 if err := agg.uploadString(p, p.Data.Name+"_debug.asan", p.Data.Debug.As
an); err != nil { | 575 if err := agg.uploadString(p, p.Data.Name+"_debug.asan", p.Data.Debug.As
an); err != nil { |
| 546 return err | 576 return err |
| 547 } | 577 } |
| 548 if err := agg.uploadString(p, p.Data.Name+"_debug.dump", p.Data.Debug.Du
mp); err != nil { | 578 if err := agg.uploadString(p, p.Data.Name+"_debug.dump", p.Data.Debug.Du
mp); err != nil { |
| 549 return err | 579 return err |
| 550 } | 580 } |
| (...skipping 111 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 662 // aggregationShutdown channel. | 692 // aggregationShutdown channel. |
| 663 for i := len(agg.aggregationShutdown); i < cap(agg.aggregationShutdown);
i++ { | 693 for i := len(agg.aggregationShutdown); i < cap(agg.aggregationShutdown);
i++ { |
| 664 agg.aggregationShutdown <- true | 694 agg.aggregationShutdown <- true |
| 665 } | 695 } |
| 666 agg.aggregationWaitGroup.Wait() | 696 agg.aggregationWaitGroup.Wait() |
| 667 } | 697 } |
| 668 | 698 |
| 669 // RestartAnalysis restarts the shut down aggregator. Anything that is in the s
canning directory | 699 // RestartAnalysis restarts the shut down aggregator. Anything that is in the s
canning directory |
| 670 // should be cleared out, lest it be rescanned/analyzed. | 700 // should be cleared out, lest it be rescanned/analyzed. |
| 671 func (agg *Aggregator) RestartAnalysis() error { | 701 func (agg *Aggregator) RestartAnalysis() error { |
| 702 agg.deduplicator.Clear() |
| 672 return agg.start() | 703 return agg.start() |
| 673 } | 704 } |
| 674 | 705 |
| 675 // WaitForEmptyQueues will return once there is nothing more in the analysis-upl
oad-report | 706 // WaitForEmptyQueues will return once there is nothing more in the analysis-upl
oad-report |
| 676 // pipeline, waiting in increments of config.Aggregator.StatusPeriod until it is
done. | 707 // pipeline, waiting in increments of config.Aggregator.StatusPeriod until it is
done. |
| 677 func (agg *Aggregator) WaitForEmptyQueues() { | 708 func (agg *Aggregator) WaitForEmptyQueues() { |
| 678 a := len(agg.forAnalysis) | 709 a := len(agg.forAnalysis) |
| 679 u := len(agg.forUpload) | 710 u := len(agg.forUpload) |
| 680 b := len(agg.forBugReporting) | 711 b := len(agg.forBugReporting) |
| 681 if a == 0 && u == 0 && b == 0 && agg.analysisCount == agg.uploadCount &&
agg.uploadCount == agg.bugReportCount { | 712 if a == 0 && u == 0 && b == 0 && agg.analysisCount == agg.uploadCount &&
agg.uploadCount == agg.bugReportCount { |
| 682 glog.Infof("[%s] Queues were already empty", agg.Category) | 713 glog.Infof("[%s] Queues were already empty", agg.Category) |
| 683 return | 714 return |
| 684 } | 715 } |
| 685 t := time.Tick(config.Aggregator.StatusPeriod) | 716 t := time.Tick(config.Aggregator.StatusPeriod) |
| 686 glog.Infof("[%s] Waiting %s for the aggregator's queues to be empty", ag
g.Category, config.Aggregator.StatusPeriod) | 717 glog.Infof("[%s] Waiting %s for the aggregator's queues to be empty", ag
g.Category, config.Aggregator.StatusPeriod) |
| 718 hangCount := 0 |
| 687 for _ = range t { | 719 for _ = range t { |
| 688 a = len(agg.forAnalysis) | 720 a = len(agg.forAnalysis) |
| 689 u = len(agg.forUpload) | 721 u = len(agg.forUpload) |
| 690 b = len(agg.forBugReporting) | 722 b = len(agg.forBugReporting) |
| 691 glog.Infof("[%s] AnalysisQueue: %d, UploadQueue: %d, BugReportin
gQueue: %d", agg.Category, a, u, b) | 723 glog.Infof("[%s] AnalysisQueue: %d, UploadQueue: %d, BugReportin
gQueue: %d", agg.Category, a, u, b) |
| 692 glog.Infof("[%s] AnalysisTotal: %d, UploadTotal: %d, BugReportin
gTotal: %d", agg.Category, agg.analysisCount, agg.uploadCount, agg.bugReportCoun
t) | 724 glog.Infof("[%s] AnalysisTotal: %d, UploadTotal: %d, BugReportin
gTotal: %d", agg.Category, agg.analysisCount, agg.uploadCount, agg.bugReportCoun
t) |
| 693 if a == 0 && u == 0 && b == 0 && agg.analysisCount == agg.upload
Count && agg.uploadCount == agg.bugReportCount { | 725 if a == 0 && u == 0 && b == 0 && agg.analysisCount == agg.upload
Count && agg.uploadCount == agg.bugReportCount { |
| 694 break | 726 break |
| 695 } | 727 } |
| 728 // This prevents waiting forever if an upload crashes, aborts or
otherwise hangs. |
| 729 hangCount++ |
| 730 if hangCount >= HANG_THRESHOLD { |
| 731 glog.Warningf("Was waiting for %d rounds and still wasn'
t done. Quitting anyway.", hangCount) |
| 732 } |
| 733 |
| 696 glog.Infof("[%s] Waiting %s for the aggregator's queues to be em
pty", agg.Category, config.Aggregator.StatusPeriod) | 734 glog.Infof("[%s] Waiting %s for the aggregator's queues to be em
pty", agg.Category, config.Aggregator.StatusPeriod) |
| 697 | |
| 698 } | 735 } |
| 699 } | 736 } |
| 700 | 737 |
| 701 // ForceAnalysis directly adds the given path to the analysis queue, where it wi
ll be analyzed, | 738 // ForceAnalysis directly adds the given path to the analysis queue, where it wi
ll be analyzed, |
| 702 // uploaded and possibly bug reported. | 739 // uploaded and possibly bug reported. |
| 703 func (agg *Aggregator) ForceAnalysis(path string) { | 740 func (agg *Aggregator) ForceAnalysis(path string) { |
| 704 agg.forAnalysis <- path | 741 agg.forAnalysis <- path |
| 705 } | 742 } |
| 743 |
| 744 func (agg *Aggregator) ClearUploadedFuzzNames() { |
| 745 agg.greyNames = []string{} |
| 746 agg.badNames = []string{} |
| 747 } |
| 748 |
| 749 func (agg *Aggregator) UploadedFuzzNames() (bad, grey []string) { |
| 750 return agg.badNames, agg.greyNames |
| 751 } |
| OLD | NEW |