Chromium Code Reviews| Index: scheduler/appengine/engine/dsset/dsset_test.go |
| diff --git a/scheduler/appengine/engine/dsset/dsset_test.go b/scheduler/appengine/engine/dsset/dsset_test.go |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..14ccd12b35efa1c2e2ea8478e2ec54b7c405f238 |
| --- /dev/null |
| +++ b/scheduler/appengine/engine/dsset/dsset_test.go |
| @@ -0,0 +1,245 @@ |
| +// Copyright 2017 The LUCI Authors. |
| +// |
| +// Licensed under the Apache License, Version 2.0 (the "License"); |
| +// you may not use this file except in compliance with the License. |
| +// You may obtain a copy of the License at |
| +// |
| +// http://www.apache.org/licenses/LICENSE-2.0 |
| +// |
| +// Unless required by applicable law or agreed to in writing, software |
| +// distributed under the License is distributed on an "AS IS" BASIS, |
| +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| +// See the License for the specific language governing permissions and |
| +// limitations under the License. |
| + |
| +package dsset |
| + |
| +import ( |
| + "fmt" |
| + "math/rand" |
| + "sync" |
| + "testing" |
| + "time" |
| + |
| + "golang.org/x/net/context" |
| + |
| + "github.com/luci/gae/impl/memory" |
| + "github.com/luci/gae/service/datastore" |
| + "github.com/luci/luci-go/common/clock" |
| + "github.com/luci/luci-go/common/clock/testclock" |
| + "github.com/luci/luci-go/common/data/rand/mathrand" |
| + "github.com/luci/luci-go/common/data/stringset" |
| + |
| + . "github.com/smartystreets/goconvey/convey" |
| +) |
| + |
| +func testingContext() context.Context { |
| + c := memory.Use(context.Background()) |
| + c = clock.Set(c, testclock.New(time.Unix(1442270520, 0).UTC())) |
| + c = mathrand.Set(c, rand.New(rand.NewSource(1000))) |
| + return c |
| +} |
| + |
| +// pop pops a bunch of items from the set and returns items that were popped. |
| +func pop(c context.Context, s *Set, ids []string, cleanup []Tombstone) (popped []string, err error) { |
| + op, err := s.BeginPop(c) |
| + if err != nil { |
| + return nil, err |
| + } |
| + op.CleanupTombstones(cleanup) |
| + for _, id := range ids { |
| + if op.Pop(id) { |
| + popped = append(popped, id) |
| + } |
| + } |
| + if err := op.Submit(); err != nil { |
| + return nil, err |
| + } |
| + return popped, nil |
| +} |
| + |
| +func TestSet(t *testing.T) { |
| + t.Parallel() |
| + |
| + Convey("item one lifecycle", t, func() { |
| + c := testingContext() |
| + |
| + set := Set{ |
| + ID: "test", |
| + ShardCount: 3, |
| + TombstonesRoot: datastore.NewKey(c, "Root", "root", 0, nil), |
| + TombstonesDelay: time.Minute, |
| + } |
| + |
| + // Add one item. |
| + So(set.Add(c, []Item{{ID: "abc"}}), ShouldBeNil) |
| + |
| + // The item is returned by the listing. |
| + items, tombs, err := set.List(c) |
| + So(err, ShouldBeNil) |
| + So(items, ShouldResemble, []Item{{ID: "abc"}}) |
| + So(tombs, ShouldBeNil) |
| + |
| + // Pop it! |
| + err = datastore.RunInTransaction(c, func(c context.Context) error { |
| + popped, err := pop(c, &set, []string{"abc"}, nil) |
| + So(err, ShouldBeNil) |
| + So(popped, ShouldResemble, []string{"abc"}) |
| + return nil |
| + }, nil) |
| + So(err, ShouldBeNil) |
| + |
| + // The listing no longer returns it, but no tomb to cleanup yet (too fresh). |
| + items, tombs, err = set.List(c) |
| + So(err, ShouldBeNil) |
| + So(items, ShouldBeNil) |
| + So(tombs, ShouldBeNil) |
| + |
| + // Attempt to add it back (should be ignored). Add a bunch of times to make |
| + // sure to fill in many shards (this is pseudo-random). |
| + for i := 0; i < 5; i++ { |
| + So(set.Add(c, []Item{{ID: "abc"}}), ShouldBeNil) |
| + } |
| + |
| + // The listing still doesn't returns it. |
| + items, tombs, err = set.List(c) |
| + So(err, ShouldBeNil) |
| + So(items, ShouldBeNil) |
| + So(tombs, ShouldBeNil) |
| + |
| + // Popping it again doesn't work either. |
| + err = datastore.RunInTransaction(c, func(c context.Context) error { |
| + popped, err := pop(c, &set, []string{"abc"}, nil) |
| + So(err, ShouldBeNil) |
| + So(popped, ShouldBeNil) |
| + return nil |
| + }, nil) |
| + So(err, ShouldBeNil) |
| + |
| + // Time passes, tombstone expires. |
| + clock.Get(c).(testclock.TestClock).Add(2 * time.Minute) |
| + |
| + // Listing now returns expired tombstone. |
| + items, tombs, err = set.List(c) |
| + So(err, ShouldBeNil) |
| + So(items, ShouldBeNil) |
| + So(len(tombs), ShouldEqual, 1) |
| + So(tombs[0].id, ShouldEqual, "abc") |
| + So(len(tombs[0].storage), ShouldEqual, 3) // all shards |
| + |
| + // Cleanup storage keys. |
| + So(set.CleanupStorage(c, tombs), ShouldBeNil) |
| + |
| + // Cleanup the tombstones themselves. |
| + err = datastore.RunInTransaction(c, func(c context.Context) error { |
| + popped, err := pop(c, &set, nil, tombs) |
| + So(err, ShouldBeNil) |
| + So(popped, ShouldBeNil) |
| + return nil |
| + }, nil) |
| + So(err, ShouldBeNil) |
| + |
| + // No tombstones returned any longer. |
| + items, tombs, err = set.List(c) |
| + So(err, ShouldBeNil) |
| + So(items, ShouldBeNil) |
| + So(tombs, ShouldBeNil) |
| + |
| + // And the item can be added back now, since no trace of it is left. |
| + So(set.Add(c, []Item{{ID: "abc"}}), ShouldBeNil) |
| + |
| + // Yep, it is there. |
| + items, tombs, err = set.List(c) |
| + So(err, ShouldBeNil) |
| + So(items, ShouldResemble, []Item{{ID: "abc"}}) |
| + So(tombs, ShouldBeNil) |
| + }) |
| + |
| + Convey("stress", t, func() { |
| + // Add 1000 items in parallel from N goroutines, and (also in parallel), |
| + // run N instances of "List and pop all", collecting the result in single |
| + // list. There should be no duplicates in the final list! |
| + c := testingContext() |
| + |
| + set := Set{ |
| + ID: "test", |
| + ShardCount: 3, |
| + TombstonesRoot: datastore.NewKey(c, "Root", "root", 0, nil), |
| + TombstonesDelay: time.Minute, |
|
tandrii(chromium)
2017/07/18 17:54:19
re: my comment about multiple consumers potentiall
Vadim Sh.
2017/07/24 00:30:18
It will all blow up. TombstonesDelay should be >>
|
| + } |
| + |
| + producers := 3 |
| + consumers := 5 |
| + items := 100 |
| + |
| + wakeups := make(chan string) |
| + |
| + lock := sync.Mutex{} |
| + consumed := []string{} |
| + |
| + for i := 0; i < producers; i++ { |
| + go func() { |
| + for j := 0; j < items; j++ { |
| + set.Add(c, []Item{{ID: fmt.Sprintf("%d", j)}}) |
| + // Wake up 3 consumers, so they "fight". |
| + wakeups <- "wake" |
| + wakeups <- "wake" |
| + wakeups <- "wake" |
| + } |
| + for i := 0; i < consumers; i++ { |
| + wakeups <- "done" |
| + } |
| + }() |
| + } |
| + |
| + consume := func() { |
| + items, _, _ := set.List(c) |
| + if len(items) == 0 { |
| + return |
| + } |
| + |
| + keys := []string{} |
| + for _, itm := range items { |
| + keys = append(keys, itm.ID) |
| + } |
| + |
| + // Try to pop all. |
| + var popped []string |
| + err := datastore.RunInTransaction(c, func(c context.Context) error { |
| + var err error |
| + popped, err = pop(c, &set, keys, nil) |
| + return err |
| + }, nil) |
| + |
| + // Consider items consumed only if transaction has landed. |
| + if err == nil && len(popped) != 0 { |
| + lock.Lock() |
| + consumed = append(consumed, popped...) |
| + lock.Unlock() |
| + } |
| + } |
| + |
| + wg := sync.WaitGroup{} |
| + wg.Add(consumers) |
| + for i := 0; i < consumers; i++ { |
| + go func() { |
| + defer wg.Done() |
| + done := false |
| + for !done { |
| + done = (<-wakeups) == "done" |
| + consume() |
| + } |
| + }() |
| + } |
| + |
| + wg.Wait() // this waits for completion of the entire pipeline |
| + |
| + // Make sure 'consumed' is the initially produced set. |
| + dedup := stringset.New(len(consumed)) |
| + for _, itm := range consumed { |
| + dedup.Add(itm) |
| + } |
| + So(dedup.Len(), ShouldEqual, len(consumed)) // no dups |
| + So(len(consumed), ShouldEqual, items) // all are accounted for |
| + }) |
| +} |