Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(168)

Side by Side Diff: scheduler/appengine/engine/dsset/dsset_test.go

Issue 2981143002: Add 'dsset' structure. (Closed)
Patch Set: Created 3 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2017 The LUCI Authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 package dsset
16
17 import (
18 "fmt"
19 "math/rand"
20 "sync"
21 "testing"
22 "time"
23
24 "golang.org/x/net/context"
25
26 "github.com/luci/gae/impl/memory"
27 "github.com/luci/gae/service/datastore"
28 "github.com/luci/luci-go/common/clock"
29 "github.com/luci/luci-go/common/clock/testclock"
30 "github.com/luci/luci-go/common/data/rand/mathrand"
31 "github.com/luci/luci-go/common/data/stringset"
32
33 . "github.com/smartystreets/goconvey/convey"
34 )
35
36 func testingContext() context.Context {
37 c := memory.Use(context.Background())
38 c = clock.Set(c, testclock.New(time.Unix(1442270520, 0).UTC()))
39 c = mathrand.Set(c, rand.New(rand.NewSource(1000)))
40 return c
41 }
42
43 // pop pops a bunch of items from the set and returns items that were popped.
44 func pop(c context.Context, s *Set, ids []string, cleanup []Tombstone) (popped [ ]string, err error) {
45 op, err := s.BeginPop(c)
46 if err != nil {
47 return nil, err
48 }
49 op.CleanupTombstones(cleanup)
50 for _, id := range ids {
51 if op.Pop(id) {
52 popped = append(popped, id)
53 }
54 }
55 if err := op.Submit(); err != nil {
56 return nil, err
57 }
58 return popped, nil
59 }
60
61 func TestSet(t *testing.T) {
62 t.Parallel()
63
64 Convey("item one lifecycle", t, func() {
65 c := testingContext()
66
67 set := Set{
68 ID: "test",
69 ShardCount: 3,
70 TombstonesRoot: datastore.NewKey(c, "Root", "root", 0, nil),
71 TombstonesDelay: time.Minute,
72 }
73
74 // Add one item.
75 So(set.Add(c, []Item{{ID: "abc"}}), ShouldBeNil)
76
77 // The item is returned by the listing.
78 items, tombs, err := set.List(c)
79 So(err, ShouldBeNil)
80 So(items, ShouldResemble, []Item{{ID: "abc"}})
81 So(tombs, ShouldBeNil)
82
83 // Pop it!
84 err = datastore.RunInTransaction(c, func(c context.Context) erro r {
85 popped, err := pop(c, &set, []string{"abc"}, nil)
86 So(err, ShouldBeNil)
87 So(popped, ShouldResemble, []string{"abc"})
88 return nil
89 }, nil)
90 So(err, ShouldBeNil)
91
92 // The listing no longer returns it, but no tomb to cleanup yet (too fresh).
93 items, tombs, err = set.List(c)
94 So(err, ShouldBeNil)
95 So(items, ShouldBeNil)
96 So(tombs, ShouldBeNil)
97
98 // Attempt to add it back (should be ignored). Add a bunch of ti mes to make
99 // sure to fill in many shards (this is pseudo-random).
100 for i := 0; i < 5; i++ {
101 So(set.Add(c, []Item{{ID: "abc"}}), ShouldBeNil)
102 }
103
104 // The listing still doesn't returns it.
105 items, tombs, err = set.List(c)
106 So(err, ShouldBeNil)
107 So(items, ShouldBeNil)
108 So(tombs, ShouldBeNil)
109
110 // Popping it again doesn't work either.
111 err = datastore.RunInTransaction(c, func(c context.Context) erro r {
112 popped, err := pop(c, &set, []string{"abc"}, nil)
113 So(err, ShouldBeNil)
114 So(popped, ShouldBeNil)
115 return nil
116 }, nil)
117 So(err, ShouldBeNil)
118
119 // Time passes, tombstone expires.
120 clock.Get(c).(testclock.TestClock).Add(2 * time.Minute)
121
122 // Listing now returns expired tombstone.
123 items, tombs, err = set.List(c)
124 So(err, ShouldBeNil)
125 So(items, ShouldBeNil)
126 So(len(tombs), ShouldEqual, 1)
127 So(tombs[0].id, ShouldEqual, "abc")
128 So(len(tombs[0].storage), ShouldEqual, 3) // all shards
129
130 // Cleanup storage keys.
131 So(set.CleanupStorage(c, tombs), ShouldBeNil)
132
133 // Cleanup the tombstones themselves.
134 err = datastore.RunInTransaction(c, func(c context.Context) erro r {
135 popped, err := pop(c, &set, nil, tombs)
136 So(err, ShouldBeNil)
137 So(popped, ShouldBeNil)
138 return nil
139 }, nil)
140 So(err, ShouldBeNil)
141
142 // No tombstones returned any longer.
143 items, tombs, err = set.List(c)
144 So(err, ShouldBeNil)
145 So(items, ShouldBeNil)
146 So(tombs, ShouldBeNil)
147
148 // And the item can be added back now, since no trace of it is l eft.
149 So(set.Add(c, []Item{{ID: "abc"}}), ShouldBeNil)
150
151 // Yep, it is there.
152 items, tombs, err = set.List(c)
153 So(err, ShouldBeNil)
154 So(items, ShouldResemble, []Item{{ID: "abc"}})
155 So(tombs, ShouldBeNil)
156 })
157
158 Convey("stress", t, func() {
159 // Add 1000 items in parallel from N goroutines, and (also in pa rallel),
160 // run N instances of "List and pop all", collecting the result in single
161 // list. There should be no duplicates in the final list!
162 c := testingContext()
163
164 set := Set{
165 ID: "test",
166 ShardCount: 3,
167 TombstonesRoot: datastore.NewKey(c, "Root", "root", 0, nil),
168 TombstonesDelay: time.Minute,
tandrii(chromium) 2017/07/18 17:54:19 re: my comment about multiple consumers potentiall
Vadim Sh. 2017/07/24 00:30:18 It will all blow up. TombstonesDelay should be >>
169 }
170
171 producers := 3
172 consumers := 5
173 items := 100
174
175 wakeups := make(chan string)
176
177 lock := sync.Mutex{}
178 consumed := []string{}
179
180 for i := 0; i < producers; i++ {
181 go func() {
182 for j := 0; j < items; j++ {
183 set.Add(c, []Item{{ID: fmt.Sprintf("%d", j)}})
184 // Wake up 3 consumers, so they "fight".
185 wakeups <- "wake"
186 wakeups <- "wake"
187 wakeups <- "wake"
188 }
189 for i := 0; i < consumers; i++ {
190 wakeups <- "done"
191 }
192 }()
193 }
194
195 consume := func() {
196 items, _, _ := set.List(c)
197 if len(items) == 0 {
198 return
199 }
200
201 keys := []string{}
202 for _, itm := range items {
203 keys = append(keys, itm.ID)
204 }
205
206 // Try to pop all.
207 var popped []string
208 err := datastore.RunInTransaction(c, func(c context.Cont ext) error {
209 var err error
210 popped, err = pop(c, &set, keys, nil)
211 return err
212 }, nil)
213
214 // Consider items consumed only if transaction has lande d.
215 if err == nil && len(popped) != 0 {
216 lock.Lock()
217 consumed = append(consumed, popped...)
218 lock.Unlock()
219 }
220 }
221
222 wg := sync.WaitGroup{}
223 wg.Add(consumers)
224 for i := 0; i < consumers; i++ {
225 go func() {
226 defer wg.Done()
227 done := false
228 for !done {
229 done = (<-wakeups) == "done"
230 consume()
231 }
232 }()
233 }
234
235 wg.Wait() // this waits for completion of the entire pipeline
236
237 // Make sure 'consumed' is the initially produced set.
238 dedup := stringset.New(len(consumed))
239 for _, itm := range consumed {
240 dedup.Add(itm)
241 }
242 So(dedup.Len(), ShouldEqual, len(consumed)) // no dups
243 So(len(consumed), ShouldEqual, items) // all are accounted for
244 })
245 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698