Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1177)

Unified Diff: appengine/cmd/milo/logdog/handler.go

Issue 2191693003: Milo: Add LogDog annotation stream support. (Closed) Base URL: https://github.com/luci/luci-go@master
Patch Set: Fix successful build state, derive more Swarming parameters from milo proto common code. Created 4 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: appengine/cmd/milo/logdog/handler.go
diff --git a/appengine/cmd/milo/logdog/handler.go b/appengine/cmd/milo/logdog/handler.go
new file mode 100644
index 0000000000000000000000000000000000000000..c581a76de19fc430f0ff7f43db1c5e86d49d56f6
--- /dev/null
+++ b/appengine/cmd/milo/logdog/handler.go
@@ -0,0 +1,378 @@
+// Copyright 2016 The LUCI Authors. All rights reserved.
Ryan Tseng 2016/07/29 18:16:38 rename this build.go, to match up with the other f
dnj 2016/07/29 19:57:34 Done.
+// Use of this source code is governed under the Apache License, Version 2.0
+// that can be found in the LICENSE file.
+
+package logdog
+
+import (
+ "fmt"
+ "net/http"
+ "net/url"
+ "strings"
+ "time"
+
+ "github.com/luci/luci-go/appengine/cmd/milo/miloerror"
+ "github.com/luci/luci-go/appengine/cmd/milo/resp"
+ "github.com/luci/luci-go/appengine/cmd/milo/settings"
+ authClient "github.com/luci/luci-go/appengine/gaeauth/client"
+ "github.com/luci/luci-go/common/config"
+ "github.com/luci/luci-go/common/grpcutil"
+ log "github.com/luci/luci-go/common/logging"
+ "github.com/luci/luci-go/common/proto/google"
+ miloProto "github.com/luci/luci-go/common/proto/milo"
+ "github.com/luci/luci-go/common/prpc"
+ "github.com/luci/luci-go/logdog/api/endpoints/coordinator/logs/v1"
+ "github.com/luci/luci-go/logdog/api/logpb"
+ "github.com/luci/luci-go/logdog/common/types"
+ "github.com/luci/luci-go/server/templates"
+
+ "github.com/golang/protobuf/proto"
+ "github.com/julienschmidt/httprouter"
+ "github.com/luci/gae/service/memcache"
+ "golang.org/x/net/context"
+ "google.golang.org/grpc/codes"
+)
+
+const (
+ // intermediateCacheLifetime is the amount of time to cache intermediate (non-
+ // terminal) annotation streams. Terminal annotation streams are cached
+ // indefinitely.
+ intermediateCacheLifetime = 10 * time.Second
nodir 2016/07/29 18:42:06 this is a bit of regression because with swarming
dnj 2016/07/29 19:57:35 I think the caching is a good idea. I don't see 10
+
+ // defaultLogDogHost is the default LogDog host, if one isn't specified via
+ // query string.
+ defaultLogDogHost = "luci-logdog.appspot.com"
+)
+
+// AnnotationStream is a ThemedHandler that renders a LogDog Milo annotation
+// protobuf stream.
+//
+// The protobuf stream is fetched live from LogDog and cached locally, either
+// temporarily (if incomplete) or indefinitely (if complete).
+type AnnotationStream struct{}
Ryan Tseng 2016/07/29 18:16:38 Move AnnotationStream and it's methods to html.go.
dnj 2016/07/29 19:57:34 "AnnotationStream" is explicitly a build.html hand
Ryan Tseng 2016/07/29 20:34:17 In other words, all "ThemedHandler" implementation
dnj 2016/07/29 21:23:37 Done.
+
+// GetTemplateName implements settings.ThemedHandler.
+func (s *AnnotationStream) GetTemplateName(t settings.Theme) string {
+ return "build.html"
+}
+
+// Render implements settings.ThemedHandler.
+func (s *AnnotationStream) Render(c context.Context, req *http.Request, p httprouter.Params) (*templates.Args, error) {
+ as := annotationStreamRequest{
+ project: config.ProjectName(p.ByName("project")),
+ path: types.StreamPath(strings.Trim(p.ByName("path"), "/")),
+ host: req.FormValue("host"),
nodir 2016/07/29 18:42:06 buildbucket and swarming handlers use "server" nam
dnj 2016/07/29 19:57:35 Lots of other packages, including pRPC, use "host"
Ryan Tseng 2016/07/29 20:34:17 I agree with nodir, we should keep it consistent w
dnj 2016/07/29 21:23:37 Discussed, using "host" everywhere.
+ }
+ if err := as.normalize(); err != nil {
+ return nil, err
+ }
+
+ // Load the Milo annotation protobuf from the annotation stream.
+ if err := as.load(c); err != nil {
+ return nil, err
+ }
+
+ // Convert the Milo Annotation protobuf to Milo objects.
+ return &templates.Args{
+ "Build": as.toMiloBuild(c),
+ }, nil
+}
+
+type annotationStreamRequest struct {
+ // host is the name of the LogDog host.
+ host string
+ // project is the project.
nodir 2016/07/29 18:42:06 this is not useful :)
dnj 2016/07/29 19:57:35 Done.
+ project config.ProjectName
+ // path is the stream path.
+ path types.StreamPath
+
+ // item is the unmarshalled annotation stream Step and associated data.
+ item Item
+}
+
+func (as *annotationStreamRequest) normalize() error {
+ if err := as.project.Validate(); err != nil {
+ return &miloerror.Error{
+ Message: "Invalid project name",
+ Code: http.StatusBadRequest,
+ }
+ }
+
+ if err := as.path.Validate(); err != nil {
+ return &miloerror.Error{
+ Message: fmt.Sprintf("Invalid log stream path %q: %s", as.path, err),
+ Code: http.StatusBadRequest,
+ }
+ }
+
+ // Get the host. We normalize it to lowercase and trim spaces since we use
+ // it as a memcache key.
+ if as.host == "" {
nodir 2016/07/29 18:42:06 do this after trimming, so as.host = " " is conver
dnj 2016/07/29 19:57:35 Done.
+ as.host = defaultLogDogHost
+ }
+ as.host = strings.ToLower(strings.TrimSpace(as.host))
nodir 2016/07/29 18:42:06 please verify that it does not have slashes, so ou
dnj 2016/07/29 19:57:35 Done.
+
+ return nil
+}
+
+func (as *annotationStreamRequest) memcacheKey() string {
+ return fmt.Sprintf("logdog/%s/%s/%s", as.host, as.project, as.path)
+}
+
+func (as *annotationStreamRequest) load(c context.Context) error {
+ // Load from memcache, if possible. If an error occurs, we will proceed as if
+ // no cache item was available.
+ mcKey := as.memcacheKey()
+ mcItem, err := memcache.Get(c).Get(mcKey)
+ switch err {
+ case nil:
+ if err := proto.Unmarshal(mcItem.Value(), &as.item); err == nil {
+ return nil
+ }
+
+ // We could not unmarshal the cached value. Try and delete it from
+ // memcache, since it's invalid.
+ log.Fields{
+ log.ErrorKey: err,
+ "memcacheKey": mcKey,
+ }.Warningf(c, "Failed to unmarshal cached annotation protobuf.")
+ if err := memcache.Get(c).Delete(mcKey); err != nil {
+ log.WithError(err).Warningf(c, "Failed to delete invalid annotation protobuf memcache entry.")
+ }
+
+ case memcache.ErrCacheMiss:
+ break
+
+ default:
+ log.Fields{
+ log.ErrorKey: err,
+ "memcacheKey": mcKey,
+ }.Warningf(c, "Failed to load annotation protobuf memcache item.")
nodir 2016/07/29 18:42:06 i think it is ok to make it Errorf since it may ha
dnj 2016/07/29 19:57:34 Done.
+ }
+
+ // Load from LogDog directly.
+ a, err := authClient.Transport(c, nil, nil)
+ if err != nil {
+ log.WithError(err).Errorf(c, "Failed to get transport for LogDog server.")
+ return &miloerror.Error{
+ Code: http.StatusInternalServerError,
+ }
+ }
+
+ client := logdog.NewLogsPRPCClient(&prpc.Client{
+ C: &http.Client{
nodir 2016/07/29 18:42:06 http.Client reuses TCP connections, but if we crea
dnj 2016/07/29 19:57:34 Done.
+ Transport: a,
+ },
+ Host: as.host,
+ })
+ resp, err := client.Tail(c, &logdog.TailRequest{
+ Project: string(as.project),
+ Path: string(as.path),
+ State: true,
+ })
+ if err != nil {
nodir 2016/07/29 18:42:06 nit: consider `case codes.OK:` as an alternative t
dnj 2016/07/29 19:57:35 Done.
+ switch code := grpcutil.Code(err); code {
+ case codes.NotFound:
+ return &miloerror.Error{
+ Message: "Stream not found",
+ Code: http.StatusNotFound,
+ }
+
nodir 2016/07/29 18:42:06 return 403 in case of check codes.PermissionDenied
dnj 2016/07/29 19:57:35 Since we're authenticating as Milo rather than the
nodir 2016/07/29 23:00:31 until milo uses delegation tokens, it does not sup
+ default:
+ log.Fields{
+ log.ErrorKey: err,
+ "code": code,
+ }.Errorf(c, "Failed to load LogDog annotation stream.")
+ return &miloerror.Error{
+ Message: "Failed to load stream",
+ Code: http.StatusInternalServerError,
+ }
+ }
+ }
+
+ // Make sure that this is an annotation stream!
+ switch {
+ case resp.Desc.ContentType != miloProto.ContentTypeAnnotations:
+ return &miloerror.Error{
+ Message: "Requested stream is not a Milo annotation protobuf",
+ Code: http.StatusBadRequest,
+ }
+
+ case resp.Desc.StreamType != logpb.StreamType_DATAGRAM:
+ return &miloerror.Error{
+ Message: "Requested stream is not a datagram stream",
+ Code: http.StatusBadRequest,
+ }
+
+ case len(resp.Logs) == 0:
+ return &miloerror.Error{
+ Message: "Annotation stream data is not yet available",
+ Code: http.StatusInternalServerError,
nodir 2016/07/29 18:42:06 it is not an internal error. I think we should `re
dnj 2016/07/29 19:57:34 I'm not sure a blank page is better than "hey data
+ }
+ }
+
+ // Get the last log entry in the stream. In reality, this will be index 0,
+ // since the "Tail" call should only return one log entry.
+ latestStream := resp.Logs[len(resp.Logs)-1]
+ dg := latestStream.GetDatagram()
+ switch {
+ case dg == nil:
+ return &miloerror.Error{
+ Message: "Datagram stream does not have datagram data",
+ Code: http.StatusInternalServerError,
+ }
+
+ case dg.Partial != nil && !(dg.Partial.Index == 0 && dg.Partial.Last):
+ return &miloerror.Error{
+ Message: "Partial datagram streams are not supported yet",
+ Code: http.StatusNotImplemented,
nodir 2016/07/29 18:42:06 how often this is going to happen in practice? if
dnj 2016/07/29 19:57:34 It's part of the protocol, and could happen if a g
+ }
+ }
+
+ // Attempt to decode the Step protobuf.
+ var step miloProto.Step
+ if err := proto.Unmarshal(dg.Data, &step); err != nil {
+ return &miloerror.Error{
+ Message: "Failed to unmarshal annotation protobuf",
+ Code: http.StatusInternalServerError,
+ }
+ }
+
+ var latestEndedTime time.Time
+ for _, sub := range step.Substep {
+ switch t := sub.Substep.(type) {
+ case *miloProto.Step_Substep_AnnotationStream:
+ // TODO(hinoka,dnj): Implement recursive / embedded substream fetching if
+ // specified.
+ log.Warningf(c, "Annotation stream links LogDog substream [%+v], not supported!", t.AnnotationStream)
+
+ case *miloProto.Step_Substep_Step:
+ endedTime := t.Step.Ended.Time()
+ if t.Step.Ended != nil && (latestEndedTime.IsZero() || endedTime.After(latestEndedTime)) {
nodir 2016/07/29 18:42:06 i think this condition should be just if endedTim
dnj 2016/07/29 19:57:34 Done.
+ latestEndedTime = endedTime
+ }
+ }
+ }
+ if latestEndedTime.IsZero() {
+ // No substep had an ended time :(
+ latestEndedTime = step.Started.Time()
+ }
+
+ // Build our Item.
+ as.item = Item{
+ Step: &step,
+ Finished: (resp.State.TerminalIndex >= 0 && latestStream.StreamIndex == uint64(resp.State.TerminalIndex)),
+ }
+
+ // Annotee is apparently not putting an ended time on some annotation protos.
+ // This hack will ensure that a finished build will always have an ended time.
+ if as.item.Finished && as.item.Step.Ended == nil {
+ as.item.Step.Ended = google.NewTimestamp(latestEndedTime)
+ }
+
+ // Marshal and cache the item. If this is the final protobuf in the stream,
+ // cache it indefinitely; otherwise, cache it for intermediateCacheLifetime.
+ //
+ // If this fails, it is non-fatal.
+ mcData, err := proto.Marshal(&as.item)
+ if err == nil {
+ mcItem = memcache.Get(c).NewItem(mcKey)
+ if !as.item.Finished {
+ mcItem.SetExpiration(intermediateCacheLifetime)
+ }
+ mcItem.SetValue(mcData)
+ if err := memcache.Get(c).Set(mcItem); err != nil {
+ log.WithError(err).Warningf(c, "Failed to cache annotation protobuf Item.")
+ }
+ } else {
+ log.WithError(err).Warningf(c, "Failed to marshal annotation protobuf Item.")
+ }
+
+ return nil
+}
+
+func (as *annotationStreamRequest) toMiloBuild(c context.Context) *resp.MiloBuild {
+ prefix, name := as.path.Split()
+
+ // Prepare a Streams object with only one stream.
+ streams := Streams{
+ MainStream: &Stream{
+ Server: as.host,
+ Prefix: string(prefix),
+ Path: string(name),
+ IsDatagram: true,
+ Data: as.item.Step,
+ Closed: as.item.Finished,
+ },
+ }
+
+ var (
+ build resp.MiloBuild
+ ub = logDogURLBuilder{
+ project: as.project,
+ host: as.host,
+ prefix: prefix,
+ }
+ )
+ AddLogDogToBuild(c, &ub, &streams, &build)
+
+ // If we're still building, the duration is the difference between start time
+ // and now.
+ return &build
+}
+
+type logDogURLBuilder struct {
+ host string
+ prefix types.StreamName
+ project config.ProjectName
+}
+
+func (b *logDogURLBuilder) BuildLink(l *miloProto.Link) *resp.Link {
+ switch t := l.Value.(type) {
+ case *miloProto.Link_LogdogStream:
+ ls := t.LogdogStream
+
+ server := ls.Server
+ if server == "" {
+ server = b.host
+ }
+
+ prefix := types.StreamName(ls.Prefix)
+ if prefix == "" {
+ prefix = b.prefix
+ }
+
+ path := fmt.Sprintf("%s/%s", b.project, prefix.Join(types.StreamName(ls.Name)))
+ u := url.URL{
+ Scheme: "https",
+ Host: server,
+ Path: "v/",
+ RawQuery: url.Values{
+ "s": []string{string(path)},
+ }.Encode(),
+ }
+
+ link := resp.Link{
+ Label: l.Label,
+ URL: u.String(),
+ }
+ if link.Label == "" {
+ link.Label = ls.Name
+ }
+ return &link
+
+ case *miloProto.Link_Url:
+ link := resp.Link{
+ Label: l.Label,
+ URL: t.Url,
+ }
+ if link.Label == "" {
+ link.Label = "unnamed"
+ }
+ return &link
+
+ default:
+ // Don't know how to render.
+ return nil
+ }
+}

Powered by Google App Engine
This is Rietveld 408576698