Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(644)

Unified Diff: server/internal/logdog/collector/collector_test.go

Issue 1838803002: LogDog: BigTable batching schema. (Closed) Base URL: https://github.com/luci/luci-go@recordio-split
Patch Set: Minor comments and quality of code tweaks. Created 4 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « server/internal/logdog/collector/collector.go ('k') | server/internal/logdog/collector/utils_test.go » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: server/internal/logdog/collector/collector_test.go
diff --git a/server/internal/logdog/collector/collector_test.go b/server/internal/logdog/collector/collector_test.go
index e23b5a862041301d21875c6eeab32434b2f6fb58..eb1e8e0e32e7931d41899bc10c52ac37c05f77d0 100644
--- a/server/internal/logdog/collector/collector_test.go
+++ b/server/internal/logdog/collector/collector_test.go
@@ -19,6 +19,7 @@ import (
"github.com/luci/luci-go/server/logdog/storage/memory"
"golang.org/x/net/context"
+ . "github.com/luci/luci-go/common/testing/assertions"
. "github.com/smartystreets/goconvey/convey"
)
@@ -106,12 +107,12 @@ func TestCollector(t *testing.T) {
So(errors.IsTransient(err), ShouldBeFalse)
})
- Convey(`Will return a transient error if an error happened on storage.`, func() {
+ Convey(`Will return a transient error if a transient error happened on storage.`, func() {
// Single transient error.
count := int32(0)
st.err = func() error {
if atomic.AddInt32(&count, 1) == 1 {
- return errors.New("test error")
+ return errors.WrapTransient(errors.New("test error"))
}
return nil
}
@@ -123,20 +124,21 @@ func TestCollector(t *testing.T) {
})
Convey(`Will drop invalid LogStreamDescriptor bundle entries and process the valid ones.`, func() {
- be := bb.genBundleEntry("foo/+/trash", 1337, 4, 6, 7, 8)
- be.Desc.ContentType = "" // Missing ContentType => invalid.
-
- bb.addStreamEntries("foo/+/trash", -1, 0, 1, 2, 3, 5)
+ be := bb.genBundleEntry("foo/+/trash", 1337, 4, 5, 6, 7, 8)
bb.addBundleEntry(be)
+
+ bb.addStreamEntries("foo/+/trash", 0, 1, 3) // Invalid: non-contiguous
bb.addFullStream("foo/+/bar", 32)
- So(coll.Process(c, bb.bundle()), ShouldBeNil)
+ err := coll.Process(c, bb.bundle())
+ So(err, ShouldNotBeNil)
+ So(errors.IsTransient(err), ShouldBeFalse)
So(tcc, shouldHaveRegisteredStream, "foo/+/bar", 32)
So(st, shouldHaveStoredStream, "foo/+/bar", indexRange{0, 31})
- So(tcc, shouldHaveRegisteredStream, "foo/+/trash", -1)
- So(st, shouldHaveStoredStream, "foo/+/trash", 0, 1, 2, 3, 5)
+ So(tcc, shouldHaveRegisteredStream, "foo/+/trash", 1337)
+ So(st, shouldHaveStoredStream, "foo/+/trash", 4, 5, 6, 7, 8)
})
Convey(`Will drop streams with missing secrets.`, func() {
@@ -144,8 +146,10 @@ func TestCollector(t *testing.T) {
be.Secret = nil
bb.addBundleEntry(be)
- So(coll.Process(c, bb.bundle()), ShouldBeNil)
- So(tcc, shouldNotHaveRegisteredStream, "foo/+/bar", 127)
+ err := coll.Process(c, bb.bundle())
+ So(err, ShouldErrLike, "missing stream secret")
+ So(errors.IsTransient(err), ShouldBeFalse)
+ So(tcc, shouldNotHaveRegisteredStream, "foo/+/bar")
})
Convey(`Will drop messages with mismatching secrets.`, func() {
@@ -182,32 +186,6 @@ func TestCollector(t *testing.T) {
So(tcc, shouldNotHaveRegisteredStream, "foo/+/bar")
})
- Convey(`Will drop records beyond a local terminal index.`, func() {
- bb.addStreamEntries("foo/+/bar", 3, 0, 1, 2, 4)
- So(coll.Process(c, bb.bundle()), ShouldBeNil)
-
- bb.addStreamEntries("foo/+/bar", 3, 3, 5)
- So(coll.Process(c, bb.bundle()), ShouldBeNil)
-
- So(tcc, shouldHaveRegisteredStream, "foo/+/bar", 4)
- So(st, shouldHaveStoredStream, "foo/+/bar", indexRange{0, 3})
- })
-
- Convey(`Will not ingest records beyond a remote terminal index.`, func() {
- tcc.register(coordinator.LogStreamState{
- Path: "foo/+/bar",
- Secret: testSecret,
- TerminalIndex: 3,
- })
-
- bb.addStreamEntries("foo/+/bar", 3, 0, 1, 2)
- bb.addStreamEntries("foo/+/bar", 3, 3, 5)
- So(coll.Process(c, bb.bundle()), ShouldBeNil)
-
- So(tcc, shouldHaveRegisteredStream, "foo/+/bar", 3)
- So(st, shouldHaveStoredStream, "foo/+/bar", indexRange{0, 3})
- })
-
Convey(`Will not ingest records if the stream is archived.`, func() {
tcc.register(coordinator.LogStreamState{
Path: "foo/+/bar",
@@ -216,7 +194,7 @@ func TestCollector(t *testing.T) {
Archived: true,
})
- bb.addStreamEntries("foo/+/bar", 3, 0, 1, 2, 4)
+ bb.addStreamEntries("foo/+/bar", 3, 0, 1, 2, 3, 4)
So(coll.Process(c, bb.bundle()), ShouldBeNil)
So(tcc, shouldHaveRegisteredStream, "foo/+/bar", -1)
@@ -242,7 +220,7 @@ func TestCollector(t *testing.T) {
})
Convey(`Will not ingest a bundle whose log entries don't match their descriptor.`, func() {
- be := bb.genBundleEntry("foo/+/bar", 4, 0, 1, 3, 4)
+ be := bb.genBundleEntry("foo/+/bar", 4, 0, 1, 2, 3, 4)
// Add a binary log entry. This does NOT match the text descriptor, and
// should fail validation.
@@ -256,10 +234,9 @@ func TestCollector(t *testing.T) {
},
})
bb.addBundleEntry(be)
- So(coll.Process(c, bb.bundle()), ShouldBeNil)
+ So(coll.Process(c, bb.bundle()), ShouldErrLike, "invalid log entry")
- So(tcc, shouldHaveRegisteredStream, "foo/+/bar", 4)
- So(st, shouldHaveStoredStream, "foo/+/bar", 0, 1, 3, 4)
+ So(tcc, shouldNotHaveRegisteredStream, "foo/+/bar")
})
})
}
« no previous file with comments | « server/internal/logdog/collector/collector.go ('k') | server/internal/logdog/collector/utils_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698