Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(467)

Side by Side Diff: logdog/client/annotee/processor.go

Issue 2328023003: Fix Annotee stream name generation. (Closed)
Patch Set: Remove debugging junk. Created 4 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « logdog/client/annotee/annotation/test_expectations/timestamps_steps.proto.txt ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2015 The LUCI Authors. All rights reserved. 1 // Copyright 2015 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0 2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file. 3 // that can be found in the LICENSE file.
4 4
5 package annotee 5 package annotee
6 6
7 import ( 7 import (
8 "bufio" 8 "bufio"
9 "bytes" 9 "bytes"
10 "fmt" 10 "fmt"
(...skipping 98 matching lines...) Expand 10 before | Expand all | Expand 10 after
109 109
110 // Processor consumes data from a list of Stream entries and interacts with the 110 // Processor consumes data from a list of Stream entries and interacts with the
111 // supplied Client instance. 111 // supplied Client instance.
112 // 112 //
113 // A Processor must be instantiated with New. 113 // A Processor must be instantiated with New.
114 type Processor struct { 114 type Processor struct {
115 ctx context.Context 115 ctx context.Context
116 o *Options 116 o *Options
117 117
118 astate *annotation.State 118 astate *annotation.State
119 » stepHandlers map[string]*stepHandler 119 » stepHandlers map[*annotation.Step]*stepHandler
120 120
121 annotationStream streamclient.Stream 121 annotationStream streamclient.Stream
122 annotationC chan annotationSignal 122 annotationC chan annotationSignal
123 annotationFinishedC chan struct{} 123 annotationFinishedC chan struct{}
124 } 124 }
125 125
126 type annotationSignal struct { 126 type annotationSignal struct {
127 data []byte 127 data []byte
128 updateType annotation.UpdateType 128 updateType annotation.UpdateType
129 } 129 }
130 130
131 // New instantiates a new Processor. 131 // New instantiates a new Processor.
132 func New(c context.Context, o Options) *Processor { 132 func New(c context.Context, o Options) *Processor {
133 p := Processor{ 133 p := Processor{
134 ctx: c, 134 ctx: c,
135 o: &o, 135 o: &o,
136 136
137 » » stepHandlers: make(map[string]*stepHandler), 137 » » stepHandlers: make(map[*annotation.Step]*stepHandler),
138 } 138 }
139 p.astate = &annotation.State{ 139 p.astate = &annotation.State{
140 LogNameBase: o.Base, 140 LogNameBase: o.Base,
141 Callbacks: &annotationCallbacks{&p}, 141 Callbacks: &annotationCallbacks{&p},
142 Execution: o.Execution, 142 Execution: o.Execution,
143 Clock: clock.Get(c), 143 Clock: clock.Get(c),
144 Offline: o.Offline, 144 Offline: o.Offline,
145 } 145 }
146 return &p 146 return &p
147 } 147 }
(...skipping 78 matching lines...) Expand 10 before | Expand all | Expand 10 after
226 log.WithError(err).Errorf(p.ctx, "Failed to initialize.") 226 log.WithError(err).Errorf(p.ctx, "Failed to initialize.")
227 return err 227 return err
228 } 228 }
229 229
230 a := extractAnnotation(line) 230 a := extractAnnotation(line)
231 if a != "" { 231 if a != "" {
232 log.Debugf(p.ctx, "Annotation: %q", a) 232 log.Debugf(p.ctx, "Annotation: %q", a)
233 } 233 }
234 234
235 var step *annotation.Step 235 var step *annotation.Step
236 var h *stepHandler
237 if s.Annotate { 236 if s.Annotate {
238 if a != "" { 237 if a != "" {
239 // Append our annotation to the annotation state. This m ay cause our 238 // Append our annotation to the annotation state. This m ay cause our
240 // annotation callbacks to be invoked. 239 // annotation callbacks to be invoked.
241 if err := p.astate.Append(a); err != nil { 240 if err := p.astate.Append(a); err != nil {
242 log.Fields{ 241 log.Fields{
243 log.ErrorKey: err, 242 log.ErrorKey: err,
244 "stream": s.Name, 243 "stream": s.Name,
245 "annotation": a, 244 "annotation": a,
246 }.Errorf(p.ctx, "Failed to process annotation.") 245 }.Errorf(p.ctx, "Failed to process annotation.")
247 } 246 }
248 } 247 }
249 248
250 // Use the step handler for the current step. 249 // Use the step handler for the current step.
251 step = p.astate.CurrentStep() 250 step = p.astate.CurrentStep()
252 } else { 251 } else {
253 // Not handling annotations. Use our root step handler. 252 // Not handling annotations. Use our root step handler.
254 step = p.astate.RootStep() 253 step = p.astate.RootStep()
255 } 254 }
256 255
257 h, err := p.getStepHandler(step, true) 256 h, err := p.getStepHandler(step, true)
258 if err != nil { 257 if err != nil {
259 return err 258 return err
260 } 259 }
261 260
262 // Build our output, which will consist of the initial line and any extr a 261 // Build our output, which will consist of the initial line and any extr a
263 // lines that have been registered. 262 // lines that have been registered.
264 inject := h.flushInjectedLines() 263 inject := h.flushInjectedLines()
265 » output := make([]string, 1, 1+len(inject)) 264 » stepOutput := make([]string, 0, 1+len(inject))
266 » output[0] = line
267 » output = append(output, inject...)
268 265
269 » for _, l := range output { 266 » // If this is an annotation line, write it to our root handler. Otherwis e,
270 » » // If configured, tee to our tee stream. 267 » // write it to our step's handler (by appending it to stepOutput).
271 » » if s.Tee != nil && (a == "" || !s.StripAnnotations) { 268 » if a != "" {
272 » » » // Tee this to the Stream's configured Tee output. 269 » » // If we're not stripping annotations, emit this to the root han dler.
273 » » » if err := writeTextLine(s.Tee, l); err != nil { 270 » » if !s.StripAnnotations {
274 » » » » log.WithError(err).Errorf(p.ctx, "Failed to tee line.") 271 » » » // Get our root log stream handler. As an optimization, if "step" is
272 » » » // the root step, then "h" is already the root handler, so we don't need
273 » » » // to duplicate the lookup.
274 » » » var rootHandler *stepHandler
275 » » » if rootStep := p.astate.RootStep(); rootStep != step {
276 » » » » rootHandler, err = p.getStepHandler(rootStep, tr ue)
277 » » » » if err != nil {
278 » » » » » return err
279 » » » » }
280 » » » } else {
281 » » » » rootHandler = h
282 » » » }
283
284 » » » if err := rootHandler.writeBaseStream(s, line); err != n il {
285 » » » » log.WithError(err).Errorf(p.ctx, "Failed to send line to LogDog.")
275 return err 286 return err
276 } 287 }
277 } 288 }
289 } else {
290 stepOutput = append(stepOutput, line)
291 }
278 292
293 // Add any injected lines.
294 stepOutput = append(stepOutput, inject...)
295
296 for _, l := range stepOutput {
279 // Write to our LogDog stream. 297 // Write to our LogDog stream.
280 if err := h.writeBaseStream(s, l); err != nil { 298 if err := h.writeBaseStream(s, l); err != nil {
281 log.WithError(err).Errorf(p.ctx, "Failed to send line to LogDog.") 299 log.WithError(err).Errorf(p.ctx, "Failed to send line to LogDog.")
282 return err 300 return err
283 } 301 }
284 } 302 }
285 303
286 return err 304 return err
287 } 305 }
288 306
(...skipping 19 matching lines...) Expand all
308 if err := p.annotationStream.Close(); err != nil { 326 if err := p.annotationStream.Close(); err != nil {
309 log.WithError(err).Errorf(p.ctx, "Failed to close annota tion stream.") 327 log.WithError(err).Errorf(p.ctx, "Failed to close annota tion stream.")
310 } 328 }
311 p.annotationStream = nil 329 p.annotationStream = nil
312 } 330 }
313 331
314 return p.astate 332 return p.astate
315 } 333 }
316 334
317 func (p *Processor) getStepHandler(step *annotation.Step, create bool) (*stepHan dler, error) { 335 func (p *Processor) getStepHandler(step *annotation.Step, create bool) (*stepHan dler, error) {
318 » name := step.CanonicalName() 336 » if h := p.stepHandlers[step]; h != nil {
319 » if h := p.stepHandlers[name]; h != nil {
320 return h, nil 337 return h, nil
321 } 338 }
322 if !create { 339 if !create {
323 return nil, nil 340 return nil, nil
324 } 341 }
325 342
326 h, err := newStepHandler(p, step) 343 h, err := newStepHandler(p, step)
327 if err != nil { 344 if err != nil {
328 log.Fields{ 345 log.Fields{
329 log.ErrorKey: err, 346 log.ErrorKey: err,
330 » » » "step": name, 347 » » » "step": step,
331 }.Errorf(p.ctx, "Failed to create step handler.") 348 }.Errorf(p.ctx, "Failed to create step handler.")
332 return nil, err 349 return nil, err
333 } 350 }
334 » p.stepHandlers[name] = h 351 » p.stepHandlers[step] = h
335 return h, nil 352 return h, nil
336 } 353 }
337 354
338 func (p *Processor) closeStep(step *annotation.Step) { 355 func (p *Processor) closeStep(step *annotation.Step) {
339 if h, _ := p.getStepHandler(step, false); h != nil { 356 if h, _ := p.getStepHandler(step, false); h != nil {
340 p.finishStepHandler(h, true) 357 p.finishStepHandler(h, true)
341 } 358 }
342 } 359 }
343 360
344 func (p *Processor) finishStepHandler(h *stepHandler, closeSteps bool) { 361 func (p *Processor) finishStepHandler(h *stepHandler, closeSteps bool) {
345 // Remove this handler from our list. This will stop us from 362 // Remove this handler from our list. This will stop us from
346 // double-finishing when finish() calls Close(), which calls the StepClo sed 363 // double-finishing when finish() calls Close(), which calls the StepClo sed
347 // callback. 364 // callback.
348 » delete(p.stepHandlers, h.String()) 365 » delete(p.stepHandlers, h.step)
349 366
350 // Finish the step. 367 // Finish the step.
351 h.finish(closeSteps) 368 h.finish(closeSteps)
352 } 369 }
353 370
354 func (p *Processor) createStream(name types.StreamName, archetype *streamproto.F lags) (streamclient.Stream, error) { 371 func (p *Processor) createStream(name types.StreamName, archetype *streamproto.F lags) (streamclient.Stream, error) {
355 return p.o.Client.NewStream(streamFlagsFromArchetype(p.ctx, name, archet ype)) 372 return p.o.Client.NewStream(streamFlagsFromArchetype(p.ctx, name, archet ype))
356 } 373 }
357 374
358 func (p *Processor) annotationStateUpdated(ut annotation.UpdateType) { 375 func (p *Processor) annotationStateUpdated(ut annotation.UpdateType) {
(...skipping 119 matching lines...) Expand 10 before | Expand all | Expand 10 after
478 step *annotation.Step 495 step *annotation.Step
479 496
480 client streamclient.Client 497 client streamclient.Client
481 injectedLines []string 498 injectedLines []string
482 streams map[types.StreamName]streamclient.Stream 499 streams map[types.StreamName]streamclient.Stream
483 finished bool 500 finished bool
484 } 501 }
485 502
486 func newStepHandler(p *Processor, step *annotation.Step) (*stepHandler, error) { 503 func newStepHandler(p *Processor, step *annotation.Step) (*stepHandler, error) {
487 h := stepHandler{ 504 h := stepHandler{
488 » » Context: log.SetField(p.ctx, "step", step.CanonicalName()), 505 » » Context: log.SetField(p.ctx, "step", step),
489 processor: p, 506 processor: p,
490 step: step, 507 step: step,
491 508
492 client: p.o.Client, 509 client: p.o.Client,
493 streams: make(map[types.StreamName]streamclient.Stream), 510 streams: make(map[types.StreamName]streamclient.Stream),
494 } 511 }
495 512
496 // Send our initial annotation state. 513 // Send our initial annotation state.
497 h.updated(annotation.UpdateStructural) 514 h.updated(annotation.UpdateStructural)
498 515
499 return &h, nil 516 return &h, nil
500 } 517 }
501 518
502 func (h *stepHandler) String() string { 519 func (h *stepHandler) String() string {
503 » return h.step.CanonicalName() 520 » return h.step.String()
504 } 521 }
505 522
506 func (h *stepHandler) finish(closeSteps bool) { 523 func (h *stepHandler) finish(closeSteps bool) {
507 if h.finished { 524 if h.finished {
508 return 525 return
509 } 526 }
510 527
511 if closeSteps { 528 if closeSteps {
512 h.step.Close(nil) 529 h.step.Close(nil)
513 } 530 }
514 531
515 // Close all streams associated with this handler. 532 // Close all streams associated with this handler.
516 if closeSteps { 533 if closeSteps {
517 h.closeAllStreams() 534 h.closeAllStreams()
518 } 535 }
519 536
520 // Notify that the annotation state has updated (closed). 537 // Notify that the annotation state has updated (closed).
521 h.processor.annotationStateUpdated(annotation.UpdateStructural) 538 h.processor.annotationStateUpdated(annotation.UpdateStructural)
522 h.finished = true 539 h.finished = true
523 } 540 }
524 541
525 func (h *stepHandler) writeBaseStream(s *Stream, line string) error { 542 func (h *stepHandler) writeBaseStream(s *Stream, line string) error {
543 // If we're teeing, also write this to our Processor's tee stream.
544 if s.Tee != nil {
545 // Tee this to the Stream's configured Tee output.
546 if err := writeTextLine(s.Tee, line); err != nil {
547 log.WithError(err).Errorf(h, "Failed to tee line.")
548 return err
549 }
550 }
551
526 name := h.step.BaseStream(s.Name) 552 name := h.step.BaseStream(s.Name)
527 stream, created, err := h.getStream(name, &textStreamArchetype) 553 stream, created, err := h.getStream(name, &textStreamArchetype)
528 if err != nil { 554 if err != nil {
529 return err 555 return err
530 } 556 }
531 if created { 557 if created {
532 switch s.Name { 558 switch s.Name {
533 case STDOUT: 559 case STDOUT:
534 if h.step.SetSTDOUTStream(&milo.LogdogStream{Name: strin g(name)}) { 560 if h.step.SetSTDOUTStream(&milo.LogdogStream{Name: strin g(name)}) {
535 h.updated(annotation.UpdateIterative) 561 h.updated(annotation.UpdateIterative)
(...skipping 126 matching lines...) Expand 10 before | Expand all | Expand 10 after
662 return "" 688 return ""
663 } 689 }
664 return strings.TrimSpace(line[3 : len(line)-3]) 690 return strings.TrimSpace(line[3 : len(line)-3])
665 } 691 }
666 692
667 func buildAnnotation(name string, params ...string) string { 693 func buildAnnotation(name string, params ...string) string {
668 v := make([]string, 1, 1+len(params)) 694 v := make([]string, 1, 1+len(params))
669 v[0] = name 695 v[0] = name
670 return "@@@" + strings.Join(append(v, params...), "@") + "@@@" 696 return "@@@" + strings.Join(append(v, params...), "@") + "@@@"
671 } 697 }
OLDNEW
« no previous file with comments | « logdog/client/annotee/annotation/test_expectations/timestamps_steps.proto.txt ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698