Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(487)

Side by Side Diff: common/gcloud/gcps/ackbuffer/ackbuffer_test.go

Issue 1610993002: LogDog: Add collector service implementation. (Closed) Base URL: https://github.com/luci/luci-go@master
Patch Set: Created 4 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package ackbuffer 5 package ackbuffer
6 6
7 import ( 7 import (
8 "fmt"
9 "sort" 8 "sort"
10 "sync" 9 "sync"
11 "testing" 10 "testing"
12 "time" 11 "time"
13 12
14 "github.com/luci/luci-go/common/clock" 13 "github.com/luci/luci-go/common/clock"
15 "github.com/luci/luci-go/common/clock/testclock" 14 "github.com/luci/luci-go/common/clock/testclock"
16 "github.com/luci/luci-go/common/errors" 15 "github.com/luci/luci-go/common/errors"
17 "github.com/luci/luci-go/common/gcloud/gcps"
18 "github.com/luci/luci-go/common/stringset" 16 "github.com/luci/luci-go/common/stringset"
19 "golang.org/x/net/context" 17 "golang.org/x/net/context"
20 18
21 . "github.com/smartystreets/goconvey/convey" 19 . "github.com/smartystreets/goconvey/convey"
22 ) 20 )
23 21
24 type testPubSub struct { 22 type testACK struct {
25 sync.Mutex 23 sync.Mutex
26 24
27 » err error 25 » err error
28 » sub gcps.Subscription 26 » acks stringset.Set
29 » acks stringset.Set 27 » batchSize int
30 } 28 }
31 29
32 func (ps *testPubSub) Ack(s gcps.Subscription, acks ...string) error { 30 func (ps *testACK) Ack(c context.Context, acks ...string) error {
33 ps.Lock() 31 ps.Lock()
34 defer ps.Unlock() 32 defer ps.Unlock()
35 33
36 if ps.err != nil { 34 if ps.err != nil {
37 return ps.err 35 return ps.err
38 } 36 }
39 37
40 if s != ps.sub {
41 return fmt.Errorf("unknown subscription %q", s)
42 }
43
44 if ps.acks == nil { 38 if ps.acks == nil {
45 ps.acks = stringset.New(0) 39 ps.acks = stringset.New(0)
46 } 40 }
47 for _, ack := range acks { 41 for _, ack := range acks {
48 ps.acks.Add(ack) 42 ps.acks.Add(ack)
49 } 43 }
50 return nil 44 return nil
51 } 45 }
52 46
53 func (ps *testPubSub) ackIDs() []string { 47 func (ps *testACK) AckBatchSize() int {
48 » size := ps.batchSize
49 » if size <= 0 {
50 » » size = 4
51 » }
52 » return size
53 }
54
55 func (ps *testACK) ackIDs() []string {
54 ps.Lock() 56 ps.Lock()
55 defer ps.Unlock() 57 defer ps.Unlock()
56 58
57 v := make([]string, 0, ps.acks.Len()) 59 v := make([]string, 0, ps.acks.Len())
58 ps.acks.Iter(func(s string) bool { 60 ps.acks.Iter(func(s string) bool {
59 v = append(v, s) 61 v = append(v, s)
60 return true 62 return true
61 }) 63 })
62 sort.Strings(v) 64 sort.Strings(v)
63 return v 65 return v
64 } 66 }
65 67
66 func TestAckBuffer(t *testing.T) { 68 func TestAckBuffer(t *testing.T) {
67 t.Parallel() 69 t.Parallel()
68 70
69 Convey(`An AckBuffer configuration using a testing Pub/Sub`, t, func() { 71 Convey(`An AckBuffer configuration using a testing Pub/Sub`, t, func() {
70 c := context.Background() 72 c := context.Background()
71 c, tc := testclock.UseTime(c, testclock.TestTimeLocal) 73 c, tc := testclock.UseTime(c, testclock.TestTimeLocal)
72 » » ps := &testPubSub{ 74 » » ps := &testACK{}
73 » » » sub: gcps.Subscription("testsub"),
74 » » }
75 75
76 var discarded []string 76 var discarded []string
77 var discardedMu sync.Mutex 77 var discardedMu sync.Mutex
78 78
79 cfg := Config{ 79 cfg := Config{
80 » » » PubSub: ps, 80 » » » Ack: ps,
81 » » » Subscription: ps.sub,
82 DiscardCallback: func(acks []string) { 81 DiscardCallback: func(acks []string) {
83 discardedMu.Lock() 82 discardedMu.Lock()
84 defer discardedMu.Unlock() 83 defer discardedMu.Unlock()
85 84
86 discarded = append(discarded, acks...) 85 discarded = append(discarded, acks...)
87 }, 86 },
88 } 87 }
89 88
90 Convey(`Can instantiate an AckBuffer`, func() { 89 Convey(`Can instantiate an AckBuffer`, func() {
91 ab := New(c, cfg) 90 ab := New(c, cfg)
(...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after
131 closeOnce() 130 closeOnce()
132 131
133 sort.Strings(acks) 132 sort.Strings(acks)
134 sort.Strings(discarded) 133 sort.Strings(discarded)
135 So(discarded, ShouldResemble, acks) 134 So(discarded, ShouldResemble, acks)
136 }) 135 })
137 }) 136 })
138 }) 137 })
139 }) 138 })
140 } 139 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698