OLD | NEW |
---|---|
(Empty) | |
1 // Copyright 2017 The LUCI Authors. | |
2 // | |
3 // Licensed under the Apache License, Version 2.0 (the "License"); | |
4 // you may not use this file except in compliance with the License. | |
5 // You may obtain a copy of the License at | |
6 // | |
7 // http://www.apache.org/licenses/LICENSE-2.0 | |
8 // | |
9 // Unless required by applicable law or agreed to in writing, software | |
10 // distributed under the License is distributed on an "AS IS" BASIS, | |
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
12 // See the License for the specific language governing permissions and | |
13 // limitations under the License. | |
14 | |
15 package dsset | |
16 | |
17 import ( | |
18 "fmt" | |
19 "math/rand" | |
20 "sync" | |
21 "testing" | |
22 "time" | |
23 | |
24 "golang.org/x/net/context" | |
25 | |
26 "github.com/luci/gae/impl/memory" | |
27 "github.com/luci/gae/service/datastore" | |
28 "github.com/luci/luci-go/common/clock" | |
29 "github.com/luci/luci-go/common/clock/testclock" | |
30 "github.com/luci/luci-go/common/data/rand/mathrand" | |
31 "github.com/luci/luci-go/common/data/stringset" | |
32 | |
33 . "github.com/smartystreets/goconvey/convey" | |
34 ) | |
35 | |
36 func testingContext() context.Context { | |
37 c := memory.Use(context.Background()) | |
38 c = clock.Set(c, testclock.New(time.Unix(1442270520, 0).UTC())) | |
39 c = mathrand.Set(c, rand.New(rand.NewSource(1000))) | |
40 return c | |
41 } | |
42 | |
43 // pop pops a bunch of items from the set and returns items that were popped. | |
44 func pop(c context.Context, s *Set, ids []string, cleanup []Tombstone) (popped [ ]string, err error) { | |
45 op, err := s.BeginPop(c) | |
46 if err != nil { | |
47 return nil, err | |
48 } | |
49 op.CleanupTombstones(cleanup) | |
50 for _, id := range ids { | |
51 if op.Pop(id) { | |
52 popped = append(popped, id) | |
53 } | |
54 } | |
55 if err := op.Submit(); err != nil { | |
56 return nil, err | |
57 } | |
58 return popped, nil | |
59 } | |
60 | |
61 func TestSet(t *testing.T) { | |
62 t.Parallel() | |
63 | |
64 Convey("item one lifecycle", t, func() { | |
65 c := testingContext() | |
66 | |
67 set := Set{ | |
68 ID: "test", | |
69 ShardCount: 3, | |
70 TombstonesRoot: datastore.NewKey(c, "Root", "root", 0, nil), | |
71 TombstonesDelay: time.Minute, | |
72 } | |
73 | |
74 // Add one item. | |
75 So(set.Add(c, []Item{{ID: "abc"}}), ShouldBeNil) | |
76 | |
77 // The item is returned by the listing. | |
78 items, tombs, err := set.List(c) | |
79 So(err, ShouldBeNil) | |
80 So(items, ShouldResemble, []Item{{ID: "abc"}}) | |
81 So(tombs, ShouldBeNil) | |
82 | |
83 // Pop it! | |
84 err = datastore.RunInTransaction(c, func(c context.Context) erro r { | |
85 popped, err := pop(c, &set, []string{"abc"}, nil) | |
86 So(err, ShouldBeNil) | |
87 So(popped, ShouldResemble, []string{"abc"}) | |
88 return nil | |
89 }, nil) | |
90 So(err, ShouldBeNil) | |
91 | |
92 // The listing no longer returns it, but no tomb to cleanup yet (too fresh). | |
93 items, tombs, err = set.List(c) | |
94 So(err, ShouldBeNil) | |
95 So(items, ShouldBeNil) | |
96 So(tombs, ShouldBeNil) | |
97 | |
98 // Attempt to add it back (should be ignored). Add a bunch of ti mes to make | |
99 // sure to fill in many shards (this is pseudo-random). | |
100 for i := 0; i < 5; i++ { | |
101 So(set.Add(c, []Item{{ID: "abc"}}), ShouldBeNil) | |
102 } | |
103 | |
104 // The listing still doesn't returns it. | |
105 items, tombs, err = set.List(c) | |
106 So(err, ShouldBeNil) | |
107 So(items, ShouldBeNil) | |
108 So(tombs, ShouldBeNil) | |
109 | |
110 // Popping it again doesn't work either. | |
111 err = datastore.RunInTransaction(c, func(c context.Context) erro r { | |
112 popped, err := pop(c, &set, []string{"abc"}, nil) | |
113 So(err, ShouldBeNil) | |
114 So(popped, ShouldBeNil) | |
115 return nil | |
116 }, nil) | |
117 So(err, ShouldBeNil) | |
118 | |
119 // Time passes, tombstone expires. | |
120 clock.Get(c).(testclock.TestClock).Add(2 * time.Minute) | |
121 | |
122 // Listing now returns expired tombstone. | |
123 items, tombs, err = set.List(c) | |
124 So(err, ShouldBeNil) | |
125 So(items, ShouldBeNil) | |
126 So(len(tombs), ShouldEqual, 1) | |
127 So(tombs[0].id, ShouldEqual, "abc") | |
128 So(len(tombs[0].storage), ShouldEqual, 3) // all shards | |
129 | |
130 // Cleanup storage keys. | |
131 So(set.CleanupStorage(c, tombs), ShouldBeNil) | |
132 | |
133 // Cleanup the tombstones themselves. | |
134 err = datastore.RunInTransaction(c, func(c context.Context) erro r { | |
135 popped, err := pop(c, &set, nil, tombs) | |
136 So(err, ShouldBeNil) | |
137 So(popped, ShouldBeNil) | |
138 return nil | |
139 }, nil) | |
140 So(err, ShouldBeNil) | |
141 | |
142 // No tombstones returned any longer. | |
143 items, tombs, err = set.List(c) | |
144 So(err, ShouldBeNil) | |
145 So(items, ShouldBeNil) | |
146 So(tombs, ShouldBeNil) | |
147 | |
148 // And the item can be added back now, since no trace of it is l eft. | |
149 So(set.Add(c, []Item{{ID: "abc"}}), ShouldBeNil) | |
150 | |
151 // Yep, it is there. | |
152 items, tombs, err = set.List(c) | |
153 So(err, ShouldBeNil) | |
154 So(items, ShouldResemble, []Item{{ID: "abc"}}) | |
155 So(tombs, ShouldBeNil) | |
156 }) | |
157 | |
158 Convey("stress", t, func() { | |
159 // Add 1000 items in parallel from N goroutines, and (also in pa rallel), | |
160 // run N instances of "List and pop all", collecting the result in single | |
161 // list. There should be no duplicates in the final list! | |
162 c := testingContext() | |
163 | |
164 set := Set{ | |
165 ID: "test", | |
166 ShardCount: 3, | |
167 TombstonesRoot: datastore.NewKey(c, "Root", "root", 0, nil), | |
168 TombstonesDelay: time.Minute, | |
tandrii(chromium)
2017/07/18 17:54:19
re: my comment about multiple consumers potentiall
Vadim Sh.
2017/07/24 00:30:18
It will all blow up. TombstonesDelay should be >>
| |
169 } | |
170 | |
171 producers := 3 | |
172 consumers := 5 | |
173 items := 100 | |
174 | |
175 wakeups := make(chan string) | |
176 | |
177 lock := sync.Mutex{} | |
178 consumed := []string{} | |
179 | |
180 for i := 0; i < producers; i++ { | |
181 go func() { | |
182 for j := 0; j < items; j++ { | |
183 set.Add(c, []Item{{ID: fmt.Sprintf("%d", j)}}) | |
184 // Wake up 3 consumers, so they "fight". | |
185 wakeups <- "wake" | |
186 wakeups <- "wake" | |
187 wakeups <- "wake" | |
188 } | |
189 for i := 0; i < consumers; i++ { | |
190 wakeups <- "done" | |
191 } | |
192 }() | |
193 } | |
194 | |
195 consume := func() { | |
196 items, _, _ := set.List(c) | |
197 if len(items) == 0 { | |
198 return | |
199 } | |
200 | |
201 keys := []string{} | |
202 for _, itm := range items { | |
203 keys = append(keys, itm.ID) | |
204 } | |
205 | |
206 // Try to pop all. | |
207 var popped []string | |
208 err := datastore.RunInTransaction(c, func(c context.Cont ext) error { | |
209 var err error | |
210 popped, err = pop(c, &set, keys, nil) | |
211 return err | |
212 }, nil) | |
213 | |
214 // Consider items consumed only if transaction has lande d. | |
215 if err == nil && len(popped) != 0 { | |
216 lock.Lock() | |
217 consumed = append(consumed, popped...) | |
218 lock.Unlock() | |
219 } | |
220 } | |
221 | |
222 wg := sync.WaitGroup{} | |
223 wg.Add(consumers) | |
224 for i := 0; i < consumers; i++ { | |
225 go func() { | |
226 defer wg.Done() | |
227 done := false | |
228 for !done { | |
229 done = (<-wakeups) == "done" | |
230 consume() | |
231 } | |
232 }() | |
233 } | |
234 | |
235 wg.Wait() // this waits for completion of the entire pipeline | |
236 | |
237 // Make sure 'consumed' is the initially produced set. | |
238 dedup := stringset.New(len(consumed)) | |
239 for _, itm := range consumed { | |
240 dedup.Add(itm) | |
241 } | |
242 So(dedup.Len(), ShouldEqual, len(consumed)) // no dups | |
243 So(len(consumed), ShouldEqual, items) // all are accounted for | |
244 }) | |
245 } | |
OLD | NEW |