Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1682)

Side by Side Diff: milo/appengine/logdog/build.go

Issue 2674603002: milo: Export LogDog annotation stream, remove host (Closed)
Patch Set: Comments Created 3 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « milo/appengine/frontend/milo.go ('k') | milo/appengine/logdog/http.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2016 The LUCI Authors. All rights reserved. 1 // Copyright 2016 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0 2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file. 3 // that can be found in the LICENSE file.
4 4
5 package logdog 5 package logdog
6 6
7 import ( 7 import (
8 "errors"
9 "fmt" 8 "fmt"
10 "net/http" 9 "net/http"
11 "net/url"
12 "strings"
13 "time" 10 "time"
14 11
15 log "github.com/luci/luci-go/common/logging" 12 log "github.com/luci/luci-go/common/logging"
16 "github.com/luci/luci-go/common/proto/google" 13 "github.com/luci/luci-go/common/proto/google"
17 miloProto "github.com/luci/luci-go/common/proto/milo" 14 miloProto "github.com/luci/luci-go/common/proto/milo"
18 "github.com/luci/luci-go/grpc/grpcutil" 15 "github.com/luci/luci-go/grpc/grpcutil"
19 "github.com/luci/luci-go/logdog/api/logpb" 16 "github.com/luci/luci-go/logdog/api/logpb"
20 "github.com/luci/luci-go/logdog/client/coordinator" 17 "github.com/luci/luci-go/logdog/client/coordinator"
21 "github.com/luci/luci-go/logdog/common/types" 18 "github.com/luci/luci-go/logdog/common/types"
19 "github.com/luci/luci-go/logdog/common/viewer"
22 "github.com/luci/luci-go/luci_config/common/cfgtypes" 20 "github.com/luci/luci-go/luci_config/common/cfgtypes"
23 "github.com/luci/luci-go/milo/api/resp" 21 "github.com/luci/luci-go/milo/api/resp"
24 "github.com/luci/luci-go/milo/appengine/logdog/internal" 22 "github.com/luci/luci-go/milo/appengine/logdog/internal"
25 "github.com/luci/luci-go/milo/common/miloerror" 23 "github.com/luci/luci-go/milo/common/miloerror"
26 24
27 "github.com/golang/protobuf/proto" 25 "github.com/golang/protobuf/proto"
28 mc "github.com/luci/gae/service/memcache" 26 mc "github.com/luci/gae/service/memcache"
29 "golang.org/x/net/context" 27 "golang.org/x/net/context"
30 "google.golang.org/grpc/codes" 28 "google.golang.org/grpc/codes"
31 ) 29 )
32 30
33 const ( 31 const (
34 // intermediateCacheLifetime is the amount of time to cache intermediate (non- 32 // intermediateCacheLifetime is the amount of time to cache intermediate (non-
35 // terminal) annotation streams. Terminal annotation streams are cached 33 // terminal) annotation streams. Terminal annotation streams are cached
36 // indefinitely. 34 // indefinitely.
37 intermediateCacheLifetime = 10 * time.Second 35 intermediateCacheLifetime = 10 * time.Second
38 36
39 // defaultLogDogHost is the default LogDog host, if one isn't specified via 37 // defaultLogDogHost is the default LogDog host, if one isn't specified via
40 // query string. 38 // query string.
41 defaultLogDogHost = "luci-logdog.appspot.com" 39 defaultLogDogHost = "luci-logdog.appspot.com"
42 ) 40 )
43 41
44 type annotationStreamRequest struct { 42 // AnnotationStream represents a LogDog annotation protobuf stream.
45 » *AnnotationStream 43 type AnnotationStream struct {
46 44 » Project cfgtypes.ProjectName
47 » // host is the name of the LogDog host. 45 » Path types.StreamPath
48 » host string
49
50 » project cfgtypes.ProjectName
51 » path types.StreamPath
52 46
53 // logDogClient is the HTTP client to use for LogDog communication. 47 // logDogClient is the HTTP client to use for LogDog communication.
54 » logDogClient *coordinator.Client 48 » Client *coordinator.Client
55 49
56 // cs is the unmarshalled annotation stream Step and associated data. 50 // cs is the unmarshalled annotation stream Step and associated data.
57 cs internal.CachedStep 51 cs internal.CachedStep
58 } 52 }
59 53
60 func (as *annotationStreamRequest) normalize() error { 54 // Normalize validates and normalizes the stream's parameters.
61 » if err := as.project.Validate(); err != nil { 55 func (as *AnnotationStream) Normalize() error {
62 » » return &miloerror.Error{ 56 » if err := as.Project.Validate(); err != nil {
63 » » » Message: "Invalid project name", 57 » » return fmt.Errorf("Invalid project name: %s", as.Project)
64 » » » Code: http.StatusBadRequest,
65 » » }
66 } 58 }
67 59
68 » if err := as.path.Validate(); err != nil { 60 » if err := as.Path.Validate(); err != nil {
69 » » return &miloerror.Error{ 61 » » return fmt.Errorf("Invalid log stream path %q: %s", as.Path, err )
70 » » » Message: fmt.Sprintf("Invalid log stream path %q: %s", a s.path, err),
71 » » » Code: http.StatusBadRequest,
72 » » }
73 » }
74
75 » // Get the host. We normalize it to lowercase and trim spaces since we u se
76 » // it as a memcache key.
77 » as.host = strings.ToLower(strings.TrimSpace(as.host))
78 » if as.host == "" {
79 » » as.host = defaultLogDogHost
80 » }
81 » if strings.ContainsRune(as.host, '/') {
82 » » return errors.New("invalid host name")
83 } 62 }
84 63
85 return nil 64 return nil
86 } 65 }
87 66
88 func (as *annotationStreamRequest) memcacheKey() string { 67 // Load loads the annotation stream from LogDog.
89 » return fmt.Sprintf("logdog/%s/%s/%s", as.host, as.project, as.path) 68 //
90 } 69 // If the stream does not exist, or is invalid, Load will return a Milo error.
70 // Otherwise, it will return the Step that was loaded.
71 //
72 // Load caches the step, so multiple calls to Load will return the same Step
73 // value.
74 func (as *AnnotationStream) Load(c context.Context) (*miloProto.Step, error) {
75 » // Cached?
76 » if as.cs.Step != nil {
77 » » return as.cs.Step, nil
78 » }
91 79
92 func (as *annotationStreamRequest) load(c context.Context) error {
93 // Load from memcache, if possible. If an error occurs, we will proceed as if 80 // Load from memcache, if possible. If an error occurs, we will proceed as if
94 // no CachedStep was available. 81 // no CachedStep was available.
95 mcKey := as.memcacheKey() 82 mcKey := as.memcacheKey()
96 mcItem, err := mc.GetKey(c, mcKey) 83 mcItem, err := mc.GetKey(c, mcKey)
97 switch err { 84 switch err {
98 case nil: 85 case nil:
99 if err := proto.Unmarshal(mcItem.Value(), &as.cs); err == nil { 86 if err := proto.Unmarshal(mcItem.Value(), &as.cs); err == nil {
100 » » » return nil 87 » » » return as.cs.Step, nil
101 } 88 }
102 89
103 // We could not unmarshal the cached value. Try and delete it fr om 90 // We could not unmarshal the cached value. Try and delete it fr om
104 // memcache, since it's invalid. 91 // memcache, since it's invalid.
105 log.Fields{ 92 log.Fields{
106 log.ErrorKey: err, 93 log.ErrorKey: err,
107 "memcacheKey": mcKey, 94 "memcacheKey": mcKey,
108 }.Warningf(c, "Failed to unmarshal cached annotation protobuf.") 95 }.Warningf(c, "Failed to unmarshal cached annotation protobuf.")
109 if err := mc.Delete(c, mcKey); err != nil { 96 if err := mc.Delete(c, mcKey); err != nil {
110 log.WithError(err).Warningf(c, "Failed to delete invalid annotation protobuf memcache entry.") 97 log.WithError(err).Warningf(c, "Failed to delete invalid annotation protobuf memcache entry.")
111 } 98 }
112 99
113 case mc.ErrCacheMiss: 100 case mc.ErrCacheMiss:
114 break 101 break
115 102
116 default: 103 default:
117 log.Fields{ 104 log.Fields{
118 log.ErrorKey: err, 105 log.ErrorKey: err,
119 "memcacheKey": mcKey, 106 "memcacheKey": mcKey,
120 }.Errorf(c, "Failed to load annotation protobuf memcache cached step.") 107 }.Errorf(c, "Failed to load annotation protobuf memcache cached step.")
121 } 108 }
122 109
123 // Load from LogDog directly. 110 // Load from LogDog directly.
124 log.Fields{ 111 log.Fields{
125 » » "project": as.project, 112 » » "host": as.Client.Host,
126 » » "path": as.path, 113 » » "project": as.Project,
127 » » "host": as.host, 114 » » "path": as.Path,
128 }.Infof(c, "Making tail request to LogDog to fetch annotation stream.") 115 }.Infof(c, "Making tail request to LogDog to fetch annotation stream.")
129 116
130 var ( 117 var (
131 state coordinator.LogStream 118 state coordinator.LogStream
132 » » stream = as.logDogClient.Stream(as.project, as.path) 119 » » stream = as.Client.Stream(as.Project, as.Path)
133 ) 120 )
134 le, err := stream.Tail(c, coordinator.WithState(&state), coordinator.Com plete()) 121 le, err := stream.Tail(c, coordinator.WithState(&state), coordinator.Com plete())
135 switch code := grpcutil.Code(err); code { 122 switch code := grpcutil.Code(err); code {
136 case codes.OK: 123 case codes.OK:
137 break 124 break
138 125
139 case codes.NotFound: 126 case codes.NotFound:
140 » » return &miloerror.Error{ 127 » » return nil, &miloerror.Error{
141 Message: "Stream not found", 128 Message: "Stream not found",
142 Code: http.StatusNotFound, 129 Code: http.StatusNotFound,
143 } 130 }
144 131
145 default: 132 default:
146 // TODO: Once we switch to delegation tokens and are making the request on 133 // TODO: Once we switch to delegation tokens and are making the request on
147 // behalf of a user rather than the Milo service, handle Permiss ionDenied. 134 // behalf of a user rather than the Milo service, handle Permiss ionDenied.
148 log.Fields{ 135 log.Fields{
149 log.ErrorKey: err, 136 log.ErrorKey: err,
150 "code": code, 137 "code": code,
151 }.Errorf(c, "Failed to load LogDog annotation stream.") 138 }.Errorf(c, "Failed to load LogDog annotation stream.")
152 » » return &miloerror.Error{ 139 » » return nil, &miloerror.Error{
153 Message: "Failed to load stream", 140 Message: "Failed to load stream",
154 Code: http.StatusInternalServerError, 141 Code: http.StatusInternalServerError,
155 } 142 }
156 } 143 }
157 144
158 // Make sure that this is an annotation stream. 145 // Make sure that this is an annotation stream.
159 switch { 146 switch {
160 case state.Desc.ContentType != miloProto.ContentTypeAnnotations: 147 case state.Desc.ContentType != miloProto.ContentTypeAnnotations:
161 » » return &miloerror.Error{ 148 » » return nil, &miloerror.Error{
162 Message: "Requested stream is not a Milo annotation prot obuf", 149 Message: "Requested stream is not a Milo annotation prot obuf",
163 Code: http.StatusBadRequest, 150 Code: http.StatusBadRequest,
164 } 151 }
165 152
166 case state.Desc.StreamType != logpb.StreamType_DATAGRAM: 153 case state.Desc.StreamType != logpb.StreamType_DATAGRAM:
167 » » return &miloerror.Error{ 154 » » return nil, &miloerror.Error{
168 Message: "Requested stream is not a datagram stream", 155 Message: "Requested stream is not a datagram stream",
169 Code: http.StatusBadRequest, 156 Code: http.StatusBadRequest,
170 } 157 }
171 158
172 case le == nil: 159 case le == nil:
173 // No annotation stream data, so render a minimal page. 160 // No annotation stream data, so render a minimal page.
174 » » return nil 161 » » return nil, &miloerror.Error{
162 » » » Message: "Log stream has no annotation entries",
163 » » » Code: http.StatusNotFound,
164 » » }
175 } 165 }
176 166
177 // Get the last log entry in the stream. In reality, this will be index 0, 167 // Get the last log entry in the stream. In reality, this will be index 0,
178 // since the "Tail" call should only return one log entry. 168 // since the "Tail" call should only return one log entry.
179 // 169 //
180 // Because we supplied the "Complete" flag to Tail and suceeded, this da tagram 170 // Because we supplied the "Complete" flag to Tail and suceeded, this da tagram
181 // will be complete even if its source datagram(s) are fragments. 171 // will be complete even if its source datagram(s) are fragments.
182 dg := le.GetDatagram() 172 dg := le.GetDatagram()
183 if dg == nil { 173 if dg == nil {
184 » » return &miloerror.Error{ 174 » » return nil, &miloerror.Error{
185 Message: "Datagram stream does not have datagram data", 175 Message: "Datagram stream does not have datagram data",
186 Code: http.StatusInternalServerError, 176 Code: http.StatusInternalServerError,
187 } 177 }
188 } 178 }
189 179
190 // Attempt to decode the Step protobuf. 180 // Attempt to decode the Step protobuf.
191 var step miloProto.Step 181 var step miloProto.Step
192 if err := proto.Unmarshal(dg.Data, &step); err != nil { 182 if err := proto.Unmarshal(dg.Data, &step); err != nil {
193 » » return &miloerror.Error{ 183 » » return nil, &miloerror.Error{
194 Message: "Failed to unmarshal annotation protobuf", 184 Message: "Failed to unmarshal annotation protobuf",
195 Code: http.StatusInternalServerError, 185 Code: http.StatusInternalServerError,
196 } 186 }
197 } 187 }
198 188
199 var latestEndedTime time.Time 189 var latestEndedTime time.Time
200 for _, sub := range step.Substep { 190 for _, sub := range step.Substep {
201 switch t := sub.Substep.(type) { 191 switch t := sub.Substep.(type) {
202 case *miloProto.Step_Substep_AnnotationStream: 192 case *miloProto.Step_Substep_AnnotationStream:
203 // TODO(hinoka,dnj): Implement recursive / embedded subs tream fetching if 193 // TODO(hinoka,dnj): Implement recursive / embedded subs tream fetching if
(...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after
239 mcItem.SetExpiration(intermediateCacheLifetime) 229 mcItem.SetExpiration(intermediateCacheLifetime)
240 } 230 }
241 mcItem.SetValue(mcData) 231 mcItem.SetValue(mcData)
242 if err := mc.Set(c, mcItem); err != nil { 232 if err := mc.Set(c, mcItem); err != nil {
243 log.WithError(err).Warningf(c, "Failed to cache annotati on protobuf CachedStep.") 233 log.WithError(err).Warningf(c, "Failed to cache annotati on protobuf CachedStep.")
244 } 234 }
245 } else { 235 } else {
246 log.WithError(err).Warningf(c, "Failed to marshal annotation pro tobuf CachedStep.") 236 log.WithError(err).Warningf(c, "Failed to marshal annotation pro tobuf CachedStep.")
247 } 237 }
248 238
249 » return nil 239 » return as.cs.Step, nil
250 } 240 }
251 241
252 func (as *annotationStreamRequest) toMiloBuild(c context.Context) *resp.MiloBuil d { 242 func (as *AnnotationStream) memcacheKey() string {
253 » prefix, name := as.path.Split() 243 » return fmt.Sprintf("logdog/%s/%s/%s", as.Client.Host, as.Project, as.Pat h)
244 }
245
246 func (as *AnnotationStream) toMiloBuild(c context.Context) *resp.MiloBuild {
247 » prefix, name := as.Path.Split()
254 248
255 // Prepare a Streams object with only one stream. 249 // Prepare a Streams object with only one stream.
256 streams := Streams{ 250 streams := Streams{
257 MainStream: &Stream{ 251 MainStream: &Stream{
258 » » » Server: as.host, 252 » » » Server: as.Client.Host,
259 Prefix: string(prefix), 253 Prefix: string(prefix),
260 Path: string(name), 254 Path: string(name),
261 IsDatagram: true, 255 IsDatagram: true,
262 Data: as.cs.Step, 256 Data: as.cs.Step,
263 Closed: as.cs.Finished, 257 Closed: as.cs.Finished,
264 }, 258 },
265 } 259 }
266 260
267 var ( 261 var (
268 build resp.MiloBuild 262 build resp.MiloBuild
269 ub = logDogURLBuilder{ 263 ub = logDogURLBuilder{
270 » » » project: as.project, 264 » » » host: as.Client.Host,
271 » » » host: as.host, 265 » » » project: as.Project,
272 prefix: prefix, 266 prefix: prefix,
273 } 267 }
274 ) 268 )
275 AddLogDogToBuild(c, &ub, streams.MainStream.Data, &build) 269 AddLogDogToBuild(c, &ub, streams.MainStream.Data, &build)
276 return &build 270 return &build
277 } 271 }
278 272
279 type logDogURLBuilder struct { 273 type logDogURLBuilder struct {
280 host string 274 host string
281 prefix types.StreamName 275 prefix types.StreamName
282 project cfgtypes.ProjectName 276 project cfgtypes.ProjectName
283 } 277 }
284 278
285 func (b *logDogURLBuilder) BuildLink(l *miloProto.Link) *resp.Link { 279 func (b *logDogURLBuilder) BuildLink(l *miloProto.Link) *resp.Link {
286 switch t := l.Value.(type) { 280 switch t := l.Value.(type) {
287 case *miloProto.Link_LogdogStream: 281 case *miloProto.Link_LogdogStream:
288 ls := t.LogdogStream 282 ls := t.LogdogStream
289 283
290 server := ls.Server 284 server := ls.Server
291 if server == "" { 285 if server == "" {
292 server = b.host 286 server = b.host
293 } 287 }
294 288
295 prefix := types.StreamName(ls.Prefix) 289 prefix := types.StreamName(ls.Prefix)
296 if prefix == "" { 290 if prefix == "" {
297 prefix = b.prefix 291 prefix = b.prefix
298 } 292 }
299 293
300 » » path := fmt.Sprintf("%s/%s", b.project, prefix.Join(types.Stream Name(ls.Name))) 294 » » u := viewer.GetURL(server, b.project, prefix.Join(types.StreamNa me(ls.Name)))
301 » » u := url.URL{
302 » » » Scheme: "https",
303 » » » Host: server,
304 » » » Path: "v/",
305 » » » RawQuery: url.Values{
306 » » » » "s": []string{string(path)},
307 » » » }.Encode(),
308 » » }
309
310 link := resp.Link{ 295 link := resp.Link{
311 Label: l.Label, 296 Label: l.Label,
312 » » » URL: u.String(), 297 » » » URL: u,
313 } 298 }
314 if link.Label == "" { 299 if link.Label == "" {
315 link.Label = ls.Name 300 link.Label = ls.Name
316 } 301 }
317 return &link 302 return &link
318 303
319 case *miloProto.Link_Url: 304 case *miloProto.Link_Url:
320 link := resp.Link{ 305 link := resp.Link{
321 Label: l.Label, 306 Label: l.Label,
322 URL: t.Url, 307 URL: t.Url,
323 } 308 }
324 if link.Label == "" { 309 if link.Label == "" {
325 link.Label = "unnamed" 310 link.Label = "unnamed"
326 } 311 }
327 return &link 312 return &link
328 313
329 default: 314 default:
330 // Don't know how to render. 315 // Don't know how to render.
331 return nil 316 return nil
332 } 317 }
333 } 318 }
OLDNEW
« no previous file with comments | « milo/appengine/frontend/milo.go ('k') | milo/appengine/logdog/http.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698