Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(84)

Side by Side Diff: appengine/cmd/milo/logdog/build.go

Issue 2191693003: Milo: Add LogDog annotation stream support. (Closed) Base URL: https://github.com/luci/luci-go@master
Patch Set: Fixed bug, moved some things to http.go Created 4 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2016 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file.
4
5 package logdog
6
7 import (
8 "errors"
9 "fmt"
10 "net/http"
11 "net/url"
12 "strings"
13 "time"
14
15 "github.com/luci/luci-go/appengine/cmd/milo/logdog/internal"
16 "github.com/luci/luci-go/appengine/cmd/milo/miloerror"
17 "github.com/luci/luci-go/appengine/cmd/milo/resp"
18 "github.com/luci/luci-go/common/config"
19 "github.com/luci/luci-go/common/grpcutil"
20 log "github.com/luci/luci-go/common/logging"
21 "github.com/luci/luci-go/common/proto/google"
22 miloProto "github.com/luci/luci-go/common/proto/milo"
23 "github.com/luci/luci-go/common/prpc"
24 "github.com/luci/luci-go/logdog/api/endpoints/coordinator/logs/v1"
25 "github.com/luci/luci-go/logdog/api/logpb"
26 "github.com/luci/luci-go/logdog/common/types"
27
28 "github.com/golang/protobuf/proto"
29 "github.com/luci/gae/service/memcache"
30 "golang.org/x/net/context"
31 "google.golang.org/grpc/codes"
32 )
33
34 const (
35 // intermediateCacheLifetime is the amount of time to cache intermediate (non-
36 // terminal) annotation streams. Terminal annotation streams are cached
37 // indefinitely.
38 intermediateCacheLifetime = 10 * time.Second
39
40 // defaultLogDogHost is the default LogDog host, if one isn't specified via
41 // query string.
42 defaultLogDogHost = "luci-logdog.appspot.com"
43 )
44
45 type annotationStreamRequest struct {
46 *AnnotationStream
47
48 // host is the name of the LogDog host.
49 host string
50
51 project config.ProjectName
52 path types.StreamPath
53
54 // item is the unmarshalled annotation stream Step and associated data.
55 item internal.Item
56 }
57
58 func (as *annotationStreamRequest) normalize() error {
59 if err := as.project.Validate(); err != nil {
60 return &miloerror.Error{
61 Message: "Invalid project name",
62 Code: http.StatusBadRequest,
63 }
64 }
65
66 if err := as.path.Validate(); err != nil {
67 return &miloerror.Error{
68 Message: fmt.Sprintf("Invalid log stream path %q: %s", a s.path, err),
69 Code: http.StatusBadRequest,
70 }
71 }
72
73 // Get the host. We normalize it to lowercase and trim spaces since we u se
74 // it as a memcache key.
75 as.host = strings.ToLower(strings.TrimSpace(as.host))
76 if as.host == "" {
77 as.host = defaultLogDogHost
78 }
79 if strings.IndexRune(as.host, '/') >= 0 {
nodir 2016/07/29 23:00:32 nit: use string.ContainsRune
dnj 2016/07/29 23:24:43 Done.
80 return errors.New("invalid host name")
81 }
82
83 return nil
84 }
85
86 func (as *annotationStreamRequest) memcacheKey() string {
87 return fmt.Sprintf("logdog/%s/%s/%s", as.host, as.project, as.path)
88 }
89
90 func (as *annotationStreamRequest) load(c context.Context) error {
91 // Load from memcache, if possible. If an error occurs, we will proceed as if
92 // no cache item was available.
93 mcKey := as.memcacheKey()
94 mcItem, err := memcache.Get(c).Get(mcKey)
95 switch err {
96 case nil:
97 if err := proto.Unmarshal(mcItem.Value(), &as.item); err == nil {
98 return nil
99 }
100
101 // We could not unmarshal the cached value. Try and delete it fr om
102 // memcache, since it's invalid.
103 log.Fields{
104 log.ErrorKey: err,
105 "memcacheKey": mcKey,
106 }.Warningf(c, "Failed to unmarshal cached annotation protobuf.")
107 if err := memcache.Get(c).Delete(mcKey); err != nil {
108 log.WithError(err).Warningf(c, "Failed to delete invalid annotation protobuf memcache entry.")
109 }
110
111 case memcache.ErrCacheMiss:
112 break
113
114 default:
115 log.Fields{
116 log.ErrorKey: err,
117 "memcacheKey": mcKey,
118 }.Errorf(c, "Failed to load annotation protobuf memcache item.")
119 }
120
121 // Load from LogDog directly.
122 client := logdog.NewLogsPRPCClient(&prpc.Client{
123 C: as.logDogClient,
124 Host: as.host,
125 })
126 resp, err := client.Tail(c, &logdog.TailRequest{
nodir 2016/07/29 23:00:31 please add logging: would be nice to have a log li
dnj 2016/07/29 23:24:43 Done.
127 Project: string(as.project),
128 Path: string(as.path),
129 State: true,
130 })
131 switch code := grpcutil.Code(err); code {
132 case codes.OK:
133 break
134
135 case codes.NotFound:
136 return &miloerror.Error{
137 Message: "Stream not found",
138 Code: http.StatusNotFound,
139 }
140
141 default:
142 // TODO: Once we switch to delegation tokens and are making the request on
143 // behalf of a user rather than the Milo service, handle Permiss ionDenied.
144 log.Fields{
145 log.ErrorKey: err,
146 "code": code,
147 }.Errorf(c, "Failed to load LogDog annotation stream.")
148 return &miloerror.Error{
149 Message: "Failed to load stream",
150 Code: http.StatusInternalServerError,
151 }
152 }
153
154 // Make sure that this is an annotation stream.
155 switch {
156 case resp.Desc.ContentType != miloProto.ContentTypeAnnotations:
157 return &miloerror.Error{
158 Message: "Requested stream is not a Milo annotation prot obuf",
159 Code: http.StatusBadRequest,
160 }
161
162 case resp.Desc.StreamType != logpb.StreamType_DATAGRAM:
163 return &miloerror.Error{
164 Message: "Requested stream is not a datagram stream",
165 Code: http.StatusBadRequest,
166 }
167
168 case len(resp.Logs) == 0:
169 // No annotation stream data, so render a minimal page.
170 return nil
171 }
172
173 // Get the last log entry in the stream. In reality, this will be index 0,
174 // since the "Tail" call should only return one log entry.
175 latestStream := resp.Logs[len(resp.Logs)-1]
176 dg := latestStream.GetDatagram()
177 switch {
178 case dg == nil:
179 return &miloerror.Error{
180 Message: "Datagram stream does not have datagram data",
181 Code: http.StatusInternalServerError,
182 }
183
184 case dg.Partial != nil && !(dg.Partial.Index == 0 && dg.Partial.Last):
185 // LogDog splits large datagrams into consecutive fragments. If the
186 // annotation state is fragmented, a reconstruction algorithm wi ll have to
187 // be employed here to build the full datagram before processing .
188 //
189 // At the moment, no annotation streams are expected to be anywh ere close to
190 // this large, so we're going to handle this case by erroring. A
191 // reconstruction algorithm would look like:
192 // 1) "Tail" to get the latest datagram, identify it as partial.
193 // 1a) Perform a bounds check on the total datagram size to ensu re that it
194 // can be safely reconstructed.
195 // 2) Determine if it's the last partial index. If not, then the latest
196 // datagram is incomplete. Determine our initial datagram's stream index
197 // the by subtracting the partial index from this message's stream index.
198 // 2a) If this datagram index is "0", the first datagram in the stream is
199 // partial and all of the data isn't here, so treat this as "no data".
200 // 2b) Otherwise, goto (1), using "Get" request on the datagram index minus
201 // one to get the previous datagram.
202 // 3) Issue a "Get" request for our initial datagram index throu gh the index
203 // preceding ours.
204 // 4) Reassemble the binary data from the full set of datagrams.
205 return &miloerror.Error{
206 Message: "Partial datagram streams are not supported yet ",
207 Code: http.StatusNotImplemented,
208 }
209 }
210
211 // Attempt to decode the Step protobuf.
212 var step miloProto.Step
213 if err := proto.Unmarshal(dg.Data, &step); err != nil {
214 return &miloerror.Error{
215 Message: "Failed to unmarshal annotation protobuf",
216 Code: http.StatusInternalServerError,
217 }
218 }
219
220 var latestEndedTime time.Time
221 for _, sub := range step.Substep {
222 switch t := sub.Substep.(type) {
223 case *miloProto.Step_Substep_AnnotationStream:
224 // TODO(hinoka,dnj): Implement recursive / embedded subs tream fetching if
225 // specified.
226 log.Warningf(c, "Annotation stream links LogDog substrea m [%+v], not supported!", t.AnnotationStream)
227
228 case *miloProto.Step_Substep_Step:
229 endedTime := t.Step.Ended.Time()
230 if t.Step.Ended != nil && endedTime.After(latestEndedTim e) {
231 latestEndedTime = endedTime
232 }
233 }
234 }
235 if latestEndedTime.IsZero() {
236 // No substep had an ended time :(
237 latestEndedTime = step.Started.Time()
238 }
239
240 // Build our Item.
241 as.item = internal.Item{
242 Step: &step,
243 Finished: (resp.State.TerminalIndex >= 0 && latestStream.StreamI ndex == uint64(resp.State.TerminalIndex)),
244 }
245
246 // Annotee is apparently not putting an ended time on some annotation pr otos.
247 // This hack will ensure that a finished build will always have an ended time.
248 if as.item.Finished && as.item.Step.Ended == nil {
249 as.item.Step.Ended = google.NewTimestamp(latestEndedTime)
250 }
251
252 // Marshal and cache the item. If this is the final protobuf in the stre am,
253 // cache it indefinitely; otherwise, cache it for intermediateCacheLifet ime.
254 //
255 // If this fails, it is non-fatal.
256 mcData, err := proto.Marshal(&as.item)
257 if err == nil {
258 mcItem = memcache.Get(c).NewItem(mcKey)
259 if !as.item.Finished {
260 mcItem.SetExpiration(intermediateCacheLifetime)
261 }
262 mcItem.SetValue(mcData)
263 if err := memcache.Get(c).Set(mcItem); err != nil {
264 log.WithError(err).Warningf(c, "Failed to cache annotati on protobuf Item.")
265 }
266 } else {
267 log.WithError(err).Warningf(c, "Failed to marshal annotation pro tobuf Item.")
268 }
269
270 return nil
271 }
272
273 func (as *annotationStreamRequest) toMiloBuild(c context.Context) *resp.MiloBuil d {
274 prefix, name := as.path.Split()
275
276 // Prepare a Streams object with only one stream.
277 streams := Streams{
278 MainStream: &Stream{
279 Server: as.host,
280 Prefix: string(prefix),
281 Path: string(name),
282 IsDatagram: true,
283 Data: as.item.Step,
284 Closed: as.item.Finished,
285 },
286 }
287
288 var (
289 build resp.MiloBuild
290 ub = logDogURLBuilder{
291 project: as.project,
292 host: as.host,
293 prefix: prefix,
294 }
295 )
296 AddLogDogToBuild(c, &ub, &streams, &build)
297
298 // If we're still building, the duration is the difference between start time
299 // and now.
nodir 2016/07/29 23:00:31 why this comment is here?
dnj 2016/07/29 23:24:43 Done.
300 return &build
301 }
302
303 type logDogURLBuilder struct {
304 host string
305 prefix types.StreamName
306 project config.ProjectName
307 }
308
309 func (b *logDogURLBuilder) BuildLink(l *miloProto.Link) *resp.Link {
310 switch t := l.Value.(type) {
311 case *miloProto.Link_LogdogStream:
312 ls := t.LogdogStream
313
314 server := ls.Server
315 if server == "" {
316 server = b.host
317 }
318
319 prefix := types.StreamName(ls.Prefix)
320 if prefix == "" {
321 prefix = b.prefix
322 }
323
324 path := fmt.Sprintf("%s/%s", b.project, prefix.Join(types.Stream Name(ls.Name)))
325 u := url.URL{
326 Scheme: "https",
327 Host: server,
328 Path: "v/",
329 RawQuery: url.Values{
330 "s": []string{string(path)},
331 }.Encode(),
332 }
333
334 link := resp.Link{
335 Label: l.Label,
336 URL: u.String(),
337 }
338 if link.Label == "" {
339 link.Label = ls.Name
340 }
341 return &link
342
343 case *miloProto.Link_Url:
344 link := resp.Link{
345 Label: l.Label,
346 URL: t.Url,
347 }
348 if link.Label == "" {
349 link.Label = "unnamed"
350 }
351 return &link
352
353 default:
354 // Don't know how to render.
355 return nil
356 }
357 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698