| OLD | NEW |
| 1 package gsloader | 1 package gsloader |
| 2 | 2 |
| 3 import ( | 3 import ( |
| 4 "fmt" | 4 "fmt" |
| 5 "sort" | 5 "sort" |
| 6 "sync" | |
| 7 "sync/atomic" | |
| 8 | 6 |
| 9 "github.com/skia-dev/glog" | 7 "github.com/skia-dev/glog" |
| 10 "go.skia.org/infra/fuzzer/go/common" | 8 "go.skia.org/infra/fuzzer/go/common" |
| 11 "go.skia.org/infra/fuzzer/go/config" | 9 "go.skia.org/infra/fuzzer/go/config" |
| 12 » "go.skia.org/infra/fuzzer/go/deduplicator" | 10 » "go.skia.org/infra/fuzzer/go/data" |
| 13 » "go.skia.org/infra/fuzzer/go/frontend/data" | |
| 14 "go.skia.org/infra/fuzzer/go/fuzzcache" | 11 "go.skia.org/infra/fuzzer/go/fuzzcache" |
| 15 » "go.skia.org/infra/go/gs" | 12 » fstorage "go.skia.org/infra/fuzzer/go/storage" |
| 16 "google.golang.org/cloud/storage" | 13 "google.golang.org/cloud/storage" |
| 17 ) | 14 ) |
| 18 | 15 |
| 19 // LoadFromBoltDB loads the data.FuzzReport from FuzzReportCache associated with
the given hash. | 16 // LoadFromBoltDB loads the data.FuzzReport from FuzzReportCache associated with
the given hash. |
| 20 // The FuzzReport is first put into the staging fuzz cache, and then into the cu
rrent. | 17 // The FuzzReport is first put into the staging fuzz cache, and then into the cu
rrent. |
| 21 // If a cache for the commit does not exist, or there are other problems with th
e retrieval, | 18 // If a cache for the commit does not exist, or there are other problems with th
e retrieval, |
| 22 // an error is returned. We do not need to deduplicate on extraction because | 19 // an error is returned. We do not need to deduplicate on extraction because |
| 23 // the fuzzes were deduplicated on storage. | 20 // the fuzzes were deduplicated on storage. |
| 24 func LoadFromBoltDB(cache *fuzzcache.FuzzReportCache) error { | 21 func LoadFromBoltDB(cache *fuzzcache.FuzzReportCache) error { |
| 25 glog.Infof("Looking into cache for revision %s", config.FrontEnd.SkiaVer
sion.Hash) | 22 glog.Infof("Looking into cache for revision %s", config.FrontEnd.SkiaVer
sion.Hash) |
| 26 for _, category := range common.FUZZ_CATEGORIES { | 23 for _, category := range common.FUZZ_CATEGORIES { |
| 27 if staging, err := cache.LoadTree(category, config.FrontEnd.Skia
Version.Hash); err != nil { | 24 if staging, err := cache.LoadTree(category, config.FrontEnd.Skia
Version.Hash); err != nil { |
| 28 return fmt.Errorf("Problem decoding existing from bolt d
b: %s", err) | 25 return fmt.Errorf("Problem decoding existing from bolt d
b: %s", err) |
| 29 } else { | 26 } else { |
| 30 data.SetStaging(category, *staging) | 27 data.SetStaging(category, *staging) |
| 31 » » » glog.Infof("Successfully loaded %s fuzzes from bolt db c
ache", category) | 28 » » » glog.Infof("Successfully loaded %s fuzzes from bolt db c
ache with %d files", category, len(*staging)) |
| 32 } | 29 } |
| 33 } | 30 } |
| 34 data.StagingToCurrent() | 31 data.StagingToCurrent() |
| 35 return nil | 32 return nil |
| 36 } | 33 } |
| 37 | 34 |
| 38 // GSLoader is a struct that handles downloading fuzzes from Google Storage. | 35 // GSLoader is a struct that handles downloading fuzzes from Google Storage. |
| 39 type GSLoader struct { | 36 type GSLoader struct { |
| 40 storageClient *storage.Client | 37 storageClient *storage.Client |
| 41 Cache *fuzzcache.FuzzReportCache | 38 Cache *fuzzcache.FuzzReportCache |
| 42 deduplicator *deduplicator.Deduplicator | |
| 43 | |
| 44 // completedCounter is the number of fuzzes that have been downloaded fr
om GCS, used for logging. | |
| 45 completedCounter int32 | |
| 46 } | 39 } |
| 47 | 40 |
| 48 // New creates a GSLoader and returns it. | 41 // New creates a GSLoader and returns it. |
| 49 func New(s *storage.Client, c *fuzzcache.FuzzReportCache) *GSLoader { | 42 func New(s *storage.Client, c *fuzzcache.FuzzReportCache) *GSLoader { |
| 50 return &GSLoader{ | 43 return &GSLoader{ |
| 51 storageClient: s, | 44 storageClient: s, |
| 52 Cache: c, | 45 Cache: c, |
| 53 deduplicator: deduplicator.New(), | |
| 54 } | 46 } |
| 55 } | 47 } |
| 56 | 48 |
| 57 // LoadFreshFromGoogleStorage pulls all fuzzes out of GCS and loads them into me
mory. | 49 // LoadFreshFromGoogleStorage pulls all fuzzes out of GCS and loads them into me
mory. |
| 58 // The "fresh" in the name refers to the fact that all other loaded fuzzes (if a
ny) | 50 // The "fresh" in the name refers to the fact that all other loaded fuzzes (if a
ny) |
| 59 // are written over, including in the cache. | 51 // are written over, including in the cache. |
| 60 // Upon completion, the full results are cached to a boltDB instance and moved f
rom staging | 52 // Upon completion, the full results are cached to a boltDB instance and moved f
rom staging |
| 61 // to the current copy. | 53 // to the current copy. |
| 62 func (g *GSLoader) LoadFreshFromGoogleStorage() error { | 54 func (g *GSLoader) LoadFreshFromGoogleStorage() error { |
| 63 revision := config.FrontEnd.SkiaVersion.Hash | 55 revision := config.FrontEnd.SkiaVersion.Hash |
| 64 data.ClearStaging() | 56 data.ClearStaging() |
| 65 g.deduplicator.Clear() | |
| 66 fuzzNames := make([]string, 0, 100) | 57 fuzzNames := make([]string, 0, 100) |
| 58 |
| 67 for _, cat := range common.FUZZ_CATEGORIES { | 59 for _, cat := range common.FUZZ_CATEGORIES { |
| 68 badPath := fmt.Sprintf("%s/%s/bad", cat, revision) | 60 badPath := fmt.Sprintf("%s/%s/bad", cat, revision) |
| 69 » » reports, err := g.getBinaryReportsFromGS(badPath, cat, nil) | 61 » » reports, err := fstorage.GetReportsFromGS(g.storageClient, badPa
th, cat, nil, config.FrontEnd.NumDownloadProcesses) |
| 70 if err != nil { | 62 if err != nil { |
| 71 return err | 63 return err |
| 72 } | 64 } |
| 73 b := 0 | 65 b := 0 |
| 74 d := 0 | |
| 75 for report := range reports { | 66 for report := range reports { |
| 76 // We always add the fuzzName, to avoid redownloading du
plicates over and over again. | |
| 77 fuzzNames = append(fuzzNames, report.FuzzName) | 67 fuzzNames = append(fuzzNames, report.FuzzName) |
| 78 » » » if g.deduplicator.IsUnique(report) { | 68 » » » data.NewFuzzFound(cat, report) |
| 79 » » » » data.NewFuzzFound(cat, report) | 69 » » » b++ |
| 80 » » » » b++ | |
| 81 » » » } else { | |
| 82 » » » » d++ | |
| 83 » » » } | |
| 84 | |
| 85 } | 70 } |
| 86 » » glog.Infof("%d bad fuzzes (%d duplicate) freshly loaded from gs:
//%s/%s", b, d, config.GS.Bucket, badPath) | 71 » » glog.Infof("%d bad fuzzes freshly loaded from gs://%s/%s", b, co
nfig.GS.Bucket, badPath) |
| 87 » » data.StagingToCurrent() | |
| 88 } | 72 } |
| 73 // We must wait until after all the fuzzes are in staging, otherwise, we
'll only have a partial update |
| 74 data.StagingToCurrent() |
| 89 | 75 |
| 90 for _, category := range common.FUZZ_CATEGORIES { | 76 for _, category := range common.FUZZ_CATEGORIES { |
| 91 if err := g.Cache.StoreTree(data.StagingCopy(category), category
, revision); err != nil { | 77 if err := g.Cache.StoreTree(data.StagingCopy(category), category
, revision); err != nil { |
| 92 glog.Errorf("Problem storing category %s to boltDB: %s",
category, err) | 78 glog.Errorf("Problem storing category %s to boltDB: %s",
category, err) |
| 93 } | 79 } |
| 94 } | 80 } |
| 95 return g.Cache.StoreFuzzNames(fuzzNames, revision) | 81 return g.Cache.StoreFuzzNames(fuzzNames, revision) |
| 96 } | 82 } |
| 97 | 83 |
| 98 // LoadBinaryFuzzesFromGoogleStorage pulls all fuzzes out of GCS that are on the
given whitelist | 84 // LoadFuzzesFromGoogleStorage pulls all fuzzes out of GCS that are on the given
whitelist |
| 99 // and loads them into memory (as staging). After loading them, it updates the
cache | 85 // and loads them into memory (as staging). After loading them, it updates the
cache |
| 100 // and moves them from staging to the current copy. | 86 // and moves them from staging to the current copy. |
| 101 func (g *GSLoader) LoadBinaryFuzzesFromGoogleStorage(whitelist []string) error { | 87 func (g *GSLoader) LoadFuzzesFromGoogleStorage(whitelist []string) error { |
| 102 revision := config.FrontEnd.SkiaVersion.Hash | 88 revision := config.FrontEnd.SkiaVersion.Hash |
| 103 data.StagingFromCurrent() | 89 data.StagingFromCurrent() |
| 104 sort.Strings(whitelist) | 90 sort.Strings(whitelist) |
| 105 | 91 |
| 106 fuzzNames := make([]string, 0, 100) | 92 fuzzNames := make([]string, 0, 100) |
| 107 for _, cat := range common.FUZZ_CATEGORIES { | 93 for _, cat := range common.FUZZ_CATEGORIES { |
| 108 badPath := fmt.Sprintf("%s/%s/bad", cat, revision) | 94 badPath := fmt.Sprintf("%s/%s/bad", cat, revision) |
| 109 » » reports, err := g.getBinaryReportsFromGS(badPath, cat, whitelist
) | 95 » » reports, err := fstorage.GetReportsFromGS(g.storageClient, badPa
th, cat, whitelist, config.FrontEnd.NumDownloadProcesses) |
| 110 if err != nil { | 96 if err != nil { |
| 111 return err | 97 return err |
| 112 } | 98 } |
| 113 b := 0 | 99 b := 0 |
| 114 d := 0 | |
| 115 for report := range reports { | 100 for report := range reports { |
| 116 // We always add the fuzzName, to avoid redownloading du
plicates over and over again. | |
| 117 fuzzNames = append(fuzzNames, report.FuzzName) | 101 fuzzNames = append(fuzzNames, report.FuzzName) |
| 118 » » » if g.deduplicator.IsUnique(report) { | 102 » » » data.NewFuzzFound(cat, report) |
| 119 » » » » data.NewFuzzFound(cat, report) | 103 » » » b++ |
| 120 » » » » b++ | |
| 121 » » » } else { | |
| 122 » » » » d++ | |
| 123 » » » } | |
| 124 } | 104 } |
| 125 » » glog.Infof("%d bad fuzzes (%d duplicate) incrementally loaded fr
om gs://%s/%s", b, d, config.GS.Bucket, badPath) | 105 » » glog.Infof("%d bad fuzzes incrementally loaded from gs://%s/%s",
b, config.GS.Bucket, badPath) |
| 126 } | 106 } |
| 107 // We must wait until after all the fuzzes are in staging, otherwise, we
'll only have a partial update |
| 127 data.StagingToCurrent() | 108 data.StagingToCurrent() |
| 128 | 109 |
| 129 oldBinaryFuzzNames, err := g.Cache.LoadFuzzNames(revision) | 110 oldBinaryFuzzNames, err := g.Cache.LoadFuzzNames(revision) |
| 130 if err != nil { | 111 if err != nil { |
| 131 glog.Warningf("Could not read old binary fuzz names from cache.
Continuing...", err) | 112 glog.Warningf("Could not read old binary fuzz names from cache.
Continuing...", err) |
| 132 oldBinaryFuzzNames = []string{} | 113 oldBinaryFuzzNames = []string{} |
| 133 } | 114 } |
| 134 for _, category := range common.FUZZ_CATEGORIES { | 115 for _, category := range common.FUZZ_CATEGORIES { |
| 135 if err := g.Cache.StoreTree(data.StagingCopy(category), category
, revision); err != nil { | 116 if err := g.Cache.StoreTree(data.StagingCopy(category), category
, revision); err != nil { |
| 136 glog.Errorf("Problem storing category %s to boltDB: %s",
category, err) | 117 glog.Errorf("Problem storing category %s to boltDB: %s",
category, err) |
| 137 } | 118 } |
| 138 } | 119 } |
| 139 return g.Cache.StoreFuzzNames(append(oldBinaryFuzzNames, whitelist...),
revision) | 120 return g.Cache.StoreFuzzNames(append(oldBinaryFuzzNames, whitelist...),
revision) |
| 140 } | 121 } |
| 141 | |
| 142 // A fuzzPackage contains all the information about a fuzz, mostly the paths to
the files that | |
| 143 // need to be downloaded. | |
| 144 type fuzzPackage struct { | |
| 145 FuzzName string | |
| 146 FuzzCategory string | |
| 147 DebugASANName string | |
| 148 DebugDumpName string | |
| 149 DebugErrName string | |
| 150 ReleaseASANName string | |
| 151 ReleaseDumpName string | |
| 152 ReleaseErrName string | |
| 153 } | |
| 154 | |
| 155 // getBinaryReportsFromGS pulls all files in baseFolder from the skia-fuzzer buc
ket and | |
| 156 // groups them by fuzz. It parses these groups of files into a BinaryFuzzReport
and returns | |
| 157 // a channel through which all reports generated in this way will be streamed. |
| 158 // The channel will be closed when all reports are done being sent. | |
| 159 func (g *GSLoader) getBinaryReportsFromGS(baseFolder, category string, whitelist
[]string) (<-chan data.FuzzReport, error) { | |
| 160 reports := make(chan data.FuzzReport, 10000) | |
| 161 | |
| 162 fuzzPackages, err := g.fetchFuzzPackages(baseFolder, category) | |
| 163 if err != nil { | |
| 164 close(reports) | |
| 165 return reports, err | |
| 166 } | |
| 167 | |
| 168 toDownload := make(chan fuzzPackage, len(fuzzPackages)) | |
| 169 g.completedCounter = 0 | |
| 170 | |
| 171 var wg sync.WaitGroup | |
| 172 for i := 0; i < config.FrontEnd.NumDownloadProcesses; i++ { | |
| 173 wg.Add(1) | |
| 174 go g.download(toDownload, reports, &wg) | |
| 175 } | |
| 176 | |
| 177 for _, d := range fuzzPackages { | |
| 178 if whitelist != nil { | |
| 179 name := d.FuzzName | |
| 180 if i := sort.SearchStrings(whitelist, name); i < len(whi
telist) && whitelist[i] == name { | |
| 181 // is on the whitelist | |
| 182 toDownload <- d | |
| 183 } | |
| 184 } else { | |
| 185 // no white list | |
| 186 toDownload <- d | |
| 187 } | |
| 188 } | |
| 189 close(toDownload) | |
| 190 | |
| 191 go func() { | |
| 192 wg.Wait() | |
| 193 close(reports) | |
| 194 }() | |
| 195 | |
| 196 return reports, nil | |
| 197 } | |
| 198 | |
| 199 // fetchFuzzPackages scans for all fuzzes in the given folder and returns a | |
| 200 // slice of all of the metadata for each fuzz, as a fuzz package. It returns | |
| 201 // error if it cannot access Google Storage. | |
| 202 func (g *GSLoader) fetchFuzzPackages(baseFolder, category string) (fuzzPackages
[]fuzzPackage, err error) { | |
| 203 | |
| 204 fuzzNames, err := common.GetAllFuzzNamesInFolder(g.storageClient, baseFo
lder) | |
| 205 if err != nil { | |
| 206 return nil, fmt.Errorf("Problem getting fuzz packages from %s: %
s", baseFolder, err) | |
| 207 } | |
| 208 for _, fuzzName := range fuzzNames { | |
| 209 prefix := fmt.Sprintf("%s/%s/%s", baseFolder, fuzzName, fuzzName
) | |
| 210 fuzzPackages = append(fuzzPackages, fuzzPackage{ | |
| 211 FuzzName: fuzzName, | |
| 212 FuzzCategory: category, | |
| 213 DebugASANName: fmt.Sprintf("%s_debug.asan", prefix), | |
| 214 DebugDumpName: fmt.Sprintf("%s_debug.dump", prefix), | |
| 215 DebugErrName: fmt.Sprintf("%s_debug.err", prefix), | |
| 216 ReleaseASANName: fmt.Sprintf("%s_release.asan", prefix), | |
| 217 ReleaseDumpName: fmt.Sprintf("%s_release.dump", prefix), | |
| 218 ReleaseErrName: fmt.Sprintf("%s_release.err", prefix), | |
| 219 }) | |
| 220 } | |
| 221 return fuzzPackages, nil | |
| 222 } | |
| 223 | |
| 224 // emptyStringOnError returns a string of the passed in bytes or empty string if
err is nil. | |
| 225 func emptyStringOnError(b []byte, err error) string { | |
| 226 if err != nil { | |
| 227 glog.Warningf("Ignoring error when fetching file contents: %v",
err) | |
| 228 return "" | |
| 229 } | |
| 230 return string(b) | |
| 231 } | |
| 232 | |
| 233 // download waits for fuzzPackages to appear on the toDownload channel and then
downloads | |
| 234 // the four pieces of the package. It then parses them into a BinaryFuzzReport
and sends | |
| 235 // the binary to the passed in channel. When there is no more work to be done,
 this function | |
| 236 // returns and writes out true to the done channel. | |
| 237 func (g *GSLoader) download(toDownload <-chan fuzzPackage, reports chan<- data.F
uzzReport, wg *sync.WaitGroup) { | |
| 238 defer wg.Done() | |
| 239 for job := range toDownload { | |
| 240 p := data.GCSPackage{ | |
| 241 Name: job.FuzzName, | |
| 242 FuzzCategory: job.FuzzCategory, | |
| 243 Debug: data.OutputFiles{ | |
| 244 Asan: emptyStringOnError(gs.FileContentsFromGS
(g.storageClient, config.GS.Bucket, job.DebugASANName)), | |
| 245 Dump: emptyStringOnError(gs.FileContentsFromGS
(g.storageClient, config.GS.Bucket, job.DebugDumpName)), | |
| 246 StdErr: emptyStringOnError(gs.FileContentsFromGS
(g.storageClient, config.GS.Bucket, job.DebugErrName)), | |
| 247 }, | |
| 248 Release: data.OutputFiles{ | |
| 249 Asan: emptyStringOnError(gs.FileContentsFromGS
(g.storageClient, config.GS.Bucket, job.ReleaseASANName)), | |
| 250 Dump: emptyStringOnError(gs.FileContentsFromGS
(g.storageClient, config.GS.Bucket, job.ReleaseDumpName)), | |
| 251 StdErr: emptyStringOnError(gs.FileContentsFromGS
(g.storageClient, config.GS.Bucket, job.ReleaseErrName)), | |
| 252 }, | |
| 253 } | |
| 254 | |
| 255 reports <- data.ParseReport(p) | |
| 256 atomic.AddInt32(&g.completedCounter, 1) | |
| 257 if g.completedCounter%100 == 0 { | |
| 258 glog.Infof("%d fuzzes downloaded", g.completedCounter) | |
| 259 } | |
| 260 } | |
| 261 } | |
| OLD | NEW |