Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(113)

Side by Side Diff: appengine/cmd/milo/logdog/handler.go

Issue 2191693003: Milo: Add LogDog annotation stream support. (Closed) Base URL: https://github.com/luci/luci-go@master
Patch Set: Fix successful build state, derive more Swarming parameters from milo proto common code. Created 4 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2016 The LUCI Authors. All rights reserved.
Ryan Tseng 2016/07/29 18:16:38 rename this build.go, to match up with the other f
dnj 2016/07/29 19:57:34 Done.
2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file.
4
5 package logdog
6
7 import (
8 "fmt"
9 "net/http"
10 "net/url"
11 "strings"
12 "time"
13
14 "github.com/luci/luci-go/appengine/cmd/milo/miloerror"
15 "github.com/luci/luci-go/appengine/cmd/milo/resp"
16 "github.com/luci/luci-go/appengine/cmd/milo/settings"
17 authClient "github.com/luci/luci-go/appengine/gaeauth/client"
18 "github.com/luci/luci-go/common/config"
19 "github.com/luci/luci-go/common/grpcutil"
20 log "github.com/luci/luci-go/common/logging"
21 "github.com/luci/luci-go/common/proto/google"
22 miloProto "github.com/luci/luci-go/common/proto/milo"
23 "github.com/luci/luci-go/common/prpc"
24 "github.com/luci/luci-go/logdog/api/endpoints/coordinator/logs/v1"
25 "github.com/luci/luci-go/logdog/api/logpb"
26 "github.com/luci/luci-go/logdog/common/types"
27 "github.com/luci/luci-go/server/templates"
28
29 "github.com/golang/protobuf/proto"
30 "github.com/julienschmidt/httprouter"
31 "github.com/luci/gae/service/memcache"
32 "golang.org/x/net/context"
33 "google.golang.org/grpc/codes"
34 )
35
36 const (
37 // intermediateCacheLifetime is the amount of time to cache intermediate (non-
38 // terminal) annotation streams. Terminal annotation streams are cached
39 // indefinitely.
40 intermediateCacheLifetime = 10 * time.Second
nodir 2016/07/29 18:42:06 this is a bit of regression because with swarming
dnj 2016/07/29 19:57:35 I think the caching is a good idea. I don't see 10
41
42 // defaultLogDogHost is the default LogDog host, if one isn't specified via
43 // query string.
44 defaultLogDogHost = "luci-logdog.appspot.com"
45 )
46
47 // AnnotationStream is a ThemedHandler that renders a LogDog Milo annotation
48 // protobuf stream.
49 //
50 // The protobuf stream is fetched live from LogDog and cached locally, either
51 // temporarily (if incomplete) or indefinitely (if complete).
52 type AnnotationStream struct{}
Ryan Tseng 2016/07/29 18:16:38 Move AnnotationStream and it's methods to html.go.
dnj 2016/07/29 19:57:34 "AnnotationStream" is explicitly a build.html hand
Ryan Tseng 2016/07/29 20:34:17 In other words, all "ThemedHandler" implementation
dnj 2016/07/29 21:23:37 Done.
53
54 // GetTemplateName implements settings.ThemedHandler.
55 func (s *AnnotationStream) GetTemplateName(t settings.Theme) string {
56 return "build.html"
57 }
58
59 // Render implements settings.ThemedHandler.
60 func (s *AnnotationStream) Render(c context.Context, req *http.Request, p httpro uter.Params) (*templates.Args, error) {
61 as := annotationStreamRequest{
62 project: config.ProjectName(p.ByName("project")),
63 path: types.StreamPath(strings.Trim(p.ByName("path"), "/")),
64 host: req.FormValue("host"),
nodir 2016/07/29 18:42:06 buildbucket and swarming handlers use "server" nam
dnj 2016/07/29 19:57:35 Lots of other packages, including pRPC, use "host"
Ryan Tseng 2016/07/29 20:34:17 I agree with nodir, we should keep it consistent w
dnj 2016/07/29 21:23:37 Discussed, using "host" everywhere.
65 }
66 if err := as.normalize(); err != nil {
67 return nil, err
68 }
69
70 // Load the Milo annotation protobuf from the annotation stream.
71 if err := as.load(c); err != nil {
72 return nil, err
73 }
74
75 // Convert the Milo Annotation protobuf to Milo objects.
76 return &templates.Args{
77 "Build": as.toMiloBuild(c),
78 }, nil
79 }
80
81 type annotationStreamRequest struct {
82 // host is the name of the LogDog host.
83 host string
84 // project is the project.
nodir 2016/07/29 18:42:06 this is not useful :)
dnj 2016/07/29 19:57:35 Done.
85 project config.ProjectName
86 // path is the stream path.
87 path types.StreamPath
88
89 // item is the unmarshalled annotation stream Step and associated data.
90 item Item
91 }
92
93 func (as *annotationStreamRequest) normalize() error {
94 if err := as.project.Validate(); err != nil {
95 return &miloerror.Error{
96 Message: "Invalid project name",
97 Code: http.StatusBadRequest,
98 }
99 }
100
101 if err := as.path.Validate(); err != nil {
102 return &miloerror.Error{
103 Message: fmt.Sprintf("Invalid log stream path %q: %s", a s.path, err),
104 Code: http.StatusBadRequest,
105 }
106 }
107
108 // Get the host. We normalize it to lowercase and trim spaces since we u se
109 // it as a memcache key.
110 if as.host == "" {
nodir 2016/07/29 18:42:06 do this after trimming, so as.host = " " is conver
dnj 2016/07/29 19:57:35 Done.
111 as.host = defaultLogDogHost
112 }
113 as.host = strings.ToLower(strings.TrimSpace(as.host))
nodir 2016/07/29 18:42:06 please verify that it does not have slashes, so ou
dnj 2016/07/29 19:57:35 Done.
114
115 return nil
116 }
117
118 func (as *annotationStreamRequest) memcacheKey() string {
119 return fmt.Sprintf("logdog/%s/%s/%s", as.host, as.project, as.path)
120 }
121
122 func (as *annotationStreamRequest) load(c context.Context) error {
123 // Load from memcache, if possible. If an error occurs, we will proceed as if
124 // no cache item was available.
125 mcKey := as.memcacheKey()
126 mcItem, err := memcache.Get(c).Get(mcKey)
127 switch err {
128 case nil:
129 if err := proto.Unmarshal(mcItem.Value(), &as.item); err == nil {
130 return nil
131 }
132
133 // We could not unmarshal the cached value. Try and delete it fr om
134 // memcache, since it's invalid.
135 log.Fields{
136 log.ErrorKey: err,
137 "memcacheKey": mcKey,
138 }.Warningf(c, "Failed to unmarshal cached annotation protobuf.")
139 if err := memcache.Get(c).Delete(mcKey); err != nil {
140 log.WithError(err).Warningf(c, "Failed to delete invalid annotation protobuf memcache entry.")
141 }
142
143 case memcache.ErrCacheMiss:
144 break
145
146 default:
147 log.Fields{
148 log.ErrorKey: err,
149 "memcacheKey": mcKey,
150 }.Warningf(c, "Failed to load annotation protobuf memcache item. ")
nodir 2016/07/29 18:42:06 i think it is ok to make it Errorf since it may ha
dnj 2016/07/29 19:57:34 Done.
151 }
152
153 // Load from LogDog directly.
154 a, err := authClient.Transport(c, nil, nil)
155 if err != nil {
156 log.WithError(err).Errorf(c, "Failed to get transport for LogDog server.")
157 return &miloerror.Error{
158 Code: http.StatusInternalServerError,
159 }
160 }
161
162 client := logdog.NewLogsPRPCClient(&prpc.Client{
163 C: &http.Client{
nodir 2016/07/29 18:42:06 http.Client reuses TCP connections, but if we crea
dnj 2016/07/29 19:57:34 Done.
164 Transport: a,
165 },
166 Host: as.host,
167 })
168 resp, err := client.Tail(c, &logdog.TailRequest{
169 Project: string(as.project),
170 Path: string(as.path),
171 State: true,
172 })
173 if err != nil {
nodir 2016/07/29 18:42:06 nit: consider `case codes.OK:` as an alternative t
dnj 2016/07/29 19:57:35 Done.
174 switch code := grpcutil.Code(err); code {
175 case codes.NotFound:
176 return &miloerror.Error{
177 Message: "Stream not found",
178 Code: http.StatusNotFound,
179 }
180
nodir 2016/07/29 18:42:06 return 403 in case of check codes.PermissionDenied
dnj 2016/07/29 19:57:35 Since we're authenticating as Milo rather than the
nodir 2016/07/29 23:00:31 until milo uses delegation tokens, it does not sup
181 default:
182 log.Fields{
183 log.ErrorKey: err,
184 "code": code,
185 }.Errorf(c, "Failed to load LogDog annotation stream.")
186 return &miloerror.Error{
187 Message: "Failed to load stream",
188 Code: http.StatusInternalServerError,
189 }
190 }
191 }
192
193 // Make sure that this is an annotation stream!
194 switch {
195 case resp.Desc.ContentType != miloProto.ContentTypeAnnotations:
196 return &miloerror.Error{
197 Message: "Requested stream is not a Milo annotation prot obuf",
198 Code: http.StatusBadRequest,
199 }
200
201 case resp.Desc.StreamType != logpb.StreamType_DATAGRAM:
202 return &miloerror.Error{
203 Message: "Requested stream is not a datagram stream",
204 Code: http.StatusBadRequest,
205 }
206
207 case len(resp.Logs) == 0:
208 return &miloerror.Error{
209 Message: "Annotation stream data is not yet available",
210 Code: http.StatusInternalServerError,
nodir 2016/07/29 18:42:06 it is not an internal error. I think we should `re
dnj 2016/07/29 19:57:34 I'm not sure a blank page is better than "hey data
211 }
212 }
213
214 // Get the last log entry in the stream. In reality, this will be index 0,
215 // since the "Tail" call should only return one log entry.
216 latestStream := resp.Logs[len(resp.Logs)-1]
217 dg := latestStream.GetDatagram()
218 switch {
219 case dg == nil:
220 return &miloerror.Error{
221 Message: "Datagram stream does not have datagram data",
222 Code: http.StatusInternalServerError,
223 }
224
225 case dg.Partial != nil && !(dg.Partial.Index == 0 && dg.Partial.Last):
226 return &miloerror.Error{
227 Message: "Partial datagram streams are not supported yet ",
228 Code: http.StatusNotImplemented,
nodir 2016/07/29 18:42:06 how often this is going to happen in practice? if
dnj 2016/07/29 19:57:34 It's part of the protocol, and could happen if a g
229 }
230 }
231
232 // Attempt to decode the Step protobuf.
233 var step miloProto.Step
234 if err := proto.Unmarshal(dg.Data, &step); err != nil {
235 return &miloerror.Error{
236 Message: "Failed to unmarshal annotation protobuf",
237 Code: http.StatusInternalServerError,
238 }
239 }
240
241 var latestEndedTime time.Time
242 for _, sub := range step.Substep {
243 switch t := sub.Substep.(type) {
244 case *miloProto.Step_Substep_AnnotationStream:
245 // TODO(hinoka,dnj): Implement recursive / embedded subs tream fetching if
246 // specified.
247 log.Warningf(c, "Annotation stream links LogDog substrea m [%+v], not supported!", t.AnnotationStream)
248
249 case *miloProto.Step_Substep_Step:
250 endedTime := t.Step.Ended.Time()
251 if t.Step.Ended != nil && (latestEndedTime.IsZero() || e ndedTime.After(latestEndedTime)) {
nodir 2016/07/29 18:42:06 i think this condition should be just if endedTim
dnj 2016/07/29 19:57:34 Done.
252 latestEndedTime = endedTime
253 }
254 }
255 }
256 if latestEndedTime.IsZero() {
257 // No substep had an ended time :(
258 latestEndedTime = step.Started.Time()
259 }
260
261 // Build our Item.
262 as.item = Item{
263 Step: &step,
264 Finished: (resp.State.TerminalIndex >= 0 && latestStream.StreamI ndex == uint64(resp.State.TerminalIndex)),
265 }
266
267 // Annotee is apparently not putting an ended time on some annotation pr otos.
268 // This hack will ensure that a finished build will always have an ended time.
269 if as.item.Finished && as.item.Step.Ended == nil {
270 as.item.Step.Ended = google.NewTimestamp(latestEndedTime)
271 }
272
273 // Marshal and cache the item. If this is the final protobuf in the stre am,
274 // cache it indefinitely; otherwise, cache it for intermediateCacheLifet ime.
275 //
276 // If this fails, it is non-fatal.
277 mcData, err := proto.Marshal(&as.item)
278 if err == nil {
279 mcItem = memcache.Get(c).NewItem(mcKey)
280 if !as.item.Finished {
281 mcItem.SetExpiration(intermediateCacheLifetime)
282 }
283 mcItem.SetValue(mcData)
284 if err := memcache.Get(c).Set(mcItem); err != nil {
285 log.WithError(err).Warningf(c, "Failed to cache annotati on protobuf Item.")
286 }
287 } else {
288 log.WithError(err).Warningf(c, "Failed to marshal annotation pro tobuf Item.")
289 }
290
291 return nil
292 }
293
294 func (as *annotationStreamRequest) toMiloBuild(c context.Context) *resp.MiloBuil d {
295 prefix, name := as.path.Split()
296
297 // Prepare a Streams object with only one stream.
298 streams := Streams{
299 MainStream: &Stream{
300 Server: as.host,
301 Prefix: string(prefix),
302 Path: string(name),
303 IsDatagram: true,
304 Data: as.item.Step,
305 Closed: as.item.Finished,
306 },
307 }
308
309 var (
310 build resp.MiloBuild
311 ub = logDogURLBuilder{
312 project: as.project,
313 host: as.host,
314 prefix: prefix,
315 }
316 )
317 AddLogDogToBuild(c, &ub, &streams, &build)
318
319 // If we're still building, the duration is the difference between start time
320 // and now.
321 return &build
322 }
323
324 type logDogURLBuilder struct {
325 host string
326 prefix types.StreamName
327 project config.ProjectName
328 }
329
330 func (b *logDogURLBuilder) BuildLink(l *miloProto.Link) *resp.Link {
331 switch t := l.Value.(type) {
332 case *miloProto.Link_LogdogStream:
333 ls := t.LogdogStream
334
335 server := ls.Server
336 if server == "" {
337 server = b.host
338 }
339
340 prefix := types.StreamName(ls.Prefix)
341 if prefix == "" {
342 prefix = b.prefix
343 }
344
345 path := fmt.Sprintf("%s/%s", b.project, prefix.Join(types.Stream Name(ls.Name)))
346 u := url.URL{
347 Scheme: "https",
348 Host: server,
349 Path: "v/",
350 RawQuery: url.Values{
351 "s": []string{string(path)},
352 }.Encode(),
353 }
354
355 link := resp.Link{
356 Label: l.Label,
357 URL: u.String(),
358 }
359 if link.Label == "" {
360 link.Label = ls.Name
361 }
362 return &link
363
364 case *miloProto.Link_Url:
365 link := resp.Link{
366 Label: l.Label,
367 URL: t.Url,
368 }
369 if link.Label == "" {
370 link.Label = "unnamed"
371 }
372 return &link
373
374 default:
375 // Don't know how to render.
376 return nil
377 }
378 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698