Chromium Code Reviews| OLD | NEW |
|---|---|
| (Empty) | |
| 1 // Copyright 2016 The LUCI Authors. All rights reserved. | |
|
Ryan Tseng
2016/07/29 18:16:38
rename this build.go, to match up with the other f
dnj
2016/07/29 19:57:34
Done.
| |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | |
| 3 // that can be found in the LICENSE file. | |
| 4 | |
| 5 package logdog | |
| 6 | |
| 7 import ( | |
| 8 "fmt" | |
| 9 "net/http" | |
| 10 "net/url" | |
| 11 "strings" | |
| 12 "time" | |
| 13 | |
| 14 "github.com/luci/luci-go/appengine/cmd/milo/miloerror" | |
| 15 "github.com/luci/luci-go/appengine/cmd/milo/resp" | |
| 16 "github.com/luci/luci-go/appengine/cmd/milo/settings" | |
| 17 authClient "github.com/luci/luci-go/appengine/gaeauth/client" | |
| 18 "github.com/luci/luci-go/common/config" | |
| 19 "github.com/luci/luci-go/common/grpcutil" | |
| 20 log "github.com/luci/luci-go/common/logging" | |
| 21 "github.com/luci/luci-go/common/proto/google" | |
| 22 miloProto "github.com/luci/luci-go/common/proto/milo" | |
| 23 "github.com/luci/luci-go/common/prpc" | |
| 24 "github.com/luci/luci-go/logdog/api/endpoints/coordinator/logs/v1" | |
| 25 "github.com/luci/luci-go/logdog/api/logpb" | |
| 26 "github.com/luci/luci-go/logdog/common/types" | |
| 27 "github.com/luci/luci-go/server/templates" | |
| 28 | |
| 29 "github.com/golang/protobuf/proto" | |
| 30 "github.com/julienschmidt/httprouter" | |
| 31 "github.com/luci/gae/service/memcache" | |
| 32 "golang.org/x/net/context" | |
| 33 "google.golang.org/grpc/codes" | |
| 34 ) | |
| 35 | |
| 36 const ( | |
| 37 // intermediateCacheLifetime is the amount of time to cache intermediate (non- | |
| 38 // terminal) annotation streams. Terminal annotation streams are cached | |
| 39 // indefinitely. | |
| 40 intermediateCacheLifetime = 10 * time.Second | |
|
nodir
2016/07/29 18:42:06
this is a bit of regression because with swarming
dnj
2016/07/29 19:57:35
I think the caching is a good idea. I don't see 10
| |
| 41 | |
| 42 // defaultLogDogHost is the default LogDog host, if one isn't specified via | |
| 43 // query string. | |
| 44 defaultLogDogHost = "luci-logdog.appspot.com" | |
| 45 ) | |
| 46 | |
| 47 // AnnotationStream is a ThemedHandler that renders a LogDog Milo annotation | |
| 48 // protobuf stream. | |
| 49 // | |
| 50 // The protobuf stream is fetched live from LogDog and cached locally, either | |
| 51 // temporarily (if incomplete) or indefinitely (if complete). | |
| 52 type AnnotationStream struct{} | |
|
Ryan Tseng
2016/07/29 18:16:38
Move AnnotationStream and it's methods to html.go.
dnj
2016/07/29 19:57:34
"AnnotationStream" is explicitly a build.html hand
Ryan Tseng
2016/07/29 20:34:17
In other words, all "ThemedHandler" implementation
dnj
2016/07/29 21:23:37
Done.
| |
| 53 | |
| 54 // GetTemplateName implements settings.ThemedHandler. | |
| 55 func (s *AnnotationStream) GetTemplateName(t settings.Theme) string { | |
| 56 return "build.html" | |
| 57 } | |
| 58 | |
| 59 // Render implements settings.ThemedHandler. | |
| 60 func (s *AnnotationStream) Render(c context.Context, req *http.Request, p httpro uter.Params) (*templates.Args, error) { | |
| 61 as := annotationStreamRequest{ | |
| 62 project: config.ProjectName(p.ByName("project")), | |
| 63 path: types.StreamPath(strings.Trim(p.ByName("path"), "/")), | |
| 64 host: req.FormValue("host"), | |
|
nodir
2016/07/29 18:42:06
buildbucket and swarming handlers use "server" nam
dnj
2016/07/29 19:57:35
Lots of other packages, including pRPC, use "host"
Ryan Tseng
2016/07/29 20:34:17
I agree with nodir, we should keep it consistent w
dnj
2016/07/29 21:23:37
Discussed, using "host" everywhere.
| |
| 65 } | |
| 66 if err := as.normalize(); err != nil { | |
| 67 return nil, err | |
| 68 } | |
| 69 | |
| 70 // Load the Milo annotation protobuf from the annotation stream. | |
| 71 if err := as.load(c); err != nil { | |
| 72 return nil, err | |
| 73 } | |
| 74 | |
| 75 // Convert the Milo Annotation protobuf to Milo objects. | |
| 76 return &templates.Args{ | |
| 77 "Build": as.toMiloBuild(c), | |
| 78 }, nil | |
| 79 } | |
| 80 | |
| 81 type annotationStreamRequest struct { | |
| 82 // host is the name of the LogDog host. | |
| 83 host string | |
| 84 // project is the project. | |
|
nodir
2016/07/29 18:42:06
this is not useful :)
dnj
2016/07/29 19:57:35
Done.
| |
| 85 project config.ProjectName | |
| 86 // path is the stream path. | |
| 87 path types.StreamPath | |
| 88 | |
| 89 // item is the unmarshalled annotation stream Step and associated data. | |
| 90 item Item | |
| 91 } | |
| 92 | |
| 93 func (as *annotationStreamRequest) normalize() error { | |
| 94 if err := as.project.Validate(); err != nil { | |
| 95 return &miloerror.Error{ | |
| 96 Message: "Invalid project name", | |
| 97 Code: http.StatusBadRequest, | |
| 98 } | |
| 99 } | |
| 100 | |
| 101 if err := as.path.Validate(); err != nil { | |
| 102 return &miloerror.Error{ | |
| 103 Message: fmt.Sprintf("Invalid log stream path %q: %s", a s.path, err), | |
| 104 Code: http.StatusBadRequest, | |
| 105 } | |
| 106 } | |
| 107 | |
| 108 // Get the host. We normalize it to lowercase and trim spaces since we u se | |
| 109 // it as a memcache key. | |
| 110 if as.host == "" { | |
|
nodir
2016/07/29 18:42:06
do this after trimming, so as.host = " " is conver
dnj
2016/07/29 19:57:35
Done.
| |
| 111 as.host = defaultLogDogHost | |
| 112 } | |
| 113 as.host = strings.ToLower(strings.TrimSpace(as.host)) | |
|
nodir
2016/07/29 18:42:06
please verify that it does not have slashes, so ou
dnj
2016/07/29 19:57:35
Done.
| |
| 114 | |
| 115 return nil | |
| 116 } | |
| 117 | |
| 118 func (as *annotationStreamRequest) memcacheKey() string { | |
| 119 return fmt.Sprintf("logdog/%s/%s/%s", as.host, as.project, as.path) | |
| 120 } | |
| 121 | |
| 122 func (as *annotationStreamRequest) load(c context.Context) error { | |
| 123 // Load from memcache, if possible. If an error occurs, we will proceed as if | |
| 124 // no cache item was available. | |
| 125 mcKey := as.memcacheKey() | |
| 126 mcItem, err := memcache.Get(c).Get(mcKey) | |
| 127 switch err { | |
| 128 case nil: | |
| 129 if err := proto.Unmarshal(mcItem.Value(), &as.item); err == nil { | |
| 130 return nil | |
| 131 } | |
| 132 | |
| 133 // We could not unmarshal the cached value. Try and delete it fr om | |
| 134 // memcache, since it's invalid. | |
| 135 log.Fields{ | |
| 136 log.ErrorKey: err, | |
| 137 "memcacheKey": mcKey, | |
| 138 }.Warningf(c, "Failed to unmarshal cached annotation protobuf.") | |
| 139 if err := memcache.Get(c).Delete(mcKey); err != nil { | |
| 140 log.WithError(err).Warningf(c, "Failed to delete invalid annotation protobuf memcache entry.") | |
| 141 } | |
| 142 | |
| 143 case memcache.ErrCacheMiss: | |
| 144 break | |
| 145 | |
| 146 default: | |
| 147 log.Fields{ | |
| 148 log.ErrorKey: err, | |
| 149 "memcacheKey": mcKey, | |
| 150 }.Warningf(c, "Failed to load annotation protobuf memcache item. ") | |
|
nodir
2016/07/29 18:42:06
i think it is ok to make it Errorf since it may ha
dnj
2016/07/29 19:57:34
Done.
| |
| 151 } | |
| 152 | |
| 153 // Load from LogDog directly. | |
| 154 a, err := authClient.Transport(c, nil, nil) | |
| 155 if err != nil { | |
| 156 log.WithError(err).Errorf(c, "Failed to get transport for LogDog server.") | |
| 157 return &miloerror.Error{ | |
| 158 Code: http.StatusInternalServerError, | |
| 159 } | |
| 160 } | |
| 161 | |
| 162 client := logdog.NewLogsPRPCClient(&prpc.Client{ | |
| 163 C: &http.Client{ | |
|
nodir
2016/07/29 18:42:06
http.Client reuses TCP connections, but if we crea
dnj
2016/07/29 19:57:34
Done.
| |
| 164 Transport: a, | |
| 165 }, | |
| 166 Host: as.host, | |
| 167 }) | |
| 168 resp, err := client.Tail(c, &logdog.TailRequest{ | |
| 169 Project: string(as.project), | |
| 170 Path: string(as.path), | |
| 171 State: true, | |
| 172 }) | |
| 173 if err != nil { | |
|
nodir
2016/07/29 18:42:06
nit: consider `case codes.OK:` as an alternative t
dnj
2016/07/29 19:57:35
Done.
| |
| 174 switch code := grpcutil.Code(err); code { | |
| 175 case codes.NotFound: | |
| 176 return &miloerror.Error{ | |
| 177 Message: "Stream not found", | |
| 178 Code: http.StatusNotFound, | |
| 179 } | |
| 180 | |
|
nodir
2016/07/29 18:42:06
return 403 in case of check codes.PermissionDenied
dnj
2016/07/29 19:57:35
Since we're authenticating as Milo rather than the
nodir
2016/07/29 23:00:31
until milo uses delegation tokens, it does not sup
| |
| 181 default: | |
| 182 log.Fields{ | |
| 183 log.ErrorKey: err, | |
| 184 "code": code, | |
| 185 }.Errorf(c, "Failed to load LogDog annotation stream.") | |
| 186 return &miloerror.Error{ | |
| 187 Message: "Failed to load stream", | |
| 188 Code: http.StatusInternalServerError, | |
| 189 } | |
| 190 } | |
| 191 } | |
| 192 | |
| 193 // Make sure that this is an annotation stream! | |
| 194 switch { | |
| 195 case resp.Desc.ContentType != miloProto.ContentTypeAnnotations: | |
| 196 return &miloerror.Error{ | |
| 197 Message: "Requested stream is not a Milo annotation prot obuf", | |
| 198 Code: http.StatusBadRequest, | |
| 199 } | |
| 200 | |
| 201 case resp.Desc.StreamType != logpb.StreamType_DATAGRAM: | |
| 202 return &miloerror.Error{ | |
| 203 Message: "Requested stream is not a datagram stream", | |
| 204 Code: http.StatusBadRequest, | |
| 205 } | |
| 206 | |
| 207 case len(resp.Logs) == 0: | |
| 208 return &miloerror.Error{ | |
| 209 Message: "Annotation stream data is not yet available", | |
| 210 Code: http.StatusInternalServerError, | |
|
nodir
2016/07/29 18:42:06
it is not an internal error. I think we should `re
dnj
2016/07/29 19:57:34
I'm not sure a blank page is better than "hey data
| |
| 211 } | |
| 212 } | |
| 213 | |
| 214 // Get the last log entry in the stream. In reality, this will be index 0, | |
| 215 // since the "Tail" call should only return one log entry. | |
| 216 latestStream := resp.Logs[len(resp.Logs)-1] | |
| 217 dg := latestStream.GetDatagram() | |
| 218 switch { | |
| 219 case dg == nil: | |
| 220 return &miloerror.Error{ | |
| 221 Message: "Datagram stream does not have datagram data", | |
| 222 Code: http.StatusInternalServerError, | |
| 223 } | |
| 224 | |
| 225 case dg.Partial != nil && !(dg.Partial.Index == 0 && dg.Partial.Last): | |
| 226 return &miloerror.Error{ | |
| 227 Message: "Partial datagram streams are not supported yet ", | |
| 228 Code: http.StatusNotImplemented, | |
|
nodir
2016/07/29 18:42:06
how often this is going to happen in practice?
if
dnj
2016/07/29 19:57:34
It's part of the protocol, and could happen if a g
| |
| 229 } | |
| 230 } | |
| 231 | |
| 232 // Attempt to decode the Step protobuf. | |
| 233 var step miloProto.Step | |
| 234 if err := proto.Unmarshal(dg.Data, &step); err != nil { | |
| 235 return &miloerror.Error{ | |
| 236 Message: "Failed to unmarshal annotation protobuf", | |
| 237 Code: http.StatusInternalServerError, | |
| 238 } | |
| 239 } | |
| 240 | |
| 241 var latestEndedTime time.Time | |
| 242 for _, sub := range step.Substep { | |
| 243 switch t := sub.Substep.(type) { | |
| 244 case *miloProto.Step_Substep_AnnotationStream: | |
| 245 // TODO(hinoka,dnj): Implement recursive / embedded subs tream fetching if | |
| 246 // specified. | |
| 247 log.Warningf(c, "Annotation stream links LogDog substrea m [%+v], not supported!", t.AnnotationStream) | |
| 248 | |
| 249 case *miloProto.Step_Substep_Step: | |
| 250 endedTime := t.Step.Ended.Time() | |
| 251 if t.Step.Ended != nil && (latestEndedTime.IsZero() || e ndedTime.After(latestEndedTime)) { | |
|
nodir
2016/07/29 18:42:06
i think this condition should be just
if endedTim
dnj
2016/07/29 19:57:34
Done.
| |
| 252 latestEndedTime = endedTime | |
| 253 } | |
| 254 } | |
| 255 } | |
| 256 if latestEndedTime.IsZero() { | |
| 257 // No substep had an ended time :( | |
| 258 latestEndedTime = step.Started.Time() | |
| 259 } | |
| 260 | |
| 261 // Build our Item. | |
| 262 as.item = Item{ | |
| 263 Step: &step, | |
| 264 Finished: (resp.State.TerminalIndex >= 0 && latestStream.StreamI ndex == uint64(resp.State.TerminalIndex)), | |
| 265 } | |
| 266 | |
| 267 // Annotee is apparently not putting an ended time on some annotation pr otos. | |
| 268 // This hack will ensure that a finished build will always have an ended time. | |
| 269 if as.item.Finished && as.item.Step.Ended == nil { | |
| 270 as.item.Step.Ended = google.NewTimestamp(latestEndedTime) | |
| 271 } | |
| 272 | |
| 273 // Marshal and cache the item. If this is the final protobuf in the stre am, | |
| 274 // cache it indefinitely; otherwise, cache it for intermediateCacheLifet ime. | |
| 275 // | |
| 276 // If this fails, it is non-fatal. | |
| 277 mcData, err := proto.Marshal(&as.item) | |
| 278 if err == nil { | |
| 279 mcItem = memcache.Get(c).NewItem(mcKey) | |
| 280 if !as.item.Finished { | |
| 281 mcItem.SetExpiration(intermediateCacheLifetime) | |
| 282 } | |
| 283 mcItem.SetValue(mcData) | |
| 284 if err := memcache.Get(c).Set(mcItem); err != nil { | |
| 285 log.WithError(err).Warningf(c, "Failed to cache annotati on protobuf Item.") | |
| 286 } | |
| 287 } else { | |
| 288 log.WithError(err).Warningf(c, "Failed to marshal annotation pro tobuf Item.") | |
| 289 } | |
| 290 | |
| 291 return nil | |
| 292 } | |
| 293 | |
| 294 func (as *annotationStreamRequest) toMiloBuild(c context.Context) *resp.MiloBuil d { | |
| 295 prefix, name := as.path.Split() | |
| 296 | |
| 297 // Prepare a Streams object with only one stream. | |
| 298 streams := Streams{ | |
| 299 MainStream: &Stream{ | |
| 300 Server: as.host, | |
| 301 Prefix: string(prefix), | |
| 302 Path: string(name), | |
| 303 IsDatagram: true, | |
| 304 Data: as.item.Step, | |
| 305 Closed: as.item.Finished, | |
| 306 }, | |
| 307 } | |
| 308 | |
| 309 var ( | |
| 310 build resp.MiloBuild | |
| 311 ub = logDogURLBuilder{ | |
| 312 project: as.project, | |
| 313 host: as.host, | |
| 314 prefix: prefix, | |
| 315 } | |
| 316 ) | |
| 317 AddLogDogToBuild(c, &ub, &streams, &build) | |
| 318 | |
| 319 // If we're still building, the duration is the difference between start time | |
| 320 // and now. | |
| 321 return &build | |
| 322 } | |
| 323 | |
| 324 type logDogURLBuilder struct { | |
| 325 host string | |
| 326 prefix types.StreamName | |
| 327 project config.ProjectName | |
| 328 } | |
| 329 | |
| 330 func (b *logDogURLBuilder) BuildLink(l *miloProto.Link) *resp.Link { | |
| 331 switch t := l.Value.(type) { | |
| 332 case *miloProto.Link_LogdogStream: | |
| 333 ls := t.LogdogStream | |
| 334 | |
| 335 server := ls.Server | |
| 336 if server == "" { | |
| 337 server = b.host | |
| 338 } | |
| 339 | |
| 340 prefix := types.StreamName(ls.Prefix) | |
| 341 if prefix == "" { | |
| 342 prefix = b.prefix | |
| 343 } | |
| 344 | |
| 345 path := fmt.Sprintf("%s/%s", b.project, prefix.Join(types.Stream Name(ls.Name))) | |
| 346 u := url.URL{ | |
| 347 Scheme: "https", | |
| 348 Host: server, | |
| 349 Path: "v/", | |
| 350 RawQuery: url.Values{ | |
| 351 "s": []string{string(path)}, | |
| 352 }.Encode(), | |
| 353 } | |
| 354 | |
| 355 link := resp.Link{ | |
| 356 Label: l.Label, | |
| 357 URL: u.String(), | |
| 358 } | |
| 359 if link.Label == "" { | |
| 360 link.Label = ls.Name | |
| 361 } | |
| 362 return &link | |
| 363 | |
| 364 case *miloProto.Link_Url: | |
| 365 link := resp.Link{ | |
| 366 Label: l.Label, | |
| 367 URL: t.Url, | |
| 368 } | |
| 369 if link.Label == "" { | |
| 370 link.Label = "unnamed" | |
| 371 } | |
| 372 return &link | |
| 373 | |
| 374 default: | |
| 375 // Don't know how to render. | |
| 376 return nil | |
| 377 } | |
| 378 } | |
| OLD | NEW |