Chromium Code Reviews| OLD | NEW |
|---|---|
| (Empty) | |
| 1 // Copyright 2017 The LUCI Authors. | |
| 2 // | |
| 3 // Licensed under the Apache License, Version 2.0 (the "License"); | |
| 4 // you may not use this file except in compliance with the License. | |
| 5 // You may obtain a copy of the License at | |
| 6 // | |
| 7 // http://www.apache.org/licenses/LICENSE-2.0 | |
| 8 // | |
| 9 // Unless required by applicable law or agreed to in writing, software | |
| 10 // distributed under the License is distributed on an "AS IS" BASIS, | |
| 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
| 12 // See the License for the specific language governing permissions and | |
| 13 // limitations under the License. | |
| 14 | |
| 15 package dsset | |
| 16 | |
| 17 import ( | |
| 18 "fmt" | |
| 19 "math/rand" | |
| 20 "sync" | |
| 21 "testing" | |
| 22 "time" | |
| 23 | |
| 24 "golang.org/x/net/context" | |
| 25 | |
| 26 "github.com/luci/gae/impl/memory" | |
| 27 "github.com/luci/gae/service/datastore" | |
| 28 "github.com/luci/luci-go/common/clock" | |
| 29 "github.com/luci/luci-go/common/clock/testclock" | |
| 30 "github.com/luci/luci-go/common/data/rand/mathrand" | |
| 31 "github.com/luci/luci-go/common/data/stringset" | |
| 32 | |
| 33 . "github.com/smartystreets/goconvey/convey" | |
| 34 ) | |
| 35 | |
| 36 func testingContext() context.Context { | |
| 37 c := memory.Use(context.Background()) | |
| 38 c = clock.Set(c, testclock.New(time.Unix(1442270520, 0).UTC())) | |
| 39 c = mathrand.Set(c, rand.New(rand.NewSource(1000))) | |
| 40 return c | |
| 41 } | |
| 42 | |
| 43 // pop pops a bunch of items from the set and returns items that were popped. | |
| 44 func pop(c context.Context, s *Set, ids []string, cleanup []Tombstone) (popped [ ]string, err error) { | |
| 45 op, err := s.BeginPop(c) | |
| 46 if err != nil { | |
| 47 return nil, err | |
| 48 } | |
| 49 op.CleanupTombstones(cleanup) | |
| 50 for _, id := range ids { | |
| 51 if op.Pop(id) { | |
| 52 popped = append(popped, id) | |
| 53 } | |
| 54 } | |
| 55 if err := op.Submit(); err != nil { | |
| 56 return nil, err | |
| 57 } | |
| 58 return popped, nil | |
| 59 } | |
| 60 | |
| 61 func TestSet(t *testing.T) { | |
| 62 t.Parallel() | |
| 63 | |
| 64 Convey("item one lifecycle", t, func() { | |
| 65 c := testingContext() | |
| 66 | |
| 67 set := Set{ | |
| 68 ID: "test", | |
| 69 ShardCount: 3, | |
| 70 TombstonesRoot: datastore.NewKey(c, "Root", "root", 0, nil), | |
| 71 TombstonesDelay: time.Minute, | |
| 72 } | |
| 73 | |
| 74 // Add one item. | |
| 75 So(set.Add(c, []Item{{ID: "abc"}}), ShouldBeNil) | |
| 76 | |
| 77 // The item is returned by the listing. | |
| 78 items, tombs, err := set.List(c) | |
| 79 So(err, ShouldBeNil) | |
| 80 So(items, ShouldResemble, []Item{{ID: "abc"}}) | |
| 81 So(tombs, ShouldBeNil) | |
| 82 | |
| 83 // Pop it! | |
| 84 err = datastore.RunInTransaction(c, func(c context.Context) erro r { | |
| 85 popped, err := pop(c, &set, []string{"abc"}, nil) | |
| 86 So(err, ShouldBeNil) | |
| 87 So(popped, ShouldResemble, []string{"abc"}) | |
| 88 return nil | |
| 89 }, nil) | |
| 90 So(err, ShouldBeNil) | |
| 91 | |
| 92 // The listing no longer returns it, but no tomb to cleanup yet (too fresh). | |
| 93 items, tombs, err = set.List(c) | |
| 94 So(err, ShouldBeNil) | |
| 95 So(items, ShouldBeNil) | |
| 96 So(tombs, ShouldBeNil) | |
| 97 | |
| 98 // Attempt to add it back (should be ignored). Add a bunch of ti mes to make | |
| 99 // sure to fill in many shards (this is pseudo-random). | |
| 100 for i := 0; i < 5; i++ { | |
| 101 So(set.Add(c, []Item{{ID: "abc"}}), ShouldBeNil) | |
| 102 } | |
| 103 | |
| 104 // The listing still doesn't returns it. | |
| 105 items, tombs, err = set.List(c) | |
| 106 So(err, ShouldBeNil) | |
| 107 So(items, ShouldBeNil) | |
| 108 So(tombs, ShouldBeNil) | |
| 109 | |
| 110 // Popping it again doesn't work either. | |
| 111 err = datastore.RunInTransaction(c, func(c context.Context) erro r { | |
| 112 popped, err := pop(c, &set, []string{"abc"}, nil) | |
| 113 So(err, ShouldBeNil) | |
| 114 So(popped, ShouldBeNil) | |
| 115 return nil | |
| 116 }, nil) | |
| 117 So(err, ShouldBeNil) | |
| 118 | |
| 119 // Time passes, tombstone expires. | |
| 120 clock.Get(c).(testclock.TestClock).Add(2 * time.Minute) | |
| 121 | |
| 122 // Listing now returns expired tombstone. | |
| 123 items, tombs, err = set.List(c) | |
| 124 So(err, ShouldBeNil) | |
| 125 So(items, ShouldBeNil) | |
| 126 So(len(tombs), ShouldEqual, 1) | |
| 127 So(tombs[0].id, ShouldEqual, "abc") | |
| 128 So(len(tombs[0].storage), ShouldEqual, 3) // all shards | |
| 129 | |
| 130 // Cleanup storage keys. | |
| 131 So(set.CleanupStorage(c, tombs), ShouldBeNil) | |
| 132 | |
| 133 // Cleanup the tombstones themselves. | |
| 134 err = datastore.RunInTransaction(c, func(c context.Context) erro r { | |
| 135 popped, err := pop(c, &set, nil, tombs) | |
| 136 So(err, ShouldBeNil) | |
| 137 So(popped, ShouldBeNil) | |
| 138 return nil | |
| 139 }, nil) | |
| 140 So(err, ShouldBeNil) | |
| 141 | |
| 142 // No tombstones returned any longer. | |
| 143 items, tombs, err = set.List(c) | |
| 144 So(err, ShouldBeNil) | |
| 145 So(items, ShouldBeNil) | |
| 146 So(tombs, ShouldBeNil) | |
| 147 | |
| 148 // And the item can be added back now, since no trace of it is l eft. | |
| 149 So(set.Add(c, []Item{{ID: "abc"}}), ShouldBeNil) | |
| 150 | |
| 151 // Yep, it is there. | |
| 152 items, tombs, err = set.List(c) | |
| 153 So(err, ShouldBeNil) | |
| 154 So(items, ShouldResemble, []Item{{ID: "abc"}}) | |
| 155 So(tombs, ShouldBeNil) | |
| 156 }) | |
| 157 | |
| 158 Convey("stress", t, func() { | |
| 159 // Add 1000 items in parallel from N goroutines, and (also in pa rallel), | |
| 160 // run N instances of "List and pop all", collecting the result in single | |
| 161 // list. There should be no duplicates in the final list! | |
| 162 c := testingContext() | |
| 163 | |
| 164 set := Set{ | |
| 165 ID: "test", | |
| 166 ShardCount: 3, | |
| 167 TombstonesRoot: datastore.NewKey(c, "Root", "root", 0, nil), | |
| 168 TombstonesDelay: time.Minute, | |
|
tandrii(chromium)
2017/07/18 17:54:19
re: my comment about multiple consumers potentiall
Vadim Sh.
2017/07/24 00:30:18
It will all blow up. TombstonesDelay should be >>
| |
| 169 } | |
| 170 | |
| 171 producers := 3 | |
| 172 consumers := 5 | |
| 173 items := 100 | |
| 174 | |
| 175 wakeups := make(chan string) | |
| 176 | |
| 177 lock := sync.Mutex{} | |
| 178 consumed := []string{} | |
| 179 | |
| 180 for i := 0; i < producers; i++ { | |
| 181 go func() { | |
| 182 for j := 0; j < items; j++ { | |
| 183 set.Add(c, []Item{{ID: fmt.Sprintf("%d", j)}}) | |
| 184 // Wake up 3 consumers, so they "fight". | |
| 185 wakeups <- "wake" | |
| 186 wakeups <- "wake" | |
| 187 wakeups <- "wake" | |
| 188 } | |
| 189 for i := 0; i < consumers; i++ { | |
| 190 wakeups <- "done" | |
| 191 } | |
| 192 }() | |
| 193 } | |
| 194 | |
| 195 consume := func() { | |
| 196 items, _, _ := set.List(c) | |
| 197 if len(items) == 0 { | |
| 198 return | |
| 199 } | |
| 200 | |
| 201 keys := []string{} | |
| 202 for _, itm := range items { | |
| 203 keys = append(keys, itm.ID) | |
| 204 } | |
| 205 | |
| 206 // Try to pop all. | |
| 207 var popped []string | |
| 208 err := datastore.RunInTransaction(c, func(c context.Cont ext) error { | |
| 209 var err error | |
| 210 popped, err = pop(c, &set, keys, nil) | |
| 211 return err | |
| 212 }, nil) | |
| 213 | |
| 214 // Consider items consumed only if transaction has lande d. | |
| 215 if err == nil && len(popped) != 0 { | |
| 216 lock.Lock() | |
| 217 consumed = append(consumed, popped...) | |
| 218 lock.Unlock() | |
| 219 } | |
| 220 } | |
| 221 | |
| 222 wg := sync.WaitGroup{} | |
| 223 wg.Add(consumers) | |
| 224 for i := 0; i < consumers; i++ { | |
| 225 go func() { | |
| 226 defer wg.Done() | |
| 227 done := false | |
| 228 for !done { | |
| 229 done = (<-wakeups) == "done" | |
| 230 consume() | |
| 231 } | |
| 232 }() | |
| 233 } | |
| 234 | |
| 235 wg.Wait() // this waits for completion of the entire pipeline | |
| 236 | |
| 237 // Make sure 'consumed' is the initially produced set. | |
| 238 dedup := stringset.New(len(consumed)) | |
| 239 for _, itm := range consumed { | |
| 240 dedup.Add(itm) | |
| 241 } | |
| 242 So(dedup.Len(), ShouldEqual, len(consumed)) // no dups | |
| 243 So(len(consumed), ShouldEqual, items) // all are accounted for | |
| 244 }) | |
| 245 } | |
| OLD | NEW |