OLD | NEW |
| (Empty) |
1 // Copyright 2016 The LUCI Authors. All rights reserved. | |
2 // Use of this source code is governed under the Apache License, Version 2.0 | |
3 // that can be found in the LICENSE file. | |
4 | |
5 package raw_presentation | |
6 | |
7 import ( | |
8 "errors" | |
9 "fmt" | |
10 "time" | |
11 | |
12 log "github.com/luci/luci-go/common/logging" | |
13 "github.com/luci/luci-go/common/proto/google" | |
14 miloProto "github.com/luci/luci-go/common/proto/milo" | |
15 "github.com/luci/luci-go/logdog/api/logpb" | |
16 "github.com/luci/luci-go/logdog/client/coordinator" | |
17 "github.com/luci/luci-go/logdog/common/types" | |
18 "github.com/luci/luci-go/logdog/common/viewer" | |
19 "github.com/luci/luci-go/luci_config/common/cfgtypes" | |
20 "github.com/luci/luci-go/milo/api/resp" | |
21 "github.com/luci/luci-go/milo/appengine/job_source/raw_presentation/inte
rnal" | |
22 | |
23 "github.com/golang/protobuf/proto" | |
24 "golang.org/x/net/context" | |
25 ) | |
26 | |
27 const ( | |
28 // intermediateCacheLifetime is the amount of time to cache intermediate
(non- | |
29 // terminal) annotation streams. Terminal annotation streams are cached | |
30 // indefinitely. | |
31 intermediateCacheLifetime = 10 * time.Second | |
32 | |
33 // defaultLogDogHost is the default LogDog host, if one isn't specified
via | |
34 // query string. | |
35 defaultLogDogHost = "luci-logdog.appspot.com" | |
36 ) | |
37 | |
38 // AnnotationStream represents a LogDog annotation protobuf stream. | |
39 type AnnotationStream struct { | |
40 Project cfgtypes.ProjectName | |
41 Path types.StreamPath | |
42 | |
43 // Client is the HTTP client to use for LogDog communication. | |
44 Client *coordinator.Client | |
45 | |
46 // cs is the unmarshalled annotation stream Step and associated data. | |
47 cs internal.CachedStep | |
48 } | |
49 | |
50 // Normalize validates and normalizes the stream's parameters. | |
51 func (as *AnnotationStream) Normalize() error { | |
52 if err := as.Project.Validate(); err != nil { | |
53 return fmt.Errorf("Invalid project name: %s", as.Project) | |
54 } | |
55 | |
56 if err := as.Path.Validate(); err != nil { | |
57 return fmt.Errorf("Invalid log stream path %q: %s", as.Path, err
) | |
58 } | |
59 | |
60 return nil | |
61 } | |
62 | |
63 var errNotMilo = errors.New("Requested stream is not a Milo annotation protobuf"
) | |
64 var errNotDatagram = errors.New("Requested stream is not a datagram stream") | |
65 var errNoEntries = errors.New("Log stream has no annotation entries") | |
66 | |
67 // Fetch loads the annotation stream from LogDog. | |
68 // | |
69 // If the stream does not exist, or is invalid, Fetch will return a Milo error. | |
70 // Otherwise, it will return the Step that was loaded. | |
71 // | |
72 // Fetch caches the step, so multiple calls to Fetch will return the same Step | |
73 // value. | |
74 func (as *AnnotationStream) Fetch(c context.Context) (*miloProto.Step, error) { | |
75 // Cached? | |
76 if as.cs.Step != nil { | |
77 return as.cs.Step, nil | |
78 } | |
79 | |
80 // Load from LogDog directly. | |
81 log.Fields{ | |
82 "host": as.Client.Host, | |
83 "project": as.Project, | |
84 "path": as.Path, | |
85 }.Infof(c, "Making tail request to LogDog to fetch annotation stream.") | |
86 | |
87 var ( | |
88 state coordinator.LogStream | |
89 stream = as.Client.Stream(as.Project, as.Path) | |
90 ) | |
91 | |
92 le, err := stream.Tail(c, coordinator.WithState(&state), coordinator.Com
plete()) | |
93 if err != nil { | |
94 log.WithError(err).Errorf(c, "Failed to load stream.") | |
95 return nil, err | |
96 } | |
97 | |
98 // Make sure that this is an annotation stream. | |
99 switch { | |
100 case state.Desc.ContentType != miloProto.ContentTypeAnnotations: | |
101 return nil, errNotMilo | |
102 | |
103 case state.Desc.StreamType != logpb.StreamType_DATAGRAM: | |
104 return nil, errNotDatagram | |
105 | |
106 case le == nil: | |
107 // No annotation stream data, so render a minimal page. | |
108 return nil, errNoEntries | |
109 } | |
110 | |
111 // Get the last log entry in the stream. In reality, this will be index
0, | |
112 // since the "Tail" call should only return one log entry. | |
113 // | |
114 // Because we supplied the "Complete" flag to Tail and suceeded, this da
tagram | |
115 // will be complete even if its source datagram(s) are fragments. | |
116 dg := le.GetDatagram() | |
117 if dg == nil { | |
118 return nil, errors.New("Datagram stream does not have datagram d
ata") | |
119 } | |
120 | |
121 // Attempt to decode the Step protobuf. | |
122 var step miloProto.Step | |
123 if err := proto.Unmarshal(dg.Data, &step); err != nil { | |
124 return nil, err | |
125 } | |
126 | |
127 var latestEndedTime time.Time | |
128 for _, sub := range step.Substep { | |
129 switch t := sub.Substep.(type) { | |
130 case *miloProto.Step_Substep_AnnotationStream: | |
131 // TODO(hinoka,dnj): Implement recursive / embedded subs
tream fetching if | |
132 // specified. | |
133 log.Warningf(c, "Annotation stream links LogDog substrea
m [%+v], not supported!", t.AnnotationStream) | |
134 | |
135 case *miloProto.Step_Substep_Step: | |
136 endedTime := google.TimeFromProto(t.Step.Ended) | |
137 if t.Step.Ended != nil && endedTime.After(latestEndedTim
e) { | |
138 latestEndedTime = endedTime | |
139 } | |
140 } | |
141 } | |
142 if latestEndedTime.IsZero() { | |
143 // No substep had an ended time :( | |
144 latestEndedTime = google.TimeFromProto(step.Started) | |
145 } | |
146 | |
147 // Build our CachedStep. | |
148 as.cs = internal.CachedStep{ | |
149 Step: &step, | |
150 Finished: (state.State.TerminalIndex >= 0 && le.StreamIndex == u
int64(state.State.TerminalIndex)), | |
151 } | |
152 return as.cs.Step, nil | |
153 } | |
154 | |
155 func (as *AnnotationStream) toMiloBuild(c context.Context) *resp.MiloBuild { | |
156 prefix, name := as.Path.Split() | |
157 | |
158 // Prepare a Streams object with only one stream. | |
159 streams := Streams{ | |
160 MainStream: &Stream{ | |
161 Server: as.Client.Host, | |
162 Prefix: string(prefix), | |
163 Path: string(name), | |
164 IsDatagram: true, | |
165 Data: as.cs.Step, | |
166 Closed: as.cs.Finished, | |
167 }, | |
168 } | |
169 | |
170 var ( | |
171 build resp.MiloBuild | |
172 ub = ViewerURLBuilder{ | |
173 Host: as.Client.Host, | |
174 Project: as.Project, | |
175 Prefix: prefix, | |
176 } | |
177 ) | |
178 AddLogDogToBuild(c, &ub, streams.MainStream.Data, &build) | |
179 return &build | |
180 } | |
181 | |
182 // ViewerURLBuilder is a URL builder that constructs LogDog viewer URLs. | |
183 type ViewerURLBuilder struct { | |
184 Host string | |
185 Prefix types.StreamName | |
186 Project cfgtypes.ProjectName | |
187 } | |
188 | |
189 // BuildLink implements URLBuilder. | |
190 func (b *ViewerURLBuilder) BuildLink(l *miloProto.Link) *resp.Link { | |
191 switch t := l.Value.(type) { | |
192 case *miloProto.Link_LogdogStream: | |
193 ls := t.LogdogStream | |
194 | |
195 server := ls.Server | |
196 if server == "" { | |
197 server = b.Host | |
198 } | |
199 | |
200 prefix := types.StreamName(ls.Prefix) | |
201 if prefix == "" { | |
202 prefix = b.Prefix | |
203 } | |
204 | |
205 u := viewer.GetURL(server, b.Project, prefix.Join(types.StreamNa
me(ls.Name))) | |
206 link := resp.NewLink(l.Label, u) | |
207 if link.Label == "" { | |
208 link.Label = ls.Name | |
209 } | |
210 return link | |
211 | |
212 case *miloProto.Link_Url: | |
213 link := resp.NewLink(l.Label, t.Url) | |
214 if link.Label == "" { | |
215 link.Label = "unnamed" | |
216 } | |
217 return link | |
218 | |
219 default: | |
220 // Don't know how to render. | |
221 return nil | |
222 } | |
223 } | |
OLD | NEW |