| OLD | NEW |
| 1 // Copyright 2016 The LUCI Authors. All rights reserved. | 1 // Copyright 2016 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package logdog | 5 package logdog |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "errors" | |
| 9 "fmt" | 8 "fmt" |
| 10 "net/http" | 9 "net/http" |
| 11 "net/url" | |
| 12 "strings" | |
| 13 "time" | 10 "time" |
| 14 | 11 |
| 15 log "github.com/luci/luci-go/common/logging" | 12 log "github.com/luci/luci-go/common/logging" |
| 16 "github.com/luci/luci-go/common/proto/google" | 13 "github.com/luci/luci-go/common/proto/google" |
| 17 miloProto "github.com/luci/luci-go/common/proto/milo" | 14 miloProto "github.com/luci/luci-go/common/proto/milo" |
| 18 "github.com/luci/luci-go/grpc/grpcutil" | 15 "github.com/luci/luci-go/grpc/grpcutil" |
| 19 "github.com/luci/luci-go/logdog/api/logpb" | 16 "github.com/luci/luci-go/logdog/api/logpb" |
| 20 "github.com/luci/luci-go/logdog/client/coordinator" | 17 "github.com/luci/luci-go/logdog/client/coordinator" |
| 21 "github.com/luci/luci-go/logdog/common/types" | 18 "github.com/luci/luci-go/logdog/common/types" |
| 19 "github.com/luci/luci-go/logdog/common/viewer" |
| 22 "github.com/luci/luci-go/luci_config/common/cfgtypes" | 20 "github.com/luci/luci-go/luci_config/common/cfgtypes" |
| 23 "github.com/luci/luci-go/milo/api/resp" | 21 "github.com/luci/luci-go/milo/api/resp" |
| 24 "github.com/luci/luci-go/milo/appengine/logdog/internal" | 22 "github.com/luci/luci-go/milo/appengine/logdog/internal" |
| 25 "github.com/luci/luci-go/milo/common/miloerror" | 23 "github.com/luci/luci-go/milo/common/miloerror" |
| 26 | 24 |
| 27 "github.com/golang/protobuf/proto" | 25 "github.com/golang/protobuf/proto" |
| 28 mc "github.com/luci/gae/service/memcache" | 26 mc "github.com/luci/gae/service/memcache" |
| 29 "golang.org/x/net/context" | 27 "golang.org/x/net/context" |
| 30 "google.golang.org/grpc/codes" | 28 "google.golang.org/grpc/codes" |
| 31 ) | 29 ) |
| 32 | 30 |
| 33 const ( | 31 const ( |
| 34 // intermediateCacheLifetime is the amount of time to cache intermediate
(non- | 32 // intermediateCacheLifetime is the amount of time to cache intermediate
(non- |
| 35 // terminal) annotation streams. Terminal annotation streams are cached | 33 // terminal) annotation streams. Terminal annotation streams are cached |
| 36 // indefinitely. | 34 // indefinitely. |
| 37 intermediateCacheLifetime = 10 * time.Second | 35 intermediateCacheLifetime = 10 * time.Second |
| 38 | 36 |
| 39 // defaultLogDogHost is the default LogDog host, if one isn't specified
via | 37 // defaultLogDogHost is the default LogDog host, if one isn't specified
via |
| 40 // query string. | 38 // query string. |
| 41 defaultLogDogHost = "luci-logdog.appspot.com" | 39 defaultLogDogHost = "luci-logdog.appspot.com" |
| 42 ) | 40 ) |
| 43 | 41 |
| 44 type annotationStreamRequest struct { | 42 // AnnotationStream represents a LogDog annotation protobuf stream. |
| 45 » *AnnotationStream | 43 type AnnotationStream struct { |
| 46 | 44 » Project cfgtypes.ProjectName |
| 47 » // host is the name of the LogDog host. | 45 » Path types.StreamPath |
| 48 » host string | |
| 49 | |
| 50 » project cfgtypes.ProjectName | |
| 51 » path types.StreamPath | |
| 52 | 46 |
| 53 // logDogClient is the HTTP client to use for LogDog communication. | 47 // logDogClient is the HTTP client to use for LogDog communication. |
| 54 » logDogClient *coordinator.Client | 48 » Client *coordinator.Client |
| 55 | 49 |
| 56 // cs is the unmarshalled annotation stream Step and associated data. | 50 // cs is the unmarshalled annotation stream Step and associated data. |
| 57 cs internal.CachedStep | 51 cs internal.CachedStep |
| 58 } | 52 } |
| 59 | 53 |
| 60 func (as *annotationStreamRequest) normalize() error { | 54 // Normalize validates and normalizes the stream's parameters. |
| 61 » if err := as.project.Validate(); err != nil { | 55 func (as *AnnotationStream) Normalize() error { |
| 62 » » return &miloerror.Error{ | 56 » if err := as.Project.Validate(); err != nil { |
| 63 » » » Message: "Invalid project name", | 57 » » return fmt.Errorf("Invalid project name: %s", as.Project) |
| 64 » » » Code: http.StatusBadRequest, | |
| 65 » » } | |
| 66 } | 58 } |
| 67 | 59 |
| 68 » if err := as.path.Validate(); err != nil { | 60 » if err := as.Path.Validate(); err != nil { |
| 69 » » return &miloerror.Error{ | 61 » » return fmt.Errorf("Invalid log stream path %q: %s", as.Path, err
) |
| 70 » » » Message: fmt.Sprintf("Invalid log stream path %q: %s", a
s.path, err), | |
| 71 » » » Code: http.StatusBadRequest, | |
| 72 » » } | |
| 73 » } | |
| 74 | |
| 75 » // Get the host. We normalize it to lowercase and trim spaces since we u
se | |
| 76 » // it as a memcache key. | |
| 77 » as.host = strings.ToLower(strings.TrimSpace(as.host)) | |
| 78 » if as.host == "" { | |
| 79 » » as.host = defaultLogDogHost | |
| 80 » } | |
| 81 » if strings.ContainsRune(as.host, '/') { | |
| 82 » » return errors.New("invalid host name") | |
| 83 } | 62 } |
| 84 | 63 |
| 85 return nil | 64 return nil |
| 86 } | 65 } |
| 87 | 66 |
| 88 func (as *annotationStreamRequest) memcacheKey() string { | 67 // Load loads the annotation stream from LogDog. |
| 89 » return fmt.Sprintf("logdog/%s/%s/%s", as.host, as.project, as.path) | 68 // |
| 90 } | 69 // If the stream does not exist, or is invalid, Load will return a Milo error. |
| 70 // Otherwise, it will return the Step that was loaded. |
| 71 // |
| 72 // Load caches the step, so multiple calls to Load will return the same Step |
| 73 // value. |
| 74 func (as *AnnotationStream) Load(c context.Context) (*miloProto.Step, error) { |
| 75 » // Cached? |
| 76 » if as.cs.Step != nil { |
| 77 » » return as.cs.Step, nil |
| 78 » } |
| 91 | 79 |
| 92 func (as *annotationStreamRequest) load(c context.Context) error { | |
| 93 // Load from memcache, if possible. If an error occurs, we will proceed
as if | 80 // Load from memcache, if possible. If an error occurs, we will proceed
as if |
| 94 // no CachedStep was available. | 81 // no CachedStep was available. |
| 95 mcKey := as.memcacheKey() | 82 mcKey := as.memcacheKey() |
| 96 mcItem, err := mc.GetKey(c, mcKey) | 83 mcItem, err := mc.GetKey(c, mcKey) |
| 97 switch err { | 84 switch err { |
| 98 case nil: | 85 case nil: |
| 99 if err := proto.Unmarshal(mcItem.Value(), &as.cs); err == nil { | 86 if err := proto.Unmarshal(mcItem.Value(), &as.cs); err == nil { |
| 100 » » » return nil | 87 » » » return as.cs.Step, nil |
| 101 } | 88 } |
| 102 | 89 |
| 103 // We could not unmarshal the cached value. Try and delete it fr
om | 90 // We could not unmarshal the cached value. Try and delete it fr
om |
| 104 // memcache, since it's invalid. | 91 // memcache, since it's invalid. |
| 105 log.Fields{ | 92 log.Fields{ |
| 106 log.ErrorKey: err, | 93 log.ErrorKey: err, |
| 107 "memcacheKey": mcKey, | 94 "memcacheKey": mcKey, |
| 108 }.Warningf(c, "Failed to unmarshal cached annotation protobuf.") | 95 }.Warningf(c, "Failed to unmarshal cached annotation protobuf.") |
| 109 if err := mc.Delete(c, mcKey); err != nil { | 96 if err := mc.Delete(c, mcKey); err != nil { |
| 110 log.WithError(err).Warningf(c, "Failed to delete invalid
annotation protobuf memcache entry.") | 97 log.WithError(err).Warningf(c, "Failed to delete invalid
annotation protobuf memcache entry.") |
| 111 } | 98 } |
| 112 | 99 |
| 113 case mc.ErrCacheMiss: | 100 case mc.ErrCacheMiss: |
| 114 break | 101 break |
| 115 | 102 |
| 116 default: | 103 default: |
| 117 log.Fields{ | 104 log.Fields{ |
| 118 log.ErrorKey: err, | 105 log.ErrorKey: err, |
| 119 "memcacheKey": mcKey, | 106 "memcacheKey": mcKey, |
| 120 }.Errorf(c, "Failed to load annotation protobuf memcache cached
step.") | 107 }.Errorf(c, "Failed to load annotation protobuf memcache cached
step.") |
| 121 } | 108 } |
| 122 | 109 |
| 123 // Load from LogDog directly. | 110 // Load from LogDog directly. |
| 124 log.Fields{ | 111 log.Fields{ |
| 125 » » "project": as.project, | 112 » » "host": as.Client.Host, |
| 126 » » "path": as.path, | 113 » » "project": as.Project, |
| 127 » » "host": as.host, | 114 » » "path": as.Path, |
| 128 }.Infof(c, "Making tail request to LogDog to fetch annotation stream.") | 115 }.Infof(c, "Making tail request to LogDog to fetch annotation stream.") |
| 129 | 116 |
| 130 var ( | 117 var ( |
| 131 state coordinator.LogStream | 118 state coordinator.LogStream |
| 132 » » stream = as.logDogClient.Stream(as.project, as.path) | 119 » » stream = as.Client.Stream(as.Project, as.Path) |
| 133 ) | 120 ) |
| 134 le, err := stream.Tail(c, coordinator.WithState(&state), coordinator.Com
plete()) | 121 le, err := stream.Tail(c, coordinator.WithState(&state), coordinator.Com
plete()) |
| 135 switch code := grpcutil.Code(err); code { | 122 switch code := grpcutil.Code(err); code { |
| 136 case codes.OK: | 123 case codes.OK: |
| 137 break | 124 break |
| 138 | 125 |
| 139 case codes.NotFound: | 126 case codes.NotFound: |
| 140 » » return &miloerror.Error{ | 127 » » return nil, &miloerror.Error{ |
| 141 Message: "Stream not found", | 128 Message: "Stream not found", |
| 142 Code: http.StatusNotFound, | 129 Code: http.StatusNotFound, |
| 143 } | 130 } |
| 144 | 131 |
| 145 default: | 132 default: |
| 146 // TODO: Once we switch to delegation tokens and are making the
request on | 133 // TODO: Once we switch to delegation tokens and are making the
request on |
| 147 // behalf of a user rather than the Milo service, handle Permiss
ionDenied. | 134 // behalf of a user rather than the Milo service, handle Permiss
ionDenied. |
| 148 log.Fields{ | 135 log.Fields{ |
| 149 log.ErrorKey: err, | 136 log.ErrorKey: err, |
| 150 "code": code, | 137 "code": code, |
| 151 }.Errorf(c, "Failed to load LogDog annotation stream.") | 138 }.Errorf(c, "Failed to load LogDog annotation stream.") |
| 152 » » return &miloerror.Error{ | 139 » » return nil, &miloerror.Error{ |
| 153 Message: "Failed to load stream", | 140 Message: "Failed to load stream", |
| 154 Code: http.StatusInternalServerError, | 141 Code: http.StatusInternalServerError, |
| 155 } | 142 } |
| 156 } | 143 } |
| 157 | 144 |
| 158 // Make sure that this is an annotation stream. | 145 // Make sure that this is an annotation stream. |
| 159 switch { | 146 switch { |
| 160 case state.Desc.ContentType != miloProto.ContentTypeAnnotations: | 147 case state.Desc.ContentType != miloProto.ContentTypeAnnotations: |
| 161 » » return &miloerror.Error{ | 148 » » return nil, &miloerror.Error{ |
| 162 Message: "Requested stream is not a Milo annotation prot
obuf", | 149 Message: "Requested stream is not a Milo annotation prot
obuf", |
| 163 Code: http.StatusBadRequest, | 150 Code: http.StatusBadRequest, |
| 164 } | 151 } |
| 165 | 152 |
| 166 case state.Desc.StreamType != logpb.StreamType_DATAGRAM: | 153 case state.Desc.StreamType != logpb.StreamType_DATAGRAM: |
| 167 » » return &miloerror.Error{ | 154 » » return nil, &miloerror.Error{ |
| 168 Message: "Requested stream is not a datagram stream", | 155 Message: "Requested stream is not a datagram stream", |
| 169 Code: http.StatusBadRequest, | 156 Code: http.StatusBadRequest, |
| 170 } | 157 } |
| 171 | 158 |
| 172 case le == nil: | 159 case le == nil: |
| 173 // No annotation stream data, so render a minimal page. | 160 // No annotation stream data, so render a minimal page. |
| 174 » » return nil | 161 » » return nil, &miloerror.Error{ |
| 162 » » » Message: "Log stream has no annotation entries", |
| 163 » » » Code: http.StatusNotFound, |
| 164 » » } |
| 175 } | 165 } |
| 176 | 166 |
| 177 // Get the last log entry in the stream. In reality, this will be index
0, | 167 // Get the last log entry in the stream. In reality, this will be index
0, |
| 178 // since the "Tail" call should only return one log entry. | 168 // since the "Tail" call should only return one log entry. |
| 179 // | 169 // |
| 180 // Because we supplied the "Complete" flag to Tail and suceeded, this da
tagram | 170 // Because we supplied the "Complete" flag to Tail and suceeded, this da
tagram |
| 181 // will be complete even if its source datagram(s) are fragments. | 171 // will be complete even if its source datagram(s) are fragments. |
| 182 dg := le.GetDatagram() | 172 dg := le.GetDatagram() |
| 183 if dg == nil { | 173 if dg == nil { |
| 184 » » return &miloerror.Error{ | 174 » » return nil, &miloerror.Error{ |
| 185 Message: "Datagram stream does not have datagram data", | 175 Message: "Datagram stream does not have datagram data", |
| 186 Code: http.StatusInternalServerError, | 176 Code: http.StatusInternalServerError, |
| 187 } | 177 } |
| 188 } | 178 } |
| 189 | 179 |
| 190 // Attempt to decode the Step protobuf. | 180 // Attempt to decode the Step protobuf. |
| 191 var step miloProto.Step | 181 var step miloProto.Step |
| 192 if err := proto.Unmarshal(dg.Data, &step); err != nil { | 182 if err := proto.Unmarshal(dg.Data, &step); err != nil { |
| 193 » » return &miloerror.Error{ | 183 » » return nil, &miloerror.Error{ |
| 194 Message: "Failed to unmarshal annotation protobuf", | 184 Message: "Failed to unmarshal annotation protobuf", |
| 195 Code: http.StatusInternalServerError, | 185 Code: http.StatusInternalServerError, |
| 196 } | 186 } |
| 197 } | 187 } |
| 198 | 188 |
| 199 var latestEndedTime time.Time | 189 var latestEndedTime time.Time |
| 200 for _, sub := range step.Substep { | 190 for _, sub := range step.Substep { |
| 201 switch t := sub.Substep.(type) { | 191 switch t := sub.Substep.(type) { |
| 202 case *miloProto.Step_Substep_AnnotationStream: | 192 case *miloProto.Step_Substep_AnnotationStream: |
| 203 // TODO(hinoka,dnj): Implement recursive / embedded subs
tream fetching if | 193 // TODO(hinoka,dnj): Implement recursive / embedded subs
tream fetching if |
| (...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 239 mcItem.SetExpiration(intermediateCacheLifetime) | 229 mcItem.SetExpiration(intermediateCacheLifetime) |
| 240 } | 230 } |
| 241 mcItem.SetValue(mcData) | 231 mcItem.SetValue(mcData) |
| 242 if err := mc.Set(c, mcItem); err != nil { | 232 if err := mc.Set(c, mcItem); err != nil { |
| 243 log.WithError(err).Warningf(c, "Failed to cache annotati
on protobuf CachedStep.") | 233 log.WithError(err).Warningf(c, "Failed to cache annotati
on protobuf CachedStep.") |
| 244 } | 234 } |
| 245 } else { | 235 } else { |
| 246 log.WithError(err).Warningf(c, "Failed to marshal annotation pro
tobuf CachedStep.") | 236 log.WithError(err).Warningf(c, "Failed to marshal annotation pro
tobuf CachedStep.") |
| 247 } | 237 } |
| 248 | 238 |
| 249 » return nil | 239 » return as.cs.Step, nil |
| 250 } | 240 } |
| 251 | 241 |
| 252 func (as *annotationStreamRequest) toMiloBuild(c context.Context) *resp.MiloBuil
d { | 242 func (as *AnnotationStream) memcacheKey() string { |
| 253 » prefix, name := as.path.Split() | 243 » return fmt.Sprintf("logdog/%s/%s/%s", as.Client.Host, as.Project, as.Pat
h) |
| 244 } |
| 245 |
| 246 func (as *AnnotationStream) toMiloBuild(c context.Context) *resp.MiloBuild { |
| 247 » prefix, name := as.Path.Split() |
| 254 | 248 |
| 255 // Prepare a Streams object with only one stream. | 249 // Prepare a Streams object with only one stream. |
| 256 streams := Streams{ | 250 streams := Streams{ |
| 257 MainStream: &Stream{ | 251 MainStream: &Stream{ |
| 258 » » » Server: as.host, | 252 » » » Server: as.Client.Host, |
| 259 Prefix: string(prefix), | 253 Prefix: string(prefix), |
| 260 Path: string(name), | 254 Path: string(name), |
| 261 IsDatagram: true, | 255 IsDatagram: true, |
| 262 Data: as.cs.Step, | 256 Data: as.cs.Step, |
| 263 Closed: as.cs.Finished, | 257 Closed: as.cs.Finished, |
| 264 }, | 258 }, |
| 265 } | 259 } |
| 266 | 260 |
| 267 var ( | 261 var ( |
| 268 build resp.MiloBuild | 262 build resp.MiloBuild |
| 269 ub = logDogURLBuilder{ | 263 ub = logDogURLBuilder{ |
| 270 » » » project: as.project, | 264 » » » host: as.Client.Host, |
| 271 » » » host: as.host, | 265 » » » project: as.Project, |
| 272 prefix: prefix, | 266 prefix: prefix, |
| 273 } | 267 } |
| 274 ) | 268 ) |
| 275 AddLogDogToBuild(c, &ub, streams.MainStream.Data, &build) | 269 AddLogDogToBuild(c, &ub, streams.MainStream.Data, &build) |
| 276 return &build | 270 return &build |
| 277 } | 271 } |
| 278 | 272 |
| 279 type logDogURLBuilder struct { | 273 type logDogURLBuilder struct { |
| 280 host string | 274 host string |
| 281 prefix types.StreamName | 275 prefix types.StreamName |
| 282 project cfgtypes.ProjectName | 276 project cfgtypes.ProjectName |
| 283 } | 277 } |
| 284 | 278 |
| 285 func (b *logDogURLBuilder) BuildLink(l *miloProto.Link) *resp.Link { | 279 func (b *logDogURLBuilder) BuildLink(l *miloProto.Link) *resp.Link { |
| 286 switch t := l.Value.(type) { | 280 switch t := l.Value.(type) { |
| 287 case *miloProto.Link_LogdogStream: | 281 case *miloProto.Link_LogdogStream: |
| 288 ls := t.LogdogStream | 282 ls := t.LogdogStream |
| 289 | 283 |
| 290 server := ls.Server | 284 server := ls.Server |
| 291 if server == "" { | 285 if server == "" { |
| 292 server = b.host | 286 server = b.host |
| 293 } | 287 } |
| 294 | 288 |
| 295 prefix := types.StreamName(ls.Prefix) | 289 prefix := types.StreamName(ls.Prefix) |
| 296 if prefix == "" { | 290 if prefix == "" { |
| 297 prefix = b.prefix | 291 prefix = b.prefix |
| 298 } | 292 } |
| 299 | 293 |
| 300 » » path := fmt.Sprintf("%s/%s", b.project, prefix.Join(types.Stream
Name(ls.Name))) | 294 » » u := viewer.GetURL(server, b.project, prefix.Join(types.StreamNa
me(ls.Name))) |
| 301 » » u := url.URL{ | |
| 302 » » » Scheme: "https", | |
| 303 » » » Host: server, | |
| 304 » » » Path: "v/", | |
| 305 » » » RawQuery: url.Values{ | |
| 306 » » » » "s": []string{string(path)}, | |
| 307 » » » }.Encode(), | |
| 308 » » } | |
| 309 | |
| 310 link := resp.Link{ | 295 link := resp.Link{ |
| 311 Label: l.Label, | 296 Label: l.Label, |
| 312 » » » URL: u.String(), | 297 » » » URL: u, |
| 313 } | 298 } |
| 314 if link.Label == "" { | 299 if link.Label == "" { |
| 315 link.Label = ls.Name | 300 link.Label = ls.Name |
| 316 } | 301 } |
| 317 return &link | 302 return &link |
| 318 | 303 |
| 319 case *miloProto.Link_Url: | 304 case *miloProto.Link_Url: |
| 320 link := resp.Link{ | 305 link := resp.Link{ |
| 321 Label: l.Label, | 306 Label: l.Label, |
| 322 URL: t.Url, | 307 URL: t.Url, |
| 323 } | 308 } |
| 324 if link.Label == "" { | 309 if link.Label == "" { |
| 325 link.Label = "unnamed" | 310 link.Label = "unnamed" |
| 326 } | 311 } |
| 327 return &link | 312 return &link |
| 328 | 313 |
| 329 default: | 314 default: |
| 330 // Don't know how to render. | 315 // Don't know how to render. |
| 331 return nil | 316 return nil |
| 332 } | 317 } |
| 333 } | 318 } |
| OLD | NEW |