Index: server/internal/logdog/collector/utils_test.go |
diff --git a/server/internal/logdog/collector/utils_test.go b/server/internal/logdog/collector/utils_test.go |
index d32ddcec51619d02cc22925f8c826b66890ae7f3..882ca5499d0ab809195fdcd78e875c005a5bfef9 100644 |
--- a/server/internal/logdog/collector/utils_test.go |
+++ b/server/internal/logdog/collector/utils_test.go |
@@ -11,10 +11,10 @@ import ( |
"sort" |
"strings" |
"sync" |
- "time" |
"github.com/golang/protobuf/proto" |
"github.com/luci/luci-go/common/clock" |
+ "github.com/luci/luci-go/common/config" |
"github.com/luci/luci-go/common/logdog/butlerproto" |
"github.com/luci/luci-go/common/logdog/types" |
"github.com/luci/luci-go/common/proto/google" |
@@ -26,6 +26,15 @@ import ( |
var testSecret = bytes.Repeat([]byte{0x55}, types.PrefixSecretLength) |
+type streamKey struct { |
+ project string |
+ name string |
+} |
+ |
+func mkStreamKey(project, name string) streamKey { |
+ return streamKey{project, name} |
+} |
+ |
// testCoordinator is an implementation of Coordinator that can be used for |
// testing. |
type testCoordinator struct { |
@@ -35,7 +44,7 @@ type testCoordinator struct { |
errC chan error |
// state is the latest tracked stream state. |
- state map[string]*cc.LogStreamState |
+ state map[streamKey]*cc.LogStreamState |
} |
var _ cc.Coordinator = (*testCoordinator)(nil) |
@@ -46,12 +55,14 @@ func (c *testCoordinator) register(s cc.LogStreamState) cc.LogStreamState { |
// Update our state. |
if c.state == nil { |
- c.state = make(map[string]*cc.LogStreamState) |
+ c.state = make(map[streamKey]*cc.LogStreamState) |
} |
- if sp := c.state[string(s.Path)]; sp != nil { |
+ |
+ key := mkStreamKey(string(s.Project), string(s.Path)) |
+ if sp := c.state[key]; sp != nil { |
return *sp |
} |
- c.state[string(s.Path)] = &s |
+ c.state[key] = &s |
return s |
} |
@@ -83,7 +94,7 @@ func (c *testCoordinator) TerminateStream(ctx context.Context, st *cc.LogStreamS |
defer c.Unlock() |
// Update our state. |
- cachedState, ok := c.state[string(st.Path)] |
+ cachedState, ok := c.state[mkStreamKey(string(st.Project), string(st.Path))] |
if !ok { |
return fmt.Errorf("no such stream: %s", st.Path) |
} |
@@ -107,11 +118,11 @@ func (c *testCoordinator) enter() error { |
return nil |
} |
-func (c *testCoordinator) stream(name string) (int, bool) { |
+func (c *testCoordinator) stream(project, name string) (int, bool) { |
c.Lock() |
defer c.Unlock() |
- sp, ok := c.state[name] |
+ sp, ok := c.state[mkStreamKey(project, name)] |
if !ok { |
return 0, false |
} |
@@ -138,22 +149,30 @@ func (s *testStorage) Put(r storage.PutRequest) error { |
type bundleBuilder struct { |
context.Context |
- base time.Time |
- entries []*logpb.ButlerLogBundle_Entry |
+ base *logpb.ButlerLogBundle |
} |
-func (b *bundleBuilder) addBundleEntry(be *logpb.ButlerLogBundle_Entry) { |
- if b.base.IsZero() { |
- b.base = clock.Now(b) |
+func (b *bundleBuilder) genBase() *logpb.ButlerLogBundle { |
+ if b.base == nil { |
+ b.base = &logpb.ButlerLogBundle{ |
+ Source: "test stream", |
+ Timestamp: google.NewTimestamp(clock.Now(b)), |
+ Project: "test-project", |
+ Prefix: "foo", |
+ Secret: testSecret, |
+ } |
} |
+ return b.base |
+} |
- b.entries = append(b.entries, be) |
+func (b *bundleBuilder) addBundleEntry(be *logpb.ButlerLogBundle_Entry) { |
+ base := b.genBase() |
+ base.Entries = append(base.Entries, be) |
} |
func (b *bundleBuilder) genBundleEntry(name string, tidx int, idxs ...int) *logpb.ButlerLogBundle_Entry { |
p, n := types.StreamPath(name).Split() |
be := logpb.ButlerLogBundle_Entry{ |
- Secret: testSecret, |
Desc: &logpb.LogStreamDescriptor{ |
Prefix: string(p), |
Name: string(n), |
@@ -207,24 +226,13 @@ func (b *bundleBuilder) logEntry(idx int) *logpb.LogEntry { |
} |
func (b *bundleBuilder) bundle() []byte { |
- bytes := b.bundleWithEntries(b.entries...) |
- b.entries = nil |
- |
- return bytes |
-} |
- |
-func (b *bundleBuilder) bundleWithEntries(e ...*logpb.ButlerLogBundle_Entry) []byte { |
- bundle := logpb.ButlerLogBundle{ |
- Source: "test stream", |
- Timestamp: google.NewTimestamp(clock.Now(b)), |
- Entries: e, |
- } |
- |
buf := bytes.Buffer{} |
w := butlerproto.Writer{Compress: true} |
- if err := w.Write(&buf, &bundle); err != nil { |
+ if err := w.Write(&buf, b.genBase()); err != nil { |
panic(err) |
} |
+ |
+ b.base = nil |
return buf.Bytes() |
} |
@@ -239,10 +247,15 @@ func (r *indexRange) String() string { return fmt.Sprintf("[%d..%d]", r.start, r |
// registered a stream (string) and its terminal index (int). |
func shouldHaveRegisteredStream(actual interface{}, expected ...interface{}) string { |
tcc := actual.(*testCoordinator) |
- name := expected[0].(string) |
- tidx := expected[1].(int) |
- cur, ok := tcc.stream(name) |
+ if len(expected) != 3 { |
+ return "invalid number of expected arguments (should be 3)." |
+ } |
+ project := expected[0].(string) |
+ name := expected[1].(string) |
+ tidx := expected[2].(int) |
+ |
+ cur, ok := tcc.stream(project, name) |
if !ok { |
return fmt.Sprintf("stream %q is not registered", name) |
} |
@@ -259,9 +272,13 @@ func shouldHaveRegisteredStream(actual interface{}, expected ...interface{}) str |
// registered a stream (string). |
func shouldNotHaveRegisteredStream(actual interface{}, expected ...interface{}) string { |
tcc := actual.(*testCoordinator) |
- name := expected[0].(string) |
+ if len(expected) != 2 { |
+ return "invalid number of expected arguments (should be 2)." |
+ } |
+ project := expected[0].(string) |
+ name := expected[1].(string) |
- if _, ok := tcc.stream(name); ok { |
+ if _, ok := tcc.stream(project, name); ok { |
return fmt.Sprintf("stream %q is registered, but it should NOT be.", name) |
} |
return "" |
@@ -275,11 +292,14 @@ func shouldNotHaveRegisteredStream(actual interface{}, expected ...interface{}) |
// integer index or an intexRange marking a closed range of indices. |
func shouldHaveStoredStream(actual interface{}, expected ...interface{}) string { |
st := actual.(storage.Storage) |
- name := expected[0].(string) |
+ project := expected[0].(string) |
+ name := expected[1].(string) |
+ expected = expected[2:] |
// Load all entries for this stream. |
req := storage.GetRequest{ |
- Path: types.StreamPath(name), |
+ Project: config.ProjectName(project), |
+ Path: types.StreamPath(name), |
} |
entries := make(map[int]*logpb.LogEntry) |
@@ -313,7 +333,7 @@ func shouldHaveStoredStream(actual interface{}, expected ...interface{}) string |
} |
var failed []string |
- for _, exp := range expected[1:] { |
+ for _, exp := range expected { |
switch e := exp.(type) { |
case int: |
if err := assertLogEntry(e); err != "" { |