Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(70)

Side by Side Diff: client/logdog/annotee/processor.go

Issue 2078603002: milo: fix running steps (Closed) Base URL: https://chromium.googlesource.com/external/github.com/luci/luci-go@milo-pending
Patch Set: address comments Created 4 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « client/logdog/annotee/annotation/annotation.go ('k') | common/proto/milo/annotations.proto » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2015 The LUCI Authors. All rights reserved. 1 // Copyright 2015 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0 2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file. 3 // that can be found in the LICENSE file.
4 4
5 package annotee 5 package annotee
6 6
7 import ( 7 import (
8 "bufio" 8 "bufio"
9 "bytes" 9 "bytes"
10 "fmt" 10 "fmt"
(...skipping 90 matching lines...) Expand 10 before | Expand all | Expand 10 after
101 // - If this is < 0, metadata will only be pushed at the beginning and end of 101 // - If this is < 0, metadata will only be pushed at the beginning and end of
102 // a step. 102 // a step.
103 // - If this equals 0, metadata will be pushed every time it's upda ted. 103 // - If this equals 0, metadata will be pushed every time it's upda ted.
104 // - If this is 0, DefaultMetadataUpdateInterval will be used. 104 // - If this is 0, DefaultMetadataUpdateInterval will be used.
105 MetadataUpdateInterval time.Duration 105 MetadataUpdateInterval time.Duration
106 106
107 // Offline specifies whether parsing happens not at the same time as 107 // Offline specifies whether parsing happens not at the same time as
108 // emitting. If true and CURRENT_TIMESTAMP annotations are not provided 108 // emitting. If true and CURRENT_TIMESTAMP annotations are not provided
109 // then step start/end times are left empty. 109 // then step start/end times are left empty.
110 Offline bool 110 Offline bool
111
112 // CloseSteps specified whether outstanding open steps must be closed.
113 CloseSteps bool
111 } 114 }
112 115
113 // Processor consumes data from a list of Stream entries and interacts with the 116 // Processor consumes data from a list of Stream entries and interacts with the
114 // supplied Client instance. 117 // supplied Client instance.
115 // 118 //
116 // A Processor must be instantiated with New. 119 // A Processor must be instantiated with New.
117 type Processor struct { 120 type Processor struct {
118 ctx context.Context 121 ctx context.Context
119 o *Options 122 o *Options
120 123
(...skipping 123 matching lines...) Expand 10 before | Expand all | Expand 10 after
244 // Write to our LogDog stream. 247 // Write to our LogDog stream.
245 if err := h.writeBaseStream(s, l); err != nil { 248 if err := h.writeBaseStream(s, l); err != nil {
246 log.WithError(err).Errorf(p.ctx, "Failed to send line to LogDog.") 249 log.WithError(err).Errorf(p.ctx, "Failed to send line to LogDog.")
247 return err 250 return err
248 } 251 }
249 } 252 }
250 253
251 return err 254 return err
252 } 255 }
253 256
254 // Finish instructs the Processor to close any outstanding state. This should be 257 // Finish instructs the Processor to finish any outstanding state.
255 // called when all automatic state updates have completed in case any steps 258 // It is mandatory to call Finish.
256 // didn't properly close their state.
257 func (p *Processor) Finish() *annotation.State { 259 func (p *Processor) Finish() *annotation.State {
258 » // Close our step handlers. 260 » // Finish our step handlers.
259 for _, h := range p.stepHandlers { 261 for _, h := range p.stepHandlers {
260 » » p.closeStepHandler(h) 262 » » p.finishStepHandler(h, p.o.CloseSteps)
261 } 263 }
262 return p.astate 264 return p.astate
263 } 265 }
264 266
265 func (p *Processor) getStepHandler(step *annotation.Step, create bool) (*stepHan dler, error) { 267 func (p *Processor) getStepHandler(step *annotation.Step, create bool) (*stepHan dler, error) {
266 name := step.CanonicalName() 268 name := step.CanonicalName()
267 if h := p.stepHandlers[name]; h != nil { 269 if h := p.stepHandlers[name]; h != nil {
268 return h, nil 270 return h, nil
269 } 271 }
270 if !create { 272 if !create {
271 return nil, nil 273 return nil, nil
272 } 274 }
273 275
274 h, err := newStepHandler(p, step) 276 h, err := newStepHandler(p, step)
275 if err != nil { 277 if err != nil {
276 log.Fields{ 278 log.Fields{
277 log.ErrorKey: err, 279 log.ErrorKey: err,
278 "step": name, 280 "step": name,
279 }.Errorf(p.ctx, "Failed to create step handler.") 281 }.Errorf(p.ctx, "Failed to create step handler.")
280 return nil, err 282 return nil, err
281 } 283 }
282 p.stepHandlers[name] = h 284 p.stepHandlers[name] = h
283 return h, nil 285 return h, nil
284 } 286 }
285 287
286 func (p *Processor) closeStep(step *annotation.Step) { 288 func (p *Processor) closeStep(step *annotation.Step) {
287 if h, _ := p.getStepHandler(step, false); h != nil { 289 if h, _ := p.getStepHandler(step, false); h != nil {
288 » » p.closeStepHandler(h) 290 » » p.finishStepHandler(h, true)
289 } 291 }
290 } 292 }
291 293
292 func (p *Processor) closeStepHandler(h *stepHandler) { 294 func (p *Processor) finishStepHandler(h *stepHandler, closeSteps bool) {
293 // Remove this handler from our list. This will stop us from 295 // Remove this handler from our list. This will stop us from
294 // double-finishing when finish() calls Close(), which calls the StepClo sed 296 // double-finishing when finish() calls Close(), which calls the StepClo sed
295 // callback. 297 // callback.
296 delete(p.stepHandlers, h.String()) 298 delete(p.stepHandlers, h.String())
297 299
298 // Finish the step. 300 // Finish the step.
299 » h.finish() 301 » h.finish(closeSteps)
hinoka 2016/06/16 21:13:58 I don' think you have to pass it in here either
nodir 2016/06/16 21:23:14 it does not always equal p.o.CloseSteps, e.g. 2 hu
nodir 2016/06/16 21:23:14 why?
300 } 302 }
301 303
302 type annotationCallbacks struct { 304 type annotationCallbacks struct {
303 *Processor 305 *Processor
304 } 306 }
305 307
306 func (c *annotationCallbacks) StepClosed(step *annotation.Step) { 308 func (c *annotationCallbacks) StepClosed(step *annotation.Step) {
307 c.closeStep(step) 309 c.closeStep(step)
308 } 310 }
309 311
(...skipping 41 matching lines...) Expand 10 before | Expand all | Expand 10 after
351 context.Context 353 context.Context
352 354
353 processor *Processor 355 processor *Processor
354 step *annotation.Step 356 step *annotation.Step
355 357
356 client streamclient.Client 358 client streamclient.Client
357 injectedLines []string 359 injectedLines []string
358 streams map[types.StreamName]streamclient.Stream 360 streams map[types.StreamName]streamclient.Stream
359 annotationC chan []byte 361 annotationC chan []byte
360 annotationFinishedC chan struct{} 362 annotationFinishedC chan struct{}
361 » closed bool 363 » finished bool
362 } 364 }
363 365
364 func newStepHandler(p *Processor, step *annotation.Step) (*stepHandler, error) { 366 func newStepHandler(p *Processor, step *annotation.Step) (*stepHandler, error) {
365 h := stepHandler{ 367 h := stepHandler{
366 Context: log.SetField(p.ctx, "step", step.CanonicalName()), 368 Context: log.SetField(p.ctx, "step", step.CanonicalName()),
367 processor: p, 369 processor: p,
368 step: step, 370 step: step,
369 371
370 client: p.o.Client, 372 client: p.o.Client,
371 streams: make(map[types.StreamName]streamclient.Stre am), 373 streams: make(map[types.StreamName]streamclient.Stre am),
(...skipping 76 matching lines...) Expand 10 before | Expand all | Expand 10 after
448 timerRunning = true 450 timerRunning = true
449 } 451 }
450 452
451 case <-t.GetC(): 453 case <-t.GetC():
452 timerRunning = false 454 timerRunning = false
453 sendLatest() 455 sendLatest()
454 } 456 }
455 } 457 }
456 } 458 }
457 459
458 func (h *stepHandler) finish() { 460 func (h *stepHandler) finish(closeSteps bool) {
459 » if h.closed { 461 » if h.finished {
460 return 462 return
461 } 463 }
462 464
463 » // This may send one last annotation to summarize the 465 » if closeSteps {
464 » // state if closing changed it. 466 » » h.step.Close(nil)
465 » if h.step.Close(nil) {
466 » » // Manually mark it updated, since Close callbacks will have unr egistered
467 » » // us from the standard Updated() reporting loop.
468 » » h.updated()
469 } 467 }
470 468 » // Send last annotation unconditionally.
469 » // It may send the same annotation it sent last time; could be optimized .
470 » h.sendAnnotation()
471 // Close and reap our meter goroutine. 471 // Close and reap our meter goroutine.
472 close(h.annotationC) 472 close(h.annotationC)
473 <-h.annotationFinishedC 473 <-h.annotationFinishedC
474 474
475 // Close all streams associated with this handler. 475 // Close all streams associated with this handler.
476 » h.closeAllStreams() 476 » if closeSteps {
477 » » h.closeAllStreams()
478 » }
477 479
478 » h.closed = true 480 » h.finished = true
479 } 481 }
480 482
481 func (h *stepHandler) writeBaseStream(s *Stream, line string) error { 483 func (h *stepHandler) writeBaseStream(s *Stream, line string) error {
482 name := h.step.BaseStream(s.Name) 484 name := h.step.BaseStream(s.Name)
483 stream, created, err := h.getStream(name, &textStreamArchetype) 485 stream, created, err := h.getStream(name, &textStreamArchetype)
484 if err != nil { 486 if err != nil {
485 return err 487 return err
486 } 488 }
487 if created { 489 if created {
488 segs := s.Name.Segments() 490 segs := s.Name.Segments()
489 h.maybeInjectLink("stdio", segs[len(segs)-1], name) 491 h.maybeInjectLink("stdio", segs[len(segs)-1], name)
490 } 492 }
491 return writeTextLine(stream, line) 493 return writeTextLine(stream, line)
492 } 494 }
493 495
494 func (h *stepHandler) updated() { 496 func (h *stepHandler) updated() {
495 » // Ignore updates after the step has closed. 497 » if !h.finished {
496 » if h.closed { 498 » » h.sendAnnotation()
497 » » return
498 } 499 }
500 }
499 501
502 func (h *stepHandler) sendAnnotation() {
500 // Serialize immediately, as the Step's internal state may change in fut ure 503 // Serialize immediately, as the Step's internal state may change in fut ure
501 // annotation runs. 504 // annotation runs.
502 p := h.step.Proto() 505 p := h.step.Proto()
503 506
504 data, err := proto.Marshal(p) 507 data, err := proto.Marshal(p)
505 if err != nil { 508 if err != nil {
506 log.WithError(err).Errorf(h, "Failed to marshal state.") 509 log.WithError(err).Errorf(h, "Failed to marshal state.")
507 return 510 return
508 } 511 }
509 h.annotationC <- data 512 h.annotationC <- data
510 } 513 }
511 514
512 func (h *stepHandler) getStream(name types.StreamName, flags *streamproto.Flags) (s streamclient.Stream, created bool, err error) { 515 func (h *stepHandler) getStream(name types.StreamName, flags *streamproto.Flags) (s streamclient.Stream, created bool, err error) {
513 » if h.closed { 516 » if h.finished {
514 » » err = fmt.Errorf("refusing to get stream %q for closed handler", name) 517 » » err = fmt.Errorf("refusing to get stream %q for finished handler ", name)
515 return 518 return
516 } 519 }
517 if s = h.streams[name]; s != nil { 520 if s = h.streams[name]; s != nil {
518 return 521 return
519 } 522 }
520 523
521 // New stream, will be creating. 524 // New stream, will be creating.
522 if flags == nil { 525 if flags == nil {
523 return 526 return
524 } 527 }
(...skipping 100 matching lines...) Expand 10 before | Expand all | Expand 10 after
625 return "" 628 return ""
626 } 629 }
627 return strings.TrimSpace(line[3 : len(line)-3]) 630 return strings.TrimSpace(line[3 : len(line)-3])
628 } 631 }
629 632
630 func buildAnnotation(name string, params ...string) string { 633 func buildAnnotation(name string, params ...string) string {
631 v := make([]string, 1, 1+len(params)) 634 v := make([]string, 1, 1+len(params))
632 v[0] = name 635 v[0] = name
633 return "@@@" + strings.Join(append(v, params...), "@") + "@@@" 636 return "@@@" + strings.Join(append(v, params...), "@") + "@@@"
634 } 637 }
OLDNEW
« no previous file with comments | « client/logdog/annotee/annotation/annotation.go ('k') | common/proto/milo/annotations.proto » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698