Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(558)

Side by Side Diff: go/src/infra/appengine/test-results/frontend/upload.go

Issue 2250043002: test-results: package frontend: Add delete keys task queue (Closed) Base URL: https://chromium.googlesource.com/infra/infra.git@xx_5
Patch Set: Created 4 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 package frontend 1 package frontend
2 2
3 import ( 3 import (
4 "bytes" 4 "bytes"
5 "encoding/json" 5 "encoding/json"
6 "errors" 6 "errors"
7 "io" 7 "io"
8 "mime/multipart" 8 "mime/multipart"
9 "net/http" 9 "net/http"
10 "net/url"
10 "strconv" 11 "strconv"
11 "sync" 12 "sync"
13 "time"
12 14
13 "golang.org/x/net/context" 15 "golang.org/x/net/context"
14 16
15 "github.com/luci/gae/service/datastore" 17 "github.com/luci/gae/service/datastore"
18 "github.com/luci/gae/service/taskqueue"
19 "github.com/luci/luci-go/common/logging"
16 "github.com/luci/luci-go/server/router" 20 "github.com/luci/luci-go/server/router"
17 21
22 "infra/appengine/test-results/builderstate"
18 "infra/appengine/test-results/model" 23 "infra/appengine/test-results/model"
19 ) 24 )
20 25
21 type statusError struct { 26 type statusError struct {
22 error 27 error
23 code int 28 code int
24 } 29 }
25 30
26 // MarshalJSON marshals status error to JSON. 31 // MarshalJSON marshals status error to JSON.
27 func (se *statusError) MarshalJSON() ([]byte, error) { 32 func (se *statusError) MarshalJSON() ([]byte, error) {
(...skipping 111 matching lines...) Expand 10 before | Expand all | Expand 10 after
139 } 144 }
140 defer file.Close() 145 defer file.Close()
141 146
142 switch fh.Filename { 147 switch fh.Filename {
143 case "incremental_results.json": 148 case "incremental_results.json":
144 var incr model.AggregateResult 149 var incr model.AggregateResult
145 if err := json.NewDecoder(file).Decode(&incr); err != nil { 150 if err := json.NewDecoder(file).Decode(&incr); err != nil {
146 return statusError{err, http.StatusBadRequest} 151 return statusError{err, http.StatusBadRequest}
147 } 152 }
148 return updateIncremental(c, &incr) 153 return updateIncremental(c, &incr)
154
149 case "full_results.json": 155 case "full_results.json":
150 » » return updateFullResults(c, file) 156 » » bn, data, err := extractBuildNumber(file)
157 » » if err != nil {
158 » » » if err == ErrInvalidBuildNumber {
159 » » » » return statusError{err, http.StatusBadRequest}
160 » » » }
161 » » » return statusError{err, http.StatusInternalServerError}
162 » » }
163 » » if err := updateFullResults(c, data); err != nil {
164 » » » return err
165 » » }
166
167 » » p := GetUploadParams(c)
168 » » go builderstate.Update(c, p.Master, p.Builder, p.TestType, time. Now().UTC())
Vadim Sh. 2016/08/16 18:56:13 you need to wait for these goroutines to finish so
nishanths 2016/08/16 20:35:13 Done.
169 » » go taskqueue.Get(c).Add(
170 » » » taskqueue.NewPOSTTask("/internal/monitoring/upload", url .Values{
Vadim Sh. 2016/08/16 18:56:13 does this handler exist?
nishanths 2016/08/16 20:35:13 It exists in the python implementation.
171 » » » » "master": {p.Master},
172 » » » » "builder": {p.Builder},
173 » » » » "build_number": {strconv.Itoa(bn)},
174 » » » » "test_type": {p.TestType},
175 » » » }),
176 » » » defaultQueueName,
177 » » )
178 » » return nil
179
151 default: 180 default:
152 return uploadTestFile(c, file, fh.Filename) 181 return uploadTestFile(c, file, fh.Filename)
153 } 182 }
154 } 183 }
155 184
156 // uploadTestFile creates a new TestFile from the values in context 185 // ErrInvalidBuildNumber is returned when the extractBuildNumber fails
157 // and supplied data, and puts it to the datastore. 186 // to convert the build number value to an int.
158 func uploadTestFile(c context.Context, data io.Reader, filename string) error { 187 var ErrInvalidBuildNumber = errors.New("invalid build_number: cannot convert to int")
188
189 // extractBuildNumber extracts the value of "build_number" key from
190 // the supplied JSON encoded data. The returned io.Reader will have
191 // the same contents as the supplied io.Reader.
192 //
193 // The error is ErrInvalidBuildNumber if the build number value
194 // could not be converted to an int.
195 func extractBuildNumber(data io.Reader) (int, io.Reader, error) {
159 var buf bytes.Buffer 196 var buf bytes.Buffer
160 tee := io.TeeReader(data, &buf) 197 tee := io.TeeReader(data, &buf)
161 198
162 aux := struct { 199 aux := struct {
163 N string `json:"build_number,omitempty"` 200 N string `json:"build_number,omitempty"`
164 }{} 201 }{}
165
166 dec := json.NewDecoder(tee) 202 dec := json.NewDecoder(tee)
167 if err := dec.Decode(&aux); err != nil { 203 if err := dec.Decode(&aux); err != nil {
168 » » return statusError{err, http.StatusInternalServerError} 204 » » return 0, io.MultiReader(&buf, dec.Buffered()), err
169 } 205 }
170 206
171 » bn := 0 207 » var bn int
172
173 if aux.N != "" { 208 if aux.N != "" {
174 n, err := strconv.Atoi(aux.N) 209 n, err := strconv.Atoi(aux.N)
175 if err != nil { 210 if err != nil {
176 » » » return statusError{errors.New("invalid build_number"), h ttp.StatusBadRequest} 211 » » » return 0, io.MultiReader(&buf, dec.Buffered()), ErrInval idBuildNumber
177 } 212 }
178 bn = n 213 bn = n
179 } 214 }
180 215
216 return bn, io.MultiReader(&buf, dec.Buffered()), nil
217 }
218
219 // uploadTestFile creates a new TestFile from the values in context
220 // and supplied data, and puts it to the datastore.
221 func uploadTestFile(c context.Context, data io.Reader, filename string) error {
222 bn, data, err := extractBuildNumber(data)
223 if err != nil {
224 if err == ErrInvalidBuildNumber {
225 return statusError{err, http.StatusBadRequest}
226 }
227 return statusError{err, http.StatusInternalServerError}
228 }
229
181 p := GetUploadParams(c) 230 p := GetUploadParams(c)
182 tf := model.TestFile{ 231 tf := model.TestFile{
183 Master: p.Master, 232 Master: p.Master,
184 Builder: p.Builder, 233 Builder: p.Builder,
185 TestType: p.TestType, 234 TestType: p.TestType,
186 BuildNumber: model.BuildNum(bn), 235 BuildNumber: model.BuildNum(bn),
187 Name: filename, 236 Name: filename,
188 » » Data: io.MultiReader(&buf, dec.Buffered()), 237 » » Data: data,
189 } 238 }
190 if err := tf.PutData(c); err != nil { 239 if err := tf.PutData(c); err != nil {
191 return statusError{err, http.StatusInternalServerError} 240 return statusError{err, http.StatusInternalServerError}
192 } 241 }
193 » return nil 242
243 » return datastore.Get(c).Put(&tf)
194 } 244 }
195 245
196 // updateFullResults puts the supplied data as "full_results.json" 246 // updateFullResults puts the supplied data as "full_results.json"
197 // to the datastore, and updates corresponding "results.json" and 247 // to the datastore, and updates corresponding "results.json" and
198 // "results-small.json" files in the datastore. 248 // "results-small.json" files in the datastore.
199 // 249 //
200 // The supplied data should unmarshal into model.FullResults. 250 // The supplied data should unmarshal into model.FullResults.
201 // Otherwise, an error is returned. 251 // Otherwise, an error is returned.
202 func updateFullResults(c context.Context, data io.Reader) error { 252 func updateFullResults(c context.Context, data io.Reader) error {
203 buf := &bytes.Buffer{} 253 buf := &bytes.Buffer{}
204 tee := io.TeeReader(data, buf) 254 tee := io.TeeReader(data, buf)
205 dec := json.NewDecoder(tee) 255 dec := json.NewDecoder(tee)
206 256
207 var f model.FullResult 257 var f model.FullResult
208 if err := dec.Decode(&f); err != nil { 258 if err := dec.Decode(&f); err != nil {
209 return statusError{err, http.StatusBadRequest} 259 return statusError{err, http.StatusBadRequest}
210 } 260 }
211 261
212 wg := sync.WaitGroup{} 262 wg := sync.WaitGroup{}
213 errCh := make(chan error, 2) 263 errCh := make(chan error, 2)
214 264
215 wg.Add(1) 265 wg.Add(1)
216 go func() { 266 go func() {
217 defer wg.Done() 267 defer wg.Done()
218 » » errCh <- uploadTestFile( 268 » » errCh <- uploadTestFile(c, io.MultiReader(buf, dec.Buffered()), "full_results.json")
219 » » » c, io.MultiReader(buf, dec.Buffered()), "full_results.js on",
220 » » )
221 }() 269 }()
222 270
223 wg.Add(1) 271 wg.Add(1)
224 go func() { 272 go func() {
225 defer wg.Done() 273 defer wg.Done()
226 incr, err := f.AggregateResult() 274 incr, err := f.AggregateResult()
227 if err != nil { 275 if err != nil {
228 errCh <- statusError{err, http.StatusBadRequest} 276 errCh <- statusError{err, http.StatusBadRequest}
229 return 277 return
230 } 278 }
(...skipping 24 matching lines...) Expand all
255 names := []string{"results.json", "results-small.json"} 303 names := []string{"results.json", "results-small.json"}
256 files := make([]struct { 304 files := make([]struct {
257 tf *model.TestFile 305 tf *model.TestFile
258 aggr *model.AggregateResult 306 aggr *model.AggregateResult
259 err error 307 err error
260 }, len(names)) 308 }, len(names))
261 309
262 wg := sync.WaitGroup{} 310 wg := sync.WaitGroup{}
263 311
264 for i, name := range names { 312 for i, name := range names {
265 » » i, name := i, name 313 » » i, name, p := i, name, p
266 wg.Add(1) 314 wg.Add(1)
315
267 go func() { 316 go func() {
268 defer wg.Done() 317 defer wg.Done()
318 p.Name = name
269 tf, err := getTestFileAlt(c, p, u.DeprecatedMaster) 319 tf, err := getTestFileAlt(c, p, u.DeprecatedMaster)
270 if err != nil { 320 if err != nil {
271 if _, ok := err.(ErrNoMatches); ok { 321 if _, ok := err.(ErrNoMatches); ok {
272 files[i].tf = &model.TestFile{ 322 files[i].tf = &model.TestFile{
273 Master: p.Master, 323 Master: p.Master,
274 Builder: p.Builder, 324 Builder: p.Builder,
275 TestType: p.TestType, 325 TestType: p.TestType,
276 BuildNumber: -1, 326 BuildNumber: -1,
277 Name: name, 327 Name: name,
278 } 328 }
279 » » » » } else { 329 » » » » » return
280 » » » » » files[i].err = err
281 } 330 }
331 files[i].err = err
282 return 332 return
283 } 333 }
284 files[i].tf = tf
285 if err := tf.GetData(c); err != nil { 334 if err := tf.GetData(c); err != nil {
286 files[i].err = err 335 files[i].err = err
287 return 336 return
288 } 337 }
289 » » » if err := json.NewDecoder(tf.Data).Decode(files[i].aggr) ; err != nil { 338 » » » var a model.AggregateResult
339 » » » if err := json.NewDecoder(tf.Data).Decode(&a); err != ni l {
290 files[i].err = err 340 files[i].err = err
291 return 341 return
292 } 342 }
343 files[i].tf = tf
344 files[i].aggr = &a
293 }() 345 }()
294 } 346 }
295 347
296 wg.Wait() 348 wg.Wait()
297 for _, file := range files { 349 for _, file := range files {
298 if file.err != nil { 350 if file.err != nil {
299 return file.err 351 return file.err
300 } 352 }
301 } 353 }
302 354
303 » wg = sync.WaitGroup{} 355 » return datastore.Get(c).RunInTransaction(func(c context.Context) error {
304 » errs := make([]error, len(files)) 356 » » wg = sync.WaitGroup{}
357 » » errs := make([]error, len(files))
305 358
306 return datastore.Get(c).RunInTransaction(func(c context.Context) error {
307 for i, file := range files { 359 for i, file := range files {
308 i, file := i, file 360 i, file := i, file
309 wg.Add(1) 361 wg.Add(1)
310 go func() { 362 go func() {
311 defer wg.Done() 363 defer wg.Done()
312 errs[i] = updateAggregate(c, file.tf, file.aggr, incr) 364 errs[i] = updateAggregate(c, file.tf, file.aggr, incr)
313 }() 365 }()
314 } 366 }
315 367
316 wg.Wait() 368 wg.Wait()
317 // Prioritize returning http.StatusInternalServerError status 369 // Prioritize returning http.StatusInternalServerError status
318 // code errors over other errors. 370 // code errors over other errors.
319 var e error 371 var e error
320 for _, err := range errs { 372 for _, err := range errs {
321 se, ok := err.(statusError) 373 se, ok := err.(statusError)
322 if ok && se.code == http.StatusInternalServerError { 374 if ok && se.code == http.StatusInternalServerError {
323 return se 375 return se
Vadim Sh. 2016/08/16 18:56:13 beware that RunInTransaction can return an error o
nishanths 2016/08/16 20:35:13 in that case, the error will be propagated at line
324 } 376 }
325 e = err 377 e = err
326 } 378 }
327 return e 379 return e
328 }, &datastore.TransactionOptions{XG: true}) 380 }, &datastore.TransactionOptions{XG: true})
Vadim Sh. 2016/08/16 18:56:13 how many entity groups does this transaction modif
nishanths 2016/08/16 20:35:13 Two entity groups. (1) TestFile (2) DataEntry
329 } 381 }
330 382
331 // getTestFileAlt returns the the first TestFile in the datastore for 383 // getTestFileAlt returns the the first TestFile in the datastore for
332 // the query formed by calling p.Query(). 384 // the query formed by calling p.Query().
333 // 385 //
334 // The function tries to find the first TestFile using p. If no such TestFile 386 // The function tries to find the first TestFile using p. If no such TestFile
335 // exists the function sets p.Master to altMaster and tries again. 387 // exists the function sets p.Master to altMaster and tries again.
336 // If altMaster is empty, the function does not perform the additional try. 388 // If altMaster is empty, the function does not perform the additional try.
337 func getTestFileAlt(c context.Context, p model.TestFileParams, altMaster string) (ret *model.TestFile, err error) { 389 func getTestFileAlt(c context.Context, p model.TestFileParams, altMaster string) (ret *model.TestFile, err error) {
338 a, err := getFirstTestFile(c, p.Query()) 390 a, err := getFirstTestFile(c, p.Query())
(...skipping 10 matching lines...) Expand all
349 a, err = getFirstTestFile(c, p.Query()) 401 a, err = getFirstTestFile(c, p.Query())
350 if err == nil { 402 if err == nil {
351 a.Master = origMaster 403 a.Master = origMaster
352 return a, nil 404 return a, nil
353 } 405 }
354 406
355 return nil, err 407 return nil, err
356 } 408 }
357 409
358 // updateAggregate updates tf with the result of merging incr into 410 // updateAggregate updates tf with the result of merging incr into
359 // aggr. 411 // aggr, and updates tf in datastore.
360 func updateAggregate(c context.Context, tf *model.TestFile, aggr, incr *model.Ag gregateResult) error { 412 func updateAggregate(c context.Context, tf *model.TestFile, aggr, incr *model.Ag gregateResult) error {
361 if !model.IsAggregateTestFile(tf.Name) { 413 if !model.IsAggregateTestFile(tf.Name) {
362 return errors.New("frontend: tf should be an aggregate test file ") 414 return errors.New("frontend: tf should be an aggregate test file ")
363 } 415 }
364 416
365 size := model.ResultsSize 417 size := model.ResultsSize
366 if tf.Name == "results-small.json" { 418 if tf.Name == "results-small.json" {
367 size = model.ResultsSmallSize 419 size = model.ResultsSmallSize
368 } 420 }
369 421
(...skipping 19 matching lines...) Expand all
389 b := &bytes.Buffer{} 441 b := &bytes.Buffer{}
390 if err := json.NewEncoder(b).Encode(&aggr); err != nil { 442 if err := json.NewEncoder(b).Encode(&aggr); err != nil {
391 return statusError{err, http.StatusInternalServerError} 443 return statusError{err, http.StatusInternalServerError}
392 } 444 }
393 445
394 tf.Data = b 446 tf.Data = b
395 if err := tf.PutData(c); err != nil { 447 if err := tf.PutData(c); err != nil {
396 return statusError{err, http.StatusInternalServerError} 448 return statusError{err, http.StatusInternalServerError}
397 } 449 }
398 450
451 if err := datastore.Get(c).Put(tf); err != nil {
452 return err
Vadim Sh. 2016/08/16 18:56:13 statusError{err, http.StatusInternalServerError} ?
nishanths 2016/08/16 20:35:13 Done.
453 }
454 if err := deleteKeys(c, tf.OldDataKeys); err != nil {
Vadim Sh. 2016/08/16 18:56:13 ignoring this error?
nishanths 2016/08/16 20:35:13 yes, only logging the error. Returning an HTTP e
455 logging.Fields{
456 logging.ErrorKey: err,
457 "keys": tf.OldDataKeys,
458 }.Errorf(c, "upload: failed to delete keys")
459 }
399 return nil 460 return nil
400 } 461 }
462
463 func deleteKeys(c context.Context, k []*datastore.Key) error {
464 if len(k) == 0 {
465 return nil
466 }
467
468 keys := make([]string, 0, len(k))
469 for _, key := range k {
470 keys = append(keys, key.Encode())
471 }
472
473 payload, err := json.Marshal(struct {
474 Keys []string `json:"keys"`
475 }{
476 keys,
477 })
478 if err != nil {
479 return err
480 }
481
482 h := make(http.Header)
483 h.Set("Content-Type", "application/json")
484
485 logging.Fields{
486 "keys": keys,
487 }.Infof(c, "deleteKeys: enqueing")
488
489 return taskqueue.Get(c).Add(&taskqueue.Task{
490 Path: "/internal/delete",
491 Payload: payload,
492 Header: h,
493 Method: "POST",
494 Delay: time.Duration(30) * time.Minute,
495 }, deleteKeysQueueName)
496 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698