OLD | NEW |
1 package backend | 1 package backend |
2 | 2 |
3 import ( | 3 import ( |
4 "fmt" | 4 "fmt" |
5 "io/ioutil" | |
6 "os" | |
7 "path/filepath" | 5 "path/filepath" |
8 "strings" | 6 "strings" |
9 "sync" | |
10 "sync/atomic" | |
11 | 7 |
12 "github.com/skia-dev/glog" | 8 "github.com/skia-dev/glog" |
13 "go.skia.org/infra/fuzzer/go/aggregator" | 9 "go.skia.org/infra/fuzzer/go/aggregator" |
14 "go.skia.org/infra/fuzzer/go/common" | 10 "go.skia.org/infra/fuzzer/go/common" |
15 "go.skia.org/infra/fuzzer/go/config" | 11 "go.skia.org/infra/fuzzer/go/config" |
16 "go.skia.org/infra/fuzzer/go/generator" | 12 "go.skia.org/infra/fuzzer/go/generator" |
17 "go.skia.org/infra/go/fileutil" | |
18 "go.skia.org/infra/go/gs" | 13 "go.skia.org/infra/go/gs" |
| 14 "go.skia.org/infra/go/util" |
19 "go.skia.org/infra/go/vcsinfo" | 15 "go.skia.org/infra/go/vcsinfo" |
20 "golang.org/x/net/context" | 16 "golang.org/x/net/context" |
21 "google.golang.org/cloud/storage" | 17 "google.golang.org/cloud/storage" |
22 ) | 18 ) |
23 | 19 |
24 // FuzzPipeline holds onto the generation/aggregation parts for a given fuzz cat
egory. This allows | 20 // FuzzPipeline holds onto the generation/aggregation parts for a given fuzz cat
egory. This allows |
25 // VersionUpdater to stop all active fuzz generation, download pre-existing fuzz
es, re-analyze | 21 // VersionUpdater to stop all active fuzz generation, download pre-existing fuzz
es, re-analyze |
26 // them, and then restart generation. | 22 // them, and then restart generation. |
27 type FuzzPipeline struct { | 23 type FuzzPipeline struct { |
28 Category string | 24 Category string |
(...skipping 44 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
73 // change GCS version to have the current be up to date (fuzzer-fe will
be polling for it) | 69 // change GCS version to have the current be up to date (fuzzer-fe will
be polling for it) |
74 if err := v.replaceCurrentSkiaVersionWith(oldRevision, config.Generator.
SkiaVersion.Hash); err != nil { | 70 if err := v.replaceCurrentSkiaVersionWith(oldRevision, config.Generator.
SkiaVersion.Hash); err != nil { |
75 return nil, fmt.Errorf("Could not update skia error: %s", err) | 71 return nil, fmt.Errorf("Could not update skia error: %s", err) |
76 } | 72 } |
77 | 73 |
78 return config.Generator.SkiaVersion, nil | 74 return config.Generator.SkiaVersion, nil |
79 } | 75 } |
80 | 76 |
81 func (p *FuzzPipeline) reanalyzeAndRestart(storageClient *storage.Client, oldRev
ision string) error { | 77 func (p *FuzzPipeline) reanalyzeAndRestart(storageClient *storage.Client, oldRev
ision string) error { |
82 // download all bad and grey fuzzes | 78 // download all bad and grey fuzzes |
83 » badFuzzNames, greyFuzzNames, err := p.downloadAllBadAndGreyFuzzes(oldRev
ision, storageClient) | 79 » badFuzzPaths, greyFuzzPaths, err := p.downloadAllBadAndGreyFuzzes(oldRev
ision, storageClient) |
84 if err != nil { | 80 if err != nil { |
85 return fmt.Errorf("Problem downloading all previous fuzzes: %s",
err) | 81 return fmt.Errorf("Problem downloading all previous fuzzes: %s",
err) |
86 } | 82 } |
87 » glog.Infof("There are %d bad fuzzes and %d grey fuzzes to rescan.", len(
badFuzzNames), len(greyFuzzNames)) | 83 » glog.Infof("There are %d bad fuzzes and %d grey fuzzes of category %s to
rescan.", len(badFuzzPaths), len(greyFuzzPaths), p.Category) |
88 // This is a soft shutdown, i.e. it waits for aggregator's queues to be
empty | 84 // This is a soft shutdown, i.e. it waits for aggregator's queues to be
empty |
89 p.Agg.ShutDown() | 85 p.Agg.ShutDown() |
90 | 86 |
91 if config.Common.ForceReanalysis { | 87 if config.Common.ForceReanalysis { |
92 » » glog.Infof("Deleting previous fuzz results") | 88 » » glog.Infof("Deleting previous %s fuzz results", p.Category) |
93 if err := gs.DeleteAllFilesInDir(storageClient, config.GS.Bucket
, fmt.Sprintf("%s/%s/", p.Category, oldRevision), config.Aggregator.NumUploadPro
cesses); err != nil { | 89 if err := gs.DeleteAllFilesInDir(storageClient, config.GS.Bucket
, fmt.Sprintf("%s/%s/", p.Category, oldRevision), config.Aggregator.NumUploadPro
cesses); err != nil { |
94 return fmt.Errorf("Could not delete previous fuzzes: %s"
, err) | 90 return fmt.Errorf("Could not delete previous fuzzes: %s"
, err) |
95 } | 91 } |
96 } | 92 } |
97 | 93 |
98 if err := p.Gen.Clear(); err != nil { | 94 if err := p.Gen.Clear(); err != nil { |
99 return fmt.Errorf("Could not remove previous afl-fuzz results: %
s", err) | 95 return fmt.Errorf("Could not remove previous afl-fuzz results: %
s", err) |
100 } | 96 } |
101 | 97 |
102 if err := p.Agg.RestartAnalysis(); err != nil { | 98 if err := p.Agg.RestartAnalysis(); err != nil { |
103 return fmt.Errorf("Had problem restarting analysis/upload chain:
%s", err) | 99 return fmt.Errorf("Had problem restarting analysis/upload chain:
%s", err) |
104 } | 100 } |
| 101 // If we aren't reanalyzing, we should upload the names of anything that
is currently there. |
| 102 // If we are reanalyzing, we should re-write the names after we analyze
them (see below). |
| 103 if !config.Common.ForceReanalysis { |
| 104 p.uploadFuzzNames(storageClient, oldRevision, common.ExtractFuzz
NamesFromPaths(badFuzzPaths), common.ExtractFuzzNamesFromPaths(greyFuzzPaths)) |
| 105 } |
105 // Reanalyze and reupload the fuzzes, making a bug on regressions. | 106 // Reanalyze and reupload the fuzzes, making a bug on regressions. |
106 glog.Infof("Reanalyzing bad fuzzes") | 107 glog.Infof("Reanalyzing bad fuzzes") |
107 p.Agg.MakeBugOnBadFuzz = false | 108 p.Agg.MakeBugOnBadFuzz = false |
108 p.Agg.UploadGreyFuzzes = true | 109 p.Agg.UploadGreyFuzzes = true |
109 » for _, name := range badFuzzNames { | 110 » p.Agg.ClearUploadedFuzzNames() |
| 111 » for _, name := range badFuzzPaths { |
110 p.Agg.ForceAnalysis(name) | 112 p.Agg.ForceAnalysis(name) |
111 } | 113 } |
112 p.Agg.WaitForEmptyQueues() | 114 p.Agg.WaitForEmptyQueues() |
113 glog.Infof("Reanalyzing grey fuzzes") | 115 glog.Infof("Reanalyzing grey fuzzes") |
114 p.Agg.MakeBugOnBadFuzz = true | 116 p.Agg.MakeBugOnBadFuzz = true |
115 » for _, name := range greyFuzzNames { | 117 » for _, name := range greyFuzzPaths { |
116 p.Agg.ForceAnalysis(name) | 118 p.Agg.ForceAnalysis(name) |
117 } | 119 } |
118 p.Agg.WaitForEmptyQueues() | 120 p.Agg.WaitForEmptyQueues() |
119 p.Agg.MakeBugOnBadFuzz = false | 121 p.Agg.MakeBugOnBadFuzz = false |
120 p.Agg.UploadGreyFuzzes = false | 122 p.Agg.UploadGreyFuzzes = false |
121 » glog.Infof("Done reanalyzing") | 123 » bad, grey := p.Agg.UploadedFuzzNames() |
| 124 » glog.Infof("Done reanalyzing %s. Uploaded %d bad and %d grey fuzzes", p
.Category, len(bad), len(grey)) |
| 125 |
| 126 » if config.Common.ForceReanalysis { |
| 127 » » p.uploadFuzzNames(storageClient, oldRevision, bad, grey) |
| 128 » } |
122 | 129 |
123 // redownload samples (in case any are new) | 130 // redownload samples (in case any are new) |
124 if err := p.Gen.DownloadSeedFiles(storageClient); err != nil { | 131 if err := p.Gen.DownloadSeedFiles(storageClient); err != nil { |
125 return fmt.Errorf("Could not download binary seed files: %s", er
r) | 132 return fmt.Errorf("Could not download binary seed files: %s", er
r) |
126 } | 133 } |
127 // restart afl-fuzz | 134 // restart afl-fuzz |
128 return p.Gen.Start() | 135 return p.Gen.Start() |
129 } | 136 } |
130 | 137 |
131 // completedCounter is the number of fuzzes that have been downloaded from GCS,
used for logging. | |
132 var completedCounter int32 | |
133 | |
134 // downloadAllBadAndGreyFuzzes downloads just the fuzzes from a commit in GCS. I
t uses multiple | 138 // downloadAllBadAndGreyFuzzes downloads just the fuzzes from a commit in GCS. I
t uses multiple |
135 // processes to do so and puts them in config.Aggregator.FuzzPath/[category]. | 139 // processes to do so and puts them in config.Aggregator.FuzzPath/[category]. |
136 func (p *FuzzPipeline) downloadAllBadAndGreyFuzzes(commitHash string, storageCli
ent *storage.Client) (badFuzzNames []string, greyFuzzNames []string, err error)
{ | 140 func (p *FuzzPipeline) downloadAllBadAndGreyFuzzes(commitHash string, storageCli
ent *storage.Client) (badFuzzPaths []string, greyFuzzPaths []string, err error)
{ |
137 downloadPath := filepath.Join(config.Aggregator.FuzzPath, p.Category) | 141 downloadPath := filepath.Join(config.Aggregator.FuzzPath, p.Category) |
138 | 142 |
139 » toDownload := make(chan string, 100000) | 143 » bad, err := common.DownloadAllFuzzes(storageClient, downloadPath, p.Cate
gory, commitHash, "bad", config.Generator.NumDownloadProcesses) |
140 » completedCounter = 0 | 144 » if err != nil { |
141 | 145 » » return nil, nil, err |
142 » var wg sync.WaitGroup | |
143 » for i := 0; i < config.Generator.NumDownloadProcesses; i++ { | |
144 » » wg.Add(1) | |
145 » » go download(storageClient, toDownload, downloadPath, &wg) | |
146 } | 146 } |
147 | 147 » grey, err := common.DownloadAllFuzzes(storageClient, downloadPath, p.Cat
egory, commitHash, "grey", config.Generator.NumDownloadProcesses) |
148 » badFilter := func(item *storage.ObjectAttrs) { | 148 » return bad, grey, err |
149 » » name := item.Name | |
150 » » if strings.Contains(name, ".") { | |
151 » » » return | |
152 » » } | |
153 » » fuzzHash := name[strings.LastIndex(name, "/")+1:] | |
154 » » badFuzzNames = append(badFuzzNames, filepath.Join(downloadPath,
fuzzHash)) | |
155 » » toDownload <- item.Name | |
156 » } | |
157 | |
158 » greyFilter := func(item *storage.ObjectAttrs) { | |
159 » » name := item.Name | |
160 » » if strings.Contains(name, ".") { | |
161 » » » return | |
162 » » } | |
163 » » fuzzHash := name[strings.LastIndex(name, "/")+1:] | |
164 » » greyFuzzNames = append(greyFuzzNames, filepath.Join(downloadPath
, fuzzHash)) | |
165 » » toDownload <- item.Name | |
166 » } | |
167 | |
168 » if err := gs.AllFilesInDir(storageClient, config.GS.Bucket, fmt.Sprintf(
"%s/%s/bad", p.Category, commitHash), badFilter); err != nil { | |
169 » » return nil, nil, fmt.Errorf("Problem getting bad fuzzes: %s", er
r) | |
170 » } | |
171 | |
172 » if err := gs.AllFilesInDir(storageClient, config.GS.Bucket, fmt.Sprintf(
"%s/%s/grey", p.Category, commitHash), greyFilter); err != nil { | |
173 » » return nil, nil, fmt.Errorf("Problem getting grey fuzzes: %s", e
rr) | |
174 » } | |
175 | |
176 » close(toDownload) | |
177 » wg.Wait() | |
178 » return badFuzzNames, greyFuzzNames, nil | |
179 } | |
180 | |
181 // download starts a go routine that waits for files to download from Google Sto
rage and downloads | |
182 // them to downloadPath. When it is done (on error or when the channel is close
d), it signals to | |
183 // the WaitGroup that it is done. It also logs the progress on downloading the f
uzzes. | |
184 func download(storageClient *storage.Client, toDownload <-chan string, downloadP
ath string, wg *sync.WaitGroup) { | |
185 » defer wg.Done() | |
186 » for file := range toDownload { | |
187 » » hash := file[strings.LastIndex(file, "/")+1:] | |
188 » » onDisk := filepath.Join(downloadPath, hash) | |
189 » » if !fileutil.FileExists(onDisk) { | |
190 » » » contents, err := gs.FileContentsFromGS(storageClient, co
nfig.GS.Bucket, file) | |
191 » » » if err != nil { | |
192 » » » » glog.Warningf("Problem downloading fuzz %s, cont
inuing anyway: %s", file, err) | |
193 » » » » continue | |
194 » » » } | |
195 » » » if err = ioutil.WriteFile(onDisk, contents, 0644); err !
= nil && !os.IsExist(err) { | |
196 » » » » glog.Warningf("Problem writing fuzz to %s, conti
nuing anyway: %s", onDisk, err) | |
197 » » » } | |
198 » » } | |
199 » » atomic.AddInt32(&completedCounter, 1) | |
200 » » if completedCounter%100 == 0 { | |
201 » » » glog.Infof("%d fuzzes downloaded", completedCounter) | |
202 » » } | |
203 » } | |
204 } | 149 } |
205 | 150 |
206 // replaceCurrentSkiaVersionWith puts the oldHash in skia_version/old and the ne
wHash in | 151 // replaceCurrentSkiaVersionWith puts the oldHash in skia_version/old and the ne
wHash in |
207 // skia_version/current. It also removes all pending versions. | 152 // skia_version/current. It also removes all pending versions. |
208 func (v *VersionUpdater) replaceCurrentSkiaVersionWith(oldHash, newHash string)
error { | 153 func (v *VersionUpdater) replaceCurrentSkiaVersionWith(oldHash, newHash string)
error { |
209 // delete all pending requests | 154 // delete all pending requests |
210 if err := gs.DeleteAllFilesInDir(v.storageClient, config.GS.Bucket, "ski
a_version/pending/", 1); err != nil { | 155 if err := gs.DeleteAllFilesInDir(v.storageClient, config.GS.Bucket, "ski
a_version/pending/", 1); err != nil { |
211 return err | 156 return err |
212 } | 157 } |
213 if err := gs.DeleteAllFilesInDir(v.storageClient, config.GS.Bucket, "ski
a_version/current/", 1); err != nil { | 158 if err := gs.DeleteAllFilesInDir(v.storageClient, config.GS.Bucket, "ski
a_version/current/", 1); err != nil { |
214 return err | 159 return err |
215 } | 160 } |
216 if err := v.touch(fmt.Sprintf("skia_version/current/%s", newHash)); err
!= nil { | 161 if err := v.touch(fmt.Sprintf("skia_version/current/%s", newHash)); err
!= nil { |
217 return err | 162 return err |
218 } | 163 } |
219 return v.touch(fmt.Sprintf("skia_version/old/%s", oldHash)) | 164 return v.touch(fmt.Sprintf("skia_version/old/%s", oldHash)) |
220 } | 165 } |
221 | 166 |
222 // touch creates an empty file in Google Storage of the given name. | 167 // touch creates an empty file in Google Storage of the given name. |
223 func (v *VersionUpdater) touch(file string) error { | 168 func (v *VersionUpdater) touch(file string) error { |
224 w := v.storageClient.Bucket(config.GS.Bucket).Object(file).NewWriter(con
text.Background()) | 169 w := v.storageClient.Bucket(config.GS.Bucket).Object(file).NewWriter(con
text.Background()) |
225 if err := w.Close(); err != nil { | 170 if err := w.Close(); err != nil { |
226 return fmt.Errorf("Could not touch version file %s : %s", file,
err) | 171 return fmt.Errorf("Could not touch version file %s : %s", file,
err) |
227 } | 172 } |
228 return nil | 173 return nil |
229 } | 174 } |
| 175 |
| 176 // uploadFuzzNames creates two files in the /category/revision/ folder that cont
ain all of the bad fuzz names and the grey fuzz names that are in this folder |
| 177 func (p *FuzzPipeline) uploadFuzzNames(sc *storage.Client, oldRevision string, b
ad, grey []string) { |
| 178 uploadString := func(fileName, contents string) error { |
| 179 name := fmt.Sprintf("%s/%s/%s", p.Category, oldRevision, fileNam
e) |
| 180 w := sc.Bucket(config.GS.Bucket).Object(name).NewWriter(context.
Background()) |
| 181 defer util.Close(w) |
| 182 w.ObjectAttrs.ContentEncoding = "text/plain" |
| 183 |
| 184 if n, err := w.Write([]byte(contents)); err != nil { |
| 185 return fmt.Errorf("There was a problem uploading %s. On
ly uploaded %d bytes: %s", name, n, err) |
| 186 } |
| 187 return nil |
| 188 } |
| 189 |
| 190 if err := uploadString("bad_fuzz_names.txt", strings.Join(bad, "|")); er
r != nil { |
| 191 glog.Errorf("Problem uploading bad fuzz names: %s", err) |
| 192 } |
| 193 if err := uploadString("grey_fuzz_names.txt", strings.Join(grey, "|"));
err != nil { |
| 194 glog.Errorf("Problem uploading grey fuzz names: %s", err) |
| 195 } |
| 196 } |
OLD | NEW |