Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1663)

Unified Diff: appengine/cmd/milo/logdog/build.go

Issue 2191693003: Milo: Add LogDog annotation stream support. (Closed) Base URL: https://github.com/luci/luci-go@master
Patch Set: Rebarse Created 4 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « appengine/cmd/milo/frontend/milo.go ('k') | appengine/cmd/milo/logdog/http.go » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: appengine/cmd/milo/logdog/build.go
diff --git a/appengine/cmd/milo/logdog/build.go b/appengine/cmd/milo/logdog/build.go
new file mode 100644
index 0000000000000000000000000000000000000000..425c0b8f3b8c44840394fce4b0a4f3814f88e199
--- /dev/null
+++ b/appengine/cmd/milo/logdog/build.go
@@ -0,0 +1,363 @@
+// Copyright 2016 The LUCI Authors. All rights reserved.
+// Use of this source code is governed under the Apache License, Version 2.0
+// that can be found in the LICENSE file.
+
+package logdog
+
+import (
+ "errors"
+ "fmt"
+ "net/http"
+ "net/url"
+ "strings"
+ "time"
+
+ "github.com/luci/luci-go/appengine/cmd/milo/logdog/internal"
+ "github.com/luci/luci-go/appengine/cmd/milo/miloerror"
+ "github.com/luci/luci-go/appengine/cmd/milo/resp"
+ "github.com/luci/luci-go/common/config"
+ "github.com/luci/luci-go/common/grpcutil"
+ log "github.com/luci/luci-go/common/logging"
+ "github.com/luci/luci-go/common/proto/google"
+ miloProto "github.com/luci/luci-go/common/proto/milo"
+ "github.com/luci/luci-go/common/prpc"
+ "github.com/luci/luci-go/logdog/api/endpoints/coordinator/logs/v1"
+ "github.com/luci/luci-go/logdog/api/logpb"
+ "github.com/luci/luci-go/logdog/common/types"
+
+ "github.com/golang/protobuf/proto"
+ "github.com/luci/gae/service/memcache"
+ "golang.org/x/net/context"
+ "google.golang.org/grpc/codes"
+)
+
+const (
+ // intermediateCacheLifetime is the amount of time to cache intermediate (non-
+ // terminal) annotation streams. Terminal annotation streams are cached
+ // indefinitely.
+ intermediateCacheLifetime = 10 * time.Second
+
+ // defaultLogDogHost is the default LogDog host, if one isn't specified via
+ // query string.
+ defaultLogDogHost = "luci-logdog.appspot.com"
+)
+
+type annotationStreamRequest struct {
+ *AnnotationStream
+
+ // host is the name of the LogDog host.
+ host string
+
+ project config.ProjectName
+ path types.StreamPath
+
+ // logDogClient is the HTTP client to use for LogDog communication.
+ logDogClient http.Client
+
+ // cs is the unmarshalled annotation stream Step and associated data.
+ cs internal.CachedStep
+}
+
+func (as *annotationStreamRequest) normalize() error {
+ if err := as.project.Validate(); err != nil {
+ return &miloerror.Error{
+ Message: "Invalid project name",
+ Code: http.StatusBadRequest,
+ }
+ }
+
+ if err := as.path.Validate(); err != nil {
+ return &miloerror.Error{
+ Message: fmt.Sprintf("Invalid log stream path %q: %s", as.path, err),
+ Code: http.StatusBadRequest,
+ }
+ }
+
+ // Get the host. We normalize it to lowercase and trim spaces since we use
+ // it as a memcache key.
+ as.host = strings.ToLower(strings.TrimSpace(as.host))
+ if as.host == "" {
+ as.host = defaultLogDogHost
+ }
+ if strings.ContainsRune(as.host, '/') {
+ return errors.New("invalid host name")
+ }
+
+ return nil
+}
+
+func (as *annotationStreamRequest) memcacheKey() string {
+ return fmt.Sprintf("logdog/%s/%s/%s", as.host, as.project, as.path)
+}
+
+func (as *annotationStreamRequest) load(c context.Context) error {
+ // Load from memcache, if possible. If an error occurs, we will proceed as if
+ // no CachedStep was available.
+ mcKey := as.memcacheKey()
+ mcItem, err := memcache.Get(c).Get(mcKey)
+ switch err {
+ case nil:
+ if err := proto.Unmarshal(mcItem.Value(), &as.cs); err == nil {
+ return nil
+ }
+
+ // We could not unmarshal the cached value. Try and delete it from
+ // memcache, since it's invalid.
+ log.Fields{
+ log.ErrorKey: err,
+ "memcacheKey": mcKey,
+ }.Warningf(c, "Failed to unmarshal cached annotation protobuf.")
+ if err := memcache.Get(c).Delete(mcKey); err != nil {
+ log.WithError(err).Warningf(c, "Failed to delete invalid annotation protobuf memcache entry.")
+ }
+
+ case memcache.ErrCacheMiss:
+ break
+
+ default:
+ log.Fields{
+ log.ErrorKey: err,
+ "memcacheKey": mcKey,
+ }.Errorf(c, "Failed to load annotation protobuf memcache cached step.")
+ }
+
+ // Load from LogDog directly.
+ client := logdog.NewLogsPRPCClient(&prpc.Client{
+ C: &as.logDogClient,
+ Host: as.host,
+ })
+
+ log.Fields{
+ "project": as.project,
+ "path": as.path,
+ "host": as.host,
+ }.Infof(c, "Making tail request to LogDog to fetch annotation stream.")
+ resp, err := client.Tail(c, &logdog.TailRequest{
+ Project: string(as.project),
+ Path: string(as.path),
+ State: true,
+ })
+ switch code := grpcutil.Code(err); code {
+ case codes.OK:
+ break
+
+ case codes.NotFound:
+ return &miloerror.Error{
+ Message: "Stream not found",
+ Code: http.StatusNotFound,
+ }
+
+ default:
+ // TODO: Once we switch to delegation tokens and are making the request on
+ // behalf of a user rather than the Milo service, handle PermissionDenied.
+ log.Fields{
+ log.ErrorKey: err,
+ "code": code,
+ }.Errorf(c, "Failed to load LogDog annotation stream.")
+ return &miloerror.Error{
+ Message: "Failed to load stream",
+ Code: http.StatusInternalServerError,
+ }
+ }
+
+ // Make sure that this is an annotation stream.
+ switch {
+ case resp.Desc.ContentType != miloProto.ContentTypeAnnotations:
+ return &miloerror.Error{
+ Message: "Requested stream is not a Milo annotation protobuf",
+ Code: http.StatusBadRequest,
+ }
+
+ case resp.Desc.StreamType != logpb.StreamType_DATAGRAM:
+ return &miloerror.Error{
+ Message: "Requested stream is not a datagram stream",
+ Code: http.StatusBadRequest,
+ }
+
+ case len(resp.Logs) == 0:
+ // No annotation stream data, so render a minimal page.
+ return nil
+ }
+
+ // Get the last log entry in the stream. In reality, this will be index 0,
+ // since the "Tail" call should only return one log entry.
+ latestStream := resp.Logs[len(resp.Logs)-1]
+ dg := latestStream.GetDatagram()
+ switch {
+ case dg == nil:
+ return &miloerror.Error{
+ Message: "Datagram stream does not have datagram data",
+ Code: http.StatusInternalServerError,
+ }
+
+ case dg.Partial != nil && !(dg.Partial.Index == 0 && dg.Partial.Last):
+ // LogDog splits large datagrams into consecutive fragments. If the
+ // annotation state is fragmented, a reconstruction algorithm will have to
+ // be employed here to build the full datagram before processing.
+ //
+ // At the moment, no annotation streams are expected to be anywhere close to
+ // this large, so we're going to handle this case by erroring. A
+ // reconstruction algorithm would look like:
+ // 1) "Tail" to get the latest datagram, identify it as partial.
+ // 1a) Perform a bounds check on the total datagram size to ensure that it
+ // can be safely reconstructed.
+ // 2) Determine if it's the last partial index. If not, then the latest
+ // datagram is incomplete. Determine our initial datagram's stream index
+ // the by subtracting the partial index from this message's stream index.
+ // 2a) If this datagram index is "0", the first datagram in the stream is
+ // partial and all of the data isn't here, so treat this as "no data".
+ // 2b) Otherwise, goto (1), using "Get" request on the datagram index minus
+ // one to get the previous datagram.
+ // 3) Issue a "Get" request for our initial datagram index through the index
+ // preceding ours.
+ // 4) Reassemble the binary data from the full set of datagrams.
+ return &miloerror.Error{
+ Message: "Partial datagram streams are not supported yet",
+ Code: http.StatusNotImplemented,
+ }
+ }
+
+ // Attempt to decode the Step protobuf.
+ var step miloProto.Step
+ if err := proto.Unmarshal(dg.Data, &step); err != nil {
+ return &miloerror.Error{
+ Message: "Failed to unmarshal annotation protobuf",
+ Code: http.StatusInternalServerError,
+ }
+ }
+
+ var latestEndedTime time.Time
+ for _, sub := range step.Substep {
+ switch t := sub.Substep.(type) {
+ case *miloProto.Step_Substep_AnnotationStream:
+ // TODO(hinoka,dnj): Implement recursive / embedded substream fetching if
+ // specified.
+ log.Warningf(c, "Annotation stream links LogDog substream [%+v], not supported!", t.AnnotationStream)
+
+ case *miloProto.Step_Substep_Step:
+ endedTime := t.Step.Ended.Time()
+ if t.Step.Ended != nil && endedTime.After(latestEndedTime) {
+ latestEndedTime = endedTime
+ }
+ }
+ }
+ if latestEndedTime.IsZero() {
+ // No substep had an ended time :(
+ latestEndedTime = step.Started.Time()
+ }
+
+ // Build our CachedStep.
+ as.cs = internal.CachedStep{
+ Step: &step,
+ Finished: (resp.State.TerminalIndex >= 0 && latestStream.StreamIndex == uint64(resp.State.TerminalIndex)),
+ }
+
+ // Annotee is apparently not putting an ended time on some annotation protos.
+ // This hack will ensure that a finished build will always have an ended time.
+ if as.cs.Finished && as.cs.Step.Ended == nil {
+ as.cs.Step.Ended = google.NewTimestamp(latestEndedTime)
+ }
+
+ // Marshal and cache the step. If this is the final protobuf in the stream,
+ // cache it indefinitely; otherwise, cache it for intermediateCacheLifetime.
+ //
+ // If this fails, it is non-fatal.
+ mcData, err := proto.Marshal(&as.cs)
+ if err == nil {
+ mcItem = memcache.Get(c).NewItem(mcKey)
+ if !as.cs.Finished {
+ mcItem.SetExpiration(intermediateCacheLifetime)
+ }
+ mcItem.SetValue(mcData)
+ if err := memcache.Get(c).Set(mcItem); err != nil {
+ log.WithError(err).Warningf(c, "Failed to cache annotation protobuf CachedStep.")
+ }
+ } else {
+ log.WithError(err).Warningf(c, "Failed to marshal annotation protobuf CachedStep.")
+ }
+
+ return nil
+}
+
+func (as *annotationStreamRequest) toMiloBuild(c context.Context) *resp.MiloBuild {
+ prefix, name := as.path.Split()
+
+ // Prepare a Streams object with only one stream.
+ streams := Streams{
+ MainStream: &Stream{
+ Server: as.host,
+ Prefix: string(prefix),
+ Path: string(name),
+ IsDatagram: true,
+ Data: as.cs.Step,
+ Closed: as.cs.Finished,
+ },
+ }
+
+ var (
+ build resp.MiloBuild
+ ub = logDogURLBuilder{
+ project: as.project,
+ host: as.host,
+ prefix: prefix,
+ }
+ )
+ AddLogDogToBuild(c, &ub, &streams, &build)
+ return &build
+}
+
+type logDogURLBuilder struct {
+ host string
+ prefix types.StreamName
+ project config.ProjectName
+}
+
+func (b *logDogURLBuilder) BuildLink(l *miloProto.Link) *resp.Link {
+ switch t := l.Value.(type) {
+ case *miloProto.Link_LogdogStream:
+ ls := t.LogdogStream
+
+ server := ls.Server
+ if server == "" {
+ server = b.host
+ }
+
+ prefix := types.StreamName(ls.Prefix)
+ if prefix == "" {
+ prefix = b.prefix
+ }
+
+ path := fmt.Sprintf("%s/%s", b.project, prefix.Join(types.StreamName(ls.Name)))
+ u := url.URL{
+ Scheme: "https",
+ Host: server,
+ Path: "v/",
+ RawQuery: url.Values{
+ "s": []string{string(path)},
+ }.Encode(),
+ }
+
+ link := resp.Link{
+ Label: l.Label,
+ URL: u.String(),
+ }
+ if link.Label == "" {
+ link.Label = ls.Name
+ }
+ return &link
+
+ case *miloProto.Link_Url:
+ link := resp.Link{
+ Label: l.Label,
+ URL: t.Url,
+ }
+ if link.Label == "" {
+ link.Label = "unnamed"
+ }
+ return &link
+
+ default:
+ // Don't know how to render.
+ return nil
+ }
+}
« no previous file with comments | « appengine/cmd/milo/frontend/milo.go ('k') | appengine/cmd/milo/logdog/http.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698