| OLD | NEW |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package logs | 5 package logs |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "bytes" | 8 "bytes" |
| 9 "errors" | 9 "errors" |
| 10 "fmt" | 10 "fmt" |
| 11 "io" | 11 "io" |
| 12 "math" |
| 12 "testing" | 13 "testing" |
| 13 "time" | 14 "time" |
| 14 | 15 |
| 15 "github.com/golang/protobuf/proto" | 16 "github.com/golang/protobuf/proto" |
| 16 "github.com/luci/gae/filter/featureBreaker" | 17 "github.com/luci/gae/filter/featureBreaker" |
| 17 "github.com/luci/gae/impl/memory" | 18 "github.com/luci/gae/impl/memory" |
| 18 ds "github.com/luci/gae/service/datastore" | 19 ds "github.com/luci/gae/service/datastore" |
| 19 "github.com/luci/luci-go/appengine/logdog/coordinator" | 20 "github.com/luci/luci-go/appengine/logdog/coordinator" |
| 20 ct "github.com/luci/luci-go/appengine/logdog/coordinator/coordinatorTest
" | 21 ct "github.com/luci/luci-go/appengine/logdog/coordinator/coordinatorTest
" |
| 21 "github.com/luci/luci-go/common/api/logdog_coordinator/logs/v1" | 22 "github.com/luci/luci-go/common/api/logdog_coordinator/logs/v1" |
| (...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 57 return c[path] | 58 return c[path] |
| 58 } | 59 } |
| 59 | 60 |
| 60 func (c testGSClient) Close() error { return nil } | 61 func (c testGSClient) Close() error { return nil } |
| 61 func (c testGSClient) NewWriter(gs.Path) (gs.Writer, error) { | 62 func (c testGSClient) NewWriter(gs.Path) (gs.Writer, error) { |
| 62 return nil, errors.New("not implemented") | 63 return nil, errors.New("not implemented") |
| 63 } | 64 } |
| 64 func (c testGSClient) Rename(gs.Path, gs.Path) error { return errors.New("not im
plemented") } | 65 func (c testGSClient) Rename(gs.Path, gs.Path) error { return errors.New("not im
plemented") } |
| 65 func (c testGSClient) Delete(gs.Path) error { return errors.New("not im
plemented") } | 66 func (c testGSClient) Delete(gs.Path) error { return errors.New("not im
plemented") } |
| 66 | 67 |
| 67 func (c testGSClient) NewReader(path gs.Path, o gs.Options) (io.ReadCloser, erro
r) { | 68 func (c testGSClient) NewReader(path gs.Path, offset int64, length int64) (io.Re
adCloser, error) { |
| 68 if d, ok := c["error"]; ok { | 69 if d, ok := c["error"]; ok { |
| 69 return nil, errors.New(string(d)) | 70 return nil, errors.New(string(d)) |
| 70 } | 71 } |
| 71 | 72 |
| 72 d, ok := c[path] | 73 d, ok := c[path] |
| 73 if !ok { | 74 if !ok { |
| 74 return nil, errors.New("does not exist") | 75 return nil, errors.New("does not exist") |
| 75 } | 76 } |
| 76 | 77 |
| 77 » to := int(o.To) | 78 » // Determine the slice of data to return. |
| 78 » if to == 0 { | 79 » if offset < 0 { |
| 79 » » to = len(d) | 80 » » offset = 0 |
| 80 } | 81 } |
| 81 » d = d[int(o.From):to] | 82 » end := int64(len(d)) |
| 83 » if length >= 0 { |
| 84 » » if v := offset + length; v < end { |
| 85 » » » end = v |
| 86 » » } |
| 87 » } |
| 88 » d = d[offset:end] |
| 82 | 89 |
| 83 r := make([]byte, len(d)) | 90 r := make([]byte, len(d)) |
| 84 copy(r, d) | 91 copy(r, d) |
| 85 gsr := testGSReader(r) | 92 gsr := testGSReader(r) |
| 86 return &gsr, nil | 93 return &gsr, nil |
| 87 } | 94 } |
| 88 | 95 |
| 89 type testGSReader []byte | 96 type testGSReader []byte |
| 90 | 97 |
| 91 func (r *testGSReader) Read(d []byte) (int, error) { | 98 func (r *testGSReader) Read(d []byte) (int, error) { |
| (...skipping 42 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 134 for i := int64(0); i < s; i++ { | 141 for i := int64(0); i < s; i++ { |
| 135 d[pos+int(i)] = 0x00 | 142 d[pos+int(i)] = 0x00 |
| 136 } | 143 } |
| 137 | 144 |
| 138 // Read the (now-zeroed) data. | 145 // Read the (now-zeroed) data. |
| 139 trash.Reset() | 146 trash.Reset() |
| 140 trash.ReadFrom(r) | 147 trash.ReadFrom(r) |
| 141 } | 148 } |
| 142 } | 149 } |
| 143 | 150 |
| 144 func TestGet(t *testing.T) { | 151 func testGetImpl(t *testing.T, archived bool) { |
| 145 » t.Parallel() | 152 » Convey(fmt.Sprintf(`With a testing configuration, a Get request (archive
d=%v)`, archived), t, func() { |
| 146 | |
| 147 » Convey(`With a testing configuration, a Get request`, t, func() { | |
| 148 c, tc := testclock.UseTime(context.Background(), testclock.TestT
imeLocal) | 153 c, tc := testclock.UseTime(context.Background(), testclock.TestT
imeLocal) |
| 149 c = memory.Use(c) | 154 c = memory.Use(c) |
| 150 | 155 |
| 151 fs := authtest.FakeState{} | 156 fs := authtest.FakeState{} |
| 152 c = auth.WithState(c, &fs) | 157 c = auth.WithState(c, &fs) |
| 153 | 158 |
| 154 ms := memoryStorage.Storage{} | 159 ms := memoryStorage.Storage{} |
| 155 gsc := testGSClient{} | 160 gsc := testGSClient{} |
| 156 svcStub := ct.Services{ | 161 svcStub := ct.Services{ |
| 157 IS: func() (storage.Storage, error) { | 162 IS: func() (storage.Storage, error) { |
| (...skipping 48 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 206 } | 211 } |
| 207 } | 212 } |
| 208 | 213 |
| 209 d, err := proto.Marshal(le) | 214 d, err := proto.Marshal(le) |
| 210 if err != nil { | 215 if err != nil { |
| 211 panic(err) | 216 panic(err) |
| 212 } | 217 } |
| 213 protobufs[uint64(v)] = d | 218 protobufs[uint64(v)] = d |
| 214 } | 219 } |
| 215 | 220 |
| 221 // frameSize returns the full RecordIO frame size for the named
log protobuf |
| 222 // indices. |
| 223 frameSize := func(indices ...uint64) int32 { |
| 224 var size int |
| 225 for _, idx := range indices { |
| 226 pb := protobufs[idx] |
| 227 size += recordio.FrameHeaderSize(int64(len(pb)))
+ len(pb) |
| 228 } |
| 229 if size > math.MaxInt32 { |
| 230 panic(size) |
| 231 } |
| 232 return int32(size) |
| 233 } |
| 234 |
| 216 Convey(`Testing Get requests (no logs)`, func() { | 235 Convey(`Testing Get requests (no logs)`, func() { |
| 217 req := logdog.GetRequest{ | 236 req := logdog.GetRequest{ |
| 218 Path: string(ls.Path()), | 237 Path: string(ls.Path()), |
| 219 } | 238 } |
| 220 | 239 |
| 221 Convey(`Will fail if the Path is not a stream path or a
hash.`, func() { | 240 Convey(`Will fail if the Path is not a stream path or a
hash.`, func() { |
| 222 req.Path = "not/a/full/stream/path" | 241 req.Path = "not/a/full/stream/path" |
| 223 _, err := s.Get(c, &req) | 242 _, err := s.Get(c, &req) |
| 224 So(err, ShouldErrLike, "invalid path value") | 243 So(err, ShouldErrLike, "invalid path value") |
| 225 }) | 244 }) |
| 226 | 245 |
| 227 Convey(`Will fail with Internal if the datastore Get() d
oesn't work.`, func() { | 246 Convey(`Will fail with Internal if the datastore Get() d
oesn't work.`, func() { |
| 228 c, fb := featureBreaker.FilterRDS(c, nil) | 247 c, fb := featureBreaker.FilterRDS(c, nil) |
| 229 fb.BreakFeatures(errors.New("testing error"), "G
etMulti") | 248 fb.BreakFeatures(errors.New("testing error"), "G
etMulti") |
| 230 | 249 |
| 231 _, err := s.Get(c, &req) | 250 _, err := s.Get(c, &req) |
| 232 So(err, ShouldBeRPCInternal) | 251 So(err, ShouldBeRPCInternal) |
| 233 }) | 252 }) |
| 234 | 253 |
| 235 Convey(`Will fail with NotFound if the log stream does n
ot exist.`, func() { | 254 Convey(`Will fail with NotFound if the log stream does n
ot exist.`, func() { |
| 236 req.Path = "testing/+/does/not/exist" | 255 req.Path = "testing/+/does/not/exist" |
| 237 _, err := s.Get(c, &req) | 256 _, err := s.Get(c, &req) |
| 238 So(err, ShouldBeRPCNotFound) | 257 So(err, ShouldBeRPCNotFound) |
| 239 }) | 258 }) |
| 240 }) | 259 }) |
| 241 | 260 |
| 242 » » for _, v := range []bool{ | 261 » » if !archived { |
| 243 » » » false, | 262 » » » // Add the logs to the in-memory temporary storage. |
| 244 » » » true, | 263 » » » for _, le := range entries { |
| 245 » » } { | 264 » » » » err := ms.Put(storage.PutRequest{ |
| 246 » » » is := "is" | 265 » » » » » Path: ls.Path(), |
| 247 » » » if !v { | 266 » » » » » Index: types.MessageIndex(le.StreamInde
x), |
| 248 » » » » is += " not" | 267 » » » » » Values: [][]byte{protobufs[le.StreamInde
x]}, |
| 249 » » » } | 268 » » » » }) |
| 250 | 269 » » » » if err != nil { |
| 251 » » » Convey(fmt.Sprintf(`When the log %s archived`, is), func
() { | 270 » » » » » panic(fmt.Errorf("failed to Put() LogEnt
ry: %v", err)) |
| 252 » » » » if !v { | |
| 253 » » » » » // Add the logs to the in-memory tempora
ry storage. | |
| 254 » » » » » for _, le := range entries { | |
| 255 » » » » » » err := ms.Put(storage.PutRequest
{ | |
| 256 » » » » » » » Path: ls.Path(), | |
| 257 » » » » » » » Index: types.MessageInd
ex(le.StreamIndex), | |
| 258 » » » » » » » Values: [][]byte{protobu
fs[le.StreamIndex]}, | |
| 259 » » » » » » }) | |
| 260 » » » » » » if err != nil { | |
| 261 » » » » » » » panic(fmt.Errorf("failed
to Put() LogEntry: %v", err)) | |
| 262 » » » » » » } | |
| 263 » » » » » } | |
| 264 » » » » } else { | |
| 265 » » » » » // Archive this log stream. We will gene
rate one index entry for every | |
| 266 » » » » » // 2 log entries. | |
| 267 » » » » » src := staticArchiveSource(entries) | |
| 268 » » » » » var lbuf, ibuf bytes.Buffer | |
| 269 » » » » » m := archive.Manifest{ | |
| 270 » » » » » » Desc: desc, | |
| 271 » » » » » » Source: &src, | |
| 272 » » » » » » LogWriter: &lbuf, | |
| 273 » » » » » » IndexWriter: &ibuf, | |
| 274 » » » » » » StreamIndexRange: 2, | |
| 275 » » » » » } | |
| 276 » » » » » if err := archive.Archive(m); err != nil
{ | |
| 277 » » » » » » panic(err) | |
| 278 » » » » » } | |
| 279 | |
| 280 » » » » » now := tc.Now().UTC() | |
| 281 | |
| 282 » » » » » gsc.put("gs://testbucket/stream", lbuf.B
ytes()) | |
| 283 » » » » » gsc.put("gs://testbucket/index", ibuf.By
tes()) | |
| 284 » » » » » ls.State = coordinator.LSArchived | |
| 285 » » » » » ls.TerminatedTime = now | |
| 286 » » » » » ls.ArchivedTime = now | |
| 287 » » » » » ls.ArchiveStreamURL = "gs://testbucket/s
tream" | |
| 288 » » » » » ls.ArchiveIndexURL = "gs://testbucket/in
dex" | |
| 289 } | 271 } |
| 272 } |
| 273 } else { |
| 274 // Archive this log stream. We will generate one index e
ntry for every |
| 275 // 2 log entries. |
| 276 src := staticArchiveSource(entries) |
| 277 var lbuf, ibuf bytes.Buffer |
| 278 m := archive.Manifest{ |
| 279 Desc: desc, |
| 280 Source: &src, |
| 281 LogWriter: &lbuf, |
| 282 IndexWriter: &ibuf, |
| 283 StreamIndexRange: 2, |
| 284 } |
| 285 if err := archive.Archive(m); err != nil { |
| 286 panic(err) |
| 287 } |
| 288 |
| 289 now := tc.Now().UTC() |
| 290 |
| 291 gsc.put("gs://testbucket/stream", lbuf.Bytes()) |
| 292 gsc.put("gs://testbucket/index", ibuf.Bytes()) |
| 293 ls.State = coordinator.LSArchived |
| 294 ls.TerminatedTime = now |
| 295 ls.ArchivedTime = now |
| 296 ls.ArchiveStreamURL = "gs://testbucket/stream" |
| 297 ls.ArchiveIndexURL = "gs://testbucket/index" |
| 298 } |
| 299 if err := ds.Get(c).Put(ls); err != nil { |
| 300 panic(err) |
| 301 } |
| 302 |
| 303 Convey(`Testing Get requests`, func() { |
| 304 req := logdog.GetRequest{ |
| 305 Path: string(ls.Path()), |
| 306 } |
| 307 |
| 308 Convey(`When the log stream is purged`, func() { |
| 309 ls.Purged = true |
| 290 if err := ds.Get(c).Put(ls); err != nil { | 310 if err := ds.Get(c).Put(ls); err != nil { |
| 291 panic(err) | 311 panic(err) |
| 292 } | 312 } |
| 293 | 313 |
| 294 » » » » Convey(`Testing Get requests`, func() { | 314 » » » » Convey(`Will return NotFound if the user is not
an administrator.`, func() { |
| 295 » » » » » req := logdog.GetRequest{ | 315 » » » » » _, err := s.Get(c, &req) |
| 296 » » » » » » Path: string(ls.Path()), | 316 » » » » » So(err, ShouldBeRPCNotFound) |
| 317 » » » » }) |
| 318 |
| 319 » » » » Convey(`Will process the request if the user is
an administrator.`, func() { |
| 320 » » » » » fs.IdentityGroups = []string{"test-admin
istrators"} |
| 321 |
| 322 » » » » » resp, err := s.Get(c, &req) |
| 323 » » » » » So(err, ShouldBeRPCOK) |
| 324 » » » » » So(resp, shouldHaveLogs, 0, 1, 2) |
| 325 » » » » }) |
| 326 » » » }) |
| 327 |
| 328 » » » Convey(`Will return empty if no records were requested.`
, func() { |
| 329 » » » » req.LogCount = -1 |
| 330 » » » » req.State = false |
| 331 |
| 332 » » » » resp, err := s.Get(c, &req) |
| 333 » » » » So(err, ShouldBeRPCOK) |
| 334 » » » » So(resp.Logs, ShouldHaveLength, 0) |
| 335 » » » }) |
| 336 |
| 337 » » » Convey(`Will successfully retrieve a stream path.`, func
() { |
| 338 » » » » resp, err := s.Get(c, &req) |
| 339 » » » » So(err, ShouldBeRPCOK) |
| 340 » » » » So(resp, shouldHaveLogs, 0, 1, 2) |
| 341 » » » }) |
| 342 |
| 343 » » » Convey(`Will successfully retrieve a stream path offset
at 4.`, func() { |
| 344 » » » » req.Index = 4 |
| 345 |
| 346 » » » » resp, err := s.Get(c, &req) |
| 347 » » » » So(err, ShouldBeRPCOK) |
| 348 » » » » So(resp, shouldHaveLogs, 4, 5) |
| 349 » » » }) |
| 350 |
| 351 » » » Convey(`Will retrieve no logs for contiguous offset 6.`,
func() { |
| 352 » » » » req.Index = 6 |
| 353 |
| 354 » » » » resp, err := s.Get(c, &req) |
| 355 » » » » So(err, ShouldBeRPCOK) |
| 356 » » » » So(len(resp.Logs), ShouldEqual, 0) |
| 357 » » » }) |
| 358 |
| 359 » » » Convey(`Will retrieve log 7 for non-contiguous offset 6.
`, func() { |
| 360 » » » » req.NonContiguous = true |
| 361 » » » » req.Index = 6 |
| 362 |
| 363 » » » » resp, err := s.Get(c, &req) |
| 364 » » » » So(err, ShouldBeRPCOK) |
| 365 » » » » So(resp, shouldHaveLogs, 7) |
| 366 » » » }) |
| 367 |
| 368 » » » Convey(`With a byte limit of 1, will still return at lea
st one log entry.`, func() { |
| 369 » » » » req.ByteCount = 1 |
| 370 |
| 371 » » » » resp, err := s.Get(c, &req) |
| 372 » » » » So(err, ShouldBeRPCOK) |
| 373 » » » » So(resp, shouldHaveLogs, 0) |
| 374 » » » }) |
| 375 |
| 376 » » » Convey(`With a byte limit of sizeof(0), will return log
entry 0.`, func() { |
| 377 » » » » req.ByteCount = frameSize(0) |
| 378 |
| 379 » » » » resp, err := s.Get(c, &req) |
| 380 » » » » So(err, ShouldBeRPCOK) |
| 381 » » » » So(resp, shouldHaveLogs, 0) |
| 382 » » » }) |
| 383 |
| 384 » » » Convey(`With a byte limit of sizeof(0)+1, will return lo
g entry 0.`, func() { |
| 385 » » » » req.ByteCount = frameSize(0) + 1 |
| 386 |
| 387 » » » » resp, err := s.Get(c, &req) |
| 388 » » » » So(err, ShouldBeRPCOK) |
| 389 » » » » So(resp, shouldHaveLogs, 0) |
| 390 » » » }) |
| 391 |
| 392 » » » Convey(`With a byte limit of sizeof({0, 1}), will return
log entries {0, 1}.`, func() { |
| 393 » » » » req.ByteCount = frameSize(0, 1) |
| 394 |
| 395 » » » » resp, err := s.Get(c, &req) |
| 396 » » » » So(err, ShouldBeRPCOK) |
| 397 » » » » So(resp, shouldHaveLogs, 0, 1) |
| 398 » » » }) |
| 399 |
| 400 » » » Convey(`With a byte limit of sizeof({0, 1, 2}), will ret
urn log entries {0, 1, 2}.`, func() { |
| 401 » » » » req.ByteCount = frameSize(0, 1, 2) |
| 402 |
| 403 » » » » resp, err := s.Get(c, &req) |
| 404 » » » » So(err, ShouldBeRPCOK) |
| 405 » » » » So(resp, shouldHaveLogs, 0, 1, 2) |
| 406 » » » }) |
| 407 |
| 408 » » » Convey(`With a byte limit of sizeof({0, 1, 2})+1, will r
eturn log entries {0, 1, 2}.`, func() { |
| 409 » » » » req.ByteCount = frameSize(0, 1, 2) + 1 |
| 410 |
| 411 » » » » resp, err := s.Get(c, &req) |
| 412 » » » » So(err, ShouldBeRPCOK) |
| 413 » » » » So(resp, shouldHaveLogs, 0, 1, 2) |
| 414 » » » }) |
| 415 |
| 416 » » » Convey(`Will successfully retrieve a stream path hash.`,
func() { |
| 417 » » » » req.Path = ls.HashID |
| 418 » » » » resp, err := s.Get(c, &req) |
| 419 » » » » So(err, ShouldBeRPCOK) |
| 420 » » » » So(resp, shouldHaveLogs, 0, 1, 2) |
| 421 » » » }) |
| 422 |
| 423 » » » Convey(`When requesting state`, func() { |
| 424 » » » » req.State = true |
| 425 » » » » req.LogCount = -1 |
| 426 |
| 427 » » » » Convey(`Will successfully retrieve stream state.
`, func() { |
| 428 » » » » » resp, err := s.Get(c, &req) |
| 429 » » » » » So(err, ShouldBeRPCOK) |
| 430 » » » » » So(resp.State, ShouldResemble, loadLogSt
reamState(ls)) |
| 431 » » » » » So(len(resp.Logs), ShouldEqual, 0) |
| 432 » » » » }) |
| 433 |
| 434 » » » » Convey(`Will return Internal if the protobuf des
criptor data is corrupt.`, func() { |
| 435 » » » » » ls.SetDSValidate(false) |
| 436 » » » » » ls.Descriptor = []byte{0x00} // Invalid
protobuf, zero tag. |
| 437 » » » » » if err := ds.Get(c).Put(ls); err != nil
{ |
| 438 » » » » » » panic(err) |
| 297 } | 439 } |
| 298 | 440 |
| 299 » » » » » Convey(`When the log stream is purged`,
func() { | 441 » » » » » _, err := s.Get(c, &req) |
| 300 » » » » » » ls.Purged = true | 442 » » » » » So(err, ShouldBeRPCInternal) |
| 301 » » » » » » if err := ds.Get(c).Put(ls); err
!= nil { | 443 » » » » }) |
| 302 » » » » » » » panic(err) | 444 » » » }) |
| 303 » » » » » » } | 445 |
| 304 | 446 » » » Convey(`Will return Internal if the protobuf log entry d
ata is corrupt.`, func() { |
| 305 » » » » » » Convey(`Will return NotFound if
the user is not an administrator.`, func() { | 447 » » » » if archived { |
| 306 » » » » » » » _, err := s.Get(c, &req) | 448 » » » » » // Corrupt the archive datastream. |
| 307 » » » » » » » So(err, ShouldBeRPCNotFo
und) | 449 » » » » » stream := gsc.get("gs://testbucket/strea
m") |
| 308 » » » » » » }) | 450 » » » » » zeroRecords(stream) |
| 309 | 451 » » » » } else { |
| 310 » » » » » » Convey(`Will process the request
if the user is an administrator.`, func() { | 452 » » » » » // Add corrupted entry to Storage. Creat
e a new entry here, since |
| 311 » » » » » » » fs.IdentityGroups = []st
ring{"test-administrators"} | 453 » » » » » // the storage will reject a duplicate/o
verwrite. |
| 312 | 454 » » » » » err := ms.Put(storage.PutRequest{ |
| 313 » » » » » » » resp, err := s.Get(c, &r
eq) | 455 » » » » » » Path: types.StreamPath(req.Pat
h), |
| 314 » » » » » » » So(err, ShouldBeRPCOK) | 456 » » » » » » Index: 666, |
| 315 » » » » » » » So(resp, shouldHaveLogs,
0, 1, 2) | 457 » » » » » » Values: [][]byte{{0x00}}, // Inv
alid protobuf, zero tag. |
| 316 » » » » » » }) | |
| 317 }) | 458 }) |
| 318 | 459 » » » » » if err != nil { |
| 319 » » » » » Convey(`Will return empty if no records
were requested.`, func() { | 460 » » » » » » panic(err) |
| 320 » » » » » » req.LogCount = -1 | |
| 321 » » » » » » req.State = false | |
| 322 | |
| 323 » » » » » » resp, err := s.Get(c, &req) | |
| 324 » » » » » » So(err, ShouldBeRPCOK) | |
| 325 » » » » » » So(resp.Logs, ShouldHaveLength,
0) | |
| 326 » » » » » }) | |
| 327 | |
| 328 » » » » » Convey(`Will successfully retrieve a str
eam path.`, func() { | |
| 329 » » » » » » resp, err := s.Get(c, &req) | |
| 330 » » » » » » So(err, ShouldBeRPCOK) | |
| 331 » » » » » » So(resp, shouldHaveLogs, 0, 1, 2
) | |
| 332 » » » » » }) | |
| 333 | |
| 334 » » » » » Convey(`Will successfully retrieve a str
eam path offset at 4.`, func() { | |
| 335 » » » » » » req.Index = 4 | |
| 336 | |
| 337 » » » » » » resp, err := s.Get(c, &req) | |
| 338 » » » » » » So(err, ShouldBeRPCOK) | |
| 339 » » » » » » So(resp, shouldHaveLogs, 4, 5) | |
| 340 » » » » » }) | |
| 341 | |
| 342 » » » » » Convey(`Will retrieve no logs for contig
uous offset 6.`, func() { | |
| 343 » » » » » » req.Index = 6 | |
| 344 | |
| 345 » » » » » » resp, err := s.Get(c, &req) | |
| 346 » » » » » » So(err, ShouldBeRPCOK) | |
| 347 » » » » » » So(len(resp.Logs), ShouldEqual,
0) | |
| 348 » » » » » }) | |
| 349 | |
| 350 » » » » » Convey(`Will retrieve log 7 for non-cont
iguous offset 6.`, func() { | |
| 351 » » » » » » req.NonContiguous = true | |
| 352 » » » » » » req.Index = 6 | |
| 353 | |
| 354 » » » » » » resp, err := s.Get(c, &req) | |
| 355 » » » » » » So(err, ShouldBeRPCOK) | |
| 356 » » » » » » So(resp, shouldHaveLogs, 7) | |
| 357 » » » » » }) | |
| 358 | |
| 359 » » » » » Convey(`With a byte limit of 1, will sti
ll return at least one log entry.`, func() { | |
| 360 » » » » » » req.ByteCount = 1 | |
| 361 | |
| 362 » » » » » » resp, err := s.Get(c, &req) | |
| 363 » » » » » » So(err, ShouldBeRPCOK) | |
| 364 » » » » » » So(resp, shouldHaveLogs, 0) | |
| 365 » » » » » }) | |
| 366 | |
| 367 » » » » » Convey(`With a byte limit of sizeof(0),
will return log entry 0.`, func() { | |
| 368 » » » » » » req.ByteCount = int32(len(protob
ufs[0])) | |
| 369 | |
| 370 » » » » » » resp, err := s.Get(c, &req) | |
| 371 » » » » » » So(err, ShouldBeRPCOK) | |
| 372 » » » » » » So(resp, shouldHaveLogs, 0) | |
| 373 » » » » » }) | |
| 374 | |
| 375 » » » » » Convey(`With a byte limit of sizeof(0)+1
, will return log entry 0.`, func() { | |
| 376 » » » » » » req.ByteCount = int32(len(protob
ufs[0])) | |
| 377 | |
| 378 » » » » » » resp, err := s.Get(c, &req) | |
| 379 » » » » » » So(err, ShouldBeRPCOK) | |
| 380 » » » » » » So(resp, shouldHaveLogs, 0) | |
| 381 » » » » » }) | |
| 382 | |
| 383 » » » » » Convey(`With a byte limit of sizeof({0,
1}), will return log entries {0, 1}.`, func() { | |
| 384 » » » » » » req.ByteCount = int32(len(protob
ufs[0]) + len(protobufs[1])) | |
| 385 | |
| 386 » » » » » » resp, err := s.Get(c, &req) | |
| 387 » » » » » » So(err, ShouldBeRPCOK) | |
| 388 » » » » » » So(resp, shouldHaveLogs, 0, 1) | |
| 389 » » » » » }) | |
| 390 | |
| 391 » » » » » Convey(`With a byte limit of sizeof({0,
1, 2}), will return log entries {0, 1, 2}.`, func() { | |
| 392 » » » » » » req.ByteCount = int32(len(protob
ufs[0]) + len(protobufs[1]) + len(protobufs[2])) | |
| 393 | |
| 394 » » » » » » resp, err := s.Get(c, &req) | |
| 395 » » » » » » So(err, ShouldBeRPCOK) | |
| 396 » » » » » » So(resp, shouldHaveLogs, 0, 1, 2
) | |
| 397 » » » » » }) | |
| 398 | |
| 399 » » » » » Convey(`With a byte limit of sizeof({0,
1, 2})+1, will return log entries {0, 1, 2}.`, func() { | |
| 400 » » » » » » req.ByteCount = int32(len(protob
ufs[0]) + len(protobufs[1]) + len(protobufs[2]) + 1) | |
| 401 | |
| 402 » » » » » » resp, err := s.Get(c, &req) | |
| 403 » » » » » » So(err, ShouldBeRPCOK) | |
| 404 » » » » » » So(resp, shouldHaveLogs, 0, 1, 2
) | |
| 405 » » » » » }) | |
| 406 | |
| 407 » » » » » Convey(`Will successfully retrieve a str
eam path hash.`, func() { | |
| 408 » » » » » » req.Path = ls.HashID | |
| 409 » » » » » » resp, err := s.Get(c, &req) | |
| 410 » » » » » » So(err, ShouldBeRPCOK) | |
| 411 » » » » » » So(resp, shouldHaveLogs, 0, 1, 2
) | |
| 412 » » » » » }) | |
| 413 | |
| 414 » » » » » Convey(`When requesting state`, func() { | |
| 415 » » » » » » req.State = true | |
| 416 » » » » » » req.LogCount = -1 | |
| 417 | |
| 418 » » » » » » Convey(`Will successfully retrie
ve stream state.`, func() { | |
| 419 » » » » » » » resp, err := s.Get(c, &r
eq) | |
| 420 » » » » » » » So(err, ShouldBeRPCOK) | |
| 421 » » » » » » » So(resp.State, ShouldRes
emble, loadLogStreamState(ls)) | |
| 422 » » » » » » » So(len(resp.Logs), Shoul
dEqual, 0) | |
| 423 » » » » » » }) | |
| 424 | |
| 425 » » » » » » Convey(`Will return Internal if
the protobuf descriptor data is corrupt.`, func() { | |
| 426 » » » » » » » ls.SetDSValidate(false) | |
| 427 » » » » » » » ls.Descriptor = []byte{0
x00} // Invalid protobuf, zero tag. | |
| 428 » » » » » » » if err := ds.Get(c).Put(
ls); err != nil { | |
| 429 » » » » » » » » panic(err) | |
| 430 » » » » » » » } | |
| 431 | |
| 432 » » » » » » » _, err := s.Get(c, &req) | |
| 433 » » » » » » » So(err, ShouldBeRPCInter
nal) | |
| 434 » » » » » » }) | |
| 435 » » » » » }) | |
| 436 | |
| 437 » » » » » Convey(`Will return Internal if the prot
obuf log entry data is corrupt.`, func() { | |
| 438 » » » » » » if v { | |
| 439 » » » » » » » // Corrupt the archive d
atastream. | |
| 440 » » » » » » » stream := gsc.get("gs://
testbucket/stream") | |
| 441 » » » » » » » zeroRecords(stream) | |
| 442 » » » » » » } else { | |
| 443 » » » » » » » // Add corrupted entry t
o Storage. Create a new entry here, since | |
| 444 » » » » » » » // the storage will reje
ct a duplicate/overwrite. | |
| 445 » » » » » » » err := ms.Put(storage.Pu
tRequest{ | |
| 446 » » » » » » » » Path: types.St
reamPath(req.Path), | |
| 447 » » » » » » » » Index: 666, | |
| 448 » » » » » » » » Values: [][]byte
{{0x00}}, // Invalid protobuf, zero tag. | |
| 449 » » » » » » » }) | |
| 450 » » » » » » » if err != nil { | |
| 451 » » » » » » » » panic(err) | |
| 452 » » » » » » » } | |
| 453 » » » » » » » req.Index = 666 | |
| 454 » » » » » » } | |
| 455 | |
| 456 » » » » » » _, err := s.Get(c, &req) | |
| 457 » » » » » » So(err, ShouldBeRPCInternal) | |
| 458 » » » » » }) | |
| 459 | |
| 460 » » » » » Convey(`Will successfully retrieve both
logs and stream state.`, func() { | |
| 461 » » » » » » req.State = true | |
| 462 | |
| 463 » » » » » » resp, err := s.Get(c, &req) | |
| 464 » » » » » » So(err, ShouldBeRPCOK) | |
| 465 » » » » » » So(resp.State, ShouldResemble, l
oadLogStreamState(ls)) | |
| 466 » » » » » » So(resp, shouldHaveLogs, 0, 1, 2
) | |
| 467 » » » » » }) | |
| 468 | |
| 469 » » » » » Convey(`Will return Internal if the Stor
age is not working.`, func() { | |
| 470 » » » » » » if v { | |
| 471 » » » » » » » gsc["error"] = []byte("t
est error") | |
| 472 » » » » » » } else { | |
| 473 » » » » » » » ms.Close() | |
| 474 » » » » » » } | |
| 475 | |
| 476 » » » » » » _, err := s.Get(c, &req) | |
| 477 » » » » » » So(err, ShouldBeRPCInternal) | |
| 478 » » » » » }) | |
| 479 | |
| 480 » » » » » Convey(`Will enforce a maximum count of
2.`, func() { | |
| 481 » » » » » » req.LogCount = 2 | |
| 482 » » » » » » resp, err := s.Get(c, &req) | |
| 483 » » » » » » So(err, ShouldBeRPCOK) | |
| 484 » » » » » » So(resp, shouldHaveLogs, 0, 1) | |
| 485 » » » » » }) | |
| 486 | |
| 487 » » » » » Convey(`When requesting protobufs`, func
() { | |
| 488 » » » » » » req.State = true | |
| 489 | |
| 490 » » » » » » resp, err := s.Get(c, &req) | |
| 491 » » » » » » So(err, ShouldBeRPCOK) | |
| 492 » » » » » » So(resp, shouldHaveLogs, 0, 1, 2
) | |
| 493 | |
| 494 » » » » » » // Confirm that this has protobu
fs. | |
| 495 » » » » » » So(len(resp.Logs), ShouldEqual,
3) | |
| 496 » » » » » » So(resp.Logs[0], ShouldNotBeNil) | |
| 497 | |
| 498 » » » » » » // Confirm that there is a descr
iptor protobuf. | |
| 499 » » » » » » So(resp.Desc, ShouldResemble, de
sc) | |
| 500 | |
| 501 » » » » » » // Confirm that the state was re
turned. | |
| 502 » » » » » » So(resp.State, ShouldNotBeNil) | |
| 503 » » » » » }) | |
| 504 | |
| 505 » » » » » Convey(`Will successfully retrieve all r
ecords if non-contiguous is allowed.`, func() { | |
| 506 » » » » » » req.NonContiguous = true | |
| 507 » » » » » » resp, err := s.Get(c, &req) | |
| 508 » » » » » » So(err, ShouldBeRPCOK) | |
| 509 » » » » » » So(resp, shouldHaveLogs, 0, 1, 2
, 4, 5, 7) | |
| 510 » » » » » }) | |
| 511 | |
| 512 » » » » » Convey(`When newlines are not requested,
does not include delimiters.`, func() { | |
| 513 » » » » » » req.LogCount = 1 | |
| 514 | |
| 515 » » » » » » resp, err := s.Get(c, &req) | |
| 516 » » » » » » So(err, ShouldBeRPCOK) | |
| 517 » » » » » » So(resp, shouldHaveLogs, 0) | |
| 518 | |
| 519 » » » » » » So(resp.Logs[0].GetText(), Shoul
dResemble, &logpb.Text{ | |
| 520 » » » » » » » Lines: []*logpb.Text_Lin
e{ | |
| 521 » » » » » » » » {"log entry #0",
"\n"}, | |
| 522 » » » » » » » » {"another line o
f text", ""}, | |
| 523 » » » » » » » }, | |
| 524 » » » » » » }) | |
| 525 » » » » » }) | |
| 526 | |
| 527 » » » » » Convey(`Will get a Binary LogEntry`, fun
c() { | |
| 528 » » » » » » req.Index = 4 | |
| 529 » » » » » » req.LogCount = 1 | |
| 530 » » » » » » resp, err := s.Get(c, &req) | |
| 531 » » » » » » So(err, ShouldBeRPCOK) | |
| 532 » » » » » » So(resp, shouldHaveLogs, 4) | |
| 533 » » » » » » So(resp.Logs[0].GetBinary(), Sho
uldResemble, &logpb.Binary{ | |
| 534 » » » » » » » Data: []byte{0x00, 0x01,
0x02, 0x03}, | |
| 535 » » » » » » }) | |
| 536 » » » » » }) | |
| 537 | |
| 538 » » » » » Convey(`Will get a Datagram LogEntry`, f
unc() { | |
| 539 » » » » » » req.Index = 5 | |
| 540 » » » » » » req.LogCount = 1 | |
| 541 » » » » » » resp, err := s.Get(c, &req) | |
| 542 » » » » » » So(err, ShouldBeRPCOK) | |
| 543 » » » » » » So(resp, shouldHaveLogs, 5) | |
| 544 » » » » » » So(resp.Logs[0].GetDatagram(), S
houldResemble, &logpb.Datagram{ | |
| 545 » » » » » » » Data: []byte{0x00, 0x01,
0x02, 0x03}, | |
| 546 » » » » » » » Partial: &logpb.Datagram
_Partial{ | |
| 547 » » » » » » » » Index: 2, | |
| 548 » » » » » » » » Size: 1024, | |
| 549 » » » » » » » » Last: false, | |
| 550 » » » » » » » }, | |
| 551 » » » » » » }) | |
| 552 » » » » » }) | |
| 553 » » » » }) | |
| 554 | |
| 555 » » » » Convey(`Testing tail requests`, func() { | |
| 556 » » » » » req := logdog.TailRequest{ | |
| 557 » » » » » » Path: string(ls.Path()), | |
| 558 } | 461 } |
| 559 | 462 » » » » » req.Index = 666 |
| 560 » » » » » Convey(`Will successfully retrieve a str
eam path.`, func() { | 463 » » » » } |
| 561 » » » » » » resp, err := s.Tail(c, &req) | 464 |
| 562 » » » » » » So(err, ShouldBeRPCOK) | 465 » » » » _, err := s.Get(c, &req) |
| 563 » » » » » » So(resp, shouldHaveLogs, 7) | 466 » » » » So(err, ShouldBeRPCInternal) |
| 564 » » » » » }) | 467 » » » }) |
| 565 | 468 |
| 566 » » » » » Convey(`Will successfully retrieve a str
eam path hash and state.`, func() { | 469 » » » Convey(`Will successfully retrieve both logs and stream
state.`, func() { |
| 567 » » » » » » req.Path = ls.HashID | 470 » » » » req.State = true |
| 568 » » » » » » req.State = true | 471 |
| 569 | 472 » » » » resp, err := s.Get(c, &req) |
| 570 » » » » » » resp, err := s.Tail(c, &req) | 473 » » » » So(err, ShouldBeRPCOK) |
| 571 » » » » » » So(err, ShouldBeRPCOK) | 474 » » » » So(resp.State, ShouldResemble, loadLogStreamStat
e(ls)) |
| 572 » » » » » » So(resp, shouldHaveLogs, 7) | 475 » » » » So(resp, shouldHaveLogs, 0, 1, 2) |
| 573 » » » » » » So(resp.State, ShouldResemble, l
oadLogStreamState(ls)) | 476 » » » }) |
| 574 » » » » » }) | 477 |
| 575 » » » » }) | 478 » » » Convey(`Will return Internal if the Storage is not worki
ng.`, func() { |
| 576 » » » }) | 479 » » » » if archived { |
| 577 » » } | 480 » » » » » gsc["error"] = []byte("test error") |
| 481 » » » » } else { |
| 482 » » » » » ms.Close() |
| 483 » » » » } |
| 484 |
| 485 » » » » _, err := s.Get(c, &req) |
| 486 » » » » So(err, ShouldBeRPCInternal) |
| 487 » » » }) |
| 488 |
| 489 » » » Convey(`Will enforce a maximum count of 2.`, func() { |
| 490 » » » » req.LogCount = 2 |
| 491 » » » » resp, err := s.Get(c, &req) |
| 492 » » » » So(err, ShouldBeRPCOK) |
| 493 » » » » So(resp, shouldHaveLogs, 0, 1) |
| 494 » » » }) |
| 495 |
| 496 » » » Convey(`When requesting protobufs`, func() { |
| 497 » » » » req.State = true |
| 498 |
| 499 » » » » resp, err := s.Get(c, &req) |
| 500 » » » » So(err, ShouldBeRPCOK) |
| 501 » » » » So(resp, shouldHaveLogs, 0, 1, 2) |
| 502 |
| 503 » » » » // Confirm that this has protobufs. |
| 504 » » » » So(len(resp.Logs), ShouldEqual, 3) |
| 505 » » » » So(resp.Logs[0], ShouldNotBeNil) |
| 506 |
| 507 » » » » // Confirm that there is a descriptor protobuf. |
| 508 » » » » So(resp.Desc, ShouldResemble, desc) |
| 509 |
| 510 » » » » // Confirm that the state was returned. |
| 511 » » » » So(resp.State, ShouldNotBeNil) |
| 512 » » » }) |
| 513 |
| 514 » » » Convey(`Will successfully retrieve all records if non-co
ntiguous is allowed.`, func() { |
| 515 » » » » req.NonContiguous = true |
| 516 » » » » resp, err := s.Get(c, &req) |
| 517 » » » » So(err, ShouldBeRPCOK) |
| 518 » » » » So(resp, shouldHaveLogs, 0, 1, 2, 4, 5, 7) |
| 519 » » » }) |
| 520 |
| 521 » » » Convey(`When newlines are not requested, does not includ
e delimiters.`, func() { |
| 522 » » » » req.LogCount = 1 |
| 523 |
| 524 » » » » resp, err := s.Get(c, &req) |
| 525 » » » » So(err, ShouldBeRPCOK) |
| 526 » » » » So(resp, shouldHaveLogs, 0) |
| 527 |
| 528 » » » » So(resp.Logs[0].GetText(), ShouldResemble, &logp
b.Text{ |
| 529 » » » » » Lines: []*logpb.Text_Line{ |
| 530 » » » » » » {"log entry #0", "\n"}, |
| 531 » » » » » » {"another line of text", ""}, |
| 532 » » » » » }, |
| 533 » » » » }) |
| 534 » » » }) |
| 535 |
| 536 » » » Convey(`Will get a Binary LogEntry`, func() { |
| 537 » » » » req.Index = 4 |
| 538 » » » » req.LogCount = 1 |
| 539 » » » » resp, err := s.Get(c, &req) |
| 540 » » » » So(err, ShouldBeRPCOK) |
| 541 » » » » So(resp, shouldHaveLogs, 4) |
| 542 » » » » So(resp.Logs[0].GetBinary(), ShouldResemble, &lo
gpb.Binary{ |
| 543 » » » » » Data: []byte{0x00, 0x01, 0x02, 0x03}, |
| 544 » » » » }) |
| 545 » » » }) |
| 546 |
| 547 » » » Convey(`Will get a Datagram LogEntry`, func() { |
| 548 » » » » req.Index = 5 |
| 549 » » » » req.LogCount = 1 |
| 550 » » » » resp, err := s.Get(c, &req) |
| 551 » » » » So(err, ShouldBeRPCOK) |
| 552 » » » » So(resp, shouldHaveLogs, 5) |
| 553 » » » » So(resp.Logs[0].GetDatagram(), ShouldResemble, &
logpb.Datagram{ |
| 554 » » » » » Data: []byte{0x00, 0x01, 0x02, 0x03}, |
| 555 » » » » » Partial: &logpb.Datagram_Partial{ |
| 556 » » » » » » Index: 2, |
| 557 » » » » » » Size: 1024, |
| 558 » » » » » » Last: false, |
| 559 » » » » » }, |
| 560 » » » » }) |
| 561 » » » }) |
| 562 » » }) |
| 563 |
| 564 » » Convey(`Testing tail requests`, func() { |
| 565 » » » req := logdog.TailRequest{ |
| 566 » » » » Path: string(ls.Path()), |
| 567 » » » } |
| 568 |
| 569 » » » Convey(`Will successfully retrieve a stream path.`, func
() { |
| 570 » » » » resp, err := s.Tail(c, &req) |
| 571 » » » » So(err, ShouldBeRPCOK) |
| 572 » » » » So(resp, shouldHaveLogs, 7) |
| 573 » » » }) |
| 574 |
| 575 » » » Convey(`Will successfully retrieve a stream path hash an
d state.`, func() { |
| 576 » » » » req.Path = ls.HashID |
| 577 » » » » req.State = true |
| 578 |
| 579 » » » » resp, err := s.Tail(c, &req) |
| 580 » » » » So(err, ShouldBeRPCOK) |
| 581 » » » » So(resp, shouldHaveLogs, 7) |
| 582 » » » » So(resp.State, ShouldResemble, loadLogStreamStat
e(ls)) |
| 583 » » » }) |
| 584 » » }) |
| 578 }) | 585 }) |
| 579 } | 586 } |
| 587 |
| 588 func TestGetIntermediate(t *testing.T) { |
| 589 t.Parallel() |
| 590 |
| 591 testGetImpl(t, false) |
| 592 } |
| 593 |
| 594 func TestGetArchived(t *testing.T) { |
| 595 t.Parallel() |
| 596 |
| 597 testGetImpl(t, true) |
| 598 } |
| OLD | NEW |