Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1012)

Side by Side Diff: appengine/cmd/milo/logdog/build.go

Issue 2191693003: Milo: Add LogDog annotation stream support. (Closed) Base URL: https://github.com/luci/luci-go@master
Patch Set: Rebarse Created 4 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « appengine/cmd/milo/frontend/milo.go ('k') | appengine/cmd/milo/logdog/http.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright 2016 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file.
4
5 package logdog
6
7 import (
8 "errors"
9 "fmt"
10 "net/http"
11 "net/url"
12 "strings"
13 "time"
14
15 "github.com/luci/luci-go/appengine/cmd/milo/logdog/internal"
16 "github.com/luci/luci-go/appengine/cmd/milo/miloerror"
17 "github.com/luci/luci-go/appengine/cmd/milo/resp"
18 "github.com/luci/luci-go/common/config"
19 "github.com/luci/luci-go/common/grpcutil"
20 log "github.com/luci/luci-go/common/logging"
21 "github.com/luci/luci-go/common/proto/google"
22 miloProto "github.com/luci/luci-go/common/proto/milo"
23 "github.com/luci/luci-go/common/prpc"
24 "github.com/luci/luci-go/logdog/api/endpoints/coordinator/logs/v1"
25 "github.com/luci/luci-go/logdog/api/logpb"
26 "github.com/luci/luci-go/logdog/common/types"
27
28 "github.com/golang/protobuf/proto"
29 "github.com/luci/gae/service/memcache"
30 "golang.org/x/net/context"
31 "google.golang.org/grpc/codes"
32 )
33
34 const (
35 // intermediateCacheLifetime is the amount of time to cache intermediate (non-
36 // terminal) annotation streams. Terminal annotation streams are cached
37 // indefinitely.
38 intermediateCacheLifetime = 10 * time.Second
39
40 // defaultLogDogHost is the default LogDog host, if one isn't specified via
41 // query string.
42 defaultLogDogHost = "luci-logdog.appspot.com"
43 )
44
45 type annotationStreamRequest struct {
46 *AnnotationStream
47
48 // host is the name of the LogDog host.
49 host string
50
51 project config.ProjectName
52 path types.StreamPath
53
54 // logDogClient is the HTTP client to use for LogDog communication.
55 logDogClient http.Client
56
57 // cs is the unmarshalled annotation stream Step and associated data.
58 cs internal.CachedStep
59 }
60
61 func (as *annotationStreamRequest) normalize() error {
62 if err := as.project.Validate(); err != nil {
63 return &miloerror.Error{
64 Message: "Invalid project name",
65 Code: http.StatusBadRequest,
66 }
67 }
68
69 if err := as.path.Validate(); err != nil {
70 return &miloerror.Error{
71 Message: fmt.Sprintf("Invalid log stream path %q: %s", a s.path, err),
72 Code: http.StatusBadRequest,
73 }
74 }
75
76 // Get the host. We normalize it to lowercase and trim spaces since we u se
77 // it as a memcache key.
78 as.host = strings.ToLower(strings.TrimSpace(as.host))
79 if as.host == "" {
80 as.host = defaultLogDogHost
81 }
82 if strings.ContainsRune(as.host, '/') {
83 return errors.New("invalid host name")
84 }
85
86 return nil
87 }
88
89 func (as *annotationStreamRequest) memcacheKey() string {
90 return fmt.Sprintf("logdog/%s/%s/%s", as.host, as.project, as.path)
91 }
92
93 func (as *annotationStreamRequest) load(c context.Context) error {
94 // Load from memcache, if possible. If an error occurs, we will proceed as if
95 // no CachedStep was available.
96 mcKey := as.memcacheKey()
97 mcItem, err := memcache.Get(c).Get(mcKey)
98 switch err {
99 case nil:
100 if err := proto.Unmarshal(mcItem.Value(), &as.cs); err == nil {
101 return nil
102 }
103
104 // We could not unmarshal the cached value. Try and delete it fr om
105 // memcache, since it's invalid.
106 log.Fields{
107 log.ErrorKey: err,
108 "memcacheKey": mcKey,
109 }.Warningf(c, "Failed to unmarshal cached annotation protobuf.")
110 if err := memcache.Get(c).Delete(mcKey); err != nil {
111 log.WithError(err).Warningf(c, "Failed to delete invalid annotation protobuf memcache entry.")
112 }
113
114 case memcache.ErrCacheMiss:
115 break
116
117 default:
118 log.Fields{
119 log.ErrorKey: err,
120 "memcacheKey": mcKey,
121 }.Errorf(c, "Failed to load annotation protobuf memcache cached step.")
122 }
123
124 // Load from LogDog directly.
125 client := logdog.NewLogsPRPCClient(&prpc.Client{
126 C: &as.logDogClient,
127 Host: as.host,
128 })
129
130 log.Fields{
131 "project": as.project,
132 "path": as.path,
133 "host": as.host,
134 }.Infof(c, "Making tail request to LogDog to fetch annotation stream.")
135 resp, err := client.Tail(c, &logdog.TailRequest{
136 Project: string(as.project),
137 Path: string(as.path),
138 State: true,
139 })
140 switch code := grpcutil.Code(err); code {
141 case codes.OK:
142 break
143
144 case codes.NotFound:
145 return &miloerror.Error{
146 Message: "Stream not found",
147 Code: http.StatusNotFound,
148 }
149
150 default:
151 // TODO: Once we switch to delegation tokens and are making the request on
152 // behalf of a user rather than the Milo service, handle Permiss ionDenied.
153 log.Fields{
154 log.ErrorKey: err,
155 "code": code,
156 }.Errorf(c, "Failed to load LogDog annotation stream.")
157 return &miloerror.Error{
158 Message: "Failed to load stream",
159 Code: http.StatusInternalServerError,
160 }
161 }
162
163 // Make sure that this is an annotation stream.
164 switch {
165 case resp.Desc.ContentType != miloProto.ContentTypeAnnotations:
166 return &miloerror.Error{
167 Message: "Requested stream is not a Milo annotation prot obuf",
168 Code: http.StatusBadRequest,
169 }
170
171 case resp.Desc.StreamType != logpb.StreamType_DATAGRAM:
172 return &miloerror.Error{
173 Message: "Requested stream is not a datagram stream",
174 Code: http.StatusBadRequest,
175 }
176
177 case len(resp.Logs) == 0:
178 // No annotation stream data, so render a minimal page.
179 return nil
180 }
181
182 // Get the last log entry in the stream. In reality, this will be index 0,
183 // since the "Tail" call should only return one log entry.
184 latestStream := resp.Logs[len(resp.Logs)-1]
185 dg := latestStream.GetDatagram()
186 switch {
187 case dg == nil:
188 return &miloerror.Error{
189 Message: "Datagram stream does not have datagram data",
190 Code: http.StatusInternalServerError,
191 }
192
193 case dg.Partial != nil && !(dg.Partial.Index == 0 && dg.Partial.Last):
194 // LogDog splits large datagrams into consecutive fragments. If the
195 // annotation state is fragmented, a reconstruction algorithm wi ll have to
196 // be employed here to build the full datagram before processing .
197 //
198 // At the moment, no annotation streams are expected to be anywh ere close to
199 // this large, so we're going to handle this case by erroring. A
200 // reconstruction algorithm would look like:
201 // 1) "Tail" to get the latest datagram, identify it as partial.
202 // 1a) Perform a bounds check on the total datagram size to ensu re that it
203 // can be safely reconstructed.
204 // 2) Determine if it's the last partial index. If not, then the latest
205 // datagram is incomplete. Determine our initial datagram's stream index
206 // the by subtracting the partial index from this message's stream index.
207 // 2a) If this datagram index is "0", the first datagram in the stream is
208 // partial and all of the data isn't here, so treat this as "no data".
209 // 2b) Otherwise, goto (1), using "Get" request on the datagram index minus
210 // one to get the previous datagram.
211 // 3) Issue a "Get" request for our initial datagram index throu gh the index
212 // preceding ours.
213 // 4) Reassemble the binary data from the full set of datagrams.
214 return &miloerror.Error{
215 Message: "Partial datagram streams are not supported yet ",
216 Code: http.StatusNotImplemented,
217 }
218 }
219
220 // Attempt to decode the Step protobuf.
221 var step miloProto.Step
222 if err := proto.Unmarshal(dg.Data, &step); err != nil {
223 return &miloerror.Error{
224 Message: "Failed to unmarshal annotation protobuf",
225 Code: http.StatusInternalServerError,
226 }
227 }
228
229 var latestEndedTime time.Time
230 for _, sub := range step.Substep {
231 switch t := sub.Substep.(type) {
232 case *miloProto.Step_Substep_AnnotationStream:
233 // TODO(hinoka,dnj): Implement recursive / embedded subs tream fetching if
234 // specified.
235 log.Warningf(c, "Annotation stream links LogDog substrea m [%+v], not supported!", t.AnnotationStream)
236
237 case *miloProto.Step_Substep_Step:
238 endedTime := t.Step.Ended.Time()
239 if t.Step.Ended != nil && endedTime.After(latestEndedTim e) {
240 latestEndedTime = endedTime
241 }
242 }
243 }
244 if latestEndedTime.IsZero() {
245 // No substep had an ended time :(
246 latestEndedTime = step.Started.Time()
247 }
248
249 // Build our CachedStep.
250 as.cs = internal.CachedStep{
251 Step: &step,
252 Finished: (resp.State.TerminalIndex >= 0 && latestStream.StreamI ndex == uint64(resp.State.TerminalIndex)),
253 }
254
255 // Annotee is apparently not putting an ended time on some annotation pr otos.
256 // This hack will ensure that a finished build will always have an ended time.
257 if as.cs.Finished && as.cs.Step.Ended == nil {
258 as.cs.Step.Ended = google.NewTimestamp(latestEndedTime)
259 }
260
261 // Marshal and cache the step. If this is the final protobuf in the stre am,
262 // cache it indefinitely; otherwise, cache it for intermediateCacheLifet ime.
263 //
264 // If this fails, it is non-fatal.
265 mcData, err := proto.Marshal(&as.cs)
266 if err == nil {
267 mcItem = memcache.Get(c).NewItem(mcKey)
268 if !as.cs.Finished {
269 mcItem.SetExpiration(intermediateCacheLifetime)
270 }
271 mcItem.SetValue(mcData)
272 if err := memcache.Get(c).Set(mcItem); err != nil {
273 log.WithError(err).Warningf(c, "Failed to cache annotati on protobuf CachedStep.")
274 }
275 } else {
276 log.WithError(err).Warningf(c, "Failed to marshal annotation pro tobuf CachedStep.")
277 }
278
279 return nil
280 }
281
282 func (as *annotationStreamRequest) toMiloBuild(c context.Context) *resp.MiloBuil d {
283 prefix, name := as.path.Split()
284
285 // Prepare a Streams object with only one stream.
286 streams := Streams{
287 MainStream: &Stream{
288 Server: as.host,
289 Prefix: string(prefix),
290 Path: string(name),
291 IsDatagram: true,
292 Data: as.cs.Step,
293 Closed: as.cs.Finished,
294 },
295 }
296
297 var (
298 build resp.MiloBuild
299 ub = logDogURLBuilder{
300 project: as.project,
301 host: as.host,
302 prefix: prefix,
303 }
304 )
305 AddLogDogToBuild(c, &ub, &streams, &build)
306 return &build
307 }
308
309 type logDogURLBuilder struct {
310 host string
311 prefix types.StreamName
312 project config.ProjectName
313 }
314
315 func (b *logDogURLBuilder) BuildLink(l *miloProto.Link) *resp.Link {
316 switch t := l.Value.(type) {
317 case *miloProto.Link_LogdogStream:
318 ls := t.LogdogStream
319
320 server := ls.Server
321 if server == "" {
322 server = b.host
323 }
324
325 prefix := types.StreamName(ls.Prefix)
326 if prefix == "" {
327 prefix = b.prefix
328 }
329
330 path := fmt.Sprintf("%s/%s", b.project, prefix.Join(types.Stream Name(ls.Name)))
331 u := url.URL{
332 Scheme: "https",
333 Host: server,
334 Path: "v/",
335 RawQuery: url.Values{
336 "s": []string{string(path)},
337 }.Encode(),
338 }
339
340 link := resp.Link{
341 Label: l.Label,
342 URL: u.String(),
343 }
344 if link.Label == "" {
345 link.Label = ls.Name
346 }
347 return &link
348
349 case *miloProto.Link_Url:
350 link := resp.Link{
351 Label: l.Label,
352 URL: t.Url,
353 }
354 if link.Label == "" {
355 link.Label = "unnamed"
356 }
357 return &link
358
359 default:
360 // Don't know how to render.
361 return nil
362 }
363 }
OLDNEW
« no previous file with comments | « appengine/cmd/milo/frontend/milo.go ('k') | appengine/cmd/milo/logdog/http.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698