| Index: server/internal/logdog/archivist/archivist_test.go
|
| diff --git a/server/internal/logdog/archivist/archivist_test.go b/server/internal/logdog/archivist/archivist_test.go
|
| index d038ced77f5113d1fe4c1b014d94619a0be366b6..399b878535575e62495e87c0e8dc3822b3778756 100644
|
| --- a/server/internal/logdog/archivist/archivist_test.go
|
| +++ b/server/internal/logdog/archivist/archivist_test.go
|
| @@ -14,6 +14,7 @@ import (
|
| "github.com/golang/protobuf/proto"
|
| "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1"
|
| "github.com/luci/luci-go/common/clock/testclock"
|
| + "github.com/luci/luci-go/common/config"
|
| "github.com/luci/luci-go/common/errors"
|
| "github.com/luci/luci-go/common/gcloud/gs"
|
| "github.com/luci/luci-go/common/logdog/types"
|
| @@ -202,6 +203,7 @@ func TestHandleArchive(t *testing.T) {
|
| gsc := testGSClient{}
|
|
|
| // Set up our test log stream.
|
| + project := "test-project"
|
| desc := logpb.LogStreamDescriptor{
|
| Prefix: "testing",
|
| Name: "foo",
|
| @@ -213,7 +215,7 @@ func TestHandleArchive(t *testing.T) {
|
| }
|
|
|
| // Utility function to add a log entry for "ls".
|
| - addTestEntry := func(idxs ...int) {
|
| + addTestEntry := func(p string, idxs ...int) {
|
| for _, v := range idxs {
|
| le := logpb.LogEntry{
|
| PrefixIndex: uint64(v),
|
| @@ -234,9 +236,10 @@ func TestHandleArchive(t *testing.T) {
|
| }
|
|
|
| err = st.Put(storage.PutRequest{
|
| - Path: desc.Path(),
|
| - Index: types.MessageIndex(v),
|
| - Values: [][]byte{d},
|
| + Project: config.ProjectName(p),
|
| + Path: desc.Path(),
|
| + Index: types.MessageIndex(v),
|
| + Values: [][]byte{d},
|
| })
|
| if err != nil {
|
| panic(err)
|
| @@ -250,6 +253,7 @@ func TestHandleArchive(t *testing.T) {
|
| // Set up our testing archival task.
|
| expired := 10 * time.Minute
|
| archiveTask := logdog.ArchiveTask{
|
| + Project: project,
|
| Path: string(desc.Path()),
|
| SettleDelay: google.NewDuration(10 * time.Second),
|
| CompletePeriod: google.NewDuration(expired),
|
| @@ -297,8 +301,8 @@ func TestHandleArchive(t *testing.T) {
|
| GSStagingBase: gs.Path("gs://archive-test-staging/path/to/archive/"), // Extra slashes to test concatenation.
|
| }
|
|
|
| - gsURL := func(p string) string {
|
| - return fmt.Sprintf("gs://archive-test/path/to/archive/%s/%s", desc.Path(), p)
|
| + gsURL := func(project, name string) string {
|
| + return fmt.Sprintf("gs://archive-test/path/to/archive/%s/%s/%s", project, desc.Path(), name)
|
| }
|
|
|
| // hasStreams can be called to check that the retained archiveRequest had
|
| @@ -398,7 +402,7 @@ func TestHandleArchive(t *testing.T) {
|
| })
|
|
|
| Convey(`Will fail to archive {0, 1, 2, 4} (incomplete).`, func() {
|
| - addTestEntry(0, 1, 2, 4)
|
| + addTestEntry(project, 0, 1, 2, 4)
|
|
|
| ack, err := ar.archiveTaskImpl(c, task)
|
| So(err, ShouldErrLike, "missing log entry")
|
| @@ -406,7 +410,7 @@ func TestHandleArchive(t *testing.T) {
|
| })
|
|
|
| Convey(`Will successfully archive {0, 1, 2, 3, 4}, stopping at the terminal index.`, func() {
|
| - addTestEntry(0, 1, 2, 3, 4)
|
| + addTestEntry(project, 0, 1, 2, 3, 4)
|
|
|
| ack, err := ar.archiveTaskImpl(c, task)
|
| So(err, ShouldBeNil)
|
| @@ -414,18 +418,19 @@ func TestHandleArchive(t *testing.T) {
|
|
|
| So(hasStreams(true, true, true), ShouldBeTrue)
|
| So(archiveRequest, ShouldResemble, &logdog.ArchiveStreamRequest{
|
| + Project: project,
|
| Path: archiveTask.Path,
|
| LogEntryCount: 4,
|
| TerminalIndex: 3,
|
|
|
| - StreamUrl: gsURL("logstream.entries"),
|
| - IndexUrl: gsURL("logstream.index"),
|
| - DataUrl: gsURL("data.bin"),
|
| + StreamUrl: gsURL(project, "logstream.entries"),
|
| + IndexUrl: gsURL(project, "logstream.index"),
|
| + DataUrl: gsURL(project, "data.bin"),
|
| })
|
| })
|
|
|
| Convey(`When a transient archival error occurs, will not ACK it.`, func() {
|
| - addTestEntry(0, 1, 2, 3, 4)
|
| + addTestEntry(project, 0, 1, 2, 3, 4)
|
| gsc.newWriterErr = func(*testGSWriter) error { return errors.WrapTransient(errors.New("test error")) }
|
|
|
| ack, err := ar.archiveTaskImpl(c, task)
|
| @@ -434,7 +439,7 @@ func TestHandleArchive(t *testing.T) {
|
| })
|
|
|
| Convey(`When a non-transient archival error occurs`, func() {
|
| - addTestEntry(0, 1, 2, 3, 4)
|
| + addTestEntry(project, 0, 1, 2, 3, 4)
|
| archiveErr := errors.New("archive failure error")
|
| gsc.newWriterErr = func(*testGSWriter) error { return archiveErr }
|
|
|
| @@ -446,8 +451,9 @@ func TestHandleArchive(t *testing.T) {
|
| So(ack, ShouldBeFalse)
|
|
|
| So(archiveRequest, ShouldResemble, &logdog.ArchiveStreamRequest{
|
| - Path: archiveTask.Path,
|
| - Error: "archive failure error",
|
| + Project: project,
|
| + Path: archiveTask.Path,
|
| + Error: "archive failure error",
|
| })
|
| })
|
|
|
| @@ -456,8 +462,9 @@ func TestHandleArchive(t *testing.T) {
|
| So(err, ShouldBeNil)
|
| So(ack, ShouldBeTrue)
|
| So(archiveRequest, ShouldResemble, &logdog.ArchiveStreamRequest{
|
| - Path: archiveTask.Path,
|
| - Error: "archive failure error",
|
| + Project: project,
|
| + Path: archiveTask.Path,
|
| + Error: "archive failure error",
|
| })
|
| })
|
|
|
| @@ -468,8 +475,9 @@ func TestHandleArchive(t *testing.T) {
|
| So(err, ShouldBeNil)
|
| So(ack, ShouldBeTrue)
|
| So(archiveRequest, ShouldResemble, &logdog.ArchiveStreamRequest{
|
| - Path: archiveTask.Path,
|
| - Error: "archival error",
|
| + Project: project,
|
| + Path: archiveTask.Path,
|
| + Error: "archival error",
|
| })
|
| })
|
| })
|
| @@ -486,18 +494,19 @@ func TestHandleArchive(t *testing.T) {
|
|
|
| So(hasStreams(true, true, false), ShouldBeTrue)
|
| So(archiveRequest, ShouldResemble, &logdog.ArchiveStreamRequest{
|
| + Project: project,
|
| Path: archiveTask.Path,
|
| LogEntryCount: 0,
|
| TerminalIndex: -1,
|
|
|
| - StreamUrl: gsURL("logstream.entries"),
|
| - IndexUrl: gsURL("logstream.index"),
|
| - DataUrl: gsURL("data.bin"),
|
| + StreamUrl: gsURL(project, "logstream.entries"),
|
| + IndexUrl: gsURL(project, "logstream.index"),
|
| + DataUrl: gsURL(project, "data.bin"),
|
| })
|
| })
|
|
|
| Convey(`With {0, 1, 2, 4} (incomplete) will archive the stream and update its terminal index.`, func() {
|
| - addTestEntry(0, 1, 2, 4)
|
| + addTestEntry(project, 0, 1, 2, 4)
|
|
|
| ack, err := ar.archiveTaskImpl(c, task)
|
| So(err, ShouldBeNil)
|
| @@ -505,13 +514,14 @@ func TestHandleArchive(t *testing.T) {
|
|
|
| So(hasStreams(true, true, true), ShouldBeTrue)
|
| So(archiveRequest, ShouldResemble, &logdog.ArchiveStreamRequest{
|
| + Project: project,
|
| Path: archiveTask.Path,
|
| LogEntryCount: 4,
|
| TerminalIndex: 4,
|
|
|
| - StreamUrl: gsURL("logstream.entries"),
|
| - IndexUrl: gsURL("logstream.index"),
|
| - DataUrl: gsURL("data.bin"),
|
| + StreamUrl: gsURL(project, "logstream.entries"),
|
| + IndexUrl: gsURL(project, "logstream.index"),
|
| + DataUrl: gsURL(project, "data.bin"),
|
| })
|
| })
|
| })
|
| @@ -526,18 +536,19 @@ func TestHandleArchive(t *testing.T) {
|
|
|
| So(hasStreams(true, true, false), ShouldBeTrue)
|
| So(archiveRequest, ShouldResemble, &logdog.ArchiveStreamRequest{
|
| + Project: project,
|
| Path: archiveTask.Path,
|
| LogEntryCount: 0,
|
| TerminalIndex: -1,
|
|
|
| - StreamUrl: gsURL("logstream.entries"),
|
| - IndexUrl: gsURL("logstream.index"),
|
| - DataUrl: gsURL("data.bin"),
|
| + StreamUrl: gsURL(project, "logstream.entries"),
|
| + IndexUrl: gsURL(project, "logstream.index"),
|
| + DataUrl: gsURL(project, "data.bin"),
|
| })
|
| })
|
|
|
| Convey(`With {0, 1, 2, 4} (incomplete) will archive the stream and update its terminal index to 2.`, func() {
|
| - addTestEntry(0, 1, 2, 4)
|
| + addTestEntry(project, 0, 1, 2, 4)
|
|
|
| ack, err := ar.archiveTaskImpl(c, task)
|
| So(err, ShouldBeNil)
|
| @@ -545,22 +556,48 @@ func TestHandleArchive(t *testing.T) {
|
|
|
| So(hasStreams(true, true, true), ShouldBeTrue)
|
| So(archiveRequest, ShouldResemble, &logdog.ArchiveStreamRequest{
|
| + Project: project,
|
| Path: archiveTask.Path,
|
| LogEntryCount: 3,
|
| TerminalIndex: 2,
|
|
|
| - StreamUrl: gsURL("logstream.entries"),
|
| - IndexUrl: gsURL("logstream.index"),
|
| - DataUrl: gsURL("data.bin"),
|
| + StreamUrl: gsURL(project, "logstream.entries"),
|
| + IndexUrl: gsURL(project, "logstream.index"),
|
| + DataUrl: gsURL(project, "data.bin"),
|
| })
|
| })
|
| })
|
| })
|
|
|
| + Convey(`With an empty project`, func() {
|
| + archiveTask.Project = ""
|
| +
|
| + Convey(`Will successfully archive {0, 1, 2, 3} with terminal index 3 using "_" for project archive path.`, func() {
|
| + stream.State.TerminalIndex = 3
|
| + addTestEntry("", 0, 1, 2, 3)
|
| +
|
| + ack, err := ar.archiveTaskImpl(c, task)
|
| + So(err, ShouldBeNil)
|
| + So(ack, ShouldBeTrue)
|
| +
|
| + So(hasStreams(true, true, true), ShouldBeTrue)
|
| + So(archiveRequest, ShouldResemble, &logdog.ArchiveStreamRequest{
|
| + Project: "",
|
| + Path: archiveTask.Path,
|
| + LogEntryCount: 4,
|
| + TerminalIndex: 3,
|
| +
|
| + StreamUrl: gsURL("_", "logstream.entries"),
|
| + IndexUrl: gsURL("_", "logstream.index"),
|
| + DataUrl: gsURL("_", "data.bin"),
|
| + })
|
| + })
|
| + })
|
| +
|
| // Simulate failures during the various stream generation operations.
|
| Convey(`Stream generation failures`, func() {
|
| stream.State.TerminalIndex = 3
|
| - addTestEntry(0, 1, 2, 3)
|
| + addTestEntry(project, 0, 1, 2, 3)
|
|
|
| for _, failName := range []string{"/logstream.entries", "/logstream.index", "/data.bin"} {
|
| for _, testCase := range []struct {
|
|
|