OLD | NEW |
---|---|
(Empty) | |
1 // Copyright (c) 2014 The Chromium Authors. All rights reserved. | |
tfarina
2014/06/13 20:12:57
the new copyright boilerplate does not have (c), s
tfarina
2014/06/13 20:12:57
I know this is not the place to discuss this, but
jcgregorio
2014/06/13 20:22:25
This was cut and paste from another file in this p
tfarina
2014/06/13 20:55:46
that seems fine by me.
| |
2 // Use of this source code is governed by a BSD-style license that can be found | |
3 // in the LICENSE file. | |
4 | |
5 package main | |
6 | |
7 import ( | |
8 "encoding/json" | |
9 "fmt" | |
10 "io" | |
11 "net/http" | |
12 "os/exec" | |
13 "reflect" | |
14 "strconv" | |
15 "strings" | |
16 "time" | |
17 ) | |
18 | |
19 import ( | |
20 "code.google.com/p/goauth2/oauth" | |
21 "code.google.com/p/google-api-go-client/bigquery/v2" | |
22 "github.com/oxtoacart/webbrowser" | |
23 ) | |
24 | |
const (
	// MISSING_DATA_SENTINEL is the value pre-filled into Trace.Values for
	// commits that have no measurement; not all tests run on all commits.
	MISSING_DATA_SENTINEL = 1e100
)
28 | |
// Shouldn't need auth when running from GCE, but will need it for local dev.
//
// NOTE(review): the ClientSecret is committed to source control here —
// presumably this is an "installed application" credential that is not
// treated as confidential; confirm, or move it out of the source.
var config = &oauth.Config{
	ClientId:     "470362608618-nlbqngfl87f4b3mhqqe9ojgaoe11vrld.apps.googleusercontent.com",
	ClientSecret: "J4YCkfMXFJISGyuBuVEiH60T",
	Scope:        bigquery.BigqueryScope,
	AuthURL:      "https://accounts.google.com/o/oauth2/auth",
	TokenURL:     "https://accounts.google.com/o/oauth2/token",
	RedirectURL:  "urn:ietf:wg:oauth:2.0:oob",
	// Cached token so the interactive flow only runs once per machine.
	TokenCache: oauth.CacheFile("bqtoken.data"),
}
39 | |
40 // runFlow runs through a 3LO OAuth 2.0 flow to get credentials for BigQuery. | |
41 func runFlow(config *oauth.Config) (*http.Client, error) { | |
42 transport := &oauth.Transport{Config: config} | |
43 if _, err := config.TokenCache.Token(); err != nil { | |
44 url := config.AuthCodeURL("") | |
45 fmt.Printf(`Your browser has been opened to visit: | |
46 | |
47 %s | |
48 | |
49 Enter the verification code:`, url) | |
50 webbrowser.Open(url) | |
51 var code string | |
52 fmt.Scan(&code) | |
53 if _, err := transport.Exchange(code); err != nil { | |
54 return nil, err | |
55 } | |
56 } | |
57 | |
58 return transport.Client(), nil | |
59 } | |
60 | |
// Trace represents all the values of a single measurement over time.
type Trace struct {
	Key    string            `json:"key"`    // Unique identifier for this trace.
	Values []float64         `json:"values"` // One entry per Commit; MISSING_DATA_SENTINEL where no data was recorded.
	Params map[string]string `json:"params"` // Axes the measurement was taken along, e.g. benchName.
	Trybot bool              `json:"trybot"` // Whether the data came from a trybot run.
}
68 | |
69 // NewTrace allocates a new Trace set up for the given number of samples. | |
70 // | |
71 // The Trace Values are pre-filled in with the missing data sentinel since not | |
72 // all tests will be run on all commits. | |
73 func NewTrace(numSamples int) *Trace { | |
74 t := &Trace{ | |
75 Values: make([]float64, numSamples, numSamples), | |
76 Params: make(map[string]string), | |
77 Trybot: false, | |
78 } | |
79 for i, _ := range t.Values { | |
80 t.Values[i] = MISSING_DATA_SENTINEL | |
81 } | |
82 return t | |
83 } | |
84 | |
// Annotation is a single note attached to a Commit.
type Annotation struct {
	ID     int    `json:"id"`
	Notes  string `json:"notes"`
	Author string `json:"author"`
	// Type's integer values are not defined in this file — presumably an
	// enum declared elsewhere; confirm before relying on specific values.
	Type int `json:"type"`
}
92 | |
// Commit is information about each Git commit.
type Commit struct {
	CommitTime    time.Time    `json:"commit_time"`
	Hash          string       `json:"hash"`
	GitNumber     int          `json:"git_number"`
	CommitMessage string       `json:"commit_msg"`
	Annotations   []Annotation `json:"annotations,omitempty"`
}
101 | |
// Choices is the set of all values observed for a single param key.
type Choices []string
103 | |
// AllData is the top level struct we return via JSON to the UI.
//
// The length of the Commits array is the same length as all of the Values
// arrays in all of the Traces.
type AllData struct {
	Traces   []Trace            `json:"traces"`
	ParamSet map[string]Choices `json:"param_set"` // All possible values for each param key across Traces.
	Commits  []Commit           `json:"commits"`
}
113 | |
114 // gitCommitsWithTestData returns the list of commits that have perf data | |
115 // associated with them. | |
116 // | |
117 // Not all commits will have perf data, the builders don't necessarily run for | |
118 // each commit. | |
119 func gitCommitsWithTestData(service *bigquery.Service) (map[string]bool, error) { | |
120 query := ` | |
121 SELECT | |
122 gitHash, | |
123 COUNT(gitHash) AS c | |
kelvinly
2014/06/14 00:57:31
So why are you counting them? It seems like you do
jcgregorio
2014/06/15 01:58:59
Leftover code, now removed.
On 2014/06/14 00:57:3
| |
124 FROM | |
125 (TABLE_DATE_RANGE(perf_skps_v2.skpbench, | |
126 DATE_ADD(CURRENT_TIMESTAMP(), | |
127 -2, | |
128 'DAY'), | |
129 CURRENT_TIMESTAMP())) | |
130 GROUP BY | |
131 gitHash; | |
132 ` | |
133 iter, err := NewRowIter(service, query) | |
134 if err != nil { | |
135 return nil, fmt.Errorf("Failed to query for the Git hashes used: %s", err) | |
136 } | |
137 | |
138 hashes := make(map[string]bool) | |
139 for iter.Next() { | |
140 h := &struct { | |
141 Hash string `bq:"gitHash"` | |
142 }{} | |
143 err := iter.Decode(h) | |
144 if err != nil { | |
145 return nil, fmt.Errorf("Failed reading hashes from BigQu ery: %s", err) | |
146 } | |
147 hashes[h.Hash] = true | |
148 } | |
149 return hashes, nil | |
150 } | |
151 | |
// GitHash represents information on a single Git commit.
type GitHash struct {
	Hash      string    // Full commit hash.
	TimeStamp time.Time // Commit timestamp as reported by `git log`.
}
157 | |
158 // readCommitsFromGit reads the commit history from a Git repository. | |
159 func readCommitsFromGit(dir string) ([]GitHash, error) { | |
kelvinly
2014/06/14 00:57:31
It looks like you can also get the data from https
benchen
2014/06/14 20:32:37
The problem with googlesource is pagination. It gi
kelvinly
2014/06/14 23:53:03
On the other hand, it gives you a fairly nice JSON
| |
160 cmd := exec.Command("git", strings.Split("log --format=%H%x20%ci", " "). ..) | |
161 cmd.Dir = dir | |
162 b, err := cmd.Output() | |
163 if err != nil { | |
164 return nil, fmt.Errorf("Failed to run Git: %s", err) | |
165 } | |
166 lines := strings.Split(string(b), "\n") | |
167 hashes := make([]GitHash, 0, len(lines)) | |
168 for _, line := range lines { | |
169 parts := strings.SplitN(line, " ", 2) | |
170 if len(parts) == 2 { | |
171 t, err := time.Parse("2006-01-02 15:04:05 -0700", parts[ 1]) | |
172 if err != nil { | |
173 return nil, fmt.Errorf("Failed parsing Git log t imestamp: %s", err) | |
174 } | |
175 hashes = append(hashes, GitHash{Hash: parts[0], TimeStam p: t}) | |
176 } | |
177 } | |
178 return hashes, nil | |
179 } | |
180 | |
// RowIter is a utility for reading data from a BigQuery query response.
//
// RowIter will iterate over all the results, even if they span more than one
// page of results. It automatically uses page tokens to iterate over all the
// pages to retrieve all results.
type RowIter struct {
	response      *bigquery.GetQueryResultsResponse // Current page of results.
	jobId         string                            // BigQuery job being read.
	service       *bigquery.Service
	nextPageToken string // Token for the next page; empty when on the last page.
	row           int    // Index into response.Rows; -1 before the first Next().
}
193 | |
// poll until the job is complete.
//
// Blocks, sleeping one second between attempts, until BigQuery reports the
// job complete, then stores the page of results and its page token on the
// iterator.
//
// NOTE(review): there is no upper bound on the number of polls — a job that
// never completes would block forever; consider adding a timeout.
func (r *RowIter) poll() error {
	var queryResponse *bigquery.GetQueryResultsResponse
	for {
		var err error
		queryCall := r.service.Jobs.GetQueryResults("google.com:chrome-skia", r.jobId)
		if r.nextPageToken != "" {
			queryCall.PageToken(r.nextPageToken)
		}
		queryResponse, err = queryCall.Do()
		if err != nil {
			return err
		}
		if queryResponse.JobComplete {
			break
		}
		time.Sleep(time.Second)
	}
	r.nextPageToken = queryResponse.PageToken
	r.response = queryResponse
	return nil
}
216 | |
// NewRowIter starts a query and returns a RowIter for iterating through the
// results.
//
// The query is inserted as a job in the google.com:chrome-skia project, and
// this call blocks (via poll) until the job completes or fails.
func NewRowIter(service *bigquery.Service, query string) (*RowIter, error) {
	job := &bigquery.Job{
		Configuration: &bigquery.JobConfiguration{
			Query: &bigquery.JobConfigurationQuery{
				Query: query,
			},
		},
	}
	jobResponse, err := service.Jobs.Insert("google.com:chrome-skia", job).Do()
	if err != nil {
		return nil, err
	}

	r := &RowIter{
		jobId:   jobResponse.JobReference.JobId,
		service: service,
		row:     -1, // Start at -1 so the first call to Next() puts us at the 0th Row.
	}
	return r, r.poll()
}
239 | |
240 // Next moves to the next row in the response and returns true as long as data | |
241 // is availble, returning false when the end of the results are reached. | |
242 // | |
243 // Calling Next() the first time actually points the iterator at the first row, | |
244 // which makes it possible to use Next if a for loop: | |
245 // | |
246 // for iter.Next() { ... } | |
247 // | |
248 func (r *RowIter) Next() bool { | |
249 r.row++ | |
250 if r.row >= len(r.response.Rows) { | |
251 if r.nextPageToken != "" { | |
252 r.poll() | |
253 r.row = 0 | |
254 return len(r.response.Rows) > 0 | |
255 } else { | |
256 return false | |
257 } | |
258 } | |
259 return true | |
260 } | |
261 | |
262 // DecodeParams pulls all the values in the params record out as a map[string]st ring. | |
263 // | |
264 // The schema for each table has a nested record called 'params' that contains | |
265 // various axes along which queries could be built, such as the gpu the test was | |
266 // run against. Pull out the entire record as a generic map[string]string. | |
267 func (r *RowIter) DecodeParams() map[string]string { | |
268 row := r.response.Rows[r.row] | |
269 schema := r.response.Schema | |
270 params := map[string]string{} | |
271 for i, cell := range row.F { | |
272 if cell.V != nil { | |
273 name := schema.Fields[i].Name | |
274 if strings.HasPrefix(name, "params_") { | |
275 params[strings.TrimPrefix(name, "params_")] = ce ll.V.(string) | |
276 } | |
277 } | |
278 } | |
279 return params | |
280 } | |
281 | |
282 // Decode uses struct tags to decode a single row into a struct. | |
283 // | |
284 // For example, given a struct: | |
285 // | |
286 // type A struct { | |
287 // Name string `bq:"name"` | |
288 // Value float64 `bq:"measurement"` | |
289 // } | |
290 // | |
291 // And a BigQuery table that contained two columns named "name" and | |
292 // "measurement". Then calling Decode as follows would parse the column values | |
293 // for "name" and "measurement" and place them in the Name and Value fields | |
294 // respectively. | |
295 // | |
296 // a = &A{} | |
297 // iter.Decode(a) | |
298 // | |
299 // Implementation Details: | |
300 // | |
301 // If a tag names a column that doesn't exist, the field is merely ignored, | |
302 // i.e. it is left unchanged from when it was passed into Decode. | |
303 // | |
304 // Not all columns need to be tagged in the struct. | |
305 // | |
306 // The decoder doesn't handle nested structs, only the top level fields are de coded. | |
307 // | |
308 // The decoder only handles struct fields of type string, int, int32, int64, | |
309 // float, float32 and float64. | |
310 func (r *RowIter) Decode(s interface{}) error { | |
311 row := r.response.Rows[r.row] | |
312 schema := r.response.Schema | |
313 // Collapse the data in the row into a map[string]string. | |
314 rowMap := map[string]string{} | |
315 for i, cell := range row.F { | |
316 if cell.V != nil { | |
317 rowMap[schema.Fields[i].Name] = cell.V.(string) | |
318 } | |
319 } | |
320 | |
321 // Then iter over the fields of 's' and set them from the row data. | |
322 sv := reflect.ValueOf(s).Elem() | |
323 st := sv.Type() | |
324 for i := 0; i < sv.NumField(); i++ { | |
325 columnName := st.Field(i).Tag.Get("bq") | |
326 if columnValue, ok := rowMap[columnName]; ok { | |
327 switch sv.Field(i).Kind() { | |
328 case reflect.String: | |
329 sv.Field(i).SetString(columnValue) | |
330 case reflect.Float32, reflect.Float64: | |
331 f, err := strconv.ParseFloat(columnValue, 64) | |
332 if err != nil { | |
333 return err | |
334 } | |
335 sv.Field(i).SetFloat(f) | |
336 case reflect.Int32, reflect.Int64: | |
337 parsedInt, err := strconv.ParseInt(columnValue, 10, 64) | |
338 if err != nil { | |
339 return err | |
340 } | |
341 sv.Field(i).SetInt(parsedInt) | |
342 default: | |
343 return fmt.Errorf("can't decode into field of ty pe: %s", sv.Field(i).Kind()) | |
344 } | |
345 } | |
346 } | |
347 return nil | |
348 } | |
349 | |
350 // populateTraces reads the measurement data from BigQuery and populates the Tra ces. | |
351 func populateTraces(service *bigquery.Service, all *AllData, hashToIndex map[str ing]int, numSamples int) error { | |
352 type Measurement struct { | |
353 Value float64 `bq:"value"` | |
354 Key string `bq:"key"` | |
355 Hash string `bq:"gitHash"` | |
356 } | |
357 | |
358 // Now query the actual samples. | |
359 query := ` | |
360 SELECT | |
361 * | |
362 FROM | |
363 (TABLE_DATE_RANGE(perf_skps_v2.skpbench, | |
364 DATE_ADD(CURRENT_TIMESTAMP(), | |
365 -2, | |
366 'DAY'), | |
367 CURRENT_TIMESTAMP())) | |
368 WHERE | |
369 params.benchName="tabl_worldjournal.skp" | |
370 OR | |
371 params.benchName="desk_amazon.skp" | |
372 ORDER BY | |
373 key DESC, | |
374 timestamp DESC; | |
375 ` | |
376 iter, err := NewRowIter(service, query) | |
377 if err != nil { | |
378 return fmt.Errorf("Failed to query data from BigQuery: %s", err) | |
379 } | |
380 var trace *Trace = nil | |
381 currentKey := "" | |
382 for iter.Next() { | |
383 m := &Measurement{} | |
384 if err := iter.Decode(m); err != nil { | |
385 return fmt.Errorf("Failed to decode Measurement from Big Query: %s", err) | |
386 } | |
387 if m.Key != currentKey { | |
388 if trace != nil { | |
389 all.Traces = append(all.Traces, *trace) | |
390 } | |
391 currentKey = m.Key | |
392 trace = NewTrace(numSamples) | |
393 trace.Params = iter.DecodeParams() | |
394 trace.Key = m.Key | |
395 } | |
396 if index, ok := hashToIndex[m.Hash]; ok { | |
397 trace.Values[index] = m.Value | |
398 } | |
399 } | |
400 all.Traces = append(all.Traces, *trace) | |
401 | |
402 return nil | |
403 } | |
404 | |
// Data is the full set of traces for the last N days all parsed into structs.
type Data struct {
	all *AllData // Everything served to the UI, see AsJSON.
}
409 | |
// AsJSON serializes the data as JSON, streaming it to w.
func (d *Data) AsJSON(w io.Writer) error {
	// TODO(jcgregorio) Keep a cache of the gzipped JSON around and serve that as long as it's fresh.
	return json.NewEncoder(w).Encode(d.all)
}
415 | |
416 // populateParamSet returns the set of all possible values for all the 'params' | |
417 // in AllData. | |
418 func populateParamSet(all *AllData) { | |
419 // First pull the data out into a map of sets. | |
420 type ChoiceSet map[string]bool | |
421 c := make(map[string]ChoiceSet) | |
422 for _, t := range all.Traces { | |
423 for k, v := range t.Params { | |
424 if set, ok := c[k]; !ok { | |
425 c[k] = make(map[string]bool) | |
426 c[k][v] = true | |
427 } else { | |
428 set[v] = true | |
429 } | |
430 } | |
431 } | |
432 // Now flatten the sets into []string and populate all.ParamsSet with th at. | |
433 for k, v := range c { | |
434 allOptions := []string{} | |
435 for option, _ := range v { | |
436 allOptions = append(allOptions, option) | |
437 } | |
438 all.ParamSet[k] = allOptions | |
439 } | |
440 } | |
441 | |
442 // NewData loads the data the first time and then starts a go routine to | |
443 // preiodically refresh the data. | |
444 // | |
445 // TODO(jcgregorio) Actuall do the bit where we start a go routine. | |
446 func NewData(doOauth bool, gitRepoDir string) (*Data, error) { | |
447 var err error | |
448 var client *http.Client | |
449 if doOauth { | |
450 client, err = runFlow(config) | |
451 if err != nil { | |
452 return nil, fmt.Errorf("Failed to auth: %s", err) | |
453 } | |
454 } else { | |
455 client = http.DefaultClient | |
456 } | |
457 service, err := bigquery.New(client) | |
458 if err != nil { | |
459 return nil, fmt.Errorf("Failed to create a new BigQuery service object: %s", err) | |
460 } | |
461 | |
462 // First query and get the list of hashes we are interested in and use t hat | |
463 // and the git log results to fill in the Commits. | |
464 allGitHashes, err := readCommitsFromGit(gitRepoDir) | |
465 if err != nil { | |
466 return nil, fmt.Errorf("Failed to read hashes from Git log: %s", err) | |
467 } | |
468 | |
469 hashesTested, err := gitCommitsWithTestData(service) | |
470 if err != nil { | |
471 return nil, fmt.Errorf("Failed to read hashes from BigQuery: %s" , err) | |
472 } | |
473 | |
474 // Order the git hashes by commit log order. | |
475 commits := make([]Commit, 0, len(hashesTested)) | |
476 for i := len(allGitHashes) - 1; i >= 0; i-- { | |
477 h := allGitHashes[i] | |
478 if _, ok := hashesTested[h.Hash]; ok { | |
479 commits = append(commits, Commit{Hash: h.Hash, CommitTim e: h.TimeStamp}) | |
480 } | |
481 } | |
482 | |
483 // The number of samples that appear in each trace. | |
484 numSamples := len(commits) | |
485 | |
486 // A mapping of Git hashes to where they appear in the Commits array, al so the index | |
487 // at which a measurement gets stored in the Values array. | |
488 hashToIndex := make(map[string]int) | |
489 for i, commit := range commits { | |
490 hashToIndex[commit.Hash] = i | |
491 } | |
492 | |
493 all := &AllData{ | |
494 Traces: make([]Trace, 0, 0), | |
495 ParamSet: make(map[string]Choices), | |
496 Commits: commits, | |
497 } | |
498 | |
499 if err := populateTraces(service, all, hashToIndex, numSamples); err != nil { | |
500 panic(err) | |
501 } | |
502 | |
503 populateParamSet(all) | |
504 | |
505 return &Data{all: all}, nil | |
506 } | |
OLD | NEW |