| OLD | NEW |
| (Empty) | |
| 1 // Copyright 2016 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. |
| 4 |
| 5 package logdog |
| 6 |
| 7 import ( |
| 8 "errors" |
| 9 "fmt" |
| 10 "net/http" |
| 11 "net/url" |
| 12 "strings" |
| 13 "time" |
| 14 |
| 15 "github.com/luci/luci-go/appengine/cmd/milo/logdog/internal" |
| 16 "github.com/luci/luci-go/appengine/cmd/milo/miloerror" |
| 17 "github.com/luci/luci-go/appengine/cmd/milo/resp" |
| 18 "github.com/luci/luci-go/common/config" |
| 19 "github.com/luci/luci-go/common/grpcutil" |
| 20 log "github.com/luci/luci-go/common/logging" |
| 21 "github.com/luci/luci-go/common/proto/google" |
| 22 miloProto "github.com/luci/luci-go/common/proto/milo" |
| 23 "github.com/luci/luci-go/common/prpc" |
| 24 "github.com/luci/luci-go/logdog/api/endpoints/coordinator/logs/v1" |
| 25 "github.com/luci/luci-go/logdog/api/logpb" |
| 26 "github.com/luci/luci-go/logdog/common/types" |
| 27 |
| 28 "github.com/golang/protobuf/proto" |
| 29 "github.com/luci/gae/service/memcache" |
| 30 "golang.org/x/net/context" |
| 31 "google.golang.org/grpc/codes" |
| 32 ) |
| 33 |
| 34 const ( |
| 35 // intermediateCacheLifetime is the amount of time to cache intermediate
(non- |
| 36 // terminal) annotation streams. Terminal annotation streams are cached |
| 37 // indefinitely. |
| 38 intermediateCacheLifetime = 10 * time.Second |
| 39 |
| 40 // defaultLogDogHost is the default LogDog host, if one isn't specified
via |
| 41 // query string. |
| 42 defaultLogDogHost = "luci-logdog.appspot.com" |
| 43 ) |
| 44 |
| 45 type annotationStreamRequest struct { |
| 46 *AnnotationStream |
| 47 |
| 48 // host is the name of the LogDog host. |
| 49 host string |
| 50 |
| 51 project config.ProjectName |
| 52 path types.StreamPath |
| 53 |
| 54 // logDogClient is the HTTP client to use for LogDog communication. |
| 55 logDogClient http.Client |
| 56 |
| 57 // cs is the unmarshalled annotation stream Step and associated data. |
| 58 cs internal.CachedStep |
| 59 } |
| 60 |
| 61 func (as *annotationStreamRequest) normalize() error { |
| 62 if err := as.project.Validate(); err != nil { |
| 63 return &miloerror.Error{ |
| 64 Message: "Invalid project name", |
| 65 Code: http.StatusBadRequest, |
| 66 } |
| 67 } |
| 68 |
| 69 if err := as.path.Validate(); err != nil { |
| 70 return &miloerror.Error{ |
| 71 Message: fmt.Sprintf("Invalid log stream path %q: %s", a
s.path, err), |
| 72 Code: http.StatusBadRequest, |
| 73 } |
| 74 } |
| 75 |
| 76 // Get the host. We normalize it to lowercase and trim spaces since we u
se |
| 77 // it as a memcache key. |
| 78 as.host = strings.ToLower(strings.TrimSpace(as.host)) |
| 79 if as.host == "" { |
| 80 as.host = defaultLogDogHost |
| 81 } |
| 82 if strings.ContainsRune(as.host, '/') { |
| 83 return errors.New("invalid host name") |
| 84 } |
| 85 |
| 86 return nil |
| 87 } |
| 88 |
| 89 func (as *annotationStreamRequest) memcacheKey() string { |
| 90 return fmt.Sprintf("logdog/%s/%s/%s", as.host, as.project, as.path) |
| 91 } |
| 92 |
| 93 func (as *annotationStreamRequest) load(c context.Context) error { |
| 94 // Load from memcache, if possible. If an error occurs, we will proceed
as if |
| 95 // no CachedStep was available. |
| 96 mcKey := as.memcacheKey() |
| 97 mcItem, err := memcache.Get(c).Get(mcKey) |
| 98 switch err { |
| 99 case nil: |
| 100 if err := proto.Unmarshal(mcItem.Value(), &as.cs); err == nil { |
| 101 return nil |
| 102 } |
| 103 |
| 104 // We could not unmarshal the cached value. Try and delete it fr
om |
| 105 // memcache, since it's invalid. |
| 106 log.Fields{ |
| 107 log.ErrorKey: err, |
| 108 "memcacheKey": mcKey, |
| 109 }.Warningf(c, "Failed to unmarshal cached annotation protobuf.") |
| 110 if err := memcache.Get(c).Delete(mcKey); err != nil { |
| 111 log.WithError(err).Warningf(c, "Failed to delete invalid
annotation protobuf memcache entry.") |
| 112 } |
| 113 |
| 114 case memcache.ErrCacheMiss: |
| 115 break |
| 116 |
| 117 default: |
| 118 log.Fields{ |
| 119 log.ErrorKey: err, |
| 120 "memcacheKey": mcKey, |
| 121 }.Errorf(c, "Failed to load annotation protobuf memcache cached
step.") |
| 122 } |
| 123 |
| 124 // Load from LogDog directly. |
| 125 client := logdog.NewLogsPRPCClient(&prpc.Client{ |
| 126 C: &as.logDogClient, |
| 127 Host: as.host, |
| 128 }) |
| 129 |
| 130 log.Fields{ |
| 131 "project": as.project, |
| 132 "path": as.path, |
| 133 "host": as.host, |
| 134 }.Infof(c, "Making tail request to LogDog to fetch annotation stream.") |
| 135 resp, err := client.Tail(c, &logdog.TailRequest{ |
| 136 Project: string(as.project), |
| 137 Path: string(as.path), |
| 138 State: true, |
| 139 }) |
| 140 switch code := grpcutil.Code(err); code { |
| 141 case codes.OK: |
| 142 break |
| 143 |
| 144 case codes.NotFound: |
| 145 return &miloerror.Error{ |
| 146 Message: "Stream not found", |
| 147 Code: http.StatusNotFound, |
| 148 } |
| 149 |
| 150 default: |
| 151 // TODO: Once we switch to delegation tokens and are making the
request on |
| 152 // behalf of a user rather than the Milo service, handle Permiss
ionDenied. |
| 153 log.Fields{ |
| 154 log.ErrorKey: err, |
| 155 "code": code, |
| 156 }.Errorf(c, "Failed to load LogDog annotation stream.") |
| 157 return &miloerror.Error{ |
| 158 Message: "Failed to load stream", |
| 159 Code: http.StatusInternalServerError, |
| 160 } |
| 161 } |
| 162 |
| 163 // Make sure that this is an annotation stream. |
| 164 switch { |
| 165 case resp.Desc.ContentType != miloProto.ContentTypeAnnotations: |
| 166 return &miloerror.Error{ |
| 167 Message: "Requested stream is not a Milo annotation prot
obuf", |
| 168 Code: http.StatusBadRequest, |
| 169 } |
| 170 |
| 171 case resp.Desc.StreamType != logpb.StreamType_DATAGRAM: |
| 172 return &miloerror.Error{ |
| 173 Message: "Requested stream is not a datagram stream", |
| 174 Code: http.StatusBadRequest, |
| 175 } |
| 176 |
| 177 case len(resp.Logs) == 0: |
| 178 // No annotation stream data, so render a minimal page. |
| 179 return nil |
| 180 } |
| 181 |
| 182 // Get the last log entry in the stream. In reality, this will be index
0, |
| 183 // since the "Tail" call should only return one log entry. |
| 184 latestStream := resp.Logs[len(resp.Logs)-1] |
| 185 dg := latestStream.GetDatagram() |
| 186 switch { |
| 187 case dg == nil: |
| 188 return &miloerror.Error{ |
| 189 Message: "Datagram stream does not have datagram data", |
| 190 Code: http.StatusInternalServerError, |
| 191 } |
| 192 |
| 193 case dg.Partial != nil && !(dg.Partial.Index == 0 && dg.Partial.Last): |
| 194 // LogDog splits large datagrams into consecutive fragments. If
the |
| 195 // annotation state is fragmented, a reconstruction algorithm wi
ll have to |
| 196 // be employed here to build the full datagram before processing
. |
| 197 // |
| 198 // At the moment, no annotation streams are expected to be anywh
ere close to |
| 199 // this large, so we're going to handle this case by erroring. A |
| 200 // reconstruction algorithm would look like: |
| 201 // 1) "Tail" to get the latest datagram, identify it as partial. |
| 202 // 1a) Perform a bounds check on the total datagram size to ensu
re that it |
| 203 // can be safely reconstructed. |
| 204 // 2) Determine if it's the last partial index. If not, then the
latest |
| 205 // datagram is incomplete. Determine our initial datagram's
stream index |
| 206 // the by subtracting the partial index from this message's
stream index. |
| 207 // 2a) If this datagram index is "0", the first datagram in the
stream is |
| 208 // partial and all of the data isn't here, so treat this as
"no data". |
| 209 // 2b) Otherwise, goto (1), using "Get" request on the datagram
index minus |
| 210 // one to get the previous datagram. |
| 211 // 3) Issue a "Get" request for our initial datagram index throu
gh the index |
| 212 // preceding ours. |
| 213 // 4) Reassemble the binary data from the full set of datagrams. |
| 214 return &miloerror.Error{ |
| 215 Message: "Partial datagram streams are not supported yet
", |
| 216 Code: http.StatusNotImplemented, |
| 217 } |
| 218 } |
| 219 |
| 220 // Attempt to decode the Step protobuf. |
| 221 var step miloProto.Step |
| 222 if err := proto.Unmarshal(dg.Data, &step); err != nil { |
| 223 return &miloerror.Error{ |
| 224 Message: "Failed to unmarshal annotation protobuf", |
| 225 Code: http.StatusInternalServerError, |
| 226 } |
| 227 } |
| 228 |
| 229 var latestEndedTime time.Time |
| 230 for _, sub := range step.Substep { |
| 231 switch t := sub.Substep.(type) { |
| 232 case *miloProto.Step_Substep_AnnotationStream: |
| 233 // TODO(hinoka,dnj): Implement recursive / embedded subs
tream fetching if |
| 234 // specified. |
| 235 log.Warningf(c, "Annotation stream links LogDog substrea
m [%+v], not supported!", t.AnnotationStream) |
| 236 |
| 237 case *miloProto.Step_Substep_Step: |
| 238 endedTime := t.Step.Ended.Time() |
| 239 if t.Step.Ended != nil && endedTime.After(latestEndedTim
e) { |
| 240 latestEndedTime = endedTime |
| 241 } |
| 242 } |
| 243 } |
| 244 if latestEndedTime.IsZero() { |
| 245 // No substep had an ended time :( |
| 246 latestEndedTime = step.Started.Time() |
| 247 } |
| 248 |
| 249 // Build our CachedStep. |
| 250 as.cs = internal.CachedStep{ |
| 251 Step: &step, |
| 252 Finished: (resp.State.TerminalIndex >= 0 && latestStream.StreamI
ndex == uint64(resp.State.TerminalIndex)), |
| 253 } |
| 254 |
| 255 // Annotee is apparently not putting an ended time on some annotation pr
otos. |
| 256 // This hack will ensure that a finished build will always have an ended
time. |
| 257 if as.cs.Finished && as.cs.Step.Ended == nil { |
| 258 as.cs.Step.Ended = google.NewTimestamp(latestEndedTime) |
| 259 } |
| 260 |
| 261 // Marshal and cache the step. If this is the final protobuf in the stre
am, |
| 262 // cache it indefinitely; otherwise, cache it for intermediateCacheLifet
ime. |
| 263 // |
| 264 // If this fails, it is non-fatal. |
| 265 mcData, err := proto.Marshal(&as.cs) |
| 266 if err == nil { |
| 267 mcItem = memcache.Get(c).NewItem(mcKey) |
| 268 if !as.cs.Finished { |
| 269 mcItem.SetExpiration(intermediateCacheLifetime) |
| 270 } |
| 271 mcItem.SetValue(mcData) |
| 272 if err := memcache.Get(c).Set(mcItem); err != nil { |
| 273 log.WithError(err).Warningf(c, "Failed to cache annotati
on protobuf CachedStep.") |
| 274 } |
| 275 } else { |
| 276 log.WithError(err).Warningf(c, "Failed to marshal annotation pro
tobuf CachedStep.") |
| 277 } |
| 278 |
| 279 return nil |
| 280 } |
| 281 |
| 282 func (as *annotationStreamRequest) toMiloBuild(c context.Context) *resp.MiloBuil
d { |
| 283 prefix, name := as.path.Split() |
| 284 |
| 285 // Prepare a Streams object with only one stream. |
| 286 streams := Streams{ |
| 287 MainStream: &Stream{ |
| 288 Server: as.host, |
| 289 Prefix: string(prefix), |
| 290 Path: string(name), |
| 291 IsDatagram: true, |
| 292 Data: as.cs.Step, |
| 293 Closed: as.cs.Finished, |
| 294 }, |
| 295 } |
| 296 |
| 297 var ( |
| 298 build resp.MiloBuild |
| 299 ub = logDogURLBuilder{ |
| 300 project: as.project, |
| 301 host: as.host, |
| 302 prefix: prefix, |
| 303 } |
| 304 ) |
| 305 AddLogDogToBuild(c, &ub, &streams, &build) |
| 306 return &build |
| 307 } |
| 308 |
| 309 type logDogURLBuilder struct { |
| 310 host string |
| 311 prefix types.StreamName |
| 312 project config.ProjectName |
| 313 } |
| 314 |
| 315 func (b *logDogURLBuilder) BuildLink(l *miloProto.Link) *resp.Link { |
| 316 switch t := l.Value.(type) { |
| 317 case *miloProto.Link_LogdogStream: |
| 318 ls := t.LogdogStream |
| 319 |
| 320 server := ls.Server |
| 321 if server == "" { |
| 322 server = b.host |
| 323 } |
| 324 |
| 325 prefix := types.StreamName(ls.Prefix) |
| 326 if prefix == "" { |
| 327 prefix = b.prefix |
| 328 } |
| 329 |
| 330 path := fmt.Sprintf("%s/%s", b.project, prefix.Join(types.Stream
Name(ls.Name))) |
| 331 u := url.URL{ |
| 332 Scheme: "https", |
| 333 Host: server, |
| 334 Path: "v/", |
| 335 RawQuery: url.Values{ |
| 336 "s": []string{string(path)}, |
| 337 }.Encode(), |
| 338 } |
| 339 |
| 340 link := resp.Link{ |
| 341 Label: l.Label, |
| 342 URL: u.String(), |
| 343 } |
| 344 if link.Label == "" { |
| 345 link.Label = ls.Name |
| 346 } |
| 347 return &link |
| 348 |
| 349 case *miloProto.Link_Url: |
| 350 link := resp.Link{ |
| 351 Label: l.Label, |
| 352 URL: t.Url, |
| 353 } |
| 354 if link.Label == "" { |
| 355 link.Label = "unnamed" |
| 356 } |
| 357 return &link |
| 358 |
| 359 default: |
| 360 // Don't know how to render. |
| 361 return nil |
| 362 } |
| 363 } |
| OLD | NEW |