Chromium Code Reviews| Index: appengine/cmd/milo/logdog/handler.go |
| diff --git a/appengine/cmd/milo/logdog/handler.go b/appengine/cmd/milo/logdog/handler.go |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..c581a76de19fc430f0ff7f43db1c5e86d49d56f6 |
| --- /dev/null |
| +++ b/appengine/cmd/milo/logdog/handler.go |
| @@ -0,0 +1,378 @@ |
| +// Copyright 2016 The LUCI Authors. All rights reserved. |
|
Ryan Tseng
2016/07/29 18:16:38
rename this build.go, to match up with the other f
dnj
2016/07/29 19:57:34
Done.
|
| +// Use of this source code is governed under the Apache License, Version 2.0 |
| +// that can be found in the LICENSE file. |
| + |
| +package logdog |
| + |
| +import ( |
| + "fmt" |
| + "net/http" |
| + "net/url" |
| + "strings" |
| + "time" |
| + |
| + "github.com/luci/luci-go/appengine/cmd/milo/miloerror" |
| + "github.com/luci/luci-go/appengine/cmd/milo/resp" |
| + "github.com/luci/luci-go/appengine/cmd/milo/settings" |
| + authClient "github.com/luci/luci-go/appengine/gaeauth/client" |
| + "github.com/luci/luci-go/common/config" |
| + "github.com/luci/luci-go/common/grpcutil" |
| + log "github.com/luci/luci-go/common/logging" |
| + "github.com/luci/luci-go/common/proto/google" |
| + miloProto "github.com/luci/luci-go/common/proto/milo" |
| + "github.com/luci/luci-go/common/prpc" |
| + "github.com/luci/luci-go/logdog/api/endpoints/coordinator/logs/v1" |
| + "github.com/luci/luci-go/logdog/api/logpb" |
| + "github.com/luci/luci-go/logdog/common/types" |
| + "github.com/luci/luci-go/server/templates" |
| + |
| + "github.com/golang/protobuf/proto" |
| + "github.com/julienschmidt/httprouter" |
| + "github.com/luci/gae/service/memcache" |
| + "golang.org/x/net/context" |
| + "google.golang.org/grpc/codes" |
| +) |
| + |
| +const ( |
| + // intermediateCacheLifetime is the amount of time to cache intermediate (non- |
| + // terminal) annotation streams. Terminal annotation streams are cached |
| + // indefinitely. |
| + intermediateCacheLifetime = 10 * time.Second |
|
nodir
2016/07/29 18:42:06
this is a bit of regression because with swarming
dnj
2016/07/29 19:57:35
I think the caching is a good idea. I don't see 10
|
| + |
| + // defaultLogDogHost is the default LogDog host, if one isn't specified via |
| + // query string. |
| + defaultLogDogHost = "luci-logdog.appspot.com" |
| +) |
| + |
| +// AnnotationStream is a ThemedHandler that renders a LogDog Milo annotation |
| +// protobuf stream. |
| +// |
| +// The protobuf stream is fetched live from LogDog and cached locally, either |
| +// temporarily (if incomplete) or indefinitely (if complete). |
| +type AnnotationStream struct{} |
|
Ryan Tseng
2016/07/29 18:16:38
Move AnnotationStream and it's methods to html.go.
dnj
2016/07/29 19:57:34
"AnnotationStream" is explicitly a build.html hand
Ryan Tseng
2016/07/29 20:34:17
In other words, all "ThemedHandler" implementation
dnj
2016/07/29 21:23:37
Done.
|
| + |
| +// GetTemplateName implements settings.ThemedHandler. |
| +func (s *AnnotationStream) GetTemplateName(t settings.Theme) string { |
| + return "build.html" |
| +} |
| + |
| +// Render implements settings.ThemedHandler. |
| +func (s *AnnotationStream) Render(c context.Context, req *http.Request, p httprouter.Params) (*templates.Args, error) { |
| + as := annotationStreamRequest{ |
| + project: config.ProjectName(p.ByName("project")), |
| + path: types.StreamPath(strings.Trim(p.ByName("path"), "/")), |
| + host: req.FormValue("host"), |
|
nodir
2016/07/29 18:42:06
buildbucket and swarming handlers use "server" nam
dnj
2016/07/29 19:57:35
Lots of other packages, including pRPC, use "host"
Ryan Tseng
2016/07/29 20:34:17
I agree with nodir, we should keep it consistent w
dnj
2016/07/29 21:23:37
Discussed, using "host" everywhere.
|
| + } |
| + if err := as.normalize(); err != nil { |
| + return nil, err |
| + } |
| + |
| + // Load the Milo annotation protobuf from the annotation stream. |
| + if err := as.load(c); err != nil { |
| + return nil, err |
| + } |
| + |
| + // Convert the Milo Annotation protobuf to Milo objects. |
| + return &templates.Args{ |
| + "Build": as.toMiloBuild(c), |
| + }, nil |
| +} |
| + |
| +type annotationStreamRequest struct { |
| + // host is the name of the LogDog host. |
| + host string |
| + // project is the project. |
|
nodir
2016/07/29 18:42:06
this is not useful :)
dnj
2016/07/29 19:57:35
Done.
|
| + project config.ProjectName |
| + // path is the stream path. |
| + path types.StreamPath |
| + |
| + // item is the unmarshalled annotation stream Step and associated data. |
| + item Item |
| +} |
| + |
| +func (as *annotationStreamRequest) normalize() error { |
| + if err := as.project.Validate(); err != nil { |
| + return &miloerror.Error{ |
| + Message: "Invalid project name", |
| + Code: http.StatusBadRequest, |
| + } |
| + } |
| + |
| + if err := as.path.Validate(); err != nil { |
| + return &miloerror.Error{ |
| + Message: fmt.Sprintf("Invalid log stream path %q: %s", as.path, err), |
| + Code: http.StatusBadRequest, |
| + } |
| + } |
| + |
| + // Get the host. We normalize it to lowercase and trim spaces since we use |
| + // it as a memcache key. |
| + if as.host == "" { |
|
nodir
2016/07/29 18:42:06
do this after trimming, so as.host = " " is conver
dnj
2016/07/29 19:57:35
Done.
|
| + as.host = defaultLogDogHost |
| + } |
| + as.host = strings.ToLower(strings.TrimSpace(as.host)) |
|
nodir
2016/07/29 18:42:06
please verify that it does not have slashes, so ou
dnj
2016/07/29 19:57:35
Done.
|
| + |
| + return nil |
| +} |
| + |
| +func (as *annotationStreamRequest) memcacheKey() string { |
| + return fmt.Sprintf("logdog/%s/%s/%s", as.host, as.project, as.path) |
| +} |
| + |
| +func (as *annotationStreamRequest) load(c context.Context) error { |
| + // Load from memcache, if possible. If an error occurs, we will proceed as if |
| + // no cache item was available. |
| + mcKey := as.memcacheKey() |
| + mcItem, err := memcache.Get(c).Get(mcKey) |
| + switch err { |
| + case nil: |
| + if err := proto.Unmarshal(mcItem.Value(), &as.item); err == nil { |
| + return nil |
| + } |
| + |
| + // We could not unmarshal the cached value. Try and delete it from |
| + // memcache, since it's invalid. |
| + log.Fields{ |
| + log.ErrorKey: err, |
| + "memcacheKey": mcKey, |
| + }.Warningf(c, "Failed to unmarshal cached annotation protobuf.") |
| + if err := memcache.Get(c).Delete(mcKey); err != nil { |
| + log.WithError(err).Warningf(c, "Failed to delete invalid annotation protobuf memcache entry.") |
| + } |
| + |
| + case memcache.ErrCacheMiss: |
| + break |
| + |
| + default: |
| + log.Fields{ |
| + log.ErrorKey: err, |
| + "memcacheKey": mcKey, |
| + }.Warningf(c, "Failed to load annotation protobuf memcache item.") |
|
nodir
2016/07/29 18:42:06
i think it is ok to make it Errorf since it may ha
dnj
2016/07/29 19:57:34
Done.
|
| + } |
| + |
| + // Load from LogDog directly. |
| + a, err := authClient.Transport(c, nil, nil) |
| + if err != nil { |
| + log.WithError(err).Errorf(c, "Failed to get transport for LogDog server.") |
| + return &miloerror.Error{ |
| + Code: http.StatusInternalServerError, |
| + } |
| + } |
| + |
| + client := logdog.NewLogsPRPCClient(&prpc.Client{ |
| + C: &http.Client{ |
|
nodir
2016/07/29 18:42:06
http.Client reuses TCP connections, but if we crea
dnj
2016/07/29 19:57:34
Done.
|
| + Transport: a, |
| + }, |
| + Host: as.host, |
| + }) |
| + resp, err := client.Tail(c, &logdog.TailRequest{ |
| + Project: string(as.project), |
| + Path: string(as.path), |
| + State: true, |
| + }) |
| + if err != nil { |
|
nodir
2016/07/29 18:42:06
nit: consider `case codes.OK:` as an alternative t
dnj
2016/07/29 19:57:35
Done.
|
| + switch code := grpcutil.Code(err); code { |
| + case codes.NotFound: |
| + return &miloerror.Error{ |
| + Message: "Stream not found", |
| + Code: http.StatusNotFound, |
| + } |
| + |
|
nodir
2016/07/29 18:42:06
return 403 in case of check codes.PermissionDenied
dnj
2016/07/29 19:57:35
Since we're authenticating as Milo rather than the
nodir
2016/07/29 23:00:31
until milo uses delegation tokens, it does not sup
|
| + default: |
| + log.Fields{ |
| + log.ErrorKey: err, |
| + "code": code, |
| + }.Errorf(c, "Failed to load LogDog annotation stream.") |
| + return &miloerror.Error{ |
| + Message: "Failed to load stream", |
| + Code: http.StatusInternalServerError, |
| + } |
| + } |
| + } |
| + |
| + // Make sure that this is an annotation stream! |
| + switch { |
| + case resp.Desc.ContentType != miloProto.ContentTypeAnnotations: |
| + return &miloerror.Error{ |
| + Message: "Requested stream is not a Milo annotation protobuf", |
| + Code: http.StatusBadRequest, |
| + } |
| + |
| + case resp.Desc.StreamType != logpb.StreamType_DATAGRAM: |
| + return &miloerror.Error{ |
| + Message: "Requested stream is not a datagram stream", |
| + Code: http.StatusBadRequest, |
| + } |
| + |
| + case len(resp.Logs) == 0: |
| + return &miloerror.Error{ |
| + Message: "Annotation stream data is not yet available", |
| + Code: http.StatusInternalServerError, |
|
nodir
2016/07/29 18:42:06
it is not an internal error. I think we should `re
dnj
2016/07/29 19:57:34
I'm not sure a blank page is better than "hey data
|
| + } |
| + } |
| + |
| + // Get the last log entry in the stream. In reality, this will be index 0, |
| + // since the "Tail" call should only return one log entry. |
| + latestStream := resp.Logs[len(resp.Logs)-1] |
| + dg := latestStream.GetDatagram() |
| + switch { |
| + case dg == nil: |
| + return &miloerror.Error{ |
| + Message: "Datagram stream does not have datagram data", |
| + Code: http.StatusInternalServerError, |
| + } |
| + |
| + case dg.Partial != nil && !(dg.Partial.Index == 0 && dg.Partial.Last): |
| + return &miloerror.Error{ |
| + Message: "Partial datagram streams are not supported yet", |
| + Code: http.StatusNotImplemented, |
|
nodir
2016/07/29 18:42:06
how often this is going to happen in practice?
if
dnj
2016/07/29 19:57:34
It's part of the protocol, and could happen if a g
|
| + } |
| + } |
| + |
| + // Attempt to decode the Step protobuf. |
| + var step miloProto.Step |
| + if err := proto.Unmarshal(dg.Data, &step); err != nil { |
| + return &miloerror.Error{ |
| + Message: "Failed to unmarshal annotation protobuf", |
| + Code: http.StatusInternalServerError, |
| + } |
| + } |
| + |
| + var latestEndedTime time.Time |
| + for _, sub := range step.Substep { |
| + switch t := sub.Substep.(type) { |
| + case *miloProto.Step_Substep_AnnotationStream: |
| + // TODO(hinoka,dnj): Implement recursive / embedded substream fetching if |
| + // specified. |
| + log.Warningf(c, "Annotation stream links LogDog substream [%+v], not supported!", t.AnnotationStream) |
| + |
| + case *miloProto.Step_Substep_Step: |
| + endedTime := t.Step.Ended.Time() |
| + if t.Step.Ended != nil && (latestEndedTime.IsZero() || endedTime.After(latestEndedTime)) { |
|
nodir
2016/07/29 18:42:06
i think this condition should be just
if endedTim
dnj
2016/07/29 19:57:34
Done.
|
| + latestEndedTime = endedTime |
| + } |
| + } |
| + } |
| + if latestEndedTime.IsZero() { |
| + // No substep had an ended time :( |
| + latestEndedTime = step.Started.Time() |
| + } |
| + |
| + // Build our Item. |
| + as.item = Item{ |
| + Step: &step, |
| + Finished: (resp.State.TerminalIndex >= 0 && latestStream.StreamIndex == uint64(resp.State.TerminalIndex)), |
| + } |
| + |
| + // Annotee is apparently not putting an ended time on some annotation protos. |
| + // This hack will ensure that a finished build will always have an ended time. |
| + if as.item.Finished && as.item.Step.Ended == nil { |
| + as.item.Step.Ended = google.NewTimestamp(latestEndedTime) |
| + } |
| + |
| + // Marshal and cache the item. If this is the final protobuf in the stream, |
| + // cache it indefinitely; otherwise, cache it for intermediateCacheLifetime. |
| + // |
| + // If this fails, it is non-fatal. |
| + mcData, err := proto.Marshal(&as.item) |
| + if err == nil { |
| + mcItem = memcache.Get(c).NewItem(mcKey) |
| + if !as.item.Finished { |
| + mcItem.SetExpiration(intermediateCacheLifetime) |
| + } |
| + mcItem.SetValue(mcData) |
| + if err := memcache.Get(c).Set(mcItem); err != nil { |
| + log.WithError(err).Warningf(c, "Failed to cache annotation protobuf Item.") |
| + } |
| + } else { |
| + log.WithError(err).Warningf(c, "Failed to marshal annotation protobuf Item.") |
| + } |
| + |
| + return nil |
| +} |
| + |
| +func (as *annotationStreamRequest) toMiloBuild(c context.Context) *resp.MiloBuild { |
| + prefix, name := as.path.Split() |
| + |
| + // Prepare a Streams object with only one stream. |
| + streams := Streams{ |
| + MainStream: &Stream{ |
| + Server: as.host, |
| + Prefix: string(prefix), |
| + Path: string(name), |
| + IsDatagram: true, |
| + Data: as.item.Step, |
| + Closed: as.item.Finished, |
| + }, |
| + } |
| + |
| + var ( |
| + build resp.MiloBuild |
| + ub = logDogURLBuilder{ |
| + project: as.project, |
| + host: as.host, |
| + prefix: prefix, |
| + } |
| + ) |
| + AddLogDogToBuild(c, &ub, &streams, &build) |
| + |
| + // If we're still building, the duration is the difference between start time |
| + // and now. |
| + return &build |
| +} |
| + |
| +type logDogURLBuilder struct { |
| + host string |
| + prefix types.StreamName |
| + project config.ProjectName |
| +} |
| + |
| +func (b *logDogURLBuilder) BuildLink(l *miloProto.Link) *resp.Link { |
| + switch t := l.Value.(type) { |
| + case *miloProto.Link_LogdogStream: |
| + ls := t.LogdogStream |
| + |
| + server := ls.Server |
| + if server == "" { |
| + server = b.host |
| + } |
| + |
| + prefix := types.StreamName(ls.Prefix) |
| + if prefix == "" { |
| + prefix = b.prefix |
| + } |
| + |
| + path := fmt.Sprintf("%s/%s", b.project, prefix.Join(types.StreamName(ls.Name))) |
| + u := url.URL{ |
| + Scheme: "https", |
| + Host: server, |
| + Path: "v/", |
| + RawQuery: url.Values{ |
| + "s": []string{string(path)}, |
| + }.Encode(), |
| + } |
| + |
| + link := resp.Link{ |
| + Label: l.Label, |
| + URL: u.String(), |
| + } |
| + if link.Label == "" { |
| + link.Label = ls.Name |
| + } |
| + return &link |
| + |
| + case *miloProto.Link_Url: |
| + link := resp.Link{ |
| + Label: l.Label, |
| + URL: t.Url, |
| + } |
| + if link.Label == "" { |
| + link.Label = "unnamed" |
| + } |
| + return &link |
| + |
| + default: |
| + // Don't know how to render. |
| + return nil |
| + } |
| +} |