Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(396)

Side by Side Diff: go/src/infra/appengine/test-results/frontend/upload.go

Issue 2240473004: test-results: package frontend: add upload handler (Closed) Base URL: https://chromium.googlesource.com/infra/infra.git@master
Patch Set: Address estaab@ comments, set dependent CL Created 4 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 package frontend
2
3 import (
4 "bytes"
5 "encoding/json"
6 "errors"
7 "io"
8 "mime/multipart"
9 "net/http"
10 "strconv"
11 "sync"
12
13 "golang.org/x/net/context"
14
15 "github.com/luci/gae/service/datastore"
16 "github.com/luci/luci-go/server/router"
17
18 "infra/appengine/test-results/model"
19 )
20
21 type statusError struct {
22 error
23 code int
24 }
25
26 // MarshalJSON marshals status error to JSON.
27 func (se *statusError) MarshalJSON() ([]byte, error) {
28 m := map[string]interface{}{}
29
30 if se == nil || se.error == nil {
31 return json.Marshal(m)
32 }
33
34 m["error"] = se.Error()
35 m["status"] = se.code
36 return json.Marshal(m)
37 }
38
39 // UploadParams is the multipart form values in a
40 // TestFile upload request.
41 type UploadParams struct {
42 Master string
43 // DeprecatedMaster is set when master.Name was provided
44 // in the request, instead of master.Identifer.
45 DeprecatedMaster string
46 Builder string
47 TestType string
48 }
49
50 type contextKey int
51
52 const uploadContextKey = contextKey(0)
53
54 // GetUploadParams returns the UploadParams from the context if
55 // present or nil otherwise.
56 func GetUploadParams(c context.Context) *UploadParams {
57 if v := c.Value(uploadContextKey); v != nil {
58 return v.(*UploadParams)
59 }
60 return nil
61 }
62
63 // SetUploadParams returns a new context with the supplied
64 // UploadParams added to it.
65 func SetUploadParams(c context.Context, p *UploadParams) context.Context {
66 return context.WithValue(c, uploadContextKey, p)
67 }
68
69 // withParsedUploadForm is middleware that verifies and adds
70 // multipart form upload data to the context.
71 //
72 // If there is an error parsing the form or required
73 // values are missing, WithParsed writes the HTTP error
74 // to the response writer and stops execution of the request.
75 func withParsedUploadForm(ctx *router.Context, next router.Handler) {
76 w, r := ctx.Writer, ctx.Request
77 const _1M = 1 << 20
78
79 if err := r.ParseMultipartForm(_1M); err != nil {
80 http.Error(w, err.Error(), http.StatusInternalServerError)
81 return
82 }
83
84 u := &UploadParams{}
85
86 if v := r.MultipartForm.Value["master"]; len(v) > 0 {
87 if m := model.MasterByName(v[0]); m != nil {
88 u.Master = m.Identifier
89 u.DeprecatedMaster = v[0]
90 } else {
91 u.Master = v[0]
92 }
93 }
94
95 if v := r.MultipartForm.Value["builder"]; len(v) > 0 {
96 u.Builder = v[0]
97 } else {
98 http.Error(w, "missing builder", http.StatusBadRequest)
99 return
100 }
101
102 if v := r.MultipartForm.Value["testtype"]; len(v) > 0 {
103 u.TestType = cleanTestType(v[0])
104 }
105
106 if _, ok := r.MultipartForm.File["file"]; !ok {
107 http.Error(w, "missing file", http.StatusBadRequest)
108 return
109 }
110
111 ctx.Context = SetUploadParams(ctx.Context, u)
112 next(ctx)
113 }
114
115 // uploadHandler is the HTTP handler for upload
116 // requests.
117 func uploadHandler(ctx *router.Context) {
118 c, w, r := ctx.Context, ctx.Writer, ctx.Request
119 fileheaders := r.MultipartForm.File["file"]
120
121 for _, fh := range fileheaders {
122 if err := doFileUpload(c, fh); err != nil {
123 code := http.StatusInternalServerError
124 if se, ok := err.(statusError); ok {
125 code = se.code
126 }
127 http.Error(w, err.Error(), code)
128 return
129 }
130 }
131
132 io.WriteString(w, "OK")
133 }
134
135 func doFileUpload(c context.Context, fh *multipart.FileHeader) error {
136 file, err := fh.Open()
137 if err != nil {
138 return statusError{err, http.StatusInternalServerError}
139 }
140 defer file.Close()
141
142 switch fh.Filename {
143 case "incremental_results.json":
144 var incr model.AggregateResult
145 if err := json.NewDecoder(file).Decode(&incr); err != nil {
146 return statusError{err, http.StatusBadRequest}
147 }
148 return updateIncremental(c, &incr)
149 case "full_results.json":
150 return updateFullResults(c, file)
151 default:
152 return uploadTestFile(c, file, fh.Filename)
153 }
154 }
155
156 // uploadTestFile creates a new TestFile from the values in context
157 // and supplied data, and puts it to the datastore.
158 func uploadTestFile(c context.Context, data io.Reader, filename string) error {
159 var buf bytes.Buffer
160 tee := io.TeeReader(data, &buf)
161
162 aux := struct {
163 N string `json:"build_number,omitempty"`
164 }{}
165
166 dec := json.NewDecoder(tee)
167 if err := dec.Decode(&aux); err != nil {
168 return statusError{err, http.StatusInternalServerError}
169 }
170
171 bn := 0
172
173 if aux.N != "" {
174 n, err := strconv.Atoi(aux.N)
175 if err != nil {
176 return statusError{errors.New("invalid build_number"), h ttp.StatusBadRequest}
177 }
178 bn = n
179 }
180
181 p := GetUploadParams(c)
182 tf := model.TestFile{
183 Master: p.Master,
184 Builder: p.Builder,
185 TestType: p.TestType,
186 BuildNumber: model.BuildNum(bn),
187 Name: filename,
188 Data: io.MultiReader(&buf, dec.Buffered()),
189 }
190 if err := tf.PutData(c); err != nil {
191 return statusError{err, http.StatusInternalServerError}
192 }
193 return nil
194 }
195
196 // updateFullResults puts the supplied data as "full_results.json"
197 // to the datastore, and updates corresponding "results.json" and
198 // "results-small.json" files in the datastore.
199 //
200 // The supplied data should unmarshal into model.FullResults.
201 // Otherwise, an error is returned.
202 func updateFullResults(c context.Context, data io.Reader) error {
203 buf := &bytes.Buffer{}
204 tee := io.TeeReader(data, buf)
205 dec := json.NewDecoder(tee)
206
207 var f model.FullResult
208 if err := dec.Decode(&f); err != nil {
209 return statusError{err, http.StatusBadRequest}
210 }
211
212 wg := sync.WaitGroup{}
213 errCh := make(chan error, 2)
214
215 wg.Add(1)
216 go func() {
217 defer wg.Done()
218 errCh <- uploadTestFile(
219 c, io.MultiReader(buf, dec.Buffered()), "full_results.js on",
220 )
221 }()
222
223 wg.Add(1)
224 go func() {
225 defer wg.Done()
226 incr, err := f.AggregateResult()
227 if err != nil {
228 errCh <- statusError{err, http.StatusBadRequest}
229 return
230 }
231 errCh <- updateIncremental(c, &incr)
232 }()
233
234 wg.Wait()
235 close(errCh)
236 for err := range errCh {
237 if err != nil {
238 return err
239 }
240 }
241 return nil
242 }
243
244 // updateIncremental gets "results.json" and "results-small.json"
245 // for values in context, merges incr into them, and puts the updated
246 // files to the datastore.
247 func updateIncremental(c context.Context, incr *model.AggregateResult) error {
248 u := GetUploadParams(c)
249 p := model.TestFileParams{
250 Master: u.Master,
251 Builder: u.Builder,
252 TestType: u.TestType,
253 }
254
255 names := []string{"results.json", "results-small.json"}
256 files := make([]struct {
257 tf *model.TestFile
258 aggr *model.AggregateResult
259 err error
260 }, len(names))
261
262 wg := sync.WaitGroup{}
263
264 for i, name := range names {
265 i, name := i, name
266 wg.Add(1)
267 go func() {
268 defer wg.Done()
269 tf, err := getTestFileAlt(c, p, u.DeprecatedMaster)
270 if err != nil {
271 if _, ok := err.(ErrNoMatches); ok {
272 files[i].tf = &model.TestFile{
273 Master: p.Master,
274 Builder: p.Builder,
275 TestType: p.TestType,
276 BuildNumber: -1,
277 Name: name,
278 }
279 } else {
280 files[i].err = err
281 }
282 return
283 }
284 files[i].tf = tf
285 if err := tf.GetData(c); err != nil {
286 files[i].err = err
287 return
288 }
289 if err := json.NewDecoder(tf.Data).Decode(files[i].aggr) ; err != nil {
290 files[i].err = err
291 return
292 }
293 }()
294 }
295
296 wg.Wait()
297 for _, file := range files {
298 if file.err != nil {
299 return file.err
300 }
301 }
302
303 wg = sync.WaitGroup{}
304 errs := make([]error, len(files))
305
306 return datastore.Get(c).RunInTransaction(func(c context.Context) error {
307 for i, file := range files {
308 i, file := i, file
309 wg.Add(1)
310 go func() {
311 defer wg.Done()
312 errs[i] = updateAggregate(c, file.tf, file.aggr, incr)
313 }()
314 }
315
316 wg.Wait()
317 // Prioritize returning http.StatusInternalServerError status
318 // code errors over other errors.
319 var e error
320 for _, err := range errs {
321 se, ok := err.(statusError)
322 if ok && se.code == http.StatusInternalServerError {
323 return se
324 }
325 e = err
326 }
327 return e
328 }, &datastore.TransactionOptions{XG: true})
329 }
330
331 // getTestFileAlt returns the the first TestFile in the datastore for
332 // the query formed by calling p.Query().
333 //
334 // The function tries to find the first TestFile using p. If no such TestFile
335 // exists the function sets p.Master to altMaster and tries again.
336 // If altMaster is empty, the function does not perform the additional try.
337 func getTestFileAlt(c context.Context, p model.TestFileParams, altMaster string) (ret *model.TestFile, err error) {
338 a, err := getFirstTestFile(c, p.Query())
339 if err == nil {
340 return a, nil
341 }
342 if _, ok := err.(ErrNoMatches); ok && altMaster == "" {
343 return nil, err
344 }
345
346 origMaster := p.Master
347 p.Master = altMaster
348
349 a, err = getFirstTestFile(c, p.Query())
350 if err == nil {
351 a.Master = origMaster
352 return a, nil
353 }
354
355 return nil, err
356 }
357
358 // updateAggregate updates tf with the result of merging incr into
359 // aggr.
360 func updateAggregate(c context.Context, tf *model.TestFile, aggr, incr *model.Ag gregateResult) error {
361 if !model.IsAggregateTestFile(tf.Name) {
362 return errors.New("frontend: tf should be an aggregate test file ")
363 }
364
365 size := model.ResultsSize
366 if tf.Name == "results-small.json" {
367 size = model.ResultsSmallSize
368 }
369
370 if aggr == nil {
371 aggr = incr
372 } else {
373 if err := aggr.Merge(incr); err != nil {
374 switch err {
375 case model.ErrBuilderNameConflict:
376 return statusError{err, http.StatusBadRequest}
377 case model.ErrBuildNumberConflict:
378 return statusError{err, http.StatusConflict}
379 default:
380 return statusError{err, http.StatusInternalServe rError}
381 }
382 }
383 }
384
385 if err := aggr.Trim(size); err != nil {
386 return statusError{err, http.StatusInternalServerError}
387 }
388
389 b := &bytes.Buffer{}
390 if err := json.NewEncoder(b).Encode(&aggr); err != nil {
391 return statusError{err, http.StatusInternalServerError}
392 }
393
394 tf.Data = b
395 if err := tf.PutData(c); err != nil {
396 return statusError{err, http.StatusInternalServerError}
397 }
398
399 return nil
400 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698