package main

tfarina 2014/06/13 19:01:04:
    don't we put copyright boilerplate in go files as
jcgregorio 2014/06/13 19:30:34:
    Done. Also added in the LICENSE file that the copy

import (
	"code.google.com/p/goauth2/oauth"
	"code.google.com/p/google-api-go-client/bigquery/v2"
	"encoding/json"
	"fmt"
	"github.com/oxtoacart/webbrowser"
	"io"
	"net/http"
	"os/exec"
	"reflect"
	"strconv"
	"strings"
	"time"
)

const (
	// MISSING_DATA_SENTINEL signifies a missing sample value in a Trace.
	// (A large float is used rather than NaN, presumably because
	// encoding/json cannot serialize NaN.)
	MISSING_DATA_SENTINEL = 1e100
)

// Shouldn't need auth when running from GCE, but will need it for local dev.
var config = &oauth.Config{
	ClientId:     "470362608618-nlbqngfl87f4b3mhqqe9ojgaoe11vrld.apps.googleusercontent.com",
	ClientSecret: "J4YCkfMXFJISGyuBuVEiH60T",
	Scope:        bigquery.BigqueryScope,
	AuthURL:      "https://accounts.google.com/o/oauth2/auth",
	TokenURL:     "https://accounts.google.com/o/oauth2/token",
	RedirectURL:  "urn:ietf:wg:oauth:2.0:oob",
	TokenCache:   oauth.CacheFile("bqtoken.data"),
}
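
// Note: the "urn:ietf:wg:oauth:2.0:oob" RedirectURL above selects the
// out-of-band flow: rather than redirecting, Google displays a verification
// code for the user to paste back into this program (see runFlow below).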

// runFlow runs through a 3LO (three-legged) OAuth 2.0 flow to get credentials
// for BigQuery. If no cached token is found, the user is sent to their
// browser to authorize, then pastes the resulting verification code back in.
func runFlow(config *oauth.Config) (*http.Client, error) {
	transport := &oauth.Transport{Config: config}
	if _, err := config.TokenCache.Token(); err != nil {
		url := config.AuthCodeURL("")
		fmt.Printf(`Your browser has been opened to visit:

  %s

Enter the verification code:`, url)
		webbrowser.Open(url)
		var code string
		fmt.Scan(&code)
		if _, err := transport.Exchange(code); err != nil {
			return nil, err
		}
	}

	return transport.Client(), nil
}

// Trace represents all the samples of a single measurement over time.
type Trace struct {
	Key    string            `json:"key"`
	Values []float64         `json:"values"`
	Params map[string]string `json:"params"`
	Trybot bool              `json:"trybot"`
}

// NewTrace allocates a new Trace set up for the given number of samples.
//
// The Values are pre-filled with the missing data sentinel, since not every
// commit will have a sample for every trace.
func NewTrace(numSamples int) *Trace {
	t := &Trace{
		Values: make([]float64, numSamples),
		Params: make(map[string]string),
		Trybot: false,
	}
	for i := range t.Values {
		t.Values[i] = MISSING_DATA_SENTINEL
	}
	return t
}

// Annotation is a single annotation attached to a commit.
type Annotation struct {
	ID     int    `json:"id"`
	Notes  string `json:"notes"`
	Author string `json:"author"`
	Type   int    `json:"type"`
}

// Commit is information about each Git commit.
type Commit struct {
	CommitTime    time.Time    `json:"commit_time"`
	Hash          string       `json:"hash"`
	GitNumber     int          `json:"git_number"`
	CommitMessage string       `json:"commit_msg"`
	Annotations   []Annotation `json:"annotations,omitempty"`
}

// Choices is the list of possible values for a single param.
type Choices []string

// AllData is the top level struct we return via JSON to the UI.
type AllData struct {
	Traces   []Trace            `json:"traces"`
	ParamSet map[string]Choices `json:"param_set"`
	Commits  []Commit           `json:"commits"`
}
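
// As a sketch (all values here are invented), the JSON sent to the UI looks
// roughly like:
//
//	{
//	  "traces": [{"key": "...", "values": [1.23, 1e+100], "params": {"gpu": "..."}, "trybot": false}],
//	  "param_set": {"gpu": ["...", "..."]},
//	  "commits": [{"commit_time": "...", "hash": "...", "git_number": 0, "commit_msg": "..."}]
//	}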

// gitCommitsWithTestData returns the list of commits that have perf data
// associated with them.
//
// Not all commits will have perf data; the builders don't necessarily run for
// each commit.
func gitCommitsWithTestData(service *bigquery.Service) (map[string]bool, error) {
	// TABLE_DATE_RANGE expands to the date-suffixed tables with the given
	// prefix that fall in the given range, here roughly the last two days.
	query := `
SELECT
  gitHash,
  COUNT(gitHash) AS c
FROM
  (TABLE_DATE_RANGE(perf_skps_v2.skpbench,
    DATE_ADD(CURRENT_TIMESTAMP(),
      -2,
      'DAY'),
    CURRENT_TIMESTAMP()))
GROUP BY
  gitHash;
`
	iter, err := NewRowIter(service, query)
	if err != nil {
		return nil, fmt.Errorf("Failed to query for the Git hashes used: %s", err)
	}

	hashes := make(map[string]bool)
	for iter.Next() {
		h := &struct {
			Hash string `bq:"gitHash"`
		}{}
		err := iter.Decode(h)
		if err != nil {
			return nil, fmt.Errorf("Failed reading hashes from BigQuery: %s", err)
		}
		hashes[h.Hash] = true
	}
	return hashes, nil
}

// GitHash represents information on a single Git commit.
type GitHash struct {
	Hash      string
	TimeStamp time.Time
}

// readCommitsFromGit reads the commit history from a Git repository.
func readCommitsFromGit(dir string) ([]GitHash, error) {

benchen 2014/06/13 19:51:29:
    So this will be replace with reading the githash d
jcgregorio 2014/06/13 19:54:00:
    Yeah, if we have that info in MySQL it would proba
	// Each output line of `git log --format=%H%x20%ci` is the 40-char commit
	// hash, a space (%x20), and the committer date, e.g.
	// "2014-06-13 10:12:01 -0700", which matches the reference layout used
	// in time.Parse below.
	cmd := exec.Command("git", strings.Split("log --format=%H%x20%ci", " ")...)
	cmd.Dir = dir
	b, err := cmd.Output()
	if err != nil {
		return nil, fmt.Errorf("Failed to run Git: %s", err)
	}
	lines := strings.Split(string(b), "\n")
	hashes := make([]GitHash, 0, len(lines))
	for _, line := range lines {
		parts := strings.SplitN(line, " ", 2)
		if len(parts) == 2 {
			t, err := time.Parse("2006-01-02 15:04:05 -0700", parts[1])
			if err != nil {
				return nil, fmt.Errorf("Failed parsing Git log timestamp: %s", err)
			}
			hashes = append(hashes, GitHash{Hash: parts[0], TimeStamp: t})
		}
	}
	return hashes, nil
}

// RowIter is a utility for reading data from a BigQuery query response.
//
// RowIter will iterate over all the results, even if they span more than one
// page of results. It automatically uses page tokens to iterate over all the
// pages to retrieve all results.
type RowIter struct {
	response      *bigquery.GetQueryResultsResponse
	jobId         string
	service       *bigquery.Service
	nextPageToken string
	row           int
}
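
// A minimal usage sketch (Row here is an illustrative struct with `bq` tags;
// it is not defined in this file):
//
//	iter, err := NewRowIter(service, query)
//	if err != nil {
//		return err
//	}
//	for iter.Next() {
//		r := &Row{}
//		if err := iter.Decode(r); err != nil {
//			return err
//		}
//		// Use r...
//	}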

// poll waits until the query job is complete, then records the current page
// of results and the token for the next page, if any.
func (r *RowIter) poll() error {
	var queryResponse *bigquery.GetQueryResultsResponse
	for {
		var err error
		queryCall := r.service.Jobs.GetQueryResults("google.com:chrome-skia", r.jobId)
		if r.nextPageToken != "" {
			queryCall.PageToken(r.nextPageToken)
		}
		queryResponse, err = queryCall.Do()
		if err != nil {
			return err
		}
		if queryResponse.JobComplete {
			break
		}
		time.Sleep(time.Second)
	}
	r.nextPageToken = queryResponse.PageToken
	r.response = queryResponse
	return nil
}

// NewRowIter starts a query and returns a RowIter for iterating through the
// results. It blocks until the query job completes.
func NewRowIter(service *bigquery.Service, query string) (*RowIter, error) {
	job := &bigquery.Job{
		Configuration: &bigquery.JobConfiguration{
			Query: &bigquery.JobConfigurationQuery{
				Query: query,
			},
		},
	}
	jobResponse, err := service.Jobs.Insert("google.com:chrome-skia", job).Do()
	if err != nil {
		return nil, err
	}

	r := &RowIter{
		jobId:   jobResponse.JobReference.JobId,
		service: service,
		row:     -1, // Start at -1 so the first call to Next() puts us at the 0th row.
	}
	return r, r.poll()
}

// Next moves to the next row in the response and returns true as long as data
// is available, returning false when the end of the results is reached.
//
// Calling Next() the first time actually points the iterator at the first row,
// which makes it possible to use Next in a for loop:
//
//	for iter.Next() { ... }
func (r *RowIter) Next() bool {
	r.row++
	if r.row >= len(r.response.Rows) {
		if r.nextPageToken != "" {
			r.poll()
			r.row = 0
			return len(r.response.Rows) > 0
		} else {
			return false
		}
	}
	return true
}

// DecodeParams pulls all the values in the params record out as a
// map[string]string.
//
// The schema for each table has a nested record called 'params' that contains
// various axes along which queries could be built, such as the gpu the test
// was run against. Pull out the entire record as a generic map[string]string.
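//
// For example (the column names here are hypothetical), a row with cells
// params_gpu="GTX660" and params_os="Ubuntu12" decodes to:
//
//	map[string]string{"gpu": "GTX660", "os": "Ubuntu12"}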
func (r *RowIter) DecodeParams() map[string]string {
	row := r.response.Rows[r.row]
	schema := r.response.Schema
	params := map[string]string{}
	for i, cell := range row.F {
		if cell.V != nil {
			name := schema.Fields[i].Name
			if strings.HasPrefix(name, "params_") {
				params[strings.TrimPrefix(name, "params_")] = cell.V.(string)
			}
		}
	}
	return params
}

// Decode uses struct tags to decode a single row into a struct.
//
// For example, given a struct:
//
//	type A struct {
//	    Name  string  `bq:"name"`
//	    Value float64 `bq:"measurement"`
//	}
//
// and a BigQuery table that contains two columns named "name" and
// "measurement", calling Decode as follows would parse the column values for
// "name" and "measurement" and place them in the Name and Value fields
// respectively:
//
//	a := &A{}
//	iter.Decode(a)
//
// Implementation details:
//
// If a tag names a column that doesn't exist, the field is merely ignored,
// i.e. it is left unchanged from when it was passed into Decode.
//
// Not all columns need to be tagged in the struct.
//
// The decoder doesn't handle nested structs; only the top level fields are
// decoded.
//
// The decoder only handles struct fields of type string, int, int32, int64,
// float32 and float64.
func (r *RowIter) Decode(s interface{}) error {
	row := r.response.Rows[r.row]
	schema := r.response.Schema
	// Collapse the data in the row into a map[string]string.
	rowMap := map[string]string{}
	for i, cell := range row.F {
		if cell.V != nil {
			rowMap[schema.Fields[i].Name] = cell.V.(string)
		}
	}

	// Then iterate over the fields of 's' and set them from the row data.
	sv := reflect.ValueOf(s).Elem()
	st := sv.Type()
	for i := 0; i < sv.NumField(); i++ {
		columnName := st.Field(i).Tag.Get("bq")
		if columnValue, ok := rowMap[columnName]; ok {
			switch sv.Field(i).Kind() {
			case reflect.String:
				sv.Field(i).SetString(columnValue)
			case reflect.Float32, reflect.Float64:
				f, err := strconv.ParseFloat(columnValue, 64)
				if err != nil {
					return err
				}
				sv.Field(i).SetFloat(f)
			case reflect.Int, reflect.Int32, reflect.Int64:
				// reflect.Int is included so plain int fields decode,
				// as the doc comment above promises.
				parsedInt, err := strconv.ParseInt(columnValue, 10, 64)
				if err != nil {
					return err
				}
				sv.Field(i).SetInt(parsedInt)
			default:
				return fmt.Errorf("can't decode into field of type: %s", sv.Field(i).Kind())
			}
		}
	}
	return nil
}

// populateTraces reads the measurement data from BigQuery and populates the
// Traces.
func populateTraces(service *bigquery.Service, all *AllData, hashToIndex map[string]int, numSamples int) error {
	type Measurement struct {
		Value float64 `bq:"value"`
		Key   string  `bq:"key"`
		Hash  string  `bq:"gitHash"`
	}

	// Now query the actual samples. The ORDER BY key is what lets the loop
	// below group consecutive rows into a single Trace.
	query := `
SELECT
  *
FROM
  (TABLE_DATE_RANGE(perf_skps_v2.skpbench,
    DATE_ADD(CURRENT_TIMESTAMP(),
      -2,
      'DAY'),
    CURRENT_TIMESTAMP()))
WHERE
  params.benchName="tabl_worldjournal.skp"
  OR
  params.benchName="desk_amazon.skp"
ORDER BY
  key DESC,
  timestamp DESC;
`
	iter, err := NewRowIter(service, query)
	if err != nil {
		return fmt.Errorf("Failed to query data from BigQuery: %s", err)
	}
	var trace *Trace
	currentKey := ""
	for iter.Next() {
		m := &Measurement{}
		if err := iter.Decode(m); err != nil {
			return fmt.Errorf("Failed to decode Measurement from BigQuery: %s", err)
		}
		if m.Key != currentKey {
			if trace != nil {
				all.Traces = append(all.Traces, *trace)
			}
			currentKey = m.Key
			trace = NewTrace(numSamples)
			trace.Params = iter.DecodeParams()
			trace.Key = m.Key
		}
		if index, ok := hashToIndex[m.Hash]; ok {
			trace.Values[index] = m.Value
		}
	}
	// Append the last trace, guarding against the query having returned no
	// rows at all, in which case trace is still nil.
	if trace != nil {
		all.Traces = append(all.Traces, *trace)
	}

	return nil
}

// Data is the full set of traces for the last N days all parsed into structs.
type Data struct {
	all *AllData
}

// AsJSON serializes the data as JSON.
func (d *Data) AsJSON(w io.Writer) error {
	return json.NewEncoder(w).Encode(d.all)
}
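
// A hypothetical HTTP handler (none exists in this file yet) could serve the
// UI directly from a *Data named data:
//
//	func dataHandler(w http.ResponseWriter, r *http.Request) {
//		w.Header().Set("Content-Type", "application/json")
//		if err := data.AsJSON(w); err != nil {
//			http.Error(w, err.Error(), http.StatusInternalServerError)
//		}
//	}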

// populateParamSet populates all.ParamSet with the set of all possible values
// for each of the 'params' in AllData.
func populateParamSet(all *AllData) {
	// First pull the data out into a map of sets.
	type ChoiceSet map[string]bool
	c := make(map[string]ChoiceSet)
	for _, t := range all.Traces {
		for k, v := range t.Params {
			if set, ok := c[k]; !ok {
				c[k] = make(map[string]bool)
				c[k][v] = true
			} else {
				set[v] = true
			}
		}
	}
	// Now flatten the sets into []string and populate all.ParamSet with that.
	for k, v := range c {
		allOptions := []string{}
		for option := range v {
			allOptions = append(allOptions, option)
		}
		all.ParamSet[k] = allOptions
	}
}
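
// For example (the values here are hypothetical), two traces with Params
// {"gpu": "GTX660"} and {"gpu": "Tegra3"} produce:
//
//	all.ParamSet["gpu"] == Choices{"GTX660", "Tegra3"}
//
// though not necessarily in that order, since Go map iteration order is
// unspecified.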

// NewData loads the data the first time and then starts a goroutine to
// periodically refresh the data.
//
// TODO(jcgregorio) Actually do the bit where we start a goroutine.
func NewData(doOauth bool, gitRepoDir string) (*Data, error) {
	var err error
	var client *http.Client
	if doOauth {
		client, err = runFlow(config)
		if err != nil {
			return nil, fmt.Errorf("Failed to auth: %s", err)
		}
	} else {
		client = http.DefaultClient
	}
	service, err := bigquery.New(client)
	if err != nil {
		return nil, fmt.Errorf("Failed to create a new BigQuery service object: %s", err)
	}

	// First query and get the list of hashes we are interested in, and use
	// that and the git log results to fill in the []Commit. Now we know how
	// long to make each array. Also, create a map[hash]index into the array.
	allGitHashes, err := readCommitsFromGit(gitRepoDir)
	if err != nil {
		return nil, fmt.Errorf("Failed to read hashes from Git log: %s", err)
	}

	hashesTested, err := gitCommitsWithTestData(service)
	if err != nil {
		return nil, fmt.Errorf("Failed to read hashes from BigQuery: %s", err)
	}

	// Order the git hashes by commit log order.
	commits := make([]Commit, 0, len(hashesTested))
	for i := len(allGitHashes) - 1; i >= 0; i-- {
		h := allGitHashes[i]
		if _, ok := hashesTested[h.Hash]; ok {
			commits = append(commits, Commit{Hash: h.Hash, CommitTime: h.TimeStamp})
		}
	}

	// The number of samples that appear in each trace.
	numSamples := len(commits)

	// A mapping of Git hashes to where they appear in the Commits array.
	hashToIndex := make(map[string]int)
	for i, commit := range commits {
		hashToIndex[commit.Hash] = i
	}

	all := &AllData{
		Traces:   make([]Trace, 0),
		ParamSet: make(map[string]Choices),
		Commits:  commits,
	}

	// Return the error rather than panicking, since this function already
	// reports errors to its caller.
	if err := populateTraces(service, all, hashToIndex, numSamples); err != nil {
		return nil, fmt.Errorf("Failed to populate traces: %s", err)
	}

	populateParamSet(all)

	return &Data{all: all}, nil
}