Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1031)

Unified Diff: appengine/tumble/example_test.go

Issue 1395293002: Add "tumble" distributed transaction processing service for appengine. (Closed) Base URL: https://chromium.googlesource.com/external/github.com/luci/luci-go@master
Patch Set: Created 5 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: appengine/tumble/example_test.go
diff --git a/appengine/tumble/example_test.go b/appengine/tumble/example_test.go
new file mode 100644
index 0000000000000000000000000000000000000000..42ec7fa5d2a57ca6d55f1a73daf1ff75576860ae
--- /dev/null
+++ b/appengine/tumble/example_test.go
@@ -0,0 +1,290 @@
+// Copyright 2015 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+package tumble
+
+import (
+ "encoding/base64"
+ "fmt"
+ "net/http"
+ "net/http/httptest"
+ "sort"
+ "strings"
+ "testing"
+ "time"
+
+ "github.com/julienschmidt/httprouter"
+ "github.com/luci/gae/impl/memory"
+ "github.com/luci/gae/service/datastore"
+ "github.com/luci/gae/service/taskqueue"
+ "github.com/luci/luci-go/common/bit_field"
+ "github.com/luci/luci-go/common/clock/testclock"
+ "github.com/luci/luci-go/common/logging"
+ "github.com/luci/luci-go/common/logging/memlogger"
+ "github.com/luci/luci-go/common/stringset"
+ . "github.com/smartystreets/goconvey/convey"
+ "golang.org/x/net/context"
+)
+
+type User struct {
+ Name string `gae:"$id"`
+}
+
+func (u *User) SendMessage(c context.Context, msg string, toUsers ...string) (*OutgoingMessage, error) {
+ sort.Strings(toUsers)
+ ds := datastore.Get(c)
+ k := ds.KeyForObj(u)
+ outMsg := &OutgoingMessage{
+ FromUser: k,
+ Message: msg,
+ Recipients: toUsers,
+ Success: bf.Make(uint64(len(toUsers))),
+ Failure: bf.Make(uint64(len(toUsers))),
+ }
+ err := EnterTransaction(c, k, func(c context.Context) ([]Mutation, error) {
+ ds := datastore.Get(c)
+ if err := ds.Put(outMsg); err != nil {
+ return nil, err
+ }
+ outKey := ds.KeyForObj(outMsg)
+ muts := make([]Mutation, len(toUsers))
+ for i := range muts {
+ muts[i] = &SendMessage{outKey, toUsers[i]}
+ }
+ return muts, nil
+ })
+ if err != nil {
+ outMsg = nil
+ }
+ return outMsg, err
+}
+
+type OutgoingMessage struct {
+ // datastore-assigned
+ ID int64 `gae:"$id"`
+ FromUser *datastore.Key `gae:"$parent"`
+
+ Message string `gae:",noindex"`
+ Recipients []string `gae:",noindex"`
+
+ Success bf.BitField
+ Failure bf.BitField
+}
+
+type IncomingMessage struct {
+ // OtherUser|OutgoingMessageID
+ ID string `gae:"$id"`
+ ForUser *datastore.Key `gae:"$parent"`
+}
+
+type SendMessage struct {
+ Message *datastore.Key
+ ToUser string
+}
+
+func (m *SendMessage) Root(ctx context.Context) *datastore.Key {
+ return datastore.Get(ctx).KeyForObj(&User{Name: m.ToUser})
+}
+
+func (m *SendMessage) RollForward(c context.Context) ([]Mutation, error) {
+ ds := datastore.Get(c)
+ u := &User{Name: m.ToUser}
+ if err := ds.Get(u); err != nil {
+ if err == datastore.ErrNoSuchEntity {
+ return []Mutation{&WriteReceipt{m.Message, m.ToUser, false}}, nil
+ }
+ return nil, err
+ }
+ im := &IncomingMessage{
+ ID: fmt.Sprintf("%s|%d", m.Message.Parent().StringID(), m.Message.IntID()),
+ ForUser: ds.KeyForObj(&User{Name: m.ToUser}),
+ }
+ err := ds.Get(im)
+ if err == datastore.ErrNoSuchEntity {
+ err = ds.Put(im)
+ return []Mutation{&WriteReceipt{m.Message, m.ToUser, true}}, err
+ }
+ return nil, err
+}
+
+type WriteReceipt struct {
+ Message *datastore.Key
+ Recipient string
+ Success bool
+}
+
+func (w *WriteReceipt) Root(ctx context.Context) *datastore.Key {
+ return w.Message.Root()
+}
+
+func (w *WriteReceipt) RollForward(c context.Context) ([]Mutation, error) {
+ ds := datastore.Get(c)
+ m := &OutgoingMessage{ID: w.Message.IntID(), FromUser: w.Message.Parent()}
+ if err := ds.Get(m); err != nil {
+ return nil, err
+ }
+
+ idx := uint64(sort.SearchStrings(m.Recipients, w.Recipient))
+ if w.Success {
+ m.Success.Set(idx)
+ } else {
+ m.Failure.Set(idx)
+ }
+
+ return nil, ds.Put(m)
+}
+
+func init() {
+ Register((*SendMessage)(nil))
+ Register((*WriteReceipt)(nil))
+
+ dustSettleTimeout = 0
+}
+
+func TestHighLevel(t *testing.T) {
+ t.Parallel()
+
+ Convey("Tumble", t, func() {
+ Convey("Check registration", func() {
+ So(registry, ShouldContainKey, "*tumble.SendMessage")
+ })
+
+ Convey("Good", func() {
+ ctx := memory.Use(memlogger.Use(context.Background()))
+ ctx, clk := testclock.UseTime(ctx, testclock.TestTimeUTC)
+ cfg := GetConfig(ctx)
+ ds := datastore.Get(ctx)
+ tq := taskqueue.Get(ctx)
+ l := logging.Get(ctx).(*memlogger.MemLogger)
+ _ = l
+
+ tq.Testable().CreateQueue(cfg.Name)
+
+ ds.Testable().AddIndexes(&datastore.IndexDefinition{
+ Kind: "tumble.Mutation",
+ SortBy: []datastore.IndexColumn{
+ {Property: "ExpandedShard"},
+ {Property: "TargetRoot"},
+ },
+ })
+ ds.Testable().CatchupIndexes()
+
+ iterate := func() int {
+ ret := 0
+ tsks := tq.Testable().GetScheduledTasks()[cfg.Name]
+ for _, tsk := range tsks {
+ if tsk.ETA.After(clk.Now()) {
+ continue
+ }
+ toks := strings.Split(tsk.Path, "/")
+ rec := httptest.NewRecorder()
+ ProcessShardHandler(ctx, rec, &http.Request{
+ Header: http.Header{"X-AppEngine-QueueName": []string{cfg.Name}},
+ }, httprouter.Params{
+ {Key: "shard_id", Value: toks[4]},
+ {Key: "timestamp", Value: toks[6]},
+ })
+ So(rec.Code, ShouldEqual, 200)
+ So(tq.Delete(tsk, cfg.Name), ShouldBeNil)
+ ret++
+ }
+ return ret
+ }
+
+ cron := func() {
+ rec := httptest.NewRecorder()
+ FireAllTasksHandler(ctx, rec, &http.Request{
+ Header: http.Header{"X-Appengine-Cron": []string{"true"}},
+ })
+ So(rec.Code, ShouldEqual, 200)
+ }
+
+ charlie := &User{Name: "charlie"}
+ So(ds.Put(charlie), ShouldBeNil)
+
+ Convey("can't send to someone who doesn't exist", func() {
+ outMsg, err := charlie.SendMessage(ctx, "Hey there", "lennon")
+ So(err, ShouldBeNil)
+
+ // need to advance clock and catch up indexes
+ So(iterate(), ShouldEqual, 0)
+ clk.Add(time.Second * 10)
+
+ // need to catch up indexes
+ So(iterate(), ShouldEqual, 1)
+
+ cron()
+ ds.Testable().CatchupIndexes()
+ clk.Add(time.Second * 10)
+
+ So(iterate(), ShouldEqual, cfg.NumShards)
+ ds.Testable().CatchupIndexes()
+ clk.Add(time.Second * 10)
+
+ So(iterate(), ShouldEqual, 1)
+
+ So(ds.Get(outMsg), ShouldBeNil)
+ So(outMsg.Failure.All(true), ShouldBeTrue)
+ })
+
+ Convey("sending to yourself could be done in one iteration if you're lucky", func() {
+ ds.Testable().Consistent(true)
+
+ outMsg, err := charlie.SendMessage(ctx, "Hey there", "charlie")
+ So(err, ShouldBeNil)
+
+ clk.Add(time.Second * 10)
+
+ So(iterate(), ShouldEqual, 1)
+
+ So(ds.Get(outMsg), ShouldBeNil)
+ So(outMsg.Success.All(true), ShouldBeTrue)
+ })
+
+ Convey("sending to 200 people is no big deal", func() {
+ users := make([]User, 200)
+ recipients := make([]string, 200)
+ for i := range recipients {
+ name := base64.StdEncoding.EncodeToString([]byte{byte(i)})
+ recipients[i] = name
+ users[i].Name = name
+ }
+ So(ds.PutMulti(users), ShouldBeNil)
+
+ outMsg, err := charlie.SendMessage(ctx, "Hey there", recipients...)
+ So(err, ShouldBeNil)
+
+ // do all the SendMessages
+ ds.Testable().CatchupIndexes()
+ clk.Add(time.Second * 10)
+ So(iterate(), ShouldEqual, cfg.NumShards)
+
+ // do all the WriteReceipts
+ l.Reset()
+ ds.Testable().CatchupIndexes()
+ clk.Add(time.Second * 10)
+ So(iterate(), ShouldEqual, 1)
+
+ // hacky proof that all 200 incoming message reciepts were buffered
+ // appropriately.
+ toFind := stringset.NewFromSlice(
+ "successfully processed 128 mutations, adding 0 more",
+ "successfully processed 72 mutations, adding 0 more")
iannucci 2015/10/10 17:51:53 I bumped this over the batch size limit to show th
+ for _, msg := range l.Messages() {
+ if msg.Level == logging.Info && toFind.Has(msg.Msg) {
+ toFind.Del(msg.Msg)
+ }
+ }
+ So(toFind.Len(), ShouldEqual, 0)
+
+ So(ds.Get(outMsg), ShouldBeNil)
+ So(outMsg.Success.All(true), ShouldBeTrue)
+ So(outMsg.Success.Size(), ShouldEqual, 200)
+
+ })
+
+ })
+
+ })
+}

Powered by Google App Engine
This is Rietveld 408576698