Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(544)

Side by Side Diff: appengine/tumble/example_test.go

Issue 1395293002: Add "tumble" distributed transaction processing service for appengine. (Closed) Base URL: https://chromium.googlesource.com/external/github.com/luci/luci-go@master
Patch Set: Created 5 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package tumble
6
7 import (
8 "encoding/base64"
9 "fmt"
10 "net/http"
11 "net/http/httptest"
12 "sort"
13 "strings"
14 "testing"
15 "time"
16
17 "github.com/julienschmidt/httprouter"
18 "github.com/luci/gae/impl/memory"
19 "github.com/luci/gae/service/datastore"
20 "github.com/luci/gae/service/taskqueue"
21 "github.com/luci/luci-go/common/bit_field"
22 "github.com/luci/luci-go/common/clock/testclock"
23 "github.com/luci/luci-go/common/logging"
24 "github.com/luci/luci-go/common/logging/memlogger"
25 "github.com/luci/luci-go/common/stringset"
26 . "github.com/smartystreets/goconvey/convey"
27 "golang.org/x/net/context"
28 )
29
30 type User struct {
31 Name string `gae:"$id"`
32 }
33
34 func (u *User) SendMessage(c context.Context, msg string, toUsers ...string) (*O utgoingMessage, error) {
35 sort.Strings(toUsers)
36 ds := datastore.Get(c)
37 k := ds.KeyForObj(u)
38 outMsg := &OutgoingMessage{
39 FromUser: k,
40 Message: msg,
41 Recipients: toUsers,
42 Success: bf.Make(uint64(len(toUsers))),
43 Failure: bf.Make(uint64(len(toUsers))),
44 }
45 err := EnterTransaction(c, k, func(c context.Context) ([]Mutation, error ) {
46 ds := datastore.Get(c)
47 if err := ds.Put(outMsg); err != nil {
48 return nil, err
49 }
50 outKey := ds.KeyForObj(outMsg)
51 muts := make([]Mutation, len(toUsers))
52 for i := range muts {
53 muts[i] = &SendMessage{outKey, toUsers[i]}
54 }
55 return muts, nil
56 })
57 if err != nil {
58 outMsg = nil
59 }
60 return outMsg, err
61 }
62
63 type OutgoingMessage struct {
64 // datastore-assigned
65 ID int64 `gae:"$id"`
66 FromUser *datastore.Key `gae:"$parent"`
67
68 Message string `gae:",noindex"`
69 Recipients []string `gae:",noindex"`
70
71 Success bf.BitField
72 Failure bf.BitField
73 }
74
75 type IncomingMessage struct {
76 // OtherUser|OutgoingMessageID
77 ID string `gae:"$id"`
78 ForUser *datastore.Key `gae:"$parent"`
79 }
80
81 type SendMessage struct {
82 Message *datastore.Key
83 ToUser string
84 }
85
86 func (m *SendMessage) Root(ctx context.Context) *datastore.Key {
87 return datastore.Get(ctx).KeyForObj(&User{Name: m.ToUser})
88 }
89
90 func (m *SendMessage) RollForward(c context.Context) ([]Mutation, error) {
91 ds := datastore.Get(c)
92 u := &User{Name: m.ToUser}
93 if err := ds.Get(u); err != nil {
94 if err == datastore.ErrNoSuchEntity {
95 return []Mutation{&WriteReceipt{m.Message, m.ToUser, fal se}}, nil
96 }
97 return nil, err
98 }
99 im := &IncomingMessage{
100 ID: fmt.Sprintf("%s|%d", m.Message.Parent().StringID(), m.M essage.IntID()),
101 ForUser: ds.KeyForObj(&User{Name: m.ToUser}),
102 }
103 err := ds.Get(im)
104 if err == datastore.ErrNoSuchEntity {
105 err = ds.Put(im)
106 return []Mutation{&WriteReceipt{m.Message, m.ToUser, true}}, err
107 }
108 return nil, err
109 }
110
111 type WriteReceipt struct {
112 Message *datastore.Key
113 Recipient string
114 Success bool
115 }
116
117 func (w *WriteReceipt) Root(ctx context.Context) *datastore.Key {
118 return w.Message.Root()
119 }
120
121 func (w *WriteReceipt) RollForward(c context.Context) ([]Mutation, error) {
122 ds := datastore.Get(c)
123 m := &OutgoingMessage{ID: w.Message.IntID(), FromUser: w.Message.Parent( )}
124 if err := ds.Get(m); err != nil {
125 return nil, err
126 }
127
128 idx := uint64(sort.SearchStrings(m.Recipients, w.Recipient))
129 if w.Success {
130 m.Success.Set(idx)
131 } else {
132 m.Failure.Set(idx)
133 }
134
135 return nil, ds.Put(m)
136 }
137
138 func init() {
139 Register((*SendMessage)(nil))
140 Register((*WriteReceipt)(nil))
141
142 dustSettleTimeout = 0
143 }
144
145 func TestHighLevel(t *testing.T) {
146 t.Parallel()
147
148 Convey("Tumble", t, func() {
149 Convey("Check registration", func() {
150 So(registry, ShouldContainKey, "*tumble.SendMessage")
151 })
152
153 Convey("Good", func() {
154 ctx := memory.Use(memlogger.Use(context.Background()))
155 ctx, clk := testclock.UseTime(ctx, testclock.TestTimeUTC )
156 cfg := GetConfig(ctx)
157 ds := datastore.Get(ctx)
158 tq := taskqueue.Get(ctx)
159 l := logging.Get(ctx).(*memlogger.MemLogger)
160 _ = l
161
162 tq.Testable().CreateQueue(cfg.Name)
163
164 ds.Testable().AddIndexes(&datastore.IndexDefinition{
165 Kind: "tumble.Mutation",
166 SortBy: []datastore.IndexColumn{
167 {Property: "ExpandedShard"},
168 {Property: "TargetRoot"},
169 },
170 })
171 ds.Testable().CatchupIndexes()
172
173 iterate := func() int {
174 ret := 0
175 tsks := tq.Testable().GetScheduledTasks()[cfg.Na me]
176 for _, tsk := range tsks {
177 if tsk.ETA.After(clk.Now()) {
178 continue
179 }
180 toks := strings.Split(tsk.Path, "/")
181 rec := httptest.NewRecorder()
182 ProcessShardHandler(ctx, rec, &http.Requ est{
183 Header: http.Header{"X-AppEngine -QueueName": []string{cfg.Name}},
184 }, httprouter.Params{
185 {Key: "shard_id", Value: toks[4] },
186 {Key: "timestamp", Value: toks[6 ]},
187 })
188 So(rec.Code, ShouldEqual, 200)
189 So(tq.Delete(tsk, cfg.Name), ShouldBeNil )
190 ret++
191 }
192 return ret
193 }
194
195 cron := func() {
196 rec := httptest.NewRecorder()
197 FireAllTasksHandler(ctx, rec, &http.Request{
198 Header: http.Header{"X-Appengine-Cron": []string{"true"}},
199 })
200 So(rec.Code, ShouldEqual, 200)
201 }
202
203 charlie := &User{Name: "charlie"}
204 So(ds.Put(charlie), ShouldBeNil)
205
206 Convey("can't send to someone who doesn't exist", func() {
207 outMsg, err := charlie.SendMessage(ctx, "Hey the re", "lennon")
208 So(err, ShouldBeNil)
209
210 // need to advance clock and catch up indexes
211 So(iterate(), ShouldEqual, 0)
212 clk.Add(time.Second * 10)
213
214 // need to catch up indexes
215 So(iterate(), ShouldEqual, 1)
216
217 cron()
218 ds.Testable().CatchupIndexes()
219 clk.Add(time.Second * 10)
220
221 So(iterate(), ShouldEqual, cfg.NumShards)
222 ds.Testable().CatchupIndexes()
223 clk.Add(time.Second * 10)
224
225 So(iterate(), ShouldEqual, 1)
226
227 So(ds.Get(outMsg), ShouldBeNil)
228 So(outMsg.Failure.All(true), ShouldBeTrue)
229 })
230
231 Convey("sending to yourself could be done in one iterati on if you're lucky", func() {
232 ds.Testable().Consistent(true)
233
234 outMsg, err := charlie.SendMessage(ctx, "Hey the re", "charlie")
235 So(err, ShouldBeNil)
236
237 clk.Add(time.Second * 10)
238
239 So(iterate(), ShouldEqual, 1)
240
241 So(ds.Get(outMsg), ShouldBeNil)
242 So(outMsg.Success.All(true), ShouldBeTrue)
243 })
244
245 Convey("sending to 200 people is no big deal", func() {
246 users := make([]User, 200)
247 recipients := make([]string, 200)
248 for i := range recipients {
249 name := base64.StdEncoding.EncodeToStrin g([]byte{byte(i)})
250 recipients[i] = name
251 users[i].Name = name
252 }
253 So(ds.PutMulti(users), ShouldBeNil)
254
255 outMsg, err := charlie.SendMessage(ctx, "Hey the re", recipients...)
256 So(err, ShouldBeNil)
257
258 // do all the SendMessages
259 ds.Testable().CatchupIndexes()
260 clk.Add(time.Second * 10)
261 So(iterate(), ShouldEqual, cfg.NumShards)
262
263 // do all the WriteReceipts
264 l.Reset()
265 ds.Testable().CatchupIndexes()
266 clk.Add(time.Second * 10)
267 So(iterate(), ShouldEqual, 1)
268
269 // hacky proof that all 200 incoming message rec iepts were buffered
270 // appropriately.
271 toFind := stringset.NewFromSlice(
272 "successfully processed 128 mutations, a dding 0 more",
273 "successfully processed 72 mutations, ad ding 0 more")
iannucci 2015/10/10 17:51:53 I bumped this over the batch size limit to show th
274 for _, msg := range l.Messages() {
275 if msg.Level == logging.Info && toFind.H as(msg.Msg) {
276 toFind.Del(msg.Msg)
277 }
278 }
279 So(toFind.Len(), ShouldEqual, 0)
280
281 So(ds.Get(outMsg), ShouldBeNil)
282 So(outMsg.Success.All(true), ShouldBeTrue)
283 So(outMsg.Success.Size(), ShouldEqual, 200)
284
285 })
286
287 })
288
289 })
290 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698