Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(364)

Unified Diff: server/internal/logdog/archivist/archivist_test.go

Issue 1909053003: LogDog: Add project namespacing to Archivist. (Closed) Base URL: https://github.com/luci/luci-go@logdog-project-coordinator-logs
Patch Set: Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: server/internal/logdog/archivist/archivist_test.go
diff --git a/server/internal/logdog/archivist/archivist_test.go b/server/internal/logdog/archivist/archivist_test.go
index d038ced77f5113d1fe4c1b014d94619a0be366b6..399b878535575e62495e87c0e8dc3822b3778756 100644
--- a/server/internal/logdog/archivist/archivist_test.go
+++ b/server/internal/logdog/archivist/archivist_test.go
@@ -14,6 +14,7 @@ import (
"github.com/golang/protobuf/proto"
"github.com/luci/luci-go/common/api/logdog_coordinator/services/v1"
"github.com/luci/luci-go/common/clock/testclock"
+ "github.com/luci/luci-go/common/config"
"github.com/luci/luci-go/common/errors"
"github.com/luci/luci-go/common/gcloud/gs"
"github.com/luci/luci-go/common/logdog/types"
@@ -202,6 +203,7 @@ func TestHandleArchive(t *testing.T) {
gsc := testGSClient{}
// Set up our test log stream.
+ project := "test-project"
desc := logpb.LogStreamDescriptor{
Prefix: "testing",
Name: "foo",
@@ -213,7 +215,7 @@ func TestHandleArchive(t *testing.T) {
}
// Utility function to add a log entry for "ls".
- addTestEntry := func(idxs ...int) {
+ addTestEntry := func(p string, idxs ...int) {
for _, v := range idxs {
le := logpb.LogEntry{
PrefixIndex: uint64(v),
@@ -234,9 +236,10 @@ func TestHandleArchive(t *testing.T) {
}
err = st.Put(storage.PutRequest{
- Path: desc.Path(),
- Index: types.MessageIndex(v),
- Values: [][]byte{d},
+ Project: config.ProjectName(p),
+ Path: desc.Path(),
+ Index: types.MessageIndex(v),
+ Values: [][]byte{d},
})
if err != nil {
panic(err)
@@ -250,6 +253,7 @@ func TestHandleArchive(t *testing.T) {
// Set up our testing archival task.
expired := 10 * time.Minute
archiveTask := logdog.ArchiveTask{
+ Project: project,
Path: string(desc.Path()),
SettleDelay: google.NewDuration(10 * time.Second),
CompletePeriod: google.NewDuration(expired),
@@ -297,8 +301,8 @@ func TestHandleArchive(t *testing.T) {
GSStagingBase: gs.Path("gs://archive-test-staging/path/to/archive/"), // Extra slashes to test concatenation.
}
- gsURL := func(p string) string {
- return fmt.Sprintf("gs://archive-test/path/to/archive/%s/%s", desc.Path(), p)
+ gsURL := func(project, name string) string {
+ return fmt.Sprintf("gs://archive-test/path/to/archive/%s/%s/%s", project, desc.Path(), name)
}
// hasStreams can be called to check that the retained archiveRequest had
@@ -398,7 +402,7 @@ func TestHandleArchive(t *testing.T) {
})
Convey(`Will fail to archive {0, 1, 2, 4} (incomplete).`, func() {
- addTestEntry(0, 1, 2, 4)
+ addTestEntry(project, 0, 1, 2, 4)
ack, err := ar.archiveTaskImpl(c, task)
So(err, ShouldErrLike, "missing log entry")
@@ -406,7 +410,7 @@ func TestHandleArchive(t *testing.T) {
})
Convey(`Will successfully archive {0, 1, 2, 3, 4}, stopping at the terminal index.`, func() {
- addTestEntry(0, 1, 2, 3, 4)
+ addTestEntry(project, 0, 1, 2, 3, 4)
ack, err := ar.archiveTaskImpl(c, task)
So(err, ShouldBeNil)
@@ -414,18 +418,19 @@ func TestHandleArchive(t *testing.T) {
So(hasStreams(true, true, true), ShouldBeTrue)
So(archiveRequest, ShouldResemble, &logdog.ArchiveStreamRequest{
+ Project: project,
Path: archiveTask.Path,
LogEntryCount: 4,
TerminalIndex: 3,
- StreamUrl: gsURL("logstream.entries"),
- IndexUrl: gsURL("logstream.index"),
- DataUrl: gsURL("data.bin"),
+ StreamUrl: gsURL(project, "logstream.entries"),
+ IndexUrl: gsURL(project, "logstream.index"),
+ DataUrl: gsURL(project, "data.bin"),
})
})
Convey(`When a transient archival error occurs, will not ACK it.`, func() {
- addTestEntry(0, 1, 2, 3, 4)
+ addTestEntry(project, 0, 1, 2, 3, 4)
gsc.newWriterErr = func(*testGSWriter) error { return errors.WrapTransient(errors.New("test error")) }
ack, err := ar.archiveTaskImpl(c, task)
@@ -434,7 +439,7 @@ func TestHandleArchive(t *testing.T) {
})
Convey(`When a non-transient archival error occurs`, func() {
- addTestEntry(0, 1, 2, 3, 4)
+ addTestEntry(project, 0, 1, 2, 3, 4)
archiveErr := errors.New("archive failure error")
gsc.newWriterErr = func(*testGSWriter) error { return archiveErr }
@@ -446,8 +451,9 @@ func TestHandleArchive(t *testing.T) {
So(ack, ShouldBeFalse)
So(archiveRequest, ShouldResemble, &logdog.ArchiveStreamRequest{
- Path: archiveTask.Path,
- Error: "archive failure error",
+ Project: project,
+ Path: archiveTask.Path,
+ Error: "archive failure error",
})
})
@@ -456,8 +462,9 @@ func TestHandleArchive(t *testing.T) {
So(err, ShouldBeNil)
So(ack, ShouldBeTrue)
So(archiveRequest, ShouldResemble, &logdog.ArchiveStreamRequest{
- Path: archiveTask.Path,
- Error: "archive failure error",
+ Project: project,
+ Path: archiveTask.Path,
+ Error: "archive failure error",
})
})
@@ -468,8 +475,9 @@ func TestHandleArchive(t *testing.T) {
So(err, ShouldBeNil)
So(ack, ShouldBeTrue)
So(archiveRequest, ShouldResemble, &logdog.ArchiveStreamRequest{
- Path: archiveTask.Path,
- Error: "archival error",
+ Project: project,
+ Path: archiveTask.Path,
+ Error: "archival error",
})
})
})
@@ -486,18 +494,19 @@ func TestHandleArchive(t *testing.T) {
So(hasStreams(true, true, false), ShouldBeTrue)
So(archiveRequest, ShouldResemble, &logdog.ArchiveStreamRequest{
+ Project: project,
Path: archiveTask.Path,
LogEntryCount: 0,
TerminalIndex: -1,
- StreamUrl: gsURL("logstream.entries"),
- IndexUrl: gsURL("logstream.index"),
- DataUrl: gsURL("data.bin"),
+ StreamUrl: gsURL(project, "logstream.entries"),
+ IndexUrl: gsURL(project, "logstream.index"),
+ DataUrl: gsURL(project, "data.bin"),
})
})
Convey(`With {0, 1, 2, 4} (incomplete) will archive the stream and update its terminal index.`, func() {
- addTestEntry(0, 1, 2, 4)
+ addTestEntry(project, 0, 1, 2, 4)
ack, err := ar.archiveTaskImpl(c, task)
So(err, ShouldBeNil)
@@ -505,13 +514,14 @@ func TestHandleArchive(t *testing.T) {
So(hasStreams(true, true, true), ShouldBeTrue)
So(archiveRequest, ShouldResemble, &logdog.ArchiveStreamRequest{
+ Project: project,
Path: archiveTask.Path,
LogEntryCount: 4,
TerminalIndex: 4,
- StreamUrl: gsURL("logstream.entries"),
- IndexUrl: gsURL("logstream.index"),
- DataUrl: gsURL("data.bin"),
+ StreamUrl: gsURL(project, "logstream.entries"),
+ IndexUrl: gsURL(project, "logstream.index"),
+ DataUrl: gsURL(project, "data.bin"),
})
})
})
@@ -526,18 +536,19 @@ func TestHandleArchive(t *testing.T) {
So(hasStreams(true, true, false), ShouldBeTrue)
So(archiveRequest, ShouldResemble, &logdog.ArchiveStreamRequest{
+ Project: project,
Path: archiveTask.Path,
LogEntryCount: 0,
TerminalIndex: -1,
- StreamUrl: gsURL("logstream.entries"),
- IndexUrl: gsURL("logstream.index"),
- DataUrl: gsURL("data.bin"),
+ StreamUrl: gsURL(project, "logstream.entries"),
+ IndexUrl: gsURL(project, "logstream.index"),
+ DataUrl: gsURL(project, "data.bin"),
})
})
Convey(`With {0, 1, 2, 4} (incomplete) will archive the stream and update its terminal index to 2.`, func() {
- addTestEntry(0, 1, 2, 4)
+ addTestEntry(project, 0, 1, 2, 4)
ack, err := ar.archiveTaskImpl(c, task)
So(err, ShouldBeNil)
@@ -545,22 +556,48 @@ func TestHandleArchive(t *testing.T) {
So(hasStreams(true, true, true), ShouldBeTrue)
So(archiveRequest, ShouldResemble, &logdog.ArchiveStreamRequest{
+ Project: project,
Path: archiveTask.Path,
LogEntryCount: 3,
TerminalIndex: 2,
- StreamUrl: gsURL("logstream.entries"),
- IndexUrl: gsURL("logstream.index"),
- DataUrl: gsURL("data.bin"),
+ StreamUrl: gsURL(project, "logstream.entries"),
+ IndexUrl: gsURL(project, "logstream.index"),
+ DataUrl: gsURL(project, "data.bin"),
})
})
})
})
+ Convey(`With an empty project`, func() {
+ archiveTask.Project = ""
+
+ Convey(`Will successfully archive {0, 1, 2, 3} with terminal index 3 using "_" for project archive path.`, func() {
+ stream.State.TerminalIndex = 3
+ addTestEntry("", 0, 1, 2, 3)
+
+ ack, err := ar.archiveTaskImpl(c, task)
+ So(err, ShouldBeNil)
+ So(ack, ShouldBeTrue)
+
+ So(hasStreams(true, true, true), ShouldBeTrue)
+ So(archiveRequest, ShouldResemble, &logdog.ArchiveStreamRequest{
+ Project: "",
+ Path: archiveTask.Path,
+ LogEntryCount: 4,
+ TerminalIndex: 3,
+
+ StreamUrl: gsURL("_", "logstream.entries"),
+ IndexUrl: gsURL("_", "logstream.index"),
+ DataUrl: gsURL("_", "data.bin"),
+ })
+ })
+ })
+
// Simulate failures during the various stream generation operations.
Convey(`Stream generation failures`, func() {
stream.State.TerminalIndex = 3
- addTestEntry(0, 1, 2, 3)
+ addTestEntry(project, 0, 1, 2, 3)
for _, failName := range []string{"/logstream.entries", "/logstream.index", "/data.bin"} {
for _, testCase := range []struct {

Powered by Google App Engine
This is Rietveld 408576698