Chromium Code Reviews| Index: client/logdog/annotee/processor.go |
| diff --git a/client/logdog/annotee/processor.go b/client/logdog/annotee/processor.go |
| index cc1ac2c36e74556b8e4baf82712f4398d3af831b..530047ce0959ec93677b557c569779d74beb929c 100644 |
| --- a/client/logdog/annotee/processor.go |
| +++ b/client/logdog/annotee/processor.go |
| @@ -108,6 +108,9 @@ type Options struct { |
| // emitting. If true and CURRENT_TIMESTAMP annotations are not provided |
| // then step start/end times are left empty. |
| Offline bool |
| + |
| + // CloseSteps specified whether outstanding open steps must be closed. |
| + CloseSteps bool |
| } |
| // Processor consumes data from a list of Stream entries and interacts with the |
| @@ -251,13 +254,12 @@ func (p *Processor) IngestLine(s *Stream, line string) error { |
| return err |
| } |
| -// Finish instructs the Processor to close any outstanding state. This should be |
| -// called when all automatic state updates have completed in case any steps |
| -// didn't properly close their state. |
| +// Finish instructs the Processor to finish any outstanding state. |
| +// It is mandatory to call Finish. |
| func (p *Processor) Finish() *annotation.State { |
| - // Close our step handlers. |
| + // Finish our step handlers. |
| for _, h := range p.stepHandlers { |
| - p.closeStepHandler(h) |
| + p.finishStepHandler(h, p.o.CloseSteps) |
| } |
| return p.astate |
| } |
| @@ -285,18 +287,18 @@ func (p *Processor) getStepHandler(step *annotation.Step, create bool) (*stepHan |
| func (p *Processor) closeStep(step *annotation.Step) { |
| if h, _ := p.getStepHandler(step, false); h != nil { |
| - p.closeStepHandler(h) |
| + p.finishStepHandler(h, true) |
| } |
| } |
| -func (p *Processor) closeStepHandler(h *stepHandler) { |
| +func (p *Processor) finishStepHandler(h *stepHandler, closeSteps bool) { |
| // Remove this handler from our list. This will stop us from |
| // double-finishing when finish() calls Close(), which calls the StepClosed |
| // callback. |
| delete(p.stepHandlers, h.String()) |
| // Finish the step. |
| - h.finish() |
| + h.finish(closeSteps) |
|
hinoka
2016/06/16 21:13:58
I don' think you have to pass it in here either
nodir
2016/06/16 21:23:14
it does not always equal p.o.CloseSteps, e.g. 2 hu
nodir
2016/06/16 21:23:14
why?
|
| } |
| type annotationCallbacks struct { |
| @@ -358,7 +360,7 @@ type stepHandler struct { |
| streams map[types.StreamName]streamclient.Stream |
| annotationC chan []byte |
| annotationFinishedC chan struct{} |
| - closed bool |
| + finished bool |
| } |
| func newStepHandler(p *Processor, step *annotation.Step) (*stepHandler, error) { |
| @@ -455,27 +457,27 @@ func (h *stepHandler) runAnnotationMeter(s streamclient.Stream, interval time.Du |
| } |
| } |
| -func (h *stepHandler) finish() { |
| - if h.closed { |
| +func (h *stepHandler) finish(closeSteps bool) { |
| + if h.finished { |
| return |
| } |
| - // This may send one last annotation to summarize the |
| - // state if closing changed it. |
| - if h.step.Close(nil) { |
| - // Manually mark it updated, since Close callbacks will have unregistered |
| - // us from the standard Updated() reporting loop. |
| - h.updated() |
| + if closeSteps { |
| + h.step.Close(nil) |
| } |
| - |
| + // Send last annotation unconditionally. |
| + // It may send the same annotation it sent last time; could be optimized. |
| + h.sendAnnotation() |
| // Close and reap our meter goroutine. |
| close(h.annotationC) |
| <-h.annotationFinishedC |
| // Close all streams associated with this handler. |
| - h.closeAllStreams() |
| + if closeSteps { |
| + h.closeAllStreams() |
| + } |
| - h.closed = true |
| + h.finished = true |
| } |
| func (h *stepHandler) writeBaseStream(s *Stream, line string) error { |
| @@ -492,11 +494,12 @@ func (h *stepHandler) writeBaseStream(s *Stream, line string) error { |
| } |
| func (h *stepHandler) updated() { |
| - // Ignore updates after the step has closed. |
| - if h.closed { |
| - return |
| + if !h.finished { |
| + h.sendAnnotation() |
| } |
| +} |
| +func (h *stepHandler) sendAnnotation() { |
| // Serialize immediately, as the Step's internal state may change in future |
| // annotation runs. |
| p := h.step.Proto() |
| @@ -510,8 +513,8 @@ func (h *stepHandler) updated() { |
| } |
| func (h *stepHandler) getStream(name types.StreamName, flags *streamproto.Flags) (s streamclient.Stream, created bool, err error) { |
| - if h.closed { |
| - err = fmt.Errorf("refusing to get stream %q for closed handler", name) |
| + if h.finished { |
| + err = fmt.Errorf("refusing to get stream %q for finished handler", name) |
| return |
| } |
| if s = h.streams[name]; s != nil { |