1 // Copyright (c) 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be found
3 // in the LICENSE file.
4
5 package main
6
7 import (
8     "encoding/json"
9     "fmt"
10     "io"
11     "net/http"
12     "os/exec"
13     "reflect"
14     "strconv"
15     "strings"
16     "time"
17 )
18
19 import (
20     "code.google.com/p/goauth2/oauth"
21     "code.google.com/p/google-api-go-client/bigquery/v2"
22     "github.com/oxtoacart/webbrowser"
23 )
24
25 const (
26     MISSING_DATA_SENTINEL = 1e100
mtklein
2014/06/15 15:12:10
Cute. Go has +Inf, -Inf, and NaN too I think.
kelvinly
2014/06/15 15:49:26
Tested those, Go doesn't seem to marshal them corr
jcgregorio
2014/06/15 16:25:39
Go may have +-Inf and NaN, but JSON doesn't, thus
jcgregorio
2014/06/16 02:46:25
Added a note explaining the requirements for the s
27 )
28
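The thread above is worth pinning down: encoding/json rejects the IEEE 754 special values outright, which is why a large in-band sentinel is used instead. A minimal standalone check (illustrative, not part of this CL):

    package main

    import (
        "encoding/json"
        "fmt"
        "math"
    )

    func main() {
        // encoding/json refuses NaN and ±Inf since JSON has no
        // representation for them.
        _, err := json.Marshal(math.NaN())
        fmt.Println(err) // json: unsupported value: NaN
        _, err = json.Marshal(math.Inf(-1))
        fmt.Println(err) // json: unsupported value: -Inf
        // An ordinary large float round-trips fine, hence the 1e100 sentinel.
        b, _ := json.Marshal(1e100)
        fmt.Println(string(b)) // 1e+100
    }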
29 // Shouldn't need auth when running from GCE, but will need it for local dev.
30 var config = &oauth.Config{
31     ClientId:     "470362608618-nlbqngfl87f4b3mhqqe9ojgaoe11vrld.apps.googleusercontent.com",
32     ClientSecret: "J4YCkfMXFJISGyuBuVEiH60T",
mtklein
2014/06/15 15:12:12
Um... secret?
kelvinly
2014/06/15 15:49:26
Time to get new keys!
jcgregorio
2014/06/16 02:46:24
In a future CL I will move this to reading the cli
33     Scope:        bigquery.BigqueryScope,
34     AuthURL:      "https://accounts.google.com/o/oauth2/auth",
35     TokenURL:     "https://accounts.google.com/o/oauth2/token",
36     RedirectURL:  "urn:ietf:wg:oauth:2.0:oob",
37     TokenCache:   oauth.CacheFile("bqtoken.data"),
38 }
39
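The follow-up promised in the thread above (reading the client secret from disk rather than source) could plausibly look like the sketch below; the clientsecret.json filename, its key names, and the loadOAuthConfig helper are all assumptions rather than part of this CL, and it would also need the io/ioutil import:

    // loadOAuthConfig is a sketch only: it assumes a local clientsecret.json
    // file with "client_id" and "client_secret" keys.
    func loadOAuthConfig(filename string) (*oauth.Config, error) {
        b, err := ioutil.ReadFile(filename)
        if err != nil {
            return nil, err
        }
        keys := struct {
            ClientId     string `json:"client_id"`
            ClientSecret string `json:"client_secret"`
        }{}
        if err := json.Unmarshal(b, &keys); err != nil {
            return nil, err
        }
        c := *config // Copy the defaults, then fill in the secrets.
        c.ClientId = keys.ClientId
        c.ClientSecret = keys.ClientSecret
        return &c, nil
    }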
40 // runFlow runs through a 3LO OAuth 2.0 flow to get credentials for BigQuery.
41 func runFlow(config *oauth.Config) (*http.Client, error) {
42     transport := &oauth.Transport{Config: config}
43     if _, err := config.TokenCache.Token(); err != nil {
44         url := config.AuthCodeURL("")
45         fmt.Printf(`Your browser has been opened to visit:
46
47   %s
48
49 Enter the verification code:`, url)
50         webbrowser.Open(url)
51         var code string
52         fmt.Scan(&code)
53         if _, err := transport.Exchange(code); err != nil {
54             return nil, err
55         }
56     }
57
58     return transport.Client(), nil
59 }
60
61 // Trace represents all the values of a single measurement over time.
62 type Trace struct {
63     Key    string            `json:"key"`
64     Values []float64         `json:"values"`
65     Params map[string]string `json:"params"`
66     Trybot bool              `json:"trybot"`
67 }
68
69 // NewTrace allocates a new Trace set up for the given number of samples.
70 //
71 // The Trace Values are pre-filled in with the missing data sentinel since not
72 // all tests will be run on all commits.
73 func NewTrace(numSamples int) *Trace {
74     t := &Trace{
75         Values: make([]float64, numSamples, numSamples),
76         Params: make(map[string]string),
77         Trybot: false,
78     }
79     for i, _ := range t.Values {
80         t.Values[i] = MISSING_DATA_SENTINEL
81     }
82     return t
83 }
84
85 // Annotations for commits.
86 type Annotation struct {
87     ID     int    `json:"id"`
mtklein
2014/06/15 15:12:10
Might help to explain ID and Type here a bit.
jcgregorio
2014/06/16 02:46:24
Added note about this mapping to the MySQL databas
88     Notes  string `json:"notes"`
89     Author string `json:"author"`
mtklein
2014/06/15 15:12:10
This is author-of-annotation right? Not author-of
jcgregorio
2014/06/16 02:46:25
Same as above.
On 2014/06/15 15:12:10, mtklein wr
90     Type   int    `json:"type"`
91 }
92
93 // Commit is information about each Git commit.
94 type Commit struct {
95     CommitTime    time.Time    `json:"commit_time"`
96     Hash          string       `json:"hash"`
97     GitNumber     int          `json:"git_number"`
98     CommitMessage string       `json:"commit_msg"`
99     Annotations   []Annotation `json:"annotations,omitempty"`
100 }
101
102 type Choices []string
mtklein
2014/06/15 15:12:10
Explain this a bit?
jcgregorio
2014/06/16 02:46:25
Done.
103
104 // AllData is the top level struct we return via JSON to the UI.
105 //
106 // The length of the Commits array is the same length as all of the Values
107 // arrays in all of the Traces.
108 type AllData struct {
109     Traces   []Trace            `json:"traces"`
110     ParamSet map[string]Choices `json:"param_set"`
111     Commits  []Commit           `json:"commits"`
112 }
113
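To make the ParamSet shape concrete, here is a hypothetical value for it, using only the two benchName values that the query in populateTraces below actually selects:

    // Hypothetical contents, for illustration only: each param name maps to
    // every value observed for it across all Traces.
    var exampleParamSet = map[string]Choices{
        "benchName": {"tabl_worldjournal.skp", "desk_amazon.skp"},
    }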
114 // gitCommitsWithTestData returns the list of commits that have perf data
115 // associated with them.
116 //
117 // Not all commits will have perf data; the builders don't necessarily run for
118 // each commit.
119 func gitCommitsWithTestData(service *bigquery.Service) (map[string]bool, error) {
120     query := `
121 SELECT
122   gitHash
123 FROM
124   (TABLE_DATE_RANGE(perf_skps_v2.skpbench,
125     DATE_ADD(CURRENT_TIMESTAMP(),
126       -2,
127       'DAY'),
128     CURRENT_TIMESTAMP()))
129 GROUP BY
130   gitHash;
131 `
132     iter, err := NewRowIter(service, query)
133     if err != nil {
134         return nil, fmt.Errorf("Failed to query for the Git hashes used: %s", err)
135     }
136
137     hashes := make(map[string]bool)
138     for iter.Next() {
139         h := &struct {
140             Hash string `bq:"gitHash"`
mtklein
2014/06/15 15:12:11
Neat.
141         }{}
142         err := iter.Decode(h)
143         if err != nil {
144             return nil, fmt.Errorf("Failed reading hashes from BigQuery: %s", err)
145         }
146         hashes[h.Hash] = true
147     }
148     return hashes, nil
149 }
150
151 // GitHash represents information on a single Git commit.
mtklein
2014/06/15 15:12:11
Seems like a subset of Commit above? TimeStamp ==
jcgregorio
2014/06/16 02:46:24
Yeah, but this code may be changing soon as we may
152 type GitHash struct {
153     Hash      string
154     TimeStamp time.Time
155 }
156
157 // readCommitsFromGit reads the commit history from a Git repository.
158 func readCommitsFromGit(dir string) ([]GitHash, error) {
159     cmd := exec.Command("git", strings.Split("log --format=%H%x20%ci", " ")...)
160     cmd.Dir = dir
161     b, err := cmd.Output()
162     if err != nil {
163         return nil, fmt.Errorf("Failed to run Git: %s", err)
164     }
165     lines := strings.Split(string(b), "\n")
166     hashes := make([]GitHash, 0, len(lines))
167     for _, line := range lines {
168         parts := strings.SplitN(line, " ", 2)
169         if len(parts) == 2 {
170             t, err := time.Parse("2006-01-02 15:04:05 -0700", parts[1])
171             if err != nil {
172                 return nil, fmt.Errorf("Failed parsing Git log timestamp: %s", err)
173             }
174             hashes = append(hashes, GitHash{Hash: parts[0], TimeStamp: t})
175         }
176     }
177     return hashes, nil
178 }
179
180 // RowIter is a utility for reading data from a BigQuery query response.
181 //
182 // RowIter will iterate over all the results, even if they span more than one
183 // page of results. It automatically uses page tokens to iterate over all the
184 // pages to retrieve all results.
185 type RowIter struct {
186     response      *bigquery.GetQueryResultsResponse
187     jobId         string
188     service       *bigquery.Service
189     nextPageToken string
190     row           int
191 }
192
193 // poll until the job is complete.
194 func (r *RowIter) poll() error {
mtklein
2014/06/15 15:12:10
Odd that bigquery doesn't provide this sort of thi
kelvinly
2014/06/15 15:49:26
I think the Go library was autogenerated from some
jcgregorio
2014/06/16 02:46:25
Yeah, that is true, but I was able to provide pagi
195     var queryResponse *bigquery.GetQueryResultsResponse
196     for {
197         var err error
198         queryCall := r.service.Jobs.GetQueryResults("google.com:chrome-skia", r.jobId)
199         if r.nextPageToken != "" {
200             queryCall.PageToken(r.nextPageToken)
201         }
202         queryResponse, err = queryCall.Do()
203         if err != nil {
204             return err
205         }
206         if queryResponse.JobComplete {
207             break
208         }
209         time.Sleep(time.Second)
210     }
211     r.nextPageToken = queryResponse.PageToken
212     r.response = queryResponse
213     return nil
214 }
215
216 // NewRowIter starts a query and returns a RowIter for iterating through the
217 // results.
218 func NewRowIter(service *bigquery.Service, query string) (*RowIter, error) {
219     job := &bigquery.Job{
220         Configuration: &bigquery.JobConfiguration{
mtklein
2014/06/15 15:12:11
Can't you elide the nested type names in this sort
jcgregorio
2014/06/16 02:46:25
AFAIK only if I depend on the order and supply all
221             Query: &bigquery.JobConfigurationQuery{
222                 Query: query,
223             },
224         },
225     }
226     jobResponse, err := service.Jobs.Insert("google.com:chrome-skia", job).Do()
227     if err != nil {
228         return nil, err
229     }
230
231     r := &RowIter{
232         jobId:   jobResponse.JobReference.JobId,
233         service: service,
234         row:     -1, // Start at -1 so the first call to Next() puts us at the 0th Row.
235     }
236     return r, r.poll()
237 }
238
239 // Next moves to the next row in the response and returns true as long as data
240 // is available, returning false when the end of the results is reached.
241 //
242 // Calling Next() the first time actually points the iterator at the first row,
243 // which makes it possible to use Next in a for loop:
244 //
245 //     for iter.Next() { ... }
246 //
247 func (r *RowIter) Next() bool {
248     r.row++
249     if r.row >= len(r.response.Rows) {
250         if r.nextPageToken != "" {
251             r.poll()
252             r.row = 0
253             return len(r.response.Rows) > 0
254         } else {
255             return false
256         }
257     }
258     return true
259 }
260
261 // DecodeParams pulls all the values in the params record out as a map[string]string.
262 //
263 // The schema for each table has a nested record called 'params' that contains
264 // various axes along which queries could be built, such as the gpu the test was
265 // run against. Pull out the entire record as a generic map[string]string.
266 func (r *RowIter) DecodeParams() map[string]string {
267     row := r.response.Rows[r.row]
268     schema := r.response.Schema
269     params := map[string]string{}
270     for i, cell := range row.F {
271         if cell.V != nil {
272             name := schema.Fields[i].Name
273             if strings.HasPrefix(name, "params_") {
274                 params[strings.TrimPrefix(name, "params_")] = cell.V.(string)
275             }
276         }
277     }
278     return params
279 }
280
281 // Decode uses struct tags to decode a single row into a struct.
282 //
283 // For example, given a struct:
284 //
285 //     type A struct {
286 //         Name  string  `bq:"name"`
287 //         Value float64 `bq:"measurement"`
288 //     }
289 //
290 // and a BigQuery table that contains two columns named "name" and
291 // "measurement", calling Decode as follows would parse the column values
292 // for "name" and "measurement" and place them in the Name and Value fields
293 // respectively:
294 //
295 //     a := &A{}
296 //     iter.Decode(a)
297 //
298 // Implementation Details:
299 //
300 // If a tag names a column that doesn't exist, the field is merely ignored,
301 // i.e. it is left unchanged from when it was passed into Decode.
302 //
303 // Not all columns need to be tagged in the struct.
304 //
305 // The decoder doesn't handle nested structs; only the top level fields are decoded.
306 //
307 // The decoder only handles struct fields of type string, int, int32, int64,
308 // float32 and float64.
309 func (r *RowIter) Decode(s interface{}) error {
310     row := r.response.Rows[r.row]
311     schema := r.response.Schema
312     // Collapse the data in the row into a map[string]string.
313     rowMap := map[string]string{}
314     for i, cell := range row.F {
315         if cell.V != nil {
316             rowMap[schema.Fields[i].Name] = cell.V.(string)
317         }
318     }
319
320     // Then iterate over the fields of 's' and set them from the row data.
321     sv := reflect.ValueOf(s).Elem()
322     st := sv.Type()
323     for i := 0; i < sv.NumField(); i++ {
324         columnName := st.Field(i).Tag.Get("bq")
325         if columnValue, ok := rowMap[columnName]; ok {
326             switch sv.Field(i).Kind() {
327             case reflect.String:
328                 sv.Field(i).SetString(columnValue)
329             case reflect.Float32, reflect.Float64:
330                 f, err := strconv.ParseFloat(columnValue, 64)
331                 if err != nil {
332                     return err
333                 }
334                 sv.Field(i).SetFloat(f)
335             case reflect.Int, reflect.Int32, reflect.Int64:
336                 parsedInt, err := strconv.ParseInt(columnValue, 10, 64)
337                 if err != nil {
338                     return err
339                 }
340                 sv.Field(i).SetInt(parsedInt)
341             default:
342                 return fmt.Errorf("can't decode into field of type: %s", sv.Field(i).Kind())
343             }
344         }
345     }
346     return nil
347 }
348
349 // populateTraces reads the measurement data from BigQuery and populates the Traces.
350 func populateTraces(service *bigquery.Service, all *AllData, hashToIndex map[string]int, numSamples int) error {
351     type Measurement struct {
352         Value float64 `bq:"value"`
353         Key   string  `bq:"key"`
354         Hash  string  `bq:"gitHash"`
355     }
356
357     // Now query the actual samples.
358     query := `
359 SELECT
360   *
361 FROM
362   (TABLE_DATE_RANGE(perf_skps_v2.skpbench,
363     DATE_ADD(CURRENT_TIMESTAMP(),
364       -2,
365       'DAY'),
366     CURRENT_TIMESTAMP()))
367 WHERE
368   params.benchName="tabl_worldjournal.skp"
369   OR
370   params.benchName="desk_amazon.skp"
371 ORDER BY
372   key DESC,
373   timestamp DESC;
374 `
375     iter, err := NewRowIter(service, query)
376     if err != nil {
377         return fmt.Errorf("Failed to query data from BigQuery: %s", err)
378     }
379     var trace *Trace = nil
380     currentKey := ""
381     for iter.Next() {
382         m := &Measurement{}
383         if err := iter.Decode(m); err != nil {
384             return fmt.Errorf("Failed to decode Measurement from BigQuery: %s", err)
385         }
386         if m.Key != currentKey {
387             if trace != nil {
388                 all.Traces = append(all.Traces, *trace)
389             }
390             currentKey = m.Key
391             trace = NewTrace(numSamples)
392             trace.Params = iter.DecodeParams()
393             trace.Key = m.Key
394         }
395         if index, ok := hashToIndex[m.Hash]; ok {
396             trace.Values[index] = m.Value
397         }
398     }
399     if trace != nil { all.Traces = append(all.Traces, *trace) } // Guard against an empty result set.
400
401     return nil
402 }
403
404 // Data is the full set of traces for the last N days all parsed into structs.
405 type Data struct {
406     all *AllData
407 }
408
409 // AsJSON serializes the data as JSON.
410 func (d *Data) AsJSON(w io.Writer) error {
411     // TODO(jcgregorio) Keep a cache of the gzipped JSON around and serve that as long as it's fresh.
mtklein
2014/06/15 15:12:11
How slow is the JSON encoding?
jcgregorio
2014/06/16 02:46:24
Not so much the JSON encoding as the gzip. The ung
jcgregorio
2014/06/16 12:59:55
Sorry wrong numbers, that should be 50MB and 5MB r
412     return json.NewEncoder(w).Encode(d.all)
413 }
414
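A minimal sketch of the caching idea in the TODO above: run the encoder through a gzip.Writer once and hold onto the bytes until the next data refresh. The gzippedJSON name and the invalidation story are assumptions, and the bytes and compress/gzip imports would be needed:

    // gzippedJSON encodes the data once as gzip-compressed JSON; a caller
    // would cache the result and serve it with Content-Encoding: gzip.
    func (d *Data) gzippedJSON() ([]byte, error) {
        var buf bytes.Buffer
        zw := gzip.NewWriter(&buf)
        if err := json.NewEncoder(zw).Encode(d.all); err != nil {
            return nil, err
        }
        if err := zw.Close(); err != nil {
            return nil, err
        }
        return buf.Bytes(), nil
    }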
415 // populateParamSet populates all.ParamSet with the set of all possible values
416 // for all the 'params' in AllData.
417 func populateParamSet(all *AllData) {
418     // First pull the data out into a map of sets.
419     type ChoiceSet map[string]bool
420     c := make(map[string]ChoiceSet)
421     for _, t := range all.Traces {
422         for k, v := range t.Params {
423             if set, ok := c[k]; !ok {
424                 c[k] = make(map[string]bool)
425                 c[k][v] = true
426             } else {
427                 set[v] = true
428             }
429         }
430     }
431     // Now flatten the sets into []string and populate all.ParamSet with that.
432     for k, v := range c {
433         allOptions := []string{}
434         for option, _ := range v {
435             allOptions = append(allOptions, option)
436         }
437         all.ParamSet[k] = allOptions
438     }
439 }
440
441 // NewData loads the data the first time and then starts a go routine to
442 // periodically refresh the data.
443 //
444 // TODO(jcgregorio) Actually do the bit where we start a go routine.
mtklein
2014/06/15 15:12:11
So Data is going to keep itself updated asynchrono
jcgregorio
2014/06/16 02:46:25
Once I add the go routine for updating I will also
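One plausible shape for the goroutine promised in the TODO, polling on a fixed interval; the hourly interval and the reload step are assumptions:

    // Sketch only: rebuild the data periodically and swap it in. A real
    // version would also guard d.all with a mutex or an atomic swap.
    go func() {
        for _ = range time.Tick(time.Hour) {
            // Re-run the BigQuery queries here and replace d.all with
            // the freshly built AllData.
        }
    }()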
445 func NewData(doOauth bool, gitRepoDir string) (*Data, error) {
446     var err error
447     var client *http.Client
448     if doOauth {
449         client, err = runFlow(config)
450         if err != nil {
451             return nil, fmt.Errorf("Failed to auth: %s", err)
452         }
453     } else {
454         client = http.DefaultClient
455     }
456     service, err := bigquery.New(client)
457     if err != nil {
458         return nil, fmt.Errorf("Failed to create a new BigQuery service object: %s", err)
459     }
460
461     // First query and get the list of hashes we are interested in and use that
462     // and the git log results to fill in the Commits.
463     allGitHashes, err := readCommitsFromGit(gitRepoDir)
464     if err != nil {
465         return nil, fmt.Errorf("Failed to read hashes from Git log: %s", err)
466     }
467
468     hashesTested, err := gitCommitsWithTestData(service)
469     if err != nil {
470         return nil, fmt.Errorf("Failed to read hashes from BigQuery: %s", err)
471     }
472
473     // Order the git hashes by commit log order.
474     commits := make([]Commit, 0, len(hashesTested))
475     for i := len(allGitHashes) - 1; i >= 0; i-- {
476         h := allGitHashes[i]
477         if _, ok := hashesTested[h.Hash]; ok {
478             commits = append(commits, Commit{Hash: h.Hash, CommitTime: h.TimeStamp})
479         }
480     }
481
482     // The number of samples that appear in each trace.
483     numSamples := len(commits)
484
485     // A mapping of Git hashes to where they appear in the Commits array, also the index
486     // at which a measurement gets stored in the Values array.
487     hashToIndex := make(map[string]int)
488     for i, commit := range commits {
489         hashToIndex[commit.Hash] = i
490     }
491
492     all := &AllData{
493         Traces:   make([]Trace, 0, 0),
494         ParamSet: make(map[string]Choices),
495         Commits:  commits,
496     }
497
498     if err := populateTraces(service, all, hashToIndex, numSamples); err != nil {
499         panic(err)
mtklein
2014/06/15 15:12:10
Seems like we'd want something a bit more robust?
jcgregorio
2014/06/16 02:46:24
This is only done once and on startup and I don't
| |
500     }
501
502     populateParamSet(all)
503
504     return &Data{all: all}, nil
505 }