| OLD | NEW |
| (Empty) |
| 1 // Copyright 2016 The LUCI Authors. All rights reserved. | |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | |
| 3 // that can be found in the LICENSE file. | |
| 4 | |
| 5 package raw_presentation | |
| 6 | |
| 7 import ( | |
| 8 "errors" | |
| 9 "fmt" | |
| 10 "time" | |
| 11 | |
| 12 log "github.com/luci/luci-go/common/logging" | |
| 13 "github.com/luci/luci-go/common/proto/google" | |
| 14 miloProto "github.com/luci/luci-go/common/proto/milo" | |
| 15 "github.com/luci/luci-go/logdog/api/logpb" | |
| 16 "github.com/luci/luci-go/logdog/client/coordinator" | |
| 17 "github.com/luci/luci-go/logdog/common/types" | |
| 18 "github.com/luci/luci-go/logdog/common/viewer" | |
| 19 "github.com/luci/luci-go/luci_config/common/cfgtypes" | |
| 20 "github.com/luci/luci-go/milo/api/resp" | |
| 21 "github.com/luci/luci-go/milo/build_source/raw_presentation/internal" | |
| 22 | |
| 23 "github.com/golang/protobuf/proto" | |
| 24 "golang.org/x/net/context" | |
| 25 ) | |
| 26 | |
| 27 const ( | |
| 28 // intermediateCacheLifetime is the amount of time to cache intermediate
(non- | |
| 29 // terminal) annotation streams. Terminal annotation streams are cached | |
| 30 // indefinitely. | |
| 31 intermediateCacheLifetime = 10 * time.Second | |
| 32 | |
| 33 // defaultLogDogHost is the default LogDog host, if one isn't specified
via | |
| 34 // query string. | |
| 35 defaultLogDogHost = "luci-logdog.appspot.com" | |
| 36 ) | |
| 37 | |
| 38 // AnnotationStream represents a LogDog annotation protobuf stream. | |
| 39 type AnnotationStream struct { | |
| 40 Project cfgtypes.ProjectName | |
| 41 Path types.StreamPath | |
| 42 | |
| 43 // Client is the HTTP client to use for LogDog communication. | |
| 44 Client *coordinator.Client | |
| 45 | |
| 46 // cs is the unmarshalled annotation stream Step and associated data. | |
| 47 cs internal.CachedStep | |
| 48 } | |
| 49 | |
| 50 // Normalize validates and normalizes the stream's parameters. | |
| 51 func (as *AnnotationStream) Normalize() error { | |
| 52 if err := as.Project.Validate(); err != nil { | |
| 53 return fmt.Errorf("Invalid project name: %s", as.Project) | |
| 54 } | |
| 55 | |
| 56 if err := as.Path.Validate(); err != nil { | |
| 57 return fmt.Errorf("Invalid log stream path %q: %s", as.Path, err
) | |
| 58 } | |
| 59 | |
| 60 return nil | |
| 61 } | |
| 62 | |
| 63 var errNotMilo = errors.New("Requested stream is not a Milo annotation protobuf"
) | |
| 64 var errNotDatagram = errors.New("Requested stream is not a datagram stream") | |
| 65 var errNoEntries = errors.New("Log stream has no annotation entries") | |
| 66 | |
| 67 // Fetch loads the annotation stream from LogDog. | |
| 68 // | |
| 69 // If the stream does not exist, or is invalid, Fetch will return a Milo error. | |
| 70 // Otherwise, it will return the Step that was loaded. | |
| 71 // | |
| 72 // Fetch caches the step, so multiple calls to Fetch will return the same Step | |
| 73 // value. | |
| 74 func (as *AnnotationStream) Fetch(c context.Context) (*miloProto.Step, error) { | |
| 75 // Cached? | |
| 76 if as.cs.Step != nil { | |
| 77 return as.cs.Step, nil | |
| 78 } | |
| 79 | |
| 80 // Load from LogDog directly. | |
| 81 log.Fields{ | |
| 82 "host": as.Client.Host, | |
| 83 "project": as.Project, | |
| 84 "path": as.Path, | |
| 85 }.Infof(c, "Making tail request to LogDog to fetch annotation stream.") | |
| 86 | |
| 87 var ( | |
| 88 state coordinator.LogStream | |
| 89 stream = as.Client.Stream(as.Project, as.Path) | |
| 90 ) | |
| 91 | |
| 92 le, err := stream.Tail(c, coordinator.WithState(&state), coordinator.Com
plete()) | |
| 93 if err != nil { | |
| 94 log.WithError(err).Errorf(c, "Failed to load stream.") | |
| 95 return nil, err | |
| 96 } | |
| 97 | |
| 98 // Make sure that this is an annotation stream. | |
| 99 switch { | |
| 100 case state.Desc.ContentType != miloProto.ContentTypeAnnotations: | |
| 101 return nil, errNotMilo | |
| 102 | |
| 103 case state.Desc.StreamType != logpb.StreamType_DATAGRAM: | |
| 104 return nil, errNotDatagram | |
| 105 | |
| 106 case le == nil: | |
| 107 // No annotation stream data, so render a minimal page. | |
| 108 return nil, errNoEntries | |
| 109 } | |
| 110 | |
| 111 // Get the last log entry in the stream. In reality, this will be index
0, | |
| 112 // since the "Tail" call should only return one log entry. | |
| 113 // | |
| 114 // Because we supplied the "Complete" flag to Tail and suceeded, this da
tagram | |
| 115 // will be complete even if its source datagram(s) are fragments. | |
| 116 dg := le.GetDatagram() | |
| 117 if dg == nil { | |
| 118 return nil, errors.New("Datagram stream does not have datagram d
ata") | |
| 119 } | |
| 120 | |
| 121 // Attempt to decode the Step protobuf. | |
| 122 var step miloProto.Step | |
| 123 if err := proto.Unmarshal(dg.Data, &step); err != nil { | |
| 124 return nil, err | |
| 125 } | |
| 126 | |
| 127 var latestEndedTime time.Time | |
| 128 for _, sub := range step.Substep { | |
| 129 switch t := sub.Substep.(type) { | |
| 130 case *miloProto.Step_Substep_AnnotationStream: | |
| 131 // TODO(hinoka,dnj): Implement recursive / embedded subs
tream fetching if | |
| 132 // specified. | |
| 133 log.Warningf(c, "Annotation stream links LogDog substrea
m [%+v], not supported!", t.AnnotationStream) | |
| 134 | |
| 135 case *miloProto.Step_Substep_Step: | |
| 136 endedTime := google.TimeFromProto(t.Step.Ended) | |
| 137 if t.Step.Ended != nil && endedTime.After(latestEndedTim
e) { | |
| 138 latestEndedTime = endedTime | |
| 139 } | |
| 140 } | |
| 141 } | |
| 142 if latestEndedTime.IsZero() { | |
| 143 // No substep had an ended time :( | |
| 144 latestEndedTime = google.TimeFromProto(step.Started) | |
| 145 } | |
| 146 | |
| 147 // Build our CachedStep. | |
| 148 as.cs = internal.CachedStep{ | |
| 149 Step: &step, | |
| 150 Finished: (state.State.TerminalIndex >= 0 && le.StreamIndex == u
int64(state.State.TerminalIndex)), | |
| 151 } | |
| 152 return as.cs.Step, nil | |
| 153 } | |
| 154 | |
| 155 func (as *AnnotationStream) toMiloBuild(c context.Context) *resp.MiloBuild { | |
| 156 prefix, name := as.Path.Split() | |
| 157 | |
| 158 // Prepare a Streams object with only one stream. | |
| 159 streams := Streams{ | |
| 160 MainStream: &Stream{ | |
| 161 Server: as.Client.Host, | |
| 162 Prefix: string(prefix), | |
| 163 Path: string(name), | |
| 164 IsDatagram: true, | |
| 165 Data: as.cs.Step, | |
| 166 Closed: as.cs.Finished, | |
| 167 }, | |
| 168 } | |
| 169 | |
| 170 var ( | |
| 171 build resp.MiloBuild | |
| 172 ub = ViewerURLBuilder{ | |
| 173 Host: as.Client.Host, | |
| 174 Project: as.Project, | |
| 175 Prefix: prefix, | |
| 176 } | |
| 177 ) | |
| 178 AddLogDogToBuild(c, &ub, streams.MainStream.Data, &build) | |
| 179 return &build | |
| 180 } | |
| 181 | |
| 182 // ViewerURLBuilder is a URL builder that constructs LogDog viewer URLs. | |
| 183 type ViewerURLBuilder struct { | |
| 184 Host string | |
| 185 Prefix types.StreamName | |
| 186 Project cfgtypes.ProjectName | |
| 187 } | |
| 188 | |
| 189 // BuildLink implements URLBuilder. | |
| 190 func (b *ViewerURLBuilder) BuildLink(l *miloProto.Link) *resp.Link { | |
| 191 switch t := l.Value.(type) { | |
| 192 case *miloProto.Link_LogdogStream: | |
| 193 ls := t.LogdogStream | |
| 194 | |
| 195 server := ls.Server | |
| 196 if server == "" { | |
| 197 server = b.Host | |
| 198 } | |
| 199 | |
| 200 prefix := types.StreamName(ls.Prefix) | |
| 201 if prefix == "" { | |
| 202 prefix = b.Prefix | |
| 203 } | |
| 204 | |
| 205 u := viewer.GetURL(server, b.Project, prefix.Join(types.StreamNa
me(ls.Name))) | |
| 206 link := resp.NewLink(l.Label, u) | |
| 207 if link.Label == "" { | |
| 208 link.Label = ls.Name | |
| 209 } | |
| 210 return link | |
| 211 | |
| 212 case *miloProto.Link_Url: | |
| 213 link := resp.NewLink(l.Label, t.Url) | |
| 214 if link.Label == "" { | |
| 215 link.Label = "unnamed" | |
| 216 } | |
| 217 return link | |
| 218 | |
| 219 default: | |
| 220 // Don't know how to render. | |
| 221 return nil | |
| 222 } | |
| 223 } | |
| OLD | NEW |