Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(683)

Unified Diff: logdog/client/coordinator/stream_test.go

Issue 2341113002: Update Coordinator client, add datagram assembly. (Closed)
Patch Set: Remove outdated Milo warning message. Created 4 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: logdog/client/coordinator/stream_test.go
diff --git a/logdog/client/coordinator/stream_test.go b/logdog/client/coordinator/stream_test.go
index 54ff31e2f7050b723a6bdf496fd5ec0f3bb0fc4e..8794b060cf7e98edea08947f64102e2c4c430f4f 100644
--- a/logdog/client/coordinator/stream_test.go
+++ b/logdog/client/coordinator/stream_test.go
@@ -33,6 +33,35 @@ func genLog(idx int64, id string) *logpb.LogEntry {
}
}
+func genDG(idx int64, content ...string) []*logpb.LogEntry {
+ var contentSize uint64
+ if len(content) > 1 {
+ for _, c := range content {
+ contentSize += uint64(len(c))
+ }
+ }
+
+ logs := make([]*logpb.LogEntry, len(content))
+ for i, c := range content {
+ dg := logpb.Datagram{
+ Data: []byte(c),
+ }
+ if len(content) > 1 {
+ dg.Partial = &logpb.Datagram_Partial{
+ Index: uint32(i),
+ Size: contentSize,
+ Last: (i == len(content)-1),
+ }
+ }
+
+ logs[i] = &logpb.LogEntry{
+ StreamIndex: uint64(idx + int64(i)),
+ Content: &logpb.LogEntry_Datagram{&dg},
+ }
+ }
+ return logs
+}
+
// testStreamLogsService implements just the Get and Tail endpoints,
// instrumented for testing.
type testStreamLogsService struct {
@@ -90,8 +119,6 @@ func TestStreamGet(t *testing.T) {
s := client.Stream("myproj", "test/+/a")
Convey(`Test Get`, func() {
- p := NewGetParams()
-
Convey(`A default Get query will return logs and no state.`, func() {
svc.GH = func(*logdog.GetRequest) (*logdog.GetResponse, error) {
return &logdog.GetResponse{
@@ -102,7 +129,7 @@ func TestStreamGet(t *testing.T) {
}, nil
}
- l, err := s.Get(c, nil)
+ l, err := s.Get(c)
So(err, ShouldBeNil)
So(l, ShouldResemble, []*logpb.LogEntry{genLog(1337, "ohai"), genLog(1338, "kthxbye")})
@@ -114,13 +141,11 @@ func TestStreamGet(t *testing.T) {
})
Convey(`Will form a proper Get logs query.`, func() {
- p = p.NonContiguous().Index(1)
-
svc.GH = func(*logdog.GetRequest) (*logdog.GetResponse, error) {
return &logdog.GetResponse{}, nil
}
- l, err := s.Get(c, p)
+ l, err := s.Get(c, NonContiguous(), Index(1))
So(err, ShouldBeNil)
So(l, ShouldBeNil)
@@ -134,8 +159,6 @@ func TestStreamGet(t *testing.T) {
})
Convey(`Will request a specific number of logs if a constraint is supplied.`, func() {
- p = p.Limit(32, 64)
-
svc.GH = func(*logdog.GetRequest) (*logdog.GetResponse, error) {
return &logdog.GetResponse{
Logs: []*logpb.LogEntry{
@@ -144,7 +167,7 @@ func TestStreamGet(t *testing.T) {
}, nil
}
- l, err := s.Get(c, p)
+ l, err := s.Get(c, LimitCount(64), LimitBytes(32))
So(err, ShouldBeNil)
So(l, ShouldResemble, []*logpb.LogEntry{genLog(1337, "ohai")})
@@ -158,9 +181,6 @@ func TestStreamGet(t *testing.T) {
})
Convey(`Can decode a full protobuf and state.`, func() {
- var ls LogStream
- p = p.State(&ls)
-
svc.GH = func(*logdog.GetRequest) (*logdog.GetResponse, error) {
return &logdog.GetResponse{
Logs: []*logpb.LogEntry{
@@ -182,17 +202,18 @@ func TestStreamGet(t *testing.T) {
}, nil
}
- l, err := s.Get(c, p)
+ var ls LogStream
+ l, err := s.Get(c, WithState(&ls))
So(err, ShouldBeNil)
So(l, ShouldResemble, []*logpb.LogEntry{genLog(1337, "kthxbye")})
So(ls, ShouldResemble, LogStream{
Path: "test/+/a",
- Desc: &logpb.LogStreamDescriptor{
+ Desc: logpb.LogStreamDescriptor{
Prefix: "test",
Name: "a",
StreamType: logpb.StreamType_TEXT,
},
- State: &StreamState{
+ State: StreamState{
Created: now,
Archived: true,
ArchiveIndexURL: "index",
@@ -207,7 +228,7 @@ func TestStreamGet(t *testing.T) {
return nil, grpcutil.NotFound
}
- _, err := s.Get(c, p)
+ _, err := s.Get(c)
So(err, ShouldEqual, ErrNoSuchStream)
})
@@ -216,7 +237,7 @@ func TestStreamGet(t *testing.T) {
return nil, grpcutil.Unauthenticated
}
- _, err := s.Get(c, p)
+ _, err := s.Get(c)
So(err, ShouldEqual, ErrNoAccess)
})
@@ -225,7 +246,7 @@ func TestStreamGet(t *testing.T) {
return nil, grpcutil.PermissionDenied
}
- _, err := s.Get(c, p)
+ _, err := s.Get(c)
So(err, ShouldEqual, ErrNoAccess)
})
})
@@ -235,6 +256,11 @@ func TestStreamGet(t *testing.T) {
svc.GH = func(*logdog.GetRequest) (*logdog.GetResponse, error) {
return &logdog.GetResponse{
Project: "myproj",
+ Desc: &logpb.LogStreamDescriptor{
+ Prefix: "test",
+ Name: "a",
+ StreamType: logpb.StreamType_TEXT,
+ },
State: &logdog.LogStreamState{
Created: google.NewTimestamp(now),
},
@@ -246,7 +272,12 @@ func TestStreamGet(t *testing.T) {
So(l, ShouldResemble, &LogStream{
Project: "myproj",
Path: "test/+/a",
- State: &StreamState{
+ Desc: logpb.LogStreamDescriptor{
+ Prefix: "test",
+ Name: "a",
+ StreamType: logpb.StreamType_TEXT,
+ },
+ State: StreamState{
Created: now.UTC(),
},
})
@@ -308,7 +339,7 @@ func TestStreamGet(t *testing.T) {
}
var ls LogStream
- l, err := s.Tail(c, &ls)
+ l, err := s.Tail(c, WithState(&ls))
So(err, ShouldBeNil)
// Validate the HTTP request that we made.
@@ -323,12 +354,12 @@ func TestStreamGet(t *testing.T) {
So(ls, ShouldResemble, LogStream{
Project: "myproj",
Path: "test/+/a",
- Desc: &logpb.LogStreamDescriptor{
+ Desc: logpb.LogStreamDescriptor{
Prefix: "test",
Name: "a",
StreamType: logpb.StreamType_TEXT,
},
- State: &StreamState{
+ State: StreamState{
Created: now,
},
})
@@ -350,18 +381,18 @@ func TestStreamGet(t *testing.T) {
}
var ls LogStream
- l, err := s.Tail(c, &ls)
+ l, err := s.Tail(c, WithState(&ls))
So(err, ShouldBeNil)
So(l, ShouldBeNil)
So(ls, ShouldResemble, LogStream{
Project: "myproj",
Path: "test/+/a",
- Desc: &logpb.LogStreamDescriptor{
+ Desc: logpb.LogStreamDescriptor{
Prefix: "test",
Name: "a",
StreamType: logpb.StreamType_TEXT,
},
- State: &StreamState{
+ State: StreamState{
Created: now,
},
})
@@ -380,7 +411,7 @@ func TestStreamGet(t *testing.T) {
}, nil
}
- _, err := s.Tail(c, nil)
+ _, err := s.Tail(c)
So(err, ShouldErrLike, "tail call returned 2 logs")
})
@@ -389,7 +420,7 @@ func TestStreamGet(t *testing.T) {
return nil, grpcutil.NotFound
}
- _, err := s.Tail(c, nil)
+ _, err := s.Tail(c)
So(err, ShouldEqual, ErrNoSuchStream)
})
@@ -398,7 +429,7 @@ func TestStreamGet(t *testing.T) {
return nil, grpcutil.Unauthenticated
}
- _, err := s.Tail(c, nil)
+ _, err := s.Tail(c)
So(err, ShouldEqual, ErrNoAccess)
})
@@ -407,9 +438,204 @@ func TestStreamGet(t *testing.T) {
return nil, grpcutil.PermissionDenied
}
- _, err := s.Tail(c, nil)
+ _, err := s.Tail(c)
So(err, ShouldEqual, ErrNoAccess)
})
+
+ Convey(`When requesting complete streams`, func() {
+ var allLogs []*logpb.LogEntry
+ allLogs = append(allLogs, genDG(1337, "foo", "bar", "baz", "kthxbye")...)
+ allLogs = append(allLogs, genDG(1341, "qux", "ohai")...)
+ allLogs = append(allLogs, genDG(1343, "complete")...)
+ tailLog := allLogs[len(allLogs)-1]
+
+ svc.TH = func(req *logdog.TailRequest) (*logdog.GetResponse, error) {
+ return &logdog.GetResponse{
+ Logs: []*logpb.LogEntry{tailLog},
+ State: &logdog.LogStreamState{
+ Created: google.NewTimestamp(now),
+ },
+ Desc: &logpb.LogStreamDescriptor{
+ Prefix: "test",
+ Name: "a",
+ StreamType: logpb.StreamType_DATAGRAM,
+ },
+ }, nil
+ }
+
+ svc.GH = func(req *logdog.GetRequest) (*logdog.GetResponse, error) {
+ if req.State || req.NonContiguous || req.ByteCount != 0 {
+ return nil, errors.New("not implemented in test")
+ }
+ if len(allLogs) == 0 {
+ return &logdog.GetResponse{}, nil
+ }
+
+ // Identify the requested index.
+ var ret []*logpb.LogEntry
+ for i, le := range allLogs {
+ if le.StreamIndex == uint64(req.Index) {
+ ret = allLogs[i:]
+ break
+ }
+ }
+ count := int(req.LogCount)
+ if count > len(ret) {
+ count = len(ret)
+ }
+ return &logdog.GetResponse{
+ Logs: ret[:count],
+ }, nil
+ }
+
+ Convey(`With a non-partial datagram, returns that datagram.`, func() {
+ le, err := s.Tail(c, Complete())
+ So(err, ShouldBeNil)
+ So(le.StreamIndex, ShouldEqual, 1343)
+ So(le.GetDatagram().Partial, ShouldBeNil)
+ So(le.GetDatagram().Data, ShouldResemble, []byte("complete"))
+ })
+
+ Convey(`Can assemble a set of one partial datagram.`, func() {
+ // This is weird, since this doesn't need to be partial at all, but
+ // we should handle it gracefully.
+ dg := tailLog.GetDatagram()
+ dg.Partial = &logpb.Datagram_Partial{
+ Index: 0,
+ Size: uint64(len(dg.Data)),
+ Last: true,
+ }
+
+ le, err := s.Tail(c, Complete())
+ So(err, ShouldBeNil)
+ So(le.StreamIndex, ShouldEqual, 1343)
+ So(le.GetDatagram().Partial, ShouldBeNil)
+ So(le.GetDatagram().Data, ShouldResemble, []byte("complete"))
+ })
+
+ Convey(`Can assemble a set of two partial datagrams.`, func() {
+ tailLog = allLogs[5]
+
+ le, err := s.Tail(c, Complete())
+ So(err, ShouldBeNil)
+ So(le.StreamIndex, ShouldEqual, 1341)
+ So(le.GetDatagram().Partial, ShouldBeNil)
+ So(le.GetDatagram().Data, ShouldResemble, []byte("quxohai"))
+ })
+
+ Convey(`With a set of three partial datagrams.`, func() {
+ tailLog = allLogs[3]
+
+ Convey(`Will return a fully reassembled datagram.`, func() {
+ var ls LogStream
+ le, err := s.Tail(c, WithState(&ls), Complete())
+ So(err, ShouldBeNil)
+ So(le.StreamIndex, ShouldEqual, 1337)
+ So(le.GetDatagram().Partial, ShouldBeNil)
+ So(le.GetDatagram().Data, ShouldResemble, []byte("foobarbazkthxbye"))
+
+ So(ls, ShouldResemble, LogStream{
+ Path: "test/+/a",
+ Desc: logpb.LogStreamDescriptor{
+ Prefix: "test",
+ Name: "a",
+ StreamType: logpb.StreamType_DATAGRAM,
+ },
+ State: StreamState{
+ Created: now,
+ },
+ })
+ })
+
+ Convey(`Will return an error if the Get fails.`, func() {
+ svc.GH = func(req *logdog.GetRequest) (*logdog.GetResponse, error) {
+ return nil, errors.New("test error")
+ }
+
+ _, err := s.Tail(c, Complete())
+ So(err, ShouldErrLike, "failed to get intermediate logs")
martiniss 2016/09/22 03:40:39 test for test error in the err?
dnj 2016/09/22 16:00:05 Done. I'll have to make this function return a gRP
+ })
+
+ Convey(`Will return an error if the Get returns fewer logs than requested.`, func() {
+ allLogs = allLogs[0:1]
+
+ _, err := s.Tail(c, Complete())
+ So(err, ShouldErrLike, "incomplete intermediate logs results")
+ })
+
+ Convey(`Will return an error if Get returns non-datagram logs.`, func() {
+ allLogs[1].Content = nil
+
+ _, err := s.Tail(c, Complete())
+ So(err, ShouldErrLike, "is not a datagram")
+ })
+
+ Convey(`Will return an error if Get returns non-partial datagram logs.`, func() {
+ allLogs[1].GetDatagram().Partial = nil
+
+ _, err := s.Tail(c, Complete())
+ So(err, ShouldErrLike, "is not partial")
+ })
+
+ Convey(`Will return an error if Get returns non-contiguous partial datagrams.`, func() {
+ allLogs[1].GetDatagram().Partial.Index = 2
+
+ _, err := s.Tail(c, Complete())
+ So(err, ShouldErrLike, "does not have a contiguous index")
+ })
+
+ Convey(`Will return an error if the chunks declare different sizes.`, func() {
+ allLogs[1].GetDatagram().Partial.Size = 0
+
+ _, err := s.Tail(c, Complete())
+ So(err, ShouldErrLike, "inconsistent datagram size")
+ })
+
+ Convey(`Will return an error if the reassembled length exceeds the declared size.`, func() {
+ for _, le := range allLogs {
+ if p := le.GetDatagram().Partial; p != nil {
+ p.Size = 0
+ }
+ }
+
+ _, err := s.Tail(c, Complete())
+ So(err, ShouldErrLike, "appending chunk data would exceed the declared size")
+ })
+
+ Convey(`Will return an error if the reassembled length doesn't match the declared size.`, func() {
+ for _, le := range allLogs {
+ if p := le.GetDatagram().Partial; p != nil {
+ p.Size = 1024 * 1024
+ }
+ }
+
+ _, err := s.Tail(c, Complete())
+ So(err, ShouldErrLike, "differs from declared length")
+ })
+ })
+
+ Convey(`When Tail returns a mid-partial datagram.`, func() {
+ tailLog = allLogs[4]
+
+ Convey(`If the previous datagram is not partial, will return it.`, func() {
+ allLogs[3].GetDatagram().Partial = nil
+
+ le, err := s.Tail(c, Complete())
+ So(err, ShouldBeNil)
+ So(le.StreamIndex, ShouldEqual, 1340)
+ So(le.GetDatagram().Partial, ShouldBeNil)
+ So(le.GetDatagram().Data, ShouldResemble, []byte("kthxbye"))
+ })
+
+ Convey(`If the previous datagram is partial, will return it reassembled.`, func() {
martiniss 2016/09/22 03:40:39 nit: move this above the other test?
dnj 2016/09/22 16:00:05 Done.
+ le, err := s.Tail(c, Complete())
+ So(err, ShouldBeNil)
+ So(le.StreamIndex, ShouldEqual, 1337)
+ So(le.GetDatagram().Partial, ShouldBeNil)
+ So(le.GetDatagram().Data, ShouldResemble, []byte("foobarbazkthxbye"))
+ })
+ })
+ })
})
})
})

Powered by Google App Engine
This is Rietveld 408576698