OLD | NEW |
1 package gsloader | 1 package gsloader |
2 | 2 |
3 import ( | 3 import ( |
4 "fmt" | 4 "fmt" |
5 "sort" | 5 "sort" |
6 "sync" | |
7 "sync/atomic" | |
8 | 6 |
9 "github.com/skia-dev/glog" | 7 "github.com/skia-dev/glog" |
10 "go.skia.org/infra/fuzzer/go/common" | 8 "go.skia.org/infra/fuzzer/go/common" |
11 "go.skia.org/infra/fuzzer/go/config" | 9 "go.skia.org/infra/fuzzer/go/config" |
12 » "go.skia.org/infra/fuzzer/go/deduplicator" | 10 » "go.skia.org/infra/fuzzer/go/data" |
13 » "go.skia.org/infra/fuzzer/go/frontend/data" | |
14 "go.skia.org/infra/fuzzer/go/fuzzcache" | 11 "go.skia.org/infra/fuzzer/go/fuzzcache" |
15 » "go.skia.org/infra/go/gs" | 12 » fstorage "go.skia.org/infra/fuzzer/go/storage" |
16 "google.golang.org/cloud/storage" | 13 "google.golang.org/cloud/storage" |
17 ) | 14 ) |
18 | 15 |
19 // LoadFromBoltDB loads the data.FuzzReport from FuzzReportCache associated with
the given hash. | 16 // LoadFromBoltDB loads the data.FuzzReport from FuzzReportCache associated with
the given hash. |
20 // The FuzzReport is first put into the staging fuzz cache, and then into the cu
rrent. | 17 // The FuzzReport is first put into the staging fuzz cache, and then into the cu
rrent. |
21 // If a cache for the commit does not exist, or there are other problems with th
e retrieval, | 18 // If a cache for the commit does not exist, or there are other problems with th
e retrieval, |
22 // an error is returned. We do not need to deduplicate on extraction because | 19 // an error is returned. We do not need to deduplicate on extraction because |
23 // the fuzzes were deduplicated on storage. | 20 // the fuzzes were deduplicated on storage. |
24 func LoadFromBoltDB(cache *fuzzcache.FuzzReportCache) error { | 21 func LoadFromBoltDB(cache *fuzzcache.FuzzReportCache) error { |
25 glog.Infof("Looking into cache for revision %s", config.FrontEnd.SkiaVer
sion.Hash) | 22 glog.Infof("Looking into cache for revision %s", config.FrontEnd.SkiaVer
sion.Hash) |
26 for _, category := range common.FUZZ_CATEGORIES { | 23 for _, category := range common.FUZZ_CATEGORIES { |
27 if staging, err := cache.LoadTree(category, config.FrontEnd.Skia
Version.Hash); err != nil { | 24 if staging, err := cache.LoadTree(category, config.FrontEnd.Skia
Version.Hash); err != nil { |
28 return fmt.Errorf("Problem decoding existing from bolt d
b: %s", err) | 25 return fmt.Errorf("Problem decoding existing from bolt d
b: %s", err) |
29 } else { | 26 } else { |
30 data.SetStaging(category, *staging) | 27 data.SetStaging(category, *staging) |
31 » » » glog.Infof("Successfully loaded %s fuzzes from bolt db c
ache", category) | 28 » » » glog.Infof("Successfully loaded %s fuzzes from bolt db c
ache with %d files", category, len(*staging)) |
32 } | 29 } |
33 } | 30 } |
34 data.StagingToCurrent() | 31 data.StagingToCurrent() |
35 return nil | 32 return nil |
36 } | 33 } |
37 | 34 |
// GSLoader is a struct that handles downloading fuzzes from Google Storage.
type GSLoader struct {
	// storageClient is the GCS client used to download fuzz reports.
	storageClient *storage.Client
	// Cache is the boltDB-backed cache that report trees and fuzz names
	// are persisted to after each load.
	Cache *fuzzcache.FuzzReportCache
}
47 | 40 |
48 // New creates a GSLoader and returns it. | 41 // New creates a GSLoader and returns it. |
49 func New(s *storage.Client, c *fuzzcache.FuzzReportCache) *GSLoader { | 42 func New(s *storage.Client, c *fuzzcache.FuzzReportCache) *GSLoader { |
50 return &GSLoader{ | 43 return &GSLoader{ |
51 storageClient: s, | 44 storageClient: s, |
52 Cache: c, | 45 Cache: c, |
53 deduplicator: deduplicator.New(), | |
54 } | 46 } |
55 } | 47 } |
56 | 48 |
// LoadFreshFromGoogleStorage pulls all fuzzes out of GCS and loads them into memory.
// The "fresh" in the name refers to the fact that all other loaded fuzzes (if any)
// are written over, including in the cache.
// Upon completion, the full results are cached to a boltDB instance and moved from staging
// to the current copy.
func (g *GSLoader) LoadFreshFromGoogleStorage() error {
	revision := config.FrontEnd.SkiaVersion.Hash
	// Discard everything previously staged; this load replaces it wholesale.
	data.ClearStaging()
	fuzzNames := make([]string, 0, 100)

	for _, cat := range common.FUZZ_CATEGORIES {
		badPath := fmt.Sprintf("%s/%s/bad", cat, revision)
		// A nil whitelist downloads every fuzz under badPath; reports is a channel
		// that is closed when all downloads complete.
		reports, err := fstorage.GetReportsFromGS(g.storageClient, badPath, cat, nil, config.FrontEnd.NumDownloadProcesses)
		if err != nil {
			return err
		}
		b := 0 // count of bad fuzzes in this category, for logging only
		for report := range reports {
			fuzzNames = append(fuzzNames, report.FuzzName)
			data.NewFuzzFound(cat, report)
			b++
		}
		glog.Infof("%d bad fuzzes freshly loaded from gs://%s/%s", b, config.GS.Bucket, badPath)
	}
	// We must wait until after all the fuzzes are in staging, otherwise, we'll only have a partial update
	data.StagingToCurrent()

	for _, category := range common.FUZZ_CATEGORIES {
		// Cache-write failures are logged but not fatal: the in-memory copy is
		// already up to date, and the cache can be rebuilt on the next load.
		if err := g.Cache.StoreTree(data.StagingCopy(category), category, revision); err != nil {
			glog.Errorf("Problem storing category %s to boltDB: %s", category, err)
		}
	}
	// Persist the names of every fuzz seen so later incremental loads know what exists.
	return g.Cache.StoreFuzzNames(fuzzNames, revision)
}
97 | 83 |
98 // LoadBinaryFuzzesFromGoogleStorage pulls all fuzzes out of GCS that are on the
given whitelist | 84 // LoadFuzzesFromGoogleStorage pulls all fuzzes out of GCS that are on the given
whitelist |
99 // and loads them into memory (as staging). After loading them, it updates the
cache | 85 // and loads them into memory (as staging). After loading them, it updates the
cache |
100 // and moves them from staging to the current copy. | 86 // and moves them from staging to the current copy. |
101 func (g *GSLoader) LoadBinaryFuzzesFromGoogleStorage(whitelist []string) error { | 87 func (g *GSLoader) LoadFuzzesFromGoogleStorage(whitelist []string) error { |
102 revision := config.FrontEnd.SkiaVersion.Hash | 88 revision := config.FrontEnd.SkiaVersion.Hash |
103 data.StagingFromCurrent() | 89 data.StagingFromCurrent() |
104 sort.Strings(whitelist) | 90 sort.Strings(whitelist) |
105 | 91 |
106 fuzzNames := make([]string, 0, 100) | 92 fuzzNames := make([]string, 0, 100) |
107 for _, cat := range common.FUZZ_CATEGORIES { | 93 for _, cat := range common.FUZZ_CATEGORIES { |
108 badPath := fmt.Sprintf("%s/%s/bad", cat, revision) | 94 badPath := fmt.Sprintf("%s/%s/bad", cat, revision) |
109 » » reports, err := g.getBinaryReportsFromGS(badPath, cat, whitelist
) | 95 » » reports, err := fstorage.GetReportsFromGS(g.storageClient, badPa
th, cat, whitelist, config.FrontEnd.NumDownloadProcesses) |
110 if err != nil { | 96 if err != nil { |
111 return err | 97 return err |
112 } | 98 } |
113 b := 0 | 99 b := 0 |
114 d := 0 | |
115 for report := range reports { | 100 for report := range reports { |
116 // We always add the fuzzName, to avoid redownloading du
plicates over and over again. | |
117 fuzzNames = append(fuzzNames, report.FuzzName) | 101 fuzzNames = append(fuzzNames, report.FuzzName) |
118 » » » if g.deduplicator.IsUnique(report) { | 102 » » » data.NewFuzzFound(cat, report) |
119 » » » » data.NewFuzzFound(cat, report) | 103 » » » b++ |
120 » » » » b++ | |
121 » » » } else { | |
122 » » » » d++ | |
123 » » » } | |
124 } | 104 } |
125 » » glog.Infof("%d bad fuzzes (%d duplicate) incrementally loaded fr
om gs://%s/%s", b, d, config.GS.Bucket, badPath) | 105 » » glog.Infof("%d bad fuzzes incrementally loaded from gs://%s/%s",
b, config.GS.Bucket, badPath) |
126 } | 106 } |
| 107 // We must wait until after all the fuzzes are in staging, otherwise, we
'll only have a partial update |
127 data.StagingToCurrent() | 108 data.StagingToCurrent() |
128 | 109 |
129 oldBinaryFuzzNames, err := g.Cache.LoadFuzzNames(revision) | 110 oldBinaryFuzzNames, err := g.Cache.LoadFuzzNames(revision) |
130 if err != nil { | 111 if err != nil { |
131 glog.Warningf("Could not read old binary fuzz names from cache.
Continuing...", err) | 112 glog.Warningf("Could not read old binary fuzz names from cache.
Continuing...", err) |
132 oldBinaryFuzzNames = []string{} | 113 oldBinaryFuzzNames = []string{} |
133 } | 114 } |
134 for _, category := range common.FUZZ_CATEGORIES { | 115 for _, category := range common.FUZZ_CATEGORIES { |
135 if err := g.Cache.StoreTree(data.StagingCopy(category), category
, revision); err != nil { | 116 if err := g.Cache.StoreTree(data.StagingCopy(category), category
, revision); err != nil { |
136 glog.Errorf("Problem storing category %s to boltDB: %s",
category, err) | 117 glog.Errorf("Problem storing category %s to boltDB: %s",
category, err) |
137 } | 118 } |
138 } | 119 } |
139 return g.Cache.StoreFuzzNames(append(oldBinaryFuzzNames, whitelist...),
revision) | 120 return g.Cache.StoreFuzzNames(append(oldBinaryFuzzNames, whitelist...),
revision) |
140 } | 121 } |
141 | |
142 // A fuzzPackage contains all the information about a fuzz, mostly the paths to
the files that | |
143 // need to be downloaded. | |
144 type fuzzPackage struct { | |
145 FuzzName string | |
146 FuzzCategory string | |
147 DebugASANName string | |
148 DebugDumpName string | |
149 DebugErrName string | |
150 ReleaseASANName string | |
151 ReleaseDumpName string | |
152 ReleaseErrName string | |
153 } | |
154 | |
155 // getBinaryReportsFromGS pulls all files in baseFolder from the skia-fuzzer buc
ket and | |
156 // groups them by fuzz. It parses these groups of files into a BinaryFuzzReport
and returns | |
157 // a channel through whcih all reports generated in this way will be streamed. | |
158 // The channel will be closed when all reports are done being sent. | |
159 func (g *GSLoader) getBinaryReportsFromGS(baseFolder, category string, whitelist
[]string) (<-chan data.FuzzReport, error) { | |
160 reports := make(chan data.FuzzReport, 10000) | |
161 | |
162 fuzzPackages, err := g.fetchFuzzPackages(baseFolder, category) | |
163 if err != nil { | |
164 close(reports) | |
165 return reports, err | |
166 } | |
167 | |
168 toDownload := make(chan fuzzPackage, len(fuzzPackages)) | |
169 g.completedCounter = 0 | |
170 | |
171 var wg sync.WaitGroup | |
172 for i := 0; i < config.FrontEnd.NumDownloadProcesses; i++ { | |
173 wg.Add(1) | |
174 go g.download(toDownload, reports, &wg) | |
175 } | |
176 | |
177 for _, d := range fuzzPackages { | |
178 if whitelist != nil { | |
179 name := d.FuzzName | |
180 if i := sort.SearchStrings(whitelist, name); i < len(whi
telist) && whitelist[i] == name { | |
181 // is on the whitelist | |
182 toDownload <- d | |
183 } | |
184 } else { | |
185 // no white list | |
186 toDownload <- d | |
187 } | |
188 } | |
189 close(toDownload) | |
190 | |
191 go func() { | |
192 wg.Wait() | |
193 close(reports) | |
194 }() | |
195 | |
196 return reports, nil | |
197 } | |
198 | |
199 // fetchFuzzPackages scans for all fuzzes in the given folder and returns a | |
200 // slice of all of the metadata for each fuzz, as a fuzz package. It returns | |
201 // error if it cannot access Google Storage. | |
202 func (g *GSLoader) fetchFuzzPackages(baseFolder, category string) (fuzzPackages
[]fuzzPackage, err error) { | |
203 | |
204 fuzzNames, err := common.GetAllFuzzNamesInFolder(g.storageClient, baseFo
lder) | |
205 if err != nil { | |
206 return nil, fmt.Errorf("Problem getting fuzz packages from %s: %
s", baseFolder, err) | |
207 } | |
208 for _, fuzzName := range fuzzNames { | |
209 prefix := fmt.Sprintf("%s/%s/%s", baseFolder, fuzzName, fuzzName
) | |
210 fuzzPackages = append(fuzzPackages, fuzzPackage{ | |
211 FuzzName: fuzzName, | |
212 FuzzCategory: category, | |
213 DebugASANName: fmt.Sprintf("%s_debug.asan", prefix), | |
214 DebugDumpName: fmt.Sprintf("%s_debug.dump", prefix), | |
215 DebugErrName: fmt.Sprintf("%s_debug.err", prefix), | |
216 ReleaseASANName: fmt.Sprintf("%s_release.asan", prefix), | |
217 ReleaseDumpName: fmt.Sprintf("%s_release.dump", prefix), | |
218 ReleaseErrName: fmt.Sprintf("%s_release.err", prefix), | |
219 }) | |
220 } | |
221 return fuzzPackages, nil | |
222 } | |
223 | |
224 // emptyStringOnError returns a string of the passed in bytes or empty string if
err is nil. | |
225 func emptyStringOnError(b []byte, err error) string { | |
226 if err != nil { | |
227 glog.Warningf("Ignoring error when fetching file contents: %v",
err) | |
228 return "" | |
229 } | |
230 return string(b) | |
231 } | |
232 | |
233 // download waits for fuzzPackages to appear on the toDownload channel and then
downloads | |
234 // the four pieces of the package. It then parses them into a BinaryFuzzReport
and sends | |
235 // the binary to the passed in channel. When there is no more work to be done,
this function. | |
236 // returns and writes out true to the done channel. | |
237 func (g *GSLoader) download(toDownload <-chan fuzzPackage, reports chan<- data.F
uzzReport, wg *sync.WaitGroup) { | |
238 defer wg.Done() | |
239 for job := range toDownload { | |
240 p := data.GCSPackage{ | |
241 Name: job.FuzzName, | |
242 FuzzCategory: job.FuzzCategory, | |
243 Debug: data.OutputFiles{ | |
244 Asan: emptyStringOnError(gs.FileContentsFromGS
(g.storageClient, config.GS.Bucket, job.DebugASANName)), | |
245 Dump: emptyStringOnError(gs.FileContentsFromGS
(g.storageClient, config.GS.Bucket, job.DebugDumpName)), | |
246 StdErr: emptyStringOnError(gs.FileContentsFromGS
(g.storageClient, config.GS.Bucket, job.DebugErrName)), | |
247 }, | |
248 Release: data.OutputFiles{ | |
249 Asan: emptyStringOnError(gs.FileContentsFromGS
(g.storageClient, config.GS.Bucket, job.ReleaseASANName)), | |
250 Dump: emptyStringOnError(gs.FileContentsFromGS
(g.storageClient, config.GS.Bucket, job.ReleaseDumpName)), | |
251 StdErr: emptyStringOnError(gs.FileContentsFromGS
(g.storageClient, config.GS.Bucket, job.ReleaseErrName)), | |
252 }, | |
253 } | |
254 | |
255 reports <- data.ParseReport(p) | |
256 atomic.AddInt32(&g.completedCounter, 1) | |
257 if g.completedCounter%100 == 0 { | |
258 glog.Infof("%d fuzzes downloaded", g.completedCounter) | |
259 } | |
260 } | |
261 } | |
OLD | NEW |