Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(479)

Side by Side Diff: perf/server/data.go

Issue 335833002: Start loading the BigQuery data and serving it to the UI. (Closed) Base URL: https://skia.googlesource.com/buildbot.git@master
Patch Set: Created 6 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 package main
tfarina 2014/06/13 19:01:04 don't we put copyright boilerplate in go files as
jcgregorio 2014/06/13 19:30:34 Done. Also added in the LICENSE file that the copy
2
3 import (
4 "code.google.com/p/goauth2/oauth"
5 "code.google.com/p/google-api-go-client/bigquery/v2"
6 "encoding/json"
7 "fmt"
8 "github.com/oxtoacart/webbrowser"
9 "io"
10 "net/http"
11 "os/exec"
12 "reflect"
13 "strconv"
14 "strings"
15 "time"
16 )
17
const (
	// MISSING_DATA_SENTINEL is the value stored in Trace.Values for commits
	// that have no measurement (see NewTrace, which pre-fills every sample
	// with it). The UI treats this value as "no data".
	// NOTE(review): Go convention is MixedCaps (missingDataSentinel), but
	// renaming would touch every reference; left as-is.
	MISSING_DATA_SENTINEL = 1e100
)
21
22 // Shouldn't need auth when running from GCE, but will need it for local dev.
23 var config = &oauth.Config{
24 ClientId: "470362608618-nlbqngfl87f4b3mhqqe9ojgaoe11vrld.apps.google usercontent.com",
25 ClientSecret: "J4YCkfMXFJISGyuBuVEiH60T",
26 Scope: bigquery.BigqueryScope,
27 AuthURL: "https://accounts.google.com/o/oauth2/auth",
28 TokenURL: "https://accounts.google.com/o/oauth2/token",
29 RedirectURL: "urn:ietf:wg:oauth:2.0:oob",
30 TokenCache: oauth.CacheFile("bqtoken.data"),
31 }
32
33 // runFlow runs through a 3LO OAuth 2.0 flow to get credentials for BigQuery.
34 func runFlow(config *oauth.Config) (*http.Client, error) {
35 transport := &oauth.Transport{Config: config}
36 if _, err := config.TokenCache.Token(); err != nil {
37 url := config.AuthCodeURL("")
38 fmt.Printf(`Your browser has been opened to visit:
39
40 %s
41
42 Enter the verification code:`, url)
43 webbrowser.Open(url)
44 var code string
45 fmt.Scan(&code)
46 if _, err := transport.Exchange(code); err != nil {
47 return nil, err
48 }
49 }
50
51 return transport.Client(), nil
52 }
53
// Trace represents all the measurements of a single measurement over time.
//
// Values is parallel to AllData.Commits: Values[i] holds the measurement at
// commit i, or MISSING_DATA_SENTINEL when that commit has no data (see
// NewTrace and populateTraces).
type Trace struct {
	Key    string            `json:"key"`    // Unique key identifying this trace.
	Values []float64         `json:"values"` // One sample per commit.
	Params map[string]string `json:"params"` // Axes this trace was measured along (see DecodeParams).
	Trybot bool              `json:"trybot"`
}
61
62 // NewTrace allocates a new Trace set up for the given number of samples.
63 func NewTrace(numSamples int) *Trace {
64 t := &Trace{
65 Values: make([]float64, numSamples, numSamples),
66 Params: make(map[string]string),
67 Trybot: false,
68 }
69 for i, _ := range t.Values {
70 t.Values[i] = MISSING_DATA_SENTINEL
71 }
72 return t
73 }
74
// Annotation is a note attached to a commit.
type Annotation struct {
	ID     int    `json:"id"`
	Notes  string `json:"notes"`  // Free-form text of the annotation.
	Author string `json:"author"` // Who wrote the annotation.
	Type   int    `json:"type"`   // NOTE(review): meaning of the numeric type codes isn't visible here — document once known.
}
82
// Commit is information about each Git commit.
type Commit struct {
	CommitTime    time.Time    `json:"commit_time"`
	Hash          string       `json:"hash"` // Full Git hash of the commit.
	GitNumber     int          `json:"git_number"`
	CommitMessage string       `json:"commit_msg"`
	Annotations   []Annotation `json:"annotations,omitempty"`
}
91
// Choices is the list of all observed values for a single param key
// (see populateParamSet).
type Choices []string
93
// AllData is the top level struct we return via JSON to the UI.
type AllData struct {
	Traces   []Trace            `json:"traces"`    // All traces, one per unique key.
	ParamSet map[string]Choices `json:"param_set"` // Every param key mapped to all its observed values.
	Commits  []Commit           `json:"commits"`   // Commits with test data, oldest first.
}
100
101 // gitCommitsWithTestData returns the list of commits that have perf data
102 // associated with them.
103 //
104 // Not all commits will have perf data, the builders don't necessarily run for
105 // each commit.
106 func gitCommitsWithTestData(service *bigquery.Service) (map[string]bool, error) {
107 query := `
108 SELECT
109 gitHash,
110 COUNT(gitHash) AS c
111 FROM
112 (TABLE_DATE_RANGE(perf_skps_v2.skpbench,
113 DATE_ADD(CURRENT_TIMESTAMP(),
114 -2,
115 'DAY'),
116 CURRENT_TIMESTAMP()))
117 GROUP BY
118 gitHash;
119 `
120 iter, err := NewRowIter(service, query)
121 if err != nil {
122 return nil, fmt.Errorf("Failed to query for the Git hashes used: %s", err)
123 }
124
125 hashes := make(map[string]bool)
126 for iter.Next() {
127 h := &struct {
128 Hash string `bq:"gitHash"`
129 }{}
130 err := iter.Decode(h)
131 if err != nil {
132 return nil, fmt.Errorf("Failed reading hashes from BigQu ery: %s", err)
133 }
134 hashes[h.Hash] = true
135 }
136 return hashes, nil
137 }
138
// GitHash represents information on a single Git commit.
type GitHash struct {
	Hash      string    // Full commit hash.
	TimeStamp time.Time // Commit time as reported by git log.
}
144
145 // readCommitsFromGit reads the commit history from a Git repository.
146 func readCommitsFromGit(dir string) ([]GitHash, error) {
benchen 2014/06/13 19:51:29 So this will be replace with reading the githash d
jcgregorio 2014/06/13 19:54:00 Yeah, if we have that info in MySQL it would proba
147 cmd := exec.Command("git", strings.Split("log --format=%H%x20%ci", " "). ..)
148 cmd.Dir = dir
149 b, err := cmd.Output()
150 if err != nil {
151 return nil, fmt.Errorf("Failed to run Git: %s", err)
152 }
153 lines := strings.Split(string(b), "\n")
154 hashes := make([]GitHash, 0, len(lines))
155 for _, line := range lines {
156 parts := strings.SplitN(line, " ", 2)
157 if len(parts) == 2 {
158 t, err := time.Parse("2006-01-02 15:04:05 -0700", parts[ 1])
159 if err != nil {
160 return nil, fmt.Errorf("Failed parsing Git log t imestamp: %s", err)
161 }
162 hashes = append(hashes, GitHash{Hash: parts[0], TimeStam p: t})
163 }
164 }
165 return hashes, nil
166 }
167
// RowIter is a utility for reading data from a BigQuery query response.
//
// RowIter will iterate over all the results, even if they span more than one
// page of results. It automatically uses page tokens to iterate over all the
// pages to retrieve all results.
type RowIter struct {
	response      *bigquery.GetQueryResultsResponse // Current page of results.
	jobId         string                            // The BigQuery job the results belong to.
	service       *bigquery.Service
	nextPageToken string // Token for the next page; "" means this is the last page.
	row           int    // Index into response.Rows; starts at -1 (see NewRowIter).
}
180
181 // poll until the job is complete.
182 func (r *RowIter) poll() error {
183 var queryResponse *bigquery.GetQueryResultsResponse
184 for {
185 var err error
186 queryCall := r.service.Jobs.GetQueryResults("google.com:chrome-s kia", r.jobId)
187 if r.nextPageToken != "" {
188 queryCall.PageToken(r.nextPageToken)
189 }
190 queryResponse, err = queryCall.Do()
191 if err != nil {
192 return err
193 }
194 if queryResponse.JobComplete {
195 break
196 }
197 time.Sleep(time.Second)
198 }
199 r.nextPageToken = queryResponse.PageToken
200 r.response = queryResponse
201 return nil
202 }
203
204 // NewRowIter starts a query and returns a RowIter for iterating through the
205 // results.
206 func NewRowIter(service *bigquery.Service, query string) (*RowIter, error) {
207 job := &bigquery.Job{
208 Configuration: &bigquery.JobConfiguration{
209 Query: &bigquery.JobConfigurationQuery{
210 Query: query,
211 },
212 },
213 }
214 jobResponse, err := service.Jobs.Insert("google.com:chrome-skia", job).D o()
215 if err != nil {
216 return nil, err
217 }
218
219 r := &RowIter{
220 jobId: jobResponse.JobReference.JobId,
221 service: service,
222 row: -1, // Start at -1 so the first call to Next() puts us at the 0th Row.
223 }
224 return r, r.poll()
225 }
226
227 // Next moves to the next row in the response and returns true as long as data
228 // is availble, returning false when the end of the results are reached.
229 //
230 // Calling Next() the first time actually points the iterator at the first row,
231 // which makes it possible to use Next if a for loop:
232 //
233 // for iter.Next() { ... }
234 //
235 func (r *RowIter) Next() bool {
236 r.row++
237 if r.row >= len(r.response.Rows) {
238 if r.nextPageToken != "" {
239 r.poll()
240 r.row = 0
241 return len(r.response.Rows) > 0
242 } else {
243 return false
244 }
245 }
246 return true
247 }
248
249 // DecodeParams pulls all the values in the params record out as a map[string]st ring.
250 //
251 // The schema for each table has a nested record called 'params' that contains
252 // various axes along which queries could be built, such as the gpu the test was
253 // run against. Pull out the entire record as a generic map[string]string.
254 func (r *RowIter) DecodeParams() map[string]string {
255 row := r.response.Rows[r.row]
256 schema := r.response.Schema
257 params := map[string]string{}
258 for i, cell := range row.F {
259 if cell.V != nil {
260 name := schema.Fields[i].Name
261 if strings.HasPrefix(name, "params_") {
262 params[strings.TrimPrefix(name, "params_")] = ce ll.V.(string)
263 }
264 }
265 }
266 return params
267 }
268
269 // Decode uses struct tags to decode a single row into a struct.
270 //
271 // For example, given a struct:
272 //
273 // type A struct {
274 // Name string `bq:"name"`
275 // Value float64 `bq:"measurement"`
276 // }
277 //
278 // And a BigQuery table that contained two columns named "name" and
279 // "measurement". Then calling Decode as follows would parse the column values
280 // for "name" and "measurement" and place them in the Name and Value fields
281 // respectively.
282 //
283 // a = &A{}
284 // iter.Decode(a)
285 //
286 // Implementation Details:
287 //
288 // If a tag names a column that doesn't exist, the field is merely ignored,
289 // i.e. it is left unchanged from when it was passed into Decode.
290 //
291 // Not all columns need to be tagged in the struct.
292 //
293 // The decoder doesn't handle nested structs, only the top level fields are de coded.
294 //
295 // The decoder only handles struct fields of type string, int, int32, int64,
296 // float, float32 and float64.
297 func (r *RowIter) Decode(s interface{}) error {
298 row := r.response.Rows[r.row]
299 schema := r.response.Schema
300 // Collapse the data in the row into a map[string]string.
301 rowMap := map[string]string{}
302 for i, cell := range row.F {
303 if cell.V != nil {
304 rowMap[schema.Fields[i].Name] = cell.V.(string)
305 }
306 }
307
308 // Then iter over the fields of 's' and set them from the row data.
309 sv := reflect.ValueOf(s).Elem()
310 st := sv.Type()
311 for i := 0; i < sv.NumField(); i++ {
312 columnName := st.Field(i).Tag.Get("bq")
313 if columnValue, ok := rowMap[columnName]; ok {
314 switch sv.Field(i).Kind() {
315 case reflect.String:
316 sv.Field(i).SetString(columnValue)
317 case reflect.Float32, reflect.Float64:
318 f, err := strconv.ParseFloat(columnValue, 64)
319 if err != nil {
320 return err
321 }
322 sv.Field(i).SetFloat(f)
323 case reflect.Int32, reflect.Int64:
324 parsedInt, err := strconv.ParseInt(columnValue, 10, 64)
325 if err != nil {
326 return err
327 }
328 sv.Field(i).SetInt(parsedInt)
329 default:
330 return fmt.Errorf("can't decode into field of ty pe: %s", sv.Field(i).Kind())
331 }
332 }
333 }
334 return nil
335 }
336
337 // populateTraces reads the measurement data from BigQuery and populates the Tra ces.
338 func populateTraces(service *bigquery.Service, all *AllData, hashToIndex map[str ing]int, numSamples int) error {
339 type Measurement struct {
340 Value float64 `bq:"value"`
341 Key string `bq:"key"`
342 Hash string `bq:"gitHash"`
343 }
344
345 // Now query the actual samples.
346 query := `
347 SELECT
348 *
349 FROM
350 (TABLE_DATE_RANGE(perf_skps_v2.skpbench,
351 DATE_ADD(CURRENT_TIMESTAMP(),
352 -2,
353 'DAY'),
354 CURRENT_TIMESTAMP()))
355 WHERE
356 params.benchName="tabl_worldjournal.skp"
357 OR
358 params.benchName="desk_amazon.skp"
359 ORDER BY
360 key DESC,
361 timestamp DESC;
362 `
363 iter, err := NewRowIter(service, query)
364 if err != nil {
365 return fmt.Errorf("Failed to query data from BigQuery: %s", err)
366 }
367 var trace *Trace = nil
368 currentKey := ""
369 for iter.Next() {
370 m := &Measurement{}
371 if err := iter.Decode(m); err != nil {
372 return fmt.Errorf("Failed to decode Measurement from Big Query: %s", err)
373 }
374 if m.Key != currentKey {
375 if trace != nil {
376 all.Traces = append(all.Traces, *trace)
377 }
378 currentKey = m.Key
379 trace = NewTrace(numSamples)
380 trace.Params = iter.DecodeParams()
381 trace.Key = m.Key
382 }
383 if index, ok := hashToIndex[m.Hash]; ok {
384 trace.Values[index] = m.Value
385 }
386 }
387 all.Traces = append(all.Traces, *trace)
388
389 return nil
390 }
391
// Data is the full set of traces for the last N days all parsed into structs.
type Data struct {
	all *AllData // The parsed traces, param set and commits served to the UI.
}
396
397 // AsJSON serializes the data as JSON.
398 func (d *Data) AsJSON(w io.Writer) error {
399 return json.NewEncoder(w).Encode(d.all)
400 }
401
402 // populateParamSet returns the set of all possible values for all the 'params'
403 // in AllData.
404 func populateParamSet(all *AllData) {
405 // First pull the data out into a map of sets.
406 type ChoiceSet map[string]bool
407 c := make(map[string]ChoiceSet)
408 for _, t := range all.Traces {
409 for k, v := range t.Params {
410 if set, ok := c[k]; !ok {
411 c[k] = make(map[string]bool)
412 c[k][v] = true
413 } else {
414 set[v] = true
415 }
416 }
417 }
418 // Now flatten the sets into []string and populate all.ParamsSet with th at.
419 for k, v := range c {
420 allOptions := []string{}
421 for option, _ := range v {
422 allOptions = append(allOptions, option)
423 }
424 all.ParamSet[k] = allOptions
425 }
426 }
427
428 // NewData loads the data the first time and then starts a go routine to preiodi cally refresh the data.
429 //
430 // TODO(jcgregorio) Actuall do the bit where we start a go routine.
431 func NewData(doOauth bool, gitRepoDir string) (*Data, error) {
432 var err error
433 var client *http.Client
434 if doOauth {
435 client, err = runFlow(config)
436 if err != nil {
437 return nil, fmt.Errorf("Failed to auth: %s", err)
438 }
439 } else {
440 client = http.DefaultClient
441 }
442 service, err := bigquery.New(client)
443 if err != nil {
444 return nil, fmt.Errorf("Failed to create a new BigQuery service object: %s", err)
445 }
446
447 // First query and get the list of hashes we are interested in and use t hat
448 // and the git log results to fill in the []Metadata. Now we know how lo ng to
449 // make each array. Also, create a map[hash]index into the array.
450 allGitHashes, err := readCommitsFromGit(gitRepoDir)
451 if err != nil {
452 return nil, fmt.Errorf("Failed to read hashes from Git log: %s", err)
453 }
454
455 hashesTested, err := gitCommitsWithTestData(service)
456 if err != nil {
457 return nil, fmt.Errorf("Failed to read hashes from BigQuery: %s" , err)
458 }
459
460 // Order the git hashes by commit log order.
461 commits := make([]Commit, 0, len(hashesTested))
462 for i := len(allGitHashes) - 1; i >= 0; i-- {
463 h := allGitHashes[i]
464 if _, ok := hashesTested[h.Hash]; ok {
465 commits = append(commits, Commit{Hash: h.Hash, CommitTim e: h.TimeStamp})
466 }
467 }
468
469 // The number of samples that appear in each trace.
470 numSamples := len(commits)
471
472 // A mapping of Git hashes to where they appear in the Commits array.
473 hashToIndex := make(map[string]int)
474 for i, commit := range commits {
475 hashToIndex[commit.Hash] = i
476 }
477
478 all := &AllData{
479 Traces: make([]Trace, 0, 0),
480 ParamSet: make(map[string]Choices),
481 Commits: commits,
482 }
483
484 if err := populateTraces(service, all, hashToIndex, numSamples); err != nil {
485 panic(err)
486 }
487
488 populateParamSet(all)
489
490 return &Data{all: all}, nil
491 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698