Chromium Code Reviews| OLD | NEW |
|---|---|
| (Empty) | |
| 1 package frontend | |
| 2 | |
| 3 import ( | |
| 4 "bytes" | |
| 5 "encoding/json" | |
| 6 "errors" | |
| 7 "io" | |
| 8 "mime/multipart" | |
| 9 "net/http" | |
| 10 "strconv" | |
| 11 "strings" | |
| 12 "sync" | |
| 13 | |
| 14 "golang.org/x/net/context" | |
| 15 | |
| 16 "github.com/luci/gae/service/datastore" | |
| 17 "github.com/luci/luci-go/server/router" | |
| 18 | |
| 19 "infra/appengine/test-results/model" | |
| 20 ) | |
| 21 | |
| 22 type statusError struct { | |
| 23 error | |
| 24 code int | |
| 25 } | |
| 26 | |
| 27 func (se *statusError) StatusCode() int { | |
| 28 return se.code | |
| 29 } | |
| 30 | |
| 31 // MarshalJSON marshals status error to JSON. | |
| 32 func (se *statusError) MarshalJSON() ([]byte, error) { | |
| 33 m := map[string]interface{}{} | |
| 34 | |
| 35 if se == nil || se.error == nil { | |
| 36 return json.Marshal(m) | |
| 37 } | |
| 38 | |
| 39 m["error"] = se.Error() | |
| 40 m["status"] = se.code | |
| 41 return json.Marshal(m) | |
| 42 } | |
| 43 | |
| 44 // UploadParams is the multipart form values in a | |
| 45 // TestFile upload request. | |
| 46 type UploadParams struct { | |
| 47 Master, DeprMaster, Builder, TestType string | |
|
estaab
2016/08/12 15:29:07
What's DeprMaster? perhaps document since it's not
nishanths
2016/08/16 05:16:29
Done.
| |
| 48 } | |
| 49 | |
| 50 type contextKey int | |
| 51 | |
| 52 const uploadContextKey = contextKey(0) | |
| 53 | |
| 54 // GetUploadParams returns the UploadParams from the context if | |
| 55 // present or nil otherwise. | |
| 56 func GetUploadParams(c context.Context) *UploadParams { | |
| 57 if v := c.Value(uploadContextKey); v != nil { | |
| 58 return v.(*UploadParams) | |
| 59 } | |
| 60 return nil | |
| 61 } | |
| 62 | |
| 63 // SetUploadParams returns a new context with the supplied | |
| 64 // UploadParams added to it. | |
| 65 func SetUploadParams(c context.Context, p *UploadParams) context.Context { | |
| 66 return context.WithValue(c, uploadContextKey, p) | |
| 67 } | |
| 68 | |
| 69 func cleanTestType(test string) string { | |
|
estaab
2016/08/12 15:29:07
isn't this also in builders.go? Would there be a w
nishanths
2016/08/16 05:16:29
Done. Removed from here. Both instances are in the
| |
| 70 patched := false | |
| 71 if strings.Contains(test, " (with patch)") { | |
| 72 patched = true | |
| 73 test = strings.Replace(test, " (with patch)", "", 1) | |
| 74 } | |
| 75 if strings.HasPrefix(test, "Instrumentation test ") { | |
| 76 test = test[len("Instrumentation test "):] | |
| 77 } | |
| 78 if i := strings.Index(test, " "); i != -1 { | |
| 79 test = test[:i] | |
| 80 } | |
| 81 if !patched { | |
| 82 return test | |
| 83 } | |
| 84 return test + " (with patch)" | |
| 85 } | |
| 86 | |
| 87 // withParsedUploadForm is middleware that verifies and adds | |
| 88 // multipart form upload data to the context. | |
| 89 // | |
| 90 // If there is an error parsing the form or required | |
| 91 // values are missing, WithParsed writes the HTTP error | |
| 92 // to the response writer and stops execution of the request. | |
| 93 func withParsedUploadForm(ctx *router.Context, next router.Handler) { | |
| 94 w, r := ctx.Writer, ctx.Request | |
| 95 const _1M = 1 << 20 | |
| 96 | |
| 97 if err := r.ParseMultipartForm(_1M); err != nil { | |
| 98 http.Error(w, err.Error(), http.StatusInternalServerError) | |
| 99 return | |
| 100 } | |
| 101 | |
| 102 u := &UploadParams{} | |
| 103 | |
| 104 if v := r.MultipartForm.Value["master"]; len(v) > 0 { | |
| 105 if m := model.MasterByName(v[0]); m != nil { | |
| 106 u.Master = m.Identifier | |
| 107 u.DeprMaster = v[0] | |
| 108 } else { | |
| 109 u.Master = v[0] | |
| 110 } | |
| 111 } | |
| 112 | |
| 113 if v := r.MultipartForm.Value["builder"]; len(v) > 0 { | |
| 114 u.Builder = v[0] | |
| 115 } else { | |
| 116 http.Error(w, "missing builder", http.StatusBadRequest) | |
| 117 return | |
| 118 } | |
| 119 | |
| 120 if v := r.MultipartForm.Value["testtype"]; len(v) > 0 { | |
| 121 u.TestType = cleanTestType(v[0]) | |
| 122 } | |
| 123 | |
| 124 if _, ok := r.MultipartForm.File["file"]; !ok { | |
| 125 http.Error(w, "missing file", http.StatusBadRequest) | |
| 126 return | |
| 127 } | |
| 128 | |
| 129 ctx.Context = SetUploadParams(ctx.Context, u) | |
| 130 next(ctx) | |
| 131 } | |
| 132 | |
| 133 // uploadHandler is the HTTP handler for upload | |
| 134 // requests. | |
| 135 func uploadHandler(ctx *router.Context) { | |
| 136 c, w, r := ctx.Context, ctx.Writer, ctx.Request | |
| 137 fileheaders := r.MultipartForm.File["file"] | |
| 138 | |
| 139 for _, fh := range fileheaders { | |
| 140 if err := doFileUpload(c, fh); err != nil { | |
| 141 code := http.StatusInternalServerError | |
| 142 if se, ok := err.(statusError); ok { | |
| 143 code = se.code | |
| 144 } | |
| 145 http.Error(w, err.Error(), code) | |
| 146 return | |
| 147 } | |
| 148 } | |
| 149 | |
| 150 io.WriteString(w, "OK") | |
| 151 } | |
| 152 | |
| 153 func doFileUpload(c context.Context, fh *multipart.FileHeader) error { | |
| 154 file, err := fh.Open() | |
| 155 if err != nil { | |
| 156 return statusError{err, http.StatusInternalServerError} | |
| 157 } | |
| 158 defer file.Close() | |
| 159 | |
| 160 switch fh.Filename { | |
| 161 case "incremental_results.json": | |
| 162 var incr model.AggregateResult | |
| 163 if err := json.NewDecoder(file).Decode(&incr); err != nil { | |
| 164 return statusError{err, http.StatusBadRequest} | |
| 165 } | |
| 166 return updateIncremental(c, &incr) | |
| 167 case "full_results.json": | |
| 168 return updateFullResults(c, file) | |
| 169 default: | |
| 170 return uploadTestFile(c, file, fh.Filename) | |
| 171 } | |
| 172 } | |
| 173 | |
| 174 // uploadTestFile creates a new TestFile from the values in context | |
| 175 // and supplied data, and puts it to the datastore. | |
| 176 func uploadTestFile(c context.Context, data io.Reader, filename string) error { | |
| 177 var buf bytes.Buffer | |
| 178 tee := io.TeeReader(data, &buf) | |
| 179 | |
| 180 aux := struct { | |
| 181 N string `json:"build_number,omitempty"` | |
| 182 }{} | |
| 183 | |
| 184 dec := json.NewDecoder(tee) | |
| 185 if err := dec.Decode(&aux); err != nil { | |
| 186 return statusError{err, http.StatusInternalServerError} | |
| 187 } | |
| 188 | |
| 189 bn := 0 | |
| 190 | |
| 191 if aux.N != "" { | |
| 192 n, err := strconv.Atoi(aux.N) | |
| 193 if err != nil { | |
| 194 return statusError{errors.New("invalid build_number"), h ttp.StatusBadRequest} | |
| 195 } | |
| 196 bn = n | |
| 197 } | |
| 198 | |
| 199 p := GetUploadParams(c) | |
| 200 tf := model.TestFile{ | |
| 201 Master: p.Master, | |
| 202 Builder: p.Builder, | |
| 203 TestType: p.TestType, | |
| 204 BuildNumber: model.BuildNum(bn), | |
| 205 Name: filename, | |
| 206 Data: io.MultiReader(&buf, dec.Buffered()), | |
| 207 } | |
| 208 if err := tf.PutData(c); err != nil { | |
| 209 return statusError{err, http.StatusInternalServerError} | |
| 210 } | |
| 211 return nil | |
| 212 } | |
| 213 | |
| 214 // updateFullResults puts the supplied data as "full_results.json" | |
| 215 // to the datastore, and updates corresponding "results.json" and | |
| 216 // "results-small.json" files in the datastore. | |
| 217 // | |
| 218 // The supplied data should unmarshal into model.FullResults. | |
| 219 // Otherwise, an error is returned. | |
| 220 func updateFullResults(c context.Context, data io.Reader) error { | |
| 221 buf := &bytes.Buffer{} | |
| 222 tee := io.TeeReader(data, buf) | |
| 223 dec := json.NewDecoder(tee) | |
| 224 | |
| 225 var f model.FullResult | |
| 226 if err := dec.Decode(&f); err != nil { | |
| 227 return statusError{err, http.StatusBadRequest} | |
| 228 } | |
| 229 | |
| 230 var wg sync.WaitGroup | |
| 231 errCh := make(chan error, 2) | |
| 232 | |
| 233 wg.Add(1) | |
| 234 go func() { | |
| 235 defer wg.Done() | |
| 236 errCh <- uploadTestFile( | |
| 237 c, io.MultiReader(buf, dec.Buffered()), "full_results.js on", | |
| 238 ) | |
| 239 }() | |
| 240 | |
| 241 wg.Add(1) | |
| 242 go func() { | |
| 243 defer wg.Done() | |
| 244 incr, err := f.AggregateResult() | |
| 245 if err != nil { | |
| 246 errCh <- statusError{err, http.StatusBadRequest} | |
| 247 return | |
| 248 } | |
| 249 errCh <- updateIncremental(c, &incr) | |
| 250 }() | |
| 251 | |
| 252 wg.Wait() | |
| 253 close(errCh) | |
| 254 for err := range errCh { | |
| 255 if err != nil { | |
| 256 return err | |
| 257 } | |
| 258 } | |
| 259 return nil | |
| 260 } | |
| 261 | |
| 262 // updateIncremental gets "results.json" and "results-small.json" | |
| 263 // for values in context, merges incr into them, and puts the updated | |
| 264 // files to the datastore. | |
| 265 func updateIncremental(c context.Context, incr *model.AggregateResult) error { | |
| 266 u := GetUploadParams(c) | |
| 267 p := model.TestFileParams{ | |
| 268 Master: u.Master, | |
| 269 Builder: u.Builder, | |
| 270 TestType: u.TestType, | |
| 271 } | |
| 272 | |
| 273 names := []string{"results.json", "results-small.json"} | |
| 274 files := make([]struct { | |
| 275 tf *model.TestFile | |
| 276 aggr *model.AggregateResult | |
| 277 err error | |
| 278 }, len(names)) | |
| 279 | |
| 280 var wg sync.WaitGroup | |
| 281 | |
| 282 for i, name := range names { | |
| 283 i, name := i, name | |
| 284 wg.Add(1) | |
| 285 go func() { | |
| 286 defer wg.Done() | |
| 287 tf, err := getTestFileAlt(c, p, u.DeprMaster) | |
| 288 if err != nil { | |
| 289 if _, ok := err.(ErrNoMatches); ok { | |
| 290 files[i].tf = &model.TestFile{ | |
| 291 Master: p.Master, | |
| 292 Builder: p.Builder, | |
| 293 TestType: p.TestType, | |
| 294 BuildNumber: -1, | |
| 295 Name: name, | |
| 296 } | |
| 297 } else { | |
| 298 files[i].err = err | |
| 299 } | |
| 300 return | |
| 301 } | |
| 302 files[i].tf = tf | |
| 303 if err := tf.GetData(c); err != nil { | |
| 304 files[i].err = err | |
| 305 return | |
| 306 } | |
| 307 if err := json.NewDecoder(tf.Data).Decode(files[i].aggr) ; err != nil { | |
| 308 files[i].err = err | |
| 309 return | |
| 310 } | |
| 311 }() | |
| 312 } | |
| 313 | |
| 314 wg.Wait() | |
| 315 for _, file := range files { | |
| 316 if file.err != nil { | |
| 317 return file.err | |
| 318 } | |
| 319 } | |
| 320 | |
| 321 errs := make([]error, len(files)) | |
| 322 | |
| 323 return datastore.Get(c).RunInTransaction(func(c context.Context) error { | |
| 324 for i, file := range files { | |
| 325 i, file := i, file | |
| 326 wg.Add(1) | |
|
estaab
2016/08/12 15:29:07
It seems a little messy to reuse the WaitGroup fro
nishanths
2016/08/16 05:16:29
Done.
| |
| 327 go func() { | |
| 328 defer wg.Done() | |
| 329 errs[i] = updateAggregate(c, file.tf, file.aggr, incr) | |
| 330 }() | |
| 331 } | |
| 332 | |
| 333 wg.Wait() | |
| 334 // Prioritize returning http.StatusInternalServerError status | |
| 335 // code errors over other errors. | |
| 336 var e error | |
| 337 for _, err := range errs { | |
| 338 se, ok := err.(statusError) | |
| 339 if ok && se.code == http.StatusInternalServerError { | |
| 340 return se | |
| 341 } | |
| 342 e = err | |
| 343 } | |
| 344 return e | |
| 345 }, &datastore.TransactionOptions{XG: true}) | |
| 346 } | |
| 347 | |
| 348 // getTestFileAlt returns the the first TestFile in the datastore for | |
| 349 // the query formed by calling p.Query(). | |
| 350 // | |
| 351 // The function tries to find the first TestFile using p. If no such TestFile | |
| 352 // exists the function sets p.Master to altMaster and tries again. | |
| 353 // If altMaster is empty, the function does not perform the additional try. | |
| 354 func getTestFileAlt(c context.Context, p model.TestFileParams, altMaster string) (ret *model.TestFile, err error) { | |
| 355 a, err := getFirstTestFile(c, p.Query()) | |
| 356 if err == nil { | |
| 357 return a, nil | |
| 358 } | |
| 359 if _, ok := err.(ErrNoMatches); ok && altMaster == "" { | |
| 360 return nil, err | |
| 361 } | |
| 362 | |
| 363 origMaster := p.Master | |
| 364 p.Master = altMaster | |
| 365 | |
| 366 a, err = getFirstTestFile(c, p.Query()) | |
| 367 if err == nil { | |
| 368 a.Master = origMaster | |
| 369 return a, nil | |
| 370 } | |
| 371 | |
| 372 return nil, err | |
| 373 } | |
| 374 | |
| 375 // updateAggregate updates tf with the result of merging incr into | |
| 376 // aggr. | |
| 377 func updateAggregate(c context.Context, tf *model.TestFile, aggr, incr *model.Ag gregateResult) error { | |
| 378 if !model.IsAggregateTestFile(tf.Name) { | |
| 379 return errors.New("frontend: tf should be an aggregate test file ") | |
| 380 } | |
| 381 | |
| 382 size := model.ResultsSize | |
| 383 if tf.Name == "results-small.json" { | |
| 384 size = model.ResultsSmallSize | |
| 385 } | |
| 386 | |
| 387 if aggr == nil { | |
| 388 aggr = incr | |
| 389 } else { | |
| 390 if err := aggr.Merge(incr); err != nil { | |
| 391 switch err { | |
| 392 case model.ErrBuilderNameConflict: | |
| 393 return statusError{err, http.StatusBadRequest} | |
| 394 case model.ErrBuildNumberConflict: | |
| 395 return statusError{err, http.StatusConflict} | |
| 396 default: | |
| 397 return statusError{err, http.StatusInternalServe rError} | |
| 398 } | |
| 399 } | |
| 400 } | |
| 401 | |
| 402 if err := aggr.Trim(size); err != nil { | |
| 403 return statusError{err, http.StatusInternalServerError} | |
| 404 } | |
| 405 | |
| 406 b := &bytes.Buffer{} | |
| 407 if err := json.NewEncoder(b).Encode(&aggr); err != nil { | |
| 408 return statusError{err, http.StatusInternalServerError} | |
| 409 } | |
| 410 | |
| 411 tf.Data = b | |
| 412 if err := tf.PutData(c); err != nil { | |
| 413 return statusError{err, http.StatusInternalServerError} | |
| 414 } | |
| 415 | |
| 416 return nil | |
| 417 } | |
| OLD | NEW |