Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(163)

Side by Side Diff: client/logdog/annotee/processor.go

Issue 1916813002: Annotee: Add project name support. (Closed) Base URL: https://github.com/luci/luci-go@logdog-project-collector-butler
Patch Set: Better URL generation. Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « client/cmd/logdog_annotee/main.go ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package annotee 5 package annotee
6 6
7 import ( 7 import (
8 "bufio" 8 "bufio"
9 "bytes" 9 "bytes"
10 "fmt" 10 "fmt"
11 "io" 11 "io"
12 "net/url"
13 "strings" 12 "strings"
14 "sync" 13 "sync"
15 "time" 14 "time"
16 15
17 "github.com/golang/protobuf/proto" 16 "github.com/golang/protobuf/proto"
18 "github.com/luci/luci-go/client/logdog/annotee/annotation" 17 "github.com/luci/luci-go/client/logdog/annotee/annotation"
19 "github.com/luci/luci-go/client/logdog/butlerlib/streamclient" 18 "github.com/luci/luci-go/client/logdog/butlerlib/streamclient"
20 "github.com/luci/luci-go/client/logdog/butlerlib/streamproto" 19 "github.com/luci/luci-go/client/logdog/butlerlib/streamproto"
21 "github.com/luci/luci-go/common/clock" 20 "github.com/luci/luci-go/common/clock"
22 "github.com/luci/luci-go/common/clock/clockflag" 21 "github.com/luci/luci-go/common/clock/clockflag"
(...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after
63 Annotate bool 62 Annotate bool
64 // StripAnnotations, if true, causes all encountered annotations to be 63 // StripAnnotations, if true, causes all encountered annotations to be
65 // stripped from incoming stream data. 64 // stripped from incoming stream data.
66 StripAnnotations bool 65 StripAnnotations bool
67 66
68 // BufferSize is the size of the read buffer that will be used when proc essing 67 // BufferSize is the size of the read buffer that will be used when proc essing
69 // this stream's data. 68 // this stream's data.
70 BufferSize int 69 BufferSize int
71 } 70 }
72 71
72 // LinkGenerator generates links for a given log stream.
73 type LinkGenerator interface {
74 // GetLink returns a link for the specified aggregate streams.
75 //
76 // If no link could be generated, GetLink may return an empty string.
77 GetLink(name ...types.StreamName) string
78 }
79
73 // Options are the configuration options for a Processor. 80 // Options are the configuration options for a Processor.
74 type Options struct { 81 type Options struct {
75 // Base is the base log stream name. This is prepended to every log name , as 82 // Base is the base log stream name. This is prepended to every log name , as
76 // well as any generate log names. 83 // well as any generate log names.
77 Base types.StreamName 84 Base types.StreamName
78 85
79 » // Prefix is the log stream prefix. If this is empty, no log stream link s will 86 » // LinkGenerator generates links to alias for a given log stream.
80 » // be generated. 87 » //
81 » Prefix types.StreamName 88 » // If nil, no link annotations will be injected.
82 89 » LinkGenerator LinkGenerator
83 » // LogDogHost is the host name of the LogDog Coordinator instance that t his
84 » // stream will be published to. If not empty, additional links will be
85 » // injected into the annotation stream to link to the generated LogDog l ogs.
86 » LogDogHost string
87 90
88 // Client is the LogDog Butler Client to use for stream creation. 91 // Client is the LogDog Butler Client to use for stream creation.
89 Client streamclient.Client 92 Client streamclient.Client
90 93
91 // Execution describes the current applicaton's execution parameters. Th is 94 // Execution describes the current applicaton's execution parameters. Th is
92 // will be used to construct annotation state. 95 // will be used to construct annotation state.
93 Execution *annotation.Execution 96 Execution *annotation.Execution
94 97
95 // MetadataUpdateInterval is the amount of time to wait after stream met adata 98 // MetadataUpdateInterval is the amount of time to wait after stream met adata
96 // updates to push the updated metadata protobuf to the metadata stream. 99 // updates to push the updated metadata protobuf to the metadata stream.
(...skipping 186 matching lines...) Expand 10 before | Expand all | Expand 10 after
283 func (p *Processor) closeStepHandler(h *stepHandler) { 286 func (p *Processor) closeStepHandler(h *stepHandler) {
284 // Remove this handler from our list. This will stop us from 287 // Remove this handler from our list. This will stop us from
285 // double-finishing when finish() calls Close(), which calls the StepClo sed 288 // double-finishing when finish() calls Close(), which calls the StepClo sed
286 // callback. 289 // callback.
287 delete(p.stepHandlers, h.String()) 290 delete(p.stepHandlers, h.String())
288 291
289 // Finish the step. 292 // Finish the step.
290 h.finish() 293 h.finish()
291 } 294 }
292 295
293 // coordinatorLink returns a link to the rendered log stream in the Coordinator.
294 // If no Coordinator host is configured, this will return an empty string.
295 func (p *Processor) coordinatorLink(name ...types.StreamName) string {
296 if p.o.LogDogHost == "" || p.o.Prefix == "" {
297 return ""
298 }
299
300 links := make([]string, len(name))
301 for i, n := range name {
302 links[i] = fmt.Sprintf("s=%s", url.QueryEscape(string(p.o.Prefix .Join(n))))
303 }
304 return fmt.Sprintf("https://%s.appspot.com/v/?%s", p.o.LogDogHost, strin gs.Join(links, "&"))
305 }
306
307 type annotationCallbacks struct { 296 type annotationCallbacks struct {
308 *Processor 297 *Processor
309 } 298 }
310 299
311 func (c *annotationCallbacks) StepClosed(step *annotation.Step) { 300 func (c *annotationCallbacks) StepClosed(step *annotation.Step) {
312 c.closeStep(step) 301 c.closeStep(step)
313 } 302 }
314 303
315 func (c *annotationCallbacks) Updated(step *annotation.Step) { 304 func (c *annotationCallbacks) Updated(step *annotation.Step) {
316 if h, _ := c.getStepHandler(step, false); h != nil { 305 if h, _ := c.getStepHandler(step, false); h != nil {
(...skipping 10 matching lines...) Expand all
327 s, created, err := h.getStream(name, &textStreamArchetype) 316 s, created, err := h.getStream(name, &textStreamArchetype)
328 if err != nil { 317 if err != nil {
329 log.Fields{ 318 log.Fields{
330 log.ErrorKey: err, 319 log.ErrorKey: err,
331 "step": h, 320 "step": h,
332 "stream": name, 321 "stream": name,
333 }.Errorf(c.ctx, "Failed to get log substream.") 322 }.Errorf(c.ctx, "Failed to get log substream.")
334 return 323 return
335 } 324 }
336 if created { 325 if created {
337 » » h.maybeInjectCoordinatorLink(label, "logdog", name) 326 » » h.maybeInjectLink(label, "logdog", name)
338 } 327 }
339 328
340 if err := writeTextLine(s, line); err != nil { 329 if err := writeTextLine(s, line); err != nil {
341 log.Fields{ 330 log.Fields{
342 log.ErrorKey: err, 331 log.ErrorKey: err,
343 "stream": name, 332 "stream": name,
344 }.Errorf(c.ctx, "Failed to export log line.") 333 }.Errorf(c.ctx, "Failed to export log line.")
345 } 334 }
346 } 335 }
347 336
(...skipping 136 matching lines...) Expand 10 before | Expand all | Expand 10 after
484 } 473 }
485 474
486 func (h *stepHandler) writeBaseStream(s *Stream, line string) error { 475 func (h *stepHandler) writeBaseStream(s *Stream, line string) error {
487 name := h.step.BaseStream(s.Name) 476 name := h.step.BaseStream(s.Name)
488 stream, created, err := h.getStream(name, &textStreamArchetype) 477 stream, created, err := h.getStream(name, &textStreamArchetype)
489 if err != nil { 478 if err != nil {
490 return err 479 return err
491 } 480 }
492 if created { 481 if created {
493 segs := s.Name.Segments() 482 segs := s.Name.Segments()
494 » » h.maybeInjectCoordinatorLink("stdio", segs[len(segs)-1], name) 483 » » h.maybeInjectLink("stdio", segs[len(segs)-1], name)
495 } 484 }
496 return writeTextLine(stream, line) 485 return writeTextLine(stream, line)
497 } 486 }
498 487
499 func (h *stepHandler) updated() { 488 func (h *stepHandler) updated() {
500 // Ignore updates after the step has closed. 489 // Ignore updates after the step has closed.
501 if h.closed { 490 if h.closed {
502 return 491 return
503 } 492 }
504 493
(...skipping 68 matching lines...) Expand 10 before | Expand all | Expand 10 after
573 return nil 562 return nil
574 } 563 }
575 564
576 lines := make([]string, len(h.injectedLines)) 565 lines := make([]string, len(h.injectedLines))
577 copy(lines, h.injectedLines) 566 copy(lines, h.injectedLines)
578 h.injectedLines = h.injectedLines[:0] 567 h.injectedLines = h.injectedLines[:0]
579 568
580 return lines 569 return lines
581 } 570 }
582 571
583 func (h *stepHandler) injectAliasAnnotation(base, text, url string) { 572 func (h *stepHandler) maybeInjectLink(base, text string, names ...types.StreamNa me) {
584 » h.injectLines(buildAnnotation("STEP_LINK", fmt.Sprintf("%s-->%s", text, base), url)) 573 » if lg := h.processor.o.LinkGenerator; lg != nil {
585 } 574 » » if link := lg.GetLink(names...); link != "" {
586 575 » » » h.injectLines(buildAnnotation("STEP_LINK", fmt.Sprintf(" %s-->%s", text, base), link))
587 func (h *stepHandler) maybeInjectCoordinatorLink(base, text string, names ...typ es.StreamName) { 576 » » }
588 » url := h.processor.coordinatorLink(names...)
589 » if url == "" {
590 » » return
591 } 577 }
592 h.injectAliasAnnotation(base, text, url)
593 } 578 }
594 579
595 // lineReader reads from an input stream and returns the data line-by-line. 580 // lineReader reads from an input stream and returns the data line-by-line.
596 // 581 //
597 // We don't use a Scanner because we want to be able to handle lines that may 582 // We don't use a Scanner because we want to be able to handle lines that may
598 // exceed the buffer length. We don't use ReadBytes here because we need to 583 // exceed the buffer length. We don't use ReadBytes here because we need to
599 // capture the last line in the stream, even if it doesn't end with a newline. 584 // capture the last line in the stream, even if it doesn't end with a newline.
600 type lineReader struct { 585 type lineReader struct {
601 r *bufio.Reader 586 r *bufio.Reader
602 buf bytes.Buffer 587 buf bytes.Buffer
(...skipping 31 matching lines...) Expand 10 before | Expand all | Expand 10 after
634 return "" 619 return ""
635 } 620 }
636 return strings.TrimSpace(line[3 : len(line)-3]) 621 return strings.TrimSpace(line[3 : len(line)-3])
637 } 622 }
638 623
639 func buildAnnotation(name string, params ...string) string { 624 func buildAnnotation(name string, params ...string) string {
640 v := make([]string, 1, 1+len(params)) 625 v := make([]string, 1, 1+len(params))
641 v[0] = name 626 v[0] = name
642 return "@@@" + strings.Join(append(v, params...), "@") + "@@@" 627 return "@@@" + strings.Join(append(v, params...), "@") + "@@@"
643 } 628 }
OLDNEW
« no previous file with comments | « client/cmd/logdog_annotee/main.go ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698