Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1889)

Unified Diff: server/internal/logdog/collector/utils_test.go

Issue 1906023002: LogDog: Add project namespace to Butler/Collector. (Closed) Base URL: https://github.com/luci/luci-go@logdog-project-archivist
Patch Set: Rebase? Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « server/internal/logdog/collector/coordinator/coordinator.go ('k') | no next file » | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: server/internal/logdog/collector/utils_test.go
diff --git a/server/internal/logdog/collector/utils_test.go b/server/internal/logdog/collector/utils_test.go
index d32ddcec51619d02cc22925f8c826b66890ae7f3..882ca5499d0ab809195fdcd78e875c005a5bfef9 100644
--- a/server/internal/logdog/collector/utils_test.go
+++ b/server/internal/logdog/collector/utils_test.go
@@ -11,10 +11,10 @@ import (
"sort"
"strings"
"sync"
- "time"
"github.com/golang/protobuf/proto"
"github.com/luci/luci-go/common/clock"
+ "github.com/luci/luci-go/common/config"
"github.com/luci/luci-go/common/logdog/butlerproto"
"github.com/luci/luci-go/common/logdog/types"
"github.com/luci/luci-go/common/proto/google"
@@ -26,6 +26,15 @@ import (
var testSecret = bytes.Repeat([]byte{0x55}, types.PrefixSecretLength)
+type streamKey struct {
+ project string
+ name string
+}
+
+func mkStreamKey(project, name string) streamKey {
+ return streamKey{project, name}
+}
+
// testCoordinator is an implementation of Coordinator that can be used for
// testing.
type testCoordinator struct {
@@ -35,7 +44,7 @@ type testCoordinator struct {
errC chan error
// state is the latest tracked stream state.
- state map[string]*cc.LogStreamState
+ state map[streamKey]*cc.LogStreamState
}
var _ cc.Coordinator = (*testCoordinator)(nil)
@@ -46,12 +55,14 @@ func (c *testCoordinator) register(s cc.LogStreamState) cc.LogStreamState {
// Update our state.
if c.state == nil {
- c.state = make(map[string]*cc.LogStreamState)
+ c.state = make(map[streamKey]*cc.LogStreamState)
}
- if sp := c.state[string(s.Path)]; sp != nil {
+
+ key := mkStreamKey(string(s.Project), string(s.Path))
+ if sp := c.state[key]; sp != nil {
return *sp
}
- c.state[string(s.Path)] = &s
+ c.state[key] = &s
return s
}
@@ -83,7 +94,7 @@ func (c *testCoordinator) TerminateStream(ctx context.Context, st *cc.LogStreamS
defer c.Unlock()
// Update our state.
- cachedState, ok := c.state[string(st.Path)]
+ cachedState, ok := c.state[mkStreamKey(string(st.Project), string(st.Path))]
if !ok {
return fmt.Errorf("no such stream: %s", st.Path)
}
@@ -107,11 +118,11 @@ func (c *testCoordinator) enter() error {
return nil
}
-func (c *testCoordinator) stream(name string) (int, bool) {
+func (c *testCoordinator) stream(project, name string) (int, bool) {
c.Lock()
defer c.Unlock()
- sp, ok := c.state[name]
+ sp, ok := c.state[mkStreamKey(project, name)]
if !ok {
return 0, false
}
@@ -138,22 +149,30 @@ func (s *testStorage) Put(r storage.PutRequest) error {
type bundleBuilder struct {
context.Context
- base time.Time
- entries []*logpb.ButlerLogBundle_Entry
+ base *logpb.ButlerLogBundle
}
-func (b *bundleBuilder) addBundleEntry(be *logpb.ButlerLogBundle_Entry) {
- if b.base.IsZero() {
- b.base = clock.Now(b)
+func (b *bundleBuilder) genBase() *logpb.ButlerLogBundle {
+ if b.base == nil {
+ b.base = &logpb.ButlerLogBundle{
+ Source: "test stream",
+ Timestamp: google.NewTimestamp(clock.Now(b)),
+ Project: "test-project",
+ Prefix: "foo",
+ Secret: testSecret,
+ }
}
+ return b.base
+}
- b.entries = append(b.entries, be)
+func (b *bundleBuilder) addBundleEntry(be *logpb.ButlerLogBundle_Entry) {
+ base := b.genBase()
+ base.Entries = append(base.Entries, be)
}
func (b *bundleBuilder) genBundleEntry(name string, tidx int, idxs ...int) *logpb.ButlerLogBundle_Entry {
p, n := types.StreamPath(name).Split()
be := logpb.ButlerLogBundle_Entry{
- Secret: testSecret,
Desc: &logpb.LogStreamDescriptor{
Prefix: string(p),
Name: string(n),
@@ -207,24 +226,13 @@ func (b *bundleBuilder) logEntry(idx int) *logpb.LogEntry {
}
func (b *bundleBuilder) bundle() []byte {
- bytes := b.bundleWithEntries(b.entries...)
- b.entries = nil
-
- return bytes
-}
-
-func (b *bundleBuilder) bundleWithEntries(e ...*logpb.ButlerLogBundle_Entry) []byte {
- bundle := logpb.ButlerLogBundle{
- Source: "test stream",
- Timestamp: google.NewTimestamp(clock.Now(b)),
- Entries: e,
- }
-
buf := bytes.Buffer{}
w := butlerproto.Writer{Compress: true}
- if err := w.Write(&buf, &bundle); err != nil {
+ if err := w.Write(&buf, b.genBase()); err != nil {
panic(err)
}
+
+ b.base = nil
return buf.Bytes()
}
@@ -239,10 +247,15 @@ func (r *indexRange) String() string { return fmt.Sprintf("[%d..%d]", r.start, r
// registered a stream (string) and its terminal index (int).
func shouldHaveRegisteredStream(actual interface{}, expected ...interface{}) string {
tcc := actual.(*testCoordinator)
- name := expected[0].(string)
- tidx := expected[1].(int)
- cur, ok := tcc.stream(name)
+ if len(expected) != 3 {
+ return "invalid number of expected arguments (should be 3)."
+ }
+ project := expected[0].(string)
+ name := expected[1].(string)
+ tidx := expected[2].(int)
+
+ cur, ok := tcc.stream(project, name)
if !ok {
return fmt.Sprintf("stream %q is not registered", name)
}
@@ -259,9 +272,13 @@ func shouldHaveRegisteredStream(actual interface{}, expected ...interface{}) str
// registered a stream (string).
func shouldNotHaveRegisteredStream(actual interface{}, expected ...interface{}) string {
tcc := actual.(*testCoordinator)
- name := expected[0].(string)
+ if len(expected) != 2 {
+ return "invalid number of expected arguments (should be 2)."
+ }
+ project := expected[0].(string)
+ name := expected[1].(string)
- if _, ok := tcc.stream(name); ok {
+ if _, ok := tcc.stream(project, name); ok {
return fmt.Sprintf("stream %q is registered, but it should NOT be.", name)
}
return ""
@@ -275,11 +292,14 @@ func shouldNotHaveRegisteredStream(actual interface{}, expected ...interface{})
// integer index or an intexRange marking a closed range of indices.
func shouldHaveStoredStream(actual interface{}, expected ...interface{}) string {
st := actual.(storage.Storage)
- name := expected[0].(string)
+ project := expected[0].(string)
+ name := expected[1].(string)
+ expected = expected[2:]
// Load all entries for this stream.
req := storage.GetRequest{
- Path: types.StreamPath(name),
+ Project: config.ProjectName(project),
+ Path: types.StreamPath(name),
}
entries := make(map[int]*logpb.LogEntry)
@@ -313,7 +333,7 @@ func shouldHaveStoredStream(actual interface{}, expected ...interface{}) string
}
var failed []string
- for _, exp := range expected[1:] {
+ for _, exp := range expected {
switch e := exp.(type) {
case int:
if err := assertLogEntry(e); err != "" {
« no previous file with comments | « server/internal/logdog/collector/coordinator/coordinator.go ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698