Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1040)

Unified Diff: server/internal/logdog/collector/utils_test.go

Issue 1610993002: LogDog: Add collector service implementation. (Closed) Base URL: https://github.com/luci/luci-go@master
Patch Set: Created 4 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: server/internal/logdog/collector/utils_test.go
diff --git a/server/internal/logdog/collector/utils_test.go b/server/internal/logdog/collector/utils_test.go
new file mode 100644
index 0000000000000000000000000000000000000000..d86435e72afe64f00e80520363f82093005945ae
--- /dev/null
+++ b/server/internal/logdog/collector/utils_test.go
@@ -0,0 +1,145 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+package collector
+
+import (
+ "fmt"
+ "sync"
+ "sync/atomic"
+
+ "github.com/luci/luci-go/common/api/logdog_coordinator/service/v1"
+ "github.com/luci/luci-go/common/logdog/types"
+ cc "github.com/luci/luci-go/server/internal/logdog/coordinatorClient"
+ "github.com/luci/luci-go/server/logdog/storage"
+ "golang.org/x/net/context"
+)
+
+// testCoordinatorClient is an implementation of CoordinatorClient that can be
+// used for testing.
+type testCoordinatorClient struct {
+ sync.Mutex
+
+ // calls is the number of calls made to the interface's methods.
+ calls int32
+ // callC, if not nil, will have a token pushed to it when a call is made.
+ callC chan struct{}
+ // errC is a channel that error status will be read from if not nil.
+ errC chan error
+
+ // state is the latest tracked stream state.
+ state map[string]*stateProxy
+}
+
+func (c *testCoordinatorClient) register(s stateProxy) stateProxy {
+ c.Lock()
+ defer c.Unlock()
+
+ // Update our state.
+ if c.state == nil {
+ c.state = make(map[string]*stateProxy)
+ }
+ if sp := c.state[string(s.path)]; sp != nil {
+ return *sp
+ }
+ c.state[string(s.path)] = &s
+ return s
+}
+
+func (c *testCoordinatorClient) RegisterStream(ctx context.Context, s cc.State) (*cc.State, error) {
+ if err := c.incCalls(); err != nil {
+ return nil, err
+ }
+
+ // Update our state.
+ sp := c.register(stateProxy{
+ path: s.Path,
+ proto: s.ProtoVersion,
+ secret: s.Secret,
+ terminalIndex: -1,
+ archived: s.Archived(),
+ purged: (s.State != nil && s.State.Purged),
+ })
+
+ // Set the ProtoVersion to differentiate the output State from the input.
+ rs := cc.State{
+ Path: s.Path,
+ ProtoVersion: "remote",
+ Secret: s.Secret,
+ Descriptor: s.Descriptor,
+ State: &service.LogStreamState{
+ TerminalIndex: int64(sp.terminalIndex),
+ Purged: sp.purged,
+ },
+ }
+ if sp.archived {
+ rs.State.ArchiveStreamURL = "something so we are marked as archived"
+ }
+ return &rs, nil
+}
+
+func (c *testCoordinatorClient) TerminateStream(ctx context.Context, p types.StreamPath, s []byte, idx types.MessageIndex) error {
+ if err := c.incCalls(); err != nil {
+ return err
+ }
+
+ c.Lock()
+ defer c.Unlock()
+
+ // Update our state.
+ sp, ok := c.state[string(p)]
+ if !ok {
+ return fmt.Errorf("no such stream: %v", p)
+ }
+ if sp.terminalIndex >= 0 && idx != sp.terminalIndex {
+ return fmt.Errorf("incompatible terminal indexes: %d != %d", idx, sp.terminalIndex)
+ }
+
+ sp.terminalIndex = idx
+ return nil
+}
+
+// incCalls is an entry point for client goroutines. It offers the opportunity
+// to track call count as well as trap executing goroutines within client calls.
+//
+// This must not be called while the lock is held, else it could lead to
+// deadlock if multiple goroutines are trapped.
+func (c *testCoordinatorClient) incCalls() error {
+ if c.callC != nil {
+ c.callC <- struct{}{}
+ }
+
+ atomic.AddInt32(&c.calls, 1)
+
+ if c.errC != nil {
+ return <-c.errC
+ }
+ return nil
+}
+
+func (c *testCoordinatorClient) stream(name string) (int, bool) {
+ c.Lock()
+ defer c.Unlock()
+
+ sp, ok := c.state[name]
+ if !ok {
+ return 0, false
+ }
+ return int(sp.terminalIndex), true
+}
+
+// testStorage is a testing storage instance that returns errors.
+type testStorage struct {
+ storage.Storage
+ err func() error
+}
+
+func (s *testStorage) Put(r *storage.PutRequest) error {
+ if s.err != nil {
+ if err := s.err(); err != nil {
+ return err
+ }
+ }
+ return s.Storage.Put(r)
+}

Powered by Google App Engine
This is Rietveld 408576698