Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(76)

Side by Side Diff: server/internal/logdog/collector/utils_test.go

Issue 1906023002: LogDog: Add project namespace to Butler/Collector. (Closed) Base URL: https://github.com/luci/luci-go@logdog-project-archivist
Patch Set: Rebase? Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « server/internal/logdog/collector/coordinator/coordinator.go ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package collector 5 package collector
6 6
7 import ( 7 import (
8 "bytes" 8 "bytes"
9 "errors" 9 "errors"
10 "fmt" 10 "fmt"
11 "sort" 11 "sort"
12 "strings" 12 "strings"
13 "sync" 13 "sync"
14 "time"
15 14
16 "github.com/golang/protobuf/proto" 15 "github.com/golang/protobuf/proto"
17 "github.com/luci/luci-go/common/clock" 16 "github.com/luci/luci-go/common/clock"
17 "github.com/luci/luci-go/common/config"
18 "github.com/luci/luci-go/common/logdog/butlerproto" 18 "github.com/luci/luci-go/common/logdog/butlerproto"
19 "github.com/luci/luci-go/common/logdog/types" 19 "github.com/luci/luci-go/common/logdog/types"
20 "github.com/luci/luci-go/common/proto/google" 20 "github.com/luci/luci-go/common/proto/google"
21 "github.com/luci/luci-go/common/proto/logdog/logpb" 21 "github.com/luci/luci-go/common/proto/logdog/logpb"
22 cc "github.com/luci/luci-go/server/internal/logdog/collector/coordinator " 22 cc "github.com/luci/luci-go/server/internal/logdog/collector/coordinator "
23 "github.com/luci/luci-go/server/logdog/storage" 23 "github.com/luci/luci-go/server/logdog/storage"
24 "golang.org/x/net/context" 24 "golang.org/x/net/context"
25 ) 25 )
26 26
27 var testSecret = bytes.Repeat([]byte{0x55}, types.PrefixSecretLength) 27 var testSecret = bytes.Repeat([]byte{0x55}, types.PrefixSecretLength)
28 28
29 type streamKey struct {
30 project string
31 name string
32 }
33
34 func mkStreamKey(project, name string) streamKey {
35 return streamKey{project, name}
36 }
37
29 // testCoordinator is an implementation of Coordinator that can be used for 38 // testCoordinator is an implementation of Coordinator that can be used for
30 // testing. 39 // testing.
31 type testCoordinator struct { 40 type testCoordinator struct {
32 sync.Mutex 41 sync.Mutex
33 42
34 // errC is a channel that error status will be read from if not nil. 43 // errC is a channel that error status will be read from if not nil.
35 errC chan error 44 errC chan error
36 45
37 // state is the latest tracked stream state. 46 // state is the latest tracked stream state.
38 » state map[string]*cc.LogStreamState 47 » state map[streamKey]*cc.LogStreamState
39 } 48 }
40 49
41 var _ cc.Coordinator = (*testCoordinator)(nil) 50 var _ cc.Coordinator = (*testCoordinator)(nil)
42 51
43 func (c *testCoordinator) register(s cc.LogStreamState) cc.LogStreamState { 52 func (c *testCoordinator) register(s cc.LogStreamState) cc.LogStreamState {
44 c.Lock() 53 c.Lock()
45 defer c.Unlock() 54 defer c.Unlock()
46 55
47 // Update our state. 56 // Update our state.
48 if c.state == nil { 57 if c.state == nil {
49 » » c.state = make(map[string]*cc.LogStreamState) 58 » » c.state = make(map[streamKey]*cc.LogStreamState)
50 } 59 }
51 » if sp := c.state[string(s.Path)]; sp != nil { 60
61 » key := mkStreamKey(string(s.Project), string(s.Path))
62 » if sp := c.state[key]; sp != nil {
52 return *sp 63 return *sp
53 } 64 }
54 » c.state[string(s.Path)] = &s 65 » c.state[key] = &s
55 return s 66 return s
56 } 67 }
57 68
58 func (c *testCoordinator) RegisterStream(ctx context.Context, s *cc.LogStreamSta te, d *logpb.LogStreamDescriptor) ( 69 func (c *testCoordinator) RegisterStream(ctx context.Context, s *cc.LogStreamSta te, d *logpb.LogStreamDescriptor) (
59 *cc.LogStreamState, error) { 70 *cc.LogStreamState, error) {
60 if err := c.enter(); err != nil { 71 if err := c.enter(); err != nil {
61 return nil, err 72 return nil, err
62 } 73 }
63 74
64 // Enforce that the submitted stream is not terminal. 75 // Enforce that the submitted stream is not terminal.
(...skipping 11 matching lines...) Expand all
76 } 87 }
77 88
78 if st.TerminalIndex < 0 { 89 if st.TerminalIndex < 0 {
79 return errors.New("submitted stream is not terminal") 90 return errors.New("submitted stream is not terminal")
80 } 91 }
81 92
82 c.Lock() 93 c.Lock()
83 defer c.Unlock() 94 defer c.Unlock()
84 95
85 // Update our state. 96 // Update our state.
86 » cachedState, ok := c.state[string(st.Path)] 97 » cachedState, ok := c.state[mkStreamKey(string(st.Project), string(st.Pat h))]
87 if !ok { 98 if !ok {
88 return fmt.Errorf("no such stream: %s", st.Path) 99 return fmt.Errorf("no such stream: %s", st.Path)
89 } 100 }
90 if cachedState.TerminalIndex >= 0 && st.TerminalIndex != cachedState.Ter minalIndex { 101 if cachedState.TerminalIndex >= 0 && st.TerminalIndex != cachedState.Ter minalIndex {
91 return fmt.Errorf("incompatible terminal indexes: %d != %d", st. TerminalIndex, cachedState.TerminalIndex) 102 return fmt.Errorf("incompatible terminal indexes: %d != %d", st. TerminalIndex, cachedState.TerminalIndex)
92 } 103 }
93 104
94 cachedState.TerminalIndex = st.TerminalIndex 105 cachedState.TerminalIndex = st.TerminalIndex
95 return nil 106 return nil
96 } 107 }
97 108
98 // enter is an entry point for client goroutines. It offers the opportunity 109 // enter is an entry point for client goroutines. It offers the opportunity
99 // to trap executing goroutines within client calls. 110 // to trap executing goroutines within client calls.
100 // 111 //
101 // This must NOT be called while the lock is held, else it could lead to 112 // This must NOT be called while the lock is held, else it could lead to
102 // deadlock if multiple goroutines are trapped. 113 // deadlock if multiple goroutines are trapped.
103 func (c *testCoordinator) enter() error { 114 func (c *testCoordinator) enter() error {
104 if c.errC != nil { 115 if c.errC != nil {
105 return <-c.errC 116 return <-c.errC
106 } 117 }
107 return nil 118 return nil
108 } 119 }
109 120
110 func (c *testCoordinator) stream(name string) (int, bool) { 121 func (c *testCoordinator) stream(project, name string) (int, bool) {
111 c.Lock() 122 c.Lock()
112 defer c.Unlock() 123 defer c.Unlock()
113 124
114 » sp, ok := c.state[name] 125 » sp, ok := c.state[mkStreamKey(project, name)]
115 if !ok { 126 if !ok {
116 return 0, false 127 return 0, false
117 } 128 }
118 return int(sp.TerminalIndex), true 129 return int(sp.TerminalIndex), true
119 } 130 }
120 131
121 // testStorage is a testing storage instance that returns errors. 132 // testStorage is a testing storage instance that returns errors.
122 type testStorage struct { 133 type testStorage struct {
123 storage.Storage 134 storage.Storage
124 err func() error 135 err func() error
125 } 136 }
126 137
127 func (s *testStorage) Put(r storage.PutRequest) error { 138 func (s *testStorage) Put(r storage.PutRequest) error {
128 if s.err != nil { 139 if s.err != nil {
129 if err := s.err(); err != nil { 140 if err := s.err(); err != nil {
130 return err 141 return err
131 } 142 }
132 } 143 }
133 return s.Storage.Put(r) 144 return s.Storage.Put(r)
134 } 145 }
135 146
136 // bundleBuilder is a set of utility functions to help test cases construct 147 // bundleBuilder is a set of utility functions to help test cases construct
137 // specific logpb.ButlerLogBundle layouts. 148 // specific logpb.ButlerLogBundle layouts.
138 type bundleBuilder struct { 149 type bundleBuilder struct {
139 context.Context 150 context.Context
140 151
141 » base time.Time 152 » base *logpb.ButlerLogBundle
142 » entries []*logpb.ButlerLogBundle_Entry 153 }
154
155 func (b *bundleBuilder) genBase() *logpb.ButlerLogBundle {
156 » if b.base == nil {
157 » » b.base = &logpb.ButlerLogBundle{
158 » » » Source: "test stream",
159 » » » Timestamp: google.NewTimestamp(clock.Now(b)),
160 » » » Project: "test-project",
161 » » » Prefix: "foo",
162 » » » Secret: testSecret,
163 » » }
164 » }
165 » return b.base
143 } 166 }
144 167
145 func (b *bundleBuilder) addBundleEntry(be *logpb.ButlerLogBundle_Entry) { 168 func (b *bundleBuilder) addBundleEntry(be *logpb.ButlerLogBundle_Entry) {
146 » if b.base.IsZero() { 169 » base := b.genBase()
147 » » b.base = clock.Now(b) 170 » base.Entries = append(base.Entries, be)
148 » }
149
150 » b.entries = append(b.entries, be)
151 } 171 }
152 172
153 func (b *bundleBuilder) genBundleEntry(name string, tidx int, idxs ...int) *logp b.ButlerLogBundle_Entry { 173 func (b *bundleBuilder) genBundleEntry(name string, tidx int, idxs ...int) *logp b.ButlerLogBundle_Entry {
154 p, n := types.StreamPath(name).Split() 174 p, n := types.StreamPath(name).Split()
155 be := logpb.ButlerLogBundle_Entry{ 175 be := logpb.ButlerLogBundle_Entry{
156 Secret: testSecret,
157 Desc: &logpb.LogStreamDescriptor{ 176 Desc: &logpb.LogStreamDescriptor{
158 Prefix: string(p), 177 Prefix: string(p),
159 Name: string(n), 178 Name: string(n),
160 ContentType: "application/test-message", 179 ContentType: "application/test-message",
161 StreamType: logpb.StreamType_TEXT, 180 StreamType: logpb.StreamType_TEXT,
162 Timestamp: google.NewTimestamp(clock.Now(b)), 181 Timestamp: google.NewTimestamp(clock.Now(b)),
163 }, 182 },
164 } 183 }
165 184
166 if len(idxs) > 0 { 185 if len(idxs) > 0 {
(...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after
200 Value: fmt.Sprintf("Line #%d ", idx), 219 Value: fmt.Sprintf("Line #%d ", idx),
201 Delimiter: "\n", 220 Delimiter: "\n",
202 }, 221 },
203 }, 222 },
204 }, 223 },
205 }, 224 },
206 } 225 }
207 } 226 }
208 227
209 func (b *bundleBuilder) bundle() []byte { 228 func (b *bundleBuilder) bundle() []byte {
210 » bytes := b.bundleWithEntries(b.entries...) 229 » buf := bytes.Buffer{}
211 » b.entries = nil 230 » w := butlerproto.Writer{Compress: true}
212 231 » if err := w.Write(&buf, b.genBase()); err != nil {
213 » return bytes 232 » » panic(err)
214 }
215
216 func (b *bundleBuilder) bundleWithEntries(e ...*logpb.ButlerLogBundle_Entry) []b yte {
217 » bundle := logpb.ButlerLogBundle{
218 » » Source: "test stream",
219 » » Timestamp: google.NewTimestamp(clock.Now(b)),
220 » » Entries: e,
221 } 233 }
222 234
223 » buf := bytes.Buffer{} 235 » b.base = nil
224 » w := butlerproto.Writer{Compress: true}
225 » if err := w.Write(&buf, &bundle); err != nil {
226 » » panic(err)
227 » }
228 return buf.Bytes() 236 return buf.Bytes()
229 } 237 }
230 238
231 type indexRange struct { 239 type indexRange struct {
232 start int 240 start int
233 end int 241 end int
234 } 242 }
235 243
236 func (r *indexRange) String() string { return fmt.Sprintf("[%d..%d]", r.start, r .end) } 244 func (r *indexRange) String() string { return fmt.Sprintf("[%d..%d]", r.start, r .end) }
237 245
238 // shouldHaveRegisteredStream asserts that a testCoordinator has 246 // shouldHaveRegisteredStream asserts that a testCoordinator has
239 // registered a stream (string) and its terminal index (int). 247 // registered a stream (string) and its terminal index (int).
240 func shouldHaveRegisteredStream(actual interface{}, expected ...interface{}) str ing { 248 func shouldHaveRegisteredStream(actual interface{}, expected ...interface{}) str ing {
241 tcc := actual.(*testCoordinator) 249 tcc := actual.(*testCoordinator)
242 name := expected[0].(string)
243 tidx := expected[1].(int)
244 250
245 » cur, ok := tcc.stream(name) 251 » if len(expected) != 3 {
252 » » return "invalid number of expected arguments (should be 3)."
253 » }
254 » project := expected[0].(string)
255 » name := expected[1].(string)
256 » tidx := expected[2].(int)
257
258 » cur, ok := tcc.stream(project, name)
246 if !ok { 259 if !ok {
247 return fmt.Sprintf("stream %q is not registered", name) 260 return fmt.Sprintf("stream %q is not registered", name)
248 } 261 }
249 if tidx >= 0 && cur < 0 { 262 if tidx >= 0 && cur < 0 {
250 return fmt.Sprintf("stream %q is expected to be terminated, but isn't.", name) 263 return fmt.Sprintf("stream %q is expected to be terminated, but isn't.", name)
251 } 264 }
252 if cur >= 0 && tidx < 0 { 265 if cur >= 0 && tidx < 0 {
253 return fmt.Sprintf("stream %q is NOT expected to be terminated, but it is.", name) 266 return fmt.Sprintf("stream %q is NOT expected to be terminated, but it is.", name)
254 } 267 }
255 return "" 268 return ""
256 } 269 }
257 270
258 // shoudNotHaveRegisteredStream asserts that a testCoordinator has not 271 // shoudNotHaveRegisteredStream asserts that a testCoordinator has not
259 // registered a stream (string). 272 // registered a stream (string).
260 func shouldNotHaveRegisteredStream(actual interface{}, expected ...interface{}) string { 273 func shouldNotHaveRegisteredStream(actual interface{}, expected ...interface{}) string {
261 tcc := actual.(*testCoordinator) 274 tcc := actual.(*testCoordinator)
262 » name := expected[0].(string) 275 » if len(expected) != 2 {
276 » » return "invalid number of expected arguments (should be 2)."
277 » }
278 » project := expected[0].(string)
279 » name := expected[1].(string)
263 280
264 » if _, ok := tcc.stream(name); ok { 281 » if _, ok := tcc.stream(project, name); ok {
265 return fmt.Sprintf("stream %q is registered, but it should NOT b e.", name) 282 return fmt.Sprintf("stream %q is registered, but it should NOT b e.", name)
266 } 283 }
267 return "" 284 return ""
268 } 285 }
269 286
270 // shouldHaveStoredStream asserts that a storage.Storage instance has contiguous 287 // shouldHaveStoredStream asserts that a storage.Storage instance has contiguous
271 // stream records in it. 288 // stream records in it.
272 // 289 //
273 // actual is the storage.Storage instance. expected is a stream name (string) 290 // actual is the storage.Storage instance. expected is a stream name (string)
274 // followed by a a series of records to assert. This can either be a specific 291 // followed by a a series of records to assert. This can either be a specific
275 // integer index or an intexRange marking a closed range of indices. 292 // integer index or an intexRange marking a closed range of indices.
276 func shouldHaveStoredStream(actual interface{}, expected ...interface{}) string { 293 func shouldHaveStoredStream(actual interface{}, expected ...interface{}) string {
277 st := actual.(storage.Storage) 294 st := actual.(storage.Storage)
278 » name := expected[0].(string) 295 » project := expected[0].(string)
296 » name := expected[1].(string)
297 » expected = expected[2:]
279 298
280 // Load all entries for this stream. 299 // Load all entries for this stream.
281 req := storage.GetRequest{ 300 req := storage.GetRequest{
282 » » Path: types.StreamPath(name), 301 » » Project: config.ProjectName(project),
302 » » Path: types.StreamPath(name),
283 } 303 }
284 304
285 entries := make(map[int]*logpb.LogEntry) 305 entries := make(map[int]*logpb.LogEntry)
286 var ierr error 306 var ierr error
287 err := st.Get(req, func(idx types.MessageIndex, d []byte) bool { 307 err := st.Get(req, func(idx types.MessageIndex, d []byte) bool {
288 le := logpb.LogEntry{} 308 le := logpb.LogEntry{}
289 if ierr = proto.Unmarshal(d, &le); ierr != nil { 309 if ierr = proto.Unmarshal(d, &le); ierr != nil {
290 return false 310 return false
291 } 311 }
292 entries[int(idx)] = &le 312 entries[int(idx)] = &le
(...skipping 13 matching lines...) Expand all
306 } 326 }
307 delete(entries, i) 327 delete(entries, i)
308 328
309 if le.StreamIndex != uint64(i) { 329 if le.StreamIndex != uint64(i) {
310 return fmt.Sprintf("*%d", i) 330 return fmt.Sprintf("*%d", i)
311 } 331 }
312 return "" 332 return ""
313 } 333 }
314 334
315 var failed []string 335 var failed []string
316 » for _, exp := range expected[1:] { 336 » for _, exp := range expected {
317 switch e := exp.(type) { 337 switch e := exp.(type) {
318 case int: 338 case int:
319 if err := assertLogEntry(e); err != "" { 339 if err := assertLogEntry(e); err != "" {
320 failed = append(failed, fmt.Sprintf("missing{%s} ", err)) 340 failed = append(failed, fmt.Sprintf("missing{%s} ", err))
321 } 341 }
322 342
323 case indexRange: 343 case indexRange:
324 var errs []string 344 var errs []string
325 for i := e.start; i <= e.end; i++ { 345 for i := e.start; i <= e.end; i++ {
326 if err := assertLogEntry(i); err != "" { 346 if err := assertLogEntry(i); err != "" {
(...skipping 22 matching lines...) Expand all
349 extra[i] = fmt.Sprintf("%d", idx) 369 extra[i] = fmt.Sprintf("%d", idx)
350 } 370 }
351 failed = append(failed, fmt.Sprintf("extra{%s}", strings.Join(ex tra, ","))) 371 failed = append(failed, fmt.Sprintf("extra{%s}", strings.Join(ex tra, ",")))
352 } 372 }
353 373
354 if len(failed) > 0 { 374 if len(failed) > 0 {
355 return strings.Join(failed, ", ") 375 return strings.Join(failed, ", ")
356 } 376 }
357 return "" 377 return ""
358 } 378 }
OLDNEW
« no previous file with comments | « server/internal/logdog/collector/coordinator/coordinator.go ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698