OLD | NEW |
1 // trybot is for ingesting Gold trybot results. | 1 // trybot is for ingesting Gold trybot results. |
2 package trybot | 2 package trybot |
3 | 3 |
4 import ( | 4 import ( |
5 "database/sql" | 5 "database/sql" |
6 "encoding/json" | 6 "encoding/json" |
7 "fmt" | 7 "fmt" |
8 | 8 |
9 metrics "github.com/rcrowley/go-metrics" | 9 metrics "github.com/rcrowley/go-metrics" |
10 "github.com/skia-dev/glog" | 10 "github.com/skia-dev/glog" |
(...skipping 14 matching lines...) Expand all Loading... |
25 | 25 |
26 type TrybotResultStorage struct { | 26 type TrybotResultStorage struct { |
27 vdb *database.VersionedDB | 27 vdb *database.VersionedDB |
28 } | 28 } |
29 | 29 |
30 // IssueListItem is returned by List containing the issue ids and when they | 30 // IssueListItem is returned by List containing the issue ids and when they |
31 // were last updated in GS. | 31 // were last updated in GS. |
32 type IssueListItem struct { | 32 type IssueListItem struct { |
33 Issue string | 33 Issue string |
34 LastUpdated int64 | 34 LastUpdated int64 |
| 35 MaxPatchset int64 |
35 } | 36 } |
36 | 37 |
37 func NewTrybotResultStorage(vdb *database.VersionedDB) *TrybotResultStorage { | 38 func NewTrybotResultStorage(vdb *database.VersionedDB) *TrybotResultStorage { |
38 return &TrybotResultStorage{ | 39 return &TrybotResultStorage{ |
39 vdb: vdb, | 40 vdb: vdb, |
40 } | 41 } |
41 } | 42 } |
42 | 43 |
43 // Write writes trybot results to the SQL database connected to vdb that was the | 44 // Write writes trybot results to the SQL database connected to vdb that was the |
44 // the argument to Init(..). | 45 // the argument to Init(..). |
45 func (t *TrybotResultStorage) Write(issue string, trybotResults *TryBotResults)
error { | 46 func (t *TrybotResultStorage) Write(issue string, maxPatchset int64, trybotResul
ts *TryBotResults) error { |
46 trybotResults.indexDigests() | 47 trybotResults.indexDigests() |
47 | 48 |
48 b, err := json.Marshal(trybotResults) | 49 b, err := json.Marshal(trybotResults) |
49 if err != nil { | 50 if err != nil { |
50 return fmt.Errorf("Failed to encode to JSON: %s", err) | 51 return fmt.Errorf("Failed to encode to JSON: %s", err) |
51 } | 52 } |
52 | 53 |
53 // Find the latest timestamp in the data. | 54 // Find the latest timestamp in the data. |
54 var timeStamp int64 = 0 | 55 var timeStamp int64 = 0 |
55 for _, entry := range trybotResults.Bots { | 56 for _, entry := range trybotResults.Bots { |
56 if entry.TS > timeStamp { | 57 if entry.TS > timeStamp { |
57 timeStamp = entry.TS | 58 timeStamp = entry.TS |
58 } | 59 } |
59 } | 60 } |
60 | 61 |
61 » _, err = t.vdb.DB.Exec("REPLACE INTO tries (issue, results, last_updated
) VALUES (?, ?, ?)", issue, string(b), timeStamp) | 62 » _, err = t.vdb.DB.Exec("REPLACE INTO tries (issue, max_patchset, results
, last_updated) VALUES (?, ?, ?, ?)", issue, maxPatchset, string(b), timeStamp) |
62 if err != nil { | 63 if err != nil { |
63 return fmt.Errorf("Failed to write trybot data to database: %s",
err) | 64 return fmt.Errorf("Failed to write trybot data to database: %s",
err) |
64 } | 65 } |
65 return nil | 66 return nil |
66 } | 67 } |
67 | 68 |
68 // Get returns the trybot results for the given issue from the datastore. | 69 // Get returns the trybot results for the given issue from the datastore. |
69 func (t *TrybotResultStorage) Get(issue string) (*TryBotResults, error) { | 70 func (t *TrybotResultStorage) Get(issue string) (*TryBotResults, error) { |
70 var results string | 71 var results string |
71 err := t.vdb.DB.QueryRow("SELECT results FROM tries WHERE issue=?", issu
e).Scan(&results) | 72 err := t.vdb.DB.QueryRow("SELECT results FROM tries WHERE issue=?", issu
e).Scan(&results) |
(...skipping 17 matching lines...) Expand all Loading... |
89 func (t *TrybotResultStorage) List(offset, size int) ([]*IssueListItem, int, err
or) { | 90 func (t *TrybotResultStorage) List(offset, size int) ([]*IssueListItem, int, err
or) { |
90 var total int | 91 var total int |
91 if err := t.vdb.DB.QueryRow("SELECT count(*) FROM tries").Scan(&total);
err != nil { | 92 if err := t.vdb.DB.QueryRow("SELECT count(*) FROM tries").Scan(&total);
err != nil { |
92 return nil, 0, err | 93 return nil, 0, err |
93 } | 94 } |
94 | 95 |
95 if total == 0 { | 96 if total == 0 { |
96 return []*IssueListItem{}, 0, nil | 97 return []*IssueListItem{}, 0, nil |
97 } | 98 } |
98 | 99 |
99 » rows, err := t.vdb.DB.Query("SELECT issue,last_updated FROM tries ORDER
BY last_updated DESC LIMIT ?,?", offset, size) | 100 » rows, err := t.vdb.DB.Query("SELECT issue, max_patchset, last_updated FR
OM tries ORDER BY last_updated DESC LIMIT ?,?", offset, size) |
100 if err != nil { | 101 if err != nil { |
101 return nil, 0, fmt.Errorf("Failed to read try data from database
: %s", err) | 102 return nil, 0, fmt.Errorf("Failed to read try data from database
: %s", err) |
102 } | 103 } |
103 defer util.Close(rows) | 104 defer util.Close(rows) |
104 | 105 |
105 ret := make([]*IssueListItem, 0, size) | 106 ret := make([]*IssueListItem, 0, size) |
106 for rows.Next() { | 107 for rows.Next() { |
107 listItem := &IssueListItem{} | 108 listItem := &IssueListItem{} |
108 » » if err := rows.Scan(&listItem.Issue, &listItem.LastUpdated); err
!= nil { | 109 » » if err := rows.Scan(&listItem.Issue, &listItem.MaxPatchset, &lis
tItem.LastUpdated); err != nil { |
109 return nil, 0, fmt.Errorf("List: Failed to read issue fr
om row: %s", err) | 110 return nil, 0, fmt.Errorf("List: Failed to read issue fr
om row: %s", err) |
110 } | 111 } |
111 ret = append(ret, listItem) | 112 ret = append(ret, listItem) |
112 } | 113 } |
113 return ret, total, nil | 114 return ret, total, nil |
114 } | 115 } |
115 | 116 |
116 // TrybotResultIngester implements the ingester.ResultIngester interface. | 117 // TrybotResultIngester implements the ingester.ResultIngester interface. |
117 type TrybotResultIngester struct { | 118 type TrybotResultIngester struct { |
118 tbrStorage *TrybotResultStorage | 119 tbrStorage *TrybotResultStorage |
(...skipping 16 matching lines...) Expand all Loading... |
135 dmResults, err := goldingester.ParseDMResultsFromReader(r) | 136 dmResults, err := goldingester.ParseDMResultsFromReader(r) |
136 if err != nil { | 137 if err != nil { |
137 return err | 138 return err |
138 } | 139 } |
139 | 140 |
140 if _, ok := t.resultsByIssue[dmResults.Issue]; !ok { | 141 if _, ok := t.resultsByIssue[dmResults.Issue]; !ok { |
141 t.resultsByIssue[dmResults.Issue] = NewTryBotResults() | 142 t.resultsByIssue[dmResults.Issue] = NewTryBotResults() |
142 } | 143 } |
143 | 144 |
144 // Add the entire file to our current knowledge about this issue. | 145 // Add the entire file to our current knowledge about this issue. |
145 » t.resultsByIssue[dmResults.Issue].update(dmResults.Key, dmResults.Result
s, fileInfo.LastUpdated) | 146 » t.resultsByIssue[dmResults.Issue].update(dmResults.Key, dmResults.Patchs
et, dmResults.Results, fileInfo.LastUpdated) |
146 counter.Inc(1) | 147 counter.Inc(1) |
147 glog.Infof("Finished processing file %s.", fileInfo.Name) | 148 glog.Infof("Finished processing file %s.", fileInfo.Name) |
148 return nil | 149 return nil |
149 } | 150 } |
150 | 151 |
151 // See the ingester.ResultIngester interface. | 152 // See the ingester.ResultIngester interface. |
152 func (t *TrybotResultIngester) BatchFinished(_ metrics.Counter) error { | 153 func (t *TrybotResultIngester) BatchFinished(_ metrics.Counter) error { |
153 // Reset this instance regardless of the outcome of this call. | 154 // Reset this instance regardless of the outcome of this call. |
154 defer func() { | 155 defer func() { |
155 t.resultsByIssue = map[string]*TryBotResults{} | 156 t.resultsByIssue = map[string]*TryBotResults{} |
156 }() | 157 }() |
157 | 158 |
158 for issue, tries := range t.resultsByIssue { | 159 for issue, tries := range t.resultsByIssue { |
159 // Get the current results. | 160 // Get the current results. |
160 pastTries, err := t.tbrStorage.Get(issue) | 161 pastTries, err := t.tbrStorage.Get(issue) |
161 if err != nil { | 162 if err != nil { |
162 return err | 163 return err |
163 } | 164 } |
164 | 165 |
165 » » needsUpdating := pastTries.updateIfNewer(tries) | 166 » » needsUpdating, maxPatchset := pastTries.updateIfNewer(tries) |
166 if needsUpdating { | 167 if needsUpdating { |
167 » » » if err := t.tbrStorage.Write(issue, pastTries); err != n
il { | 168 » » » if err := t.tbrStorage.Write(issue, maxPatchset, pastTri
es); err != nil { |
168 return err | 169 return err |
169 } | 170 } |
170 } | 171 } |
171 } | 172 } |
172 | 173 |
173 glog.Info("Finished processing ingestion batch.") | 174 glog.Info("Finished processing ingestion batch.") |
174 return nil | 175 return nil |
175 } | 176 } |
176 | 177 |
177 // TryBotResults stores the results of one entire issue. | 178 // TryBotResults stores the results of one entire issue. |
178 type TryBotResults struct { | 179 type TryBotResults struct { |
179 // Constains a list of all digests contained in the issue. | 180 // Constains a list of all digests contained in the issue. |
180 Digests []string | 181 Digests []string |
181 | 182 |
182 // Results for specific bots. | 183 // Results for specific bots. |
183 Bots map[string]*BotResults | 184 Bots map[string]*BotResults |
184 } | 185 } |
185 | 186 |
186 // BotResults contains the results of one bot run. | 187 // BotResults contains the results of one bot run. |
187 type BotResults struct { | 188 type BotResults struct { |
188 BotParams map[string]string | 189 BotParams map[string]string |
| 190 Patchset int64 |
189 TestResults []*TestResult | 191 TestResults []*TestResult |
190 TS int64 | 192 TS int64 |
191 } | 193 } |
192 | 194 |
193 // TestResult stores a digest and the params that are specific to one test. | 195 // TestResult stores a digest and the params that are specific to one test. |
194 type TestResult struct { | 196 type TestResult struct { |
195 Params map[string]string | 197 Params map[string]string |
196 DigestIdx int | 198 DigestIdx int |
197 digest string | 199 digest string |
198 } | 200 } |
199 | 201 |
200 // TryBotResults maps trace ids to trybot results. | 202 // TryBotResults maps trace ids to trybot results. |
201 // type TryBotResults map[string]*TBResult | 203 // type TryBotResults map[string]*TBResult |
202 func NewTryBotResults() *TryBotResults { | 204 func NewTryBotResults() *TryBotResults { |
203 return &TryBotResults{ | 205 return &TryBotResults{ |
204 Bots: map[string]*BotResults{}, | 206 Bots: map[string]*BotResults{}, |
205 } | 207 } |
206 } | 208 } |
207 | 209 |
208 // update incorporates the given restuls into the current results for this | 210 // update incorporates the given restuls into the current results for this |
209 // issue. | 211 // issue. |
210 func (t *TryBotResults) update(botParams map[string]string, testResults []*goldi
ngester.Result, timeStamp int64) { | 212 func (t *TryBotResults) update(botParams map[string]string, patchset int64, test
Results []*goldingester.Result, timeStamp int64) { |
211 botId, err := util.MD5Params(botParams) | 213 botId, err := util.MD5Params(botParams) |
212 if err != nil { | 214 if err != nil { |
213 glog.Errorf("Unable to hash bot parameters \n\n%v\n\n. Error: %s
", botParams, err) | 215 glog.Errorf("Unable to hash bot parameters \n\n%v\n\n. Error: %s
", botParams, err) |
214 return | 216 return |
215 } | 217 } |
216 | 218 |
217 current, ok := t.Bots[botId] | 219 current, ok := t.Bots[botId] |
218 if !ok || (current.TS < timeStamp) { | 220 if !ok || (current.TS < timeStamp) { |
219 // Replace the current entry for this bot. | 221 // Replace the current entry for this bot. |
220 current = &BotResults{ | 222 current = &BotResults{ |
221 BotParams: botParams, | 223 BotParams: botParams, |
| 224 Patchset: patchset, |
222 } | 225 } |
223 | 226 |
224 botTestResults := []*TestResult{} | 227 botTestResults := []*TestResult{} |
225 for _, result := range testResults { | 228 for _, result := range testResults { |
226 params := util.AddParams(result.Key, result.Options) | 229 params := util.AddParams(result.Key, result.Options) |
227 if !goldingester.IgnoreResult(params) { | 230 if !goldingester.IgnoreResult(params) { |
228 botTestResults = append(botTestResults, &TestRes
ult{ | 231 botTestResults = append(botTestResults, &TestRes
ult{ |
229 Params: params, | 232 Params: params, |
230 digest: result.Digest, | 233 digest: result.Digest, |
231 }) | 234 }) |
232 } | 235 } |
233 } | 236 } |
234 | 237 |
235 current.TestResults = botTestResults | 238 current.TestResults = botTestResults |
236 current.TS = timeStamp | 239 current.TS = timeStamp |
237 t.Bots[botId] = current | 240 t.Bots[botId] = current |
238 } | 241 } |
239 } | 242 } |
240 | 243 |
241 // updateIfNewer incorporates the results of trybot runs into this results | 244 // updateIfNewer incorporates the results of trybot runs into this results |
242 // if they are newer. | 245 // if they are newer. |
243 func (t *TryBotResults) updateIfNewer(tries *TryBotResults) bool { | 246 func (t *TryBotResults) updateIfNewer(tries *TryBotResults) (bool, int64) { |
244 updated := false | 247 updated := false |
| 248 var latestPatchset int64 = 0 |
245 for key, entry := range tries.Bots { | 249 for key, entry := range tries.Bots { |
246 found, ok := t.Bots[key] | 250 found, ok := t.Bots[key] |
247 if !ok || (found.TS < entry.TS) { | 251 if !ok || (found.TS < entry.TS) { |
| 252 latestPatchset = util.MaxInt64(latestPatchset, entry.Pat
chset) |
248 t.Bots[key] = entry | 253 t.Bots[key] = entry |
249 updated = true | 254 updated = true |
250 } | 255 } |
251 } | 256 } |
252 » return updated | 257 » return updated, latestPatchset |
253 } | 258 } |
254 | 259 |
255 func (t *TryBotResults) indexDigests() { | 260 func (t *TryBotResults) indexDigests() { |
256 digestIdx := map[string]int{} | 261 digestIdx := map[string]int{} |
257 digestList := []string{} | 262 digestList := []string{} |
258 for _, bot := range t.Bots { | 263 for _, bot := range t.Bots { |
259 for _, result := range bot.TestResults { | 264 for _, result := range bot.TestResults { |
260 if _, ok := digestIdx[result.digest]; !ok { | 265 if _, ok := digestIdx[result.digest]; !ok { |
261 digestIdx[result.digest] = len(digestList) | 266 digestIdx[result.digest] = len(digestList) |
262 digestList = append(digestList, result.digest) | 267 digestList = append(digestList, result.digest) |
263 } | 268 } |
264 result.DigestIdx = digestIdx[result.digest] | 269 result.DigestIdx = digestIdx[result.digest] |
265 } | 270 } |
266 } | 271 } |
267 t.Digests = digestList | 272 t.Digests = digestList |
268 } | 273 } |
269 | 274 |
270 func (t *TryBotResults) expandDigests() { | 275 func (t *TryBotResults) expandDigests() { |
271 indexDigestMap := make(map[int]string, len(t.Digests)) | 276 indexDigestMap := make(map[int]string, len(t.Digests)) |
272 for idx, digest := range t.Digests { | 277 for idx, digest := range t.Digests { |
273 indexDigestMap[idx] = digest | 278 indexDigestMap[idx] = digest |
274 } | 279 } |
275 for _, bot := range t.Bots { | 280 for _, bot := range t.Bots { |
276 for _, result := range bot.TestResults { | 281 for _, result := range bot.TestResults { |
277 result.digest = indexDigestMap[result.DigestIdx] | 282 result.digest = indexDigestMap[result.DigestIdx] |
278 } | 283 } |
279 } | 284 } |
280 } | 285 } |
OLD | NEW |