OLD | NEW |
---|---|
(Empty) | |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 package tumble | |
6 | |
7 import ( | |
8 "encoding/base64" | |
9 "fmt" | |
10 "net/http" | |
11 "net/http/httptest" | |
12 "sort" | |
13 "strings" | |
14 "testing" | |
15 "time" | |
16 | |
17 "github.com/julienschmidt/httprouter" | |
18 "github.com/luci/gae/impl/memory" | |
19 "github.com/luci/gae/service/datastore" | |
20 "github.com/luci/gae/service/taskqueue" | |
21 "github.com/luci/luci-go/common/bit_field" | |
22 "github.com/luci/luci-go/common/clock/testclock" | |
23 "github.com/luci/luci-go/common/logging" | |
24 "github.com/luci/luci-go/common/logging/memlogger" | |
25 "github.com/luci/luci-go/common/stringset" | |
26 . "github.com/smartystreets/goconvey/convey" | |
27 "golang.org/x/net/context" | |
28 ) | |
29 | |
30 type User struct { | |
31 Name string `gae:"$id"` | |
32 } | |
33 | |
34 func (u *User) SendMessage(c context.Context, msg string, toUsers ...string) (*O utgoingMessage, error) { | |
35 sort.Strings(toUsers) | |
36 ds := datastore.Get(c) | |
37 k := ds.KeyForObj(u) | |
38 outMsg := &OutgoingMessage{ | |
39 FromUser: k, | |
40 Message: msg, | |
41 Recipients: toUsers, | |
42 Success: bf.Make(uint64(len(toUsers))), | |
43 Failure: bf.Make(uint64(len(toUsers))), | |
44 } | |
45 err := EnterTransaction(c, k, func(c context.Context) ([]Mutation, error ) { | |
46 ds := datastore.Get(c) | |
47 if err := ds.Put(outMsg); err != nil { | |
48 return nil, err | |
49 } | |
50 outKey := ds.KeyForObj(outMsg) | |
51 muts := make([]Mutation, len(toUsers)) | |
52 for i := range muts { | |
53 muts[i] = &SendMessage{outKey, toUsers[i]} | |
54 } | |
55 return muts, nil | |
56 }) | |
57 if err != nil { | |
58 outMsg = nil | |
59 } | |
60 return outMsg, err | |
61 } | |
62 | |
63 type OutgoingMessage struct { | |
64 // datastore-assigned | |
65 ID int64 `gae:"$id"` | |
66 FromUser *datastore.Key `gae:"$parent"` | |
67 | |
68 Message string `gae:",noindex"` | |
69 Recipients []string `gae:",noindex"` | |
70 | |
71 Success bf.BitField | |
72 Failure bf.BitField | |
73 } | |
74 | |
75 type IncomingMessage struct { | |
76 // OtherUser|OutgoingMessageID | |
77 ID string `gae:"$id"` | |
78 ForUser *datastore.Key `gae:"$parent"` | |
79 } | |
80 | |
81 type SendMessage struct { | |
82 Message *datastore.Key | |
83 ToUser string | |
84 } | |
85 | |
86 func (m *SendMessage) Root(ctx context.Context) *datastore.Key { | |
87 return datastore.Get(ctx).KeyForObj(&User{Name: m.ToUser}) | |
88 } | |
89 | |
90 func (m *SendMessage) RollForward(c context.Context) ([]Mutation, error) { | |
91 ds := datastore.Get(c) | |
92 u := &User{Name: m.ToUser} | |
93 if err := ds.Get(u); err != nil { | |
94 if err == datastore.ErrNoSuchEntity { | |
95 return []Mutation{&WriteReceipt{m.Message, m.ToUser, fal se}}, nil | |
96 } | |
97 return nil, err | |
98 } | |
99 im := &IncomingMessage{ | |
100 ID: fmt.Sprintf("%s|%d", m.Message.Parent().StringID(), m.M essage.IntID()), | |
101 ForUser: ds.KeyForObj(&User{Name: m.ToUser}), | |
102 } | |
103 err := ds.Get(im) | |
104 if err == datastore.ErrNoSuchEntity { | |
105 err = ds.Put(im) | |
106 return []Mutation{&WriteReceipt{m.Message, m.ToUser, true}}, err | |
107 } | |
108 return nil, err | |
109 } | |
110 | |
111 type WriteReceipt struct { | |
112 Message *datastore.Key | |
113 Recipient string | |
114 Success bool | |
115 } | |
116 | |
117 func (w *WriteReceipt) Root(ctx context.Context) *datastore.Key { | |
118 return w.Message.Root() | |
119 } | |
120 | |
121 func (w *WriteReceipt) RollForward(c context.Context) ([]Mutation, error) { | |
122 ds := datastore.Get(c) | |
123 m := &OutgoingMessage{ID: w.Message.IntID(), FromUser: w.Message.Parent( )} | |
124 if err := ds.Get(m); err != nil { | |
125 return nil, err | |
126 } | |
127 | |
128 idx := uint64(sort.SearchStrings(m.Recipients, w.Recipient)) | |
129 if w.Success { | |
130 m.Success.Set(idx) | |
131 } else { | |
132 m.Failure.Set(idx) | |
133 } | |
134 | |
135 return nil, ds.Put(m) | |
136 } | |
137 | |
138 func init() { | |
139 Register((*SendMessage)(nil)) | |
140 Register((*WriteReceipt)(nil)) | |
141 | |
142 dustSettleTimeout = 0 | |
143 } | |
144 | |
145 func TestHighLevel(t *testing.T) { | |
146 t.Parallel() | |
147 | |
148 Convey("Tumble", t, func() { | |
149 Convey("Check registration", func() { | |
150 So(registry, ShouldContainKey, "*tumble.SendMessage") | |
151 }) | |
152 | |
153 Convey("Good", func() { | |
154 ctx := memory.Use(memlogger.Use(context.Background())) | |
155 ctx, clk := testclock.UseTime(ctx, testclock.TestTimeUTC ) | |
156 cfg := GetConfig(ctx) | |
157 ds := datastore.Get(ctx) | |
158 tq := taskqueue.Get(ctx) | |
159 l := logging.Get(ctx).(*memlogger.MemLogger) | |
160 _ = l | |
161 | |
162 tq.Testable().CreateQueue(cfg.Name) | |
163 | |
164 ds.Testable().AddIndexes(&datastore.IndexDefinition{ | |
165 Kind: "tumble.Mutation", | |
166 SortBy: []datastore.IndexColumn{ | |
167 {Property: "ExpandedShard"}, | |
168 {Property: "TargetRoot"}, | |
169 }, | |
170 }) | |
171 ds.Testable().CatchupIndexes() | |
172 | |
173 iterate := func() int { | |
174 ret := 0 | |
175 tsks := tq.Testable().GetScheduledTasks()[cfg.Na me] | |
176 for _, tsk := range tsks { | |
177 if tsk.ETA.After(clk.Now()) { | |
178 continue | |
179 } | |
180 toks := strings.Split(tsk.Path, "/") | |
181 rec := httptest.NewRecorder() | |
182 ProcessShardHandler(ctx, rec, &http.Requ est{ | |
183 Header: http.Header{"X-AppEngine -QueueName": []string{cfg.Name}}, | |
184 }, httprouter.Params{ | |
185 {Key: "shard_id", Value: toks[4] }, | |
186 {Key: "timestamp", Value: toks[6 ]}, | |
187 }) | |
188 So(rec.Code, ShouldEqual, 200) | |
189 So(tq.Delete(tsk, cfg.Name), ShouldBeNil ) | |
190 ret++ | |
191 } | |
192 return ret | |
193 } | |
194 | |
195 cron := func() { | |
196 rec := httptest.NewRecorder() | |
197 FireAllTasksHandler(ctx, rec, &http.Request{ | |
198 Header: http.Header{"X-Appengine-Cron": []string{"true"}}, | |
199 }) | |
200 So(rec.Code, ShouldEqual, 200) | |
201 } | |
202 | |
203 charlie := &User{Name: "charlie"} | |
204 So(ds.Put(charlie), ShouldBeNil) | |
205 | |
206 Convey("can't send to someone who doesn't exist", func() { | |
207 outMsg, err := charlie.SendMessage(ctx, "Hey the re", "lennon") | |
208 So(err, ShouldBeNil) | |
209 | |
210 // need to advance clock and catch up indexes | |
211 So(iterate(), ShouldEqual, 0) | |
212 clk.Add(time.Second * 10) | |
213 | |
214 // need to catch up indexes | |
215 So(iterate(), ShouldEqual, 1) | |
216 | |
217 cron() | |
218 ds.Testable().CatchupIndexes() | |
219 clk.Add(time.Second * 10) | |
220 | |
221 So(iterate(), ShouldEqual, cfg.NumShards) | |
222 ds.Testable().CatchupIndexes() | |
223 clk.Add(time.Second * 10) | |
224 | |
225 So(iterate(), ShouldEqual, 1) | |
226 | |
227 So(ds.Get(outMsg), ShouldBeNil) | |
228 So(outMsg.Failure.All(true), ShouldBeTrue) | |
229 }) | |
230 | |
231 Convey("sending to yourself could be done in one iterati on if you're lucky", func() { | |
232 ds.Testable().Consistent(true) | |
233 | |
234 outMsg, err := charlie.SendMessage(ctx, "Hey the re", "charlie") | |
235 So(err, ShouldBeNil) | |
236 | |
237 clk.Add(time.Second * 10) | |
238 | |
239 So(iterate(), ShouldEqual, 1) | |
240 | |
241 So(ds.Get(outMsg), ShouldBeNil) | |
242 So(outMsg.Success.All(true), ShouldBeTrue) | |
243 }) | |
244 | |
245 Convey("sending to 200 people is no big deal", func() { | |
246 users := make([]User, 200) | |
247 recipients := make([]string, 200) | |
248 for i := range recipients { | |
249 name := base64.StdEncoding.EncodeToStrin g([]byte{byte(i)}) | |
250 recipients[i] = name | |
251 users[i].Name = name | |
252 } | |
253 So(ds.PutMulti(users), ShouldBeNil) | |
254 | |
255 outMsg, err := charlie.SendMessage(ctx, "Hey the re", recipients...) | |
256 So(err, ShouldBeNil) | |
257 | |
258 // do all the SendMessages | |
259 ds.Testable().CatchupIndexes() | |
260 clk.Add(time.Second * 10) | |
261 So(iterate(), ShouldEqual, cfg.NumShards) | |
262 | |
263 // do all the WriteReceipts | |
264 l.Reset() | |
265 ds.Testable().CatchupIndexes() | |
266 clk.Add(time.Second * 10) | |
267 So(iterate(), ShouldEqual, 1) | |
268 | |
269 // hacky proof that all 200 incoming message rec iepts were buffered | |
270 // appropriately. | |
271 toFind := stringset.NewFromSlice( | |
272 "successfully processed 128 mutations, a dding 0 more", | |
273 "successfully processed 72 mutations, ad ding 0 more") | |
iannucci
2015/10/10 17:51:53
I bumped this over the batch size limit to show th
| |
274 for _, msg := range l.Messages() { | |
275 if msg.Level == logging.Info && toFind.H as(msg.Msg) { | |
276 toFind.Del(msg.Msg) | |
277 } | |
278 } | |
279 So(toFind.Len(), ShouldEqual, 0) | |
280 | |
281 So(ds.Get(outMsg), ShouldBeNil) | |
282 So(outMsg.Success.All(true), ShouldBeTrue) | |
283 So(outMsg.Success.Size(), ShouldEqual, 200) | |
284 | |
285 }) | |
286 | |
287 }) | |
288 | |
289 }) | |
290 } | |
OLD | NEW |