Chromium Code Reviews| OLD | NEW |
|---|---|
| (Empty) | |
| 1 // Copyright 2016 The LUCI Authors. All rights reserved. | |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | |
| 3 // that can be found in the LICENSE file. | |
| 4 | |
| 5 package logdog | |
| 6 | |
| 7 import ( | |
| 8 "errors" | |
| 9 "fmt" | |
| 10 "net/http" | |
| 11 "net/url" | |
| 12 "strings" | |
| 13 "time" | |
| 14 | |
| 15 "github.com/luci/luci-go/appengine/cmd/milo/logdog/internal" | |
| 16 "github.com/luci/luci-go/appengine/cmd/milo/miloerror" | |
| 17 "github.com/luci/luci-go/appengine/cmd/milo/resp" | |
| 18 "github.com/luci/luci-go/common/config" | |
| 19 "github.com/luci/luci-go/common/grpcutil" | |
| 20 log "github.com/luci/luci-go/common/logging" | |
| 21 "github.com/luci/luci-go/common/proto/google" | |
| 22 miloProto "github.com/luci/luci-go/common/proto/milo" | |
| 23 "github.com/luci/luci-go/common/prpc" | |
| 24 "github.com/luci/luci-go/logdog/api/endpoints/coordinator/logs/v1" | |
| 25 "github.com/luci/luci-go/logdog/api/logpb" | |
| 26 "github.com/luci/luci-go/logdog/common/types" | |
| 27 | |
| 28 "github.com/golang/protobuf/proto" | |
| 29 "github.com/luci/gae/service/memcache" | |
| 30 "golang.org/x/net/context" | |
| 31 "google.golang.org/grpc/codes" | |
| 32 ) | |
| 33 | |
| 34 const ( | |
| 35 // intermediateCacheLifetime is the amount of time to cache intermediate (non- | |
| 36 // terminal) annotation streams. Terminal annotation streams are cached | |
| 37 // indefinitely. | |
| 38 intermediateCacheLifetime = 10 * time.Second | |
| 39 | |
| 40 // defaultLogDogHost is the default LogDog host, if one isn't specified via | |
| 41 // query string. | |
| 42 defaultLogDogHost = "luci-logdog.appspot.com" | |
| 43 ) | |
| 44 | |
| 45 type annotationStreamRequest struct { | |
| 46 *AnnotationStream | |
| 47 | |
| 48 // host is the name of the LogDog host. | |
| 49 host string | |
| 50 | |
| 51 project config.ProjectName | |
| 52 path types.StreamPath | |
| 53 | |
| 54 // item is the unmarshalled annotation stream Step and associated data. | |
| 55 item internal.Item | |
| 56 } | |
| 57 | |
| 58 func (as *annotationStreamRequest) normalize() error { | |
| 59 if err := as.project.Validate(); err != nil { | |
| 60 return &miloerror.Error{ | |
| 61 Message: "Invalid project name", | |
| 62 Code: http.StatusBadRequest, | |
| 63 } | |
| 64 } | |
| 65 | |
| 66 if err := as.path.Validate(); err != nil { | |
| 67 return &miloerror.Error{ | |
| 68 Message: fmt.Sprintf("Invalid log stream path %q: %s", a s.path, err), | |
| 69 Code: http.StatusBadRequest, | |
| 70 } | |
| 71 } | |
| 72 | |
| 73 // Get the host. We normalize it to lowercase and trim spaces since we u se | |
| 74 // it as a memcache key. | |
| 75 as.host = strings.ToLower(strings.TrimSpace(as.host)) | |
| 76 if as.host == "" { | |
| 77 as.host = defaultLogDogHost | |
| 78 } | |
| 79 if strings.IndexRune(as.host, '/') >= 0 { | |
|
nodir
2016/07/29 23:00:32
nit: use string.ContainsRune
dnj
2016/07/29 23:24:43
Done.
| |
| 80 return errors.New("invalid host name") | |
| 81 } | |
| 82 | |
| 83 return nil | |
| 84 } | |
| 85 | |
| 86 func (as *annotationStreamRequest) memcacheKey() string { | |
| 87 return fmt.Sprintf("logdog/%s/%s/%s", as.host, as.project, as.path) | |
| 88 } | |
| 89 | |
| 90 func (as *annotationStreamRequest) load(c context.Context) error { | |
| 91 // Load from memcache, if possible. If an error occurs, we will proceed as if | |
| 92 // no cache item was available. | |
| 93 mcKey := as.memcacheKey() | |
| 94 mcItem, err := memcache.Get(c).Get(mcKey) | |
| 95 switch err { | |
| 96 case nil: | |
| 97 if err := proto.Unmarshal(mcItem.Value(), &as.item); err == nil { | |
| 98 return nil | |
| 99 } | |
| 100 | |
| 101 // We could not unmarshal the cached value. Try and delete it fr om | |
| 102 // memcache, since it's invalid. | |
| 103 log.Fields{ | |
| 104 log.ErrorKey: err, | |
| 105 "memcacheKey": mcKey, | |
| 106 }.Warningf(c, "Failed to unmarshal cached annotation protobuf.") | |
| 107 if err := memcache.Get(c).Delete(mcKey); err != nil { | |
| 108 log.WithError(err).Warningf(c, "Failed to delete invalid annotation protobuf memcache entry.") | |
| 109 } | |
| 110 | |
| 111 case memcache.ErrCacheMiss: | |
| 112 break | |
| 113 | |
| 114 default: | |
| 115 log.Fields{ | |
| 116 log.ErrorKey: err, | |
| 117 "memcacheKey": mcKey, | |
| 118 }.Errorf(c, "Failed to load annotation protobuf memcache item.") | |
| 119 } | |
| 120 | |
| 121 // Load from LogDog directly. | |
| 122 client := logdog.NewLogsPRPCClient(&prpc.Client{ | |
| 123 C: as.logDogClient, | |
| 124 Host: as.host, | |
| 125 }) | |
| 126 resp, err := client.Tail(c, &logdog.TailRequest{ | |
|
nodir
2016/07/29 23:00:31
please add logging: would be nice to have a log li
dnj
2016/07/29 23:24:43
Done.
| |
| 127 Project: string(as.project), | |
| 128 Path: string(as.path), | |
| 129 State: true, | |
| 130 }) | |
| 131 switch code := grpcutil.Code(err); code { | |
| 132 case codes.OK: | |
| 133 break | |
| 134 | |
| 135 case codes.NotFound: | |
| 136 return &miloerror.Error{ | |
| 137 Message: "Stream not found", | |
| 138 Code: http.StatusNotFound, | |
| 139 } | |
| 140 | |
| 141 default: | |
| 142 // TODO: Once we switch to delegation tokens and are making the request on | |
| 143 // behalf of a user rather than the Milo service, handle Permiss ionDenied. | |
| 144 log.Fields{ | |
| 145 log.ErrorKey: err, | |
| 146 "code": code, | |
| 147 }.Errorf(c, "Failed to load LogDog annotation stream.") | |
| 148 return &miloerror.Error{ | |
| 149 Message: "Failed to load stream", | |
| 150 Code: http.StatusInternalServerError, | |
| 151 } | |
| 152 } | |
| 153 | |
| 154 // Make sure that this is an annotation stream. | |
| 155 switch { | |
| 156 case resp.Desc.ContentType != miloProto.ContentTypeAnnotations: | |
| 157 return &miloerror.Error{ | |
| 158 Message: "Requested stream is not a Milo annotation prot obuf", | |
| 159 Code: http.StatusBadRequest, | |
| 160 } | |
| 161 | |
| 162 case resp.Desc.StreamType != logpb.StreamType_DATAGRAM: | |
| 163 return &miloerror.Error{ | |
| 164 Message: "Requested stream is not a datagram stream", | |
| 165 Code: http.StatusBadRequest, | |
| 166 } | |
| 167 | |
| 168 case len(resp.Logs) == 0: | |
| 169 // No annotation stream data, so render a minimal page. | |
| 170 return nil | |
| 171 } | |
| 172 | |
| 173 // Get the last log entry in the stream. In reality, this will be index 0, | |
| 174 // since the "Tail" call should only return one log entry. | |
| 175 latestStream := resp.Logs[len(resp.Logs)-1] | |
| 176 dg := latestStream.GetDatagram() | |
| 177 switch { | |
| 178 case dg == nil: | |
| 179 return &miloerror.Error{ | |
| 180 Message: "Datagram stream does not have datagram data", | |
| 181 Code: http.StatusInternalServerError, | |
| 182 } | |
| 183 | |
| 184 case dg.Partial != nil && !(dg.Partial.Index == 0 && dg.Partial.Last): | |
| 185 // LogDog splits large datagrams into consecutive fragments. If the | |
| 186 // annotation state is fragmented, a reconstruction algorithm wi ll have to | |
| 187 // be employed here to build the full datagram before processing . | |
| 188 // | |
| 189 // At the moment, no annotation streams are expected to be anywh ere close to | |
| 190 // this large, so we're going to handle this case by erroring. A | |
| 191 // reconstruction algorithm would look like: | |
| 192 // 1) "Tail" to get the latest datagram, identify it as partial. | |
| 193 // 1a) Perform a bounds check on the total datagram size to ensu re that it | |
| 194 // can be safely reconstructed. | |
| 195 // 2) Determine if it's the last partial index. If not, then the latest | |
| 196 // datagram is incomplete. Determine our initial datagram's stream index | |
| 197 // the by subtracting the partial index from this message's stream index. | |
| 198 // 2a) If this datagram index is "0", the first datagram in the stream is | |
| 199 // partial and all of the data isn't here, so treat this as "no data". | |
| 200 // 2b) Otherwise, goto (1), using "Get" request on the datagram index minus | |
| 201 // one to get the previous datagram. | |
| 202 // 3) Issue a "Get" request for our initial datagram index throu gh the index | |
| 203 // preceding ours. | |
| 204 // 4) Reassemble the binary data from the full set of datagrams. | |
| 205 return &miloerror.Error{ | |
| 206 Message: "Partial datagram streams are not supported yet ", | |
| 207 Code: http.StatusNotImplemented, | |
| 208 } | |
| 209 } | |
| 210 | |
| 211 // Attempt to decode the Step protobuf. | |
| 212 var step miloProto.Step | |
| 213 if err := proto.Unmarshal(dg.Data, &step); err != nil { | |
| 214 return &miloerror.Error{ | |
| 215 Message: "Failed to unmarshal annotation protobuf", | |
| 216 Code: http.StatusInternalServerError, | |
| 217 } | |
| 218 } | |
| 219 | |
| 220 var latestEndedTime time.Time | |
| 221 for _, sub := range step.Substep { | |
| 222 switch t := sub.Substep.(type) { | |
| 223 case *miloProto.Step_Substep_AnnotationStream: | |
| 224 // TODO(hinoka,dnj): Implement recursive / embedded subs tream fetching if | |
| 225 // specified. | |
| 226 log.Warningf(c, "Annotation stream links LogDog substrea m [%+v], not supported!", t.AnnotationStream) | |
| 227 | |
| 228 case *miloProto.Step_Substep_Step: | |
| 229 endedTime := t.Step.Ended.Time() | |
| 230 if t.Step.Ended != nil && endedTime.After(latestEndedTim e) { | |
| 231 latestEndedTime = endedTime | |
| 232 } | |
| 233 } | |
| 234 } | |
| 235 if latestEndedTime.IsZero() { | |
| 236 // No substep had an ended time :( | |
| 237 latestEndedTime = step.Started.Time() | |
| 238 } | |
| 239 | |
| 240 // Build our Item. | |
| 241 as.item = internal.Item{ | |
| 242 Step: &step, | |
| 243 Finished: (resp.State.TerminalIndex >= 0 && latestStream.StreamI ndex == uint64(resp.State.TerminalIndex)), | |
| 244 } | |
| 245 | |
| 246 // Annotee is apparently not putting an ended time on some annotation pr otos. | |
| 247 // This hack will ensure that a finished build will always have an ended time. | |
| 248 if as.item.Finished && as.item.Step.Ended == nil { | |
| 249 as.item.Step.Ended = google.NewTimestamp(latestEndedTime) | |
| 250 } | |
| 251 | |
| 252 // Marshal and cache the item. If this is the final protobuf in the stre am, | |
| 253 // cache it indefinitely; otherwise, cache it for intermediateCacheLifet ime. | |
| 254 // | |
| 255 // If this fails, it is non-fatal. | |
| 256 mcData, err := proto.Marshal(&as.item) | |
| 257 if err == nil { | |
| 258 mcItem = memcache.Get(c).NewItem(mcKey) | |
| 259 if !as.item.Finished { | |
| 260 mcItem.SetExpiration(intermediateCacheLifetime) | |
| 261 } | |
| 262 mcItem.SetValue(mcData) | |
| 263 if err := memcache.Get(c).Set(mcItem); err != nil { | |
| 264 log.WithError(err).Warningf(c, "Failed to cache annotati on protobuf Item.") | |
| 265 } | |
| 266 } else { | |
| 267 log.WithError(err).Warningf(c, "Failed to marshal annotation pro tobuf Item.") | |
| 268 } | |
| 269 | |
| 270 return nil | |
| 271 } | |
| 272 | |
| 273 func (as *annotationStreamRequest) toMiloBuild(c context.Context) *resp.MiloBuil d { | |
| 274 prefix, name := as.path.Split() | |
| 275 | |
| 276 // Prepare a Streams object with only one stream. | |
| 277 streams := Streams{ | |
| 278 MainStream: &Stream{ | |
| 279 Server: as.host, | |
| 280 Prefix: string(prefix), | |
| 281 Path: string(name), | |
| 282 IsDatagram: true, | |
| 283 Data: as.item.Step, | |
| 284 Closed: as.item.Finished, | |
| 285 }, | |
| 286 } | |
| 287 | |
| 288 var ( | |
| 289 build resp.MiloBuild | |
| 290 ub = logDogURLBuilder{ | |
| 291 project: as.project, | |
| 292 host: as.host, | |
| 293 prefix: prefix, | |
| 294 } | |
| 295 ) | |
| 296 AddLogDogToBuild(c, &ub, &streams, &build) | |
| 297 | |
| 298 // If we're still building, the duration is the difference between start time | |
| 299 // and now. | |
|
nodir
2016/07/29 23:00:31
why this comment is here?
dnj
2016/07/29 23:24:43
Done.
| |
| 300 return &build | |
| 301 } | |
| 302 | |
| 303 type logDogURLBuilder struct { | |
| 304 host string | |
| 305 prefix types.StreamName | |
| 306 project config.ProjectName | |
| 307 } | |
| 308 | |
| 309 func (b *logDogURLBuilder) BuildLink(l *miloProto.Link) *resp.Link { | |
| 310 switch t := l.Value.(type) { | |
| 311 case *miloProto.Link_LogdogStream: | |
| 312 ls := t.LogdogStream | |
| 313 | |
| 314 server := ls.Server | |
| 315 if server == "" { | |
| 316 server = b.host | |
| 317 } | |
| 318 | |
| 319 prefix := types.StreamName(ls.Prefix) | |
| 320 if prefix == "" { | |
| 321 prefix = b.prefix | |
| 322 } | |
| 323 | |
| 324 path := fmt.Sprintf("%s/%s", b.project, prefix.Join(types.Stream Name(ls.Name))) | |
| 325 u := url.URL{ | |
| 326 Scheme: "https", | |
| 327 Host: server, | |
| 328 Path: "v/", | |
| 329 RawQuery: url.Values{ | |
| 330 "s": []string{string(path)}, | |
| 331 }.Encode(), | |
| 332 } | |
| 333 | |
| 334 link := resp.Link{ | |
| 335 Label: l.Label, | |
| 336 URL: u.String(), | |
| 337 } | |
| 338 if link.Label == "" { | |
| 339 link.Label = ls.Name | |
| 340 } | |
| 341 return &link | |
| 342 | |
| 343 case *miloProto.Link_Url: | |
| 344 link := resp.Link{ | |
| 345 Label: l.Label, | |
| 346 URL: t.Url, | |
| 347 } | |
| 348 if link.Label == "" { | |
| 349 link.Label = "unnamed" | |
| 350 } | |
| 351 return &link | |
| 352 | |
| 353 default: | |
| 354 // Don't know how to render. | |
| 355 return nil | |
| 356 } | |
| 357 } | |
| OLD | NEW |