Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(317)

Side by Side Diff: server/internal/logdog/collector/utils_test.go

Issue 1610993002: LogDog: Add collector service implementation. (Closed) Base URL: https://github.com/luci/luci-go@master
Patch Set: Created 4 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package collector
6
7 import (
8 "fmt"
9 "sync"
10 "sync/atomic"
11
12 "github.com/luci/luci-go/common/api/logdog_coordinator/service/v1"
13 "github.com/luci/luci-go/common/logdog/types"
14 cc "github.com/luci/luci-go/server/internal/logdog/coordinatorClient"
15 "github.com/luci/luci-go/server/logdog/storage"
16 "golang.org/x/net/context"
17 )
18
19 // testCoordinatorClient is an implementation of CoordinatorClient that can be
20 // used for testing.
21 type testCoordinatorClient struct {
22 sync.Mutex
23
24 // calls is the number of calls made to the interface's methods.
25 calls int32
26 // callC, if not nil, will have a token pushed to it when a call is made .
27 callC chan struct{}
28 // errC is a channel that error status will be read from if not nil.
29 errC chan error
30
31 // state is the latest tracked stream state.
32 state map[string]*stateProxy
33 }
34
35 func (c *testCoordinatorClient) register(s stateProxy) stateProxy {
36 c.Lock()
37 defer c.Unlock()
38
39 // Update our state.
40 if c.state == nil {
41 c.state = make(map[string]*stateProxy)
42 }
43 if sp := c.state[string(s.path)]; sp != nil {
44 return *sp
45 }
46 c.state[string(s.path)] = &s
47 return s
48 }
49
50 func (c *testCoordinatorClient) RegisterStream(ctx context.Context, s cc.State) (*cc.State, error) {
51 if err := c.incCalls(); err != nil {
52 return nil, err
53 }
54
55 // Update our state.
56 sp := c.register(stateProxy{
57 path: s.Path,
58 proto: s.ProtoVersion,
59 secret: s.Secret,
60 terminalIndex: -1,
61 archived: s.Archived(),
62 purged: (s.State != nil && s.State.Purged),
63 })
64
65 // Set the ProtoVersion to differentiate the output State from the input .
66 rs := cc.State{
67 Path: s.Path,
68 ProtoVersion: "remote",
69 Secret: s.Secret,
70 Descriptor: s.Descriptor,
71 State: &service.LogStreamState{
72 TerminalIndex: int64(sp.terminalIndex),
73 Purged: sp.purged,
74 },
75 }
76 if sp.archived {
77 rs.State.ArchiveStreamURL = "something so we are marked as archi ved"
78 }
79 return &rs, nil
80 }
81
82 func (c *testCoordinatorClient) TerminateStream(ctx context.Context, p types.Str eamPath, s []byte, idx types.MessageIndex) error {
83 if err := c.incCalls(); err != nil {
84 return err
85 }
86
87 c.Lock()
88 defer c.Unlock()
89
90 // Update our state.
91 sp, ok := c.state[string(p)]
92 if !ok {
93 return fmt.Errorf("no such stream: %v", p)
94 }
95 if sp.terminalIndex >= 0 && idx != sp.terminalIndex {
96 return fmt.Errorf("incompatible terminal indexes: %d != %d", idx , sp.terminalIndex)
97 }
98
99 sp.terminalIndex = idx
100 return nil
101 }
102
103 // incCalls is an entry point for client goroutines. It offers the opportunity
104 // to track call count as well as trap executing goroutines within client calls.
105 //
106 // This must not be called while the lock is held, else it could lead to
107 // deadlock if multiple goroutines are trapped.
108 func (c *testCoordinatorClient) incCalls() error {
109 if c.callC != nil {
110 c.callC <- struct{}{}
111 }
112
113 atomic.AddInt32(&c.calls, 1)
114
115 if c.errC != nil {
116 return <-c.errC
117 }
118 return nil
119 }
120
121 func (c *testCoordinatorClient) stream(name string) (int, bool) {
122 c.Lock()
123 defer c.Unlock()
124
125 sp, ok := c.state[name]
126 if !ok {
127 return 0, false
128 }
129 return int(sp.terminalIndex), true
130 }
131
132 // testStorage is a testing storage instance that returns errors.
133 type testStorage struct {
134 storage.Storage
135 err func() error
136 }
137
138 func (s *testStorage) Put(r *storage.PutRequest) error {
139 if s.err != nil {
140 if err := s.err(); err != nil {
141 return err
142 }
143 }
144 return s.Storage.Put(r)
145 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698