Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(140)

Side by Side Diff: milo/appengine/logdog/build.go

Issue 2341113002: Update Coordinator client, add datagram assembly. (Closed)
Patch Set: Comments. Created 4 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « logdog/client/coordinator/stream_test.go ('k') | milo/appengine/logdog/http.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2016 The LUCI Authors. All rights reserved. 1 // Copyright 2016 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0 2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file. 3 // that can be found in the LICENSE file.
4 4
5 package logdog 5 package logdog
6 6
7 import ( 7 import (
8 "errors" 8 "errors"
9 "fmt" 9 "fmt"
10 "net/http" 10 "net/http"
11 "net/url" 11 "net/url"
12 "strings" 12 "strings"
13 "time" 13 "time"
14 14
15 "github.com/luci/luci-go/common/config" 15 "github.com/luci/luci-go/common/config"
16 log "github.com/luci/luci-go/common/logging" 16 log "github.com/luci/luci-go/common/logging"
17 "github.com/luci/luci-go/common/proto/google" 17 "github.com/luci/luci-go/common/proto/google"
18 miloProto "github.com/luci/luci-go/common/proto/milo" 18 miloProto "github.com/luci/luci-go/common/proto/milo"
19 "github.com/luci/luci-go/grpc/grpcutil" 19 "github.com/luci/luci-go/grpc/grpcutil"
20 "github.com/luci/luci-go/grpc/prpc"
21 "github.com/luci/luci-go/logdog/api/endpoints/coordinator/logs/v1"
22 "github.com/luci/luci-go/logdog/api/logpb" 20 "github.com/luci/luci-go/logdog/api/logpb"
21 "github.com/luci/luci-go/logdog/client/coordinator"
23 "github.com/luci/luci-go/logdog/common/types" 22 "github.com/luci/luci-go/logdog/common/types"
24 "github.com/luci/luci-go/milo/api/resp" 23 "github.com/luci/luci-go/milo/api/resp"
25 "github.com/luci/luci-go/milo/appengine/logdog/internal" 24 "github.com/luci/luci-go/milo/appengine/logdog/internal"
26 "github.com/luci/luci-go/milo/common/miloerror" 25 "github.com/luci/luci-go/milo/common/miloerror"
27 26
28 "github.com/golang/protobuf/proto" 27 "github.com/golang/protobuf/proto"
29 mc "github.com/luci/gae/service/memcache" 28 mc "github.com/luci/gae/service/memcache"
30 "golang.org/x/net/context" 29 "golang.org/x/net/context"
31 "google.golang.org/grpc/codes" 30 "google.golang.org/grpc/codes"
32 ) 31 )
(...skipping 12 matching lines...) Expand all
45 type annotationStreamRequest struct { 44 type annotationStreamRequest struct {
46 *AnnotationStream 45 *AnnotationStream
47 46
48 // host is the name of the LogDog host. 47 // host is the name of the LogDog host.
49 host string 48 host string
50 49
51 project config.ProjectName 50 project config.ProjectName
52 path types.StreamPath 51 path types.StreamPath
53 52
54 // logDogClient is the HTTP client to use for LogDog communication. 53 // logDogClient is the HTTP client to use for LogDog communication.
55 » logDogClient http.Client 54 » logDogClient *coordinator.Client
56 55
57 // cs is the unmarshalled annotation stream Step and associated data. 56 // cs is the unmarshalled annotation stream Step and associated data.
58 cs internal.CachedStep 57 cs internal.CachedStep
59 } 58 }
60 59
61 func (as *annotationStreamRequest) normalize() error { 60 func (as *annotationStreamRequest) normalize() error {
62 if err := as.project.Validate(); err != nil { 61 if err := as.project.Validate(); err != nil {
63 return &miloerror.Error{ 62 return &miloerror.Error{
64 Message: "Invalid project name", 63 Message: "Invalid project name",
65 Code: http.StatusBadRequest, 64 Code: http.StatusBadRequest,
(...skipping 49 matching lines...) Expand 10 before | Expand all | Expand 10 after
115 break 114 break
116 115
117 default: 116 default:
118 log.Fields{ 117 log.Fields{
119 log.ErrorKey: err, 118 log.ErrorKey: err,
120 "memcacheKey": mcKey, 119 "memcacheKey": mcKey,
121 }.Errorf(c, "Failed to load annotation protobuf memcache cached step.") 120 }.Errorf(c, "Failed to load annotation protobuf memcache cached step.")
122 } 121 }
123 122
124 // Load from LogDog directly. 123 // Load from LogDog directly.
125 client := logdog.NewLogsPRPCClient(&prpc.Client{
126 C: &as.logDogClient,
127 Host: as.host,
128 })
129
130 log.Fields{ 124 log.Fields{
131 "project": as.project, 125 "project": as.project,
132 "path": as.path, 126 "path": as.path,
133 "host": as.host, 127 "host": as.host,
134 }.Infof(c, "Making tail request to LogDog to fetch annotation stream.") 128 }.Infof(c, "Making tail request to LogDog to fetch annotation stream.")
135 » resp, err := client.Tail(c, &logdog.TailRequest{ 129
136 » » Project: string(as.project), 130 » var (
137 » » Path: string(as.path), 131 » » state coordinator.LogStream
138 » » State: true, 132 » » stream = as.logDogClient.Stream(as.project, as.path)
139 » }) 133 » )
134 » le, err := stream.Tail(c, coordinator.WithState(&state), coordinator.Com plete())
140 switch code := grpcutil.Code(err); code { 135 switch code := grpcutil.Code(err); code {
141 case codes.OK: 136 case codes.OK:
142 break 137 break
143 138
144 case codes.NotFound: 139 case codes.NotFound:
145 return &miloerror.Error{ 140 return &miloerror.Error{
146 Message: "Stream not found", 141 Message: "Stream not found",
147 Code: http.StatusNotFound, 142 Code: http.StatusNotFound,
148 } 143 }
149 144
150 default: 145 default:
151 // TODO: Once we switch to delegation tokens and are making the request on 146 // TODO: Once we switch to delegation tokens and are making the request on
152 // behalf of a user rather than the Milo service, handle Permiss ionDenied. 147 // behalf of a user rather than the Milo service, handle Permiss ionDenied.
153 log.Fields{ 148 log.Fields{
154 log.ErrorKey: err, 149 log.ErrorKey: err,
155 "code": code, 150 "code": code,
156 }.Errorf(c, "Failed to load LogDog annotation stream.") 151 }.Errorf(c, "Failed to load LogDog annotation stream.")
157 return &miloerror.Error{ 152 return &miloerror.Error{
158 Message: "Failed to load stream", 153 Message: "Failed to load stream",
159 Code: http.StatusInternalServerError, 154 Code: http.StatusInternalServerError,
160 } 155 }
161 } 156 }
162 157
163 // Make sure that this is an annotation stream. 158 // Make sure that this is an annotation stream.
164 switch { 159 switch {
165 » case resp.Desc.ContentType != miloProto.ContentTypeAnnotations: 160 » case state.Desc.ContentType != miloProto.ContentTypeAnnotations:
166 return &miloerror.Error{ 161 return &miloerror.Error{
167 Message: "Requested stream is not a Milo annotation prot obuf", 162 Message: "Requested stream is not a Milo annotation prot obuf",
168 Code: http.StatusBadRequest, 163 Code: http.StatusBadRequest,
169 } 164 }
170 165
171 » case resp.Desc.StreamType != logpb.StreamType_DATAGRAM: 166 » case state.Desc.StreamType != logpb.StreamType_DATAGRAM:
172 return &miloerror.Error{ 167 return &miloerror.Error{
173 Message: "Requested stream is not a datagram stream", 168 Message: "Requested stream is not a datagram stream",
174 Code: http.StatusBadRequest, 169 Code: http.StatusBadRequest,
175 } 170 }
176 171
177 » case len(resp.Logs) == 0: 172 » case le == nil:
178 // No annotation stream data, so render a minimal page. 173 // No annotation stream data, so render a minimal page.
179 return nil 174 return nil
180 } 175 }
181 176
182 // Get the last log entry in the stream. In reality, this will be index 0, 177 // Get the last log entry in the stream. In reality, this will be index 0,
183 // since the "Tail" call should only return one log entry. 178 // since the "Tail" call should only return one log entry.
184 » latestStream := resp.Logs[len(resp.Logs)-1] 179 » //
185 » dg := latestStream.GetDatagram() 180 » // Because we supplied the "Complete" flag to Tail and suceeded, this da tagram
186 » switch { 181 » // will be complete even if its source datagram(s) are fragments.
187 » case dg == nil: 182 » dg := le.GetDatagram()
183 » if dg == nil {
188 return &miloerror.Error{ 184 return &miloerror.Error{
189 Message: "Datagram stream does not have datagram data", 185 Message: "Datagram stream does not have datagram data",
190 Code: http.StatusInternalServerError, 186 Code: http.StatusInternalServerError,
191 } 187 }
192
193 case dg.Partial != nil && !(dg.Partial.Index == 0 && dg.Partial.Last):
194 // LogDog splits large datagrams into consecutive fragments. If the
195 // annotation state is fragmented, a reconstruction algorithm wi ll have to
196 // be employed here to build the full datagram before processing .
197 //
198 // At the moment, no annotation streams are expected to be anywh ere close to
199 // this large, so we're going to handle this case by erroring. A
200 // reconstruction algorithm would look like:
201 // 1) "Tail" to get the latest datagram, identify it as partial.
202 // 1a) Perform a bounds check on the total datagram size to ensu re that it
203 // can be safely reconstructed.
204 // 2) Determine if it's the last partial index. If not, then the latest
205 // datagram is incomplete. Determine our initial datagram's stream index
206 // the by subtracting the partial index from this message's stream index.
207 // 2a) If this datagram index is "0", the first datagram in the stream is
208 // partial and all of the data isn't here, so treat this as "no data".
209 // 2b) Otherwise, goto (1), using "Get" request on the datagram index minus
210 // one to get the previous datagram.
211 // 3) Issue a "Get" request for our initial datagram index throu gh the index
212 // preceding ours.
213 // 4) Reassemble the binary data from the full set of datagrams.
214 return &miloerror.Error{
215 Message: "Partial datagram streams are not supported yet ",
216 Code: http.StatusNotImplemented,
217 }
218 } 188 }
219 189
220 // Attempt to decode the Step protobuf. 190 // Attempt to decode the Step protobuf.
221 var step miloProto.Step 191 var step miloProto.Step
222 if err := proto.Unmarshal(dg.Data, &step); err != nil { 192 if err := proto.Unmarshal(dg.Data, &step); err != nil {
223 return &miloerror.Error{ 193 return &miloerror.Error{
224 Message: "Failed to unmarshal annotation protobuf", 194 Message: "Failed to unmarshal annotation protobuf",
225 Code: http.StatusInternalServerError, 195 Code: http.StatusInternalServerError,
226 } 196 }
227 } 197 }
(...skipping 14 matching lines...) Expand all
242 } 212 }
243 } 213 }
244 if latestEndedTime.IsZero() { 214 if latestEndedTime.IsZero() {
245 // No substep had an ended time :( 215 // No substep had an ended time :(
246 latestEndedTime = step.Started.Time() 216 latestEndedTime = step.Started.Time()
247 } 217 }
248 218
249 // Build our CachedStep. 219 // Build our CachedStep.
250 as.cs = internal.CachedStep{ 220 as.cs = internal.CachedStep{
251 Step: &step, 221 Step: &step,
252 » » Finished: (resp.State.TerminalIndex >= 0 && latestStream.StreamI ndex == uint64(resp.State.TerminalIndex)), 222 » » Finished: (state.State.TerminalIndex >= 0 && le.StreamIndex == u int64(state.State.TerminalIndex)),
253 } 223 }
254 224
255 // Annotee is apparently not putting an ended time on some annotation pr otos. 225 // Annotee is apparently not putting an ended time on some annotation pr otos.
256 // This hack will ensure that a finished build will always have an ended time. 226 // This hack will ensure that a finished build will always have an ended time.
257 if as.cs.Finished && as.cs.Step.Ended == nil { 227 if as.cs.Finished && as.cs.Step.Ended == nil {
258 as.cs.Step.Ended = google.NewTimestamp(latestEndedTime) 228 as.cs.Step.Ended = google.NewTimestamp(latestEndedTime)
259 } 229 }
260 230
261 // Marshal and cache the step. If this is the final protobuf in the stre am, 231 // Marshal and cache the step. If this is the final protobuf in the stre am,
262 // cache it indefinitely; otherwise, cache it for intermediateCacheLifet ime. 232 // cache it indefinitely; otherwise, cache it for intermediateCacheLifet ime.
(...skipping 91 matching lines...) Expand 10 before | Expand all | Expand 10 after
354 if link.Label == "" { 324 if link.Label == "" {
355 link.Label = "unnamed" 325 link.Label = "unnamed"
356 } 326 }
357 return &link 327 return &link
358 328
359 default: 329 default:
360 // Don't know how to render. 330 // Don't know how to render.
361 return nil 331 return nil
362 } 332 }
363 } 333 }
OLDNEW
« no previous file with comments | « logdog/client/coordinator/stream_test.go ('k') | milo/appengine/logdog/http.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698