Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(148)

Side by Side Diff: appengine/logdog/coordinator/endpoints/logs/get_test.go

Issue 1904503003: LogDog: Fix archived log stream read errors. (Closed) Base URL: https://github.com/luci/luci-go@hierarchy-check-first
Patch Set: Delete "offset()" method. Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « appengine/logdog/coordinator/endpoints/logs/get.go ('k') | common/gcloud/gs/gs.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package logs 5 package logs
6 6
7 import ( 7 import (
8 "bytes" 8 "bytes"
9 "errors" 9 "errors"
10 "fmt" 10 "fmt"
11 "io" 11 "io"
12 "math"
12 "testing" 13 "testing"
13 "time" 14 "time"
14 15
15 "github.com/golang/protobuf/proto" 16 "github.com/golang/protobuf/proto"
16 "github.com/luci/gae/filter/featureBreaker" 17 "github.com/luci/gae/filter/featureBreaker"
17 "github.com/luci/gae/impl/memory" 18 "github.com/luci/gae/impl/memory"
18 ds "github.com/luci/gae/service/datastore" 19 ds "github.com/luci/gae/service/datastore"
19 "github.com/luci/luci-go/appengine/logdog/coordinator" 20 "github.com/luci/luci-go/appengine/logdog/coordinator"
20 ct "github.com/luci/luci-go/appengine/logdog/coordinator/coordinatorTest " 21 ct "github.com/luci/luci-go/appengine/logdog/coordinator/coordinatorTest "
21 "github.com/luci/luci-go/common/api/logdog_coordinator/logs/v1" 22 "github.com/luci/luci-go/common/api/logdog_coordinator/logs/v1"
(...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after
57 return c[path] 58 return c[path]
58 } 59 }
59 60
60 func (c testGSClient) Close() error { return nil } 61 func (c testGSClient) Close() error { return nil }
61 func (c testGSClient) NewWriter(gs.Path) (gs.Writer, error) { 62 func (c testGSClient) NewWriter(gs.Path) (gs.Writer, error) {
62 return nil, errors.New("not implemented") 63 return nil, errors.New("not implemented")
63 } 64 }
64 func (c testGSClient) Rename(gs.Path, gs.Path) error { return errors.New("not im plemented") } 65 func (c testGSClient) Rename(gs.Path, gs.Path) error { return errors.New("not im plemented") }
65 func (c testGSClient) Delete(gs.Path) error { return errors.New("not im plemented") } 66 func (c testGSClient) Delete(gs.Path) error { return errors.New("not im plemented") }
66 67
67 func (c testGSClient) NewReader(path gs.Path, o gs.Options) (io.ReadCloser, erro r) { 68 func (c testGSClient) NewReader(path gs.Path, offset int64, length int64) (io.Re adCloser, error) {
68 if d, ok := c["error"]; ok { 69 if d, ok := c["error"]; ok {
69 return nil, errors.New(string(d)) 70 return nil, errors.New(string(d))
70 } 71 }
71 72
72 d, ok := c[path] 73 d, ok := c[path]
73 if !ok { 74 if !ok {
74 return nil, errors.New("does not exist") 75 return nil, errors.New("does not exist")
75 } 76 }
76 77
77 » to := int(o.To) 78 » // Determine the slice of data to return.
78 » if to == 0 { 79 » if offset < 0 {
79 » » to = len(d) 80 » » offset = 0
80 } 81 }
81 » d = d[int(o.From):to] 82 » end := int64(len(d))
83 » if length >= 0 {
84 » » if v := offset + length; v < end {
85 » » » end = v
86 » » }
87 » }
88 » d = d[offset:end]
82 89
83 r := make([]byte, len(d)) 90 r := make([]byte, len(d))
84 copy(r, d) 91 copy(r, d)
85 gsr := testGSReader(r) 92 gsr := testGSReader(r)
86 return &gsr, nil 93 return &gsr, nil
87 } 94 }
88 95
89 type testGSReader []byte 96 type testGSReader []byte
90 97
91 func (r *testGSReader) Read(d []byte) (int, error) { 98 func (r *testGSReader) Read(d []byte) (int, error) {
(...skipping 42 matching lines...) Expand 10 before | Expand all | Expand 10 after
134 for i := int64(0); i < s; i++ { 141 for i := int64(0); i < s; i++ {
135 d[pos+int(i)] = 0x00 142 d[pos+int(i)] = 0x00
136 } 143 }
137 144
138 // Read the (now-zeroed) data. 145 // Read the (now-zeroed) data.
139 trash.Reset() 146 trash.Reset()
140 trash.ReadFrom(r) 147 trash.ReadFrom(r)
141 } 148 }
142 } 149 }
143 150
144 func TestGet(t *testing.T) { 151 func testGetImpl(t *testing.T, archived bool) {
145 » t.Parallel() 152 » Convey(fmt.Sprintf(`With a testing configuration, a Get request (archive d=%v)`, archived), t, func() {
146
147 » Convey(`With a testing configuration, a Get request`, t, func() {
148 c, tc := testclock.UseTime(context.Background(), testclock.TestT imeLocal) 153 c, tc := testclock.UseTime(context.Background(), testclock.TestT imeLocal)
149 c = memory.Use(c) 154 c = memory.Use(c)
150 155
151 fs := authtest.FakeState{} 156 fs := authtest.FakeState{}
152 c = auth.WithState(c, &fs) 157 c = auth.WithState(c, &fs)
153 158
154 ms := memoryStorage.Storage{} 159 ms := memoryStorage.Storage{}
155 gsc := testGSClient{} 160 gsc := testGSClient{}
156 svcStub := ct.Services{ 161 svcStub := ct.Services{
157 IS: func() (storage.Storage, error) { 162 IS: func() (storage.Storage, error) {
(...skipping 48 matching lines...) Expand 10 before | Expand all | Expand 10 after
206 } 211 }
207 } 212 }
208 213
209 d, err := proto.Marshal(le) 214 d, err := proto.Marshal(le)
210 if err != nil { 215 if err != nil {
211 panic(err) 216 panic(err)
212 } 217 }
213 protobufs[uint64(v)] = d 218 protobufs[uint64(v)] = d
214 } 219 }
215 220
221 // frameSize returns the full RecordIO frame size for the named log protobuf
222 // indices.
223 frameSize := func(indices ...uint64) int32 {
224 var size int
225 for _, idx := range indices {
226 pb := protobufs[idx]
227 size += recordio.FrameHeaderSize(int64(len(pb))) + len(pb)
228 }
229 if size > math.MaxInt32 {
230 panic(size)
231 }
232 return int32(size)
233 }
234
216 Convey(`Testing Get requests (no logs)`, func() { 235 Convey(`Testing Get requests (no logs)`, func() {
217 req := logdog.GetRequest{ 236 req := logdog.GetRequest{
218 Path: string(ls.Path()), 237 Path: string(ls.Path()),
219 } 238 }
220 239
221 Convey(`Will fail if the Path is not a stream path or a hash.`, func() { 240 Convey(`Will fail if the Path is not a stream path or a hash.`, func() {
222 req.Path = "not/a/full/stream/path" 241 req.Path = "not/a/full/stream/path"
223 _, err := s.Get(c, &req) 242 _, err := s.Get(c, &req)
224 So(err, ShouldErrLike, "invalid path value") 243 So(err, ShouldErrLike, "invalid path value")
225 }) 244 })
226 245
227 Convey(`Will fail with Internal if the datastore Get() d oesn't work.`, func() { 246 Convey(`Will fail with Internal if the datastore Get() d oesn't work.`, func() {
228 c, fb := featureBreaker.FilterRDS(c, nil) 247 c, fb := featureBreaker.FilterRDS(c, nil)
229 fb.BreakFeatures(errors.New("testing error"), "G etMulti") 248 fb.BreakFeatures(errors.New("testing error"), "G etMulti")
230 249
231 _, err := s.Get(c, &req) 250 _, err := s.Get(c, &req)
232 So(err, ShouldBeRPCInternal) 251 So(err, ShouldBeRPCInternal)
233 }) 252 })
234 253
235 Convey(`Will fail with NotFound if the log stream does n ot exist.`, func() { 254 Convey(`Will fail with NotFound if the log stream does n ot exist.`, func() {
236 req.Path = "testing/+/does/not/exist" 255 req.Path = "testing/+/does/not/exist"
237 _, err := s.Get(c, &req) 256 _, err := s.Get(c, &req)
238 So(err, ShouldBeRPCNotFound) 257 So(err, ShouldBeRPCNotFound)
239 }) 258 })
240 }) 259 })
241 260
242 » » for _, v := range []bool{ 261 » » if !archived {
243 » » » false, 262 » » » // Add the logs to the in-memory temporary storage.
244 » » » true, 263 » » » for _, le := range entries {
245 » » } { 264 » » » » err := ms.Put(storage.PutRequest{
246 » » » is := "is" 265 » » » » » Path: ls.Path(),
247 » » » if !v { 266 » » » » » Index: types.MessageIndex(le.StreamInde x),
248 » » » » is += " not" 267 » » » » » Values: [][]byte{protobufs[le.StreamInde x]},
249 » » » } 268 » » » » })
250 269 » » » » if err != nil {
251 » » » Convey(fmt.Sprintf(`When the log %s archived`, is), func () { 270 » » » » » panic(fmt.Errorf("failed to Put() LogEnt ry: %v", err))
252 » » » » if !v {
253 » » » » » // Add the logs to the in-memory tempora ry storage.
254 » » » » » for _, le := range entries {
255 » » » » » » err := ms.Put(storage.PutRequest {
256 » » » » » » » Path: ls.Path(),
257 » » » » » » » Index: types.MessageInd ex(le.StreamIndex),
258 » » » » » » » Values: [][]byte{protobu fs[le.StreamIndex]},
259 » » » » » » })
260 » » » » » » if err != nil {
261 » » » » » » » panic(fmt.Errorf("failed to Put() LogEntry: %v", err))
262 » » » » » » }
263 » » » » » }
264 » » » » } else {
265 » » » » » // Archive this log stream. We will gene rate one index entry for every
266 » » » » » // 2 log entries.
267 » » » » » src := staticArchiveSource(entries)
268 » » » » » var lbuf, ibuf bytes.Buffer
269 » » » » » m := archive.Manifest{
270 » » » » » » Desc: desc,
271 » » » » » » Source: &src,
272 » » » » » » LogWriter: &lbuf,
273 » » » » » » IndexWriter: &ibuf,
274 » » » » » » StreamIndexRange: 2,
275 » » » » » }
276 » » » » » if err := archive.Archive(m); err != nil {
277 » » » » » » panic(err)
278 » » » » » }
279
280 » » » » » now := tc.Now().UTC()
281
282 » » » » » gsc.put("gs://testbucket/stream", lbuf.B ytes())
283 » » » » » gsc.put("gs://testbucket/index", ibuf.By tes())
284 » » » » » ls.State = coordinator.LSArchived
285 » » » » » ls.TerminatedTime = now
286 » » » » » ls.ArchivedTime = now
287 » » » » » ls.ArchiveStreamURL = "gs://testbucket/s tream"
288 » » » » » ls.ArchiveIndexURL = "gs://testbucket/in dex"
289 } 271 }
272 }
273 } else {
274 // Archive this log stream. We will generate one index e ntry for every
275 // 2 log entries.
276 src := staticArchiveSource(entries)
277 var lbuf, ibuf bytes.Buffer
278 m := archive.Manifest{
279 Desc: desc,
280 Source: &src,
281 LogWriter: &lbuf,
282 IndexWriter: &ibuf,
283 StreamIndexRange: 2,
284 }
285 if err := archive.Archive(m); err != nil {
286 panic(err)
287 }
288
289 now := tc.Now().UTC()
290
291 gsc.put("gs://testbucket/stream", lbuf.Bytes())
292 gsc.put("gs://testbucket/index", ibuf.Bytes())
293 ls.State = coordinator.LSArchived
294 ls.TerminatedTime = now
295 ls.ArchivedTime = now
296 ls.ArchiveStreamURL = "gs://testbucket/stream"
297 ls.ArchiveIndexURL = "gs://testbucket/index"
298 }
299 if err := ds.Get(c).Put(ls); err != nil {
300 panic(err)
301 }
302
303 Convey(`Testing Get requests`, func() {
304 req := logdog.GetRequest{
305 Path: string(ls.Path()),
306 }
307
308 Convey(`When the log stream is purged`, func() {
309 ls.Purged = true
290 if err := ds.Get(c).Put(ls); err != nil { 310 if err := ds.Get(c).Put(ls); err != nil {
291 panic(err) 311 panic(err)
292 } 312 }
293 313
294 » » » » Convey(`Testing Get requests`, func() { 314 » » » » Convey(`Will return NotFound if the user is not an administrator.`, func() {
295 » » » » » req := logdog.GetRequest{ 315 » » » » » _, err := s.Get(c, &req)
296 » » » » » » Path: string(ls.Path()), 316 » » » » » So(err, ShouldBeRPCNotFound)
317 » » » » })
318
319 » » » » Convey(`Will process the request if the user is an administrator.`, func() {
320 » » » » » fs.IdentityGroups = []string{"test-admin istrators"}
321
322 » » » » » resp, err := s.Get(c, &req)
323 » » » » » So(err, ShouldBeRPCOK)
324 » » » » » So(resp, shouldHaveLogs, 0, 1, 2)
325 » » » » })
326 » » » })
327
328 » » » Convey(`Will return empty if no records were requested.` , func() {
329 » » » » req.LogCount = -1
330 » » » » req.State = false
331
332 » » » » resp, err := s.Get(c, &req)
333 » » » » So(err, ShouldBeRPCOK)
334 » » » » So(resp.Logs, ShouldHaveLength, 0)
335 » » » })
336
337 » » » Convey(`Will successfully retrieve a stream path.`, func () {
338 » » » » resp, err := s.Get(c, &req)
339 » » » » So(err, ShouldBeRPCOK)
340 » » » » So(resp, shouldHaveLogs, 0, 1, 2)
341 » » » })
342
343 » » » Convey(`Will successfully retrieve a stream path offset at 4.`, func() {
344 » » » » req.Index = 4
345
346 » » » » resp, err := s.Get(c, &req)
347 » » » » So(err, ShouldBeRPCOK)
348 » » » » So(resp, shouldHaveLogs, 4, 5)
349 » » » })
350
351 » » » Convey(`Will retrieve no logs for contiguous offset 6.`, func() {
352 » » » » req.Index = 6
353
354 » » » » resp, err := s.Get(c, &req)
355 » » » » So(err, ShouldBeRPCOK)
356 » » » » So(len(resp.Logs), ShouldEqual, 0)
357 » » » })
358
359 » » » Convey(`Will retrieve log 7 for non-contiguous offset 6. `, func() {
360 » » » » req.NonContiguous = true
361 » » » » req.Index = 6
362
363 » » » » resp, err := s.Get(c, &req)
364 » » » » So(err, ShouldBeRPCOK)
365 » » » » So(resp, shouldHaveLogs, 7)
366 » » » })
367
368 » » » Convey(`With a byte limit of 1, will still return at lea st one log entry.`, func() {
369 » » » » req.ByteCount = 1
370
371 » » » » resp, err := s.Get(c, &req)
372 » » » » So(err, ShouldBeRPCOK)
373 » » » » So(resp, shouldHaveLogs, 0)
374 » » » })
375
376 » » » Convey(`With a byte limit of sizeof(0), will return log entry 0.`, func() {
377 » » » » req.ByteCount = frameSize(0)
378
379 » » » » resp, err := s.Get(c, &req)
380 » » » » So(err, ShouldBeRPCOK)
381 » » » » So(resp, shouldHaveLogs, 0)
382 » » » })
383
384 » » » Convey(`With a byte limit of sizeof(0)+1, will return lo g entry 0.`, func() {
385 » » » » req.ByteCount = frameSize(0) + 1
386
387 » » » » resp, err := s.Get(c, &req)
388 » » » » So(err, ShouldBeRPCOK)
389 » » » » So(resp, shouldHaveLogs, 0)
390 » » » })
391
392 » » » Convey(`With a byte limit of sizeof({0, 1}), will return log entries {0, 1}.`, func() {
393 » » » » req.ByteCount = frameSize(0, 1)
394
395 » » » » resp, err := s.Get(c, &req)
396 » » » » So(err, ShouldBeRPCOK)
397 » » » » So(resp, shouldHaveLogs, 0, 1)
398 » » » })
399
400 » » » Convey(`With a byte limit of sizeof({0, 1, 2}), will ret urn log entries {0, 1, 2}.`, func() {
401 » » » » req.ByteCount = frameSize(0, 1, 2)
402
403 » » » » resp, err := s.Get(c, &req)
404 » » » » So(err, ShouldBeRPCOK)
405 » » » » So(resp, shouldHaveLogs, 0, 1, 2)
406 » » » })
407
408 » » » Convey(`With a byte limit of sizeof({0, 1, 2})+1, will r eturn log entries {0, 1, 2}.`, func() {
409 » » » » req.ByteCount = frameSize(0, 1, 2) + 1
410
411 » » » » resp, err := s.Get(c, &req)
412 » » » » So(err, ShouldBeRPCOK)
413 » » » » So(resp, shouldHaveLogs, 0, 1, 2)
414 » » » })
415
416 » » » Convey(`Will successfully retrieve a stream path hash.`, func() {
417 » » » » req.Path = ls.HashID
418 » » » » resp, err := s.Get(c, &req)
419 » » » » So(err, ShouldBeRPCOK)
420 » » » » So(resp, shouldHaveLogs, 0, 1, 2)
421 » » » })
422
423 » » » Convey(`When requesting state`, func() {
424 » » » » req.State = true
425 » » » » req.LogCount = -1
426
427 » » » » Convey(`Will successfully retrieve stream state. `, func() {
428 » » » » » resp, err := s.Get(c, &req)
429 » » » » » So(err, ShouldBeRPCOK)
430 » » » » » So(resp.State, ShouldResemble, loadLogSt reamState(ls))
431 » » » » » So(len(resp.Logs), ShouldEqual, 0)
432 » » » » })
433
434 » » » » Convey(`Will return Internal if the protobuf des criptor data is corrupt.`, func() {
435 » » » » » ls.SetDSValidate(false)
436 » » » » » ls.Descriptor = []byte{0x00} // Invalid protobuf, zero tag.
437 » » » » » if err := ds.Get(c).Put(ls); err != nil {
438 » » » » » » panic(err)
297 } 439 }
298 440
299 » » » » » Convey(`When the log stream is purged`, func() { 441 » » » » » _, err := s.Get(c, &req)
300 » » » » » » ls.Purged = true 442 » » » » » So(err, ShouldBeRPCInternal)
301 » » » » » » if err := ds.Get(c).Put(ls); err != nil { 443 » » » » })
302 » » » » » » » panic(err) 444 » » » })
303 » » » » » » } 445
304 446 » » » Convey(`Will return Internal if the protobuf log entry d ata is corrupt.`, func() {
305 » » » » » » Convey(`Will return NotFound if the user is not an administrator.`, func() { 447 » » » » if archived {
306 » » » » » » » _, err := s.Get(c, &req) 448 » » » » » // Corrupt the archive datastream.
307 » » » » » » » So(err, ShouldBeRPCNotFo und) 449 » » » » » stream := gsc.get("gs://testbucket/strea m")
308 » » » » » » }) 450 » » » » » zeroRecords(stream)
309 451 » » » » } else {
310 » » » » » » Convey(`Will process the request if the user is an administrator.`, func() { 452 » » » » » // Add corrupted entry to Storage. Creat e a new entry here, since
311 » » » » » » » fs.IdentityGroups = []st ring{"test-administrators"} 453 » » » » » // the storage will reject a duplicate/o verwrite.
312 454 » » » » » err := ms.Put(storage.PutRequest{
313 » » » » » » » resp, err := s.Get(c, &r eq) 455 » » » » » » Path: types.StreamPath(req.Pat h),
314 » » » » » » » So(err, ShouldBeRPCOK) 456 » » » » » » Index: 666,
315 » » » » » » » So(resp, shouldHaveLogs, 0, 1, 2) 457 » » » » » » Values: [][]byte{{0x00}}, // Inv alid protobuf, zero tag.
316 » » » » » » })
317 }) 458 })
318 459 » » » » » if err != nil {
319 » » » » » Convey(`Will return empty if no records were requested.`, func() { 460 » » » » » » panic(err)
320 » » » » » » req.LogCount = -1
321 » » » » » » req.State = false
322
323 » » » » » » resp, err := s.Get(c, &req)
324 » » » » » » So(err, ShouldBeRPCOK)
325 » » » » » » So(resp.Logs, ShouldHaveLength, 0)
326 » » » » » })
327
328 » » » » » Convey(`Will successfully retrieve a str eam path.`, func() {
329 » » » » » » resp, err := s.Get(c, &req)
330 » » » » » » So(err, ShouldBeRPCOK)
331 » » » » » » So(resp, shouldHaveLogs, 0, 1, 2 )
332 » » » » » })
333
334 » » » » » Convey(`Will successfully retrieve a str eam path offset at 4.`, func() {
335 » » » » » » req.Index = 4
336
337 » » » » » » resp, err := s.Get(c, &req)
338 » » » » » » So(err, ShouldBeRPCOK)
339 » » » » » » So(resp, shouldHaveLogs, 4, 5)
340 » » » » » })
341
342 » » » » » Convey(`Will retrieve no logs for contig uous offset 6.`, func() {
343 » » » » » » req.Index = 6
344
345 » » » » » » resp, err := s.Get(c, &req)
346 » » » » » » So(err, ShouldBeRPCOK)
347 » » » » » » So(len(resp.Logs), ShouldEqual, 0)
348 » » » » » })
349
350 » » » » » Convey(`Will retrieve log 7 for non-cont iguous offset 6.`, func() {
351 » » » » » » req.NonContiguous = true
352 » » » » » » req.Index = 6
353
354 » » » » » » resp, err := s.Get(c, &req)
355 » » » » » » So(err, ShouldBeRPCOK)
356 » » » » » » So(resp, shouldHaveLogs, 7)
357 » » » » » })
358
359 » » » » » Convey(`With a byte limit of 1, will sti ll return at least one log entry.`, func() {
360 » » » » » » req.ByteCount = 1
361
362 » » » » » » resp, err := s.Get(c, &req)
363 » » » » » » So(err, ShouldBeRPCOK)
364 » » » » » » So(resp, shouldHaveLogs, 0)
365 » » » » » })
366
367 » » » » » Convey(`With a byte limit of sizeof(0), will return log entry 0.`, func() {
368 » » » » » » req.ByteCount = int32(len(protob ufs[0]))
369
370 » » » » » » resp, err := s.Get(c, &req)
371 » » » » » » So(err, ShouldBeRPCOK)
372 » » » » » » So(resp, shouldHaveLogs, 0)
373 » » » » » })
374
375 » » » » » Convey(`With a byte limit of sizeof(0)+1 , will return log entry 0.`, func() {
376 » » » » » » req.ByteCount = int32(len(protob ufs[0]))
377
378 » » » » » » resp, err := s.Get(c, &req)
379 » » » » » » So(err, ShouldBeRPCOK)
380 » » » » » » So(resp, shouldHaveLogs, 0)
381 » » » » » })
382
383 » » » » » Convey(`With a byte limit of sizeof({0, 1}), will return log entries {0, 1}.`, func() {
384 » » » » » » req.ByteCount = int32(len(protob ufs[0]) + len(protobufs[1]))
385
386 » » » » » » resp, err := s.Get(c, &req)
387 » » » » » » So(err, ShouldBeRPCOK)
388 » » » » » » So(resp, shouldHaveLogs, 0, 1)
389 » » » » » })
390
391 » » » » » Convey(`With a byte limit of sizeof({0, 1, 2}), will return log entries {0, 1, 2}.`, func() {
392 » » » » » » req.ByteCount = int32(len(protob ufs[0]) + len(protobufs[1]) + len(protobufs[2]))
393
394 » » » » » » resp, err := s.Get(c, &req)
395 » » » » » » So(err, ShouldBeRPCOK)
396 » » » » » » So(resp, shouldHaveLogs, 0, 1, 2 )
397 » » » » » })
398
399 » » » » » Convey(`With a byte limit of sizeof({0, 1, 2})+1, will return log entries {0, 1, 2}.`, func() {
400 » » » » » » req.ByteCount = int32(len(protob ufs[0]) + len(protobufs[1]) + len(protobufs[2]) + 1)
401
402 » » » » » » resp, err := s.Get(c, &req)
403 » » » » » » So(err, ShouldBeRPCOK)
404 » » » » » » So(resp, shouldHaveLogs, 0, 1, 2 )
405 » » » » » })
406
407 » » » » » Convey(`Will successfully retrieve a str eam path hash.`, func() {
408 » » » » » » req.Path = ls.HashID
409 » » » » » » resp, err := s.Get(c, &req)
410 » » » » » » So(err, ShouldBeRPCOK)
411 » » » » » » So(resp, shouldHaveLogs, 0, 1, 2 )
412 » » » » » })
413
414 » » » » » Convey(`When requesting state`, func() {
415 » » » » » » req.State = true
416 » » » » » » req.LogCount = -1
417
418 » » » » » » Convey(`Will successfully retrie ve stream state.`, func() {
419 » » » » » » » resp, err := s.Get(c, &r eq)
420 » » » » » » » So(err, ShouldBeRPCOK)
421 » » » » » » » So(resp.State, ShouldRes emble, loadLogStreamState(ls))
422 » » » » » » » So(len(resp.Logs), Shoul dEqual, 0)
423 » » » » » » })
424
425 » » » » » » Convey(`Will return Internal if the protobuf descriptor data is corrupt.`, func() {
426 » » » » » » » ls.SetDSValidate(false)
427 » » » » » » » ls.Descriptor = []byte{0 x00} // Invalid protobuf, zero tag.
428 » » » » » » » if err := ds.Get(c).Put( ls); err != nil {
429 » » » » » » » » panic(err)
430 » » » » » » » }
431
432 » » » » » » » _, err := s.Get(c, &req)
433 » » » » » » » So(err, ShouldBeRPCInter nal)
434 » » » » » » })
435 » » » » » })
436
437 » » » » » Convey(`Will return Internal if the prot obuf log entry data is corrupt.`, func() {
438 » » » » » » if v {
439 » » » » » » » // Corrupt the archive d atastream.
440 » » » » » » » stream := gsc.get("gs:// testbucket/stream")
441 » » » » » » » zeroRecords(stream)
442 » » » » » » } else {
443 » » » » » » » // Add corrupted entry t o Storage. Create a new entry here, since
444 » » » » » » » // the storage will reje ct a duplicate/overwrite.
445 » » » » » » » err := ms.Put(storage.Pu tRequest{
446 » » » » » » » » Path: types.St reamPath(req.Path),
447 » » » » » » » » Index: 666,
448 » » » » » » » » Values: [][]byte {{0x00}}, // Invalid protobuf, zero tag.
449 » » » » » » » })
450 » » » » » » » if err != nil {
451 » » » » » » » » panic(err)
452 » » » » » » » }
453 » » » » » » » req.Index = 666
454 » » » » » » }
455
456 » » » » » » _, err := s.Get(c, &req)
457 » » » » » » So(err, ShouldBeRPCInternal)
458 » » » » » })
459
460 » » » » » Convey(`Will successfully retrieve both logs and stream state.`, func() {
461 » » » » » » req.State = true
462
463 » » » » » » resp, err := s.Get(c, &req)
464 » » » » » » So(err, ShouldBeRPCOK)
465 » » » » » » So(resp.State, ShouldResemble, l oadLogStreamState(ls))
466 » » » » » » So(resp, shouldHaveLogs, 0, 1, 2 )
467 » » » » » })
468
469 » » » » » Convey(`Will return Internal if the Stor age is not working.`, func() {
470 » » » » » » if v {
471 » » » » » » » gsc["error"] = []byte("t est error")
472 » » » » » » } else {
473 » » » » » » » ms.Close()
474 » » » » » » }
475
476 » » » » » » _, err := s.Get(c, &req)
477 » » » » » » So(err, ShouldBeRPCInternal)
478 » » » » » })
479
480 » » » » » Convey(`Will enforce a maximum count of 2.`, func() {
481 » » » » » » req.LogCount = 2
482 » » » » » » resp, err := s.Get(c, &req)
483 » » » » » » So(err, ShouldBeRPCOK)
484 » » » » » » So(resp, shouldHaveLogs, 0, 1)
485 » » » » » })
486
487 » » » » » Convey(`When requesting protobufs`, func () {
488 » » » » » » req.State = true
489
490 » » » » » » resp, err := s.Get(c, &req)
491 » » » » » » So(err, ShouldBeRPCOK)
492 » » » » » » So(resp, shouldHaveLogs, 0, 1, 2 )
493
494 » » » » » » // Confirm that this has protobu fs.
495 » » » » » » So(len(resp.Logs), ShouldEqual, 3)
496 » » » » » » So(resp.Logs[0], ShouldNotBeNil)
497
498 » » » » » » // Confirm that there is a descr iptor protobuf.
499 » » » » » » So(resp.Desc, ShouldResemble, de sc)
500
501 » » » » » » // Confirm that the state was re turned.
502 » » » » » » So(resp.State, ShouldNotBeNil)
503 » » » » » })
504
505 » » » » » Convey(`Will successfully retrieve all r ecords if non-contiguous is allowed.`, func() {
506 » » » » » » req.NonContiguous = true
507 » » » » » » resp, err := s.Get(c, &req)
508 » » » » » » So(err, ShouldBeRPCOK)
509 » » » » » » So(resp, shouldHaveLogs, 0, 1, 2 , 4, 5, 7)
510 » » » » » })
511
512 » » » » » Convey(`When newlines are not requested, does not include delimiters.`, func() {
513 » » » » » » req.LogCount = 1
514
515 » » » » » » resp, err := s.Get(c, &req)
516 » » » » » » So(err, ShouldBeRPCOK)
517 » » » » » » So(resp, shouldHaveLogs, 0)
518
519 » » » » » » So(resp.Logs[0].GetText(), Shoul dResemble, &logpb.Text{
520 » » » » » » » Lines: []*logpb.Text_Lin e{
521 » » » » » » » » {"log entry #0", "\n"},
522 » » » » » » » » {"another line o f text", ""},
523 » » » » » » » },
524 » » » » » » })
525 » » » » » })
526
527 » » » » » Convey(`Will get a Binary LogEntry`, fun c() {
528 » » » » » » req.Index = 4
529 » » » » » » req.LogCount = 1
530 » » » » » » resp, err := s.Get(c, &req)
531 » » » » » » So(err, ShouldBeRPCOK)
532 » » » » » » So(resp, shouldHaveLogs, 4)
533 » » » » » » So(resp.Logs[0].GetBinary(), Sho uldResemble, &logpb.Binary{
534 » » » » » » » Data: []byte{0x00, 0x01, 0x02, 0x03},
535 » » » » » » })
536 » » » » » })
537
538 » » » » » Convey(`Will get a Datagram LogEntry`, f unc() {
539 » » » » » » req.Index = 5
540 » » » » » » req.LogCount = 1
541 » » » » » » resp, err := s.Get(c, &req)
542 » » » » » » So(err, ShouldBeRPCOK)
543 » » » » » » So(resp, shouldHaveLogs, 5)
544 » » » » » » So(resp.Logs[0].GetDatagram(), S houldResemble, &logpb.Datagram{
545 » » » » » » » Data: []byte{0x00, 0x01, 0x02, 0x03},
546 » » » » » » » Partial: &logpb.Datagram _Partial{
547 » » » » » » » » Index: 2,
548 » » » » » » » » Size: 1024,
549 » » » » » » » » Last: false,
550 » » » » » » » },
551 » » » » » » })
552 » » » » » })
553 » » » » })
554
555 » » » » Convey(`Testing tail requests`, func() {
556 » » » » » req := logdog.TailRequest{
557 » » » » » » Path: string(ls.Path()),
558 } 461 }
559 462 » » » » » req.Index = 666
560 » » » » » Convey(`Will successfully retrieve a str eam path.`, func() { 463 » » » » }
561 » » » » » » resp, err := s.Tail(c, &req) 464
562 » » » » » » So(err, ShouldBeRPCOK) 465 » » » » _, err := s.Get(c, &req)
563 » » » » » » So(resp, shouldHaveLogs, 7) 466 » » » » So(err, ShouldBeRPCInternal)
564 » » » » » }) 467 » » » })
565 468
566 » » » » » Convey(`Will successfully retrieve a str eam path hash and state.`, func() { 469 » » » Convey(`Will successfully retrieve both logs and stream state.`, func() {
567 » » » » » » req.Path = ls.HashID 470 » » » » req.State = true
568 » » » » » » req.State = true 471
569 472 » » » » resp, err := s.Get(c, &req)
570 » » » » » » resp, err := s.Tail(c, &req) 473 » » » » So(err, ShouldBeRPCOK)
571 » » » » » » So(err, ShouldBeRPCOK) 474 » » » » So(resp.State, ShouldResemble, loadLogStreamStat e(ls))
572 » » » » » » So(resp, shouldHaveLogs, 7) 475 » » » » So(resp, shouldHaveLogs, 0, 1, 2)
573 » » » » » » So(resp.State, ShouldResemble, l oadLogStreamState(ls)) 476 » » » })
574 » » » » » }) 477
575 » » » » }) 478 » » » Convey(`Will return Internal if the Storage is not worki ng.`, func() {
576 » » » }) 479 » » » » if archived {
577 » » } 480 » » » » » gsc["error"] = []byte("test error")
481 » » » » } else {
482 » » » » » ms.Close()
483 » » » » }
484
485 » » » » _, err := s.Get(c, &req)
486 » » » » So(err, ShouldBeRPCInternal)
487 » » » })
488
489 » » » Convey(`Will enforce a maximum count of 2.`, func() {
490 » » » » req.LogCount = 2
491 » » » » resp, err := s.Get(c, &req)
492 » » » » So(err, ShouldBeRPCOK)
493 » » » » So(resp, shouldHaveLogs, 0, 1)
494 » » » })
495
496 » » » Convey(`When requesting protobufs`, func() {
497 » » » » req.State = true
498
499 » » » » resp, err := s.Get(c, &req)
500 » » » » So(err, ShouldBeRPCOK)
501 » » » » So(resp, shouldHaveLogs, 0, 1, 2)
502
503 » » » » // Confirm that this has protobufs.
504 » » » » So(len(resp.Logs), ShouldEqual, 3)
505 » » » » So(resp.Logs[0], ShouldNotBeNil)
506
507 » » » » // Confirm that there is a descriptor protobuf.
508 » » » » So(resp.Desc, ShouldResemble, desc)
509
510 » » » » // Confirm that the state was returned.
511 » » » » So(resp.State, ShouldNotBeNil)
512 » » » })
513
514 » » » Convey(`Will successfully retrieve all records if non-co ntiguous is allowed.`, func() {
515 » » » » req.NonContiguous = true
516 » » » » resp, err := s.Get(c, &req)
517 » » » » So(err, ShouldBeRPCOK)
518 » » » » So(resp, shouldHaveLogs, 0, 1, 2, 4, 5, 7)
519 » » » })
520
521 » » » Convey(`When newlines are not requested, does not includ e delimiters.`, func() {
522 » » » » req.LogCount = 1
523
524 » » » » resp, err := s.Get(c, &req)
525 » » » » So(err, ShouldBeRPCOK)
526 » » » » So(resp, shouldHaveLogs, 0)
527
528 » » » » So(resp.Logs[0].GetText(), ShouldResemble, &logp b.Text{
529 » » » » » Lines: []*logpb.Text_Line{
530 » » » » » » {"log entry #0", "\n"},
531 » » » » » » {"another line of text", ""},
532 » » » » » },
533 » » » » })
534 » » » })
535
536 » » » Convey(`Will get a Binary LogEntry`, func() {
537 » » » » req.Index = 4
538 » » » » req.LogCount = 1
539 » » » » resp, err := s.Get(c, &req)
540 » » » » So(err, ShouldBeRPCOK)
541 » » » » So(resp, shouldHaveLogs, 4)
542 » » » » So(resp.Logs[0].GetBinary(), ShouldResemble, &lo gpb.Binary{
543 » » » » » Data: []byte{0x00, 0x01, 0x02, 0x03},
544 » » » » })
545 » » » })
546
547 » » » Convey(`Will get a Datagram LogEntry`, func() {
548 » » » » req.Index = 5
549 » » » » req.LogCount = 1
550 » » » » resp, err := s.Get(c, &req)
551 » » » » So(err, ShouldBeRPCOK)
552 » » » » So(resp, shouldHaveLogs, 5)
553 » » » » So(resp.Logs[0].GetDatagram(), ShouldResemble, & logpb.Datagram{
554 » » » » » Data: []byte{0x00, 0x01, 0x02, 0x03},
555 » » » » » Partial: &logpb.Datagram_Partial{
556 » » » » » » Index: 2,
557 » » » » » » Size: 1024,
558 » » » » » » Last: false,
559 » » » » » },
560 » » » » })
561 » » » })
562 » » })
563
564 » » Convey(`Testing tail requests`, func() {
565 » » » req := logdog.TailRequest{
566 » » » » Path: string(ls.Path()),
567 » » » }
568
569 » » » Convey(`Will successfully retrieve a stream path.`, func () {
570 » » » » resp, err := s.Tail(c, &req)
571 » » » » So(err, ShouldBeRPCOK)
572 » » » » So(resp, shouldHaveLogs, 7)
573 » » » })
574
575 » » » Convey(`Will successfully retrieve a stream path hash an d state.`, func() {
576 » » » » req.Path = ls.HashID
577 » » » » req.State = true
578
579 » » » » resp, err := s.Tail(c, &req)
580 » » » » So(err, ShouldBeRPCOK)
581 » » » » So(resp, shouldHaveLogs, 7)
582 » » » » So(resp.State, ShouldResemble, loadLogStreamStat e(ls))
583 » » » })
584 » » })
578 }) 585 })
579 } 586 }
587
588 func TestGetIntermediate(t *testing.T) {
589 t.Parallel()
590
591 testGetImpl(t, false)
592 }
593
594 func TestGetArchived(t *testing.T) {
595 t.Parallel()
596
597 testGetImpl(t, true)
598 }
OLDNEW
« no previous file with comments | « appengine/logdog/coordinator/endpoints/logs/get.go ('k') | common/gcloud/gs/gs.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698