Chromium Code Reviews| Index: appengine/tumble/example_test.go |
| diff --git a/appengine/tumble/example_test.go b/appengine/tumble/example_test.go |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..42ec7fa5d2a57ca6d55f1a73daf1ff75576860ae |
| --- /dev/null |
| +++ b/appengine/tumble/example_test.go |
| @@ -0,0 +1,290 @@ |
| +// Copyright 2015 The Chromium Authors. All rights reserved. |
| +// Use of this source code is governed by a BSD-style license that can be |
| +// found in the LICENSE file. |
| + |
| +package tumble |
| + |
| +import ( |
| + "encoding/base64" |
| + "fmt" |
| + "net/http" |
| + "net/http/httptest" |
| + "sort" |
| + "strings" |
| + "testing" |
| + "time" |
| + |
| + "github.com/julienschmidt/httprouter" |
| + "github.com/luci/gae/impl/memory" |
| + "github.com/luci/gae/service/datastore" |
| + "github.com/luci/gae/service/taskqueue" |
| + "github.com/luci/luci-go/common/bit_field" |
| + "github.com/luci/luci-go/common/clock/testclock" |
| + "github.com/luci/luci-go/common/logging" |
| + "github.com/luci/luci-go/common/logging/memlogger" |
| + "github.com/luci/luci-go/common/stringset" |
| + . "github.com/smartystreets/goconvey/convey" |
| + "golang.org/x/net/context" |
| +) |
| + |
| +type User struct { |
| + Name string `gae:"$id"` |
| +} |
| + |
| +func (u *User) SendMessage(c context.Context, msg string, toUsers ...string) (*OutgoingMessage, error) { |
| + sort.Strings(toUsers) |
| + ds := datastore.Get(c) |
| + k := ds.KeyForObj(u) |
| + outMsg := &OutgoingMessage{ |
| + FromUser: k, |
| + Message: msg, |
| + Recipients: toUsers, |
| + Success: bf.Make(uint64(len(toUsers))), |
| + Failure: bf.Make(uint64(len(toUsers))), |
| + } |
| + err := EnterTransaction(c, k, func(c context.Context) ([]Mutation, error) { |
| + ds := datastore.Get(c) |
| + if err := ds.Put(outMsg); err != nil { |
| + return nil, err |
| + } |
| + outKey := ds.KeyForObj(outMsg) |
| + muts := make([]Mutation, len(toUsers)) |
| + for i := range muts { |
| + muts[i] = &SendMessage{outKey, toUsers[i]} |
| + } |
| + return muts, nil |
| + }) |
| + if err != nil { |
| + outMsg = nil |
| + } |
| + return outMsg, err |
| +} |
| + |
| +type OutgoingMessage struct { |
| + // datastore-assigned |
| + ID int64 `gae:"$id"` |
| + FromUser *datastore.Key `gae:"$parent"` |
| + |
| + Message string `gae:",noindex"` |
| + Recipients []string `gae:",noindex"` |
| + |
| + Success bf.BitField |
| + Failure bf.BitField |
| +} |
| + |
| +type IncomingMessage struct { |
| + // OtherUser|OutgoingMessageID |
| + ID string `gae:"$id"` |
| + ForUser *datastore.Key `gae:"$parent"` |
| +} |
| + |
| +type SendMessage struct { |
| + Message *datastore.Key |
| + ToUser string |
| +} |
| + |
| +func (m *SendMessage) Root(ctx context.Context) *datastore.Key { |
| + return datastore.Get(ctx).KeyForObj(&User{Name: m.ToUser}) |
| +} |
| + |
| +func (m *SendMessage) RollForward(c context.Context) ([]Mutation, error) { |
| + ds := datastore.Get(c) |
| + u := &User{Name: m.ToUser} |
| + if err := ds.Get(u); err != nil { |
| + if err == datastore.ErrNoSuchEntity { |
| + return []Mutation{&WriteReceipt{m.Message, m.ToUser, false}}, nil |
| + } |
| + return nil, err |
| + } |
| + im := &IncomingMessage{ |
| + ID: fmt.Sprintf("%s|%d", m.Message.Parent().StringID(), m.Message.IntID()), |
| + ForUser: ds.KeyForObj(&User{Name: m.ToUser}), |
| + } |
| + err := ds.Get(im) |
| + if err == datastore.ErrNoSuchEntity { |
| + err = ds.Put(im) |
| + return []Mutation{&WriteReceipt{m.Message, m.ToUser, true}}, err |
| + } |
| + return nil, err |
| +} |
| + |
| +type WriteReceipt struct { |
| + Message *datastore.Key |
| + Recipient string |
| + Success bool |
| +} |
| + |
| +func (w *WriteReceipt) Root(ctx context.Context) *datastore.Key { |
| + return w.Message.Root() |
| +} |
| + |
| +func (w *WriteReceipt) RollForward(c context.Context) ([]Mutation, error) { |
| + ds := datastore.Get(c) |
| + m := &OutgoingMessage{ID: w.Message.IntID(), FromUser: w.Message.Parent()} |
| + if err := ds.Get(m); err != nil { |
| + return nil, err |
| + } |
| + |
| + idx := uint64(sort.SearchStrings(m.Recipients, w.Recipient)) |
| + if w.Success { |
| + m.Success.Set(idx) |
| + } else { |
| + m.Failure.Set(idx) |
| + } |
| + |
| + return nil, ds.Put(m) |
| +} |
| + |
| +func init() { |
| + Register((*SendMessage)(nil)) |
| + Register((*WriteReceipt)(nil)) |
| + |
| + dustSettleTimeout = 0 |
| +} |
| + |
| +func TestHighLevel(t *testing.T) { |
| + t.Parallel() |
| + |
| + Convey("Tumble", t, func() { |
| + Convey("Check registration", func() { |
| + So(registry, ShouldContainKey, "*tumble.SendMessage") |
| + }) |
| + |
| + Convey("Good", func() { |
| + ctx := memory.Use(memlogger.Use(context.Background())) |
| + ctx, clk := testclock.UseTime(ctx, testclock.TestTimeUTC) |
| + cfg := GetConfig(ctx) |
| + ds := datastore.Get(ctx) |
| + tq := taskqueue.Get(ctx) |
| + l := logging.Get(ctx).(*memlogger.MemLogger) |
| + _ = l |
| + |
| + tq.Testable().CreateQueue(cfg.Name) |
| + |
| + ds.Testable().AddIndexes(&datastore.IndexDefinition{ |
| + Kind: "tumble.Mutation", |
| + SortBy: []datastore.IndexColumn{ |
| + {Property: "ExpandedShard"}, |
| + {Property: "TargetRoot"}, |
| + }, |
| + }) |
| + ds.Testable().CatchupIndexes() |
| + |
| + iterate := func() int { |
| + ret := 0 |
| + tsks := tq.Testable().GetScheduledTasks()[cfg.Name] |
| + for _, tsk := range tsks { |
| + if tsk.ETA.After(clk.Now()) { |
| + continue |
| + } |
| + toks := strings.Split(tsk.Path, "/") |
| + rec := httptest.NewRecorder() |
| + ProcessShardHandler(ctx, rec, &http.Request{ |
| + Header: http.Header{"X-AppEngine-QueueName": []string{cfg.Name}}, |
| + }, httprouter.Params{ |
| + {Key: "shard_id", Value: toks[4]}, |
| + {Key: "timestamp", Value: toks[6]}, |
| + }) |
| + So(rec.Code, ShouldEqual, 200) |
| + So(tq.Delete(tsk, cfg.Name), ShouldBeNil) |
| + ret++ |
| + } |
| + return ret |
| + } |
| + |
| + cron := func() { |
| + rec := httptest.NewRecorder() |
| + FireAllTasksHandler(ctx, rec, &http.Request{ |
| + Header: http.Header{"X-Appengine-Cron": []string{"true"}}, |
| + }) |
| + So(rec.Code, ShouldEqual, 200) |
| + } |
| + |
| + charlie := &User{Name: "charlie"} |
| + So(ds.Put(charlie), ShouldBeNil) |
| + |
| + Convey("can't send to someone who doesn't exist", func() { |
| + outMsg, err := charlie.SendMessage(ctx, "Hey there", "lennon") |
| + So(err, ShouldBeNil) |
| + |
| + // need to advance clock and catch up indexes |
| + So(iterate(), ShouldEqual, 0) |
| + clk.Add(time.Second * 10) |
| + |
| + // need to catch up indexes |
| + So(iterate(), ShouldEqual, 1) |
| + |
| + cron() |
| + ds.Testable().CatchupIndexes() |
| + clk.Add(time.Second * 10) |
| + |
| + So(iterate(), ShouldEqual, cfg.NumShards) |
| + ds.Testable().CatchupIndexes() |
| + clk.Add(time.Second * 10) |
| + |
| + So(iterate(), ShouldEqual, 1) |
| + |
| + So(ds.Get(outMsg), ShouldBeNil) |
| + So(outMsg.Failure.All(true), ShouldBeTrue) |
| + }) |
| + |
| + Convey("sending to yourself could be done in one iteration if you're lucky", func() { |
| + ds.Testable().Consistent(true) |
| + |
| + outMsg, err := charlie.SendMessage(ctx, "Hey there", "charlie") |
| + So(err, ShouldBeNil) |
| + |
| + clk.Add(time.Second * 10) |
| + |
| + So(iterate(), ShouldEqual, 1) |
| + |
| + So(ds.Get(outMsg), ShouldBeNil) |
| + So(outMsg.Success.All(true), ShouldBeTrue) |
| + }) |
| + |
| + Convey("sending to 200 people is no big deal", func() { |
| + users := make([]User, 200) |
| + recipients := make([]string, 200) |
| + for i := range recipients { |
| + name := base64.StdEncoding.EncodeToString([]byte{byte(i)}) |
| + recipients[i] = name |
| + users[i].Name = name |
| + } |
| + So(ds.PutMulti(users), ShouldBeNil) |
| + |
| + outMsg, err := charlie.SendMessage(ctx, "Hey there", recipients...) |
| + So(err, ShouldBeNil) |
| + |
| + // do all the SendMessages |
| + ds.Testable().CatchupIndexes() |
| + clk.Add(time.Second * 10) |
| + So(iterate(), ShouldEqual, cfg.NumShards) |
| + |
| + // do all the WriteReceipts |
| + l.Reset() |
| + ds.Testable().CatchupIndexes() |
| + clk.Add(time.Second * 10) |
| + So(iterate(), ShouldEqual, 1) |
| + |
| + // hacky proof that all 200 incoming message reciepts were buffered |
| + // appropriately. |
| + toFind := stringset.NewFromSlice( |
| + "successfully processed 128 mutations, adding 0 more", |
| + "successfully processed 72 mutations, adding 0 more") |
|
iannucci
2015/10/10 17:51:53
I bumped this over the batch size limit to show th
|
| + for _, msg := range l.Messages() { |
| + if msg.Level == logging.Info && toFind.Has(msg.Msg) { |
| + toFind.Del(msg.Msg) |
| + } |
| + } |
| + So(toFind.Len(), ShouldEqual, 0) |
| + |
| + So(ds.Get(outMsg), ShouldBeNil) |
| + So(outMsg.Success.All(true), ShouldBeTrue) |
| + So(outMsg.Success.Size(), ShouldEqual, 200) |
| + |
| + }) |
| + |
| + }) |
| + |
| + }) |
| +} |