Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(572)

Side by Side Diff: fuzzer/go/frontend/gsloader/gsloader.go

Issue 1691893002: Fuzzer now deduplicates on the analysis side instead of the download side (Closed) Base URL: https://skia.googlesource.com/buildbot@metrics
Patch Set: Created 4 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 package gsloader 1 package gsloader
2 2
3 import ( 3 import (
4 "fmt" 4 "fmt"
5 "sort" 5 "sort"
6 "sync"
7 "sync/atomic"
8 6
9 "github.com/skia-dev/glog" 7 "github.com/skia-dev/glog"
10 "go.skia.org/infra/fuzzer/go/common" 8 "go.skia.org/infra/fuzzer/go/common"
11 "go.skia.org/infra/fuzzer/go/config" 9 "go.skia.org/infra/fuzzer/go/config"
12 » "go.skia.org/infra/fuzzer/go/deduplicator" 10 » "go.skia.org/infra/fuzzer/go/data"
13 » "go.skia.org/infra/fuzzer/go/frontend/data"
14 "go.skia.org/infra/fuzzer/go/fuzzcache" 11 "go.skia.org/infra/fuzzer/go/fuzzcache"
15 » "go.skia.org/infra/go/gs" 12 » fstorage "go.skia.org/infra/fuzzer/go/storage"
16 "google.golang.org/cloud/storage" 13 "google.golang.org/cloud/storage"
17 ) 14 )
18 15
19 // LoadFromBoltDB loads the data.FuzzReport from FuzzReportCache associated with the given hash. 16 // LoadFromBoltDB loads the data.FuzzReport from FuzzReportCache associated with the given hash.
20 // The FuzzReport is first put into the staging fuzz cache, and then into the cu rrent. 17 // The FuzzReport is first put into the staging fuzz cache, and then into the cu rrent.
21 // If a cache for the commit does not exist, or there are other problems with th e retrieval, 18 // If a cache for the commit does not exist, or there are other problems with th e retrieval,
22 // an error is returned. We do not need to deduplicate on extraction because 19 // an error is returned. We do not need to deduplicate on extraction because
23 // the fuzzes were deduplicated on storage. 20 // the fuzzes were deduplicated on storage.
24 func LoadFromBoltDB(cache *fuzzcache.FuzzReportCache) error { 21 func LoadFromBoltDB(cache *fuzzcache.FuzzReportCache) error {
25 glog.Infof("Looking into cache for revision %s", config.FrontEnd.SkiaVer sion.Hash) 22 glog.Infof("Looking into cache for revision %s", config.FrontEnd.SkiaVer sion.Hash)
26 for _, category := range common.FUZZ_CATEGORIES { 23 for _, category := range common.FUZZ_CATEGORIES {
27 if staging, err := cache.LoadTree(category, config.FrontEnd.Skia Version.Hash); err != nil { 24 if staging, err := cache.LoadTree(category, config.FrontEnd.Skia Version.Hash); err != nil {
28 return fmt.Errorf("Problem decoding existing from bolt d b: %s", err) 25 return fmt.Errorf("Problem decoding existing from bolt d b: %s", err)
29 } else { 26 } else {
30 data.SetStaging(category, *staging) 27 data.SetStaging(category, *staging)
31 » » » glog.Infof("Successfully loaded %s fuzzes from bolt db c ache", category) 28 » » » glog.Infof("Successfully loaded %s fuzzes from bolt db c ache with %d files", category, len(*staging))
32 } 29 }
33 } 30 }
34 data.StagingToCurrent() 31 data.StagingToCurrent()
35 return nil 32 return nil
36 } 33 }
37 34
38 // GSLoader is a struct that handles downloading fuzzes from Google Storage. 35 // GSLoader is a struct that handles downloading fuzzes from Google Storage.
39 type GSLoader struct { 36 type GSLoader struct {
40 storageClient *storage.Client 37 storageClient *storage.Client
41 Cache *fuzzcache.FuzzReportCache 38 Cache *fuzzcache.FuzzReportCache
42 deduplicator *deduplicator.Deduplicator
43
44 // completedCounter is the number of fuzzes that have been downloaded fr om GCS, used for logging.
45 completedCounter int32
46 } 39 }
47 40
48 // New creates a GSLoader and returns it. 41 // New creates a GSLoader and returns it.
49 func New(s *storage.Client, c *fuzzcache.FuzzReportCache) *GSLoader { 42 func New(s *storage.Client, c *fuzzcache.FuzzReportCache) *GSLoader {
50 return &GSLoader{ 43 return &GSLoader{
51 storageClient: s, 44 storageClient: s,
52 Cache: c, 45 Cache: c,
53 deduplicator: deduplicator.New(),
54 } 46 }
55 } 47 }
56 48
57 // LoadFreshFromGoogleStorage pulls all fuzzes out of GCS and loads them into me mory. 49 // LoadFreshFromGoogleStorage pulls all fuzzes out of GCS and loads them into me mory.
58 // The "fresh" in the name refers to the fact that all other loaded fuzzes (if a ny) 50 // The "fresh" in the name refers to the fact that all other loaded fuzzes (if a ny)
59 // are written over, including in the cache. 51 // are written over, including in the cache.
60 // Upon completion, the full results are cached to a boltDB instance and moved f rom staging 52 // Upon completion, the full results are cached to a boltDB instance and moved f rom staging
61 // to the current copy. 53 // to the current copy.
62 func (g *GSLoader) LoadFreshFromGoogleStorage() error { 54 func (g *GSLoader) LoadFreshFromGoogleStorage() error {
63 revision := config.FrontEnd.SkiaVersion.Hash 55 revision := config.FrontEnd.SkiaVersion.Hash
64 data.ClearStaging() 56 data.ClearStaging()
65 g.deduplicator.Clear()
66 fuzzNames := make([]string, 0, 100) 57 fuzzNames := make([]string, 0, 100)
58
67 for _, cat := range common.FUZZ_CATEGORIES { 59 for _, cat := range common.FUZZ_CATEGORIES {
68 badPath := fmt.Sprintf("%s/%s/bad", cat, revision) 60 badPath := fmt.Sprintf("%s/%s/bad", cat, revision)
69 » » reports, err := g.getBinaryReportsFromGS(badPath, cat, nil) 61 » » reports, err := fstorage.GetReportsFromGS(g.storageClient, badPa th, cat, nil, config.FrontEnd.NumDownloadProcesses)
70 if err != nil { 62 if err != nil {
71 return err 63 return err
72 } 64 }
73 b := 0 65 b := 0
74 d := 0
75 for report := range reports { 66 for report := range reports {
76 // We always add the fuzzName, to avoid redownloading du plicates over and over again.
77 fuzzNames = append(fuzzNames, report.FuzzName) 67 fuzzNames = append(fuzzNames, report.FuzzName)
78 » » » if g.deduplicator.IsUnique(report) { 68 » » » data.NewFuzzFound(cat, report)
79 » » » » data.NewFuzzFound(cat, report) 69 » » » b++
80 » » » » b++
81 » » » } else {
82 » » » » d++
83 » » » }
84
85 } 70 }
86 » » glog.Infof("%d bad fuzzes (%d duplicate) freshly loaded from gs: //%s/%s", b, d, config.GS.Bucket, badPath) 71 » » glog.Infof("%d bad fuzzes freshly loaded from gs://%s/%s", b, co nfig.GS.Bucket, badPath)
87 » » data.StagingToCurrent()
88 } 72 }
73 // We must wait until after all the fuzzes are in staging, otherwise, we 'll only have a partial update
74 data.StagingToCurrent()
89 75
90 for _, category := range common.FUZZ_CATEGORIES { 76 for _, category := range common.FUZZ_CATEGORIES {
91 if err := g.Cache.StoreTree(data.StagingCopy(category), category , revision); err != nil { 77 if err := g.Cache.StoreTree(data.StagingCopy(category), category , revision); err != nil {
92 glog.Errorf("Problem storing category %s to boltDB: %s", category, err) 78 glog.Errorf("Problem storing category %s to boltDB: %s", category, err)
93 } 79 }
94 } 80 }
95 return g.Cache.StoreFuzzNames(fuzzNames, revision) 81 return g.Cache.StoreFuzzNames(fuzzNames, revision)
96 } 82 }
97 83
98 // LoadBinaryFuzzesFromGoogleStorage pulls all fuzzes out of GCS that are on the given whitelist 84 // LoadFuzzesFromGoogleStorage pulls all fuzzes out of GCS that are on the given whitelist
99 // and loads them into memory (as staging). After loading them, it updates the cache 85 // and loads them into memory (as staging). After loading them, it updates the cache
100 // and moves them from staging to the current copy. 86 // and moves them from staging to the current copy.
101 func (g *GSLoader) LoadBinaryFuzzesFromGoogleStorage(whitelist []string) error { 87 func (g *GSLoader) LoadFuzzesFromGoogleStorage(whitelist []string) error {
102 revision := config.FrontEnd.SkiaVersion.Hash 88 revision := config.FrontEnd.SkiaVersion.Hash
103 data.StagingFromCurrent() 89 data.StagingFromCurrent()
104 sort.Strings(whitelist) 90 sort.Strings(whitelist)
105 91
106 fuzzNames := make([]string, 0, 100) 92 fuzzNames := make([]string, 0, 100)
107 for _, cat := range common.FUZZ_CATEGORIES { 93 for _, cat := range common.FUZZ_CATEGORIES {
108 badPath := fmt.Sprintf("%s/%s/bad", cat, revision) 94 badPath := fmt.Sprintf("%s/%s/bad", cat, revision)
109 » » reports, err := g.getBinaryReportsFromGS(badPath, cat, whitelist ) 95 » » reports, err := fstorage.GetReportsFromGS(g.storageClient, badPa th, cat, whitelist, config.FrontEnd.NumDownloadProcesses)
110 if err != nil { 96 if err != nil {
111 return err 97 return err
112 } 98 }
113 b := 0 99 b := 0
114 d := 0
115 for report := range reports { 100 for report := range reports {
116 // We always add the fuzzName, to avoid redownloading du plicates over and over again.
117 fuzzNames = append(fuzzNames, report.FuzzName) 101 fuzzNames = append(fuzzNames, report.FuzzName)
118 » » » if g.deduplicator.IsUnique(report) { 102 » » » data.NewFuzzFound(cat, report)
119 » » » » data.NewFuzzFound(cat, report) 103 » » » b++
120 » » » » b++
121 » » » } else {
122 » » » » d++
123 » » » }
124 } 104 }
125 » » glog.Infof("%d bad fuzzes (%d duplicate) incrementally loaded fr om gs://%s/%s", b, d, config.GS.Bucket, badPath) 105 » » glog.Infof("%d bad fuzzes incrementally loaded from gs://%s/%s", b, config.GS.Bucket, badPath)
126 } 106 }
107 // We must wait until after all the fuzzes are in staging, otherwise, we 'll only have a partial update
127 data.StagingToCurrent() 108 data.StagingToCurrent()
128 109
129 oldBinaryFuzzNames, err := g.Cache.LoadFuzzNames(revision) 110 oldBinaryFuzzNames, err := g.Cache.LoadFuzzNames(revision)
130 if err != nil { 111 if err != nil {
131 glog.Warningf("Could not read old binary fuzz names from cache. Continuing...", err) 112 glog.Warningf("Could not read old binary fuzz names from cache. Continuing...", err)
132 oldBinaryFuzzNames = []string{} 113 oldBinaryFuzzNames = []string{}
133 } 114 }
134 for _, category := range common.FUZZ_CATEGORIES { 115 for _, category := range common.FUZZ_CATEGORIES {
135 if err := g.Cache.StoreTree(data.StagingCopy(category), category , revision); err != nil { 116 if err := g.Cache.StoreTree(data.StagingCopy(category), category , revision); err != nil {
136 glog.Errorf("Problem storing category %s to boltDB: %s", category, err) 117 glog.Errorf("Problem storing category %s to boltDB: %s", category, err)
137 } 118 }
138 } 119 }
139 return g.Cache.StoreFuzzNames(append(oldBinaryFuzzNames, whitelist...), revision) 120 return g.Cache.StoreFuzzNames(append(oldBinaryFuzzNames, whitelist...), revision)
140 } 121 }
141
142 // A fuzzPackage contains all the information about a fuzz, mostly the paths to the files that
143 // need to be downloaded.
144 type fuzzPackage struct {
145 FuzzName string
146 FuzzCategory string
147 DebugASANName string
148 DebugDumpName string
149 DebugErrName string
150 ReleaseASANName string
151 ReleaseDumpName string
152 ReleaseErrName string
153 }
154
155 // getBinaryReportsFromGS pulls all files in baseFolder from the skia-fuzzer buc ket and
156 // groups them by fuzz. It parses these groups of files into a BinaryFuzzReport and returns
157 // a channel through whcih all reports generated in this way will be streamed.
158 // The channel will be closed when all reports are done being sent.
159 func (g *GSLoader) getBinaryReportsFromGS(baseFolder, category string, whitelist []string) (<-chan data.FuzzReport, error) {
160 reports := make(chan data.FuzzReport, 10000)
161
162 fuzzPackages, err := g.fetchFuzzPackages(baseFolder, category)
163 if err != nil {
164 close(reports)
165 return reports, err
166 }
167
168 toDownload := make(chan fuzzPackage, len(fuzzPackages))
169 g.completedCounter = 0
170
171 var wg sync.WaitGroup
172 for i := 0; i < config.FrontEnd.NumDownloadProcesses; i++ {
173 wg.Add(1)
174 go g.download(toDownload, reports, &wg)
175 }
176
177 for _, d := range fuzzPackages {
178 if whitelist != nil {
179 name := d.FuzzName
180 if i := sort.SearchStrings(whitelist, name); i < len(whi telist) && whitelist[i] == name {
181 // is on the whitelist
182 toDownload <- d
183 }
184 } else {
185 // no white list
186 toDownload <- d
187 }
188 }
189 close(toDownload)
190
191 go func() {
192 wg.Wait()
193 close(reports)
194 }()
195
196 return reports, nil
197 }
198
199 // fetchFuzzPackages scans for all fuzzes in the given folder and returns a
200 // slice of all of the metadata for each fuzz, as a fuzz package. It returns
201 // error if it cannot access Google Storage.
202 func (g *GSLoader) fetchFuzzPackages(baseFolder, category string) (fuzzPackages []fuzzPackage, err error) {
203
204 fuzzNames, err := common.GetAllFuzzNamesInFolder(g.storageClient, baseFo lder)
205 if err != nil {
206 return nil, fmt.Errorf("Problem getting fuzz packages from %s: % s", baseFolder, err)
207 }
208 for _, fuzzName := range fuzzNames {
209 prefix := fmt.Sprintf("%s/%s/%s", baseFolder, fuzzName, fuzzName )
210 fuzzPackages = append(fuzzPackages, fuzzPackage{
211 FuzzName: fuzzName,
212 FuzzCategory: category,
213 DebugASANName: fmt.Sprintf("%s_debug.asan", prefix),
214 DebugDumpName: fmt.Sprintf("%s_debug.dump", prefix),
215 DebugErrName: fmt.Sprintf("%s_debug.err", prefix),
216 ReleaseASANName: fmt.Sprintf("%s_release.asan", prefix),
217 ReleaseDumpName: fmt.Sprintf("%s_release.dump", prefix),
218 ReleaseErrName: fmt.Sprintf("%s_release.err", prefix),
219 })
220 }
221 return fuzzPackages, nil
222 }
223
224 // emptyStringOnError returns a string of the passed in bytes or empty string if err is nil.
225 func emptyStringOnError(b []byte, err error) string {
226 if err != nil {
227 glog.Warningf("Ignoring error when fetching file contents: %v", err)
228 return ""
229 }
230 return string(b)
231 }
232
233 // download waits for fuzzPackages to appear on the toDownload channel and then downloads
234 // the four pieces of the package. It then parses them into a BinaryFuzzReport and sends
235 // the binary to the passed in channel. When there is no more work to be done, this function.
236 // returns and writes out true to the done channel.
237 func (g *GSLoader) download(toDownload <-chan fuzzPackage, reports chan<- data.F uzzReport, wg *sync.WaitGroup) {
238 defer wg.Done()
239 for job := range toDownload {
240 p := data.GCSPackage{
241 Name: job.FuzzName,
242 FuzzCategory: job.FuzzCategory,
243 Debug: data.OutputFiles{
244 Asan: emptyStringOnError(gs.FileContentsFromGS (g.storageClient, config.GS.Bucket, job.DebugASANName)),
245 Dump: emptyStringOnError(gs.FileContentsFromGS (g.storageClient, config.GS.Bucket, job.DebugDumpName)),
246 StdErr: emptyStringOnError(gs.FileContentsFromGS (g.storageClient, config.GS.Bucket, job.DebugErrName)),
247 },
248 Release: data.OutputFiles{
249 Asan: emptyStringOnError(gs.FileContentsFromGS (g.storageClient, config.GS.Bucket, job.ReleaseASANName)),
250 Dump: emptyStringOnError(gs.FileContentsFromGS (g.storageClient, config.GS.Bucket, job.ReleaseDumpName)),
251 StdErr: emptyStringOnError(gs.FileContentsFromGS (g.storageClient, config.GS.Bucket, job.ReleaseErrName)),
252 },
253 }
254
255 reports <- data.ParseReport(p)
256 atomic.AddInt32(&g.completedCounter, 1)
257 if g.completedCounter%100 == 0 {
258 glog.Infof("%d fuzzes downloaded", g.completedCounter)
259 }
260 }
261 }
OLDNEW
« no previous file with comments | « fuzzer/go/frontend/data/testdata/stacktrace/7bad_release.err ('k') | fuzzer/go/frontend/syncer/fuzz_syncer.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698