OLD | NEW |
1 package frontend | 1 package frontend |
2 | 2 |
3 import ( | 3 import ( |
4 "bytes" | 4 "bytes" |
5 "encoding/json" | 5 "encoding/json" |
6 "errors" | 6 "errors" |
7 "io" | 7 "io" |
8 "mime/multipart" | 8 "mime/multipart" |
9 "net/http" | 9 "net/http" |
| 10 "net/url" |
10 "strconv" | 11 "strconv" |
11 "sync" | 12 "sync" |
| 13 "time" |
12 | 14 |
13 "golang.org/x/net/context" | 15 "golang.org/x/net/context" |
14 | 16 |
15 "github.com/luci/gae/service/datastore" | 17 "github.com/luci/gae/service/datastore" |
| 18 "github.com/luci/gae/service/taskqueue" |
| 19 "github.com/luci/luci-go/common/logging" |
16 "github.com/luci/luci-go/server/router" | 20 "github.com/luci/luci-go/server/router" |
17 | 21 |
| 22 "infra/appengine/test-results/builderstate" |
18 "infra/appengine/test-results/model" | 23 "infra/appengine/test-results/model" |
19 ) | 24 ) |
20 | 25 |
21 type statusError struct { | 26 type statusError struct { |
22 error | 27 error |
23 code int | 28 code int |
24 } | 29 } |
25 | 30 |
26 // MarshalJSON marshals status error to JSON. | 31 // MarshalJSON marshals status error to JSON. |
27 func (se *statusError) MarshalJSON() ([]byte, error) { | 32 func (se *statusError) MarshalJSON() ([]byte, error) { |
(...skipping 111 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
139 } | 144 } |
140 defer file.Close() | 145 defer file.Close() |
141 | 146 |
142 switch fh.Filename { | 147 switch fh.Filename { |
143 case "incremental_results.json": | 148 case "incremental_results.json": |
144 var incr model.AggregateResult | 149 var incr model.AggregateResult |
145 if err := json.NewDecoder(file).Decode(&incr); err != nil { | 150 if err := json.NewDecoder(file).Decode(&incr); err != nil { |
146 return statusError{err, http.StatusBadRequest} | 151 return statusError{err, http.StatusBadRequest} |
147 } | 152 } |
148 return updateIncremental(c, &incr) | 153 return updateIncremental(c, &incr) |
| 154 |
149 case "full_results.json": | 155 case "full_results.json": |
150 » » return updateFullResults(c, file) | 156 » » bn, data, err := extractBuildNumber(file) |
| 157 » » if err != nil { |
| 158 » » » if err == ErrInvalidBuildNumber { |
| 159 » » » » return statusError{err, http.StatusBadRequest} |
| 160 » » » } |
| 161 » » » return statusError{err, http.StatusInternalServerError} |
| 162 » » } |
| 163 » » if err := updateFullResults(c, data); err != nil { |
| 164 » » » return err |
| 165 » » } |
| 166 |
| 167 » » p := GetUploadParams(c) |
| 168 » » wg := sync.WaitGroup{} |
| 169 |
| 170 » » wg.Add(1) |
| 171 » » go func() { |
| 172 » » » defer wg.Done() |
| 173 » » » if err := builderstate.Update(c, p.Master, p.Builder, p.
TestType, time.Now().UTC()); err != nil { |
| 174 » » » » logging.WithError(err).Errorf(c, "builderstate u
pdate") |
| 175 » » » } |
| 176 » » }() |
| 177 |
| 178 » » wg.Add(1) |
| 179 » » go func() { |
| 180 » » » defer wg.Done() |
| 181 » » » if err := taskqueue.Get(c).Add( |
| 182 » » » » taskqueue.NewPOSTTask("/internal/monitoring/uplo
ad", url.Values{ |
| 183 » » » » » "master": {p.Master}, |
| 184 » » » » » "builder": {p.Builder}, |
| 185 » » » » » "build_number": {strconv.Itoa(bn)}, |
| 186 » » » » » "test_type": {p.TestType}, |
| 187 » » » » }), |
| 188 » » » » defaultQueueName, |
| 189 » » » ); err != nil { |
| 190 » » » » logging.WithError(err).Errorf(c, "taskqueue add:
/internal/monitoring/upload") |
| 191 » » » } |
| 192 » » }() |
| 193 |
| 194 » » wg.Wait() |
| 195 » » return nil |
| 196 |
151 default: | 197 default: |
152 return uploadTestFile(c, file, fh.Filename) | 198 return uploadTestFile(c, file, fh.Filename) |
153 } | 199 } |
154 } | 200 } |
155 | 201 |
156 // uploadTestFile creates a new TestFile from the values in context | 202 // ErrInvalidBuildNumber is returned when the extractBuildNumber fails |
157 // and supplied data, and puts it to the datastore. | 203 // to convert the build number value to an int. |
158 func uploadTestFile(c context.Context, data io.Reader, filename string) error { | 204 var ErrInvalidBuildNumber = errors.New("invalid build_number: cannot convert to
int") |
| 205 |
| 206 // extractBuildNumber extracts the value of "build_number" key from |
| 207 // the supplied JSON encoded data. The returned io.Reader will have |
| 208 // the same contents as the supplied io.Reader. |
| 209 // |
| 210 // The error is ErrInvalidBuildNumber if the build number value |
| 211 // could not be converted to an int. |
| 212 func extractBuildNumber(data io.Reader) (int, io.Reader, error) { |
159 var buf bytes.Buffer | 213 var buf bytes.Buffer |
160 tee := io.TeeReader(data, &buf) | 214 tee := io.TeeReader(data, &buf) |
161 | 215 |
162 aux := struct { | 216 aux := struct { |
163 N string `json:"build_number,omitempty"` | 217 N string `json:"build_number,omitempty"` |
164 }{} | 218 }{} |
165 | |
166 dec := json.NewDecoder(tee) | 219 dec := json.NewDecoder(tee) |
167 if err := dec.Decode(&aux); err != nil { | 220 if err := dec.Decode(&aux); err != nil { |
168 » » return statusError{err, http.StatusInternalServerError} | 221 » » return 0, io.MultiReader(&buf, dec.Buffered()), err |
169 } | 222 } |
170 | 223 |
171 » bn := 0 | 224 » var bn int |
172 | |
173 if aux.N != "" { | 225 if aux.N != "" { |
174 n, err := strconv.Atoi(aux.N) | 226 n, err := strconv.Atoi(aux.N) |
175 if err != nil { | 227 if err != nil { |
176 » » » return statusError{errors.New("invalid build_number"), h
ttp.StatusBadRequest} | 228 » » » return 0, io.MultiReader(&buf, dec.Buffered()), ErrInval
idBuildNumber |
177 } | 229 } |
178 bn = n | 230 bn = n |
179 } | 231 } |
180 | 232 |
| 233 return bn, io.MultiReader(&buf, dec.Buffered()), nil |
| 234 } |
| 235 |
| 236 // uploadTestFile creates a new TestFile from the values in context |
| 237 // and supplied data, and puts it to the datastore. |
| 238 func uploadTestFile(c context.Context, data io.Reader, filename string) error { |
| 239 bn, data, err := extractBuildNumber(data) |
| 240 if err != nil { |
| 241 if err == ErrInvalidBuildNumber { |
| 242 return statusError{err, http.StatusBadRequest} |
| 243 } |
| 244 return statusError{err, http.StatusInternalServerError} |
| 245 } |
| 246 |
181 p := GetUploadParams(c) | 247 p := GetUploadParams(c) |
182 tf := model.TestFile{ | 248 tf := model.TestFile{ |
183 Master: p.Master, | 249 Master: p.Master, |
184 Builder: p.Builder, | 250 Builder: p.Builder, |
185 TestType: p.TestType, | 251 TestType: p.TestType, |
186 BuildNumber: model.BuildNum(bn), | 252 BuildNumber: model.BuildNum(bn), |
187 Name: filename, | 253 Name: filename, |
188 » » Data: io.MultiReader(&buf, dec.Buffered()), | 254 » » Data: data, |
189 } | 255 } |
190 if err := tf.PutData(c); err != nil { | 256 if err := tf.PutData(c); err != nil { |
191 return statusError{err, http.StatusInternalServerError} | 257 return statusError{err, http.StatusInternalServerError} |
192 } | 258 } |
193 » return nil | 259 |
| 260 » return datastore.Get(c).Put(&tf) |
194 } | 261 } |
195 | 262 |
196 // updateFullResults puts the supplied data as "full_results.json" | 263 // updateFullResults puts the supplied data as "full_results.json" |
197 // to the datastore, and updates corresponding "results.json" and | 264 // to the datastore, and updates corresponding "results.json" and |
198 // "results-small.json" files in the datastore. | 265 // "results-small.json" files in the datastore. |
199 // | 266 // |
200 // The supplied data should unmarshal into model.FullResults. | 267 // The supplied data should unmarshal into model.FullResults. |
201 // Otherwise, an error is returned. | 268 // Otherwise, an error is returned. |
202 func updateFullResults(c context.Context, data io.Reader) error { | 269 func updateFullResults(c context.Context, data io.Reader) error { |
203 buf := &bytes.Buffer{} | 270 buf := &bytes.Buffer{} |
204 tee := io.TeeReader(data, buf) | 271 tee := io.TeeReader(data, buf) |
205 dec := json.NewDecoder(tee) | 272 dec := json.NewDecoder(tee) |
206 | 273 |
207 var f model.FullResult | 274 var f model.FullResult |
208 if err := dec.Decode(&f); err != nil { | 275 if err := dec.Decode(&f); err != nil { |
209 return statusError{err, http.StatusBadRequest} | 276 return statusError{err, http.StatusBadRequest} |
210 } | 277 } |
211 | 278 |
212 » wg := sync.WaitGroup{} | 279 » if err := uploadTestFile(c, io.MultiReader(buf, dec.Buffered()), "full_r
esults.json"); err != nil { |
213 » errCh := make(chan error, 2) | 280 » » return statusError{err, http.StatusInternalServerError} |
| 281 » } |
214 | 282 |
215 » wg.Add(1) | 283 » incr, err := f.AggregateResult() |
216 » go func() { | 284 » if err != nil { |
217 » » defer wg.Done() | 285 » » return statusError{err, http.StatusBadRequest} |
218 » » errCh <- uploadTestFile( | 286 » } |
219 » » » c, io.MultiReader(buf, dec.Buffered()), "full_results.js
on", | 287 » if err := updateIncremental(c, &incr); err != nil { |
220 » » ) | 288 » » return statusError{err, http.StatusInternalServerError} |
221 » }() | 289 » } |
222 | 290 |
223 wg.Add(1) | |
224 go func() { | |
225 defer wg.Done() | |
226 incr, err := f.AggregateResult() | |
227 if err != nil { | |
228 errCh <- statusError{err, http.StatusBadRequest} | |
229 return | |
230 } | |
231 errCh <- updateIncremental(c, &incr) | |
232 }() | |
233 | |
234 wg.Wait() | |
235 close(errCh) | |
236 for err := range errCh { | |
237 if err != nil { | |
238 return err | |
239 } | |
240 } | |
241 return nil | 291 return nil |
242 } | 292 } |
243 | 293 |
244 // updateIncremental gets "results.json" and "results-small.json" | 294 // updateIncremental gets "results.json" and "results-small.json" |
245 // for values in context, merges incr into them, and puts the updated | 295 // for values in context, merges incr into them, and puts the updated |
246 // files to the datastore. | 296 // files to the datastore. |
247 func updateIncremental(c context.Context, incr *model.AggregateResult) error { | 297 func updateIncremental(c context.Context, incr *model.AggregateResult) error { |
248 u := GetUploadParams(c) | 298 u := GetUploadParams(c) |
249 p := model.TestFileParams{ | 299 p := model.TestFileParams{ |
250 Master: u.Master, | 300 Master: u.Master, |
251 Builder: u.Builder, | 301 Builder: u.Builder, |
252 TestType: u.TestType, | 302 TestType: u.TestType, |
253 } | 303 } |
254 | 304 |
255 names := []string{"results.json", "results-small.json"} | 305 names := []string{"results.json", "results-small.json"} |
256 files := make([]struct { | 306 files := make([]struct { |
257 tf *model.TestFile | 307 tf *model.TestFile |
258 aggr *model.AggregateResult | 308 aggr *model.AggregateResult |
259 err error | 309 err error |
260 }, len(names)) | 310 }, len(names)) |
261 | 311 |
262 wg := sync.WaitGroup{} | 312 wg := sync.WaitGroup{} |
263 | 313 |
264 for i, name := range names { | 314 for i, name := range names { |
265 » » i, name := i, name | 315 » » i, name, p := i, name, p |
266 wg.Add(1) | 316 wg.Add(1) |
| 317 |
267 go func() { | 318 go func() { |
268 defer wg.Done() | 319 defer wg.Done() |
| 320 p.Name = name |
269 tf, err := getTestFileAlt(c, p, u.DeprecatedMaster) | 321 tf, err := getTestFileAlt(c, p, u.DeprecatedMaster) |
270 if err != nil { | 322 if err != nil { |
271 if _, ok := err.(ErrNoMatches); ok { | 323 if _, ok := err.(ErrNoMatches); ok { |
272 files[i].tf = &model.TestFile{ | 324 files[i].tf = &model.TestFile{ |
273 Master: p.Master, | 325 Master: p.Master, |
274 Builder: p.Builder, | 326 Builder: p.Builder, |
275 TestType: p.TestType, | 327 TestType: p.TestType, |
276 BuildNumber: -1, | 328 BuildNumber: -1, |
277 Name: name, | 329 Name: name, |
278 } | 330 } |
279 » » » » } else { | 331 » » » » » return |
280 » » » » » files[i].err = err | |
281 } | 332 } |
| 333 files[i].err = err |
282 return | 334 return |
283 } | 335 } |
284 files[i].tf = tf | |
285 if err := tf.GetData(c); err != nil { | 336 if err := tf.GetData(c); err != nil { |
286 files[i].err = err | 337 files[i].err = err |
287 return | 338 return |
288 } | 339 } |
289 » » » if err := json.NewDecoder(tf.Data).Decode(files[i].aggr)
; err != nil { | 340 » » » var a model.AggregateResult |
| 341 » » » if err := json.NewDecoder(tf.Data).Decode(&a); err != ni
l { |
290 files[i].err = err | 342 files[i].err = err |
291 return | 343 return |
292 } | 344 } |
| 345 files[i].tf = tf |
| 346 files[i].aggr = &a |
293 }() | 347 }() |
294 } | 348 } |
295 | 349 |
296 wg.Wait() | 350 wg.Wait() |
297 for _, file := range files { | 351 for _, file := range files { |
298 if file.err != nil { | 352 if file.err != nil { |
299 return file.err | 353 return file.err |
300 } | 354 } |
301 } | 355 } |
302 | 356 |
303 » wg = sync.WaitGroup{} | 357 » return datastore.Get(c).RunInTransaction(func(c context.Context) error { |
304 » errs := make([]error, len(files)) | 358 » » wg = sync.WaitGroup{} |
| 359 » » errs := make([]error, len(files)) |
305 | 360 |
306 return datastore.Get(c).RunInTransaction(func(c context.Context) error { | |
307 for i, file := range files { | 361 for i, file := range files { |
308 i, file := i, file | 362 i, file := i, file |
309 wg.Add(1) | 363 wg.Add(1) |
310 go func() { | 364 go func() { |
311 defer wg.Done() | 365 defer wg.Done() |
312 errs[i] = updateAggregate(c, file.tf, file.aggr,
incr) | 366 errs[i] = updateAggregate(c, file.tf, file.aggr,
incr) |
313 }() | 367 }() |
314 } | 368 } |
315 | 369 |
316 wg.Wait() | 370 wg.Wait() |
(...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
349 a, err = getFirstTestFile(c, p.Query()) | 403 a, err = getFirstTestFile(c, p.Query()) |
350 if err == nil { | 404 if err == nil { |
351 a.Master = origMaster | 405 a.Master = origMaster |
352 return a, nil | 406 return a, nil |
353 } | 407 } |
354 | 408 |
355 return nil, err | 409 return nil, err |
356 } | 410 } |
357 | 411 |
358 // updateAggregate updates tf with the result of merging incr into | 412 // updateAggregate updates tf with the result of merging incr into |
359 // aggr. | 413 // aggr, and updates tf in datastore. |
360 func updateAggregate(c context.Context, tf *model.TestFile, aggr, incr *model.Ag
gregateResult) error { | 414 func updateAggregate(c context.Context, tf *model.TestFile, aggr, incr *model.Ag
gregateResult) error { |
361 if !model.IsAggregateTestFile(tf.Name) { | 415 if !model.IsAggregateTestFile(tf.Name) { |
362 return errors.New("frontend: tf should be an aggregate test file
") | 416 return errors.New("frontend: tf should be an aggregate test file
") |
363 } | 417 } |
364 | 418 |
365 size := model.ResultsSize | 419 size := model.ResultsSize |
366 if tf.Name == "results-small.json" { | 420 if tf.Name == "results-small.json" { |
367 size = model.ResultsSmallSize | 421 size = model.ResultsSmallSize |
368 } | 422 } |
369 | 423 |
(...skipping 19 matching lines...) Expand all Loading... |
389 b := &bytes.Buffer{} | 443 b := &bytes.Buffer{} |
390 if err := json.NewEncoder(b).Encode(&aggr); err != nil { | 444 if err := json.NewEncoder(b).Encode(&aggr); err != nil { |
391 return statusError{err, http.StatusInternalServerError} | 445 return statusError{err, http.StatusInternalServerError} |
392 } | 446 } |
393 | 447 |
394 tf.Data = b | 448 tf.Data = b |
395 if err := tf.PutData(c); err != nil { | 449 if err := tf.PutData(c); err != nil { |
396 return statusError{err, http.StatusInternalServerError} | 450 return statusError{err, http.StatusInternalServerError} |
397 } | 451 } |
398 | 452 |
| 453 if err := datastore.Get(c).Put(tf); err != nil { |
| 454 return statusError{err, http.StatusInternalServerError} |
| 455 } |
| 456 if err := deleteKeys(c, tf.OldDataKeys); err != nil { |
| 457 logging.Fields{ |
| 458 logging.ErrorKey: err, |
| 459 "keys": tf.OldDataKeys, |
| 460 }.Errorf(c, "upload: failed to delete keys") |
| 461 } |
399 return nil | 462 return nil |
400 } | 463 } |
| 464 |
| 465 func deleteKeys(c context.Context, k []*datastore.Key) error { |
| 466 if len(k) == 0 { |
| 467 return nil |
| 468 } |
| 469 |
| 470 keys := make([]string, 0, len(k)) |
| 471 for _, key := range k { |
| 472 keys = append(keys, key.Encode()) |
| 473 } |
| 474 |
| 475 payload, err := json.Marshal(struct { |
| 476 Keys []string `json:"keys"` |
| 477 }{ |
| 478 keys, |
| 479 }) |
| 480 if err != nil { |
| 481 return err |
| 482 } |
| 483 |
| 484 h := make(http.Header) |
| 485 h.Set("Content-Type", "application/json") |
| 486 |
| 487 logging.Fields{ |
| 488 "keys": keys, |
| 489 }.Infof(c, "deleteKeys: enqueing") |
| 490 |
| 491 return taskqueue.Get(c).Add(&taskqueue.Task{ |
| 492 Path: "/internal/delete", |
| 493 Payload: payload, |
| 494 Header: h, |
| 495 Method: "POST", |
| 496 Delay: time.Duration(30) * time.Minute, |
| 497 }, deleteKeysQueueName) |
| 498 } |
OLD | NEW |