Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(3250)

Unified Diff: scheduler/appengine/engine/dsset/dsset_test.go

Issue 2981143002: Add 'dsset' structure. (Closed)
Patch Set: Created 3 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: scheduler/appengine/engine/dsset/dsset_test.go
diff --git a/scheduler/appengine/engine/dsset/dsset_test.go b/scheduler/appengine/engine/dsset/dsset_test.go
new file mode 100644
index 0000000000000000000000000000000000000000..14ccd12b35efa1c2e2ea8478e2ec54b7c405f238
--- /dev/null
+++ b/scheduler/appengine/engine/dsset/dsset_test.go
@@ -0,0 +1,245 @@
+// Copyright 2017 The LUCI Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package dsset
+
+import (
+ "fmt"
+ "math/rand"
+ "sync"
+ "testing"
+ "time"
+
+ "golang.org/x/net/context"
+
+ "github.com/luci/gae/impl/memory"
+ "github.com/luci/gae/service/datastore"
+ "github.com/luci/luci-go/common/clock"
+ "github.com/luci/luci-go/common/clock/testclock"
+ "github.com/luci/luci-go/common/data/rand/mathrand"
+ "github.com/luci/luci-go/common/data/stringset"
+
+ . "github.com/smartystreets/goconvey/convey"
+)
+
+func testingContext() context.Context {
+ c := memory.Use(context.Background())
+ c = clock.Set(c, testclock.New(time.Unix(1442270520, 0).UTC()))
+ c = mathrand.Set(c, rand.New(rand.NewSource(1000)))
+ return c
+}
+
+// pop pops a bunch of items from the set and returns items that were popped.
+func pop(c context.Context, s *Set, ids []string, cleanup []Tombstone) (popped []string, err error) {
+ op, err := s.BeginPop(c)
+ if err != nil {
+ return nil, err
+ }
+ op.CleanupTombstones(cleanup)
+ for _, id := range ids {
+ if op.Pop(id) {
+ popped = append(popped, id)
+ }
+ }
+ if err := op.Submit(); err != nil {
+ return nil, err
+ }
+ return popped, nil
+}
+
+func TestSet(t *testing.T) {
+ t.Parallel()
+
+ Convey("item one lifecycle", t, func() {
+ c := testingContext()
+
+ set := Set{
+ ID: "test",
+ ShardCount: 3,
+ TombstonesRoot: datastore.NewKey(c, "Root", "root", 0, nil),
+ TombstonesDelay: time.Minute,
+ }
+
+ // Add one item.
+ So(set.Add(c, []Item{{ID: "abc"}}), ShouldBeNil)
+
+ // The item is returned by the listing.
+ items, tombs, err := set.List(c)
+ So(err, ShouldBeNil)
+ So(items, ShouldResemble, []Item{{ID: "abc"}})
+ So(tombs, ShouldBeNil)
+
+ // Pop it!
+ err = datastore.RunInTransaction(c, func(c context.Context) error {
+ popped, err := pop(c, &set, []string{"abc"}, nil)
+ So(err, ShouldBeNil)
+ So(popped, ShouldResemble, []string{"abc"})
+ return nil
+ }, nil)
+ So(err, ShouldBeNil)
+
+ // The listing no longer returns it, but no tomb to cleanup yet (too fresh).
+ items, tombs, err = set.List(c)
+ So(err, ShouldBeNil)
+ So(items, ShouldBeNil)
+ So(tombs, ShouldBeNil)
+
+ // Attempt to add it back (should be ignored). Add a bunch of times to make
+ // sure to fill in many shards (this is pseudo-random).
+ for i := 0; i < 5; i++ {
+ So(set.Add(c, []Item{{ID: "abc"}}), ShouldBeNil)
+ }
+
+ // The listing still doesn't returns it.
+ items, tombs, err = set.List(c)
+ So(err, ShouldBeNil)
+ So(items, ShouldBeNil)
+ So(tombs, ShouldBeNil)
+
+ // Popping it again doesn't work either.
+ err = datastore.RunInTransaction(c, func(c context.Context) error {
+ popped, err := pop(c, &set, []string{"abc"}, nil)
+ So(err, ShouldBeNil)
+ So(popped, ShouldBeNil)
+ return nil
+ }, nil)
+ So(err, ShouldBeNil)
+
+ // Time passes, tombstone expires.
+ clock.Get(c).(testclock.TestClock).Add(2 * time.Minute)
+
+ // Listing now returns expired tombstone.
+ items, tombs, err = set.List(c)
+ So(err, ShouldBeNil)
+ So(items, ShouldBeNil)
+ So(len(tombs), ShouldEqual, 1)
+ So(tombs[0].id, ShouldEqual, "abc")
+ So(len(tombs[0].storage), ShouldEqual, 3) // all shards
+
+ // Cleanup storage keys.
+ So(set.CleanupStorage(c, tombs), ShouldBeNil)
+
+ // Cleanup the tombstones themselves.
+ err = datastore.RunInTransaction(c, func(c context.Context) error {
+ popped, err := pop(c, &set, nil, tombs)
+ So(err, ShouldBeNil)
+ So(popped, ShouldBeNil)
+ return nil
+ }, nil)
+ So(err, ShouldBeNil)
+
+ // No tombstones returned any longer.
+ items, tombs, err = set.List(c)
+ So(err, ShouldBeNil)
+ So(items, ShouldBeNil)
+ So(tombs, ShouldBeNil)
+
+ // And the item can be added back now, since no trace of it is left.
+ So(set.Add(c, []Item{{ID: "abc"}}), ShouldBeNil)
+
+ // Yep, it is there.
+ items, tombs, err = set.List(c)
+ So(err, ShouldBeNil)
+ So(items, ShouldResemble, []Item{{ID: "abc"}})
+ So(tombs, ShouldBeNil)
+ })
+
+ Convey("stress", t, func() {
+ // Add 1000 items in parallel from N goroutines, and (also in parallel),
+ // run N instances of "List and pop all", collecting the result in single
+ // list. There should be no duplicates in the final list!
+ c := testingContext()
+
+ set := Set{
+ ID: "test",
+ ShardCount: 3,
+ TombstonesRoot: datastore.NewKey(c, "Root", "root", 0, nil),
+ TombstonesDelay: time.Minute,
tandrii(chromium) 2017/07/18 17:54:19 re: my comment about multiple consumers potentiall
Vadim Sh. 2017/07/24 00:30:18 It will all blow up. TombstonesDelay should be >>
+ }
+
+ producers := 3
+ consumers := 5
+ items := 100
+
+ wakeups := make(chan string)
+
+ lock := sync.Mutex{}
+ consumed := []string{}
+
+ for i := 0; i < producers; i++ {
+ go func() {
+ for j := 0; j < items; j++ {
+ set.Add(c, []Item{{ID: fmt.Sprintf("%d", j)}})
+ // Wake up 3 consumers, so they "fight".
+ wakeups <- "wake"
+ wakeups <- "wake"
+ wakeups <- "wake"
+ }
+ for i := 0; i < consumers; i++ {
+ wakeups <- "done"
+ }
+ }()
+ }
+
+ consume := func() {
+ items, _, _ := set.List(c)
+ if len(items) == 0 {
+ return
+ }
+
+ keys := []string{}
+ for _, itm := range items {
+ keys = append(keys, itm.ID)
+ }
+
+ // Try to pop all.
+ var popped []string
+ err := datastore.RunInTransaction(c, func(c context.Context) error {
+ var err error
+ popped, err = pop(c, &set, keys, nil)
+ return err
+ }, nil)
+
+ // Consider items consumed only if transaction has landed.
+ if err == nil && len(popped) != 0 {
+ lock.Lock()
+ consumed = append(consumed, popped...)
+ lock.Unlock()
+ }
+ }
+
+ wg := sync.WaitGroup{}
+ wg.Add(consumers)
+ for i := 0; i < consumers; i++ {
+ go func() {
+ defer wg.Done()
+ done := false
+ for !done {
+ done = (<-wakeups) == "done"
+ consume()
+ }
+ }()
+ }
+
+ wg.Wait() // this waits for completion of the entire pipeline
+
+ // Make sure 'consumed' is the initially produced set.
+ dedup := stringset.New(len(consumed))
+ for _, itm := range consumed {
+ dedup.Add(itm)
+ }
+ So(dedup.Len(), ShouldEqual, len(consumed)) // no dups
+ So(len(consumed), ShouldEqual, items) // all are accounted for
+ })
+}

Powered by Google App Engine
This is Rietveld 408576698