| OLD | NEW |
| 1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package annotee | 5 package annotee |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "bufio" | 8 "bufio" |
| 9 "bytes" | 9 "bytes" |
| 10 "fmt" | 10 "fmt" |
| (...skipping 269 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 280 | 280 |
| 281 h, err := p.getStepHandler(step, true) | 281 h, err := p.getStepHandler(step, true) |
| 282 if err != nil { | 282 if err != nil { |
| 283 return err | 283 return err |
| 284 } | 284 } |
| 285 | 285 |
| 286 // Determine our injected annotations. | 286 // Determine our injected annotations. |
| 287 injectedAnnotations := h.flushInjectedAnnotations() | 287 injectedAnnotations := h.flushInjectedAnnotations() |
| 288 | 288 |
| 289 // Emit the "all" link if configured (at most once). | 289 // Emit the "all" link if configured (at most once). |
| 290 » if s.EmitAllLink { | 290 » if lg := p.o.LinkGenerator; lg != nil && s.EmitAllLink { |
| 291 » » if l := buildStreamLinkAnnotation(p.o.LinkGenerator, "all", "std
io", "**/stdout", "**/stderr"); l != "" { | 291 » » injectedAnnotations = append(injectedAnnotations, |
| 292 » » » injectedAnnotations = append(injectedAnnotations, l) | 292 » » » buildAliasAnnotation("all", "stdio", lg.GetLink("**/stdo
ut", "**/stderr"))) |
| 293 » » } | 293 |
| 294 s.EmitAllLink = false | 294 s.EmitAllLink = false |
| 295 } | 295 } |
| 296 | 296 |
| 297 // Get our root log stream handler. As an optimization, if "step" is | 297 // Get our root log stream handler. As an optimization, if "step" is |
| 298 // the root step, then "h" is already the root handler, so we don't need | 298 // the root step, then "h" is already the root handler, so we don't need |
| 299 // to duplicate the lookup. | 299 // to duplicate the lookup. |
| 300 // | 300 // |
| 301 // We only need the handler if we're going to emit annotations to the ro
ot | 301 // We only need the handler if we're going to emit annotations to the ro
ot |
| 302 // stream. | 302 // stream. |
| 303 var rootHandler *stepHandler | 303 var rootHandler *stepHandler |
| (...skipping 45 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 349 if !s.StripAnnotations { | 349 if !s.StripAnnotations { |
| 350 if err := rootHandler.writeBaseStream(s, anno); err != n
il { | 350 if err := rootHandler.writeBaseStream(s, anno); err != n
il { |
| 351 log.WithError(err).Errorf(h, "Failed to send inj
ected annotation line to LogDog.") | 351 log.WithError(err).Errorf(h, "Failed to send inj
ected annotation line to LogDog.") |
| 352 return err | 352 return err |
| 353 } | 353 } |
| 354 } | 354 } |
| 355 } | 355 } |
| 356 | 356 |
| 357 // If we're stripping text, write a warning message noting that this str
eam | 357 // If we're stripping text, write a warning message noting that this str
eam |
| 358 // will not have text in it. | 358 // will not have text in it. |
| 359 » if !p.o.TeeText && s.Tee != nil && !h.textStrippedNote { | 359 » if !p.o.TeeText && s.Tee != nil { |
| 360 » » err := writeTextLine(s.Tee, "This build is configured to send lo
g data exclusively to LogDog. "+ | 360 » » if !h.textStrippedNote { |
| 361 » » » "Please click the LogDog link on the build page to view
this log stream.") | 361 » » » err := writeTextLine(s.Tee, "This build is configured to
send log data exclusively to LogDog. "+ |
| 362 » » if err != nil { | 362 » » » » "Please use the LogDog link on the build page to
view this log stream.") |
| 363 » » » log.WithError(err).Errorf(h, "Failed to write text strip
ped notice.") | 363 » » » if err != nil { |
| 364 » » » return err | 364 » » » » log.WithError(err).Errorf(h, "Failed to write te
xt stripped notice.") |
| 365 » » » » return err |
| 366 » » » } |
| 367 |
| 368 » » » h.textStrippedNote = true |
| 365 } | 369 } |
| 366 | 370 |
| 367 » » h.textStrippedNote = true | 371 » » // Add links to specific log streams as they are generated. |
| 372 » » injectTextStreamLines := h.flushInjectedTextStreamLines() |
| 373 » » for _, line := range injectTextStreamLines { |
| 374 » » » if err := writeTextLine(s.Tee, line); err != nil { |
| 375 » » » » log.WithError(err).Errorf(h, "Failed to inject t
ext stream line: %s", line) |
| 376 » » » » return err |
| 377 » » » } |
| 378 » » } |
| 368 } | 379 } |
| 369 | 380 |
| 370 // If this is a text line, and we're teeing text, emit this line. | 381 // If this is a text line, and we're teeing text, emit this line. |
| 371 if a == "" { | 382 if a == "" { |
| 372 if s.Tee != nil && p.o.TeeText { | 383 if s.Tee != nil && p.o.TeeText { |
| 373 if err := writeTextLine(s.Tee, line); err != nil { | 384 if err := writeTextLine(s.Tee, line); err != nil { |
| 374 log.WithError(err).Errorf(h, "Failed to tee text
line.") | 385 log.WithError(err).Errorf(h, "Failed to tee text
line.") |
| 375 return err | 386 return err |
| 376 } | 387 } |
| 377 } | 388 } |
| (...skipping 191 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 569 } | 580 } |
| 570 } | 581 } |
| 571 | 582 |
| 572 // stepHandler handles the steps associated with a specified stream. | 583 // stepHandler handles the steps associated with a specified stream. |
| 573 type stepHandler struct { | 584 type stepHandler struct { |
| 574 context.Context | 585 context.Context |
| 575 | 586 |
| 576 processor *Processor | 587 processor *Processor |
| 577 step *annotation.Step | 588 step *annotation.Step |
| 578 | 589 |
| 579 » client streamclient.Client | 590 » client streamclient.Client |
| 580 » injectedAnnotations []string | 591 » injectedAnnotations []string |
| 581 » streams map[types.StreamName]streamclient.Stream | 592 » injectedTextStreamLines []string |
| 582 » finished bool | 593 » streams map[types.StreamName]streamclient.Stream |
| 583 » allEmitted bool | 594 » finished bool |
| 584 » textStrippedNote bool | 595 » allEmitted bool |
| 596 » textStrippedNote bool |
| 585 } | 597 } |
| 586 | 598 |
| 587 func newStepHandler(p *Processor, step *annotation.Step) (*stepHandler, error) { | 599 func newStepHandler(p *Processor, step *annotation.Step) (*stepHandler, error) { |
| 588 h := stepHandler{ | 600 h := stepHandler{ |
| 589 Context: log.SetField(p.ctx, "step", step), | 601 Context: log.SetField(p.ctx, "step", step), |
| 590 processor: p, | 602 processor: p, |
| 591 step: step, | 603 step: step, |
| 592 | 604 |
| 593 client: p.o.Client, | 605 client: p.o.Client, |
| 594 streams: make(map[types.StreamName]streamclient.Stream), | 606 streams: make(map[types.StreamName]streamclient.Stream), |
| (...skipping 96 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 691 func (h *stepHandler) closeStreamImpl(name types.StreamName, s streamclient.Stre
am) { | 703 func (h *stepHandler) closeStreamImpl(name types.StreamName, s streamclient.Stre
am) { |
| 692 if err := s.Close(); err != nil { | 704 if err := s.Close(); err != nil { |
| 693 log.Fields{ | 705 log.Fields{ |
| 694 log.ErrorKey: err, | 706 log.ErrorKey: err, |
| 695 "stream": name, | 707 "stream": name, |
| 696 }.Errorf(h, "Failed to close step stream.") | 708 }.Errorf(h, "Failed to close step stream.") |
| 697 } | 709 } |
| 698 } | 710 } |
| 699 | 711 |
| 700 func (h *stepHandler) flushInjectedAnnotations() []string { | 712 func (h *stepHandler) flushInjectedAnnotations() []string { |
| 701 » if len(h.injectedAnnotations) == 0 { | 713 » return flushStringSlice(&h.injectedAnnotations) |
| 714 } |
| 715 |
| 716 func (h *stepHandler) flushInjectedTextStreamLines() []string { |
| 717 » return flushStringSlice(&h.injectedTextStreamLines) |
| 718 } |
| 719 |
| 720 func (h *stepHandler) maybeInjectLink(base, text string, names ...types.StreamNa
me) { |
| 721 » if lg := h.processor.o.LinkGenerator; lg != nil { |
| 722 » » link := lg.GetLink(names...) |
| 723 |
| 724 » » h.injectedAnnotations = append(h.injectedAnnotations, buildAlias
Annotation(base, text, link)) |
| 725 » » h.injectedTextStreamLines = append(h.injectedTextStreamLines, fm
t.Sprintf("LogDog Link [%s]: %s", base, link)) |
| 726 » } |
| 727 } |
| 728 |
| 729 func (h *stepHandler) maybeInjectTextStreamLink(name string, stream types.Stream
Name) { |
| 730 » if lg := h.processor.o.LinkGenerator; lg != nil { |
| 731 » } |
| 732 } |
| 733 |
| 734 func buildAliasAnnotation(base, text, link string) string { |
| 735 » return buildAnnotation("STEP_LINK", fmt.Sprintf("%s-->%s", text, base),
link) |
| 736 } |
| 737 |
| 738 func flushStringSlice(sp *[]string) []string { |
| 739 » if sp == nil { |
| 702 return nil | 740 return nil |
| 703 } | 741 } |
| 704 | 742 |
| 705 » lines := make([]string, len(h.injectedAnnotations)) | 743 » s := *sp |
| 706 » copy(lines, h.injectedAnnotations) | 744 » if len(s) == 0 { |
| 707 » h.injectedAnnotations = h.injectedAnnotations[:0] | 745 » » return nil |
| 746 » } |
| 747 |
| 748 » lines := make([]string, len(s)) |
| 749 » copy(lines, s) |
| 750 » *sp = s[:0] |
| 708 | 751 |
| 709 return lines | 752 return lines |
| 710 } | 753 } |
| 711 | 754 |
| 712 func (h *stepHandler) maybeInjectLink(base, text string, names ...types.StreamNa
me) { | |
| 713 if l := buildStreamLinkAnnotation(h.processor.o.LinkGenerator, base, tex
t, names...); l != "" { | |
| 714 h.injectedAnnotations = append(h.injectedAnnotations, l) | |
| 715 } | |
| 716 } | |
| 717 | |
| 718 func buildStreamLinkAnnotation(lg LinkGenerator, base, text string, names ...typ
es.StreamName) string { | |
| 719 if lg != nil { | |
| 720 if link := lg.GetLink(names...); link != "" { | |
| 721 return buildAnnotation("STEP_LINK", fmt.Sprintf("%s-->%s
", text, base), link) | |
| 722 } | |
| 723 } | |
| 724 return "" | |
| 725 } | |
| 726 | |
| 727 // lineReader reads from an input stream and returns the data line-by-line. | 755 // lineReader reads from an input stream and returns the data line-by-line. |
| 728 // | 756 // |
| 729 // We don't use a Scanner because we want to be able to handle lines that may | 757 // We don't use a Scanner because we want to be able to handle lines that may |
| 730 // exceed the buffer length. We don't use ReadBytes here because we need to | 758 // exceed the buffer length. We don't use ReadBytes here because we need to |
| 731 // capture the last line in the stream, even if it doesn't end with a newline. | 759 // capture the last line in the stream, even if it doesn't end with a newline. |
| 732 type lineReader struct { | 760 type lineReader struct { |
| 733 r *bufio.Reader | 761 r *bufio.Reader |
| 734 buf bytes.Buffer | 762 buf bytes.Buffer |
| 735 } | 763 } |
| 736 | 764 |
| (...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 774 return "@@@" + strings.Join(append(v, params...), "@") + "@@@" | 802 return "@@@" + strings.Join(append(v, params...), "@") + "@@@" |
| 775 } | 803 } |
| 776 | 804 |
| 777 func isContentAnnotation(a string) bool { | 805 func isContentAnnotation(a string) bool { |
| 778 // Strip out any annotation arguments. | 806 // Strip out any annotation arguments. |
| 779 if idx := strings.IndexRune(a, '@'); idx > 0 { | 807 if idx := strings.IndexRune(a, '@'); idx > 0 { |
| 780 a = a[:idx] | 808 a = a[:idx] |
| 781 } | 809 } |
| 782 return a == "STEP_LOG_LINE" | 810 return a == "STEP_LOG_LINE" |
| 783 } | 811 } |
| OLD | NEW |