OLD | NEW |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 package annotee | 5 package annotee |
6 | 6 |
7 import ( | 7 import ( |
8 "bufio" | 8 "bufio" |
9 "bytes" | 9 "bytes" |
10 "fmt" | 10 "fmt" |
11 "io" | 11 "io" |
12 "net/url" | |
13 "strings" | 12 "strings" |
14 "sync" | 13 "sync" |
15 "time" | 14 "time" |
16 | 15 |
17 "github.com/golang/protobuf/proto" | 16 "github.com/golang/protobuf/proto" |
18 "github.com/luci/luci-go/client/logdog/annotee/annotation" | 17 "github.com/luci/luci-go/client/logdog/annotee/annotation" |
19 "github.com/luci/luci-go/client/logdog/butlerlib/streamclient" | 18 "github.com/luci/luci-go/client/logdog/butlerlib/streamclient" |
20 "github.com/luci/luci-go/client/logdog/butlerlib/streamproto" | 19 "github.com/luci/luci-go/client/logdog/butlerlib/streamproto" |
21 "github.com/luci/luci-go/common/clock" | 20 "github.com/luci/luci-go/common/clock" |
22 "github.com/luci/luci-go/common/clock/clockflag" | 21 "github.com/luci/luci-go/common/clock/clockflag" |
(...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
63 Annotate bool | 62 Annotate bool |
64 // StripAnnotations, if true, causes all encountered annotations to be | 63 // StripAnnotations, if true, causes all encountered annotations to be |
65 // stripped from incoming stream data. | 64 // stripped from incoming stream data. |
66 StripAnnotations bool | 65 StripAnnotations bool |
67 | 66 |
68 // BufferSize is the size of the read buffer that will be used when proc
essing | 67 // BufferSize is the size of the read buffer that will be used when proc
essing |
69 // this stream's data. | 68 // this stream's data. |
70 BufferSize int | 69 BufferSize int |
71 } | 70 } |
72 | 71 |
| 72 // LinkGenerator generates links for a given log stream. |
| 73 type LinkGenerator interface { |
| 74 // GetLink returns a link for the specified aggregate streams. |
| 75 // |
| 76 // If no link could be generated, GetLink may return an empty string. |
| 77 GetLink(name ...types.StreamName) string |
| 78 } |
| 79 |
73 // Options are the configuration options for a Processor. | 80 // Options are the configuration options for a Processor. |
74 type Options struct { | 81 type Options struct { |
75 // Base is the base log stream name. This is prepended to every log name
, as | 82 // Base is the base log stream name. This is prepended to every log name
, as |
76 // well as any generate log names. | 83 // well as any generate log names. |
77 Base types.StreamName | 84 Base types.StreamName |
78 | 85 |
79 » // Prefix is the log stream prefix. If this is empty, no log stream link
s will | 86 » // LinkGenerator generates links to alias for a given log stream. |
80 » // be generated. | 87 » // |
81 » Prefix types.StreamName | 88 » // If nil, no link annotations will be injected. |
82 | 89 » LinkGenerator LinkGenerator |
83 » // LogDogHost is the host name of the LogDog Coordinator instance that t
his | |
84 » // stream will be published to. If not empty, additional links will be | |
85 » // injected into the annotation stream to link to the generated LogDog l
ogs. | |
86 » LogDogHost string | |
87 | 90 |
88 // Client is the LogDog Butler Client to use for stream creation. | 91 // Client is the LogDog Butler Client to use for stream creation. |
89 Client streamclient.Client | 92 Client streamclient.Client |
90 | 93 |
91 // Execution describes the current applicaton's execution parameters. Th
is | 94 // Execution describes the current applicaton's execution parameters. Th
is |
92 // will be used to construct annotation state. | 95 // will be used to construct annotation state. |
93 Execution *annotation.Execution | 96 Execution *annotation.Execution |
94 | 97 |
95 // MetadataUpdateInterval is the amount of time to wait after stream met
adata | 98 // MetadataUpdateInterval is the amount of time to wait after stream met
adata |
96 // updates to push the updated metadata protobuf to the metadata stream. | 99 // updates to push the updated metadata protobuf to the metadata stream. |
(...skipping 186 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
283 func (p *Processor) closeStepHandler(h *stepHandler) { | 286 func (p *Processor) closeStepHandler(h *stepHandler) { |
284 // Remove this handler from our list. This will stop us from | 287 // Remove this handler from our list. This will stop us from |
285 // double-finishing when finish() calls Close(), which calls the StepClo
sed | 288 // double-finishing when finish() calls Close(), which calls the StepClo
sed |
286 // callback. | 289 // callback. |
287 delete(p.stepHandlers, h.String()) | 290 delete(p.stepHandlers, h.String()) |
288 | 291 |
289 // Finish the step. | 292 // Finish the step. |
290 h.finish() | 293 h.finish() |
291 } | 294 } |
292 | 295 |
293 // coordinatorLink returns a link to the rendered log stream in the Coordinator. | |
294 // If no Coordinator host is configured, this will return an empty string. | |
295 func (p *Processor) coordinatorLink(name ...types.StreamName) string { | |
296 if p.o.LogDogHost == "" || p.o.Prefix == "" { | |
297 return "" | |
298 } | |
299 | |
300 links := make([]string, len(name)) | |
301 for i, n := range name { | |
302 links[i] = fmt.Sprintf("s=%s", url.QueryEscape(string(p.o.Prefix
.Join(n)))) | |
303 } | |
304 return fmt.Sprintf("https://%s.appspot.com/v/?%s", p.o.LogDogHost, strin
gs.Join(links, "&")) | |
305 } | |
306 | |
307 type annotationCallbacks struct { | 296 type annotationCallbacks struct { |
308 *Processor | 297 *Processor |
309 } | 298 } |
310 | 299 |
311 func (c *annotationCallbacks) StepClosed(step *annotation.Step) { | 300 func (c *annotationCallbacks) StepClosed(step *annotation.Step) { |
312 c.closeStep(step) | 301 c.closeStep(step) |
313 } | 302 } |
314 | 303 |
315 func (c *annotationCallbacks) Updated(step *annotation.Step) { | 304 func (c *annotationCallbacks) Updated(step *annotation.Step) { |
316 if h, _ := c.getStepHandler(step, false); h != nil { | 305 if h, _ := c.getStepHandler(step, false); h != nil { |
(...skipping 10 matching lines...) Expand all Loading... |
327 s, created, err := h.getStream(name, &textStreamArchetype) | 316 s, created, err := h.getStream(name, &textStreamArchetype) |
328 if err != nil { | 317 if err != nil { |
329 log.Fields{ | 318 log.Fields{ |
330 log.ErrorKey: err, | 319 log.ErrorKey: err, |
331 "step": h, | 320 "step": h, |
332 "stream": name, | 321 "stream": name, |
333 }.Errorf(c.ctx, "Failed to get log substream.") | 322 }.Errorf(c.ctx, "Failed to get log substream.") |
334 return | 323 return |
335 } | 324 } |
336 if created { | 325 if created { |
337 » » h.maybeInjectCoordinatorLink(label, "logdog", name) | 326 » » h.maybeInjectLink(label, "logdog", name) |
338 } | 327 } |
339 | 328 |
340 if err := writeTextLine(s, line); err != nil { | 329 if err := writeTextLine(s, line); err != nil { |
341 log.Fields{ | 330 log.Fields{ |
342 log.ErrorKey: err, | 331 log.ErrorKey: err, |
343 "stream": name, | 332 "stream": name, |
344 }.Errorf(c.ctx, "Failed to export log line.") | 333 }.Errorf(c.ctx, "Failed to export log line.") |
345 } | 334 } |
346 } | 335 } |
347 | 336 |
(...skipping 136 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
484 } | 473 } |
485 | 474 |
486 func (h *stepHandler) writeBaseStream(s *Stream, line string) error { | 475 func (h *stepHandler) writeBaseStream(s *Stream, line string) error { |
487 name := h.step.BaseStream(s.Name) | 476 name := h.step.BaseStream(s.Name) |
488 stream, created, err := h.getStream(name, &textStreamArchetype) | 477 stream, created, err := h.getStream(name, &textStreamArchetype) |
489 if err != nil { | 478 if err != nil { |
490 return err | 479 return err |
491 } | 480 } |
492 if created { | 481 if created { |
493 segs := s.Name.Segments() | 482 segs := s.Name.Segments() |
494 » » h.maybeInjectCoordinatorLink("stdio", segs[len(segs)-1], name) | 483 » » h.maybeInjectLink("stdio", segs[len(segs)-1], name) |
495 } | 484 } |
496 return writeTextLine(stream, line) | 485 return writeTextLine(stream, line) |
497 } | 486 } |
498 | 487 |
499 func (h *stepHandler) updated() { | 488 func (h *stepHandler) updated() { |
500 // Ignore updates after the step has closed. | 489 // Ignore updates after the step has closed. |
501 if h.closed { | 490 if h.closed { |
502 return | 491 return |
503 } | 492 } |
504 | 493 |
(...skipping 68 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
573 return nil | 562 return nil |
574 } | 563 } |
575 | 564 |
576 lines := make([]string, len(h.injectedLines)) | 565 lines := make([]string, len(h.injectedLines)) |
577 copy(lines, h.injectedLines) | 566 copy(lines, h.injectedLines) |
578 h.injectedLines = h.injectedLines[:0] | 567 h.injectedLines = h.injectedLines[:0] |
579 | 568 |
580 return lines | 569 return lines |
581 } | 570 } |
582 | 571 |
583 func (h *stepHandler) injectAliasAnnotation(base, text, url string) { | 572 func (h *stepHandler) maybeInjectLink(base, text string, names ...types.StreamNa
me) { |
584 » h.injectLines(buildAnnotation("STEP_LINK", fmt.Sprintf("%s-->%s", text,
base), url)) | 573 » if lg := h.processor.o.LinkGenerator; lg != nil { |
585 } | 574 » » if link := lg.GetLink(names...); link != "" { |
586 | 575 » » » h.injectLines(buildAnnotation("STEP_LINK", fmt.Sprintf("
%s-->%s", text, base), link)) |
587 func (h *stepHandler) maybeInjectCoordinatorLink(base, text string, names ...typ
es.StreamName) { | 576 » » } |
588 » url := h.processor.coordinatorLink(names...) | |
589 » if url == "" { | |
590 » » return | |
591 } | 577 } |
592 h.injectAliasAnnotation(base, text, url) | |
593 } | 578 } |
594 | 579 |
595 // lineReader reads from an input stream and returns the data line-by-line. | 580 // lineReader reads from an input stream and returns the data line-by-line. |
596 // | 581 // |
597 // We don't use a Scanner because we want to be able to handle lines that may | 582 // We don't use a Scanner because we want to be able to handle lines that may |
598 // exceed the buffer length. We don't use ReadBytes here because we need to | 583 // exceed the buffer length. We don't use ReadBytes here because we need to |
599 // capture the last line in the stream, even if it doesn't end with a newline. | 584 // capture the last line in the stream, even if it doesn't end with a newline. |
600 type lineReader struct { | 585 type lineReader struct { |
601 r *bufio.Reader | 586 r *bufio.Reader |
602 buf bytes.Buffer | 587 buf bytes.Buffer |
(...skipping 31 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
634 return "" | 619 return "" |
635 } | 620 } |
636 return strings.TrimSpace(line[3 : len(line)-3]) | 621 return strings.TrimSpace(line[3 : len(line)-3]) |
637 } | 622 } |
638 | 623 |
639 func buildAnnotation(name string, params ...string) string { | 624 func buildAnnotation(name string, params ...string) string { |
640 v := make([]string, 1, 1+len(params)) | 625 v := make([]string, 1, 1+len(params)) |
641 v[0] = name | 626 v[0] = name |
642 return "@@@" + strings.Join(append(v, params...), "@") + "@@@" | 627 return "@@@" + strings.Join(append(v, params...), "@") + "@@@" |
643 } | 628 } |
OLD | NEW |