// Copyright (c) 2014 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be found
// in the LICENSE file.

package main

import (
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"os/exec"
	"reflect"
	"strconv"
	"strings"
	"time"
)

import (
	"code.google.com/p/goauth2/oauth"
	"code.google.com/p/google-api-go-client/bigquery/v2"
	"github.com/oxtoacart/webbrowser"
)

const (
	// JSON doesn't support NaN or +/- Inf, so we need a valid float
	// to signal missing data that also has a compact JSON representation.
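	//
	// For example, a trace with a missing middle sample serializes as
	// [0.123,1e+100,0.456]; clients must treat the sentinel as "no data",
	// not as a real measurement.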
	MISSING_DATA_SENTINEL = 1e100
)

// Shouldn't need auth when running from GCE, but will need it for local dev.
// TODO(jcgregorio) Move to reading this from client_secrets.json and void these keys at that point.
var config = &oauth.Config{
	ClientId:     "470362608618-nlbqngfl87f4b3mhqqe9ojgaoe11vrld.apps.googleusercontent.com",
	ClientSecret: "J4YCkfMXFJISGyuBuVEiH60T",
	Scope:        bigquery.BigqueryScope,
	AuthURL:      "https://accounts.google.com/o/oauth2/auth",
	TokenURL:     "https://accounts.google.com/o/oauth2/token",
	RedirectURL:  "urn:ietf:wg:oauth:2.0:oob",
	TokenCache:   oauth.CacheFile("bqtoken.data"),
}

// runFlow runs through a three-legged (3LO) OAuth 2.0 flow to get credentials for BigQuery.
func runFlow(config *oauth.Config) (*http.Client, error) {
	transport := &oauth.Transport{Config: config}
	if _, err := config.TokenCache.Token(); err != nil {
		url := config.AuthCodeURL("")
		fmt.Printf(`Your browser has been opened to visit:

  %s

Enter the verification code:`, url)
		webbrowser.Open(url)
		var code string
		fmt.Scan(&code)
		if _, err := transport.Exchange(code); err != nil {
			return nil, err
		}
	}

	return transport.Client(), nil
}

// Trace represents all the values of a single measurement over time.
type Trace struct {
	Key    string            `json:"key"`
	Values []float64         `json:"values"`
	Params map[string]string `json:"params"`
	Trybot bool              `json:"trybot"`
}

// NewTrace allocates a new Trace set up for the given number of samples.
//
// The Trace Values are pre-filled with the missing data sentinel since not
// all tests will be run on all commits.
func NewTrace(numSamples int) *Trace {
	t := &Trace{
		Values: make([]float64, numSamples),
		Params: make(map[string]string),
		Trybot: false,
	}
	for i := range t.Values {
		t.Values[i] = MISSING_DATA_SENTINEL
	}
	return t
}

// Annotation is a note attached to a commit.
//
// Will map to the table of annotation notes in MySQL. See DESIGN.md
// for the MySQL schema.
type Annotation struct {
	ID     int    `json:"id"`
	Notes  string `json:"notes"`
	Author string `json:"author"`
	Type   int    `json:"type"`
}

// Commit is information about each Git commit.
type Commit struct {
	CommitTime    time.Time    `json:"commit_time"`
	Hash          string       `json:"hash"`
	GitNumber     int          `json:"git_number"`
	CommitMessage string       `json:"commit_msg"`
	Annotations   []Annotation `json:"annotations,omitempty"`
}

// Choices is a list of possible values for a param. See AllData.
type Choices []string

// AllData is the top level struct we return via JSON to the UI.
//
// The length of the Commits array matches the length of the Values array in
// every Trace.
type AllData struct {
	Traces   []Trace            `json:"traces"`
	ParamSet map[string]Choices `json:"param_set"`
	Commits  []Commit           `json:"commits"`
}
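
// For illustration, AllData serializes to JSON shaped roughly like the
// following (field values invented):
//
//	{
//	  "traces": [{"key": "...", "values": [1e+100, 0.5], "params": {"gpu": "GTX660"}, "trybot": false}],
//	  "param_set": {"gpu": ["GTX660", "GTX550"]},
//	  "commits": [{"commit_time": "2014-04-01T00:00:00Z", "hash": "...", "git_number": 0, "commit_msg": ""}]
//	}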

// gitCommitsWithTestData returns the list of commits that have perf data
// associated with them.
//
// Not all commits will have perf data; the builders don't necessarily run for
// each commit.
func gitCommitsWithTestData(service *bigquery.Service) (map[string]bool, error) {
	query := `
SELECT
	gitHash
FROM
	(TABLE_DATE_RANGE(perf_skps_v2.skpbench,
		DATE_ADD(CURRENT_TIMESTAMP(),
			-2,
			'DAY'),
		CURRENT_TIMESTAMP()))
GROUP BY
	gitHash;
`
	iter, err := NewRowIter(service, query)
	if err != nil {
		return nil, fmt.Errorf("Failed to query for the Git hashes used: %s", err)
	}

	hashes := make(map[string]bool)
	for iter.Next() {
		h := &struct {
			Hash string `bq:"gitHash"`
		}{}
		err := iter.Decode(h)
		if err != nil {
			return nil, fmt.Errorf("Failed reading hashes from BigQuery: %s", err)
		}
		hashes[h.Hash] = true
	}
	return hashes, nil
}

// GitHash represents information on a single Git commit.
type GitHash struct {
	Hash      string
	TimeStamp time.Time
}

// readCommitsFromGit reads the commit history from a Git repository.
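//
// Each output line of `git log --format=%H%x20%ci` is a full commit hash, a
// space, then the committer date, e.g. (hash invented for illustration):
//
//	8f5e2a1c... 2014-04-30 12:34:56 -0700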
func readCommitsFromGit(dir string) ([]GitHash, error) {
	cmd := exec.Command("git", strings.Split("log --format=%H%x20%ci", " ")...)
	cmd.Dir = dir
	b, err := cmd.Output()
	if err != nil {
		return nil, fmt.Errorf("Failed to run Git: %s", err)
	}
	lines := strings.Split(string(b), "\n")
	hashes := make([]GitHash, 0, len(lines))
	for _, line := range lines {
		parts := strings.SplitN(line, " ", 2)
		if len(parts) == 2 {
			t, err := time.Parse("2006-01-02 15:04:05 -0700", parts[1])
			if err != nil {
				return nil, fmt.Errorf("Failed parsing Git log timestamp: %s", err)
			}
			hashes = append(hashes, GitHash{Hash: parts[0], TimeStamp: t})
		}
	}
	return hashes, nil
}

// RowIter is a utility for reading data from a BigQuery query response.
//
// RowIter will iterate over all the results, even if they span more than one
// page of results. It automatically uses page tokens to iterate over all the
// pages to retrieve all results.
type RowIter struct {
	response      *bigquery.GetQueryResultsResponse
	jobId         string
	service       *bigquery.Service
	nextPageToken string
	row           int
}

// poll waits until the query job is complete, then stores the next page of
// results and its page token on the RowIter.
func (r *RowIter) poll() error {
	var queryResponse *bigquery.GetQueryResultsResponse
	for {
		var err error
		queryCall := r.service.Jobs.GetQueryResults("google.com:chrome-skia", r.jobId)
		if r.nextPageToken != "" {
			queryCall.PageToken(r.nextPageToken)
		}
		queryResponse, err = queryCall.Do()
		if err != nil {
			return err
		}
		if queryResponse.JobComplete {
			break
		}
		time.Sleep(time.Second)
	}
	r.nextPageToken = queryResponse.PageToken
	r.response = queryResponse
	return nil
}

// NewRowIter starts a query and returns a RowIter for iterating through the
// results.
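//
// A minimal usage sketch (the query is illustrative only):
//
//	iter, err := NewRowIter(service, "SELECT gitHash FROM perf_skps_v2.skpbench")
//	if err != nil {
//		return err
//	}
//	for iter.Next() {
//		// Use iter.Decode or iter.DecodeParams on the current row.
//	}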
func NewRowIter(service *bigquery.Service, query string) (*RowIter, error) {
	job := &bigquery.Job{
		Configuration: &bigquery.JobConfiguration{
			Query: &bigquery.JobConfigurationQuery{
				Query: query,
			},
		},
	}
	jobResponse, err := service.Jobs.Insert("google.com:chrome-skia", job).Do()
	if err != nil {
		return nil, err
	}

	r := &RowIter{
		jobId:   jobResponse.JobReference.JobId,
		service: service,
		row:     -1, // Start at -1 so the first call to Next() puts us at the 0th Row.
	}
	return r, r.poll()
}

// Next moves to the next row in the response and returns true as long as data
// is available, returning false when the end of the results is reached.
//
// Calling Next() the first time actually points the iterator at the first row,
// which makes it possible to use Next in a for loop:
//
//	for iter.Next() { ... }
func (r *RowIter) Next() bool {
	r.row++
	if r.row >= len(r.response.Rows) {
		if r.nextPageToken == "" {
			return false
		}
		// Fetch the next page; if polling fails, stop iterating rather than
		// re-reading the stale response.
		if err := r.poll(); err != nil {
			return false
		}
		r.row = 0
		return len(r.response.Rows) > 0
	}
	return true
}

// DecodeParams pulls all the values in the params record out as a map[string]string.
//
// The schema for each table has a nested record called 'params' that contains
// various axes along which queries could be built, such as the gpu the test was
// run against. Pull out the entire record as a generic map[string]string.
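//
// For example, a column named "params_gpu" holding "GTX660" (values invented
// for illustration) becomes the map entry "gpu": "GTX660".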
func (r *RowIter) DecodeParams() map[string]string {
	row := r.response.Rows[r.row]
	schema := r.response.Schema
	params := map[string]string{}
	for i, cell := range row.F {
		if cell.V != nil {
			name := schema.Fields[i].Name
			if strings.HasPrefix(name, "params_") {
				params[strings.TrimPrefix(name, "params_")] = cell.V.(string)
			}
		}
	}
	return params
}

// Decode uses struct tags to decode a single row into a struct.
//
// For example, given a struct:
//
//	type A struct {
//		Name  string  `bq:"name"`
//		Value float64 `bq:"measurement"`
//	}
//
// and a BigQuery table that contains two columns named "name" and
// "measurement", calling Decode as follows would parse the column values for
// "name" and "measurement" and place them in the Name and Value fields
// respectively.
//
//	a := &A{}
//	iter.Decode(a)
//
// Implementation details:
//
// If a tag names a column that doesn't exist, the field is merely ignored,
// i.e. it is left unchanged from when it was passed into Decode.
//
// Not all columns need to be tagged in the struct.
//
// The decoder doesn't handle nested structs; only the top level fields are decoded.
//
// The decoder only handles struct fields of type string, int, int32, int64,
// float32 and float64.
func (r *RowIter) Decode(s interface{}) error {
	row := r.response.Rows[r.row]
	schema := r.response.Schema
	// Collapse the data in the row into a map[string]string.
	rowMap := map[string]string{}
	for i, cell := range row.F {
		if cell.V != nil {
			rowMap[schema.Fields[i].Name] = cell.V.(string)
		}
	}

	// Then iterate over the fields of 's' and set them from the row data.
	sv := reflect.ValueOf(s).Elem()
	st := sv.Type()
	for i := 0; i < sv.NumField(); i++ {
		columnName := st.Field(i).Tag.Get("bq")
		if columnValue, ok := rowMap[columnName]; ok {
			switch sv.Field(i).Kind() {
			case reflect.String:
				sv.Field(i).SetString(columnValue)
			case reflect.Float32, reflect.Float64:
				f, err := strconv.ParseFloat(columnValue, 64)
				if err != nil {
					return err
				}
				sv.Field(i).SetFloat(f)
			case reflect.Int, reflect.Int32, reflect.Int64:
				parsedInt, err := strconv.ParseInt(columnValue, 10, 64)
				if err != nil {
					return err
				}
				sv.Field(i).SetInt(parsedInt)
			default:
				return fmt.Errorf("can't decode into field of type: %s", sv.Field(i).Kind())
			}
		}
	}
	return nil
}

// populateTraces reads the measurement data from BigQuery and populates the Traces.
func populateTraces(service *bigquery.Service, all *AllData, hashToIndex map[string]int, numSamples int) error {
	type Measurement struct {
		Value float64 `bq:"value"`
		Key   string  `bq:"key"`
		Hash  string  `bq:"gitHash"`
	}

	// Now query the actual samples.
	query := `
SELECT
	*
FROM
	(TABLE_DATE_RANGE(perf_skps_v2.skpbench,
		DATE_ADD(CURRENT_TIMESTAMP(),
			-2,
			'DAY'),
		CURRENT_TIMESTAMP()))
WHERE
	params.benchName="tabl_worldjournal.skp"
	OR
	params.benchName="desk_amazon.skp"
ORDER BY
	key DESC,
	timestamp DESC;
`
	iter, err := NewRowIter(service, query)
	if err != nil {
		return fmt.Errorf("Failed to query data from BigQuery: %s", err)
	}

	// The rows are ordered by key, so a change in key means a new trace has
	// started.
	var trace *Trace
	currentKey := ""
	for iter.Next() {
		m := &Measurement{}
		if err := iter.Decode(m); err != nil {
			return fmt.Errorf("Failed to decode Measurement from BigQuery: %s", err)
		}
		if m.Key != currentKey {
			if trace != nil {
				all.Traces = append(all.Traces, *trace)
			}
			currentKey = m.Key
			trace = NewTrace(numSamples)
			trace.Params = iter.DecodeParams()
			trace.Key = m.Key
		}
		if index, ok := hashToIndex[m.Hash]; ok {
			trace.Values[index] = m.Value
		}
	}
	// Append the final trace, guarding against an empty result set.
	if trace != nil {
		all.Traces = append(all.Traces, *trace)
	}

	return nil
}

// Data is the full set of traces for the last N days all parsed into structs.
type Data struct {
	all *AllData
}

// AsJSON serializes the data as JSON.
func (d *Data) AsJSON(w io.Writer) error {
	// TODO(jcgregorio) Keep a cache of the gzipped JSON around and serve that as long as it's fresh.
	return json.NewEncoder(w).Encode(d.all)
}
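
// A hypothetical HTTP handler built on AsJSON; the handler name and the
// package-level data variable are assumptions for illustration, not part of
// this file:
//
//	func allDataHandler(w http.ResponseWriter, r *http.Request) {
//		w.Header().Set("Content-Type", "application/json")
//		if err := data.AsJSON(w); err != nil {
//			http.Error(w, err.Error(), http.StatusInternalServerError)
//		}
//	}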

// populateParamSet populates all.ParamSet with the set of all possible values
// for all the 'params' in AllData.
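//
// For example, two traces with params {"gpu": "GTX660"} and {"gpu": "GTX550"}
// (values invented for illustration) yield ParamSet {"gpu": ["GTX660",
// "GTX550"]}; the order within each Choices slice is unspecified.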
func populateParamSet(all *AllData) {
	// First pull the data out into a map of sets.
	type ChoiceSet map[string]bool
	c := make(map[string]ChoiceSet)
	for _, t := range all.Traces {
		for k, v := range t.Params {
			if set, ok := c[k]; !ok {
				c[k] = make(map[string]bool)
				c[k][v] = true
			} else {
				set[v] = true
			}
		}
	}
	// Now flatten the sets into []string and populate all.ParamSet with that.
	for k, v := range c {
		allOptions := []string{}
		for option := range v {
			allOptions = append(allOptions, option)
		}
		all.ParamSet[k] = allOptions
	}
}

// NewData loads the data the first time and then starts a goroutine to
// periodically refresh the data.
//
// TODO(jcgregorio) Actually do the bit where we start a goroutine.
func NewData(doOauth bool, gitRepoDir string) (*Data, error) {
	var err error
	var client *http.Client
	if doOauth {
		client, err = runFlow(config)
		if err != nil {
			return nil, fmt.Errorf("Failed to auth: %s", err)
		}
	} else {
		client = http.DefaultClient
	}
	service, err := bigquery.New(client)
	if err != nil {
		return nil, fmt.Errorf("Failed to create a new BigQuery service object: %s", err)
	}

	// First query and get the list of hashes we are interested in and use that
	// and the git log results to fill in the Commits.
	allGitHashes, err := readCommitsFromGit(gitRepoDir)
	if err != nil {
		return nil, fmt.Errorf("Failed to read hashes from Git log: %s", err)
	}

	hashesTested, err := gitCommitsWithTestData(service)
	if err != nil {
		return nil, fmt.Errorf("Failed to read hashes from BigQuery: %s", err)
	}

	// Order the git hashes by commit log order.
	commits := make([]Commit, 0, len(hashesTested))
	for i := len(allGitHashes) - 1; i >= 0; i-- {
		h := allGitHashes[i]
		if _, ok := hashesTested[h.Hash]; ok {
			commits = append(commits, Commit{Hash: h.Hash, CommitTime: h.TimeStamp})
		}
	}

	// The number of samples that appear in each trace.
	numSamples := len(commits)

	// A mapping of Git hashes to where they appear in the Commits array, also
	// the index at which a measurement gets stored in the Values array.
	hashToIndex := make(map[string]int)
	for i, commit := range commits {
		hashToIndex[commit.Hash] = i
	}

	all := &AllData{
		Traces:   make([]Trace, 0),
		ParamSet: make(map[string]Choices),
		Commits:  commits,
	}

	if err := populateTraces(service, all, hashToIndex, numSamples); err != nil {
		// Fail fast; monit will restart us if we fail for some reason.
		panic(err)
	}

	populateParamSet(all)

	return &Data{all: all}, nil
}