Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package annotee | 5 package annotee |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "bufio" | 8 "bufio" |
| 9 "bytes" | 9 "bytes" |
| 10 "fmt" | 10 "fmt" |
| (...skipping 90 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 101 // - If this is < 0, metadata will only be pushed at the beginning and end of | 101 // - If this is < 0, metadata will only be pushed at the beginning and end of |
| 102 // a step. | 102 // a step. |
| 103 // - If this equals 0, metadata will be pushed every time it's upda ted. | 103 // - If this equals 0, metadata will be pushed every time it's upda ted. |
| 104 // - If this is 0, DefaultMetadataUpdateInterval will be used. | 104 // - If this is 0, DefaultMetadataUpdateInterval will be used. |
| 105 MetadataUpdateInterval time.Duration | 105 MetadataUpdateInterval time.Duration |
| 106 | 106 |
| 107 // Offline specifies whether parsing happens not at the same time as | 107 // Offline specifies whether parsing happens not at the same time as |
| 108 // emitting. If true and CURRENT_TIMESTAMP annotations are not provided | 108 // emitting. If true and CURRENT_TIMESTAMP annotations are not provided |
| 109 // then step start/end times are left empty. | 109 // then step start/end times are left empty. |
| 110 Offline bool | 110 Offline bool |
| 111 | |
| 112 // CloseSteps specified whether outstanding open steps must be closed. | |
| 113 CloseSteps bool | |
| 111 } | 114 } |
| 112 | 115 |
| 113 // Processor consumes data from a list of Stream entries and interacts with the | 116 // Processor consumes data from a list of Stream entries and interacts with the |
| 114 // supplied Client instance. | 117 // supplied Client instance. |
| 115 // | 118 // |
| 116 // A Processor must be instantiated with New. | 119 // A Processor must be instantiated with New. |
| 117 type Processor struct { | 120 type Processor struct { |
| 118 ctx context.Context | 121 ctx context.Context |
| 119 o *Options | 122 o *Options |
| 120 | 123 |
| (...skipping 123 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 244 // Write to our LogDog stream. | 247 // Write to our LogDog stream. |
| 245 if err := h.writeBaseStream(s, l); err != nil { | 248 if err := h.writeBaseStream(s, l); err != nil { |
| 246 log.WithError(err).Errorf(p.ctx, "Failed to send line to LogDog.") | 249 log.WithError(err).Errorf(p.ctx, "Failed to send line to LogDog.") |
| 247 return err | 250 return err |
| 248 } | 251 } |
| 249 } | 252 } |
| 250 | 253 |
| 251 return err | 254 return err |
| 252 } | 255 } |
| 253 | 256 |
| 254 // Finish instructs the Processor to close any outstanding state. This should be | 257 // Finish instructs the Processor to finish any outstanding state. |
| 255 // called when all automatic state updates have completed in case any steps | 258 // It is mandatory to call Finish. |
| 256 // didn't properly close their state. | |
| 257 func (p *Processor) Finish() *annotation.State { | 259 func (p *Processor) Finish() *annotation.State { |
| 258 » // Close our step handlers. | 260 » // Finish our step handlers. |
| 259 for _, h := range p.stepHandlers { | 261 for _, h := range p.stepHandlers { |
| 260 » » p.closeStepHandler(h) | 262 » » p.finishStepHandler(h, p.o.CloseSteps) |
| 261 } | 263 } |
| 262 return p.astate | 264 return p.astate |
| 263 } | 265 } |
| 264 | 266 |
| 265 func (p *Processor) getStepHandler(step *annotation.Step, create bool) (*stepHan dler, error) { | 267 func (p *Processor) getStepHandler(step *annotation.Step, create bool) (*stepHan dler, error) { |
| 266 name := step.CanonicalName() | 268 name := step.CanonicalName() |
| 267 if h := p.stepHandlers[name]; h != nil { | 269 if h := p.stepHandlers[name]; h != nil { |
| 268 return h, nil | 270 return h, nil |
| 269 } | 271 } |
| 270 if !create { | 272 if !create { |
| 271 return nil, nil | 273 return nil, nil |
| 272 } | 274 } |
| 273 | 275 |
| 274 h, err := newStepHandler(p, step) | 276 h, err := newStepHandler(p, step) |
| 275 if err != nil { | 277 if err != nil { |
| 276 log.Fields{ | 278 log.Fields{ |
| 277 log.ErrorKey: err, | 279 log.ErrorKey: err, |
| 278 "step": name, | 280 "step": name, |
| 279 }.Errorf(p.ctx, "Failed to create step handler.") | 281 }.Errorf(p.ctx, "Failed to create step handler.") |
| 280 return nil, err | 282 return nil, err |
| 281 } | 283 } |
| 282 p.stepHandlers[name] = h | 284 p.stepHandlers[name] = h |
| 283 return h, nil | 285 return h, nil |
| 284 } | 286 } |
| 285 | 287 |
| 286 func (p *Processor) closeStep(step *annotation.Step) { | 288 func (p *Processor) closeStep(step *annotation.Step) { |
| 287 if h, _ := p.getStepHandler(step, false); h != nil { | 289 if h, _ := p.getStepHandler(step, false); h != nil { |
| 288 » » p.closeStepHandler(h) | 290 » » p.finishStepHandler(h, true) |
| 289 } | 291 } |
| 290 } | 292 } |
| 291 | 293 |
| 292 func (p *Processor) closeStepHandler(h *stepHandler) { | 294 func (p *Processor) finishStepHandler(h *stepHandler, closeSteps bool) { |
| 293 // Remove this handler from our list. This will stop us from | 295 // Remove this handler from our list. This will stop us from |
| 294 // double-finishing when finish() calls Close(), which calls the StepClo sed | 296 // double-finishing when finish() calls Close(), which calls the StepClo sed |
| 295 // callback. | 297 // callback. |
| 296 delete(p.stepHandlers, h.String()) | 298 delete(p.stepHandlers, h.String()) |
| 297 | 299 |
| 298 // Finish the step. | 300 // Finish the step. |
| 299 » h.finish() | 301 » h.finish(closeSteps) |
|
hinoka
2016/06/16 21:13:58
I don' think you have to pass it in here either
nodir
2016/06/16 21:23:14
it does not always equal p.o.CloseSteps, e.g. 2 hu
nodir
2016/06/16 21:23:14
why?
| |
| 300 } | 302 } |
| 301 | 303 |
| 302 type annotationCallbacks struct { | 304 type annotationCallbacks struct { |
| 303 *Processor | 305 *Processor |
| 304 } | 306 } |
| 305 | 307 |
| 306 func (c *annotationCallbacks) StepClosed(step *annotation.Step) { | 308 func (c *annotationCallbacks) StepClosed(step *annotation.Step) { |
| 307 c.closeStep(step) | 309 c.closeStep(step) |
| 308 } | 310 } |
| 309 | 311 |
| (...skipping 41 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 351 context.Context | 353 context.Context |
| 352 | 354 |
| 353 processor *Processor | 355 processor *Processor |
| 354 step *annotation.Step | 356 step *annotation.Step |
| 355 | 357 |
| 356 client streamclient.Client | 358 client streamclient.Client |
| 357 injectedLines []string | 359 injectedLines []string |
| 358 streams map[types.StreamName]streamclient.Stream | 360 streams map[types.StreamName]streamclient.Stream |
| 359 annotationC chan []byte | 361 annotationC chan []byte |
| 360 annotationFinishedC chan struct{} | 362 annotationFinishedC chan struct{} |
| 361 » closed bool | 363 » finished bool |
| 362 } | 364 } |
| 363 | 365 |
| 364 func newStepHandler(p *Processor, step *annotation.Step) (*stepHandler, error) { | 366 func newStepHandler(p *Processor, step *annotation.Step) (*stepHandler, error) { |
| 365 h := stepHandler{ | 367 h := stepHandler{ |
| 366 Context: log.SetField(p.ctx, "step", step.CanonicalName()), | 368 Context: log.SetField(p.ctx, "step", step.CanonicalName()), |
| 367 processor: p, | 369 processor: p, |
| 368 step: step, | 370 step: step, |
| 369 | 371 |
| 370 client: p.o.Client, | 372 client: p.o.Client, |
| 371 streams: make(map[types.StreamName]streamclient.Stre am), | 373 streams: make(map[types.StreamName]streamclient.Stre am), |
| (...skipping 76 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 448 timerRunning = true | 450 timerRunning = true |
| 449 } | 451 } |
| 450 | 452 |
| 451 case <-t.GetC(): | 453 case <-t.GetC(): |
| 452 timerRunning = false | 454 timerRunning = false |
| 453 sendLatest() | 455 sendLatest() |
| 454 } | 456 } |
| 455 } | 457 } |
| 456 } | 458 } |
| 457 | 459 |
| 458 func (h *stepHandler) finish() { | 460 func (h *stepHandler) finish(closeSteps bool) { |
| 459 » if h.closed { | 461 » if h.finished { |
| 460 return | 462 return |
| 461 } | 463 } |
| 462 | 464 |
| 463 » // This may send one last annotation to summarize the | 465 » if closeSteps { |
| 464 » // state if closing changed it. | 466 » » h.step.Close(nil) |
| 465 » if h.step.Close(nil) { | |
| 466 » » // Manually mark it updated, since Close callbacks will have unr egistered | |
| 467 » » // us from the standard Updated() reporting loop. | |
| 468 » » h.updated() | |
| 469 } | 467 } |
| 470 | 468 » // Send last annotation unconditionally. |
| 469 » // It may send the same annotation it sent last time; could be optimized . | |
| 470 » h.sendAnnotation() | |
| 471 // Close and reap our meter goroutine. | 471 // Close and reap our meter goroutine. |
| 472 close(h.annotationC) | 472 close(h.annotationC) |
| 473 <-h.annotationFinishedC | 473 <-h.annotationFinishedC |
| 474 | 474 |
| 475 // Close all streams associated with this handler. | 475 // Close all streams associated with this handler. |
| 476 » h.closeAllStreams() | 476 » if closeSteps { |
| 477 » » h.closeAllStreams() | |
| 478 » } | |
| 477 | 479 |
| 478 » h.closed = true | 480 » h.finished = true |
| 479 } | 481 } |
| 480 | 482 |
| 481 func (h *stepHandler) writeBaseStream(s *Stream, line string) error { | 483 func (h *stepHandler) writeBaseStream(s *Stream, line string) error { |
| 482 name := h.step.BaseStream(s.Name) | 484 name := h.step.BaseStream(s.Name) |
| 483 stream, created, err := h.getStream(name, &textStreamArchetype) | 485 stream, created, err := h.getStream(name, &textStreamArchetype) |
| 484 if err != nil { | 486 if err != nil { |
| 485 return err | 487 return err |
| 486 } | 488 } |
| 487 if created { | 489 if created { |
| 488 segs := s.Name.Segments() | 490 segs := s.Name.Segments() |
| 489 h.maybeInjectLink("stdio", segs[len(segs)-1], name) | 491 h.maybeInjectLink("stdio", segs[len(segs)-1], name) |
| 490 } | 492 } |
| 491 return writeTextLine(stream, line) | 493 return writeTextLine(stream, line) |
| 492 } | 494 } |
| 493 | 495 |
| 494 func (h *stepHandler) updated() { | 496 func (h *stepHandler) updated() { |
| 495 » // Ignore updates after the step has closed. | 497 » if !h.finished { |
| 496 » if h.closed { | 498 » » h.sendAnnotation() |
| 497 » » return | |
| 498 } | 499 } |
| 500 } | |
| 499 | 501 |
| 502 func (h *stepHandler) sendAnnotation() { | |
| 500 // Serialize immediately, as the Step's internal state may change in fut ure | 503 // Serialize immediately, as the Step's internal state may change in fut ure |
| 501 // annotation runs. | 504 // annotation runs. |
| 502 p := h.step.Proto() | 505 p := h.step.Proto() |
| 503 | 506 |
| 504 data, err := proto.Marshal(p) | 507 data, err := proto.Marshal(p) |
| 505 if err != nil { | 508 if err != nil { |
| 506 log.WithError(err).Errorf(h, "Failed to marshal state.") | 509 log.WithError(err).Errorf(h, "Failed to marshal state.") |
| 507 return | 510 return |
| 508 } | 511 } |
| 509 h.annotationC <- data | 512 h.annotationC <- data |
| 510 } | 513 } |
| 511 | 514 |
| 512 func (h *stepHandler) getStream(name types.StreamName, flags *streamproto.Flags) (s streamclient.Stream, created bool, err error) { | 515 func (h *stepHandler) getStream(name types.StreamName, flags *streamproto.Flags) (s streamclient.Stream, created bool, err error) { |
| 513 » if h.closed { | 516 » if h.finished { |
| 514 » » err = fmt.Errorf("refusing to get stream %q for closed handler", name) | 517 » » err = fmt.Errorf("refusing to get stream %q for finished handler ", name) |
| 515 return | 518 return |
| 516 } | 519 } |
| 517 if s = h.streams[name]; s != nil { | 520 if s = h.streams[name]; s != nil { |
| 518 return | 521 return |
| 519 } | 522 } |
| 520 | 523 |
| 521 // New stream, will be creating. | 524 // New stream, will be creating. |
| 522 if flags == nil { | 525 if flags == nil { |
| 523 return | 526 return |
| 524 } | 527 } |
| (...skipping 100 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 625 return "" | 628 return "" |
| 626 } | 629 } |
| 627 return strings.TrimSpace(line[3 : len(line)-3]) | 630 return strings.TrimSpace(line[3 : len(line)-3]) |
| 628 } | 631 } |
| 629 | 632 |
| 630 func buildAnnotation(name string, params ...string) string { | 633 func buildAnnotation(name string, params ...string) string { |
| 631 v := make([]string, 1, 1+len(params)) | 634 v := make([]string, 1, 1+len(params)) |
| 632 v[0] = name | 635 v[0] = name |
| 633 return "@@@" + strings.Join(append(v, params...), "@") + "@@@" | 636 return "@@@" + strings.Join(append(v, params...), "@") + "@@@" |
| 634 } | 637 } |
| OLD | NEW |