Chromium Code Reviews| Index: logdog/client/coordinator/stream_test.go |
| diff --git a/logdog/client/coordinator/stream_test.go b/logdog/client/coordinator/stream_test.go |
| index 54ff31e2f7050b723a6bdf496fd5ec0f3bb0fc4e..8794b060cf7e98edea08947f64102e2c4c430f4f 100644 |
| --- a/logdog/client/coordinator/stream_test.go |
| +++ b/logdog/client/coordinator/stream_test.go |
| @@ -33,6 +33,35 @@ func genLog(idx int64, id string) *logpb.LogEntry { |
| } |
| } |
| +func genDG(idx int64, content ...string) []*logpb.LogEntry { |
| + var contentSize uint64 |
| + if len(content) > 1 { |
| + for _, c := range content { |
| + contentSize += uint64(len(c)) |
| + } |
| + } |
| + |
| + logs := make([]*logpb.LogEntry, len(content)) |
| + for i, c := range content { |
| + dg := logpb.Datagram{ |
| + Data: []byte(c), |
| + } |
| + if len(content) > 1 { |
| + dg.Partial = &logpb.Datagram_Partial{ |
| + Index: uint32(i), |
| + Size: contentSize, |
| + Last: (i == len(content)-1), |
| + } |
| + } |
| + |
| + logs[i] = &logpb.LogEntry{ |
| + StreamIndex: uint64(idx + int64(i)), |
| + Content: &logpb.LogEntry_Datagram{&dg}, |
| + } |
| + } |
| + return logs |
| +} |
| + |
| // testStreamLogsService implements just the Get and Tail endpoints, |
| // instrumented for testing. |
| type testStreamLogsService struct { |
| @@ -90,8 +119,6 @@ func TestStreamGet(t *testing.T) { |
| s := client.Stream("myproj", "test/+/a") |
| Convey(`Test Get`, func() { |
| - p := NewGetParams() |
| - |
| Convey(`A default Get query will return logs and no state.`, func() { |
| svc.GH = func(*logdog.GetRequest) (*logdog.GetResponse, error) { |
| return &logdog.GetResponse{ |
| @@ -102,7 +129,7 @@ func TestStreamGet(t *testing.T) { |
| }, nil |
| } |
| - l, err := s.Get(c, nil) |
| + l, err := s.Get(c) |
| So(err, ShouldBeNil) |
| So(l, ShouldResemble, []*logpb.LogEntry{genLog(1337, "ohai"), genLog(1338, "kthxbye")}) |
| @@ -114,13 +141,11 @@ func TestStreamGet(t *testing.T) { |
| }) |
| Convey(`Will form a proper Get logs query.`, func() { |
| - p = p.NonContiguous().Index(1) |
| - |
| svc.GH = func(*logdog.GetRequest) (*logdog.GetResponse, error) { |
| return &logdog.GetResponse{}, nil |
| } |
| - l, err := s.Get(c, p) |
| + l, err := s.Get(c, NonContiguous(), Index(1)) |
| So(err, ShouldBeNil) |
| So(l, ShouldBeNil) |
| @@ -134,8 +159,6 @@ func TestStreamGet(t *testing.T) { |
| }) |
| Convey(`Will request a specific number of logs if a constraint is supplied.`, func() { |
| - p = p.Limit(32, 64) |
| - |
| svc.GH = func(*logdog.GetRequest) (*logdog.GetResponse, error) { |
| return &logdog.GetResponse{ |
| Logs: []*logpb.LogEntry{ |
| @@ -144,7 +167,7 @@ func TestStreamGet(t *testing.T) { |
| }, nil |
| } |
| - l, err := s.Get(c, p) |
| + l, err := s.Get(c, LimitCount(64), LimitBytes(32)) |
| So(err, ShouldBeNil) |
| So(l, ShouldResemble, []*logpb.LogEntry{genLog(1337, "ohai")}) |
| @@ -158,9 +181,6 @@ func TestStreamGet(t *testing.T) { |
| }) |
| Convey(`Can decode a full protobuf and state.`, func() { |
| - var ls LogStream |
| - p = p.State(&ls) |
| - |
| svc.GH = func(*logdog.GetRequest) (*logdog.GetResponse, error) { |
| return &logdog.GetResponse{ |
| Logs: []*logpb.LogEntry{ |
| @@ -182,17 +202,18 @@ func TestStreamGet(t *testing.T) { |
| }, nil |
| } |
| - l, err := s.Get(c, p) |
| + var ls LogStream |
| + l, err := s.Get(c, WithState(&ls)) |
| So(err, ShouldBeNil) |
| So(l, ShouldResemble, []*logpb.LogEntry{genLog(1337, "kthxbye")}) |
| So(ls, ShouldResemble, LogStream{ |
| Path: "test/+/a", |
| - Desc: &logpb.LogStreamDescriptor{ |
| + Desc: logpb.LogStreamDescriptor{ |
| Prefix: "test", |
| Name: "a", |
| StreamType: logpb.StreamType_TEXT, |
| }, |
| - State: &StreamState{ |
| + State: StreamState{ |
| Created: now, |
| Archived: true, |
| ArchiveIndexURL: "index", |
| @@ -207,7 +228,7 @@ func TestStreamGet(t *testing.T) { |
| return nil, grpcutil.NotFound |
| } |
| - _, err := s.Get(c, p) |
| + _, err := s.Get(c) |
| So(err, ShouldEqual, ErrNoSuchStream) |
| }) |
| @@ -216,7 +237,7 @@ func TestStreamGet(t *testing.T) { |
| return nil, grpcutil.Unauthenticated |
| } |
| - _, err := s.Get(c, p) |
| + _, err := s.Get(c) |
| So(err, ShouldEqual, ErrNoAccess) |
| }) |
| @@ -225,7 +246,7 @@ func TestStreamGet(t *testing.T) { |
| return nil, grpcutil.PermissionDenied |
| } |
| - _, err := s.Get(c, p) |
| + _, err := s.Get(c) |
| So(err, ShouldEqual, ErrNoAccess) |
| }) |
| }) |
| @@ -235,6 +256,11 @@ func TestStreamGet(t *testing.T) { |
| svc.GH = func(*logdog.GetRequest) (*logdog.GetResponse, error) { |
| return &logdog.GetResponse{ |
| Project: "myproj", |
| + Desc: &logpb.LogStreamDescriptor{ |
| + Prefix: "test", |
| + Name: "a", |
| + StreamType: logpb.StreamType_TEXT, |
| + }, |
| State: &logdog.LogStreamState{ |
| Created: google.NewTimestamp(now), |
| }, |
| @@ -246,7 +272,12 @@ func TestStreamGet(t *testing.T) { |
| So(l, ShouldResemble, &LogStream{ |
| Project: "myproj", |
| Path: "test/+/a", |
| - State: &StreamState{ |
| + Desc: logpb.LogStreamDescriptor{ |
| + Prefix: "test", |
| + Name: "a", |
| + StreamType: logpb.StreamType_TEXT, |
| + }, |
| + State: StreamState{ |
| Created: now.UTC(), |
| }, |
| }) |
| @@ -308,7 +339,7 @@ func TestStreamGet(t *testing.T) { |
| } |
| var ls LogStream |
| - l, err := s.Tail(c, &ls) |
| + l, err := s.Tail(c, WithState(&ls)) |
| So(err, ShouldBeNil) |
| // Validate the HTTP request that we made. |
| @@ -323,12 +354,12 @@ func TestStreamGet(t *testing.T) { |
| So(ls, ShouldResemble, LogStream{ |
| Project: "myproj", |
| Path: "test/+/a", |
| - Desc: &logpb.LogStreamDescriptor{ |
| + Desc: logpb.LogStreamDescriptor{ |
| Prefix: "test", |
| Name: "a", |
| StreamType: logpb.StreamType_TEXT, |
| }, |
| - State: &StreamState{ |
| + State: StreamState{ |
| Created: now, |
| }, |
| }) |
| @@ -350,18 +381,18 @@ func TestStreamGet(t *testing.T) { |
| } |
| var ls LogStream |
| - l, err := s.Tail(c, &ls) |
| + l, err := s.Tail(c, WithState(&ls)) |
| So(err, ShouldBeNil) |
| So(l, ShouldBeNil) |
| So(ls, ShouldResemble, LogStream{ |
| Project: "myproj", |
| Path: "test/+/a", |
| - Desc: &logpb.LogStreamDescriptor{ |
| + Desc: logpb.LogStreamDescriptor{ |
| Prefix: "test", |
| Name: "a", |
| StreamType: logpb.StreamType_TEXT, |
| }, |
| - State: &StreamState{ |
| + State: StreamState{ |
| Created: now, |
| }, |
| }) |
| @@ -380,7 +411,7 @@ func TestStreamGet(t *testing.T) { |
| }, nil |
| } |
| - _, err := s.Tail(c, nil) |
| + _, err := s.Tail(c) |
| So(err, ShouldErrLike, "tail call returned 2 logs") |
| }) |
| @@ -389,7 +420,7 @@ func TestStreamGet(t *testing.T) { |
| return nil, grpcutil.NotFound |
| } |
| - _, err := s.Tail(c, nil) |
| + _, err := s.Tail(c) |
| So(err, ShouldEqual, ErrNoSuchStream) |
| }) |
| @@ -398,7 +429,7 @@ func TestStreamGet(t *testing.T) { |
| return nil, grpcutil.Unauthenticated |
| } |
| - _, err := s.Tail(c, nil) |
| + _, err := s.Tail(c) |
| So(err, ShouldEqual, ErrNoAccess) |
| }) |
| @@ -407,9 +438,204 @@ func TestStreamGet(t *testing.T) { |
| return nil, grpcutil.PermissionDenied |
| } |
| - _, err := s.Tail(c, nil) |
| + _, err := s.Tail(c) |
| So(err, ShouldEqual, ErrNoAccess) |
| }) |
| + |
| + Convey(`When requesting complete streams`, func() { |
| + var allLogs []*logpb.LogEntry |
| + allLogs = append(allLogs, genDG(1337, "foo", "bar", "baz", "kthxbye")...) |
| + allLogs = append(allLogs, genDG(1341, "qux", "ohai")...) |
| + allLogs = append(allLogs, genDG(1343, "complete")...) |
| + tailLog := allLogs[len(allLogs)-1] |
| + |
| + svc.TH = func(req *logdog.TailRequest) (*logdog.GetResponse, error) { |
| + return &logdog.GetResponse{ |
| + Logs: []*logpb.LogEntry{tailLog}, |
| + State: &logdog.LogStreamState{ |
| + Created: google.NewTimestamp(now), |
| + }, |
| + Desc: &logpb.LogStreamDescriptor{ |
| + Prefix: "test", |
| + Name: "a", |
| + StreamType: logpb.StreamType_DATAGRAM, |
| + }, |
| + }, nil |
| + } |
| + |
| + svc.GH = func(req *logdog.GetRequest) (*logdog.GetResponse, error) { |
| + if req.State || req.NonContiguous || req.ByteCount != 0 { |
| + return nil, errors.New("not implemented in test") |
| + } |
| + if len(allLogs) == 0 { |
| + return &logdog.GetResponse{}, nil |
| + } |
| + |
| + // Identify the requested index. |
| + var ret []*logpb.LogEntry |
| + for i, le := range allLogs { |
| + if le.StreamIndex == uint64(req.Index) { |
| + ret = allLogs[i:] |
| + break |
| + } |
| + } |
| + count := int(req.LogCount) |
| + if count > len(ret) { |
| + count = len(ret) |
| + } |
| + return &logdog.GetResponse{ |
| + Logs: ret[:count], |
| + }, nil |
| + } |
| + |
| + Convey(`With a non-partial datagram, returns that datagram.`, func() { |
| + le, err := s.Tail(c, Complete()) |
| + So(err, ShouldBeNil) |
| + So(le.StreamIndex, ShouldEqual, 1343) |
| + So(le.GetDatagram().Partial, ShouldBeNil) |
| + So(le.GetDatagram().Data, ShouldResemble, []byte("complete")) |
| + }) |
| + |
| + Convey(`Can assemble a set of one partial datagram.`, func() { |
| + // This is weird, since this doesn't need to be partial at all, but |
| + // we should handle it gracefully. |
| + dg := tailLog.GetDatagram() |
| + dg.Partial = &logpb.Datagram_Partial{ |
| + Index: 0, |
| + Size: uint64(len(dg.Data)), |
| + Last: true, |
| + } |
| + |
| + le, err := s.Tail(c, Complete()) |
| + So(err, ShouldBeNil) |
| + So(le.StreamIndex, ShouldEqual, 1343) |
| + So(le.GetDatagram().Partial, ShouldBeNil) |
| + So(le.GetDatagram().Data, ShouldResemble, []byte("complete")) |
| + }) |
| + |
| + Convey(`Can assemble a set of two partial datagrams.`, func() { |
| + tailLog = allLogs[5] |
| + |
| + le, err := s.Tail(c, Complete()) |
| + So(err, ShouldBeNil) |
| + So(le.StreamIndex, ShouldEqual, 1341) |
| + So(le.GetDatagram().Partial, ShouldBeNil) |
| + So(le.GetDatagram().Data, ShouldResemble, []byte("quxohai")) |
| + }) |
| + |
| + Convey(`With a set of three partial datagrams.`, func() { |
| + tailLog = allLogs[3] |
| + |
| + Convey(`Will return a fully reassembled datagram.`, func() { |
| + var ls LogStream |
| + le, err := s.Tail(c, WithState(&ls), Complete()) |
| + So(err, ShouldBeNil) |
| + So(le.StreamIndex, ShouldEqual, 1337) |
| + So(le.GetDatagram().Partial, ShouldBeNil) |
| + So(le.GetDatagram().Data, ShouldResemble, []byte("foobarbazkthxbye")) |
| + |
| + So(ls, ShouldResemble, LogStream{ |
| + Path: "test/+/a", |
| + Desc: logpb.LogStreamDescriptor{ |
| + Prefix: "test", |
| + Name: "a", |
| + StreamType: logpb.StreamType_DATAGRAM, |
| + }, |
| + State: StreamState{ |
| + Created: now, |
| + }, |
| + }) |
| + }) |
| + |
| + Convey(`Will return an error if the Get fails.`, func() { |
| + svc.GH = func(req *logdog.GetRequest) (*logdog.GetResponse, error) { |
| + return nil, errors.New("test error") |
| + } |
| + |
| + _, err := s.Tail(c, Complete()) |
| + So(err, ShouldErrLike, "failed to get intermediate logs") |
|
martiniss
2016/09/22 03:40:39
test for test error in the err?
dnj
2016/09/22 16:00:05
Done. I'll have to make this function return a gRP
|
| + }) |
| + |
| + Convey(`Will return an error if the Get returns fewer logs than requested.`, func() { |
| + allLogs = allLogs[0:1] |
| + |
| + _, err := s.Tail(c, Complete()) |
| + So(err, ShouldErrLike, "incomplete intermediate logs results") |
| + }) |
| + |
| + Convey(`Will return an error if Get returns non-datagram logs.`, func() { |
| + allLogs[1].Content = nil |
| + |
| + _, err := s.Tail(c, Complete()) |
| + So(err, ShouldErrLike, "is not a datagram") |
| + }) |
| + |
| + Convey(`Will return an error if Get returns non-partial datagram logs.`, func() { |
| + allLogs[1].GetDatagram().Partial = nil |
| + |
| + _, err := s.Tail(c, Complete()) |
| + So(err, ShouldErrLike, "is not partial") |
| + }) |
| + |
| + Convey(`Will return an error if Get returns non-contiguous partial datagrams.`, func() { |
| + allLogs[1].GetDatagram().Partial.Index = 2 |
| + |
| + _, err := s.Tail(c, Complete()) |
| + So(err, ShouldErrLike, "does not have a contiguous index") |
| + }) |
| + |
| + Convey(`Will return an error if the chunks declare different sizes.`, func() { |
| + allLogs[1].GetDatagram().Partial.Size = 0 |
| + |
| + _, err := s.Tail(c, Complete()) |
| + So(err, ShouldErrLike, "inconsistent datagram size") |
| + }) |
| + |
| + Convey(`Will return an error if the reassembled length exceeds the declared size.`, func() { |
| + for _, le := range allLogs { |
| + if p := le.GetDatagram().Partial; p != nil { |
| + p.Size = 0 |
| + } |
| + } |
| + |
| + _, err := s.Tail(c, Complete()) |
| + So(err, ShouldErrLike, "appending chunk data would exceed the declared size") |
| + }) |
| + |
| + Convey(`Will return an error if the reassembled length doesn't match the declared size.`, func() { |
| + for _, le := range allLogs { |
| + if p := le.GetDatagram().Partial; p != nil { |
| + p.Size = 1024 * 1024 |
| + } |
| + } |
| + |
| + _, err := s.Tail(c, Complete()) |
| + So(err, ShouldErrLike, "differs from declared length") |
| + }) |
| + }) |
| + |
| + Convey(`When Tail returns a mid-partial datagram.`, func() { |
| + tailLog = allLogs[4] |
| + |
| + Convey(`If the previous datagram is not partial, will return it.`, func() { |
| + allLogs[3].GetDatagram().Partial = nil |
| + |
| + le, err := s.Tail(c, Complete()) |
| + So(err, ShouldBeNil) |
| + So(le.StreamIndex, ShouldEqual, 1340) |
| + So(le.GetDatagram().Partial, ShouldBeNil) |
| + So(le.GetDatagram().Data, ShouldResemble, []byte("kthxbye")) |
| + }) |
| + |
| + Convey(`If the previous datagram is partial, will return it reassembled.`, func() { |
|
martiniss
2016/09/22 03:40:39
nit: move this above the other test?
dnj
2016/09/22 16:00:05
Done.
|
| + le, err := s.Tail(c, Complete()) |
| + So(err, ShouldBeNil) |
| + So(le.StreamIndex, ShouldEqual, 1337) |
| + So(le.GetDatagram().Partial, ShouldBeNil) |
| + So(le.GetDatagram().Data, ShouldResemble, []byte("foobarbazkthxbye")) |
| + }) |
| + }) |
| + }) |
| }) |
| }) |
| }) |