| Index: server/internal/logdog/collector/utils_test.go
|
| diff --git a/server/internal/logdog/collector/utils_test.go b/server/internal/logdog/collector/utils_test.go
|
| index d32ddcec51619d02cc22925f8c826b66890ae7f3..882ca5499d0ab809195fdcd78e875c005a5bfef9 100644
|
| --- a/server/internal/logdog/collector/utils_test.go
|
| +++ b/server/internal/logdog/collector/utils_test.go
|
| @@ -11,10 +11,10 @@ import (
|
| "sort"
|
| "strings"
|
| "sync"
|
| - "time"
|
|
|
| "github.com/golang/protobuf/proto"
|
| "github.com/luci/luci-go/common/clock"
|
| + "github.com/luci/luci-go/common/config"
|
| "github.com/luci/luci-go/common/logdog/butlerproto"
|
| "github.com/luci/luci-go/common/logdog/types"
|
| "github.com/luci/luci-go/common/proto/google"
|
| @@ -26,6 +26,15 @@ import (
|
|
|
| var testSecret = bytes.Repeat([]byte{0x55}, types.PrefixSecretLength)
|
|
|
| +type streamKey struct {
|
| + project string
|
| + name string
|
| +}
|
| +
|
| +func mkStreamKey(project, name string) streamKey {
|
| + return streamKey{project, name}
|
| +}
|
| +
|
| // testCoordinator is an implementation of Coordinator that can be used for
|
| // testing.
|
| type testCoordinator struct {
|
| @@ -35,7 +44,7 @@ type testCoordinator struct {
|
| errC chan error
|
|
|
| // state is the latest tracked stream state.
|
| - state map[string]*cc.LogStreamState
|
| + state map[streamKey]*cc.LogStreamState
|
| }
|
|
|
| var _ cc.Coordinator = (*testCoordinator)(nil)
|
| @@ -46,12 +55,14 @@ func (c *testCoordinator) register(s cc.LogStreamState) cc.LogStreamState {
|
|
|
| // Update our state.
|
| if c.state == nil {
|
| - c.state = make(map[string]*cc.LogStreamState)
|
| + c.state = make(map[streamKey]*cc.LogStreamState)
|
| }
|
| - if sp := c.state[string(s.Path)]; sp != nil {
|
| +
|
| + key := mkStreamKey(string(s.Project), string(s.Path))
|
| + if sp := c.state[key]; sp != nil {
|
| return *sp
|
| }
|
| - c.state[string(s.Path)] = &s
|
| + c.state[key] = &s
|
| return s
|
| }
|
|
|
| @@ -83,7 +94,7 @@ func (c *testCoordinator) TerminateStream(ctx context.Context, st *cc.LogStreamS
|
| defer c.Unlock()
|
|
|
| // Update our state.
|
| - cachedState, ok := c.state[string(st.Path)]
|
| + cachedState, ok := c.state[mkStreamKey(string(st.Project), string(st.Path))]
|
| if !ok {
|
| return fmt.Errorf("no such stream: %s", st.Path)
|
| }
|
| @@ -107,11 +118,11 @@ func (c *testCoordinator) enter() error {
|
| return nil
|
| }
|
|
|
| -func (c *testCoordinator) stream(name string) (int, bool) {
|
| +func (c *testCoordinator) stream(project, name string) (int, bool) {
|
| c.Lock()
|
| defer c.Unlock()
|
|
|
| - sp, ok := c.state[name]
|
| + sp, ok := c.state[mkStreamKey(project, name)]
|
| if !ok {
|
| return 0, false
|
| }
|
| @@ -138,22 +149,30 @@ func (s *testStorage) Put(r storage.PutRequest) error {
|
| type bundleBuilder struct {
|
| context.Context
|
|
|
| - base time.Time
|
| - entries []*logpb.ButlerLogBundle_Entry
|
| + base *logpb.ButlerLogBundle
|
| }
|
|
|
| -func (b *bundleBuilder) addBundleEntry(be *logpb.ButlerLogBundle_Entry) {
|
| - if b.base.IsZero() {
|
| - b.base = clock.Now(b)
|
| +func (b *bundleBuilder) genBase() *logpb.ButlerLogBundle {
|
| + if b.base == nil {
|
| + b.base = &logpb.ButlerLogBundle{
|
| + Source: "test stream",
|
| + Timestamp: google.NewTimestamp(clock.Now(b)),
|
| + Project: "test-project",
|
| + Prefix: "foo",
|
| + Secret: testSecret,
|
| + }
|
| }
|
| + return b.base
|
| +}
|
|
|
| - b.entries = append(b.entries, be)
|
| +func (b *bundleBuilder) addBundleEntry(be *logpb.ButlerLogBundle_Entry) {
|
| + base := b.genBase()
|
| + base.Entries = append(base.Entries, be)
|
| }
|
|
|
| func (b *bundleBuilder) genBundleEntry(name string, tidx int, idxs ...int) *logpb.ButlerLogBundle_Entry {
|
| p, n := types.StreamPath(name).Split()
|
| be := logpb.ButlerLogBundle_Entry{
|
| - Secret: testSecret,
|
| Desc: &logpb.LogStreamDescriptor{
|
| Prefix: string(p),
|
| Name: string(n),
|
| @@ -207,24 +226,13 @@ func (b *bundleBuilder) logEntry(idx int) *logpb.LogEntry {
|
| }
|
|
|
| func (b *bundleBuilder) bundle() []byte {
|
| - bytes := b.bundleWithEntries(b.entries...)
|
| - b.entries = nil
|
| -
|
| - return bytes
|
| -}
|
| -
|
| -func (b *bundleBuilder) bundleWithEntries(e ...*logpb.ButlerLogBundle_Entry) []byte {
|
| - bundle := logpb.ButlerLogBundle{
|
| - Source: "test stream",
|
| - Timestamp: google.NewTimestamp(clock.Now(b)),
|
| - Entries: e,
|
| - }
|
| -
|
| buf := bytes.Buffer{}
|
| w := butlerproto.Writer{Compress: true}
|
| - if err := w.Write(&buf, &bundle); err != nil {
|
| + if err := w.Write(&buf, b.genBase()); err != nil {
|
| panic(err)
|
| }
|
| +
|
| + b.base = nil
|
| return buf.Bytes()
|
| }
|
|
|
| @@ -239,10 +247,15 @@ func (r *indexRange) String() string { return fmt.Sprintf("[%d..%d]", r.start, r
|
| // registered a stream (string) and its terminal index (int).
|
| func shouldHaveRegisteredStream(actual interface{}, expected ...interface{}) string {
|
| tcc := actual.(*testCoordinator)
|
| - name := expected[0].(string)
|
| - tidx := expected[1].(int)
|
|
|
| - cur, ok := tcc.stream(name)
|
| + if len(expected) != 3 {
|
| + return "invalid number of expected arguments (should be 3)."
|
| + }
|
| + project := expected[0].(string)
|
| + name := expected[1].(string)
|
| + tidx := expected[2].(int)
|
| +
|
| + cur, ok := tcc.stream(project, name)
|
| if !ok {
|
| return fmt.Sprintf("stream %q is not registered", name)
|
| }
|
| @@ -259,9 +272,13 @@ func shouldHaveRegisteredStream(actual interface{}, expected ...interface{}) str
|
| // registered a stream (string).
|
| func shouldNotHaveRegisteredStream(actual interface{}, expected ...interface{}) string {
|
| tcc := actual.(*testCoordinator)
|
| - name := expected[0].(string)
|
| + if len(expected) != 2 {
|
| + return "invalid number of expected arguments (should be 2)."
|
| + }
|
| + project := expected[0].(string)
|
| + name := expected[1].(string)
|
|
|
| - if _, ok := tcc.stream(name); ok {
|
| + if _, ok := tcc.stream(project, name); ok {
|
| return fmt.Sprintf("stream %q is registered, but it should NOT be.", name)
|
| }
|
| return ""
|
| @@ -275,11 +292,14 @@ func shouldNotHaveRegisteredStream(actual interface{}, expected ...interface{})
|
| // integer index or an intexRange marking a closed range of indices.
|
| func shouldHaveStoredStream(actual interface{}, expected ...interface{}) string {
|
| st := actual.(storage.Storage)
|
| - name := expected[0].(string)
|
| + project := expected[0].(string)
|
| + name := expected[1].(string)
|
| + expected = expected[2:]
|
|
|
| // Load all entries for this stream.
|
| req := storage.GetRequest{
|
| - Path: types.StreamPath(name),
|
| + Project: config.ProjectName(project),
|
| + Path: types.StreamPath(name),
|
| }
|
|
|
| entries := make(map[int]*logpb.LogEntry)
|
| @@ -313,7 +333,7 @@ func shouldHaveStoredStream(actual interface{}, expected ...interface{}) string
|
| }
|
|
|
| var failed []string
|
| - for _, exp := range expected[1:] {
|
| + for _, exp := range expected {
|
| switch e := exp.(type) {
|
| case int:
|
| if err := assertLogEntry(e); err != "" {
|
|
|