Chromium Code Reviews| OLD | NEW |
|---|---|
| (Empty) | |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | |
| 2 // Use of this source code is governed by a BSD-style license that can be | |
| 3 // found in the LICENSE file. | |
| 4 | |
| 5 package tumble | |
| 6 | |
| 7 import ( | |
| 8 "encoding/base64" | |
| 9 "fmt" | |
| 10 "net/http" | |
| 11 "net/http/httptest" | |
| 12 "sort" | |
| 13 "strings" | |
| 14 "testing" | |
| 15 "time" | |
| 16 | |
| 17 "github.com/julienschmidt/httprouter" | |
| 18 "github.com/luci/gae/impl/memory" | |
| 19 "github.com/luci/gae/service/datastore" | |
| 20 "github.com/luci/gae/service/taskqueue" | |
| 21 "github.com/luci/luci-go/common/bit_field" | |
| 22 "github.com/luci/luci-go/common/clock/testclock" | |
| 23 "github.com/luci/luci-go/common/logging" | |
| 24 "github.com/luci/luci-go/common/logging/memlogger" | |
| 25 "github.com/luci/luci-go/common/stringset" | |
| 26 . "github.com/smartystreets/goconvey/convey" | |
| 27 "golang.org/x/net/context" | |
| 28 ) | |
| 29 | |
| 30 type User struct { | |
| 31 Name string `gae:"$id"` | |
| 32 } | |
| 33 | |
| 34 func (u *User) SendMessage(c context.Context, msg string, toUsers ...string) (*O utgoingMessage, error) { | |
| 35 sort.Strings(toUsers) | |
| 36 ds := datastore.Get(c) | |
| 37 k := ds.KeyForObj(u) | |
| 38 outMsg := &OutgoingMessage{ | |
| 39 FromUser: k, | |
| 40 Message: msg, | |
| 41 Recipients: toUsers, | |
| 42 Success: bf.Make(uint64(len(toUsers))), | |
| 43 Failure: bf.Make(uint64(len(toUsers))), | |
| 44 } | |
| 45 err := EnterTransaction(c, k, func(c context.Context) ([]Mutation, error ) { | |
| 46 ds := datastore.Get(c) | |
| 47 if err := ds.Put(outMsg); err != nil { | |
| 48 return nil, err | |
| 49 } | |
| 50 outKey := ds.KeyForObj(outMsg) | |
| 51 muts := make([]Mutation, len(toUsers)) | |
| 52 for i := range muts { | |
| 53 muts[i] = &SendMessage{outKey, toUsers[i]} | |
| 54 } | |
| 55 return muts, nil | |
| 56 }) | |
| 57 if err != nil { | |
| 58 outMsg = nil | |
| 59 } | |
| 60 return outMsg, err | |
| 61 } | |
| 62 | |
| 63 type OutgoingMessage struct { | |
| 64 // datastore-assigned | |
| 65 ID int64 `gae:"$id"` | |
| 66 FromUser *datastore.Key `gae:"$parent"` | |
| 67 | |
| 68 Message string `gae:",noindex"` | |
| 69 Recipients []string `gae:",noindex"` | |
| 70 | |
| 71 Success bf.BitField | |
| 72 Failure bf.BitField | |
| 73 } | |
| 74 | |
| 75 type IncomingMessage struct { | |
| 76 // OtherUser|OutgoingMessageID | |
| 77 ID string `gae:"$id"` | |
| 78 ForUser *datastore.Key `gae:"$parent"` | |
| 79 } | |
| 80 | |
| 81 type SendMessage struct { | |
| 82 Message *datastore.Key | |
| 83 ToUser string | |
| 84 } | |
| 85 | |
| 86 func (m *SendMessage) Root(ctx context.Context) *datastore.Key { | |
| 87 return datastore.Get(ctx).KeyForObj(&User{Name: m.ToUser}) | |
| 88 } | |
| 89 | |
| 90 func (m *SendMessage) RollForward(c context.Context) ([]Mutation, error) { | |
| 91 ds := datastore.Get(c) | |
| 92 u := &User{Name: m.ToUser} | |
| 93 if err := ds.Get(u); err != nil { | |
| 94 if err == datastore.ErrNoSuchEntity { | |
| 95 return []Mutation{&WriteReceipt{m.Message, m.ToUser, fal se}}, nil | |
| 96 } | |
| 97 return nil, err | |
| 98 } | |
| 99 im := &IncomingMessage{ | |
| 100 ID: fmt.Sprintf("%s|%d", m.Message.Parent().StringID(), m.M essage.IntID()), | |
| 101 ForUser: ds.KeyForObj(&User{Name: m.ToUser}), | |
| 102 } | |
| 103 err := ds.Get(im) | |
| 104 if err == datastore.ErrNoSuchEntity { | |
| 105 err = ds.Put(im) | |
| 106 return []Mutation{&WriteReceipt{m.Message, m.ToUser, true}}, err | |
| 107 } | |
| 108 return nil, err | |
| 109 } | |
| 110 | |
| 111 type WriteReceipt struct { | |
| 112 Message *datastore.Key | |
| 113 Recipient string | |
| 114 Success bool | |
| 115 } | |
| 116 | |
| 117 func (w *WriteReceipt) Root(ctx context.Context) *datastore.Key { | |
| 118 return w.Message.Root() | |
| 119 } | |
| 120 | |
| 121 func (w *WriteReceipt) RollForward(c context.Context) ([]Mutation, error) { | |
| 122 ds := datastore.Get(c) | |
| 123 m := &OutgoingMessage{ID: w.Message.IntID(), FromUser: w.Message.Parent( )} | |
| 124 if err := ds.Get(m); err != nil { | |
| 125 return nil, err | |
| 126 } | |
| 127 | |
| 128 idx := uint64(sort.SearchStrings(m.Recipients, w.Recipient)) | |
| 129 if w.Success { | |
| 130 m.Success.Set(idx) | |
| 131 } else { | |
| 132 m.Failure.Set(idx) | |
| 133 } | |
| 134 | |
| 135 return nil, ds.Put(m) | |
| 136 } | |
| 137 | |
| 138 func init() { | |
| 139 Register((*SendMessage)(nil)) | |
| 140 Register((*WriteReceipt)(nil)) | |
| 141 | |
| 142 dustSettleTimeout = 0 | |
| 143 } | |
| 144 | |
| 145 func TestHighLevel(t *testing.T) { | |
| 146 t.Parallel() | |
| 147 | |
| 148 Convey("Tumble", t, func() { | |
| 149 Convey("Check registration", func() { | |
| 150 So(registry, ShouldContainKey, "*tumble.SendMessage") | |
| 151 }) | |
| 152 | |
| 153 Convey("Good", func() { | |
| 154 ctx := memory.Use(memlogger.Use(context.Background())) | |
| 155 ctx, clk := testclock.UseTime(ctx, testclock.TestTimeUTC ) | |
| 156 cfg := GetConfig(ctx) | |
| 157 ds := datastore.Get(ctx) | |
| 158 tq := taskqueue.Get(ctx) | |
| 159 l := logging.Get(ctx).(*memlogger.MemLogger) | |
| 160 _ = l | |
| 161 | |
| 162 tq.Testable().CreateQueue(cfg.Name) | |
| 163 | |
| 164 ds.Testable().AddIndexes(&datastore.IndexDefinition{ | |
| 165 Kind: "tumble.Mutation", | |
| 166 SortBy: []datastore.IndexColumn{ | |
| 167 {Property: "ExpandedShard"}, | |
| 168 {Property: "TargetRoot"}, | |
| 169 }, | |
| 170 }) | |
| 171 ds.Testable().CatchupIndexes() | |
| 172 | |
| 173 iterate := func() int { | |
| 174 ret := 0 | |
| 175 tsks := tq.Testable().GetScheduledTasks()[cfg.Na me] | |
| 176 for _, tsk := range tsks { | |
| 177 if tsk.ETA.After(clk.Now()) { | |
| 178 continue | |
| 179 } | |
| 180 toks := strings.Split(tsk.Path, "/") | |
| 181 rec := httptest.NewRecorder() | |
| 182 ProcessShardHandler(ctx, rec, &http.Requ est{ | |
| 183 Header: http.Header{"X-AppEngine -QueueName": []string{cfg.Name}}, | |
| 184 }, httprouter.Params{ | |
| 185 {Key: "shard_id", Value: toks[4] }, | |
| 186 {Key: "timestamp", Value: toks[6 ]}, | |
| 187 }) | |
| 188 So(rec.Code, ShouldEqual, 200) | |
| 189 So(tq.Delete(tsk, cfg.Name), ShouldBeNil ) | |
| 190 ret++ | |
| 191 } | |
| 192 return ret | |
| 193 } | |
| 194 | |
| 195 cron := func() { | |
| 196 rec := httptest.NewRecorder() | |
| 197 FireAllTasksHandler(ctx, rec, &http.Request{ | |
| 198 Header: http.Header{"X-Appengine-Cron": []string{"true"}}, | |
| 199 }) | |
| 200 So(rec.Code, ShouldEqual, 200) | |
| 201 } | |
| 202 | |
| 203 charlie := &User{Name: "charlie"} | |
| 204 So(ds.Put(charlie), ShouldBeNil) | |
| 205 | |
| 206 Convey("can't send to someone who doesn't exist", func() { | |
| 207 outMsg, err := charlie.SendMessage(ctx, "Hey the re", "lennon") | |
| 208 So(err, ShouldBeNil) | |
| 209 | |
| 210 // need to advance clock and catch up indexes | |
| 211 So(iterate(), ShouldEqual, 0) | |
| 212 clk.Add(time.Second * 10) | |
| 213 | |
| 214 // need to catch up indexes | |
| 215 So(iterate(), ShouldEqual, 1) | |
| 216 | |
| 217 cron() | |
| 218 ds.Testable().CatchupIndexes() | |
| 219 clk.Add(time.Second * 10) | |
| 220 | |
| 221 So(iterate(), ShouldEqual, cfg.NumShards) | |
| 222 ds.Testable().CatchupIndexes() | |
| 223 clk.Add(time.Second * 10) | |
| 224 | |
| 225 So(iterate(), ShouldEqual, 1) | |
| 226 | |
| 227 So(ds.Get(outMsg), ShouldBeNil) | |
| 228 So(outMsg.Failure.All(true), ShouldBeTrue) | |
| 229 }) | |
| 230 | |
| 231 Convey("sending to yourself could be done in one iterati on if you're lucky", func() { | |
| 232 ds.Testable().Consistent(true) | |
| 233 | |
| 234 outMsg, err := charlie.SendMessage(ctx, "Hey the re", "charlie") | |
| 235 So(err, ShouldBeNil) | |
| 236 | |
| 237 clk.Add(time.Second * 10) | |
| 238 | |
| 239 So(iterate(), ShouldEqual, 1) | |
| 240 | |
| 241 So(ds.Get(outMsg), ShouldBeNil) | |
| 242 So(outMsg.Success.All(true), ShouldBeTrue) | |
| 243 }) | |
| 244 | |
| 245 Convey("sending to 200 people is no big deal", func() { | |
| 246 users := make([]User, 200) | |
| 247 recipients := make([]string, 200) | |
| 248 for i := range recipients { | |
| 249 name := base64.StdEncoding.EncodeToStrin g([]byte{byte(i)}) | |
| 250 recipients[i] = name | |
| 251 users[i].Name = name | |
| 252 } | |
| 253 So(ds.PutMulti(users), ShouldBeNil) | |
| 254 | |
| 255 outMsg, err := charlie.SendMessage(ctx, "Hey the re", recipients...) | |
| 256 So(err, ShouldBeNil) | |
| 257 | |
| 258 // do all the SendMessages | |
| 259 ds.Testable().CatchupIndexes() | |
| 260 clk.Add(time.Second * 10) | |
| 261 So(iterate(), ShouldEqual, cfg.NumShards) | |
| 262 | |
| 263 // do all the WriteReceipts | |
| 264 l.Reset() | |
| 265 ds.Testable().CatchupIndexes() | |
| 266 clk.Add(time.Second * 10) | |
| 267 So(iterate(), ShouldEqual, 1) | |
| 268 | |
| 269 // hacky proof that all 200 incoming message rec iepts were buffered | |
| 270 // appropriately. | |
| 271 toFind := stringset.NewFromSlice( | |
| 272 "successfully processed 128 mutations, a dding 0 more", | |
| 273 "successfully processed 72 mutations, ad ding 0 more") | |
|
iannucci
2015/10/10 17:51:53
I bumped this over the batch size limit to show th
| |
| 274 for _, msg := range l.Messages() { | |
| 275 if msg.Level == logging.Info && toFind.H as(msg.Msg) { | |
| 276 toFind.Del(msg.Msg) | |
| 277 } | |
| 278 } | |
| 279 So(toFind.Len(), ShouldEqual, 0) | |
| 280 | |
| 281 So(ds.Get(outMsg), ShouldBeNil) | |
| 282 So(outMsg.Success.All(true), ShouldBeTrue) | |
| 283 So(outMsg.Success.Size(), ShouldEqual, 200) | |
| 284 | |
| 285 }) | |
| 286 | |
| 287 }) | |
| 288 | |
| 289 }) | |
| 290 } | |
| OLD | NEW |