Chromium Code Reviews

Side by Side Diff: go/src/infra/appengine/test-results/frontend/upload.go

Issue 2250043002: test-results: package frontend: Add delete keys task queue (Closed) Base URL: https://chromium.googlesource.com/infra/infra.git@xx_5
Patch Set: (Rebase) Created 4 years, 4 months ago
1 package frontend 1 package frontend
2 2
3 import ( 3 import (
4 "bytes" 4 "bytes"
5 "encoding/json" 5 "encoding/json"
6 "errors" 6 "errors"
7 "io" 7 "io"
8 "mime/multipart" 8 "mime/multipart"
9 "net/http" 9 "net/http"
10 "net/url"
10 "strconv" 11 "strconv"
11 "sync" 12 "sync"
13 "time"
12 14
13 "golang.org/x/net/context" 15 "golang.org/x/net/context"
14 16
15 "github.com/luci/gae/service/datastore" 17 "github.com/luci/gae/service/datastore"
18 "github.com/luci/gae/service/taskqueue"
19 "github.com/luci/luci-go/common/logging"
16 "github.com/luci/luci-go/server/router" 20 "github.com/luci/luci-go/server/router"
17 21
22 "infra/appengine/test-results/builderstate"
18 "infra/appengine/test-results/model" 23 "infra/appengine/test-results/model"
19 ) 24 )
20 25
21 type statusError struct { 26 type statusError struct {
22 error 27 error
23 code int 28 code int
24 } 29 }
25 30
26 // MarshalJSON marshals status error to JSON. 31 // MarshalJSON marshals status error to JSON.
27 func (se *statusError) MarshalJSON() ([]byte, error) { 32 func (se *statusError) MarshalJSON() ([]byte, error) {
(...skipping 111 matching lines...)
139 } 144 }
140 defer file.Close() 145 defer file.Close()
141 146
142 switch fh.Filename { 147 switch fh.Filename {
143 case "incremental_results.json": 148 case "incremental_results.json":
144 var incr model.AggregateResult 149 var incr model.AggregateResult
145 if err := json.NewDecoder(file).Decode(&incr); err != nil { 150 if err := json.NewDecoder(file).Decode(&incr); err != nil {
146 return statusError{err, http.StatusBadRequest} 151 return statusError{err, http.StatusBadRequest}
147 } 152 }
148 return updateIncremental(c, &incr) 153 return updateIncremental(c, &incr)
154
149 case "full_results.json": 155 case "full_results.json":
150 » » return updateFullResults(c, file) 156 » » bn, data, err := extractBuildNumber(file)
157 » » if err != nil {
158 » » » if err == ErrInvalidBuildNumber {
159 » » » » return statusError{err, http.StatusBadRequest}
160 » » » }
161 » » » return statusError{err, http.StatusInternalServerError}
162 » » }
163 » » if err := updateFullResults(c, data); err != nil {
164 » » » return err
165 » » }
166
167 » » p := GetUploadParams(c)
168 » » wg := sync.WaitGroup{}
169
170 » » wg.Add(1)
171 » » go func() {
172 » » » defer wg.Done()
173 » » » if err := builderstate.Update(c, p.Master, p.Builder, p.TestType, time.Now().UTC()); err != nil {
174 » » » » logging.WithError(err).Errorf(c, "builderstate update")
175 » » » }
176 » » }()
177
178 » » wg.Add(1)
179 » » go func() {
180 » » » defer wg.Done()
181 » » » if err := taskqueue.Get(c).Add(
182 » » » » taskqueue.NewPOSTTask("/internal/monitoring/upload", url.Values{
183 » » » » » "master": {p.Master},
184 » » » » » "builder": {p.Builder},
185 » » » » » "build_number": {strconv.Itoa(bn)},
186 » » » » » "test_type": {p.TestType},
187 » » » » }),
188 » » » » defaultQueueName,
189 » » » ); err != nil {
190 » » » » logging.WithError(err).Errorf(c, "taskqueue add: /internal/monitoring/upload")
191 » » » }
192 » » }()
193
194 » » wg.Wait()
195 » » return nil
196
151 default: 197 default:
152 return uploadTestFile(c, file, fh.Filename) 198 return uploadTestFile(c, file, fh.Filename)
153 } 199 }
154 } 200 }
155 201
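The full_results.json branch above stores the results and then runs two best-effort side effects in parallel, a builderstate update and a monitoring task-queue POST, each of which only logs its error instead of failing the upload. Below is a minimal standalone sketch of that fire-and-forget fan-out, using only the standard library; notifyBuilderState and enqueueMonitoringTask are hypothetical stand-ins for the real calls, not part of this change.

package main

import (
	"log"
	"sync"
)

// Hypothetical stand-ins for builderstate.Update and the task-queue Add call.
func notifyBuilderState() error    { return nil }
func enqueueMonitoringTask() error { return nil }

// uploadSideEffects mirrors the shape of the handler code above: both side
// effects run concurrently, their errors are logged rather than returned,
// and the caller waits for both before finishing the request.
func uploadSideEffects() {
	var wg sync.WaitGroup

	wg.Add(1)
	go func() {
		defer wg.Done()
		if err := notifyBuilderState(); err != nil {
			log.Printf("builderstate update: %v", err)
		}
	}()

	wg.Add(1)
	go func() {
		defer wg.Done()
		if err := enqueueMonitoringTask(); err != nil {
			log.Printf("taskqueue add: %v", err)
		}
	}()

	wg.Wait()
}

func main() {
	uploadSideEffects()
}

Waiting on the group before returning keeps the request handler from finishing while the notifications are still in flight, while a failure in either notification never turns into an upload error.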
156 // uploadTestFile creates a new TestFile from the values in context 202 // ErrInvalidBuildNumber is returned when extractBuildNumber fails
157 // and supplied data, and puts it to the datastore. 203 // to convert the build number value to an int.
158 func uploadTestFile(c context.Context, data io.Reader, filename string) error { 204 var ErrInvalidBuildNumber = errors.New("invalid build_number: cannot convert to int")
205
206 // extractBuildNumber extracts the value of "build_number" key from
207 // the supplied JSON encoded data. The returned io.Reader will have
208 // the same contents as the supplied io.Reader.
209 //
210 // The error is ErrInvalidBuildNumber if the build number value
211 // could not be converted to an int.
212 func extractBuildNumber(data io.Reader) (int, io.Reader, error) {
159 var buf bytes.Buffer 213 var buf bytes.Buffer
160 tee := io.TeeReader(data, &buf) 214 tee := io.TeeReader(data, &buf)
161 215
162 aux := struct { 216 aux := struct {
163 N string `json:"build_number,omitempty"` 217 N string `json:"build_number,omitempty"`
164 }{} 218 }{}
165
166 dec := json.NewDecoder(tee) 219 dec := json.NewDecoder(tee)
167 if err := dec.Decode(&aux); err != nil { 220 if err := dec.Decode(&aux); err != nil {
168 » » return statusError{err, http.StatusInternalServerError} 221 » » return 0, io.MultiReader(&buf, dec.Buffered()), err
169 } 222 }
170 223
171 » bn := 0 224 » var bn int
172
173 if aux.N != "" { 225 if aux.N != "" {
174 n, err := strconv.Atoi(aux.N) 226 n, err := strconv.Atoi(aux.N)
175 if err != nil { 227 if err != nil {
176 » » » return statusError{errors.New("invalid build_number"), http.StatusBadRequest} 228 » » » return 0, io.MultiReader(&buf, dec.Buffered()), ErrInvalidBuildNumber
177 } 229 }
178 bn = n 230 bn = n
179 } 231 }
180 232
233 return bn, io.MultiReader(&buf, dec.Buffered()), nil
234 }
235
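extractBuildNumber needs to peek at one field of the JSON without consuming the body, because the same bytes are stored afterwards. Here is a minimal sketch of the tee-and-rejoin idea, assuming only the standard library; peekBuildNumber is a hypothetical name, and it rebuilds the stream from the tee buffer plus the unread remainder of the input.

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"strings"
)

// peekBuildNumber is a hypothetical stand-in for extractBuildNumber: it
// decodes a single field from a JSON stream, then returns a reader that
// still yields the complete original bytes.
func peekBuildNumber(data io.Reader) (string, io.Reader, error) {
	var buf bytes.Buffer
	// Every byte the decoder pulls from data is also copied into buf.
	dec := json.NewDecoder(io.TeeReader(data, &buf))

	aux := struct {
		N string `json:"build_number"`
	}{}
	if err := dec.Decode(&aux); err != nil {
		return "", io.MultiReader(&buf, data), err
	}
	// buf holds everything already read (consumed plus look-ahead); the
	// remainder of data was never read, so chaining them reproduces the
	// original stream.
	return aux.N, io.MultiReader(&buf, data), nil
}

func main() {
	in := strings.NewReader(`{"build_number": "42", "tests": {}}`)
	n, rest, err := peekBuildNumber(in)
	if err != nil {
		panic(err)
	}
	b, _ := io.ReadAll(rest)
	fmt.Println(n)         // 42
	fmt.Println(string(b)) // the untouched original payload
}

The handler above rejoins the tee buffer with dec.Buffered() rather than with the raw input; the sketch uses the unread remainder of the reader instead, a closely related variant of the same idea.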
236 // uploadTestFile creates a new TestFile from the values in context
237 // and supplied data, and puts it to the datastore.
238 func uploadTestFile(c context.Context, data io.Reader, filename string) error {
239 bn, data, err := extractBuildNumber(data)
240 if err != nil {
241 if err == ErrInvalidBuildNumber {
242 return statusError{err, http.StatusBadRequest}
243 }
244 return statusError{err, http.StatusInternalServerError}
245 }
246
181 p := GetUploadParams(c) 247 p := GetUploadParams(c)
182 tf := model.TestFile{ 248 tf := model.TestFile{
183 Master: p.Master, 249 Master: p.Master,
184 Builder: p.Builder, 250 Builder: p.Builder,
185 TestType: p.TestType, 251 TestType: p.TestType,
186 BuildNumber: model.BuildNum(bn), 252 BuildNumber: model.BuildNum(bn),
187 Name: filename, 253 Name: filename,
188 » » Data: io.MultiReader(&buf, dec.Buffered()), 254 » » Data: data,
189 } 255 }
190 if err := tf.PutData(c); err != nil { 256 if err := tf.PutData(c); err != nil {
191 return statusError{err, http.StatusInternalServerError} 257 return statusError{err, http.StatusInternalServerError}
192 } 258 }
193 » return nil 259
260 » return datastore.Get(c).Put(&tf)
194 } 261 }
195 262
196 // updateFullResults puts the supplied data as "full_results.json" 263 // updateFullResults puts the supplied data as "full_results.json"
197 // to the datastore, and updates corresponding "results.json" and 264 // to the datastore, and updates corresponding "results.json" and
198 // "results-small.json" files in the datastore. 265 // "results-small.json" files in the datastore.
199 // 266 //
200 // The supplied data should unmarshal into model.FullResults. 267 // The supplied data should unmarshal into model.FullResults.
201 // Otherwise, an error is returned. 268 // Otherwise, an error is returned.
202 func updateFullResults(c context.Context, data io.Reader) error { 269 func updateFullResults(c context.Context, data io.Reader) error {
203 buf := &bytes.Buffer{} 270 buf := &bytes.Buffer{}
204 tee := io.TeeReader(data, buf) 271 tee := io.TeeReader(data, buf)
205 dec := json.NewDecoder(tee) 272 dec := json.NewDecoder(tee)
206 273
207 var f model.FullResult 274 var f model.FullResult
208 if err := dec.Decode(&f); err != nil { 275 if err := dec.Decode(&f); err != nil {
209 return statusError{err, http.StatusBadRequest} 276 return statusError{err, http.StatusBadRequest}
210 } 277 }
211 278
212 » wg := sync.WaitGroup{} 279 » if err := uploadTestFile(c, io.MultiReader(buf, dec.Buffered()), "full_results.json"); err != nil {
213 » errCh := make(chan error, 2) 280 » » return statusError{err, http.StatusInternalServerError}
281 » }
214 282
215 » wg.Add(1) 283 » incr, err := f.AggregateResult()
216 » go func() { 284 » if err != nil {
217 » » defer wg.Done() 285 » » return statusError{err, http.StatusBadRequest}
218 » » errCh <- uploadTestFile( 286 » }
219 » » » c, io.MultiReader(buf, dec.Buffered()), "full_results.json", 287 » » » if err := updateIncremental(c, &incr); err != nil {
220 » » ) 288 » » return statusError{err, http.StatusInternalServerError}
221 » }() 289 » }
222 290
223 wg.Add(1)
224 go func() {
225 defer wg.Done()
226 incr, err := f.AggregateResult()
227 if err != nil {
228 errCh <- statusError{err, http.StatusBadRequest}
229 return
230 }
231 errCh <- updateIncremental(c, &incr)
232 }()
233
234 wg.Wait()
235 close(errCh)
236 for err := range errCh {
237 if err != nil {
238 return err
239 }
240 }
241 return nil 291 return nil
242 } 292 }
243 293
244 // updateIncremental gets "results.json" and "results-small.json" 294 // updateIncremental gets "results.json" and "results-small.json"
245 // for values in context, merges incr into them, and puts the updated 295 // for values in context, merges incr into them, and puts the updated
246 // files to the datastore. 296 // files to the datastore.
247 func updateIncremental(c context.Context, incr *model.AggregateResult) error { 297 func updateIncremental(c context.Context, incr *model.AggregateResult) error {
248 u := GetUploadParams(c) 298 u := GetUploadParams(c)
249 p := model.TestFileParams{ 299 p := model.TestFileParams{
250 Master: u.Master, 300 Master: u.Master,
251 Builder: u.Builder, 301 Builder: u.Builder,
252 TestType: u.TestType, 302 TestType: u.TestType,
253 } 303 }
254 304
255 names := []string{"results.json", "results-small.json"} 305 names := []string{"results.json", "results-small.json"}
256 files := make([]struct { 306 files := make([]struct {
257 tf *model.TestFile 307 tf *model.TestFile
258 aggr *model.AggregateResult 308 aggr *model.AggregateResult
259 err error 309 err error
260 }, len(names)) 310 }, len(names))
261 311
262 wg := sync.WaitGroup{} 312 wg := sync.WaitGroup{}
263 313
264 for i, name := range names { 314 for i, name := range names {
265 » » i, name := i, name 315 » » i, name, p := i, name, p
266 wg.Add(1) 316 wg.Add(1)
317
267 go func() { 318 go func() {
268 defer wg.Done() 319 defer wg.Done()
320 p.Name = name
269 tf, err := getTestFileAlt(c, p, u.DeprecatedMaster) 321 tf, err := getTestFileAlt(c, p, u.DeprecatedMaster)
270 if err != nil { 322 if err != nil {
271 if _, ok := err.(ErrNoMatches); ok { 323 if _, ok := err.(ErrNoMatches); ok {
272 files[i].tf = &model.TestFile{ 324 files[i].tf = &model.TestFile{
273 Master: p.Master, 325 Master: p.Master,
274 Builder: p.Builder, 326 Builder: p.Builder,
275 TestType: p.TestType, 327 TestType: p.TestType,
276 BuildNumber: -1, 328 BuildNumber: -1,
277 Name: name, 329 Name: name,
278 } 330 }
279 » » » » } else { 331 » » » » » return
280 » » » » » files[i].err = err
281 } 332 }
333 files[i].err = err
282 return 334 return
283 } 335 }
284 files[i].tf = tf
285 if err := tf.GetData(c); err != nil { 336 if err := tf.GetData(c); err != nil {
286 files[i].err = err 337 files[i].err = err
287 return 338 return
288 } 339 }
289 » » » if err := json.NewDecoder(tf.Data).Decode(files[i].aggr); err != nil { 340 » » » var a model.AggregateResult
341 » » » if err := json.NewDecoder(tf.Data).Decode(&a); err != nil {
290 files[i].err = err 342 files[i].err = err
291 return 343 return
292 } 344 }
345 files[i].tf = tf
346 files[i].aggr = &a
293 }() 347 }()
294 } 348 }
295 349
296 wg.Wait() 350 wg.Wait()
297 for _, file := range files { 351 for _, file := range files {
298 if file.err != nil { 352 if file.err != nil {
299 return file.err 353 return file.err
300 } 354 }
301 } 355 }
302 356
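The loop above fans out one goroutine per aggregate file and has each goroutine write only into its own slot of a pre-sized slice, so results and errors can be collected without a mutex. A minimal sketch of that pattern, with string work standing in for the datastore fetch and JSON decode:

package main

import (
	"fmt"
	"strings"
	"sync"
)

func main() {
	names := []string{"results.json", "results-small.json"}

	// One result slot per name; each goroutine touches only its own index.
	results := make([]struct {
		upper string
		err   error
	}, len(names))

	var wg sync.WaitGroup
	for i, name := range names {
		i, name := i, name // rebind before capture (needed on Go versions before 1.22)
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Stand-in for the real per-file fetch and decode work.
			results[i].upper = strings.ToUpper(name)
		}()
	}
	wg.Wait()

	// Errors are inspected only after all goroutines have finished.
	for _, r := range results {
		if r.err != nil {
			fmt.Println("error:", r.err)
			return
		}
		fmt.Println(r.upper)
	}
}

Because each index is written by exactly one goroutine and read only after wg.Wait, the slice needs no additional synchronization.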
303 » wg = sync.WaitGroup{} 357 » return datastore.Get(c).RunInTransaction(func(c context.Context) error {
304 » errs := make([]error, len(files)) 358 » » wg = sync.WaitGroup{}
359 » » errs := make([]error, len(files))
305 360
306 return datastore.Get(c).RunInTransaction(func(c context.Context) error {
307 for i, file := range files { 361 for i, file := range files {
308 i, file := i, file 362 i, file := i, file
309 wg.Add(1) 363 wg.Add(1)
310 go func() { 364 go func() {
311 defer wg.Done() 365 defer wg.Done()
312 errs[i] = updateAggregate(c, file.tf, file.aggr, incr) 366 errs[i] = updateAggregate(c, file.tf, file.aggr, incr)
313 }() 367 }()
314 } 368 }
315 369
316 wg.Wait() 370 wg.Wait()
(...skipping 32 matching lines...)
349 a, err = getFirstTestFile(c, p.Query()) 403 a, err = getFirstTestFile(c, p.Query())
350 if err == nil { 404 if err == nil {
351 a.Master = origMaster 405 a.Master = origMaster
352 return a, nil 406 return a, nil
353 } 407 }
354 408
355 return nil, err 409 return nil, err
356 } 410 }
357 411
358 // updateAggregate updates tf with the result of merging incr into 412 // updateAggregate updates tf with the result of merging incr into
359 // aggr. 413 // aggr, and updates tf in datastore.
360 func updateAggregate(c context.Context, tf *model.TestFile, aggr, incr *model.AggregateResult) error { 414 func updateAggregate(c context.Context, tf *model.TestFile, aggr, incr *model.AggregateResult) error {
361 if !model.IsAggregateTestFile(tf.Name) { 415 if !model.IsAggregateTestFile(tf.Name) {
362 return errors.New("frontend: tf should be an aggregate test file") 416 return errors.New("frontend: tf should be an aggregate test file")
363 } 417 }
364 418
365 size := model.ResultsSize 419 size := model.ResultsSize
366 if tf.Name == "results-small.json" { 420 if tf.Name == "results-small.json" {
367 size = model.ResultsSmallSize 421 size = model.ResultsSmallSize
368 } 422 }
369 423
(...skipping 19 matching lines...)
389 b := &bytes.Buffer{} 443 b := &bytes.Buffer{}
390 if err := json.NewEncoder(b).Encode(&aggr); err != nil { 444 if err := json.NewEncoder(b).Encode(&aggr); err != nil {
391 return statusError{err, http.StatusInternalServerError} 445 return statusError{err, http.StatusInternalServerError}
392 } 446 }
393 447
394 tf.Data = b 448 tf.Data = b
395 if err := tf.PutData(c); err != nil { 449 if err := tf.PutData(c); err != nil {
396 return statusError{err, http.StatusInternalServerError} 450 return statusError{err, http.StatusInternalServerError}
397 } 451 }
398 452
453 if err := datastore.Get(c).Put(tf); err != nil {
454 return statusError{err, http.StatusInternalServerError}
455 }
456 if err := deleteKeys(c, tf.OldDataKeys); err != nil {
457 logging.Fields{
458 logging.ErrorKey: err,
459 "keys": tf.OldDataKeys,
460 }.Errorf(c, "upload: failed to delete keys")
461 }
399 return nil 462 return nil
400 } 463 }
464
465 func deleteKeys(c context.Context, k []*datastore.Key) error {
466 if len(k) == 0 {
467 return nil
468 }
469
470 keys := make([]string, 0, len(k))
471 for _, key := range k {
472 keys = append(keys, key.Encode())
473 }
474
475 payload, err := json.Marshal(struct {
476 Keys []string `json:"keys"`
477 }{
478 keys,
479 })
480 if err != nil {
481 return err
482 }
483
484 h := make(http.Header)
485 h.Set("Content-Type", "application/json")
486
487 logging.Fields{
488 "keys": keys,
489 }.Infof(c, "deleteKeys: enqueueing")
490
491 return taskqueue.Get(c).Add(&taskqueue.Task{
492 Path: "/internal/delete",
493 Payload: payload,
494 Header: h,
495 Method: "POST",
496 Delay: time.Duration(30) * time.Minute,
497 }, deleteKeysQueueName)
498 }
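deleteKeys serializes the stale blob keys as a JSON body and enqueues a delayed POST to /internal/delete, with the 30 minute Delay deferring the actual deletion. Below is a small sketch of that payload shape on both the producer and consumer side; the consumer code and the key strings are hypothetical illustrations, not part of this change.

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

// deletePayload mirrors the anonymous struct marshalled by deleteKeys.
type deletePayload struct {
	Keys []string `json:"keys"`
}

func main() {
	// Producer side: what deleteKeys would post to the task queue.
	body, err := json.Marshal(deletePayload{
		Keys: []string{"ag9rZXktb25l", "ag9rZXktdHdv"}, // hypothetical encoded keys
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(body)) // {"keys":["ag9rZXktb25l","ag9rZXktdHdv"]}

	// Consumer side: how a /internal/delete handler could read the payload back.
	var got deletePayload
	if err := json.NewDecoder(bytes.NewReader(body)).Decode(&got); err != nil {
		panic(err)
	}
	fmt.Println(got.Keys)
}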