Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(126)

Side by Side Diff: logdog/client/coordinator/stream_test.go

Issue 2341113002: Update Coordinator client, add datagram assembly. (Closed)
Patch Set: Comments. Created 4 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « logdog/client/coordinator/stream_params.go ('k') | milo/appengine/logdog/build.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2015 The LUCI Authors. All rights reserved. 1 // Copyright 2015 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0 2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file. 3 // that can be found in the LICENSE file.
4 4
5 package coordinator 5 package coordinator
6 6
7 import ( 7 import (
8 "errors" 8 "errors"
9 "testing" 9 "testing"
10 10
11 "github.com/luci/luci-go/common/clock/testclock" 11 "github.com/luci/luci-go/common/clock/testclock"
12 "github.com/luci/luci-go/common/proto/google" 12 "github.com/luci/luci-go/common/proto/google"
13 "github.com/luci/luci-go/common/testing/prpctest" 13 "github.com/luci/luci-go/common/testing/prpctest"
14 "github.com/luci/luci-go/grpc/grpcutil" 14 "github.com/luci/luci-go/grpc/grpcutil"
15 "github.com/luci/luci-go/logdog/api/endpoints/coordinator/logs/v1" 15 "github.com/luci/luci-go/logdog/api/endpoints/coordinator/logs/v1"
16 "github.com/luci/luci-go/logdog/api/logpb" 16 "github.com/luci/luci-go/logdog/api/logpb"
17
17 "golang.org/x/net/context" 18 "golang.org/x/net/context"
19 "google.golang.org/grpc/codes"
18 20
19 . "github.com/luci/luci-go/common/testing/assertions" 21 . "github.com/luci/luci-go/common/testing/assertions"
20 . "github.com/smartystreets/goconvey/convey" 22 . "github.com/smartystreets/goconvey/convey"
21 ) 23 )
22 24
23 func genLog(idx int64, id string) *logpb.LogEntry { 25 func genLog(idx int64, id string) *logpb.LogEntry {
24 return &logpb.LogEntry{ 26 return &logpb.LogEntry{
25 StreamIndex: uint64(idx), 27 StreamIndex: uint64(idx),
26 Content: &logpb.LogEntry_Text{ 28 Content: &logpb.LogEntry_Text{
27 Text: &logpb.Text{ 29 Text: &logpb.Text{
28 Lines: []*logpb.Text_Line{ 30 Lines: []*logpb.Text_Line{
29 {Value: id}, 31 {Value: id},
30 }, 32 },
31 }, 33 },
32 }, 34 },
33 } 35 }
34 } 36 }
35 37
38 func genDG(idx int64, content ...string) []*logpb.LogEntry {
39 var contentSize uint64
40 if len(content) > 1 {
41 for _, c := range content {
42 contentSize += uint64(len(c))
43 }
44 }
45
46 logs := make([]*logpb.LogEntry, len(content))
47 for i, c := range content {
48 dg := logpb.Datagram{
49 Data: []byte(c),
50 }
51 if len(content) > 1 {
52 dg.Partial = &logpb.Datagram_Partial{
53 Index: uint32(i),
54 Size: contentSize,
55 Last: (i == len(content)-1),
56 }
57 }
58
59 logs[i] = &logpb.LogEntry{
60 StreamIndex: uint64(idx + int64(i)),
61 Content: &logpb.LogEntry_Datagram{&dg},
62 }
63 }
64 return logs
65 }
66
36 // testStreamLogsService implements just the Get and Tail endpoints, 67 // testStreamLogsService implements just the Get and Tail endpoints,
37 // instrumented for testing. 68 // instrumented for testing.
38 type testStreamLogsService struct { 69 type testStreamLogsService struct {
39 testLogsServiceBase 70 testLogsServiceBase
40 71
41 // Get 72 // Get
42 GR logdog.GetRequest 73 GR logdog.GetRequest
43 GH func(*logdog.GetRequest) (*logdog.GetResponse, error) 74 GH func(*logdog.GetRequest) (*logdog.GetResponse, error)
44 75
45 // Tail 76 // Tail
(...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after
83 panic(err) 114 panic(err)
84 } 115 }
85 client := Client{ 116 client := Client{
86 C: logdog.NewLogsPRPCClient(prpcClient), 117 C: logdog.NewLogsPRPCClient(prpcClient),
87 } 118 }
88 119
89 Convey(`Can bind a Stream`, func() { 120 Convey(`Can bind a Stream`, func() {
90 s := client.Stream("myproj", "test/+/a") 121 s := client.Stream("myproj", "test/+/a")
91 122
92 Convey(`Test Get`, func() { 123 Convey(`Test Get`, func() {
93 p := NewGetParams()
94
95 Convey(`A default Get query will return logs and no state.`, func() { 124 Convey(`A default Get query will return logs and no state.`, func() {
96 svc.GH = func(*logdog.GetRequest) (*logd og.GetResponse, error) { 125 svc.GH = func(*logdog.GetRequest) (*logd og.GetResponse, error) {
97 return &logdog.GetResponse{ 126 return &logdog.GetResponse{
98 Logs: []*logpb.LogEntry{ 127 Logs: []*logpb.LogEntry{
99 genLog(1337, "oh ai"), 128 genLog(1337, "oh ai"),
100 genLog(1338, "kt hxbye"), 129 genLog(1338, "kt hxbye"),
101 }, 130 },
102 }, nil 131 }, nil
103 } 132 }
104 133
105 » » » » » l, err := s.Get(c, nil) 134 » » » » » l, err := s.Get(c)
106 So(err, ShouldBeNil) 135 So(err, ShouldBeNil)
107 So(l, ShouldResemble, []*logpb.LogEntry{ genLog(1337, "ohai"), genLog(1338, "kthxbye")}) 136 So(l, ShouldResemble, []*logpb.LogEntry{ genLog(1337, "ohai"), genLog(1338, "kthxbye")})
108 137
109 // Validate the correct parameters were sent. 138 // Validate the correct parameters were sent.
110 So(svc.GR, ShouldResemble, logdog.GetReq uest{ 139 So(svc.GR, ShouldResemble, logdog.GetReq uest{
111 Project: "myproj", 140 Project: "myproj",
112 Path: "test/+/a", 141 Path: "test/+/a",
113 }) 142 })
114 }) 143 })
115 144
116 Convey(`Will form a proper Get logs query.`, fun c() { 145 Convey(`Will form a proper Get logs query.`, fun c() {
117 p = p.NonContiguous().Index(1)
118
119 svc.GH = func(*logdog.GetRequest) (*logd og.GetResponse, error) { 146 svc.GH = func(*logdog.GetRequest) (*logd og.GetResponse, error) {
120 return &logdog.GetResponse{}, ni l 147 return &logdog.GetResponse{}, ni l
121 } 148 }
122 149
123 » » » » » l, err := s.Get(c, p) 150 » » » » » l, err := s.Get(c, NonContiguous(), Inde x(1))
124 So(err, ShouldBeNil) 151 So(err, ShouldBeNil)
125 So(l, ShouldBeNil) 152 So(l, ShouldBeNil)
126 153
127 // Validate the correct parameters were sent. 154 // Validate the correct parameters were sent.
128 So(svc.GR, ShouldResemble, logdog.GetReq uest{ 155 So(svc.GR, ShouldResemble, logdog.GetReq uest{
129 Project: "myproj", 156 Project: "myproj",
130 Path: "test/+/a", 157 Path: "test/+/a",
131 NonContiguous: true, 158 NonContiguous: true,
132 Index: 1, 159 Index: 1,
133 }) 160 })
134 }) 161 })
135 162
136 Convey(`Will request a specific number of logs i f a constraint is supplied.`, func() { 163 Convey(`Will request a specific number of logs i f a constraint is supplied.`, func() {
137 p = p.Limit(32, 64)
138
139 svc.GH = func(*logdog.GetRequest) (*logd og.GetResponse, error) { 164 svc.GH = func(*logdog.GetRequest) (*logd og.GetResponse, error) {
140 return &logdog.GetResponse{ 165 return &logdog.GetResponse{
141 Logs: []*logpb.LogEntry{ 166 Logs: []*logpb.LogEntry{
142 genLog(1337, "oh ai"), 167 genLog(1337, "oh ai"),
143 }, 168 },
144 }, nil 169 }, nil
145 } 170 }
146 171
147 » » » » » l, err := s.Get(c, p) 172 » » » » » l, err := s.Get(c, LimitCount(64), Limit Bytes(32))
148 So(err, ShouldBeNil) 173 So(err, ShouldBeNil)
149 So(l, ShouldResemble, []*logpb.LogEntry{ genLog(1337, "ohai")}) 174 So(l, ShouldResemble, []*logpb.LogEntry{ genLog(1337, "ohai")})
150 175
151 // Validate the HTTP request that we mad e. 176 // Validate the HTTP request that we mad e.
152 So(svc.GR, ShouldResemble, logdog.GetReq uest{ 177 So(svc.GR, ShouldResemble, logdog.GetReq uest{
153 Project: "myproj", 178 Project: "myproj",
154 Path: "test/+/a", 179 Path: "test/+/a",
155 LogCount: 64, 180 LogCount: 64,
156 ByteCount: 32, 181 ByteCount: 32,
157 }) 182 })
158 }) 183 })
159 184
160 Convey(`Can decode a full protobuf and state.`, func() { 185 Convey(`Can decode a full protobuf and state.`, func() {
161 var ls LogStream
162 p = p.State(&ls)
163
164 svc.GH = func(*logdog.GetRequest) (*logd og.GetResponse, error) { 186 svc.GH = func(*logdog.GetRequest) (*logd og.GetResponse, error) {
165 return &logdog.GetResponse{ 187 return &logdog.GetResponse{
166 Logs: []*logpb.LogEntry{ 188 Logs: []*logpb.LogEntry{
167 genLog(1337, "kt hxbye"), 189 genLog(1337, "kt hxbye"),
168 }, 190 },
169 State: &logdog.LogStream State{ 191 State: &logdog.LogStream State{
170 Created: google. NewTimestamp(now), 192 Created: google. NewTimestamp(now),
171 Archive: &logdog .LogStreamState_ArchiveInfo{ 193 Archive: &logdog .LogStreamState_ArchiveInfo{
172 IndexUrl : "index", 194 IndexUrl : "index",
173 StreamUr l: "stream", 195 StreamUr l: "stream",
174 DataUrl: "data", 196 DataUrl: "data",
175 }, 197 },
176 }, 198 },
177 Desc: &logpb.LogStreamDe scriptor{ 199 Desc: &logpb.LogStreamDe scriptor{
178 Prefix: "tes t", 200 Prefix: "tes t",
179 Name: "a", 201 Name: "a",
180 StreamType: logp b.StreamType_TEXT, 202 StreamType: logp b.StreamType_TEXT,
181 }, 203 },
182 }, nil 204 }, nil
183 } 205 }
184 206
185 » » » » » l, err := s.Get(c, p) 207 » » » » » var ls LogStream
208 » » » » » l, err := s.Get(c, WithState(&ls))
186 So(err, ShouldBeNil) 209 So(err, ShouldBeNil)
187 So(l, ShouldResemble, []*logpb.LogEntry{ genLog(1337, "kthxbye")}) 210 So(l, ShouldResemble, []*logpb.LogEntry{ genLog(1337, "kthxbye")})
188 So(ls, ShouldResemble, LogStream{ 211 So(ls, ShouldResemble, LogStream{
189 Path: "test/+/a", 212 Path: "test/+/a",
190 » » » » » » Desc: &logpb.LogStreamDescriptor { 213 » » » » » » Desc: logpb.LogStreamDescriptor{
191 Prefix: "test", 214 Prefix: "test",
192 Name: "a", 215 Name: "a",
193 StreamType: logpb.Stream Type_TEXT, 216 StreamType: logpb.Stream Type_TEXT,
194 }, 217 },
195 » » » » » » State: &StreamState{ 218 » » » » » » State: StreamState{
196 Created: now, 219 Created: now,
197 Archived: true, 220 Archived: true,
198 ArchiveIndexURL: "index ", 221 ArchiveIndexURL: "index ",
199 ArchiveStreamURL: "strea m", 222 ArchiveStreamURL: "strea m",
200 ArchiveDataURL: "data" , 223 ArchiveDataURL: "data" ,
201 }, 224 },
202 }) 225 })
203 }) 226 })
204 227
205 Convey(`Will return ErrNoSuchStream if the strea m is not found.`, func() { 228 Convey(`Will return ErrNoSuchStream if the strea m is not found.`, func() {
206 svc.GH = func(*logdog.GetRequest) (*logd og.GetResponse, error) { 229 svc.GH = func(*logdog.GetRequest) (*logd og.GetResponse, error) {
207 return nil, grpcutil.NotFound 230 return nil, grpcutil.NotFound
208 } 231 }
209 232
210 » » » » » _, err := s.Get(c, p) 233 » » » » » _, err := s.Get(c)
211 So(err, ShouldEqual, ErrNoSuchStream) 234 So(err, ShouldEqual, ErrNoSuchStream)
212 }) 235 })
213 236
214 Convey(`Will return ErrNoAccess if unauthenticat ed.`, func() { 237 Convey(`Will return ErrNoAccess if unauthenticat ed.`, func() {
215 svc.GH = func(*logdog.GetRequest) (*logd og.GetResponse, error) { 238 svc.GH = func(*logdog.GetRequest) (*logd og.GetResponse, error) {
216 return nil, grpcutil.Unauthentic ated 239 return nil, grpcutil.Unauthentic ated
217 } 240 }
218 241
219 » » » » » _, err := s.Get(c, p) 242 » » » » » _, err := s.Get(c)
220 So(err, ShouldEqual, ErrNoAccess) 243 So(err, ShouldEqual, ErrNoAccess)
221 }) 244 })
222 245
223 Convey(`Will return ErrNoAccess if permission is denied.`, func() { 246 Convey(`Will return ErrNoAccess if permission is denied.`, func() {
224 svc.GH = func(*logdog.GetRequest) (*logd og.GetResponse, error) { 247 svc.GH = func(*logdog.GetRequest) (*logd og.GetResponse, error) {
225 return nil, grpcutil.PermissionD enied 248 return nil, grpcutil.PermissionD enied
226 } 249 }
227 250
228 » » » » » _, err := s.Get(c, p) 251 » » » » » _, err := s.Get(c)
229 So(err, ShouldEqual, ErrNoAccess) 252 So(err, ShouldEqual, ErrNoAccess)
230 }) 253 })
231 }) 254 })
232 255
233 Convey(`Test State`, func() { 256 Convey(`Test State`, func() {
234 Convey(`Will request just the state if asked.`, func() { 257 Convey(`Will request just the state if asked.`, func() {
235 svc.GH = func(*logdog.GetRequest) (*logd og.GetResponse, error) { 258 svc.GH = func(*logdog.GetRequest) (*logd og.GetResponse, error) {
236 return &logdog.GetResponse{ 259 return &logdog.GetResponse{
237 Project: "myproj", 260 Project: "myproj",
261 Desc: &logpb.LogStreamDe scriptor{
262 Prefix: "tes t",
263 Name: "a",
264 StreamType: logp b.StreamType_TEXT,
265 },
238 State: &logdog.LogStream State{ 266 State: &logdog.LogStream State{
239 Created: google. NewTimestamp(now), 267 Created: google. NewTimestamp(now),
240 }, 268 },
241 }, nil 269 }, nil
242 } 270 }
243 271
244 l, err := s.State(c) 272 l, err := s.State(c)
245 So(err, ShouldBeNil) 273 So(err, ShouldBeNil)
246 So(l, ShouldResemble, &LogStream{ 274 So(l, ShouldResemble, &LogStream{
247 Project: "myproj", 275 Project: "myproj",
248 Path: "test/+/a", 276 Path: "test/+/a",
249 » » » » » » State: &StreamState{ 277 » » » » » » Desc: logpb.LogStreamDescriptor{
278 » » » » » » » Prefix: "test",
279 » » » » » » » Name: "a",
280 » » » » » » » StreamType: logpb.Stream Type_TEXT,
281 » » » » » » },
282 » » » » » » State: StreamState{
250 Created: now.UTC(), 283 Created: now.UTC(),
251 }, 284 },
252 }) 285 })
253 286
254 // Validate the HTTP request that we mad e. 287 // Validate the HTTP request that we mad e.
255 So(svc.GR, ShouldResemble, logdog.GetReq uest{ 288 So(svc.GR, ShouldResemble, logdog.GetReq uest{
256 Project: "myproj", 289 Project: "myproj",
257 Path: "test/+/a", 290 Path: "test/+/a",
258 LogCount: -1, 291 LogCount: -1,
259 State: true, 292 State: true,
(...skipping 41 matching lines...) Expand 10 before | Expand all | Expand 10 after
301 Name: "a", 334 Name: "a",
302 StreamType: logp b.StreamType_TEXT, 335 StreamType: logp b.StreamType_TEXT,
303 }, 336 },
304 Logs: []*logpb.LogEntry{ 337 Logs: []*logpb.LogEntry{
305 genLog(1337, "kt hxbye"), 338 genLog(1337, "kt hxbye"),
306 }, 339 },
307 }, nil 340 }, nil
308 } 341 }
309 342
310 var ls LogStream 343 var ls LogStream
311 » » » » » l, err := s.Tail(c, &ls) 344 » » » » » l, err := s.Tail(c, WithState(&ls))
312 So(err, ShouldBeNil) 345 So(err, ShouldBeNil)
313 346
314 // Validate the HTTP request that we mad e. 347 // Validate the HTTP request that we mad e.
315 So(svc.TR, ShouldResemble, logdog.TailRe quest{ 348 So(svc.TR, ShouldResemble, logdog.TailRe quest{
316 Project: "myproj", 349 Project: "myproj",
317 Path: "test/+/a", 350 Path: "test/+/a",
318 State: true, 351 State: true,
319 }) 352 })
320 353
321 // Validate that the log and state were returned. 354 // Validate that the log and state were returned.
322 So(l, ShouldResemble, genLog(1337, "kthx bye")) 355 So(l, ShouldResemble, genLog(1337, "kthx bye"))
323 So(ls, ShouldResemble, LogStream{ 356 So(ls, ShouldResemble, LogStream{
324 Project: "myproj", 357 Project: "myproj",
325 Path: "test/+/a", 358 Path: "test/+/a",
326 » » » » » » Desc: &logpb.LogStreamDescriptor { 359 » » » » » » Desc: logpb.LogStreamDescriptor{
327 Prefix: "test", 360 Prefix: "test",
328 Name: "a", 361 Name: "a",
329 StreamType: logpb.Stream Type_TEXT, 362 StreamType: logpb.Stream Type_TEXT,
330 }, 363 },
331 » » » » » » State: &StreamState{ 364 » » » » » » State: StreamState{
332 Created: now, 365 Created: now,
333 }, 366 },
334 }) 367 })
335 }) 368 })
336 369
337 Convey(`Will return nil with state if no logs ar e returned from the endpoint.`, func() { 370 Convey(`Will return nil with state if no logs ar e returned from the endpoint.`, func() {
338 svc.TH = func(*logdog.TailRequest) (*log dog.GetResponse, error) { 371 svc.TH = func(*logdog.TailRequest) (*log dog.GetResponse, error) {
339 return &logdog.GetResponse{ 372 return &logdog.GetResponse{
340 Project: "myproj", 373 Project: "myproj",
341 State: &logdog.LogStream State{ 374 State: &logdog.LogStream State{
342 Created: google. NewTimestamp(now), 375 Created: google. NewTimestamp(now),
343 }, 376 },
344 Desc: &logpb.LogStreamDe scriptor{ 377 Desc: &logpb.LogStreamDe scriptor{
345 Prefix: "tes t", 378 Prefix: "tes t",
346 Name: "a", 379 Name: "a",
347 StreamType: logp b.StreamType_TEXT, 380 StreamType: logp b.StreamType_TEXT,
348 }, 381 },
349 }, nil 382 }, nil
350 } 383 }
351 384
352 var ls LogStream 385 var ls LogStream
353 » » » » » l, err := s.Tail(c, &ls) 386 » » » » » l, err := s.Tail(c, WithState(&ls))
354 So(err, ShouldBeNil) 387 So(err, ShouldBeNil)
355 So(l, ShouldBeNil) 388 So(l, ShouldBeNil)
356 So(ls, ShouldResemble, LogStream{ 389 So(ls, ShouldResemble, LogStream{
357 Project: "myproj", 390 Project: "myproj",
358 Path: "test/+/a", 391 Path: "test/+/a",
359 » » » » » » Desc: &logpb.LogStreamDescriptor { 392 » » » » » » Desc: logpb.LogStreamDescriptor{
360 Prefix: "test", 393 Prefix: "test",
361 Name: "a", 394 Name: "a",
362 StreamType: logpb.Stream Type_TEXT, 395 StreamType: logpb.Stream Type_TEXT,
363 }, 396 },
364 » » » » » » State: &StreamState{ 397 » » » » » » State: StreamState{
365 Created: now, 398 Created: now,
366 }, 399 },
367 }) 400 })
368 }) 401 })
369 402
370 Convey(`Will error if multiple logs are returned from the endpoint.`, func() { 403 Convey(`Will error if multiple logs are returned from the endpoint.`, func() {
371 svc.TH = func(*logdog.TailRequest) (*log dog.GetResponse, error) { 404 svc.TH = func(*logdog.TailRequest) (*log dog.GetResponse, error) {
372 return &logdog.GetResponse{ 405 return &logdog.GetResponse{
373 State: &logdog.LogStream State{ 406 State: &logdog.LogStream State{
374 Created: google. NewTimestamp(now), 407 Created: google. NewTimestamp(now),
375 }, 408 },
376 Logs: []*logpb.LogEntry{ 409 Logs: []*logpb.LogEntry{
377 genLog(1337, "oh ai"), 410 genLog(1337, "oh ai"),
378 genLog(1338, "kt hxbye"), 411 genLog(1338, "kt hxbye"),
379 }, 412 },
380 }, nil 413 }, nil
381 } 414 }
382 415
383 » » » » » _, err := s.Tail(c, nil) 416 » » » » » _, err := s.Tail(c)
384 So(err, ShouldErrLike, "tail call return ed 2 logs") 417 So(err, ShouldErrLike, "tail call return ed 2 logs")
385 }) 418 })
386 419
387 Convey(`Will return ErrNoSuchStream if the strea m is not found.`, func() { 420 Convey(`Will return ErrNoSuchStream if the strea m is not found.`, func() {
388 svc.TH = func(*logdog.TailRequest) (*log dog.GetResponse, error) { 421 svc.TH = func(*logdog.TailRequest) (*log dog.GetResponse, error) {
389 return nil, grpcutil.NotFound 422 return nil, grpcutil.NotFound
390 } 423 }
391 424
392 » » » » » _, err := s.Tail(c, nil) 425 » » » » » _, err := s.Tail(c)
393 So(err, ShouldEqual, ErrNoSuchStream) 426 So(err, ShouldEqual, ErrNoSuchStream)
394 }) 427 })
395 428
396 Convey(`Will return ErrNoAccess if unauthenticat ed.`, func() { 429 Convey(`Will return ErrNoAccess if unauthenticat ed.`, func() {
397 svc.TH = func(*logdog.TailRequest) (*log dog.GetResponse, error) { 430 svc.TH = func(*logdog.TailRequest) (*log dog.GetResponse, error) {
398 return nil, grpcutil.Unauthentic ated 431 return nil, grpcutil.Unauthentic ated
399 } 432 }
400 433
401 » » » » » _, err := s.Tail(c, nil) 434 » » » » » _, err := s.Tail(c)
402 So(err, ShouldEqual, ErrNoAccess) 435 So(err, ShouldEqual, ErrNoAccess)
403 }) 436 })
404 437
405 Convey(`Will return ErrNoAccess if permission is denied.`, func() { 438 Convey(`Will return ErrNoAccess if permission is denied.`, func() {
406 svc.TH = func(*logdog.TailRequest) (*log dog.GetResponse, error) { 439 svc.TH = func(*logdog.TailRequest) (*log dog.GetResponse, error) {
407 return nil, grpcutil.PermissionD enied 440 return nil, grpcutil.PermissionD enied
408 } 441 }
409 442
410 » » » » » _, err := s.Tail(c, nil) 443 » » » » » _, err := s.Tail(c)
411 So(err, ShouldEqual, ErrNoAccess) 444 So(err, ShouldEqual, ErrNoAccess)
412 }) 445 })
446
447 Convey(`When requesting complete streams`, func( ) {
448 var allLogs []*logpb.LogEntry
449 allLogs = append(allLogs, genDG(1337, "f oo", "bar", "baz", "kthxbye")...)
450 allLogs = append(allLogs, genDG(1341, "q ux", "ohai")...)
451 allLogs = append(allLogs, genDG(1343, "c omplete")...)
452 tailLog := allLogs[len(allLogs)-1]
453
454 svc.TH = func(req *logdog.TailRequest) ( *logdog.GetResponse, error) {
455 return &logdog.GetResponse{
456 Logs: []*logpb.LogEntry{ tailLog},
457 State: &logdog.LogStream State{
458 Created: google. NewTimestamp(now),
459 },
460 Desc: &logpb.LogStreamDe scriptor{
461 Prefix: "tes t",
462 Name: "a",
463 StreamType: logp b.StreamType_DATAGRAM,
464 },
465 }, nil
466 }
467
468 svc.GH = func(req *logdog.GetRequest) (* logdog.GetResponse, error) {
469 if req.State || req.NonContiguou s || req.ByteCount != 0 {
470 return nil, errors.New(" not implemented in test")
471 }
472 if len(allLogs) == 0 {
473 return &logdog.GetRespon se{}, nil
474 }
475
476 // Identify the requested index.
477 var ret []*logpb.LogEntry
478 for i, le := range allLogs {
479 if le.StreamIndex == uin t64(req.Index) {
480 ret = allLogs[i: ]
481 break
482 }
483 }
484 count := int(req.LogCount)
485 if count > len(ret) {
486 count = len(ret)
487 }
488 return &logdog.GetResponse{
489 Logs: ret[:count],
490 }, nil
491 }
492
493 Convey(`With a non-partial datagram, ret urns that datagram.`, func() {
494 le, err := s.Tail(c, Complete())
495 So(err, ShouldBeNil)
496 So(le.StreamIndex, ShouldEqual, 1343)
497 So(le.GetDatagram().Partial, Sho uldBeNil)
498 So(le.GetDatagram().Data, Should Resemble, []byte("complete"))
499 })
500
501 Convey(`Can assemble a set of one partia l datagram.`, func() {
502 // This is weird, since this doe sn't need to be partial at all, but
503 // we should handle it gracefull y.
504 dg := tailLog.GetDatagram()
505 dg.Partial = &logpb.Datagram_Par tial{
506 Index: 0,
507 Size: uint64(len(dg.Dat a)),
508 Last: true,
509 }
510
511 le, err := s.Tail(c, Complete())
512 So(err, ShouldBeNil)
513 So(le.StreamIndex, ShouldEqual, 1343)
514 So(le.GetDatagram().Partial, Sho uldBeNil)
515 So(le.GetDatagram().Data, Should Resemble, []byte("complete"))
516 })
517
518 Convey(`Can assemble a set of two partia l datagrams.`, func() {
519 tailLog = allLogs[5]
520
521 le, err := s.Tail(c, Complete())
522 So(err, ShouldBeNil)
523 So(le.StreamIndex, ShouldEqual, 1341)
524 So(le.GetDatagram().Partial, Sho uldBeNil)
525 So(le.GetDatagram().Data, Should Resemble, []byte("quxohai"))
526 })
527
528 Convey(`With a set of three partial data grams.`, func() {
529 tailLog = allLogs[3]
530
531 Convey(`Will return a fully reas sembled datagram.`, func() {
532 var ls LogStream
533 le, err := s.Tail(c, Wit hState(&ls), Complete())
534 So(err, ShouldBeNil)
535 So(le.StreamIndex, Shoul dEqual, 1337)
536 So(le.GetDatagram().Part ial, ShouldBeNil)
537 So(le.GetDatagram().Data , ShouldResemble, []byte("foobarbazkthxbye"))
538
539 So(ls, ShouldResemble, L ogStream{
540 Path: "test/+/a" ,
541 Desc: logpb.LogS treamDescriptor{
542 Prefix: "test",
543 Name: "a",
544 StreamTy pe: logpb.StreamType_DATAGRAM,
545 },
546 State: StreamSta te{
547 Created: now,
548 },
549 })
550 })
551
552 Convey(`Will return an error if the Get fails.`, func() {
553 svc.GH = func(req *logdo g.GetRequest) (*logdog.GetResponse, error) {
554 return nil, grpc util.Errf(codes.InvalidArgument, "test error")
555 }
556
557 _, err := s.Tail(c, Comp lete())
558 So(err, ShouldErrLike, " failed to get intermediate logs")
559 So(err, ShouldErrLike, " test error")
560 })
561
562 Convey(`Will return an error if the Get returns fewer logs than requested.`, func() {
563 allLogs = allLogs[0:1]
564
565 _, err := s.Tail(c, Comp lete())
566 So(err, ShouldErrLike, " incomplete intermediate logs results")
567 })
568
569 Convey(`Will return an error if Get returns non-datagram logs.`, func() {
570 allLogs[1].Content = nil
571
572 _, err := s.Tail(c, Comp lete())
573 So(err, ShouldErrLike, " is not a datagram")
574 })
575
576 Convey(`Will return an error if Get returns non-partial datagram logs.`, func() {
577 allLogs[1].GetDatagram() .Partial = nil
578
579 _, err := s.Tail(c, Comp lete())
580 So(err, ShouldErrLike, " is not partial")
581 })
582
583 Convey(`Will return an error if Get returns non-contiguous partial datagrams.`, func() {
584 allLogs[1].GetDatagram() .Partial.Index = 2
585
586 _, err := s.Tail(c, Comp lete())
587 So(err, ShouldErrLike, " does not have a contiguous index")
588 })
589
590 Convey(`Will return an error if the chunks declare different sizes.`, func() {
591 allLogs[1].GetDatagram() .Partial.Size = 0
592
593 _, err := s.Tail(c, Comp lete())
594 So(err, ShouldErrLike, " inconsistent datagram size")
595 })
596
597 Convey(`Will return an error if the reassembled length exceeds the declared size.`, func() {
598 for _, le := range allLo gs {
599 if p := le.GetDa tagram().Partial; p != nil {
600 p.Size = 0
601 }
602 }
603
604 _, err := s.Tail(c, Comp lete())
605 So(err, ShouldErrLike, " appending chunk data would exceed the declared size")
606 })
607
608 Convey(`Will return an error if the reassembled length doesn't match the declared size.`, func() {
609 for _, le := range allLo gs {
610 if p := le.GetDa tagram().Partial; p != nil {
611 p.Size = 1024 * 1024
612 }
613 }
614
615 _, err := s.Tail(c, Comp lete())
616 So(err, ShouldErrLike, " differs from declared length")
617 })
618 })
619
620 Convey(`When Tail returns a mid-partial datagram.`, func() {
621 tailLog = allLogs[4]
622
623 Convey(`If the previous datagram is partial, will return it reassembled.`, func() {
624 le, err := s.Tail(c, Com plete())
625 So(err, ShouldBeNil)
626 So(le.StreamIndex, Shoul dEqual, 1337)
627 So(le.GetDatagram().Part ial, ShouldBeNil)
628 So(le.GetDatagram().Data , ShouldResemble, []byte("foobarbazkthxbye"))
629 })
630
631 Convey(`If the previous datagram is not partial, will return it.`, func() {
632 allLogs[3].GetDatagram() .Partial = nil
633
634 le, err := s.Tail(c, Com plete())
635 So(err, ShouldBeNil)
636 So(le.StreamIndex, Shoul dEqual, 1340)
637 So(le.GetDatagram().Part ial, ShouldBeNil)
638 So(le.GetDatagram().Data , ShouldResemble, []byte("kthxbye"))
639 })
640 })
641 })
413 }) 642 })
414 }) 643 })
415 }) 644 })
416 } 645 }
OLDNEW
« no previous file with comments | « logdog/client/coordinator/stream_params.go ('k') | milo/appengine/logdog/build.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698