| OLD | NEW |
| 1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package annotee | 5 package annotee |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "bufio" | 8 "bufio" |
| 9 "bytes" | 9 "bytes" |
| 10 "fmt" | 10 "fmt" |
| (...skipping 98 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 109 | 109 |
| 110 // Processor consumes data from a list of Stream entries and interacts with the | 110 // Processor consumes data from a list of Stream entries and interacts with the |
| 111 // supplied Client instance. | 111 // supplied Client instance. |
| 112 // | 112 // |
| 113 // A Processor must be instantiated with New. | 113 // A Processor must be instantiated with New. |
| 114 type Processor struct { | 114 type Processor struct { |
| 115 ctx context.Context | 115 ctx context.Context |
| 116 o *Options | 116 o *Options |
| 117 | 117 |
| 118 astate *annotation.State | 118 astate *annotation.State |
| 119 » stepHandlers map[string]*stepHandler | 119 » stepHandlers map[*annotation.Step]*stepHandler |
| 120 | 120 |
| 121 annotationStream streamclient.Stream | 121 annotationStream streamclient.Stream |
| 122 annotationC chan annotationSignal | 122 annotationC chan annotationSignal |
| 123 annotationFinishedC chan struct{} | 123 annotationFinishedC chan struct{} |
| 124 } | 124 } |
| 125 | 125 |
| 126 type annotationSignal struct { | 126 type annotationSignal struct { |
| 127 data []byte | 127 data []byte |
| 128 updateType annotation.UpdateType | 128 updateType annotation.UpdateType |
| 129 } | 129 } |
| 130 | 130 |
| 131 // New instantiates a new Processor. | 131 // New instantiates a new Processor. |
| 132 func New(c context.Context, o Options) *Processor { | 132 func New(c context.Context, o Options) *Processor { |
| 133 p := Processor{ | 133 p := Processor{ |
| 134 ctx: c, | 134 ctx: c, |
| 135 o: &o, | 135 o: &o, |
| 136 | 136 |
| 137 » » stepHandlers: make(map[string]*stepHandler), | 137 » » stepHandlers: make(map[*annotation.Step]*stepHandler), |
| 138 } | 138 } |
| 139 p.astate = &annotation.State{ | 139 p.astate = &annotation.State{ |
| 140 LogNameBase: o.Base, | 140 LogNameBase: o.Base, |
| 141 Callbacks: &annotationCallbacks{&p}, | 141 Callbacks: &annotationCallbacks{&p}, |
| 142 Execution: o.Execution, | 142 Execution: o.Execution, |
| 143 Clock: clock.Get(c), | 143 Clock: clock.Get(c), |
| 144 Offline: o.Offline, | 144 Offline: o.Offline, |
| 145 } | 145 } |
| 146 return &p | 146 return &p |
| 147 } | 147 } |
| (...skipping 78 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 226 log.WithError(err).Errorf(p.ctx, "Failed to initialize.") | 226 log.WithError(err).Errorf(p.ctx, "Failed to initialize.") |
| 227 return err | 227 return err |
| 228 } | 228 } |
| 229 | 229 |
| 230 a := extractAnnotation(line) | 230 a := extractAnnotation(line) |
| 231 if a != "" { | 231 if a != "" { |
| 232 log.Debugf(p.ctx, "Annotation: %q", a) | 232 log.Debugf(p.ctx, "Annotation: %q", a) |
| 233 } | 233 } |
| 234 | 234 |
| 235 var step *annotation.Step | 235 var step *annotation.Step |
| 236 var h *stepHandler | |
| 237 if s.Annotate { | 236 if s.Annotate { |
| 238 if a != "" { | 237 if a != "" { |
| 239 // Append our annotation to the annotation state. This m
ay cause our | 238 // Append our annotation to the annotation state. This m
ay cause our |
| 240 // annotation callbacks to be invoked. | 239 // annotation callbacks to be invoked. |
| 241 if err := p.astate.Append(a); err != nil { | 240 if err := p.astate.Append(a); err != nil { |
| 242 log.Fields{ | 241 log.Fields{ |
| 243 log.ErrorKey: err, | 242 log.ErrorKey: err, |
| 244 "stream": s.Name, | 243 "stream": s.Name, |
| 245 "annotation": a, | 244 "annotation": a, |
| 246 }.Errorf(p.ctx, "Failed to process annotation.") | 245 }.Errorf(p.ctx, "Failed to process annotation.") |
| 247 } | 246 } |
| 248 } | 247 } |
| 249 | 248 |
| 250 // Use the step handler for the current step. | 249 // Use the step handler for the current step. |
| 251 step = p.astate.CurrentStep() | 250 step = p.astate.CurrentStep() |
| 252 } else { | 251 } else { |
| 253 // Not handling annotations. Use our root step handler. | 252 // Not handling annotations. Use our root step handler. |
| 254 step = p.astate.RootStep() | 253 step = p.astate.RootStep() |
| 255 } | 254 } |
| 256 | 255 |
| 257 h, err := p.getStepHandler(step, true) | 256 h, err := p.getStepHandler(step, true) |
| 258 if err != nil { | 257 if err != nil { |
| 259 return err | 258 return err |
| 260 } | 259 } |
| 261 | 260 |
| 262 // Build our output, which will consist of the initial line and any extr
a | 261 // Build our output, which will consist of the initial line and any extr
a |
| 263 // lines that have been registered. | 262 // lines that have been registered. |
| 264 inject := h.flushInjectedLines() | 263 inject := h.flushInjectedLines() |
| 265 » output := make([]string, 1, 1+len(inject)) | 264 » stepOutput := make([]string, 0, 1+len(inject)) |
| 266 » output[0] = line | |
| 267 » output = append(output, inject...) | |
| 268 | 265 |
| 269 » for _, l := range output { | 266 » // If this is an annotation line, write it to our root handler. Otherwis
e, |
| 270 » » // If configured, tee to our tee stream. | 267 » // write it to our step's handler (by appending it to stepOutput). |
| 271 » » if s.Tee != nil && (a == "" || !s.StripAnnotations) { | 268 » if a != "" { |
| 272 » » » // Tee this to the Stream's configured Tee output. | 269 » » // If we're not stripping annotations, emit this to the root han
dler. |
| 273 » » » if err := writeTextLine(s.Tee, l); err != nil { | 270 » » if !s.StripAnnotations { |
| 274 » » » » log.WithError(err).Errorf(p.ctx, "Failed to tee
line.") | 271 » » » // Get our root log stream handler. As an optimization,
if "step" is |
| 272 » » » // the root step, then "h" is already the root handler,
so we don't need |
| 273 » » » // to duplicate the lookup. |
| 274 » » » var rootHandler *stepHandler |
| 275 » » » if rootStep := p.astate.RootStep(); rootStep != step { |
| 276 » » » » rootHandler, err = p.getStepHandler(rootStep, tr
ue) |
| 277 » » » » if err != nil { |
| 278 » » » » » return err |
| 279 » » » » } |
| 280 » » » } else { |
| 281 » » » » rootHandler = h |
| 282 » » » } |
| 283 |
| 284 » » » if err := rootHandler.writeBaseStream(s, line); err != n
il { |
| 285 » » » » log.WithError(err).Errorf(p.ctx, "Failed to send
line to LogDog.") |
| 275 return err | 286 return err |
| 276 } | 287 } |
| 277 } | 288 } |
| 289 } else { |
| 290 stepOutput = append(stepOutput, line) |
| 291 } |
| 278 | 292 |
| 293 // Add any injected lines. |
| 294 stepOutput = append(stepOutput, inject...) |
| 295 |
| 296 for _, l := range stepOutput { |
| 279 // Write to our LogDog stream. | 297 // Write to our LogDog stream. |
| 280 if err := h.writeBaseStream(s, l); err != nil { | 298 if err := h.writeBaseStream(s, l); err != nil { |
| 281 log.WithError(err).Errorf(p.ctx, "Failed to send line to
LogDog.") | 299 log.WithError(err).Errorf(p.ctx, "Failed to send line to
LogDog.") |
| 282 return err | 300 return err |
| 283 } | 301 } |
| 284 } | 302 } |
| 285 | 303 |
| 286 return err | 304 return err |
| 287 } | 305 } |
| 288 | 306 |
| (...skipping 19 matching lines...) Expand all Loading... |
| 308 if err := p.annotationStream.Close(); err != nil { | 326 if err := p.annotationStream.Close(); err != nil { |
| 309 log.WithError(err).Errorf(p.ctx, "Failed to close annota
tion stream.") | 327 log.WithError(err).Errorf(p.ctx, "Failed to close annota
tion stream.") |
| 310 } | 328 } |
| 311 p.annotationStream = nil | 329 p.annotationStream = nil |
| 312 } | 330 } |
| 313 | 331 |
| 314 return p.astate | 332 return p.astate |
| 315 } | 333 } |
| 316 | 334 |
| 317 func (p *Processor) getStepHandler(step *annotation.Step, create bool) (*stepHan
dler, error) { | 335 func (p *Processor) getStepHandler(step *annotation.Step, create bool) (*stepHan
dler, error) { |
| 318 » name := step.CanonicalName() | 336 » if h := p.stepHandlers[step]; h != nil { |
| 319 » if h := p.stepHandlers[name]; h != nil { | |
| 320 return h, nil | 337 return h, nil |
| 321 } | 338 } |
| 322 if !create { | 339 if !create { |
| 323 return nil, nil | 340 return nil, nil |
| 324 } | 341 } |
| 325 | 342 |
| 326 h, err := newStepHandler(p, step) | 343 h, err := newStepHandler(p, step) |
| 327 if err != nil { | 344 if err != nil { |
| 328 log.Fields{ | 345 log.Fields{ |
| 329 log.ErrorKey: err, | 346 log.ErrorKey: err, |
| 330 » » » "step": name, | 347 » » » "step": step, |
| 331 }.Errorf(p.ctx, "Failed to create step handler.") | 348 }.Errorf(p.ctx, "Failed to create step handler.") |
| 332 return nil, err | 349 return nil, err |
| 333 } | 350 } |
| 334 » p.stepHandlers[name] = h | 351 » p.stepHandlers[step] = h |
| 335 return h, nil | 352 return h, nil |
| 336 } | 353 } |
| 337 | 354 |
| 338 func (p *Processor) closeStep(step *annotation.Step) { | 355 func (p *Processor) closeStep(step *annotation.Step) { |
| 339 if h, _ := p.getStepHandler(step, false); h != nil { | 356 if h, _ := p.getStepHandler(step, false); h != nil { |
| 340 p.finishStepHandler(h, true) | 357 p.finishStepHandler(h, true) |
| 341 } | 358 } |
| 342 } | 359 } |
| 343 | 360 |
| 344 func (p *Processor) finishStepHandler(h *stepHandler, closeSteps bool) { | 361 func (p *Processor) finishStepHandler(h *stepHandler, closeSteps bool) { |
| 345 // Remove this handler from our list. This will stop us from | 362 // Remove this handler from our list. This will stop us from |
| 346 // double-finishing when finish() calls Close(), which calls the StepClo
sed | 363 // double-finishing when finish() calls Close(), which calls the StepClo
sed |
| 347 // callback. | 364 // callback. |
| 348 » delete(p.stepHandlers, h.String()) | 365 » delete(p.stepHandlers, h.step) |
| 349 | 366 |
| 350 // Finish the step. | 367 // Finish the step. |
| 351 h.finish(closeSteps) | 368 h.finish(closeSteps) |
| 352 } | 369 } |
| 353 | 370 |
| 354 func (p *Processor) createStream(name types.StreamName, archetype *streamproto.F
lags) (streamclient.Stream, error) { | 371 func (p *Processor) createStream(name types.StreamName, archetype *streamproto.F
lags) (streamclient.Stream, error) { |
| 355 return p.o.Client.NewStream(streamFlagsFromArchetype(p.ctx, name, archet
ype)) | 372 return p.o.Client.NewStream(streamFlagsFromArchetype(p.ctx, name, archet
ype)) |
| 356 } | 373 } |
| 357 | 374 |
| 358 func (p *Processor) annotationStateUpdated(ut annotation.UpdateType) { | 375 func (p *Processor) annotationStateUpdated(ut annotation.UpdateType) { |
| (...skipping 119 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 478 step *annotation.Step | 495 step *annotation.Step |
| 479 | 496 |
| 480 client streamclient.Client | 497 client streamclient.Client |
| 481 injectedLines []string | 498 injectedLines []string |
| 482 streams map[types.StreamName]streamclient.Stream | 499 streams map[types.StreamName]streamclient.Stream |
| 483 finished bool | 500 finished bool |
| 484 } | 501 } |
| 485 | 502 |
| 486 func newStepHandler(p *Processor, step *annotation.Step) (*stepHandler, error) { | 503 func newStepHandler(p *Processor, step *annotation.Step) (*stepHandler, error) { |
| 487 h := stepHandler{ | 504 h := stepHandler{ |
| 488 » » Context: log.SetField(p.ctx, "step", step.CanonicalName()), | 505 » » Context: log.SetField(p.ctx, "step", step), |
| 489 processor: p, | 506 processor: p, |
| 490 step: step, | 507 step: step, |
| 491 | 508 |
| 492 client: p.o.Client, | 509 client: p.o.Client, |
| 493 streams: make(map[types.StreamName]streamclient.Stream), | 510 streams: make(map[types.StreamName]streamclient.Stream), |
| 494 } | 511 } |
| 495 | 512 |
| 496 // Send our initial annotation state. | 513 // Send our initial annotation state. |
| 497 h.updated(annotation.UpdateStructural) | 514 h.updated(annotation.UpdateStructural) |
| 498 | 515 |
| 499 return &h, nil | 516 return &h, nil |
| 500 } | 517 } |
| 501 | 518 |
| 502 func (h *stepHandler) String() string { | 519 func (h *stepHandler) String() string { |
| 503 » return h.step.CanonicalName() | 520 » return h.step.String() |
| 504 } | 521 } |
| 505 | 522 |
| 506 func (h *stepHandler) finish(closeSteps bool) { | 523 func (h *stepHandler) finish(closeSteps bool) { |
| 507 if h.finished { | 524 if h.finished { |
| 508 return | 525 return |
| 509 } | 526 } |
| 510 | 527 |
| 511 if closeSteps { | 528 if closeSteps { |
| 512 h.step.Close(nil) | 529 h.step.Close(nil) |
| 513 } | 530 } |
| 514 | 531 |
| 515 // Close all streams associated with this handler. | 532 // Close all streams associated with this handler. |
| 516 if closeSteps { | 533 if closeSteps { |
| 517 h.closeAllStreams() | 534 h.closeAllStreams() |
| 518 } | 535 } |
| 519 | 536 |
| 520 // Notify that the annotation state has updated (closed). | 537 // Notify that the annotation state has updated (closed). |
| 521 h.processor.annotationStateUpdated(annotation.UpdateStructural) | 538 h.processor.annotationStateUpdated(annotation.UpdateStructural) |
| 522 h.finished = true | 539 h.finished = true |
| 523 } | 540 } |
| 524 | 541 |
| 525 func (h *stepHandler) writeBaseStream(s *Stream, line string) error { | 542 func (h *stepHandler) writeBaseStream(s *Stream, line string) error { |
| 543 // If we're teeing, also write this to our Processor's tee stream. |
| 544 if s.Tee != nil { |
| 545 // Tee this to the Stream's configured Tee output. |
| 546 if err := writeTextLine(s.Tee, line); err != nil { |
| 547 log.WithError(err).Errorf(h, "Failed to tee line.") |
| 548 return err |
| 549 } |
| 550 } |
| 551 |
| 526 name := h.step.BaseStream(s.Name) | 552 name := h.step.BaseStream(s.Name) |
| 527 stream, created, err := h.getStream(name, &textStreamArchetype) | 553 stream, created, err := h.getStream(name, &textStreamArchetype) |
| 528 if err != nil { | 554 if err != nil { |
| 529 return err | 555 return err |
| 530 } | 556 } |
| 531 if created { | 557 if created { |
| 532 switch s.Name { | 558 switch s.Name { |
| 533 case STDOUT: | 559 case STDOUT: |
| 534 if h.step.SetSTDOUTStream(&milo.LogdogStream{Name: strin
g(name)}) { | 560 if h.step.SetSTDOUTStream(&milo.LogdogStream{Name: strin
g(name)}) { |
| 535 h.updated(annotation.UpdateIterative) | 561 h.updated(annotation.UpdateIterative) |
| (...skipping 126 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 662 return "" | 688 return "" |
| 663 } | 689 } |
| 664 return strings.TrimSpace(line[3 : len(line)-3]) | 690 return strings.TrimSpace(line[3 : len(line)-3]) |
| 665 } | 691 } |
| 666 | 692 |
| 667 func buildAnnotation(name string, params ...string) string { | 693 func buildAnnotation(name string, params ...string) string { |
| 668 v := make([]string, 1, 1+len(params)) | 694 v := make([]string, 1, 1+len(params)) |
| 669 v[0] = name | 695 v[0] = name |
| 670 return "@@@" + strings.Join(append(v, params...), "@") + "@@@" | 696 return "@@@" + strings.Join(append(v, params...), "@") + "@@@" |
| 671 } | 697 } |
| OLD | NEW |