OLD | NEW |
(Empty) | |
| 1 package frontend |
| 2 |
| 3 import ( |
| 4 "bytes" |
| 5 "encoding/json" |
| 6 "errors" |
| 7 "io" |
| 8 "mime/multipart" |
| 9 "net/http" |
| 10 "strconv" |
| 11 "sync" |
| 12 |
| 13 "golang.org/x/net/context" |
| 14 |
| 15 "github.com/luci/gae/service/datastore" |
| 16 "github.com/luci/luci-go/server/router" |
| 17 |
| 18 "infra/appengine/test-results/model" |
| 19 ) |
| 20 |
| 21 type statusError struct { |
| 22 error |
| 23 code int |
| 24 } |
| 25 |
| 26 // MarshalJSON marshals status error to JSON. |
| 27 func (se *statusError) MarshalJSON() ([]byte, error) { |
| 28 m := map[string]interface{}{} |
| 29 |
| 30 if se == nil || se.error == nil { |
| 31 return json.Marshal(m) |
| 32 } |
| 33 |
| 34 m["error"] = se.Error() |
| 35 m["status"] = se.code |
| 36 return json.Marshal(m) |
| 37 } |
| 38 |
| 39 // UploadParams is the multipart form values in a |
| 40 // TestFile upload request. |
| 41 type UploadParams struct { |
| 42 Master string |
| 43 // DeprecatedMaster is set when master.Name was provided |
| 44 // in the request, instead of master.Identifer. |
| 45 DeprecatedMaster string |
| 46 Builder string |
| 47 TestType string |
| 48 } |
| 49 |
| 50 type contextKey int |
| 51 |
| 52 const uploadContextKey = contextKey(0) |
| 53 |
| 54 // GetUploadParams returns the UploadParams from the context if |
| 55 // present or nil otherwise. |
| 56 func GetUploadParams(c context.Context) *UploadParams { |
| 57 if v := c.Value(uploadContextKey); v != nil { |
| 58 return v.(*UploadParams) |
| 59 } |
| 60 return nil |
| 61 } |
| 62 |
| 63 // SetUploadParams returns a new context with the supplied |
| 64 // UploadParams added to it. |
| 65 func SetUploadParams(c context.Context, p *UploadParams) context.Context { |
| 66 return context.WithValue(c, uploadContextKey, p) |
| 67 } |
| 68 |
| 69 // withParsedUploadForm is middleware that verifies and adds |
| 70 // multipart form upload data to the context. |
| 71 // |
| 72 // If there is an error parsing the form or required |
| 73 // values are missing, WithParsed writes the HTTP error |
| 74 // to the response writer and stops execution of the request. |
| 75 func withParsedUploadForm(ctx *router.Context, next router.Handler) { |
| 76 w, r := ctx.Writer, ctx.Request |
| 77 const _1M = 1 << 20 |
| 78 |
| 79 if err := r.ParseMultipartForm(_1M); err != nil { |
| 80 http.Error(w, err.Error(), http.StatusInternalServerError) |
| 81 return |
| 82 } |
| 83 |
| 84 u := &UploadParams{} |
| 85 |
| 86 if v := r.MultipartForm.Value["master"]; len(v) > 0 { |
| 87 if m := model.MasterByName(v[0]); m != nil { |
| 88 u.Master = m.Identifier |
| 89 u.DeprecatedMaster = v[0] |
| 90 } else { |
| 91 u.Master = v[0] |
| 92 } |
| 93 } |
| 94 |
| 95 if v := r.MultipartForm.Value["builder"]; len(v) > 0 { |
| 96 u.Builder = v[0] |
| 97 } else { |
| 98 http.Error(w, "missing builder", http.StatusBadRequest) |
| 99 return |
| 100 } |
| 101 |
| 102 if v := r.MultipartForm.Value["testtype"]; len(v) > 0 { |
| 103 u.TestType = cleanTestType(v[0]) |
| 104 } |
| 105 |
| 106 if _, ok := r.MultipartForm.File["file"]; !ok { |
| 107 http.Error(w, "missing file", http.StatusBadRequest) |
| 108 return |
| 109 } |
| 110 |
| 111 ctx.Context = SetUploadParams(ctx.Context, u) |
| 112 next(ctx) |
| 113 } |
| 114 |
| 115 // uploadHandler is the HTTP handler for upload |
| 116 // requests. |
| 117 func uploadHandler(ctx *router.Context) { |
| 118 c, w, r := ctx.Context, ctx.Writer, ctx.Request |
| 119 fileheaders := r.MultipartForm.File["file"] |
| 120 |
| 121 for _, fh := range fileheaders { |
| 122 if err := doFileUpload(c, fh); err != nil { |
| 123 code := http.StatusInternalServerError |
| 124 if se, ok := err.(statusError); ok { |
| 125 code = se.code |
| 126 } |
| 127 http.Error(w, err.Error(), code) |
| 128 return |
| 129 } |
| 130 } |
| 131 |
| 132 io.WriteString(w, "OK") |
| 133 } |
| 134 |
| 135 func doFileUpload(c context.Context, fh *multipart.FileHeader) error { |
| 136 file, err := fh.Open() |
| 137 if err != nil { |
| 138 return statusError{err, http.StatusInternalServerError} |
| 139 } |
| 140 defer file.Close() |
| 141 |
| 142 switch fh.Filename { |
| 143 case "incremental_results.json": |
| 144 var incr model.AggregateResult |
| 145 if err := json.NewDecoder(file).Decode(&incr); err != nil { |
| 146 return statusError{err, http.StatusBadRequest} |
| 147 } |
| 148 return updateIncremental(c, &incr) |
| 149 case "full_results.json": |
| 150 return updateFullResults(c, file) |
| 151 default: |
| 152 return uploadTestFile(c, file, fh.Filename) |
| 153 } |
| 154 } |
| 155 |
| 156 // uploadTestFile creates a new TestFile from the values in context |
| 157 // and supplied data, and puts it to the datastore. |
| 158 func uploadTestFile(c context.Context, data io.Reader, filename string) error { |
| 159 var buf bytes.Buffer |
| 160 tee := io.TeeReader(data, &buf) |
| 161 |
| 162 aux := struct { |
| 163 N string `json:"build_number,omitempty"` |
| 164 }{} |
| 165 |
| 166 dec := json.NewDecoder(tee) |
| 167 if err := dec.Decode(&aux); err != nil { |
| 168 return statusError{err, http.StatusInternalServerError} |
| 169 } |
| 170 |
| 171 bn := 0 |
| 172 |
| 173 if aux.N != "" { |
| 174 n, err := strconv.Atoi(aux.N) |
| 175 if err != nil { |
| 176 return statusError{errors.New("invalid build_number"), h
ttp.StatusBadRequest} |
| 177 } |
| 178 bn = n |
| 179 } |
| 180 |
| 181 p := GetUploadParams(c) |
| 182 tf := model.TestFile{ |
| 183 Master: p.Master, |
| 184 Builder: p.Builder, |
| 185 TestType: p.TestType, |
| 186 BuildNumber: model.BuildNum(bn), |
| 187 Name: filename, |
| 188 Data: io.MultiReader(&buf, dec.Buffered()), |
| 189 } |
| 190 if err := tf.PutData(c); err != nil { |
| 191 return statusError{err, http.StatusInternalServerError} |
| 192 } |
| 193 return nil |
| 194 } |
| 195 |
| 196 // updateFullResults puts the supplied data as "full_results.json" |
| 197 // to the datastore, and updates corresponding "results.json" and |
| 198 // "results-small.json" files in the datastore. |
| 199 // |
| 200 // The supplied data should unmarshal into model.FullResults. |
| 201 // Otherwise, an error is returned. |
| 202 func updateFullResults(c context.Context, data io.Reader) error { |
| 203 buf := &bytes.Buffer{} |
| 204 tee := io.TeeReader(data, buf) |
| 205 dec := json.NewDecoder(tee) |
| 206 |
| 207 var f model.FullResult |
| 208 if err := dec.Decode(&f); err != nil { |
| 209 return statusError{err, http.StatusBadRequest} |
| 210 } |
| 211 |
| 212 wg := sync.WaitGroup{} |
| 213 errCh := make(chan error, 2) |
| 214 |
| 215 wg.Add(1) |
| 216 go func() { |
| 217 defer wg.Done() |
| 218 errCh <- uploadTestFile( |
| 219 c, io.MultiReader(buf, dec.Buffered()), "full_results.js
on", |
| 220 ) |
| 221 }() |
| 222 |
| 223 wg.Add(1) |
| 224 go func() { |
| 225 defer wg.Done() |
| 226 incr, err := f.AggregateResult() |
| 227 if err != nil { |
| 228 errCh <- statusError{err, http.StatusBadRequest} |
| 229 return |
| 230 } |
| 231 errCh <- updateIncremental(c, &incr) |
| 232 }() |
| 233 |
| 234 wg.Wait() |
| 235 close(errCh) |
| 236 for err := range errCh { |
| 237 if err != nil { |
| 238 return err |
| 239 } |
| 240 } |
| 241 return nil |
| 242 } |
| 243 |
| 244 // updateIncremental gets "results.json" and "results-small.json" |
| 245 // for values in context, merges incr into them, and puts the updated |
| 246 // files to the datastore. |
| 247 func updateIncremental(c context.Context, incr *model.AggregateResult) error { |
| 248 u := GetUploadParams(c) |
| 249 p := model.TestFileParams{ |
| 250 Master: u.Master, |
| 251 Builder: u.Builder, |
| 252 TestType: u.TestType, |
| 253 } |
| 254 |
| 255 names := []string{"results.json", "results-small.json"} |
| 256 files := make([]struct { |
| 257 tf *model.TestFile |
| 258 aggr *model.AggregateResult |
| 259 err error |
| 260 }, len(names)) |
| 261 |
| 262 wg := sync.WaitGroup{} |
| 263 |
| 264 for i, name := range names { |
| 265 i, name := i, name |
| 266 wg.Add(1) |
| 267 go func() { |
| 268 defer wg.Done() |
| 269 tf, err := getTestFileAlt(c, p, u.DeprecatedMaster) |
| 270 if err != nil { |
| 271 if _, ok := err.(ErrNoMatches); ok { |
| 272 files[i].tf = &model.TestFile{ |
| 273 Master: p.Master, |
| 274 Builder: p.Builder, |
| 275 TestType: p.TestType, |
| 276 BuildNumber: -1, |
| 277 Name: name, |
| 278 } |
| 279 } else { |
| 280 files[i].err = err |
| 281 } |
| 282 return |
| 283 } |
| 284 files[i].tf = tf |
| 285 if err := tf.GetData(c); err != nil { |
| 286 files[i].err = err |
| 287 return |
| 288 } |
| 289 if err := json.NewDecoder(tf.Data).Decode(files[i].aggr)
; err != nil { |
| 290 files[i].err = err |
| 291 return |
| 292 } |
| 293 }() |
| 294 } |
| 295 |
| 296 wg.Wait() |
| 297 for _, file := range files { |
| 298 if file.err != nil { |
| 299 return file.err |
| 300 } |
| 301 } |
| 302 |
| 303 wg = sync.WaitGroup{} |
| 304 errs := make([]error, len(files)) |
| 305 |
| 306 return datastore.Get(c).RunInTransaction(func(c context.Context) error { |
| 307 for i, file := range files { |
| 308 i, file := i, file |
| 309 wg.Add(1) |
| 310 go func() { |
| 311 defer wg.Done() |
| 312 errs[i] = updateAggregate(c, file.tf, file.aggr,
incr) |
| 313 }() |
| 314 } |
| 315 |
| 316 wg.Wait() |
| 317 // Prioritize returning http.StatusInternalServerError status |
| 318 // code errors over other errors. |
| 319 var e error |
| 320 for _, err := range errs { |
| 321 se, ok := err.(statusError) |
| 322 if ok && se.code == http.StatusInternalServerError { |
| 323 return se |
| 324 } |
| 325 e = err |
| 326 } |
| 327 return e |
| 328 }, &datastore.TransactionOptions{XG: true}) |
| 329 } |
| 330 |
| 331 // getTestFileAlt returns the the first TestFile in the datastore for |
| 332 // the query formed by calling p.Query(). |
| 333 // |
| 334 // The function tries to find the first TestFile using p. If no such TestFile |
| 335 // exists the function sets p.Master to altMaster and tries again. |
| 336 // If altMaster is empty, the function does not perform the additional try. |
| 337 func getTestFileAlt(c context.Context, p model.TestFileParams, altMaster string)
(ret *model.TestFile, err error) { |
| 338 a, err := getFirstTestFile(c, p.Query()) |
| 339 if err == nil { |
| 340 return a, nil |
| 341 } |
| 342 if _, ok := err.(ErrNoMatches); ok && altMaster == "" { |
| 343 return nil, err |
| 344 } |
| 345 |
| 346 origMaster := p.Master |
| 347 p.Master = altMaster |
| 348 |
| 349 a, err = getFirstTestFile(c, p.Query()) |
| 350 if err == nil { |
| 351 a.Master = origMaster |
| 352 return a, nil |
| 353 } |
| 354 |
| 355 return nil, err |
| 356 } |
| 357 |
| 358 // updateAggregate updates tf with the result of merging incr into |
| 359 // aggr. |
| 360 func updateAggregate(c context.Context, tf *model.TestFile, aggr, incr *model.Ag
gregateResult) error { |
| 361 if !model.IsAggregateTestFile(tf.Name) { |
| 362 return errors.New("frontend: tf should be an aggregate test file
") |
| 363 } |
| 364 |
| 365 size := model.ResultsSize |
| 366 if tf.Name == "results-small.json" { |
| 367 size = model.ResultsSmallSize |
| 368 } |
| 369 |
| 370 if aggr == nil { |
| 371 aggr = incr |
| 372 } else { |
| 373 if err := aggr.Merge(incr); err != nil { |
| 374 switch err { |
| 375 case model.ErrBuilderNameConflict: |
| 376 return statusError{err, http.StatusBadRequest} |
| 377 case model.ErrBuildNumberConflict: |
| 378 return statusError{err, http.StatusConflict} |
| 379 default: |
| 380 return statusError{err, http.StatusInternalServe
rError} |
| 381 } |
| 382 } |
| 383 } |
| 384 |
| 385 if err := aggr.Trim(size); err != nil { |
| 386 return statusError{err, http.StatusInternalServerError} |
| 387 } |
| 388 |
| 389 b := &bytes.Buffer{} |
| 390 if err := json.NewEncoder(b).Encode(&aggr); err != nil { |
| 391 return statusError{err, http.StatusInternalServerError} |
| 392 } |
| 393 |
| 394 tf.Data = b |
| 395 if err := tf.PutData(c); err != nil { |
| 396 return statusError{err, http.StatusInternalServerError} |
| 397 } |
| 398 |
| 399 return nil |
| 400 } |
OLD | NEW |