Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(575)

Side by Side Diff: appengine/cmd/milo/logdog/build.go

Issue 2191693003: Milo: Add LogDog annotation stream support. (Closed) Base URL: https://github.com/luci/luci-go@master
Patch Set: Updated from comments. Created 4 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2016 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file.
4
5 package logdog
6
7 import (
8 "errors"
9 "fmt"
10 "net/http"
11 "net/url"
12 "strings"
13 "sync"
14 "time"
15
16 "github.com/luci/luci-go/appengine/cmd/milo/logdog/internal"
17 "github.com/luci/luci-go/appengine/cmd/milo/miloerror"
18 "github.com/luci/luci-go/appengine/cmd/milo/resp"
19 "github.com/luci/luci-go/appengine/cmd/milo/settings"
20 authClient "github.com/luci/luci-go/appengine/gaeauth/client"
21 "github.com/luci/luci-go/common/config"
22 "github.com/luci/luci-go/common/grpcutil"
23 log "github.com/luci/luci-go/common/logging"
24 "github.com/luci/luci-go/common/proto/google"
25 miloProto "github.com/luci/luci-go/common/proto/milo"
26 "github.com/luci/luci-go/common/prpc"
27 "github.com/luci/luci-go/logdog/api/endpoints/coordinator/logs/v1"
28 "github.com/luci/luci-go/logdog/api/logpb"
29 "github.com/luci/luci-go/logdog/common/types"
30 "github.com/luci/luci-go/server/templates"
31
32 "github.com/golang/protobuf/proto"
33 "github.com/julienschmidt/httprouter"
34 "github.com/luci/gae/service/memcache"
35 "golang.org/x/net/context"
36 "google.golang.org/grpc/codes"
37 )
38
39 const (
40 // intermediateCacheLifetime is the amount of time to cache intermediate (non-
41 // terminal) annotation streams. Terminal annotation streams are cached
42 // indefinitely.
43 intermediateCacheLifetime = 10 * time.Second
44
45 // defaultLogDogHost is the default LogDog host, if one isn't specified via
46 // query string.
47 defaultLogDogHost = "luci-logdog.appspot.com"
48 )
49
50 // AnnotationStream is a ThemedHandler that renders a LogDog Milo annotation
51 // protobuf stream.
52 //
53 // The protobuf stream is fetched live from LogDog and cached locally, either
54 // temporarily (if incomplete) or indefinitely (if complete).
55 type AnnotationStream struct {
56 // logDogClient is a reusable HTTP client to use for LogDog.
57 logDogClient *http.Client
58
59 initMu sync.Mutex
60 }
61
62 // init initializes the process-global state of the AnnotationStream.
63 func (s *AnnotationStream) init(c context.Context) *miloerror.Error {
64 s.initMu.Lock()
65 defer s.initMu.Unlock()
66
67 if s.logDogClient != nil {
68 return nil
69 }
70
71 // Initialize the LogDog client.
72 a, err := authClient.Transport(c, nil, nil)
73 if err != nil {
74 log.WithError(err).Errorf(c, "Failed to get transport for LogDog server.")
75 return &miloerror.Error{
76 Code: http.StatusInternalServerError,
77 }
78 }
79
80 s.logDogClient = &http.Client{
81 Transport: a,
82 }
83 return nil
84 }
85
86 // GetTemplateName implements settings.ThemedHandler.
87 func (s *AnnotationStream) GetTemplateName(t settings.Theme) string {
88 return "build.html"
89 }
90
91 // Render implements settings.ThemedHandler.
92 func (s *AnnotationStream) Render(c context.Context, req *http.Request, p httpro uter.Params) (*templates.Args, error) {
93 if err := s.init(c); err != nil {
94 return nil, err
95 }
96
97 as := annotationStreamRequest{
98 AnnotationStream: s,
99
100 project: config.ProjectName(p.ByName("project")),
101 path: types.StreamPath(strings.Trim(p.ByName("path"), "/")),
102 host: req.FormValue("host"),
103 }
104 if err := as.normalize(); err != nil {
105 return nil, err
106 }
107
108 // Load the Milo annotation protobuf from the annotation stream.
109 if err := as.load(c); err != nil {
110 return nil, err
111 }
112
113 // Convert the Milo Annotation protobuf to Milo objects.
114 return &templates.Args{
115 "Build": as.toMiloBuild(c),
116 }, nil
117 }
118
119 type annotationStreamRequest struct {
120 *AnnotationStream
121
122 // host is the name of the LogDog host.
123 host string
124
125 project config.ProjectName
126 path types.StreamPath
127
128 // item is the unmarshalled annotation stream Step and associated data.
129 item internal.Item
130 }
131
132 func (as *annotationStreamRequest) normalize() error {
133 if err := as.project.Validate(); err != nil {
134 return &miloerror.Error{
135 Message: "Invalid project name",
136 Code: http.StatusBadRequest,
137 }
138 }
139
140 if err := as.path.Validate(); err != nil {
141 return &miloerror.Error{
142 Message: fmt.Sprintf("Invalid log stream path %q: %s", a s.path, err),
143 Code: http.StatusBadRequest,
144 }
145 }
146
147 // Get the host. We normalize it to lowercase and trim spaces since we u se
148 // it as a memcache key.
149 as.host = strings.ToLower(strings.TrimSpace(as.host))
150 if as.host == "" {
151 as.host = defaultLogDogHost
152 }
153 if strings.IndexRune(as.host, '/') >= 0 {
154 return errors.New("invalid host name")
155 }
156
157 return nil
158 }
159
160 func (as *annotationStreamRequest) memcacheKey() string {
161 return fmt.Sprintf("logdog/%s/%s/%s", as.host, as.project, as.path)
162 }
163
164 func (as *annotationStreamRequest) load(c context.Context) error {
165 // Load from memcache, if possible. If an error occurs, we will proceed as if
166 // no cache item was available.
167 mcKey := as.memcacheKey()
168 mcItem, err := memcache.Get(c).Get(mcKey)
169 switch err {
170 case nil:
171 if err := proto.Unmarshal(mcItem.Value(), &as.item); err == nil {
172 return nil
173 }
174
175 // We could not unmarshal the cached value. Try and delete it fr om
176 // memcache, since it's invalid.
177 log.Fields{
178 log.ErrorKey: err,
179 "memcacheKey": mcKey,
180 }.Warningf(c, "Failed to unmarshal cached annotation protobuf.")
181 if err := memcache.Get(c).Delete(mcKey); err != nil {
182 log.WithError(err).Warningf(c, "Failed to delete invalid annotation protobuf memcache entry.")
183 }
184
185 case memcache.ErrCacheMiss:
186 break
187
188 default:
189 log.Fields{
190 log.ErrorKey: err,
191 "memcacheKey": mcKey,
192 }.Errorf(c, "Failed to load annotation protobuf memcache item.")
193 }
194
195 // Load from LogDog directly.
196 client := logdog.NewLogsPRPCClient(&prpc.Client{
197 C: as.logDogClient,
198 Host: as.host,
199 })
200 resp, err := client.Tail(c, &logdog.TailRequest{
201 Project: string(as.project),
202 Path: string(as.path),
203 State: true,
204 })
205 switch code := grpcutil.Code(err); code {
206 case codes.OK:
207 break
208
209 case codes.NotFound:
210 return &miloerror.Error{
211 Message: "Stream not found",
212 Code: http.StatusNotFound,
213 }
214
215 default:
216 // TODO: Once we switch to delegation tokens and are making the request on
217 // behalf of a user rather than the Milo service, handle Permiss ionDenied.
218 log.Fields{
219 log.ErrorKey: err,
220 "code": code,
221 }.Errorf(c, "Failed to load LogDog annotation stream.")
222 return &miloerror.Error{
223 Message: "Failed to load stream",
224 Code: http.StatusInternalServerError,
225 }
226 }
227
228 // Make sure that this is an annotation stream.
229 switch {
230 case resp.Desc.ContentType != miloProto.ContentTypeAnnotations:
231 return &miloerror.Error{
232 Message: "Requested stream is not a Milo annotation prot obuf",
233 Code: http.StatusBadRequest,
234 }
235
236 case resp.Desc.StreamType != logpb.StreamType_DATAGRAM:
237 return &miloerror.Error{
238 Message: "Requested stream is not a datagram stream",
239 Code: http.StatusBadRequest,
240 }
241
242 case len(resp.Logs) == 0:
243 // No annotation stream data, so render a minimal page.
244 return nil
245 }
246
247 // Get the last log entry in the stream. In reality, this will be index 0,
248 // since the "Tail" call should only return one log entry.
249 latestStream := resp.Logs[len(resp.Logs)-1]
250 dg := latestStream.GetDatagram()
251 switch {
252 case dg == nil:
253 return &miloerror.Error{
254 Message: "Datagram stream does not have datagram data",
255 Code: http.StatusInternalServerError,
256 }
257
258 case dg.Partial != nil && !(dg.Partial.Index == 0 && dg.Partial.Last):
259 // LogDog splits large datagrams into consecutive fragments. If the
260 // annotation state is fragmented, a reconstruction algorithm wi ll have to
261 // be employed here to build the full datagram before processing .
262 //
263 // At the moment, no annotation streams are expected to be anywh ere close to
264 // this large, so we're going to handle this case by erroring. A
265 // reconstruction algorithm would look like:
266 // 1) "Tail" to get the latest datagram, identify it as partial.
267 // 1a) Perform a bounds check on the total datagram size to ensu re that it
268 // can be safely reconstructed.
269 // 2) Determine if it's the last partial index. If not, then the latest
270 // datagram is incomplete. Determine our initial datagram's stream index
271 // the by subtracting the partial index from this message's stream index.
272 // 2a) If this datagram index is "0", the first datagram in the stream is
273 // partial and all of the data isn't here, so treat this as "no data".
274 // 2b) Otherwise, goto (1), using "Get" request on the datagram index minus
275 // one to get the previous datagram.
276 // 3) Issue a "Get" request for our initial datagram index throu gh the index
277 // preceding ours.
278 // 4) Reassemble the binary data from the full set of datagrams.
279 return &miloerror.Error{
280 Message: "Partial datagram streams are not supported yet ",
281 Code: http.StatusNotImplemented,
282 }
283 }
284
285 // Attempt to decode the Step protobuf.
286 var step miloProto.Step
287 if err := proto.Unmarshal(dg.Data, &step); err != nil {
288 return &miloerror.Error{
289 Message: "Failed to unmarshal annotation protobuf",
290 Code: http.StatusInternalServerError,
291 }
292 }
293
294 var latestEndedTime time.Time
295 for _, sub := range step.Substep {
296 switch t := sub.Substep.(type) {
297 case *miloProto.Step_Substep_AnnotationStream:
298 // TODO(hinoka,dnj): Implement recursive / embedded subs tream fetching if
299 // specified.
300 log.Warningf(c, "Annotation stream links LogDog substrea m [%+v], not supported!", t.AnnotationStream)
301
302 case *miloProto.Step_Substep_Step:
303 endedTime := t.Step.Ended.Time()
304 if t.Step.Ended != nil && endedTime.After(latestEndedTim e) {
305 latestEndedTime = endedTime
306 }
307 }
308 }
309 if latestEndedTime.IsZero() {
310 // No substep had an ended time :(
311 latestEndedTime = step.Started.Time()
312 }
313
314 // Build our Item.
315 as.item = internal.Item{
316 Step: &step,
317 Finished: (resp.State.TerminalIndex >= 0 && latestStream.StreamI ndex == uint64(resp.State.TerminalIndex)),
318 }
319
320 // Annotee is apparently not putting an ended time on some annotation pr otos.
321 // This hack will ensure that a finished build will always have an ended time.
322 if as.item.Finished && as.item.Step.Ended == nil {
323 as.item.Step.Ended = google.NewTimestamp(latestEndedTime)
324 }
325
326 // Marshal and cache the item. If this is the final protobuf in the stre am,
327 // cache it indefinitely; otherwise, cache it for intermediateCacheLifet ime.
328 //
329 // If this fails, it is non-fatal.
330 mcData, err := proto.Marshal(&as.item)
331 if err == nil {
332 mcItem = memcache.Get(c).NewItem(mcKey)
333 if !as.item.Finished {
334 mcItem.SetExpiration(intermediateCacheLifetime)
335 }
336 mcItem.SetValue(mcData)
337 if err := memcache.Get(c).Set(mcItem); err != nil {
338 log.WithError(err).Warningf(c, "Failed to cache annotati on protobuf Item.")
339 }
340 } else {
341 log.WithError(err).Warningf(c, "Failed to marshal annotation pro tobuf Item.")
342 }
343
344 return nil
345 }
346
347 func (as *annotationStreamRequest) toMiloBuild(c context.Context) *resp.MiloBuil d {
348 prefix, name := as.path.Split()
349
350 // Prepare a Streams object with only one stream.
351 streams := Streams{
352 MainStream: &Stream{
353 Server: as.host,
354 Prefix: string(prefix),
355 Path: string(name),
356 IsDatagram: true,
357 Data: as.item.Step,
358 Closed: as.item.Finished,
359 },
360 }
361
362 var (
363 build resp.MiloBuild
364 ub = logDogURLBuilder{
365 project: as.project,
366 host: as.host,
367 prefix: prefix,
368 }
369 )
370 AddLogDogToBuild(c, &ub, &streams, &build)
371
372 // If we're still building, the duration is the difference between start time
373 // and now.
374 return &build
375 }
376
377 type logDogURLBuilder struct {
378 host string
379 prefix types.StreamName
380 project config.ProjectName
381 }
382
383 func (b *logDogURLBuilder) BuildLink(l *miloProto.Link) *resp.Link {
384 switch t := l.Value.(type) {
385 case *miloProto.Link_LogdogStream:
386 ls := t.LogdogStream
387
388 server := ls.Server
389 if server == "" {
390 server = b.host
391 }
392
393 prefix := types.StreamName(ls.Prefix)
394 if prefix == "" {
395 prefix = b.prefix
396 }
397
398 path := fmt.Sprintf("%s/%s", b.project, prefix.Join(types.Stream Name(ls.Name)))
399 u := url.URL{
400 Scheme: "https",
401 Host: server,
402 Path: "v/",
403 RawQuery: url.Values{
404 "s": []string{string(path)},
405 }.Encode(),
406 }
407
408 link := resp.Link{
409 Label: l.Label,
410 URL: u.String(),
411 }
412 if link.Label == "" {
413 link.Label = ls.Name
414 }
415 return &link
416
417 case *miloProto.Link_Url:
418 link := resp.Link{
419 Label: l.Label,
420 URL: t.Url,
421 }
422 if link.Label == "" {
423 link.Label = "unnamed"
424 }
425 return &link
426
427 default:
428 // Don't know how to render.
429 return nil
430 }
431 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698