| OLD | NEW |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package logs | 5 package logs |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "bytes" | 8 "bytes" |
| 9 "errors" | 9 "errors" |
| 10 "fmt" | 10 "fmt" |
| 11 "io" | 11 "io" |
| 12 "testing" | 12 "testing" |
| 13 "time" | 13 "time" |
| 14 | 14 |
| 15 "github.com/golang/protobuf/proto" | 15 "github.com/golang/protobuf/proto" |
| 16 "github.com/luci/gae/filter/featureBreaker" | 16 "github.com/luci/gae/filter/featureBreaker" |
| 17 "github.com/luci/gae/impl/memory" | 17 "github.com/luci/gae/impl/memory" |
| 18 ds "github.com/luci/gae/service/datastore" | 18 ds "github.com/luci/gae/service/datastore" |
| 19 "github.com/luci/luci-go/appengine/logdog/coordinator" | 19 "github.com/luci/luci-go/appengine/logdog/coordinator" |
| 20 ct "github.com/luci/luci-go/appengine/logdog/coordinator/coordinatorTest
" | 20 ct "github.com/luci/luci-go/appengine/logdog/coordinator/coordinatorTest
" |
| 21 "github.com/luci/luci-go/common/api/logdog_coordinator/logs/v1" | 21 "github.com/luci/luci-go/common/api/logdog_coordinator/logs/v1" |
| 22 "github.com/luci/luci-go/common/clock/testclock" | 22 "github.com/luci/luci-go/common/clock/testclock" |
| 23 "github.com/luci/luci-go/common/gcloud/gs" | 23 "github.com/luci/luci-go/common/gcloud/gs" |
| 24 "github.com/luci/luci-go/common/iotools" | 24 "github.com/luci/luci-go/common/iotools" |
| 25 "github.com/luci/luci-go/common/logdog/types" | 25 "github.com/luci/luci-go/common/logdog/types" |
| 26 "github.com/luci/luci-go/common/proto/logdog/logpb" | 26 "github.com/luci/luci-go/common/proto/logdog/logpb" |
| 27 "github.com/luci/luci-go/common/proto/logdog/svcconfig" | |
| 28 "github.com/luci/luci-go/common/recordio" | 27 "github.com/luci/luci-go/common/recordio" |
| 29 "github.com/luci/luci-go/server/auth" | 28 "github.com/luci/luci-go/server/auth" |
| 30 "github.com/luci/luci-go/server/auth/authtest" | 29 "github.com/luci/luci-go/server/auth/authtest" |
| 31 "github.com/luci/luci-go/server/logdog/archive" | 30 "github.com/luci/luci-go/server/logdog/archive" |
| 32 "github.com/luci/luci-go/server/logdog/storage" | 31 "github.com/luci/luci-go/server/logdog/storage" |
| 33 memoryStorage "github.com/luci/luci-go/server/logdog/storage/memory" | 32 memoryStorage "github.com/luci/luci-go/server/logdog/storage/memory" |
| 34 "golang.org/x/net/context" | 33 "golang.org/x/net/context" |
| 35 | 34 |
| 36 . "github.com/luci/luci-go/common/testing/assertions" | 35 . "github.com/luci/luci-go/common/testing/assertions" |
| 37 . "github.com/smartystreets/goconvey/convey" | 36 . "github.com/smartystreets/goconvey/convey" |
| 38 ) | 37 ) |
| 39 | 38 |
| 40 type staticArchiveSource []*logpb.LogEntry | 39 type staticArchiveSource []*logpb.LogEntry |
| 41 | 40 |
| 42 func (s *staticArchiveSource) NextLogEntry() (le *logpb.LogEntry, err error) { | 41 func (s *staticArchiveSource) NextLogEntry() (le *logpb.LogEntry, err error) { |
| 43 if len(*s) == 0 { | 42 if len(*s) == 0 { |
| 44 err = archive.ErrEndOfStream | 43 err = archive.ErrEndOfStream |
| 45 } else { | 44 } else { |
| 46 le, *s = (*s)[0], (*s)[1:] | 45 le, *s = (*s)[0], (*s)[1:] |
| 47 } | 46 } |
| 48 return | 47 return |
| 49 } | 48 } |
| 50 | 49 |
| 51 type testGSClient map[string][]byte | 50 type testGSClient map[gs.Path][]byte |
| 52 | 51 |
| 53 func (c testGSClient) key(bucket, relpath string) string { | 52 func (c testGSClient) put(path gs.Path, d []byte) { |
| 54 » return fmt.Sprintf("%s/%s", bucket, relpath) | 53 » c[path] = d |
| 55 } | 54 } |
| 56 | 55 |
| 57 func (c testGSClient) put(bucket, relpath string, d []byte) { | 56 func (c testGSClient) get(path gs.Path) []byte { |
| 58 » c[c.key(bucket, relpath)] = d | 57 » return c[path] |
| 59 } | |
| 60 | |
| 61 func (c testGSClient) get(bucket, relpath string) []byte { | |
| 62 » return c[c.key(bucket, relpath)] | |
| 63 } | 58 } |
| 64 | 59 |
| 65 func (c testGSClient) Close() error { return nil } | 60 func (c testGSClient) Close() error { return nil } |
| 66 func (c testGSClient) NewWriter(string, string) (gs.Writer, error) { | 61 func (c testGSClient) NewWriter(gs.Path) (gs.Writer, error) { |
| 67 return nil, errors.New("not implemented") | 62 return nil, errors.New("not implemented") |
| 68 } | 63 } |
| 69 func (c testGSClient) Delete(string, string) error { return errors.New("not impl
emented") } | 64 func (c testGSClient) Rename(gs.Path, gs.Path) error { return errors.New("not im
plemented") } |
| 65 func (c testGSClient) Delete(gs.Path) error { return errors.New("not im
plemented") } |
| 70 | 66 |
| 71 func (c testGSClient) NewReader(bucket, relpath string, o gs.Options) (io.ReadCl
oser, error) { | 67 func (c testGSClient) NewReader(path gs.Path, o gs.Options) (io.ReadCloser, erro
r) { |
| 72 if d, ok := c["error"]; ok { | 68 if d, ok := c["error"]; ok { |
| 73 return nil, errors.New(string(d)) | 69 return nil, errors.New(string(d)) |
| 74 } | 70 } |
| 75 | 71 |
| 76 » d, ok := c[c.key(bucket, relpath)] | 72 » d, ok := c[path] |
| 77 if !ok { | 73 if !ok { |
| 78 return nil, errors.New("does not exist") | 74 return nil, errors.New("does not exist") |
| 79 } | 75 } |
| 80 | 76 |
| 81 to := int(o.To) | 77 to := int(o.To) |
| 82 if to == 0 { | 78 if to == 0 { |
| 83 to = len(d) | 79 to = len(d) |
| 84 } | 80 } |
| 85 d = d[int(o.From):to] | 81 d = d[int(o.From):to] |
| 86 | 82 |
| (...skipping 61 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 148 func TestGet(t *testing.T) { | 144 func TestGet(t *testing.T) { |
| 149 t.Parallel() | 145 t.Parallel() |
| 150 | 146 |
| 151 Convey(`With a testing configuration, a Get request`, t, func() { | 147 Convey(`With a testing configuration, a Get request`, t, func() { |
| 152 c, tc := testclock.UseTime(context.Background(), testclock.TestT
imeLocal) | 148 c, tc := testclock.UseTime(context.Background(), testclock.TestT
imeLocal) |
| 153 c = memory.Use(c) | 149 c = memory.Use(c) |
| 154 | 150 |
| 155 fs := authtest.FakeState{} | 151 fs := authtest.FakeState{} |
| 156 c = auth.WithState(c, &fs) | 152 c = auth.WithState(c, &fs) |
| 157 | 153 |
| 158 » » c = ct.UseConfig(c, &svcconfig.Coordinator{ | 154 » » ms := memoryStorage.Storage{} |
| 159 » » » AdminAuthGroup: "test-administrators", | 155 » » gsc := testGSClient{} |
| 160 » » }) | 156 » » svcStub := ct.Services{ |
| 157 » » » IS: func() (storage.Storage, error) { |
| 158 » » » » return &ms, nil |
| 159 » » » }, |
| 160 » » » GS: func() (gs.Client, error) { |
| 161 » » » » return gsc, nil |
| 162 » » » }, |
| 163 » » } |
| 164 » » svcStub.InitConfig() |
| 165 » » svcStub.ServiceConfig.Coordinator.AdminAuthGroup = "test-adminis
trators" |
| 166 |
| 167 » » s := Server{ |
| 168 » » » ServiceBase: coordinator.ServiceBase{&svcStub}, |
| 169 » » } |
| 161 | 170 |
| 162 // Generate our test stream. | 171 // Generate our test stream. |
| 163 desc := ct.TestLogStreamDescriptor(c, "foo/bar") | 172 desc := ct.TestLogStreamDescriptor(c, "foo/bar") |
| 164 ls := ct.TestLogStream(c, desc) | 173 ls := ct.TestLogStream(c, desc) |
| 165 » » if err := ls.Put(ds.Get(c)); err != nil { | 174 » » if err := ds.Get(c).Put(ls); err != nil { |
| 166 panic(err) | 175 panic(err) |
| 167 } | 176 } |
| 168 | 177 |
| 169 tc.Add(time.Second) | 178 tc.Add(time.Second) |
| 170 var entries []*logpb.LogEntry | 179 var entries []*logpb.LogEntry |
| 171 protobufs := map[uint64][]byte{} | 180 protobufs := map[uint64][]byte{} |
| 172 for _, v := range []int{0, 1, 2, 4, 5, 7} { | 181 for _, v := range []int{0, 1, 2, 4, 5, 7} { |
| 173 le := ct.TestLogEntry(c, ls, v) | 182 le := ct.TestLogEntry(c, ls, v) |
| 174 le.GetText().Lines = append(le.GetText().Lines, &logpb.T
ext_Line{ | 183 le.GetText().Lines = append(le.GetText().Lines, &logpb.T
ext_Line{ |
| 175 Value: "another line of text", | 184 Value: "another line of text", |
| (...skipping 21 matching lines...) Expand all Loading... |
| 197 } | 206 } |
| 198 } | 207 } |
| 199 | 208 |
| 200 d, err := proto.Marshal(le) | 209 d, err := proto.Marshal(le) |
| 201 if err != nil { | 210 if err != nil { |
| 202 panic(err) | 211 panic(err) |
| 203 } | 212 } |
| 204 protobufs[uint64(v)] = d | 213 protobufs[uint64(v)] = d |
| 205 } | 214 } |
| 206 | 215 |
| 207 // Define and populate our Storage. | |
| 208 s := Server{} | |
| 209 | |
| 210 ms := memoryStorage.Storage{} | |
| 211 gsc := testGSClient{} | |
| 212 s.StorageFunc = func(context.Context) (storage.Storage, error) { | |
| 213 return &ms, nil | |
| 214 } | |
| 215 s.GSClientFunc = func(context.Context) (gs.Client, error) { | |
| 216 return gsc, nil | |
| 217 } | |
| 218 | |
| 219 Convey(`Testing Get requests (no logs)`, func() { | 216 Convey(`Testing Get requests (no logs)`, func() { |
| 220 req := logdog.GetRequest{ | 217 req := logdog.GetRequest{ |
| 221 Path: string(ls.Path()), | 218 Path: string(ls.Path()), |
| 222 } | 219 } |
| 223 | 220 |
| 224 Convey(`Will fail if the Path is not a stream path or a
hash.`, func() { | 221 Convey(`Will fail if the Path is not a stream path or a
hash.`, func() { |
| 225 req.Path = "not/a/full/stream/path" | 222 req.Path = "not/a/full/stream/path" |
| 226 _, err := s.Get(c, &req) | 223 _, err := s.Get(c, &req) |
| 227 So(err, ShouldErrLike, "invalid path value") | 224 So(err, ShouldErrLike, "invalid path value") |
| 228 }) | 225 }) |
| (...skipping 44 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 273 Desc: desc, | 270 Desc: desc, |
| 274 Source: &src, | 271 Source: &src, |
| 275 LogWriter: &lbuf, | 272 LogWriter: &lbuf, |
| 276 IndexWriter: &ibuf, | 273 IndexWriter: &ibuf, |
| 277 StreamIndexRange: 2, | 274 StreamIndexRange: 2, |
| 278 } | 275 } |
| 279 if err := archive.Archive(m); err != nil
{ | 276 if err := archive.Archive(m); err != nil
{ |
| 280 panic(err) | 277 panic(err) |
| 281 } | 278 } |
| 282 | 279 |
| 283 » » » » » gsc.put("testbucket", "stream", lbuf.Byt
es()) | 280 » » » » » now := tc.Now().UTC() |
| 284 » » » » » gsc.put("testbucket", "index", ibuf.Byte
s()) | 281 |
| 282 » » » » » gsc.put("gs://testbucket/stream", lbuf.B
ytes()) |
| 283 » » » » » gsc.put("gs://testbucket/index", ibuf.By
tes()) |
| 285 ls.State = coordinator.LSArchived | 284 ls.State = coordinator.LSArchived |
| 285 ls.TerminatedTime = now |
| 286 ls.ArchivedTime = now |
| 286 ls.ArchiveStreamURL = "gs://testbucket/s
tream" | 287 ls.ArchiveStreamURL = "gs://testbucket/s
tream" |
| 287 ls.ArchiveIndexURL = "gs://testbucket/in
dex" | 288 ls.ArchiveIndexURL = "gs://testbucket/in
dex" |
| 288 } | 289 } |
| 289 » » » » if err := ls.Put(ds.Get(c)); err != nil { | 290 » » » » if err := ds.Get(c).Put(ls); err != nil { |
| 290 panic(err) | 291 panic(err) |
| 291 } | 292 } |
| 292 | 293 |
| 293 Convey(`Testing Get requests`, func() { | 294 Convey(`Testing Get requests`, func() { |
| 294 req := logdog.GetRequest{ | 295 req := logdog.GetRequest{ |
| 295 Path: string(ls.Path()), | 296 Path: string(ls.Path()), |
| 296 } | 297 } |
| 297 | 298 |
| 298 Convey(`When the log stream is purged`,
func() { | 299 Convey(`When the log stream is purged`,
func() { |
| 299 ls.Purged = true | 300 ls.Purged = true |
| 300 » » » » » » if err := ls.Put(ds.Get(c)); err
!= nil { | 301 » » » » » » if err := ds.Get(c).Put(ls); err
!= nil { |
| 301 panic(err) | 302 panic(err) |
| 302 } | 303 } |
| 303 | 304 |
| 304 Convey(`Will return NotFound if
the user is not an administrator.`, func() { | 305 Convey(`Will return NotFound if
the user is not an administrator.`, func() { |
| 305 _, err := s.Get(c, &req) | 306 _, err := s.Get(c, &req) |
| 306 So(err, ShouldBeRPCNotFo
und) | 307 So(err, ShouldBeRPCNotFo
und) |
| 307 }) | 308 }) |
| 308 | 309 |
| 309 Convey(`Will process the request
if the user is an administrator.`, func() { | 310 Convey(`Will process the request
if the user is an administrator.`, func() { |
| 310 fs.IdentityGroups = []st
ring{"test-administrators"} | 311 fs.IdentityGroups = []st
ring{"test-administrators"} |
| (...skipping 86 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 397 | 398 |
| 398 Convey(`With a byte limit of sizeof({0,
1, 2})+1, will return log entries {0, 1, 2}.`, func() { | 399 Convey(`With a byte limit of sizeof({0,
1, 2})+1, will return log entries {0, 1, 2}.`, func() { |
| 399 req.ByteCount = int32(len(protob
ufs[0]) + len(protobufs[1]) + len(protobufs[2]) + 1) | 400 req.ByteCount = int32(len(protob
ufs[0]) + len(protobufs[1]) + len(protobufs[2]) + 1) |
| 400 | 401 |
| 401 resp, err := s.Get(c, &req) | 402 resp, err := s.Get(c, &req) |
| 402 So(err, ShouldBeRPCOK) | 403 So(err, ShouldBeRPCOK) |
| 403 So(resp, shouldHaveLogs, 0, 1, 2
) | 404 So(resp, shouldHaveLogs, 0, 1, 2
) |
| 404 }) | 405 }) |
| 405 | 406 |
| 406 Convey(`Will successfully retrieve a str
eam path hash.`, func() { | 407 Convey(`Will successfully retrieve a str
eam path hash.`, func() { |
| 407 » » » » » » req.Path = ls.HashID() | 408 » » » » » » req.Path = ls.HashID |
| 408 resp, err := s.Get(c, &req) | 409 resp, err := s.Get(c, &req) |
| 409 So(err, ShouldBeRPCOK) | 410 So(err, ShouldBeRPCOK) |
| 410 So(resp, shouldHaveLogs, 0, 1, 2
) | 411 So(resp, shouldHaveLogs, 0, 1, 2
) |
| 411 }) | 412 }) |
| 412 | 413 |
| 413 Convey(`When requesting state`, func() { | 414 Convey(`When requesting state`, func() { |
| 414 req.State = true | 415 req.State = true |
| 415 req.LogCount = -1 | 416 req.LogCount = -1 |
| 416 | 417 |
| 417 Convey(`Will successfully retrie
ve stream state.`, func() { | 418 Convey(`Will successfully retrie
ve stream state.`, func() { |
| 418 resp, err := s.Get(c, &r
eq) | 419 resp, err := s.Get(c, &r
eq) |
| 419 So(err, ShouldBeRPCOK) | 420 So(err, ShouldBeRPCOK) |
| 420 So(resp.State, ShouldRes
emble, loadLogStreamState(ls)) | 421 So(resp.State, ShouldRes
emble, loadLogStreamState(ls)) |
| 421 So(len(resp.Logs), Shoul
dEqual, 0) | 422 So(len(resp.Logs), Shoul
dEqual, 0) |
| 422 }) | 423 }) |
| 423 | 424 |
| 424 Convey(`Will return Internal if
the protobuf descriptor data is corrupt.`, func() { | 425 Convey(`Will return Internal if
the protobuf descriptor data is corrupt.`, func() { |
| 425 » » » » » » » // We can't use "ls.Put"
here because it validates the protobuf! | 426 » » » » » » » ls.SetDSValidate(false) |
| 426 ls.Descriptor = []byte{0
x00} // Invalid protobuf, zero tag. | 427 ls.Descriptor = []byte{0
x00} // Invalid protobuf, zero tag. |
| 427 if err := ds.Get(c).Put(
ls); err != nil { | 428 if err := ds.Get(c).Put(
ls); err != nil { |
| 428 panic(err) | 429 panic(err) |
| 429 } | 430 } |
| 430 | 431 |
| 431 _, err := s.Get(c, &req) | 432 _, err := s.Get(c, &req) |
| 432 So(err, ShouldBeRPCInter
nal) | 433 So(err, ShouldBeRPCInter
nal) |
| 433 }) | 434 }) |
| 434 }) | 435 }) |
| 435 | 436 |
| 436 Convey(`Will return Internal if the prot
obuf log entry data is corrupt.`, func() { | 437 Convey(`Will return Internal if the prot
obuf log entry data is corrupt.`, func() { |
| 437 if v { | 438 if v { |
| 438 // Corrupt the archive d
atastream. | 439 // Corrupt the archive d
atastream. |
| 439 » » » » » » » stream := gsc.get("testb
ucket", "stream") | 440 » » » » » » » stream := gsc.get("gs://
testbucket/stream") |
| 440 zeroRecords(stream) | 441 zeroRecords(stream) |
| 441 } else { | 442 } else { |
| 442 // Add corrupted entry t
o Storage. Create a new entry here, since | 443 // Add corrupted entry t
o Storage. Create a new entry here, since |
| 443 // the storage will reje
ct a duplicate/overwrite. | 444 // the storage will reje
ct a duplicate/overwrite. |
| 444 err := ms.Put(storage.Pu
tRequest{ | 445 err := ms.Put(storage.Pu
tRequest{ |
| 445 Path: types.St
reamPath(req.Path), | 446 Path: types.St
reamPath(req.Path), |
| 446 Index: 666, | 447 Index: 666, |
| 447 Values: [][]byte
{{0x00}}, // Invalid protobuf, zero tag. | 448 Values: [][]byte
{{0x00}}, // Invalid protobuf, zero tag. |
| 448 }) | 449 }) |
| 449 if err != nil { | 450 if err != nil { |
| (...skipping 106 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 556 Path: string(ls.Path()), | 557 Path: string(ls.Path()), |
| 557 } | 558 } |
| 558 | 559 |
| 559 Convey(`Will successfully retrieve a str
eam path.`, func() { | 560 Convey(`Will successfully retrieve a str
eam path.`, func() { |
| 560 resp, err := s.Tail(c, &req) | 561 resp, err := s.Tail(c, &req) |
| 561 So(err, ShouldBeRPCOK) | 562 So(err, ShouldBeRPCOK) |
| 562 So(resp, shouldHaveLogs, 7) | 563 So(resp, shouldHaveLogs, 7) |
| 563 }) | 564 }) |
| 564 | 565 |
| 565 Convey(`Will successfully retrieve a str
eam path hash and state.`, func() { | 566 Convey(`Will successfully retrieve a str
eam path hash and state.`, func() { |
| 566 » » » » » » req.Path = ls.HashID() | 567 » » » » » » req.Path = ls.HashID |
| 567 req.State = true | 568 req.State = true |
| 568 | 569 |
| 569 resp, err := s.Tail(c, &req) | 570 resp, err := s.Tail(c, &req) |
| 570 So(err, ShouldBeRPCOK) | 571 So(err, ShouldBeRPCOK) |
| 571 So(resp, shouldHaveLogs, 7) | 572 So(resp, shouldHaveLogs, 7) |
| 572 So(resp.State, ShouldResemble, l
oadLogStreamState(ls)) | 573 So(resp.State, ShouldResemble, l
oadLogStreamState(ls)) |
| 573 }) | 574 }) |
| 574 }) | 575 }) |
| 575 }) | 576 }) |
| 576 } | 577 } |
| 577 }) | 578 }) |
| 578 } | 579 } |
| OLD | NEW |