Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(241)

Unified Diff: server/internal/logdog/archivist/archivist_test.go

Issue 1863973002: LogDog: Update to archival V2. (Closed) Base URL: https://github.com/luci/luci-go@grpcutil-errors
Patch Set: Minor fixes, works in dev now. Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: server/internal/logdog/archivist/archivist_test.go
diff --git a/server/internal/logdog/archivist/archivist_test.go b/server/internal/logdog/archivist/archivist_test.go
index 12daac169e7ca7d70f8181b69d077782b5ac5e8d..9dccb6434aebbaeea2fc8a2218eb45efca0c95cc 100644
--- a/server/internal/logdog/archivist/archivist_test.go
+++ b/server/internal/logdog/archivist/archivist_test.go
@@ -5,7 +5,6 @@
package archivist
import (
- "errors"
"fmt"
"strings"
"sync"
@@ -15,6 +14,7 @@ import (
"github.com/golang/protobuf/proto"
"github.com/luci/luci-go/common/api/logdog_coordinator/services/v1"
"github.com/luci/luci-go/common/clock/testclock"
+ "github.com/luci/luci-go/common/errors"
"github.com/luci/luci-go/common/gcloud/gs"
"github.com/luci/luci-go/common/logdog/types"
"github.com/luci/luci-go/common/proto/google"
@@ -225,13 +225,14 @@ func TestHandleArchive(t *testing.T) {
}
var archiveRequest *logdog.ArchiveStreamRequest
+ var archiveStreamErr error
sc := testServicesClient{
lsCallback: func(req *logdog.LoadStreamRequest) (*logdog.LoadStreamResponse, error) {
return &stream, nil
},
asCallback: func(req *logdog.ArchiveStreamRequest) error {
archiveRequest = req
- return nil
+ return archiveStreamErr
},
}
@@ -242,10 +243,12 @@ func TestHandleArchive(t *testing.T) {
GSBase: gs.Path("gs://archive-test/path/to/archive/"), // Extra slashes to test concatenation.
}
+ expired := 10 * time.Minute
task := logdog.ArchiveTask{
- Path: stream.State.Path,
- Complete: true,
+ Path: stream.State.Path,
+ CompletePeriod: google.NewDuration(expired),
}
+ expired++ // This represents a time PAST CompletePeriod.
gsURL := func(p string) string {
return fmt.Sprintf("gs://archive-test/path/to/archive/%s/%s", desc.Path(), p)
@@ -274,60 +277,53 @@ func TestHandleArchive(t *testing.T) {
sc.lsCallback = func(*logdog.LoadStreamRequest) (*logdog.LoadStreamResponse, error) {
return nil, errors.New("does not exist")
}
- So(ar.Archive(c, &task), ShouldErrLike, "does not exist")
+ So(ar.Archive(c, &task, 0), ShouldErrLike, "does not exist")
})
Convey(`Will refrain from archiving if the stream is already archived.`, func() {
stream.State.Archived = true
- So(ar.Archive(c, &task), ShouldBeNil)
+ So(ar.Archive(c, &task, 0), ShouldBeNil)
So(archiveRequest, ShouldBeNil)
})
Convey(`Will refrain from archiving if the stream is purged.`, func() {
stream.State.Purged = true
- So(ar.Archive(c, &task), ShouldBeNil)
+ So(ar.Archive(c, &task, 0), ShouldBeNil)
So(archiveRequest, ShouldBeNil)
})
// Weird case: the log has been marked for archival, has not been
- // terminated, and is within its completeness delay. This task will not
- // have been dispatched by our archive cron, but let's assert that it
- // behaves correctly regardless.
- Convey(`Will succeed if the log stream had no entries and no terminal index.`, func() {
- So(ar.Archive(c, &task), ShouldBeNil)
-
- So(hasStreams(true, true, false), ShouldBeTrue)
- So(archiveRequest, ShouldResemble, &logdog.ArchiveStreamRequest{
- Path: task.Path,
- Complete: true,
- TerminalIndex: -1,
-
- StreamUrl: gsURL("logstream.entries"),
- IndexUrl: gsURL("logstream.index"),
- DataUrl: gsURL("data.bin"),
- })
+ // terminated, and is within its completeness delay. This task should not
+ // have been dispatched by our expired archive cron, but let's assert that
+ // it behaves correctly regardless.
+ Convey(`Will refuse to archive a complete steram with no terminal index.`, func() {
+ err := ar.Archive(c, &task, 0)
+ So(err, ShouldErrLike, "cannot archive complete stream with no terminal index")
+ So(errors.IsTransient(err), ShouldBeTrue)
})
Convey(`With terminal index "3"`, func() {
stream.State.TerminalIndex = 3
- Convey(`Will fail if the log stream had a terminal index and no entries.`, func() {
- So(ar.Archive(c, &task), ShouldErrLike, "stream finished short of terminal index")
+ Convey(`Will fail transiently if the log stream had no entries.`, func() {
+ err := ar.Archive(c, &task, 0)
+ So(err, ShouldErrLike, "stream has missing entries")
+ So(errors.IsTransient(err), ShouldBeTrue)
})
Convey(`Will fail to archive {0, 1, 2, 4} (incomplete).`, func() {
addTestEntry(0, 1, 2, 4)
- So(ar.Archive(c, &task), ShouldErrLike, "non-contiguous log stream")
+ So(ar.Archive(c, &task, 0), ShouldErrLike, "stream has missing entries")
})
Convey(`Will successfully archive {0, 1, 2, 3, 4}, stopping at the terminal index.`, func() {
addTestEntry(0, 1, 2, 3, 4)
- So(ar.Archive(c, &task), ShouldBeNil)
+ So(ar.Archive(c, &task, 0), ShouldBeNil)
So(hasStreams(true, true, true), ShouldBeTrue)
So(archiveRequest, ShouldResemble, &logdog.ArchiveStreamRequest{
Path: task.Path,
- Complete: true,
+ LogEntryCount: 4,
TerminalIndex: 3,
StreamUrl: gsURL("logstream.entries"),
@@ -335,19 +331,53 @@ func TestHandleArchive(t *testing.T) {
DataUrl: gsURL("data.bin"),
})
})
+
+ Convey(`When a transient archival error occurs, will return it.`, func() {
+ gsc.newWriterErr = func(*testGSWriter) error { return errors.WrapTransient(errors.New("test error")) }
+
+ err := ar.Archive(c, &task, 0)
+ So(err, ShouldErrLike, "test error")
+ So(errors.IsTransient(err), ShouldBeTrue)
+ })
+
+ Convey(`When a non-transient archival error occurs, will report it`, func() {
+ gsc.newWriterErr = func(*testGSWriter) error { return errors.New("test error") }
+
+ Convey(`If remote returns an error, forwards a transient error.`, func() {
+ archiveStreamErr = errors.New("test error")
+
+ err := ar.Archive(c, &task, 0)
+ So(err, ShouldErrLike, "failed to report archive state")
+ So(errors.IsTransient(err), ShouldBeTrue)
+
+ So(archiveRequest, ShouldResemble, &logdog.ArchiveStreamRequest{
+ Path: task.Path,
+ Error: true,
+ TerminalIndex: -1,
+ })
+ })
+
+ Convey(`If remote returns success, returns nil.`, func() {
+ So(ar.Archive(c, &task, 0), ShouldBeNil)
+ So(archiveRequest, ShouldResemble, &logdog.ArchiveStreamRequest{
+ Path: task.Path,
+ Error: true,
+ TerminalIndex: -1,
+ })
+ })
+ })
})
Convey(`When not enforcing stream completeness`, func() {
- task.Complete = false
Convey(`With no terminal index`, func() {
Convey(`Will successfully archive if there are no entries.`, func() {
- So(ar.Archive(c, &task), ShouldBeNil)
+ So(ar.Archive(c, &task, expired), ShouldBeNil)
So(hasStreams(true, true, false), ShouldBeTrue)
So(archiveRequest, ShouldResemble, &logdog.ArchiveStreamRequest{
Path: task.Path,
- Complete: true,
+ LogEntryCount: 0,
TerminalIndex: -1,
StreamUrl: gsURL("logstream.entries"),
@@ -358,12 +388,12 @@ func TestHandleArchive(t *testing.T) {
Convey(`With {0, 1, 2, 4} (incomplete) will archive the stream and update its terminal index.`, func() {
addTestEntry(0, 1, 2, 4)
- So(ar.Archive(c, &task), ShouldBeNil)
+ So(ar.Archive(c, &task, expired), ShouldBeNil)
So(hasStreams(true, true, true), ShouldBeTrue)
So(archiveRequest, ShouldResemble, &logdog.ArchiveStreamRequest{
Path: task.Path,
- Complete: false,
+ LogEntryCount: 4,
TerminalIndex: 4,
StreamUrl: gsURL("logstream.entries"),
@@ -377,12 +407,12 @@ func TestHandleArchive(t *testing.T) {
stream.State.TerminalIndex = 3
Convey(`Will successfully archive if there are no entries.`, func() {
- So(ar.Archive(c, &task), ShouldBeNil)
+ So(ar.Archive(c, &task, expired), ShouldBeNil)
So(hasStreams(true, true, false), ShouldBeTrue)
So(archiveRequest, ShouldResemble, &logdog.ArchiveStreamRequest{
Path: task.Path,
- Complete: true,
+ LogEntryCount: 0,
TerminalIndex: -1,
StreamUrl: gsURL("logstream.entries"),
@@ -393,12 +423,12 @@ func TestHandleArchive(t *testing.T) {
Convey(`With {0, 1, 2, 4} (incomplete) will archive the stream and update its terminal index to 2.`, func() {
addTestEntry(0, 1, 2, 4)
- So(ar.Archive(c, &task), ShouldBeNil)
+ So(ar.Archive(c, &task, expired), ShouldBeNil)
So(hasStreams(true, true, true), ShouldBeTrue)
So(archiveRequest, ShouldResemble, &logdog.ArchiveStreamRequest{
Path: task.Path,
- Complete: false,
+ LogEntryCount: 3,
TerminalIndex: 2,
StreamUrl: gsURL("logstream.entries"),
@@ -416,10 +446,11 @@ func TestHandleArchive(t *testing.T) {
for _, failName := range []string{"/logstream.entries", "/logstream.index", "/data.bin"} {
for _, testCase := range []struct {
- name string
- setup func()
+ name string
+ transient bool
+ setup func()
}{
- {"delete failure", func() {
+ {"delete failure", false, func() {
gsc.deleteErr = func(b, p string) error {
if strings.HasSuffix(p, failName) {
return errors.New("test error")
@@ -428,7 +459,7 @@ func TestHandleArchive(t *testing.T) {
}
}},
- {"writer create failure", func() {
+ {"writer create failure", false, func() {
gsc.newWriterErr = func(w *testGSWriter) error {
if strings.HasSuffix(w.path, failName) {
return errors.New("test error")
@@ -437,7 +468,7 @@ func TestHandleArchive(t *testing.T) {
}
}},
- {"write failure", func() {
+ {"write failure", false, func() {
gsc.newWriterErr = func(w *testGSWriter) error {
if strings.HasSuffix(w.path, failName) {
w.writeErr = errors.New("test error")
@@ -446,7 +477,7 @@ func TestHandleArchive(t *testing.T) {
}
}},
- {"close failure", func() {
+ {"close failure", false, func() {
gsc.newWriterErr = func(w *testGSWriter) error {
if strings.HasSuffix(w.path, failName) {
w.closeErr = errors.New("test error")
@@ -455,7 +486,7 @@ func TestHandleArchive(t *testing.T) {
}
}},
- {"delete on fail failure (double-failure)", func() {
+ {"delete on fail failure (double-failure)", false, func() {
failed := false
// Simulate a write failure. This is the error that will actually
@@ -483,9 +514,21 @@ func TestHandleArchive(t *testing.T) {
}
}},
} {
- Convey(fmt.Sprintf(`Can handle %s for %s`, testCase.name, failName), func() {
+ Convey(fmt.Sprintf(`Can handle %s for %s, and will not archive.`, testCase.name, failName), func() {
testCase.setup()
- So(ar.Archive(c, &task), ShouldErrLike, "test error")
+
+ err := ar.Archive(c, &task, 0)
+ if testCase.transient {
+ // Transient errors are directly returned without consulting the
+ // Coordinator service.
+ So(err, ShouldErrLike, "test error")
+ So(errors.IsTransient(err), ShouldBeTrue)
+ So(archiveRequest, ShouldBeNil)
+ } else {
+ So(err, ShouldBeNil)
+ So(archiveRequest, ShouldNotBeNil)
+ So(archiveRequest.Error, ShouldBeTrue)
+ }
})
}
}

Powered by Google App Engine
This is Rietveld 408576698