Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(455)

Unified Diff: client/logdog/annotee/processor.go

Issue 2078603002: milo: fix running steps (Closed) Base URL: https://chromium.googlesource.com/external/github.com/luci/luci-go@milo-pending
Patch Set: address comments Created 4 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « client/logdog/annotee/annotation/annotation.go ('k') | common/proto/milo/annotations.proto » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: client/logdog/annotee/processor.go
diff --git a/client/logdog/annotee/processor.go b/client/logdog/annotee/processor.go
index cc1ac2c36e74556b8e4baf82712f4398d3af831b..530047ce0959ec93677b557c569779d74beb929c 100644
--- a/client/logdog/annotee/processor.go
+++ b/client/logdog/annotee/processor.go
@@ -108,6 +108,9 @@ type Options struct {
// emitting. If true and CURRENT_TIMESTAMP annotations are not provided
// then step start/end times are left empty.
Offline bool
+
+ // CloseSteps specified whether outstanding open steps must be closed.
+ CloseSteps bool
}
// Processor consumes data from a list of Stream entries and interacts with the
@@ -251,13 +254,12 @@ func (p *Processor) IngestLine(s *Stream, line string) error {
return err
}
-// Finish instructs the Processor to close any outstanding state. This should be
-// called when all automatic state updates have completed in case any steps
-// didn't properly close their state.
+// Finish instructs the Processor to finish any outstanding state.
+// It is mandatory to call Finish.
func (p *Processor) Finish() *annotation.State {
- // Close our step handlers.
+ // Finish our step handlers.
for _, h := range p.stepHandlers {
- p.closeStepHandler(h)
+ p.finishStepHandler(h, p.o.CloseSteps)
}
return p.astate
}
@@ -285,18 +287,18 @@ func (p *Processor) getStepHandler(step *annotation.Step, create bool) (*stepHan
func (p *Processor) closeStep(step *annotation.Step) {
if h, _ := p.getStepHandler(step, false); h != nil {
- p.closeStepHandler(h)
+ p.finishStepHandler(h, true)
}
}
-func (p *Processor) closeStepHandler(h *stepHandler) {
+func (p *Processor) finishStepHandler(h *stepHandler, closeSteps bool) {
// Remove this handler from our list. This will stop us from
// double-finishing when finish() calls Close(), which calls the StepClosed
// callback.
delete(p.stepHandlers, h.String())
// Finish the step.
- h.finish()
+ h.finish(closeSteps)
hinoka 2016/06/16 21:13:58 I don' think you have to pass it in here either
nodir 2016/06/16 21:23:14 it does not always equal p.o.CloseSteps, e.g. 2 hu
nodir 2016/06/16 21:23:14 why?
}
type annotationCallbacks struct {
@@ -358,7 +360,7 @@ type stepHandler struct {
streams map[types.StreamName]streamclient.Stream
annotationC chan []byte
annotationFinishedC chan struct{}
- closed bool
+ finished bool
}
func newStepHandler(p *Processor, step *annotation.Step) (*stepHandler, error) {
@@ -455,27 +457,27 @@ func (h *stepHandler) runAnnotationMeter(s streamclient.Stream, interval time.Du
}
}
-func (h *stepHandler) finish() {
- if h.closed {
+func (h *stepHandler) finish(closeSteps bool) {
+ if h.finished {
return
}
- // This may send one last annotation to summarize the
- // state if closing changed it.
- if h.step.Close(nil) {
- // Manually mark it updated, since Close callbacks will have unregistered
- // us from the standard Updated() reporting loop.
- h.updated()
+ if closeSteps {
+ h.step.Close(nil)
}
-
+ // Send last annotation unconditionally.
+ // It may send the same annotation it sent last time; could be optimized.
+ h.sendAnnotation()
// Close and reap our meter goroutine.
close(h.annotationC)
<-h.annotationFinishedC
// Close all streams associated with this handler.
- h.closeAllStreams()
+ if closeSteps {
+ h.closeAllStreams()
+ }
- h.closed = true
+ h.finished = true
}
func (h *stepHandler) writeBaseStream(s *Stream, line string) error {
@@ -492,11 +494,12 @@ func (h *stepHandler) writeBaseStream(s *Stream, line string) error {
}
func (h *stepHandler) updated() {
- // Ignore updates after the step has closed.
- if h.closed {
- return
+ if !h.finished {
+ h.sendAnnotation()
}
+}
+func (h *stepHandler) sendAnnotation() {
// Serialize immediately, as the Step's internal state may change in future
// annotation runs.
p := h.step.Proto()
@@ -510,8 +513,8 @@ func (h *stepHandler) updated() {
}
func (h *stepHandler) getStream(name types.StreamName, flags *streamproto.Flags) (s streamclient.Stream, created bool, err error) {
- if h.closed {
- err = fmt.Errorf("refusing to get stream %q for closed handler", name)
+ if h.finished {
+ err = fmt.Errorf("refusing to get stream %q for finished handler", name)
return
}
if s = h.streams[name]; s != nil {
« no previous file with comments | « client/logdog/annotee/annotation/annotation.go ('k') | common/proto/milo/annotations.proto » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698