| Index: logdog/client/coordinator/stream_test.go
|
| diff --git a/logdog/client/coordinator/stream_test.go b/logdog/client/coordinator/stream_test.go
|
| index 54ff31e2f7050b723a6bdf496fd5ec0f3bb0fc4e..3fe8792df23cf759d221facee252d7154ccd1671 100644
|
| --- a/logdog/client/coordinator/stream_test.go
|
| +++ b/logdog/client/coordinator/stream_test.go
|
| @@ -14,7 +14,9 @@ import (
|
| "github.com/luci/luci-go/grpc/grpcutil"
|
| "github.com/luci/luci-go/logdog/api/endpoints/coordinator/logs/v1"
|
| "github.com/luci/luci-go/logdog/api/logpb"
|
| +
|
| "golang.org/x/net/context"
|
| + "google.golang.org/grpc/codes"
|
|
|
| . "github.com/luci/luci-go/common/testing/assertions"
|
| . "github.com/smartystreets/goconvey/convey"
|
| @@ -33,6 +35,35 @@ func genLog(idx int64, id string) *logpb.LogEntry {
|
| }
|
| }
|
|
|
| +func genDG(idx int64, content ...string) []*logpb.LogEntry {
|
| + var contentSize uint64
|
| + if len(content) > 1 {
|
| + for _, c := range content {
|
| + contentSize += uint64(len(c))
|
| + }
|
| + }
|
| +
|
| + logs := make([]*logpb.LogEntry, len(content))
|
| + for i, c := range content {
|
| + dg := logpb.Datagram{
|
| + Data: []byte(c),
|
| + }
|
| + if len(content) > 1 {
|
| + dg.Partial = &logpb.Datagram_Partial{
|
| + Index: uint32(i),
|
| + Size: contentSize,
|
| + Last: (i == len(content)-1),
|
| + }
|
| + }
|
| +
|
| + logs[i] = &logpb.LogEntry{
|
| + StreamIndex: uint64(idx + int64(i)),
|
| + Content: &logpb.LogEntry_Datagram{&dg},
|
| + }
|
| + }
|
| + return logs
|
| +}
|
| +
|
| // testStreamLogsService implements just the Get and Tail endpoints,
|
| // instrumented for testing.
|
| type testStreamLogsService struct {
|
| @@ -90,8 +121,6 @@ func TestStreamGet(t *testing.T) {
|
| s := client.Stream("myproj", "test/+/a")
|
|
|
| Convey(`Test Get`, func() {
|
| - p := NewGetParams()
|
| -
|
| Convey(`A default Get query will return logs and no state.`, func() {
|
| svc.GH = func(*logdog.GetRequest) (*logdog.GetResponse, error) {
|
| return &logdog.GetResponse{
|
| @@ -102,7 +131,7 @@ func TestStreamGet(t *testing.T) {
|
| }, nil
|
| }
|
|
|
| - l, err := s.Get(c, nil)
|
| + l, err := s.Get(c)
|
| So(err, ShouldBeNil)
|
| So(l, ShouldResemble, []*logpb.LogEntry{genLog(1337, "ohai"), genLog(1338, "kthxbye")})
|
|
|
| @@ -114,13 +143,11 @@ func TestStreamGet(t *testing.T) {
|
| })
|
|
|
| Convey(`Will form a proper Get logs query.`, func() {
|
| - p = p.NonContiguous().Index(1)
|
| -
|
| svc.GH = func(*logdog.GetRequest) (*logdog.GetResponse, error) {
|
| return &logdog.GetResponse{}, nil
|
| }
|
|
|
| - l, err := s.Get(c, p)
|
| + l, err := s.Get(c, NonContiguous(), Index(1))
|
| So(err, ShouldBeNil)
|
| So(l, ShouldBeNil)
|
|
|
| @@ -134,8 +161,6 @@ func TestStreamGet(t *testing.T) {
|
| })
|
|
|
| Convey(`Will request a specific number of logs if a constraint is supplied.`, func() {
|
| - p = p.Limit(32, 64)
|
| -
|
| svc.GH = func(*logdog.GetRequest) (*logdog.GetResponse, error) {
|
| return &logdog.GetResponse{
|
| Logs: []*logpb.LogEntry{
|
| @@ -144,7 +169,7 @@ func TestStreamGet(t *testing.T) {
|
| }, nil
|
| }
|
|
|
| - l, err := s.Get(c, p)
|
| + l, err := s.Get(c, LimitCount(64), LimitBytes(32))
|
| So(err, ShouldBeNil)
|
| So(l, ShouldResemble, []*logpb.LogEntry{genLog(1337, "ohai")})
|
|
|
| @@ -158,9 +183,6 @@ func TestStreamGet(t *testing.T) {
|
| })
|
|
|
| Convey(`Can decode a full protobuf and state.`, func() {
|
| - var ls LogStream
|
| - p = p.State(&ls)
|
| -
|
| svc.GH = func(*logdog.GetRequest) (*logdog.GetResponse, error) {
|
| return &logdog.GetResponse{
|
| Logs: []*logpb.LogEntry{
|
| @@ -182,17 +204,18 @@ func TestStreamGet(t *testing.T) {
|
| }, nil
|
| }
|
|
|
| - l, err := s.Get(c, p)
|
| + var ls LogStream
|
| + l, err := s.Get(c, WithState(&ls))
|
| So(err, ShouldBeNil)
|
| So(l, ShouldResemble, []*logpb.LogEntry{genLog(1337, "kthxbye")})
|
| So(ls, ShouldResemble, LogStream{
|
| Path: "test/+/a",
|
| - Desc: &logpb.LogStreamDescriptor{
|
| + Desc: logpb.LogStreamDescriptor{
|
| Prefix: "test",
|
| Name: "a",
|
| StreamType: logpb.StreamType_TEXT,
|
| },
|
| - State: &StreamState{
|
| + State: StreamState{
|
| Created: now,
|
| Archived: true,
|
| ArchiveIndexURL: "index",
|
| @@ -207,7 +230,7 @@ func TestStreamGet(t *testing.T) {
|
| return nil, grpcutil.NotFound
|
| }
|
|
|
| - _, err := s.Get(c, p)
|
| + _, err := s.Get(c)
|
| So(err, ShouldEqual, ErrNoSuchStream)
|
| })
|
|
|
| @@ -216,7 +239,7 @@ func TestStreamGet(t *testing.T) {
|
| return nil, grpcutil.Unauthenticated
|
| }
|
|
|
| - _, err := s.Get(c, p)
|
| + _, err := s.Get(c)
|
| So(err, ShouldEqual, ErrNoAccess)
|
| })
|
|
|
| @@ -225,7 +248,7 @@ func TestStreamGet(t *testing.T) {
|
| return nil, grpcutil.PermissionDenied
|
| }
|
|
|
| - _, err := s.Get(c, p)
|
| + _, err := s.Get(c)
|
| So(err, ShouldEqual, ErrNoAccess)
|
| })
|
| })
|
| @@ -235,6 +258,11 @@ func TestStreamGet(t *testing.T) {
|
| svc.GH = func(*logdog.GetRequest) (*logdog.GetResponse, error) {
|
| return &logdog.GetResponse{
|
| Project: "myproj",
|
| + Desc: &logpb.LogStreamDescriptor{
|
| + Prefix: "test",
|
| + Name: "a",
|
| + StreamType: logpb.StreamType_TEXT,
|
| + },
|
| State: &logdog.LogStreamState{
|
| Created: google.NewTimestamp(now),
|
| },
|
| @@ -246,7 +274,12 @@ func TestStreamGet(t *testing.T) {
|
| So(l, ShouldResemble, &LogStream{
|
| Project: "myproj",
|
| Path: "test/+/a",
|
| - State: &StreamState{
|
| + Desc: logpb.LogStreamDescriptor{
|
| + Prefix: "test",
|
| + Name: "a",
|
| + StreamType: logpb.StreamType_TEXT,
|
| + },
|
| + State: StreamState{
|
| Created: now.UTC(),
|
| },
|
| })
|
| @@ -308,7 +341,7 @@ func TestStreamGet(t *testing.T) {
|
| }
|
|
|
| var ls LogStream
|
| - l, err := s.Tail(c, &ls)
|
| + l, err := s.Tail(c, WithState(&ls))
|
| So(err, ShouldBeNil)
|
|
|
| // Validate the HTTP request that we made.
|
| @@ -323,12 +356,12 @@ func TestStreamGet(t *testing.T) {
|
| So(ls, ShouldResemble, LogStream{
|
| Project: "myproj",
|
| Path: "test/+/a",
|
| - Desc: &logpb.LogStreamDescriptor{
|
| + Desc: logpb.LogStreamDescriptor{
|
| Prefix: "test",
|
| Name: "a",
|
| StreamType: logpb.StreamType_TEXT,
|
| },
|
| - State: &StreamState{
|
| + State: StreamState{
|
| Created: now,
|
| },
|
| })
|
| @@ -350,18 +383,18 @@ func TestStreamGet(t *testing.T) {
|
| }
|
|
|
| var ls LogStream
|
| - l, err := s.Tail(c, &ls)
|
| + l, err := s.Tail(c, WithState(&ls))
|
| So(err, ShouldBeNil)
|
| So(l, ShouldBeNil)
|
| So(ls, ShouldResemble, LogStream{
|
| Project: "myproj",
|
| Path: "test/+/a",
|
| - Desc: &logpb.LogStreamDescriptor{
|
| + Desc: logpb.LogStreamDescriptor{
|
| Prefix: "test",
|
| Name: "a",
|
| StreamType: logpb.StreamType_TEXT,
|
| },
|
| - State: &StreamState{
|
| + State: StreamState{
|
| Created: now,
|
| },
|
| })
|
| @@ -380,7 +413,7 @@ func TestStreamGet(t *testing.T) {
|
| }, nil
|
| }
|
|
|
| - _, err := s.Tail(c, nil)
|
| + _, err := s.Tail(c)
|
| So(err, ShouldErrLike, "tail call returned 2 logs")
|
| })
|
|
|
| @@ -389,7 +422,7 @@ func TestStreamGet(t *testing.T) {
|
| return nil, grpcutil.NotFound
|
| }
|
|
|
| - _, err := s.Tail(c, nil)
|
| + _, err := s.Tail(c)
|
| So(err, ShouldEqual, ErrNoSuchStream)
|
| })
|
|
|
| @@ -398,7 +431,7 @@ func TestStreamGet(t *testing.T) {
|
| return nil, grpcutil.Unauthenticated
|
| }
|
|
|
| - _, err := s.Tail(c, nil)
|
| + _, err := s.Tail(c)
|
| So(err, ShouldEqual, ErrNoAccess)
|
| })
|
|
|
| @@ -407,9 +440,205 @@ func TestStreamGet(t *testing.T) {
|
| return nil, grpcutil.PermissionDenied
|
| }
|
|
|
| - _, err := s.Tail(c, nil)
|
| + _, err := s.Tail(c)
|
| So(err, ShouldEqual, ErrNoAccess)
|
| })
|
| +
|
| + Convey(`When requesting complete streams`, func() {
|
| + var allLogs []*logpb.LogEntry
|
| + allLogs = append(allLogs, genDG(1337, "foo", "bar", "baz", "kthxbye")...)
|
| + allLogs = append(allLogs, genDG(1341, "qux", "ohai")...)
|
| + allLogs = append(allLogs, genDG(1343, "complete")...)
|
| + tailLog := allLogs[len(allLogs)-1]
|
| +
|
| + svc.TH = func(req *logdog.TailRequest) (*logdog.GetResponse, error) {
|
| + return &logdog.GetResponse{
|
| + Logs: []*logpb.LogEntry{tailLog},
|
| + State: &logdog.LogStreamState{
|
| + Created: google.NewTimestamp(now),
|
| + },
|
| + Desc: &logpb.LogStreamDescriptor{
|
| + Prefix: "test",
|
| + Name: "a",
|
| + StreamType: logpb.StreamType_DATAGRAM,
|
| + },
|
| + }, nil
|
| + }
|
| +
|
| + svc.GH = func(req *logdog.GetRequest) (*logdog.GetResponse, error) {
|
| + if req.State || req.NonContiguous || req.ByteCount != 0 {
|
| + return nil, errors.New("not implemented in test")
|
| + }
|
| + if len(allLogs) == 0 {
|
| + return &logdog.GetResponse{}, nil
|
| + }
|
| +
|
| + // Identify the requested index.
|
| + var ret []*logpb.LogEntry
|
| + for i, le := range allLogs {
|
| + if le.StreamIndex == uint64(req.Index) {
|
| + ret = allLogs[i:]
|
| + break
|
| + }
|
| + }
|
| + count := int(req.LogCount)
|
| + if count > len(ret) {
|
| + count = len(ret)
|
| + }
|
| + return &logdog.GetResponse{
|
| + Logs: ret[:count],
|
| + }, nil
|
| + }
|
| +
|
| + Convey(`With a non-partial datagram, returns that datagram.`, func() {
|
| + le, err := s.Tail(c, Complete())
|
| + So(err, ShouldBeNil)
|
| + So(le.StreamIndex, ShouldEqual, 1343)
|
| + So(le.GetDatagram().Partial, ShouldBeNil)
|
| + So(le.GetDatagram().Data, ShouldResemble, []byte("complete"))
|
| + })
|
| +
|
| + Convey(`Can assemble a set of one partial datagram.`, func() {
|
| + // This is weird, since this doesn't need to be partial at all, but
|
| + // we should handle it gracefully.
|
| + dg := tailLog.GetDatagram()
|
| + dg.Partial = &logpb.Datagram_Partial{
|
| + Index: 0,
|
| + Size: uint64(len(dg.Data)),
|
| + Last: true,
|
| + }
|
| +
|
| + le, err := s.Tail(c, Complete())
|
| + So(err, ShouldBeNil)
|
| + So(le.StreamIndex, ShouldEqual, 1343)
|
| + So(le.GetDatagram().Partial, ShouldBeNil)
|
| + So(le.GetDatagram().Data, ShouldResemble, []byte("complete"))
|
| + })
|
| +
|
| + Convey(`Can assemble a set of two partial datagrams.`, func() {
|
| + tailLog = allLogs[5]
|
| +
|
| + le, err := s.Tail(c, Complete())
|
| + So(err, ShouldBeNil)
|
| + So(le.StreamIndex, ShouldEqual, 1341)
|
| + So(le.GetDatagram().Partial, ShouldBeNil)
|
| + So(le.GetDatagram().Data, ShouldResemble, []byte("quxohai"))
|
| + })
|
| +
|
| + Convey(`With a set of three partial datagrams.`, func() {
|
| + tailLog = allLogs[3]
|
| +
|
| + Convey(`Will return a fully reassembled datagram.`, func() {
|
| + var ls LogStream
|
| + le, err := s.Tail(c, WithState(&ls), Complete())
|
| + So(err, ShouldBeNil)
|
| + So(le.StreamIndex, ShouldEqual, 1337)
|
| + So(le.GetDatagram().Partial, ShouldBeNil)
|
| + So(le.GetDatagram().Data, ShouldResemble, []byte("foobarbazkthxbye"))
|
| +
|
| + So(ls, ShouldResemble, LogStream{
|
| + Path: "test/+/a",
|
| + Desc: logpb.LogStreamDescriptor{
|
| + Prefix: "test",
|
| + Name: "a",
|
| + StreamType: logpb.StreamType_DATAGRAM,
|
| + },
|
| + State: StreamState{
|
| + Created: now,
|
| + },
|
| + })
|
| + })
|
| +
|
| + Convey(`Will return an error if the Get fails.`, func() {
|
| + svc.GH = func(req *logdog.GetRequest) (*logdog.GetResponse, error) {
|
| + return nil, grpcutil.Errf(codes.InvalidArgument, "test error")
|
| + }
|
| +
|
| + _, err := s.Tail(c, Complete())
|
| + So(err, ShouldErrLike, "failed to get intermediate logs")
|
| + So(err, ShouldErrLike, "test error")
|
| + })
|
| +
|
| + Convey(`Will return an error if the Get returns fewer logs than requested.`, func() {
|
| + allLogs = allLogs[0:1]
|
| +
|
| + _, err := s.Tail(c, Complete())
|
| + So(err, ShouldErrLike, "incomplete intermediate logs results")
|
| + })
|
| +
|
| + Convey(`Will return an error if Get returns non-datagram logs.`, func() {
|
| + allLogs[1].Content = nil
|
| +
|
| + _, err := s.Tail(c, Complete())
|
| + So(err, ShouldErrLike, "is not a datagram")
|
| + })
|
| +
|
| + Convey(`Will return an error if Get returns non-partial datagram logs.`, func() {
|
| + allLogs[1].GetDatagram().Partial = nil
|
| +
|
| + _, err := s.Tail(c, Complete())
|
| + So(err, ShouldErrLike, "is not partial")
|
| + })
|
| +
|
| + Convey(`Will return an error if Get returns non-contiguous partial datagrams.`, func() {
|
| + allLogs[1].GetDatagram().Partial.Index = 2
|
| +
|
| + _, err := s.Tail(c, Complete())
|
| + So(err, ShouldErrLike, "does not have a contiguous index")
|
| + })
|
| +
|
| + Convey(`Will return an error if the chunks declare different sizes.`, func() {
|
| + allLogs[1].GetDatagram().Partial.Size = 0
|
| +
|
| + _, err := s.Tail(c, Complete())
|
| + So(err, ShouldErrLike, "inconsistent datagram size")
|
| + })
|
| +
|
| + Convey(`Will return an error if the reassembled length exceeds the declared size.`, func() {
|
| + for _, le := range allLogs {
|
| + if p := le.GetDatagram().Partial; p != nil {
|
| + p.Size = 0
|
| + }
|
| + }
|
| +
|
| + _, err := s.Tail(c, Complete())
|
| + So(err, ShouldErrLike, "appending chunk data would exceed the declared size")
|
| + })
|
| +
|
| + Convey(`Will return an error if the reassembled length doesn't match the declared size.`, func() {
|
| + for _, le := range allLogs {
|
| + if p := le.GetDatagram().Partial; p != nil {
|
| + p.Size = 1024 * 1024
|
| + }
|
| + }
|
| +
|
| + _, err := s.Tail(c, Complete())
|
| + So(err, ShouldErrLike, "differs from declared length")
|
| + })
|
| + })
|
| +
|
| + Convey(`When Tail returns a mid-partial datagram.`, func() {
|
| + tailLog = allLogs[4]
|
| +
|
| + Convey(`If the previous datagram is partial, will return it reassembled.`, func() {
|
| + le, err := s.Tail(c, Complete())
|
| + So(err, ShouldBeNil)
|
| + So(le.StreamIndex, ShouldEqual, 1337)
|
| + So(le.GetDatagram().Partial, ShouldBeNil)
|
| + So(le.GetDatagram().Data, ShouldResemble, []byte("foobarbazkthxbye"))
|
| + })
|
| +
|
| + Convey(`If the previous datagram is not partial, will return it.`, func() {
|
| + allLogs[3].GetDatagram().Partial = nil
|
| +
|
| + le, err := s.Tail(c, Complete())
|
| + So(err, ShouldBeNil)
|
| + So(le.StreamIndex, ShouldEqual, 1340)
|
| + So(le.GetDatagram().Partial, ShouldBeNil)
|
| + So(le.GetDatagram().Data, ShouldResemble, []byte("kthxbye"))
|
| + })
|
| + })
|
| + })
|
| })
|
| })
|
| })
|
|
|