| Index: logdog/client/annotee/processor.go
|
| diff --git a/logdog/client/annotee/processor.go b/logdog/client/annotee/processor.go
|
| index 6ed7be04bf7b18ce980c1f7fccd8a69081f28cca..e09ae4698d8ae7939b4306d37d34c293b6a553a2 100644
|
| --- a/logdog/client/annotee/processor.go
|
| +++ b/logdog/client/annotee/processor.go
|
| @@ -116,7 +116,7 @@ type Processor struct {
|
| o *Options
|
|
|
| astate *annotation.State
|
| - stepHandlers map[string]*stepHandler
|
| + stepHandlers map[*annotation.Step]*stepHandler
|
|
|
| annotationStream streamclient.Stream
|
| annotationC chan annotationSignal
|
| @@ -134,7 +134,7 @@ func New(c context.Context, o Options) *Processor {
|
| ctx: c,
|
| o: &o,
|
|
|
| - stepHandlers: make(map[string]*stepHandler),
|
| + stepHandlers: make(map[*annotation.Step]*stepHandler),
|
| }
|
| p.astate = &annotation.State{
|
| LogNameBase: o.Base,
|
| @@ -233,7 +233,6 @@ func (p *Processor) IngestLine(s *Stream, line string) error {
|
| }
|
|
|
| var step *annotation.Step
|
| - var h *stepHandler
|
| if s.Annotate {
|
| if a != "" {
|
| // Append our annotation to the annotation state. This may cause our
|
| @@ -262,20 +261,39 @@ func (p *Processor) IngestLine(s *Stream, line string) error {
|
| // Build our output, which will consist of the initial line and any extra
|
| // lines that have been registered.
|
| inject := h.flushInjectedLines()
|
| - output := make([]string, 1, 1+len(inject))
|
| - output[0] = line
|
| - output = append(output, inject...)
|
| -
|
| - for _, l := range output {
|
| - // If configured, tee to our tee stream.
|
| - if s.Tee != nil && (a == "" || !s.StripAnnotations) {
|
| - // Tee this to the Stream's configured Tee output.
|
| - if err := writeTextLine(s.Tee, l); err != nil {
|
| - log.WithError(err).Errorf(p.ctx, "Failed to tee line.")
|
| + stepOutput := make([]string, 0, 1+len(inject))
|
| +
|
| + // If this is an annotation line, write it to our root handler. Otherwise,
|
| + // write it to our step's handler (by appending it to stepOutput).
|
| + if a != "" {
|
| + // If we're not stripping annotations, emit this to the root handler.
|
| + if !s.StripAnnotations {
|
| + // Get our root log stream handler. As an optimization, if "step" is
|
| + // the root step, then "h" is already the root handler, so we don't need
|
| + // to duplicate the lookup.
|
| + var rootHandler *stepHandler
|
| + if rootStep := p.astate.RootStep(); rootStep != step {
|
| + rootHandler, err = p.getStepHandler(rootStep, true)
|
| + if err != nil {
|
| + return err
|
| + }
|
| + } else {
|
| + rootHandler = h
|
| + }
|
| +
|
| + if err := rootHandler.writeBaseStream(s, line); err != nil {
|
| + log.WithError(err).Errorf(p.ctx, "Failed to send line to LogDog.")
|
| return err
|
| }
|
| }
|
| + } else {
|
| + stepOutput = append(stepOutput, line)
|
| + }
|
| +
|
| + // Add any injected lines.
|
| + stepOutput = append(stepOutput, inject...)
|
|
|
| + for _, l := range stepOutput {
|
| // Write to our LogDog stream.
|
| if err := h.writeBaseStream(s, l); err != nil {
|
| log.WithError(err).Errorf(p.ctx, "Failed to send line to LogDog.")
|
| @@ -315,8 +333,7 @@ func (p *Processor) Finish() *annotation.State {
|
| }
|
|
|
| func (p *Processor) getStepHandler(step *annotation.Step, create bool) (*stepHandler, error) {
|
| - name := step.CanonicalName()
|
| - if h := p.stepHandlers[name]; h != nil {
|
| + if h := p.stepHandlers[step]; h != nil {
|
| return h, nil
|
| }
|
| if !create {
|
| @@ -327,11 +344,11 @@ func (p *Processor) getStepHandler(step *annotation.Step, create bool) (*stepHan
|
| if err != nil {
|
| log.Fields{
|
| log.ErrorKey: err,
|
| - "step": name,
|
| + "step": step,
|
| }.Errorf(p.ctx, "Failed to create step handler.")
|
| return nil, err
|
| }
|
| - p.stepHandlers[name] = h
|
| + p.stepHandlers[step] = h
|
| return h, nil
|
| }
|
|
|
| @@ -345,7 +362,7 @@ func (p *Processor) finishStepHandler(h *stepHandler, closeSteps bool) {
|
| // Remove this handler from our list. This will stop us from
|
| // double-finishing when finish() calls Close(), which calls the StepClosed
|
| // callback.
|
| - delete(p.stepHandlers, h.String())
|
| + delete(p.stepHandlers, h.step)
|
|
|
| // Finish the step.
|
| h.finish(closeSteps)
|
| @@ -485,7 +502,7 @@ type stepHandler struct {
|
|
|
| func newStepHandler(p *Processor, step *annotation.Step) (*stepHandler, error) {
|
| h := stepHandler{
|
| - Context: log.SetField(p.ctx, "step", step.CanonicalName()),
|
| + Context: log.SetField(p.ctx, "step", step),
|
| processor: p,
|
| step: step,
|
|
|
| @@ -500,7 +517,7 @@ func newStepHandler(p *Processor, step *annotation.Step) (*stepHandler, error) {
|
| }
|
|
|
| func (h *stepHandler) String() string {
|
| - return h.step.CanonicalName()
|
| + return h.step.String()
|
| }
|
|
|
| func (h *stepHandler) finish(closeSteps bool) {
|
| @@ -523,6 +540,15 @@ func (h *stepHandler) finish(closeSteps bool) {
|
| }
|
|
|
| func (h *stepHandler) writeBaseStream(s *Stream, line string) error {
|
| + // If we're teeing, also write this to our Processor's tee stream.
|
| + if s.Tee != nil {
|
| + // Tee this to the Stream's configured Tee output.
|
| + if err := writeTextLine(s.Tee, line); err != nil {
|
| + log.WithError(err).Errorf(h, "Failed to tee line.")
|
| + return err
|
| + }
|
| + }
|
| +
|
| name := h.step.BaseStream(s.Name)
|
| stream, created, err := h.getStream(name, &textStreamArchetype)
|
| if err != nil {
|
|
|