| Index: server/internal/logdog/archivist/archivist_test.go
|
| diff --git a/server/internal/logdog/archivist/archivist_test.go b/server/internal/logdog/archivist/archivist_test.go
|
| index 12daac169e7ca7d70f8181b69d077782b5ac5e8d..9dccb6434aebbaeea2fc8a2218eb45efca0c95cc 100644
|
| --- a/server/internal/logdog/archivist/archivist_test.go
|
| +++ b/server/internal/logdog/archivist/archivist_test.go
|
| @@ -5,7 +5,6 @@
|
| package archivist
|
|
|
| import (
|
| - "errors"
|
| "fmt"
|
| "strings"
|
| "sync"
|
| @@ -15,6 +14,7 @@ import (
|
| "github.com/golang/protobuf/proto"
|
| "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1"
|
| "github.com/luci/luci-go/common/clock/testclock"
|
| + "github.com/luci/luci-go/common/errors"
|
| "github.com/luci/luci-go/common/gcloud/gs"
|
| "github.com/luci/luci-go/common/logdog/types"
|
| "github.com/luci/luci-go/common/proto/google"
|
| @@ -225,13 +225,14 @@ func TestHandleArchive(t *testing.T) {
|
| }
|
|
|
| var archiveRequest *logdog.ArchiveStreamRequest
|
| + var archiveStreamErr error
|
| sc := testServicesClient{
|
| lsCallback: func(req *logdog.LoadStreamRequest) (*logdog.LoadStreamResponse, error) {
|
| return &stream, nil
|
| },
|
| asCallback: func(req *logdog.ArchiveStreamRequest) error {
|
| archiveRequest = req
|
| - return nil
|
| + return archiveStreamErr
|
| },
|
| }
|
|
|
| @@ -242,10 +243,12 @@ func TestHandleArchive(t *testing.T) {
|
| GSBase: gs.Path("gs://archive-test/path/to/archive/"), // Extra slashes to test concatenation.
|
| }
|
|
|
| + expired := 10 * time.Minute
|
| task := logdog.ArchiveTask{
|
| - Path: stream.State.Path,
|
| - Complete: true,
|
| + Path: stream.State.Path,
|
| + CompletePeriod: google.NewDuration(expired),
|
| }
|
| + expired++ // This represents a time PAST CompletePeriod.
|
|
|
| gsURL := func(p string) string {
|
| return fmt.Sprintf("gs://archive-test/path/to/archive/%s/%s", desc.Path(), p)
|
| @@ -274,60 +277,53 @@ func TestHandleArchive(t *testing.T) {
|
| sc.lsCallback = func(*logdog.LoadStreamRequest) (*logdog.LoadStreamResponse, error) {
|
| return nil, errors.New("does not exist")
|
| }
|
| - So(ar.Archive(c, &task), ShouldErrLike, "does not exist")
|
| + So(ar.Archive(c, &task, 0), ShouldErrLike, "does not exist")
|
| })
|
|
|
| Convey(`Will refrain from archiving if the stream is already archived.`, func() {
|
| stream.State.Archived = true
|
| - So(ar.Archive(c, &task), ShouldBeNil)
|
| + So(ar.Archive(c, &task, 0), ShouldBeNil)
|
| So(archiveRequest, ShouldBeNil)
|
| })
|
|
|
| Convey(`Will refrain from archiving if the stream is purged.`, func() {
|
| stream.State.Purged = true
|
| - So(ar.Archive(c, &task), ShouldBeNil)
|
| + So(ar.Archive(c, &task, 0), ShouldBeNil)
|
| So(archiveRequest, ShouldBeNil)
|
| })
|
|
|
| // Weird case: the log has been marked for archival, has not been
|
| - // terminated, and is within its completeness delay. This task will not
|
| - // have been dispatched by our archive cron, but let's assert that it
|
| - // behaves correctly regardless.
|
| - Convey(`Will succeed if the log stream had no entries and no terminal index.`, func() {
|
| - So(ar.Archive(c, &task), ShouldBeNil)
|
| -
|
| - So(hasStreams(true, true, false), ShouldBeTrue)
|
| - So(archiveRequest, ShouldResemble, &logdog.ArchiveStreamRequest{
|
| - Path: task.Path,
|
| - Complete: true,
|
| - TerminalIndex: -1,
|
| -
|
| - StreamUrl: gsURL("logstream.entries"),
|
| - IndexUrl: gsURL("logstream.index"),
|
| - DataUrl: gsURL("data.bin"),
|
| - })
|
| + // terminated, and is within its completeness delay. This task should not
|
| + // have been dispatched by our expired archive cron, but let's assert that
|
| + // it behaves correctly regardless.
|
| + Convey(`Will refuse to archive a complete steram with no terminal index.`, func() {
|
| + err := ar.Archive(c, &task, 0)
|
| + So(err, ShouldErrLike, "cannot archive complete stream with no terminal index")
|
| + So(errors.IsTransient(err), ShouldBeTrue)
|
| })
|
|
|
| Convey(`With terminal index "3"`, func() {
|
| stream.State.TerminalIndex = 3
|
|
|
| - Convey(`Will fail if the log stream had a terminal index and no entries.`, func() {
|
| - So(ar.Archive(c, &task), ShouldErrLike, "stream finished short of terminal index")
|
| + Convey(`Will fail transiently if the log stream had no entries.`, func() {
|
| + err := ar.Archive(c, &task, 0)
|
| + So(err, ShouldErrLike, "stream has missing entries")
|
| + So(errors.IsTransient(err), ShouldBeTrue)
|
| })
|
|
|
| Convey(`Will fail to archive {0, 1, 2, 4} (incomplete).`, func() {
|
| addTestEntry(0, 1, 2, 4)
|
| - So(ar.Archive(c, &task), ShouldErrLike, "non-contiguous log stream")
|
| + So(ar.Archive(c, &task, 0), ShouldErrLike, "stream has missing entries")
|
| })
|
|
|
| Convey(`Will successfully archive {0, 1, 2, 3, 4}, stopping at the terminal index.`, func() {
|
| addTestEntry(0, 1, 2, 3, 4)
|
| - So(ar.Archive(c, &task), ShouldBeNil)
|
| + So(ar.Archive(c, &task, 0), ShouldBeNil)
|
|
|
| So(hasStreams(true, true, true), ShouldBeTrue)
|
| So(archiveRequest, ShouldResemble, &logdog.ArchiveStreamRequest{
|
| Path: task.Path,
|
| - Complete: true,
|
| + LogEntryCount: 4,
|
| TerminalIndex: 3,
|
|
|
| StreamUrl: gsURL("logstream.entries"),
|
| @@ -335,19 +331,53 @@ func TestHandleArchive(t *testing.T) {
|
| DataUrl: gsURL("data.bin"),
|
| })
|
| })
|
| +
|
| + Convey(`When a transient archival error occurs, will return it.`, func() {
|
| + gsc.newWriterErr = func(*testGSWriter) error { return errors.WrapTransient(errors.New("test error")) }
|
| +
|
| + err := ar.Archive(c, &task, 0)
|
| + So(err, ShouldErrLike, "test error")
|
| + So(errors.IsTransient(err), ShouldBeTrue)
|
| + })
|
| +
|
| + Convey(`When a non-transient archival error occurs, will report it`, func() {
|
| + gsc.newWriterErr = func(*testGSWriter) error { return errors.New("test error") }
|
| +
|
| + Convey(`If remote returns an error, forwards a transient error.`, func() {
|
| + archiveStreamErr = errors.New("test error")
|
| +
|
| + err := ar.Archive(c, &task, 0)
|
| + So(err, ShouldErrLike, "failed to report archive state")
|
| + So(errors.IsTransient(err), ShouldBeTrue)
|
| +
|
| + So(archiveRequest, ShouldResemble, &logdog.ArchiveStreamRequest{
|
| + Path: task.Path,
|
| + Error: true,
|
| + TerminalIndex: -1,
|
| + })
|
| + })
|
| +
|
| + Convey(`If remote returns success, returns nil.`, func() {
|
| + So(ar.Archive(c, &task, 0), ShouldBeNil)
|
| + So(archiveRequest, ShouldResemble, &logdog.ArchiveStreamRequest{
|
| + Path: task.Path,
|
| + Error: true,
|
| + TerminalIndex: -1,
|
| + })
|
| + })
|
| + })
|
| })
|
|
|
| Convey(`When not enforcing stream completeness`, func() {
|
| - task.Complete = false
|
|
|
| Convey(`With no terminal index`, func() {
|
| Convey(`Will successfully archive if there are no entries.`, func() {
|
| - So(ar.Archive(c, &task), ShouldBeNil)
|
| + So(ar.Archive(c, &task, expired), ShouldBeNil)
|
|
|
| So(hasStreams(true, true, false), ShouldBeTrue)
|
| So(archiveRequest, ShouldResemble, &logdog.ArchiveStreamRequest{
|
| Path: task.Path,
|
| - Complete: true,
|
| + LogEntryCount: 0,
|
| TerminalIndex: -1,
|
|
|
| StreamUrl: gsURL("logstream.entries"),
|
| @@ -358,12 +388,12 @@ func TestHandleArchive(t *testing.T) {
|
|
|
| Convey(`With {0, 1, 2, 4} (incomplete) will archive the stream and update its terminal index.`, func() {
|
| addTestEntry(0, 1, 2, 4)
|
| - So(ar.Archive(c, &task), ShouldBeNil)
|
| + So(ar.Archive(c, &task, expired), ShouldBeNil)
|
|
|
| So(hasStreams(true, true, true), ShouldBeTrue)
|
| So(archiveRequest, ShouldResemble, &logdog.ArchiveStreamRequest{
|
| Path: task.Path,
|
| - Complete: false,
|
| + LogEntryCount: 4,
|
| TerminalIndex: 4,
|
|
|
| StreamUrl: gsURL("logstream.entries"),
|
| @@ -377,12 +407,12 @@ func TestHandleArchive(t *testing.T) {
|
| stream.State.TerminalIndex = 3
|
|
|
| Convey(`Will successfully archive if there are no entries.`, func() {
|
| - So(ar.Archive(c, &task), ShouldBeNil)
|
| + So(ar.Archive(c, &task, expired), ShouldBeNil)
|
|
|
| So(hasStreams(true, true, false), ShouldBeTrue)
|
| So(archiveRequest, ShouldResemble, &logdog.ArchiveStreamRequest{
|
| Path: task.Path,
|
| - Complete: true,
|
| + LogEntryCount: 0,
|
| TerminalIndex: -1,
|
|
|
| StreamUrl: gsURL("logstream.entries"),
|
| @@ -393,12 +423,12 @@ func TestHandleArchive(t *testing.T) {
|
|
|
| Convey(`With {0, 1, 2, 4} (incomplete) will archive the stream and update its terminal index to 2.`, func() {
|
| addTestEntry(0, 1, 2, 4)
|
| - So(ar.Archive(c, &task), ShouldBeNil)
|
| + So(ar.Archive(c, &task, expired), ShouldBeNil)
|
|
|
| So(hasStreams(true, true, true), ShouldBeTrue)
|
| So(archiveRequest, ShouldResemble, &logdog.ArchiveStreamRequest{
|
| Path: task.Path,
|
| - Complete: false,
|
| + LogEntryCount: 3,
|
| TerminalIndex: 2,
|
|
|
| StreamUrl: gsURL("logstream.entries"),
|
| @@ -416,10 +446,11 @@ func TestHandleArchive(t *testing.T) {
|
|
|
| for _, failName := range []string{"/logstream.entries", "/logstream.index", "/data.bin"} {
|
| for _, testCase := range []struct {
|
| - name string
|
| - setup func()
|
| + name string
|
| + transient bool
|
| + setup func()
|
| }{
|
| - {"delete failure", func() {
|
| + {"delete failure", false, func() {
|
| gsc.deleteErr = func(b, p string) error {
|
| if strings.HasSuffix(p, failName) {
|
| return errors.New("test error")
|
| @@ -428,7 +459,7 @@ func TestHandleArchive(t *testing.T) {
|
| }
|
| }},
|
|
|
| - {"writer create failure", func() {
|
| + {"writer create failure", false, func() {
|
| gsc.newWriterErr = func(w *testGSWriter) error {
|
| if strings.HasSuffix(w.path, failName) {
|
| return errors.New("test error")
|
| @@ -437,7 +468,7 @@ func TestHandleArchive(t *testing.T) {
|
| }
|
| }},
|
|
|
| - {"write failure", func() {
|
| + {"write failure", false, func() {
|
| gsc.newWriterErr = func(w *testGSWriter) error {
|
| if strings.HasSuffix(w.path, failName) {
|
| w.writeErr = errors.New("test error")
|
| @@ -446,7 +477,7 @@ func TestHandleArchive(t *testing.T) {
|
| }
|
| }},
|
|
|
| - {"close failure", func() {
|
| + {"close failure", false, func() {
|
| gsc.newWriterErr = func(w *testGSWriter) error {
|
| if strings.HasSuffix(w.path, failName) {
|
| w.closeErr = errors.New("test error")
|
| @@ -455,7 +486,7 @@ func TestHandleArchive(t *testing.T) {
|
| }
|
| }},
|
|
|
| - {"delete on fail failure (double-failure)", func() {
|
| + {"delete on fail failure (double-failure)", false, func() {
|
| failed := false
|
|
|
| // Simulate a write failure. This is the error that will actually
|
| @@ -483,9 +514,21 @@ func TestHandleArchive(t *testing.T) {
|
| }
|
| }},
|
| } {
|
| - Convey(fmt.Sprintf(`Can handle %s for %s`, testCase.name, failName), func() {
|
| + Convey(fmt.Sprintf(`Can handle %s for %s, and will not archive.`, testCase.name, failName), func() {
|
| testCase.setup()
|
| - So(ar.Archive(c, &task), ShouldErrLike, "test error")
|
| +
|
| + err := ar.Archive(c, &task, 0)
|
| + if testCase.transient {
|
| + // Transient errors are directly returned without consulting the
|
| + // Coordinator service.
|
| + So(err, ShouldErrLike, "test error")
|
| + So(errors.IsTransient(err), ShouldBeTrue)
|
| + So(archiveRequest, ShouldBeNil)
|
| + } else {
|
| + So(err, ShouldBeNil)
|
| + So(archiveRequest, ShouldNotBeNil)
|
| + So(archiveRequest.Error, ShouldBeTrue)
|
| + }
|
| })
|
| }
|
| }
|
|
|