| Index: scheduler/appengine/engine/dsset/dsset_test.go
|
| diff --git a/scheduler/appengine/engine/dsset/dsset_test.go b/scheduler/appengine/engine/dsset/dsset_test.go
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..95381a538e6f647ce2b5f966dfc9641d04127130
|
| --- /dev/null
|
| +++ b/scheduler/appengine/engine/dsset/dsset_test.go
|
| @@ -0,0 +1,279 @@
|
| +// Copyright 2017 The LUCI Authors.
|
| +//
|
| +// Licensed under the Apache License, Version 2.0 (the "License");
|
| +// you may not use this file except in compliance with the License.
|
| +// You may obtain a copy of the License at
|
| +//
|
| +// http://www.apache.org/licenses/LICENSE-2.0
|
| +//
|
| +// Unless required by applicable law or agreed to in writing, software
|
| +// distributed under the License is distributed on an "AS IS" BASIS,
|
| +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
| +// See the License for the specific language governing permissions and
|
| +// limitations under the License.
|
| +
|
| +package dsset
|
| +
|
| +import (
|
| + "fmt"
|
| + "math/rand"
|
| + "sync"
|
| + "testing"
|
| + "time"
|
| +
|
| + "golang.org/x/net/context"
|
| +
|
| + "github.com/luci/gae/impl/memory"
|
| + "github.com/luci/gae/service/datastore"
|
| + "github.com/luci/luci-go/common/clock"
|
| + "github.com/luci/luci-go/common/clock/testclock"
|
| + "github.com/luci/luci-go/common/data/rand/mathrand"
|
| + "github.com/luci/luci-go/common/data/stringset"
|
| +
|
| + . "github.com/smartystreets/goconvey/convey"
|
| +)
|
| +
|
| +func testingContext() context.Context {
|
| + c := memory.Use(context.Background())
|
| + c = clock.Set(c, testclock.New(time.Unix(1442270520, 0).UTC()))
|
| + c = mathrand.Set(c, rand.New(rand.NewSource(1000)))
|
| + return c
|
| +}
|
| +
|
| +// pop pops a bunch of items from the set and returns items that were popped.
|
| +func pop(c context.Context, s *Set, listing *Listing, ids []string) (popped []string, tombs []*Tombstone, err error) {
|
| + op, err := s.BeginPop(c, listing)
|
| + if err != nil {
|
| + return nil, nil, err
|
| + }
|
| + for _, id := range ids {
|
| + if op.Pop(id) {
|
| + popped = append(popped, id)
|
| + }
|
| + }
|
| + if tombs, err = FinishPop(c, op); err != nil {
|
| + return nil, nil, err
|
| + }
|
| + return popped, tombs, nil
|
| +}
|
| +
|
| +func TestSet(t *testing.T) {
|
| + t.Parallel()
|
| +
|
| + Convey("item one lifecycle", t, func() {
|
| + c := testingContext()
|
| +
|
| + set := Set{
|
| + ID: "test",
|
| + ShardCount: 3,
|
| + TombstonesRoot: datastore.NewKey(c, "Root", "root", 0, nil),
|
| + TombstonesDelay: time.Minute,
|
| + }
|
| +
|
| + // Add one item.
|
| + So(set.Add(c, []Item{{ID: "abc"}}), ShouldBeNil)
|
| +
|
| + // The item is returned by the listing.
|
| + listing, err := set.List(c)
|
| + So(err, ShouldBeNil)
|
| + So(listing.Items, ShouldResemble, []Item{{ID: "abc"}})
|
| + So(listing.Tombstones, ShouldBeNil)
|
| +
|
| + // Pop it!
|
| + var cleanup []*Tombstone
|
| + err = datastore.RunInTransaction(c, func(c context.Context) error {
|
| + popped, tombs, err := pop(c, &set, listing, []string{"abc"})
|
| + So(err, ShouldBeNil)
|
| + So(popped, ShouldResemble, []string{"abc"})
|
| + So(len(tombs), ShouldEqual, 1)
|
| + So(tombs[0].id, ShouldEqual, "abc")
|
| + So(len(tombs[0].storage), ShouldEqual, 1)
|
| + cleanup = tombs
|
| + return nil
|
| + }, nil)
|
| + So(err, ShouldBeNil)
|
| +
|
| + // The listing no longer returns it, but we have a fresh tombstone that can
|
| + // be cleaned up.
|
| + listing, err = set.List(c)
|
| + So(err, ShouldBeNil)
|
| + So(listing.Items, ShouldBeNil)
|
| + So(len(listing.Tombstones), ShouldEqual, 1)
|
| + So(listing.Tombstones[0].id, ShouldEqual, "abc")
|
| +
|
| + // Cleaning up the storage using tombstones from Pop works.
|
| + So(CleanupStorage(c, cleanup), ShouldBeNil)
|
| +
|
| + // The listing no longer returns the item, and there's no tombstones to
|
| + // cleanup.
|
| + listing, err = set.List(c)
|
| + So(err, ShouldBeNil)
|
| + So(listing.Items, ShouldBeNil)
|
| + So(listing.Tombstones, ShouldBeNil)
|
| +
|
| + // Attempt to add it back (should be ignored). Add a bunch of times to make
|
| + // sure to fill in many shards (this is pseudo-random).
|
| + for i := 0; i < 5; i++ {
|
| + So(set.Add(c, []Item{{ID: "abc"}}), ShouldBeNil)
|
| + }
|
| +
|
| + // The listing still doesn't returns it, but we now have a tombstone to
|
| + // cleanup (again).
|
| + listing, err = set.List(c)
|
| + So(err, ShouldBeNil)
|
| + So(listing.Items, ShouldBeNil)
|
| + So(len(listing.Tombstones), ShouldEqual, 1)
|
| + So(listing.Tombstones[0].old, ShouldBeFalse)
|
| + So(len(listing.Tombstones[0].storage), ShouldEqual, 3) // all shards
|
| +
|
| + // Popping it again doesn't work either.
|
| + err = datastore.RunInTransaction(c, func(c context.Context) error {
|
| + popped, tombs, err := pop(c, &set, listing, []string{"abc"})
|
| + So(err, ShouldBeNil)
|
| + So(popped, ShouldBeNil)
|
| + So(tombs, ShouldBeNil)
|
| + return nil
|
| + }, nil)
|
| + So(err, ShouldBeNil)
|
| +
|
| + // Cleaning up the storage, again. This should make List stop returning
|
| + // the tombstone (since it has no storage items associated with it and it's
|
| + // not ready to be evicted yet).
|
| + So(CleanupStorage(c, listing.Tombstones), ShouldBeNil)
|
| + listing, err = set.List(c)
|
| + So(err, ShouldBeNil)
|
| + So(listing.Items, ShouldBeNil)
|
| + So(listing.Tombstones, ShouldBeNil)
|
| +
|
| + // Time passes, tombstone expires.
|
| + clock.Get(c).(testclock.TestClock).Add(2 * time.Minute)
|
| +
|
| + // Listing now returns expired tombstone.
|
| + listing, err = set.List(c)
|
| + So(err, ShouldBeNil)
|
| + So(listing.Items, ShouldBeNil)
|
| + So(len(listing.Tombstones), ShouldEqual, 1)
|
| + So(len(listing.Tombstones[0].storage), ShouldEqual, 0) // cleaned already
|
| +
|
| + // Cleanup storage keys.
|
| + So(CleanupStorage(c, listing.Tombstones), ShouldBeNil)
|
| +
|
| + // Cleanup the tombstones themselves.
|
| + err = datastore.RunInTransaction(c, func(c context.Context) error {
|
| + popped, tombs, err := pop(c, &set, listing, nil)
|
| + So(err, ShouldBeNil)
|
| + So(popped, ShouldBeNil)
|
| + So(tombs, ShouldBeNil)
|
| + return nil
|
| + }, nil)
|
| + So(err, ShouldBeNil)
|
| +
|
| + // No tombstones returned any longer.
|
| + listing, err = set.List(c)
|
| + So(err, ShouldBeNil)
|
| + So(listing.Items, ShouldBeNil)
|
| + So(listing.Tombstones, ShouldBeNil)
|
| +
|
| + // And the item can be added back now, since no trace of it is left.
|
| + So(set.Add(c, []Item{{ID: "abc"}}), ShouldBeNil)
|
| +
|
| + // Yep, it is there.
|
| + listing, err = set.List(c)
|
| + So(err, ShouldBeNil)
|
| + So(listing.Items, ShouldResemble, []Item{{ID: "abc"}})
|
| + So(listing.Tombstones, ShouldBeNil)
|
| + })
|
| +
|
| + Convey("stress", t, func() {
|
| + // Add 1000 items in parallel from N goroutines, and (also in parallel),
|
| + // run N instances of "List and pop all", collecting the result in single
|
| + // list. There should be no duplicates in the final list!
|
| + c := testingContext()
|
| +
|
| + set := Set{
|
| + ID: "test",
|
| + ShardCount: 3,
|
| + TombstonesRoot: datastore.NewKey(c, "Root", "root", 0, nil),
|
| + TombstonesDelay: time.Minute,
|
| + }
|
| +
|
| + producers := 3
|
| + consumers := 5
|
| + items := 100
|
| +
|
| + wakeups := make(chan string)
|
| +
|
| + lock := sync.Mutex{}
|
| + consumed := []string{}
|
| +
|
| + for i := 0; i < producers; i++ {
|
| + go func() {
|
| + for j := 0; j < items; j++ {
|
| + set.Add(c, []Item{{ID: fmt.Sprintf("%d", j)}})
|
| + // Wake up 3 consumers, so they "fight".
|
| + wakeups <- "wake"
|
| + wakeups <- "wake"
|
| + wakeups <- "wake"
|
| + }
|
| + for i := 0; i < consumers; i++ {
|
| + wakeups <- "done"
|
| + }
|
| + }()
|
| + }
|
| +
|
| + consume := func() {
|
| + listing, err := set.List(c)
|
| + if err != nil || len(listing.Items) == 0 {
|
| + return
|
| + }
|
| +
|
| + keys := []string{}
|
| + for _, itm := range listing.Items {
|
| + keys = append(keys, itm.ID)
|
| + }
|
| +
|
| + // Try to pop all.
|
| + var popped []string
|
| + var tombs []*Tombstone
|
| + err = datastore.RunInTransaction(c, func(c context.Context) error {
|
| + var err error
|
| + popped, tombs, err = pop(c, &set, listing, keys)
|
| + return err
|
| + }, nil)
|
| + // Best-effort storage cleanup on success.
|
| + if err == nil {
|
| + CleanupStorage(c, tombs)
|
| + }
|
| +
|
| + // Consider items consumed only if transaction has landed.
|
| + if err == nil && len(popped) != 0 {
|
| + lock.Lock()
|
| + consumed = append(consumed, popped...)
|
| + lock.Unlock()
|
| + }
|
| + }
|
| +
|
| + wg := sync.WaitGroup{}
|
| + wg.Add(consumers)
|
| + for i := 0; i < consumers; i++ {
|
| + go func() {
|
| + defer wg.Done()
|
| + done := false
|
| + for !done {
|
| + done = (<-wakeups) == "done"
|
| + consume()
|
| + }
|
| + }()
|
| + }
|
| +
|
| + wg.Wait() // this waits for completion of the entire pipeline
|
| +
|
| + // Make sure 'consumed' is the initially produced set.
|
| + dedup := stringset.New(len(consumed))
|
| + for _, itm := range consumed {
|
| + dedup.Add(itm)
|
| + }
|
| + So(dedup.Len(), ShouldEqual, len(consumed)) // no dups
|
| + So(len(consumed), ShouldEqual, items) // all are accounted for
|
| + })
|
| +}
|
|
|