Index: server/internal/logdog/archivist/archivist_test.go |
diff --git a/server/internal/logdog/archivist/archivist_test.go b/server/internal/logdog/archivist/archivist_test.go |
index d038ced77f5113d1fe4c1b014d94619a0be366b6..399b878535575e62495e87c0e8dc3822b3778756 100644 |
--- a/server/internal/logdog/archivist/archivist_test.go |
+++ b/server/internal/logdog/archivist/archivist_test.go |
@@ -14,6 +14,7 @@ import ( |
"github.com/golang/protobuf/proto" |
"github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" |
"github.com/luci/luci-go/common/clock/testclock" |
+ "github.com/luci/luci-go/common/config" |
"github.com/luci/luci-go/common/errors" |
"github.com/luci/luci-go/common/gcloud/gs" |
"github.com/luci/luci-go/common/logdog/types" |
@@ -202,6 +203,7 @@ func TestHandleArchive(t *testing.T) { |
gsc := testGSClient{} |
// Set up our test log stream. |
+ project := "test-project" |
desc := logpb.LogStreamDescriptor{ |
Prefix: "testing", |
Name: "foo", |
@@ -213,7 +215,7 @@ func TestHandleArchive(t *testing.T) { |
} |
// Utility function to add a log entry for "ls". |
- addTestEntry := func(idxs ...int) { |
+ addTestEntry := func(p string, idxs ...int) { |
for _, v := range idxs { |
le := logpb.LogEntry{ |
PrefixIndex: uint64(v), |
@@ -234,9 +236,10 @@ func TestHandleArchive(t *testing.T) { |
} |
err = st.Put(storage.PutRequest{ |
- Path: desc.Path(), |
- Index: types.MessageIndex(v), |
- Values: [][]byte{d}, |
+ Project: config.ProjectName(p), |
+ Path: desc.Path(), |
+ Index: types.MessageIndex(v), |
+ Values: [][]byte{d}, |
}) |
if err != nil { |
panic(err) |
@@ -250,6 +253,7 @@ func TestHandleArchive(t *testing.T) { |
// Set up our testing archival task. |
expired := 10 * time.Minute |
archiveTask := logdog.ArchiveTask{ |
+ Project: project, |
Path: string(desc.Path()), |
SettleDelay: google.NewDuration(10 * time.Second), |
CompletePeriod: google.NewDuration(expired), |
@@ -297,8 +301,8 @@ func TestHandleArchive(t *testing.T) { |
GSStagingBase: gs.Path("gs://archive-test-staging/path/to/archive/"), // Extra slashes to test concatenation. |
} |
- gsURL := func(p string) string { |
- return fmt.Sprintf("gs://archive-test/path/to/archive/%s/%s", desc.Path(), p) |
+ gsURL := func(project, name string) string { |
+ return fmt.Sprintf("gs://archive-test/path/to/archive/%s/%s/%s", project, desc.Path(), name) |
} |
// hasStreams can be called to check that the retained archiveRequest had |
@@ -398,7 +402,7 @@ func TestHandleArchive(t *testing.T) { |
}) |
Convey(`Will fail to archive {0, 1, 2, 4} (incomplete).`, func() { |
- addTestEntry(0, 1, 2, 4) |
+ addTestEntry(project, 0, 1, 2, 4) |
ack, err := ar.archiveTaskImpl(c, task) |
So(err, ShouldErrLike, "missing log entry") |
@@ -406,7 +410,7 @@ func TestHandleArchive(t *testing.T) { |
}) |
Convey(`Will successfully archive {0, 1, 2, 3, 4}, stopping at the terminal index.`, func() { |
- addTestEntry(0, 1, 2, 3, 4) |
+ addTestEntry(project, 0, 1, 2, 3, 4) |
ack, err := ar.archiveTaskImpl(c, task) |
So(err, ShouldBeNil) |
@@ -414,18 +418,19 @@ func TestHandleArchive(t *testing.T) { |
So(hasStreams(true, true, true), ShouldBeTrue) |
So(archiveRequest, ShouldResemble, &logdog.ArchiveStreamRequest{ |
+ Project: project, |
Path: archiveTask.Path, |
LogEntryCount: 4, |
TerminalIndex: 3, |
- StreamUrl: gsURL("logstream.entries"), |
- IndexUrl: gsURL("logstream.index"), |
- DataUrl: gsURL("data.bin"), |
+ StreamUrl: gsURL(project, "logstream.entries"), |
+ IndexUrl: gsURL(project, "logstream.index"), |
+ DataUrl: gsURL(project, "data.bin"), |
}) |
}) |
Convey(`When a transient archival error occurs, will not ACK it.`, func() { |
- addTestEntry(0, 1, 2, 3, 4) |
+ addTestEntry(project, 0, 1, 2, 3, 4) |
gsc.newWriterErr = func(*testGSWriter) error { return errors.WrapTransient(errors.New("test error")) } |
ack, err := ar.archiveTaskImpl(c, task) |
@@ -434,7 +439,7 @@ func TestHandleArchive(t *testing.T) { |
}) |
Convey(`When a non-transient archival error occurs`, func() { |
- addTestEntry(0, 1, 2, 3, 4) |
+ addTestEntry(project, 0, 1, 2, 3, 4) |
archiveErr := errors.New("archive failure error") |
gsc.newWriterErr = func(*testGSWriter) error { return archiveErr } |
@@ -446,8 +451,9 @@ func TestHandleArchive(t *testing.T) { |
So(ack, ShouldBeFalse) |
So(archiveRequest, ShouldResemble, &logdog.ArchiveStreamRequest{ |
- Path: archiveTask.Path, |
- Error: "archive failure error", |
+ Project: project, |
+ Path: archiveTask.Path, |
+ Error: "archive failure error", |
}) |
}) |
@@ -456,8 +462,9 @@ func TestHandleArchive(t *testing.T) { |
So(err, ShouldBeNil) |
So(ack, ShouldBeTrue) |
So(archiveRequest, ShouldResemble, &logdog.ArchiveStreamRequest{ |
- Path: archiveTask.Path, |
- Error: "archive failure error", |
+ Project: project, |
+ Path: archiveTask.Path, |
+ Error: "archive failure error", |
}) |
}) |
@@ -468,8 +475,9 @@ func TestHandleArchive(t *testing.T) { |
So(err, ShouldBeNil) |
So(ack, ShouldBeTrue) |
So(archiveRequest, ShouldResemble, &logdog.ArchiveStreamRequest{ |
- Path: archiveTask.Path, |
- Error: "archival error", |
+ Project: project, |
+ Path: archiveTask.Path, |
+ Error: "archival error", |
}) |
}) |
}) |
@@ -486,18 +494,19 @@ func TestHandleArchive(t *testing.T) { |
So(hasStreams(true, true, false), ShouldBeTrue) |
So(archiveRequest, ShouldResemble, &logdog.ArchiveStreamRequest{ |
+ Project: project, |
Path: archiveTask.Path, |
LogEntryCount: 0, |
TerminalIndex: -1, |
- StreamUrl: gsURL("logstream.entries"), |
- IndexUrl: gsURL("logstream.index"), |
- DataUrl: gsURL("data.bin"), |
+ StreamUrl: gsURL(project, "logstream.entries"), |
+ IndexUrl: gsURL(project, "logstream.index"), |
+ DataUrl: gsURL(project, "data.bin"), |
}) |
}) |
Convey(`With {0, 1, 2, 4} (incomplete) will archive the stream and update its terminal index.`, func() { |
- addTestEntry(0, 1, 2, 4) |
+ addTestEntry(project, 0, 1, 2, 4) |
ack, err := ar.archiveTaskImpl(c, task) |
So(err, ShouldBeNil) |
@@ -505,13 +514,14 @@ func TestHandleArchive(t *testing.T) { |
So(hasStreams(true, true, true), ShouldBeTrue) |
So(archiveRequest, ShouldResemble, &logdog.ArchiveStreamRequest{ |
+ Project: project, |
Path: archiveTask.Path, |
LogEntryCount: 4, |
TerminalIndex: 4, |
- StreamUrl: gsURL("logstream.entries"), |
- IndexUrl: gsURL("logstream.index"), |
- DataUrl: gsURL("data.bin"), |
+ StreamUrl: gsURL(project, "logstream.entries"), |
+ IndexUrl: gsURL(project, "logstream.index"), |
+ DataUrl: gsURL(project, "data.bin"), |
}) |
}) |
}) |
@@ -526,18 +536,19 @@ func TestHandleArchive(t *testing.T) { |
So(hasStreams(true, true, false), ShouldBeTrue) |
So(archiveRequest, ShouldResemble, &logdog.ArchiveStreamRequest{ |
+ Project: project, |
Path: archiveTask.Path, |
LogEntryCount: 0, |
TerminalIndex: -1, |
- StreamUrl: gsURL("logstream.entries"), |
- IndexUrl: gsURL("logstream.index"), |
- DataUrl: gsURL("data.bin"), |
+ StreamUrl: gsURL(project, "logstream.entries"), |
+ IndexUrl: gsURL(project, "logstream.index"), |
+ DataUrl: gsURL(project, "data.bin"), |
}) |
}) |
Convey(`With {0, 1, 2, 4} (incomplete) will archive the stream and update its terminal index to 2.`, func() { |
- addTestEntry(0, 1, 2, 4) |
+ addTestEntry(project, 0, 1, 2, 4) |
ack, err := ar.archiveTaskImpl(c, task) |
So(err, ShouldBeNil) |
@@ -545,22 +556,48 @@ func TestHandleArchive(t *testing.T) { |
So(hasStreams(true, true, true), ShouldBeTrue) |
So(archiveRequest, ShouldResemble, &logdog.ArchiveStreamRequest{ |
+ Project: project, |
Path: archiveTask.Path, |
LogEntryCount: 3, |
TerminalIndex: 2, |
- StreamUrl: gsURL("logstream.entries"), |
- IndexUrl: gsURL("logstream.index"), |
- DataUrl: gsURL("data.bin"), |
+ StreamUrl: gsURL(project, "logstream.entries"), |
+ IndexUrl: gsURL(project, "logstream.index"), |
+ DataUrl: gsURL(project, "data.bin"), |
}) |
}) |
}) |
}) |
+ Convey(`With an empty project`, func() { |
+ archiveTask.Project = "" |
+ |
+ Convey(`Will successfully archive {0, 1, 2, 3} with terminal index 3 using "_" for project archive path.`, func() { |
+ stream.State.TerminalIndex = 3 |
+ addTestEntry("", 0, 1, 2, 3) |
+ |
+ ack, err := ar.archiveTaskImpl(c, task) |
+ So(err, ShouldBeNil) |
+ So(ack, ShouldBeTrue) |
+ |
+ So(hasStreams(true, true, true), ShouldBeTrue) |
+ So(archiveRequest, ShouldResemble, &logdog.ArchiveStreamRequest{ |
+ Project: "", |
+ Path: archiveTask.Path, |
+ LogEntryCount: 4, |
+ TerminalIndex: 3, |
+ |
+ StreamUrl: gsURL("_", "logstream.entries"), |
+ IndexUrl: gsURL("_", "logstream.index"), |
+ DataUrl: gsURL("_", "data.bin"), |
+ }) |
+ }) |
+ }) |
+ |
// Simulate failures during the various stream generation operations. |
Convey(`Stream generation failures`, func() { |
stream.State.TerminalIndex = 3 |
- addTestEntry(0, 1, 2, 3) |
+ addTestEntry(project, 0, 1, 2, 3) |
for _, failName := range []string{"/logstream.entries", "/logstream.index", "/data.bin"} { |
for _, testCase := range []struct { |