Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(801)

Unified Diff: logdog/client/coordinator/stream_test.go

Issue 2341113002: Update Coordinator client, add datagram assembly. (Closed)
Patch Set: Comments. Created 4 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « logdog/client/coordinator/stream_params.go ('k') | milo/appengine/logdog/build.go » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: logdog/client/coordinator/stream_test.go
diff --git a/logdog/client/coordinator/stream_test.go b/logdog/client/coordinator/stream_test.go
index 54ff31e2f7050b723a6bdf496fd5ec0f3bb0fc4e..3fe8792df23cf759d221facee252d7154ccd1671 100644
--- a/logdog/client/coordinator/stream_test.go
+++ b/logdog/client/coordinator/stream_test.go
@@ -14,7 +14,9 @@ import (
"github.com/luci/luci-go/grpc/grpcutil"
"github.com/luci/luci-go/logdog/api/endpoints/coordinator/logs/v1"
"github.com/luci/luci-go/logdog/api/logpb"
+
"golang.org/x/net/context"
+ "google.golang.org/grpc/codes"
. "github.com/luci/luci-go/common/testing/assertions"
. "github.com/smartystreets/goconvey/convey"
@@ -33,6 +35,35 @@ func genLog(idx int64, id string) *logpb.LogEntry {
}
}
+func genDG(idx int64, content ...string) []*logpb.LogEntry {
+ var contentSize uint64
+ if len(content) > 1 {
+ for _, c := range content {
+ contentSize += uint64(len(c))
+ }
+ }
+
+ logs := make([]*logpb.LogEntry, len(content))
+ for i, c := range content {
+ dg := logpb.Datagram{
+ Data: []byte(c),
+ }
+ if len(content) > 1 {
+ dg.Partial = &logpb.Datagram_Partial{
+ Index: uint32(i),
+ Size: contentSize,
+ Last: (i == len(content)-1),
+ }
+ }
+
+ logs[i] = &logpb.LogEntry{
+ StreamIndex: uint64(idx + int64(i)),
+ Content: &logpb.LogEntry_Datagram{&dg},
+ }
+ }
+ return logs
+}
+
// testStreamLogsService implements just the Get and Tail endpoints,
// instrumented for testing.
type testStreamLogsService struct {
@@ -90,8 +121,6 @@ func TestStreamGet(t *testing.T) {
s := client.Stream("myproj", "test/+/a")
Convey(`Test Get`, func() {
- p := NewGetParams()
-
Convey(`A default Get query will return logs and no state.`, func() {
svc.GH = func(*logdog.GetRequest) (*logdog.GetResponse, error) {
return &logdog.GetResponse{
@@ -102,7 +131,7 @@ func TestStreamGet(t *testing.T) {
}, nil
}
- l, err := s.Get(c, nil)
+ l, err := s.Get(c)
So(err, ShouldBeNil)
So(l, ShouldResemble, []*logpb.LogEntry{genLog(1337, "ohai"), genLog(1338, "kthxbye")})
@@ -114,13 +143,11 @@ func TestStreamGet(t *testing.T) {
})
Convey(`Will form a proper Get logs query.`, func() {
- p = p.NonContiguous().Index(1)
-
svc.GH = func(*logdog.GetRequest) (*logdog.GetResponse, error) {
return &logdog.GetResponse{}, nil
}
- l, err := s.Get(c, p)
+ l, err := s.Get(c, NonContiguous(), Index(1))
So(err, ShouldBeNil)
So(l, ShouldBeNil)
@@ -134,8 +161,6 @@ func TestStreamGet(t *testing.T) {
})
Convey(`Will request a specific number of logs if a constraint is supplied.`, func() {
- p = p.Limit(32, 64)
-
svc.GH = func(*logdog.GetRequest) (*logdog.GetResponse, error) {
return &logdog.GetResponse{
Logs: []*logpb.LogEntry{
@@ -144,7 +169,7 @@ func TestStreamGet(t *testing.T) {
}, nil
}
- l, err := s.Get(c, p)
+ l, err := s.Get(c, LimitCount(64), LimitBytes(32))
So(err, ShouldBeNil)
So(l, ShouldResemble, []*logpb.LogEntry{genLog(1337, "ohai")})
@@ -158,9 +183,6 @@ func TestStreamGet(t *testing.T) {
})
Convey(`Can decode a full protobuf and state.`, func() {
- var ls LogStream
- p = p.State(&ls)
-
svc.GH = func(*logdog.GetRequest) (*logdog.GetResponse, error) {
return &logdog.GetResponse{
Logs: []*logpb.LogEntry{
@@ -182,17 +204,18 @@ func TestStreamGet(t *testing.T) {
}, nil
}
- l, err := s.Get(c, p)
+ var ls LogStream
+ l, err := s.Get(c, WithState(&ls))
So(err, ShouldBeNil)
So(l, ShouldResemble, []*logpb.LogEntry{genLog(1337, "kthxbye")})
So(ls, ShouldResemble, LogStream{
Path: "test/+/a",
- Desc: &logpb.LogStreamDescriptor{
+ Desc: logpb.LogStreamDescriptor{
Prefix: "test",
Name: "a",
StreamType: logpb.StreamType_TEXT,
},
- State: &StreamState{
+ State: StreamState{
Created: now,
Archived: true,
ArchiveIndexURL: "index",
@@ -207,7 +230,7 @@ func TestStreamGet(t *testing.T) {
return nil, grpcutil.NotFound
}
- _, err := s.Get(c, p)
+ _, err := s.Get(c)
So(err, ShouldEqual, ErrNoSuchStream)
})
@@ -216,7 +239,7 @@ func TestStreamGet(t *testing.T) {
return nil, grpcutil.Unauthenticated
}
- _, err := s.Get(c, p)
+ _, err := s.Get(c)
So(err, ShouldEqual, ErrNoAccess)
})
@@ -225,7 +248,7 @@ func TestStreamGet(t *testing.T) {
return nil, grpcutil.PermissionDenied
}
- _, err := s.Get(c, p)
+ _, err := s.Get(c)
So(err, ShouldEqual, ErrNoAccess)
})
})
@@ -235,6 +258,11 @@ func TestStreamGet(t *testing.T) {
svc.GH = func(*logdog.GetRequest) (*logdog.GetResponse, error) {
return &logdog.GetResponse{
Project: "myproj",
+ Desc: &logpb.LogStreamDescriptor{
+ Prefix: "test",
+ Name: "a",
+ StreamType: logpb.StreamType_TEXT,
+ },
State: &logdog.LogStreamState{
Created: google.NewTimestamp(now),
},
@@ -246,7 +274,12 @@ func TestStreamGet(t *testing.T) {
So(l, ShouldResemble, &LogStream{
Project: "myproj",
Path: "test/+/a",
- State: &StreamState{
+ Desc: logpb.LogStreamDescriptor{
+ Prefix: "test",
+ Name: "a",
+ StreamType: logpb.StreamType_TEXT,
+ },
+ State: StreamState{
Created: now.UTC(),
},
})
@@ -308,7 +341,7 @@ func TestStreamGet(t *testing.T) {
}
var ls LogStream
- l, err := s.Tail(c, &ls)
+ l, err := s.Tail(c, WithState(&ls))
So(err, ShouldBeNil)
// Validate the HTTP request that we made.
@@ -323,12 +356,12 @@ func TestStreamGet(t *testing.T) {
So(ls, ShouldResemble, LogStream{
Project: "myproj",
Path: "test/+/a",
- Desc: &logpb.LogStreamDescriptor{
+ Desc: logpb.LogStreamDescriptor{
Prefix: "test",
Name: "a",
StreamType: logpb.StreamType_TEXT,
},
- State: &StreamState{
+ State: StreamState{
Created: now,
},
})
@@ -350,18 +383,18 @@ func TestStreamGet(t *testing.T) {
}
var ls LogStream
- l, err := s.Tail(c, &ls)
+ l, err := s.Tail(c, WithState(&ls))
So(err, ShouldBeNil)
So(l, ShouldBeNil)
So(ls, ShouldResemble, LogStream{
Project: "myproj",
Path: "test/+/a",
- Desc: &logpb.LogStreamDescriptor{
+ Desc: logpb.LogStreamDescriptor{
Prefix: "test",
Name: "a",
StreamType: logpb.StreamType_TEXT,
},
- State: &StreamState{
+ State: StreamState{
Created: now,
},
})
@@ -380,7 +413,7 @@ func TestStreamGet(t *testing.T) {
}, nil
}
- _, err := s.Tail(c, nil)
+ _, err := s.Tail(c)
So(err, ShouldErrLike, "tail call returned 2 logs")
})
@@ -389,7 +422,7 @@ func TestStreamGet(t *testing.T) {
return nil, grpcutil.NotFound
}
- _, err := s.Tail(c, nil)
+ _, err := s.Tail(c)
So(err, ShouldEqual, ErrNoSuchStream)
})
@@ -398,7 +431,7 @@ func TestStreamGet(t *testing.T) {
return nil, grpcutil.Unauthenticated
}
- _, err := s.Tail(c, nil)
+ _, err := s.Tail(c)
So(err, ShouldEqual, ErrNoAccess)
})
@@ -407,9 +440,205 @@ func TestStreamGet(t *testing.T) {
return nil, grpcutil.PermissionDenied
}
- _, err := s.Tail(c, nil)
+ _, err := s.Tail(c)
So(err, ShouldEqual, ErrNoAccess)
})
+
+ Convey(`When requesting complete streams`, func() {
+ var allLogs []*logpb.LogEntry
+ allLogs = append(allLogs, genDG(1337, "foo", "bar", "baz", "kthxbye")...)
+ allLogs = append(allLogs, genDG(1341, "qux", "ohai")...)
+ allLogs = append(allLogs, genDG(1343, "complete")...)
+ tailLog := allLogs[len(allLogs)-1]
+
+ svc.TH = func(req *logdog.TailRequest) (*logdog.GetResponse, error) {
+ return &logdog.GetResponse{
+ Logs: []*logpb.LogEntry{tailLog},
+ State: &logdog.LogStreamState{
+ Created: google.NewTimestamp(now),
+ },
+ Desc: &logpb.LogStreamDescriptor{
+ Prefix: "test",
+ Name: "a",
+ StreamType: logpb.StreamType_DATAGRAM,
+ },
+ }, nil
+ }
+
+ svc.GH = func(req *logdog.GetRequest) (*logdog.GetResponse, error) {
+ if req.State || req.NonContiguous || req.ByteCount != 0 {
+ return nil, errors.New("not implemented in test")
+ }
+ if len(allLogs) == 0 {
+ return &logdog.GetResponse{}, nil
+ }
+
+ // Identify the requested index.
+ var ret []*logpb.LogEntry
+ for i, le := range allLogs {
+ if le.StreamIndex == uint64(req.Index) {
+ ret = allLogs[i:]
+ break
+ }
+ }
+ count := int(req.LogCount)
+ if count > len(ret) {
+ count = len(ret)
+ }
+ return &logdog.GetResponse{
+ Logs: ret[:count],
+ }, nil
+ }
+
+ Convey(`With a non-partial datagram, returns that datagram.`, func() {
+ le, err := s.Tail(c, Complete())
+ So(err, ShouldBeNil)
+ So(le.StreamIndex, ShouldEqual, 1343)
+ So(le.GetDatagram().Partial, ShouldBeNil)
+ So(le.GetDatagram().Data, ShouldResemble, []byte("complete"))
+ })
+
+ Convey(`Can assemble a set of one partial datagram.`, func() {
+ // This is weird, since this doesn't need to be partial at all, but
+ // we should handle it gracefully.
+ dg := tailLog.GetDatagram()
+ dg.Partial = &logpb.Datagram_Partial{
+ Index: 0,
+ Size: uint64(len(dg.Data)),
+ Last: true,
+ }
+
+ le, err := s.Tail(c, Complete())
+ So(err, ShouldBeNil)
+ So(le.StreamIndex, ShouldEqual, 1343)
+ So(le.GetDatagram().Partial, ShouldBeNil)
+ So(le.GetDatagram().Data, ShouldResemble, []byte("complete"))
+ })
+
+ Convey(`Can assemble a set of two partial datagrams.`, func() {
+ tailLog = allLogs[5]
+
+ le, err := s.Tail(c, Complete())
+ So(err, ShouldBeNil)
+ So(le.StreamIndex, ShouldEqual, 1341)
+ So(le.GetDatagram().Partial, ShouldBeNil)
+ So(le.GetDatagram().Data, ShouldResemble, []byte("quxohai"))
+ })
+
+ Convey(`With a set of three partial datagrams.`, func() {
+ tailLog = allLogs[3]
+
+ Convey(`Will return a fully reassembled datagram.`, func() {
+ var ls LogStream
+ le, err := s.Tail(c, WithState(&ls), Complete())
+ So(err, ShouldBeNil)
+ So(le.StreamIndex, ShouldEqual, 1337)
+ So(le.GetDatagram().Partial, ShouldBeNil)
+ So(le.GetDatagram().Data, ShouldResemble, []byte("foobarbazkthxbye"))
+
+ So(ls, ShouldResemble, LogStream{
+ Path: "test/+/a",
+ Desc: logpb.LogStreamDescriptor{
+ Prefix: "test",
+ Name: "a",
+ StreamType: logpb.StreamType_DATAGRAM,
+ },
+ State: StreamState{
+ Created: now,
+ },
+ })
+ })
+
+ Convey(`Will return an error if the Get fails.`, func() {
+ svc.GH = func(req *logdog.GetRequest) (*logdog.GetResponse, error) {
+ return nil, grpcutil.Errf(codes.InvalidArgument, "test error")
+ }
+
+ _, err := s.Tail(c, Complete())
+ So(err, ShouldErrLike, "failed to get intermediate logs")
+ So(err, ShouldErrLike, "test error")
+ })
+
+ Convey(`Will return an error if the Get returns fewer logs than requested.`, func() {
+ allLogs = allLogs[0:1]
+
+ _, err := s.Tail(c, Complete())
+ So(err, ShouldErrLike, "incomplete intermediate logs results")
+ })
+
+ Convey(`Will return an error if Get returns non-datagram logs.`, func() {
+ allLogs[1].Content = nil
+
+ _, err := s.Tail(c, Complete())
+ So(err, ShouldErrLike, "is not a datagram")
+ })
+
+ Convey(`Will return an error if Get returns non-partial datagram logs.`, func() {
+ allLogs[1].GetDatagram().Partial = nil
+
+ _, err := s.Tail(c, Complete())
+ So(err, ShouldErrLike, "is not partial")
+ })
+
+ Convey(`Will return an error if Get returns non-contiguous partial datagrams.`, func() {
+ allLogs[1].GetDatagram().Partial.Index = 2
+
+ _, err := s.Tail(c, Complete())
+ So(err, ShouldErrLike, "does not have a contiguous index")
+ })
+
+ Convey(`Will return an error if the chunks declare different sizes.`, func() {
+ allLogs[1].GetDatagram().Partial.Size = 0
+
+ _, err := s.Tail(c, Complete())
+ So(err, ShouldErrLike, "inconsistent datagram size")
+ })
+
+ Convey(`Will return an error if the reassembled length exceeds the declared size.`, func() {
+ for _, le := range allLogs {
+ if p := le.GetDatagram().Partial; p != nil {
+ p.Size = 0
+ }
+ }
+
+ _, err := s.Tail(c, Complete())
+ So(err, ShouldErrLike, "appending chunk data would exceed the declared size")
+ })
+
+ Convey(`Will return an error if the reassembled length doesn't match the declared size.`, func() {
+ for _, le := range allLogs {
+ if p := le.GetDatagram().Partial; p != nil {
+ p.Size = 1024 * 1024
+ }
+ }
+
+ _, err := s.Tail(c, Complete())
+ So(err, ShouldErrLike, "differs from declared length")
+ })
+ })
+
+ Convey(`When Tail returns a mid-partial datagram.`, func() {
+ tailLog = allLogs[4]
+
+ Convey(`If the previous datagram is partial, will return it reassembled.`, func() {
+ le, err := s.Tail(c, Complete())
+ So(err, ShouldBeNil)
+ So(le.StreamIndex, ShouldEqual, 1337)
+ So(le.GetDatagram().Partial, ShouldBeNil)
+ So(le.GetDatagram().Data, ShouldResemble, []byte("foobarbazkthxbye"))
+ })
+
+ Convey(`If the previous datagram is not partial, will return it.`, func() {
+ allLogs[3].GetDatagram().Partial = nil
+
+ le, err := s.Tail(c, Complete())
+ So(err, ShouldBeNil)
+ So(le.StreamIndex, ShouldEqual, 1340)
+ So(le.GetDatagram().Partial, ShouldBeNil)
+ So(le.GetDatagram().Data, ShouldResemble, []byte("kthxbye"))
+ })
+ })
+ })
})
})
})
« no previous file with comments | « logdog/client/coordinator/stream_params.go ('k') | milo/appengine/logdog/build.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698