| OLD | NEW |
| (Empty) | |
| 1 // Copyright 2016 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. |
| 4 |
| 5 package logdog |
| 6 |
| 7 import ( |
| 8 "errors" |
| 9 "fmt" |
| 10 "net/http" |
| 11 "net/url" |
| 12 "strings" |
| 13 "sync" |
| 14 "time" |
| 15 |
| 16 "github.com/luci/luci-go/appengine/cmd/milo/logdog/internal" |
| 17 "github.com/luci/luci-go/appengine/cmd/milo/miloerror" |
| 18 "github.com/luci/luci-go/appengine/cmd/milo/resp" |
| 19 "github.com/luci/luci-go/appengine/cmd/milo/settings" |
| 20 authClient "github.com/luci/luci-go/appengine/gaeauth/client" |
| 21 "github.com/luci/luci-go/common/config" |
| 22 "github.com/luci/luci-go/common/grpcutil" |
| 23 log "github.com/luci/luci-go/common/logging" |
| 24 "github.com/luci/luci-go/common/proto/google" |
| 25 miloProto "github.com/luci/luci-go/common/proto/milo" |
| 26 "github.com/luci/luci-go/common/prpc" |
| 27 "github.com/luci/luci-go/logdog/api/endpoints/coordinator/logs/v1" |
| 28 "github.com/luci/luci-go/logdog/api/logpb" |
| 29 "github.com/luci/luci-go/logdog/common/types" |
| 30 "github.com/luci/luci-go/server/templates" |
| 31 |
| 32 "github.com/golang/protobuf/proto" |
| 33 "github.com/julienschmidt/httprouter" |
| 34 "github.com/luci/gae/service/memcache" |
| 35 "golang.org/x/net/context" |
| 36 "google.golang.org/grpc/codes" |
| 37 ) |
| 38 |
| 39 const ( |
| 40 // intermediateCacheLifetime is the amount of time to cache intermediate
(non- |
| 41 // terminal) annotation streams. Terminal annotation streams are cached |
| 42 // indefinitely. |
| 43 intermediateCacheLifetime = 10 * time.Second |
| 44 |
| 45 // defaultLogDogHost is the default LogDog host, if one isn't specified
via |
| 46 // query string. |
| 47 defaultLogDogHost = "luci-logdog.appspot.com" |
| 48 ) |
| 49 |
| 50 // AnnotationStream is a ThemedHandler that renders a LogDog Milo annotation |
| 51 // protobuf stream. |
| 52 // |
| 53 // The protobuf stream is fetched live from LogDog and cached locally, either |
| 54 // temporarily (if incomplete) or indefinitely (if complete). |
| 55 type AnnotationStream struct { |
| 56 // logDogClient is a reusable HTTP client to use for LogDog. |
| 57 logDogClient *http.Client |
| 58 |
| 59 initMu sync.Mutex |
| 60 } |
| 61 |
| 62 // init initializes the process-global state of the AnnotationStream. |
| 63 func (s *AnnotationStream) init(c context.Context) *miloerror.Error { |
| 64 s.initMu.Lock() |
| 65 defer s.initMu.Unlock() |
| 66 |
| 67 if s.logDogClient != nil { |
| 68 return nil |
| 69 } |
| 70 |
| 71 // Initialize the LogDog client. |
| 72 a, err := authClient.Transport(c, nil, nil) |
| 73 if err != nil { |
| 74 log.WithError(err).Errorf(c, "Failed to get transport for LogDog
server.") |
| 75 return &miloerror.Error{ |
| 76 Code: http.StatusInternalServerError, |
| 77 } |
| 78 } |
| 79 |
| 80 s.logDogClient = &http.Client{ |
| 81 Transport: a, |
| 82 } |
| 83 return nil |
| 84 } |
| 85 |
| 86 // GetTemplateName implements settings.ThemedHandler. |
| 87 func (s *AnnotationStream) GetTemplateName(t settings.Theme) string { |
| 88 return "build.html" |
| 89 } |
| 90 |
| 91 // Render implements settings.ThemedHandler. |
| 92 func (s *AnnotationStream) Render(c context.Context, req *http.Request, p httpro
uter.Params) (*templates.Args, error) { |
| 93 if err := s.init(c); err != nil { |
| 94 return nil, err |
| 95 } |
| 96 |
| 97 as := annotationStreamRequest{ |
| 98 AnnotationStream: s, |
| 99 |
| 100 project: config.ProjectName(p.ByName("project")), |
| 101 path: types.StreamPath(strings.Trim(p.ByName("path"), "/")), |
| 102 host: req.FormValue("host"), |
| 103 } |
| 104 if err := as.normalize(); err != nil { |
| 105 return nil, err |
| 106 } |
| 107 |
| 108 // Load the Milo annotation protobuf from the annotation stream. |
| 109 if err := as.load(c); err != nil { |
| 110 return nil, err |
| 111 } |
| 112 |
| 113 // Convert the Milo Annotation protobuf to Milo objects. |
| 114 return &templates.Args{ |
| 115 "Build": as.toMiloBuild(c), |
| 116 }, nil |
| 117 } |
| 118 |
| 119 type annotationStreamRequest struct { |
| 120 *AnnotationStream |
| 121 |
| 122 // host is the name of the LogDog host. |
| 123 host string |
| 124 |
| 125 project config.ProjectName |
| 126 path types.StreamPath |
| 127 |
| 128 // item is the unmarshalled annotation stream Step and associated data. |
| 129 item internal.Item |
| 130 } |
| 131 |
| 132 func (as *annotationStreamRequest) normalize() error { |
| 133 if err := as.project.Validate(); err != nil { |
| 134 return &miloerror.Error{ |
| 135 Message: "Invalid project name", |
| 136 Code: http.StatusBadRequest, |
| 137 } |
| 138 } |
| 139 |
| 140 if err := as.path.Validate(); err != nil { |
| 141 return &miloerror.Error{ |
| 142 Message: fmt.Sprintf("Invalid log stream path %q: %s", a
s.path, err), |
| 143 Code: http.StatusBadRequest, |
| 144 } |
| 145 } |
| 146 |
| 147 // Get the host. We normalize it to lowercase and trim spaces since we u
se |
| 148 // it as a memcache key. |
| 149 as.host = strings.ToLower(strings.TrimSpace(as.host)) |
| 150 if as.host == "" { |
| 151 as.host = defaultLogDogHost |
| 152 } |
| 153 if strings.IndexRune(as.host, '/') >= 0 { |
| 154 return errors.New("invalid host name") |
| 155 } |
| 156 |
| 157 return nil |
| 158 } |
| 159 |
| 160 func (as *annotationStreamRequest) memcacheKey() string { |
| 161 return fmt.Sprintf("logdog/%s/%s/%s", as.host, as.project, as.path) |
| 162 } |
| 163 |
| 164 func (as *annotationStreamRequest) load(c context.Context) error { |
| 165 // Load from memcache, if possible. If an error occurs, we will proceed
as if |
| 166 // no cache item was available. |
| 167 mcKey := as.memcacheKey() |
| 168 mcItem, err := memcache.Get(c).Get(mcKey) |
| 169 switch err { |
| 170 case nil: |
| 171 if err := proto.Unmarshal(mcItem.Value(), &as.item); err == nil
{ |
| 172 return nil |
| 173 } |
| 174 |
| 175 // We could not unmarshal the cached value. Try and delete it fr
om |
| 176 // memcache, since it's invalid. |
| 177 log.Fields{ |
| 178 log.ErrorKey: err, |
| 179 "memcacheKey": mcKey, |
| 180 }.Warningf(c, "Failed to unmarshal cached annotation protobuf.") |
| 181 if err := memcache.Get(c).Delete(mcKey); err != nil { |
| 182 log.WithError(err).Warningf(c, "Failed to delete invalid
annotation protobuf memcache entry.") |
| 183 } |
| 184 |
| 185 case memcache.ErrCacheMiss: |
| 186 break |
| 187 |
| 188 default: |
| 189 log.Fields{ |
| 190 log.ErrorKey: err, |
| 191 "memcacheKey": mcKey, |
| 192 }.Errorf(c, "Failed to load annotation protobuf memcache item.") |
| 193 } |
| 194 |
| 195 // Load from LogDog directly. |
| 196 client := logdog.NewLogsPRPCClient(&prpc.Client{ |
| 197 C: as.logDogClient, |
| 198 Host: as.host, |
| 199 }) |
| 200 resp, err := client.Tail(c, &logdog.TailRequest{ |
| 201 Project: string(as.project), |
| 202 Path: string(as.path), |
| 203 State: true, |
| 204 }) |
| 205 switch code := grpcutil.Code(err); code { |
| 206 case codes.OK: |
| 207 break |
| 208 |
| 209 case codes.NotFound: |
| 210 return &miloerror.Error{ |
| 211 Message: "Stream not found", |
| 212 Code: http.StatusNotFound, |
| 213 } |
| 214 |
| 215 default: |
| 216 // TODO: Once we switch to delegation tokens and are making the
request on |
| 217 // behalf of a user rather than the Milo service, handle Permiss
ionDenied. |
| 218 log.Fields{ |
| 219 log.ErrorKey: err, |
| 220 "code": code, |
| 221 }.Errorf(c, "Failed to load LogDog annotation stream.") |
| 222 return &miloerror.Error{ |
| 223 Message: "Failed to load stream", |
| 224 Code: http.StatusInternalServerError, |
| 225 } |
| 226 } |
| 227 |
| 228 // Make sure that this is an annotation stream. |
| 229 switch { |
| 230 case resp.Desc.ContentType != miloProto.ContentTypeAnnotations: |
| 231 return &miloerror.Error{ |
| 232 Message: "Requested stream is not a Milo annotation prot
obuf", |
| 233 Code: http.StatusBadRequest, |
| 234 } |
| 235 |
| 236 case resp.Desc.StreamType != logpb.StreamType_DATAGRAM: |
| 237 return &miloerror.Error{ |
| 238 Message: "Requested stream is not a datagram stream", |
| 239 Code: http.StatusBadRequest, |
| 240 } |
| 241 |
| 242 case len(resp.Logs) == 0: |
| 243 // No annotation stream data, so render a minimal page. |
| 244 return nil |
| 245 } |
| 246 |
| 247 // Get the last log entry in the stream. In reality, this will be index
0, |
| 248 // since the "Tail" call should only return one log entry. |
| 249 latestStream := resp.Logs[len(resp.Logs)-1] |
| 250 dg := latestStream.GetDatagram() |
| 251 switch { |
| 252 case dg == nil: |
| 253 return &miloerror.Error{ |
| 254 Message: "Datagram stream does not have datagram data", |
| 255 Code: http.StatusInternalServerError, |
| 256 } |
| 257 |
| 258 case dg.Partial != nil && !(dg.Partial.Index == 0 && dg.Partial.Last): |
| 259 // LogDog splits large datagrams into consecutive fragments. If
the |
| 260 // annotation state is fragmented, a reconstruction algorithm wi
ll have to |
| 261 // be employed here to build the full datagram before processing
. |
| 262 // |
| 263 // At the moment, no annotation streams are expected to be anywh
ere close to |
| 264 // this large, so we're going to handle this case by erroring. A |
| 265 // reconstruction algorithm would look like: |
| 266 // 1) "Tail" to get the latest datagram, identify it as partial. |
| 267 // 1a) Perform a bounds check on the total datagram size to ensu
re that it |
| 268 // can be safely reconstructed. |
| 269 // 2) Determine if it's the last partial index. If not, then the
latest |
| 270 // datagram is incomplete. Determine our initial datagram's
stream index |
| 271 // the by subtracting the partial index from this message's
stream index. |
| 272 // 2a) If this datagram index is "0", the first datagram in the
stream is |
| 273 // partial and all of the data isn't here, so treat this as
"no data". |
| 274 // 2b) Otherwise, goto (1), using "Get" request on the datagram
index minus |
| 275 // one to get the previous datagram. |
| 276 // 3) Issue a "Get" request for our initial datagram index throu
gh the index |
| 277 // preceding ours. |
| 278 // 4) Reassemble the binary data from the full set of datagrams. |
| 279 return &miloerror.Error{ |
| 280 Message: "Partial datagram streams are not supported yet
", |
| 281 Code: http.StatusNotImplemented, |
| 282 } |
| 283 } |
| 284 |
| 285 // Attempt to decode the Step protobuf. |
| 286 var step miloProto.Step |
| 287 if err := proto.Unmarshal(dg.Data, &step); err != nil { |
| 288 return &miloerror.Error{ |
| 289 Message: "Failed to unmarshal annotation protobuf", |
| 290 Code: http.StatusInternalServerError, |
| 291 } |
| 292 } |
| 293 |
| 294 var latestEndedTime time.Time |
| 295 for _, sub := range step.Substep { |
| 296 switch t := sub.Substep.(type) { |
| 297 case *miloProto.Step_Substep_AnnotationStream: |
| 298 // TODO(hinoka,dnj): Implement recursive / embedded subs
tream fetching if |
| 299 // specified. |
| 300 log.Warningf(c, "Annotation stream links LogDog substrea
m [%+v], not supported!", t.AnnotationStream) |
| 301 |
| 302 case *miloProto.Step_Substep_Step: |
| 303 endedTime := t.Step.Ended.Time() |
| 304 if t.Step.Ended != nil && endedTime.After(latestEndedTim
e) { |
| 305 latestEndedTime = endedTime |
| 306 } |
| 307 } |
| 308 } |
| 309 if latestEndedTime.IsZero() { |
| 310 // No substep had an ended time :( |
| 311 latestEndedTime = step.Started.Time() |
| 312 } |
| 313 |
| 314 // Build our Item. |
| 315 as.item = internal.Item{ |
| 316 Step: &step, |
| 317 Finished: (resp.State.TerminalIndex >= 0 && latestStream.StreamI
ndex == uint64(resp.State.TerminalIndex)), |
| 318 } |
| 319 |
| 320 // Annotee is apparently not putting an ended time on some annotation pr
otos. |
| 321 // This hack will ensure that a finished build will always have an ended
time. |
| 322 if as.item.Finished && as.item.Step.Ended == nil { |
| 323 as.item.Step.Ended = google.NewTimestamp(latestEndedTime) |
| 324 } |
| 325 |
| 326 // Marshal and cache the item. If this is the final protobuf in the stre
am, |
| 327 // cache it indefinitely; otherwise, cache it for intermediateCacheLifet
ime. |
| 328 // |
| 329 // If this fails, it is non-fatal. |
| 330 mcData, err := proto.Marshal(&as.item) |
| 331 if err == nil { |
| 332 mcItem = memcache.Get(c).NewItem(mcKey) |
| 333 if !as.item.Finished { |
| 334 mcItem.SetExpiration(intermediateCacheLifetime) |
| 335 } |
| 336 mcItem.SetValue(mcData) |
| 337 if err := memcache.Get(c).Set(mcItem); err != nil { |
| 338 log.WithError(err).Warningf(c, "Failed to cache annotati
on protobuf Item.") |
| 339 } |
| 340 } else { |
| 341 log.WithError(err).Warningf(c, "Failed to marshal annotation pro
tobuf Item.") |
| 342 } |
| 343 |
| 344 return nil |
| 345 } |
| 346 |
| 347 func (as *annotationStreamRequest) toMiloBuild(c context.Context) *resp.MiloBuil
d { |
| 348 prefix, name := as.path.Split() |
| 349 |
| 350 // Prepare a Streams object with only one stream. |
| 351 streams := Streams{ |
| 352 MainStream: &Stream{ |
| 353 Server: as.host, |
| 354 Prefix: string(prefix), |
| 355 Path: string(name), |
| 356 IsDatagram: true, |
| 357 Data: as.item.Step, |
| 358 Closed: as.item.Finished, |
| 359 }, |
| 360 } |
| 361 |
| 362 var ( |
| 363 build resp.MiloBuild |
| 364 ub = logDogURLBuilder{ |
| 365 project: as.project, |
| 366 host: as.host, |
| 367 prefix: prefix, |
| 368 } |
| 369 ) |
| 370 AddLogDogToBuild(c, &ub, &streams, &build) |
| 371 |
| 372 // If we're still building, the duration is the difference between start
time |
| 373 // and now. |
| 374 return &build |
| 375 } |
| 376 |
| 377 type logDogURLBuilder struct { |
| 378 host string |
| 379 prefix types.StreamName |
| 380 project config.ProjectName |
| 381 } |
| 382 |
| 383 func (b *logDogURLBuilder) BuildLink(l *miloProto.Link) *resp.Link { |
| 384 switch t := l.Value.(type) { |
| 385 case *miloProto.Link_LogdogStream: |
| 386 ls := t.LogdogStream |
| 387 |
| 388 server := ls.Server |
| 389 if server == "" { |
| 390 server = b.host |
| 391 } |
| 392 |
| 393 prefix := types.StreamName(ls.Prefix) |
| 394 if prefix == "" { |
| 395 prefix = b.prefix |
| 396 } |
| 397 |
| 398 path := fmt.Sprintf("%s/%s", b.project, prefix.Join(types.Stream
Name(ls.Name))) |
| 399 u := url.URL{ |
| 400 Scheme: "https", |
| 401 Host: server, |
| 402 Path: "v/", |
| 403 RawQuery: url.Values{ |
| 404 "s": []string{string(path)}, |
| 405 }.Encode(), |
| 406 } |
| 407 |
| 408 link := resp.Link{ |
| 409 Label: l.Label, |
| 410 URL: u.String(), |
| 411 } |
| 412 if link.Label == "" { |
| 413 link.Label = ls.Name |
| 414 } |
| 415 return &link |
| 416 |
| 417 case *miloProto.Link_Url: |
| 418 link := resp.Link{ |
| 419 Label: l.Label, |
| 420 URL: t.Url, |
| 421 } |
| 422 if link.Label == "" { |
| 423 link.Label = "unnamed" |
| 424 } |
| 425 return &link |
| 426 |
| 427 default: |
| 428 // Don't know how to render. |
| 429 return nil |
| 430 } |
| 431 } |
| OLD | NEW |