Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(147)

Side by Side Diff: appengine/tumble/process.go

Issue 1395293002: Add "tumble" distributed transaction processing service for appengine. (Closed) Base URL: https://chromium.googlesource.com/external/github.com/luci/luci-go@master
Patch Set: Created 5 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package tumble
6
7 import (
8 "bytes"
9 "fmt"
10 "math"
11 "net/http"
12 "strconv"
13 "sync/atomic"
14 "time"
15
16 "github.com/julienschmidt/httprouter"
17 "github.com/luci/gae/filter/txnBuf"
18 "github.com/luci/gae/service/datastore"
19 "github.com/luci/gae/service/datastore/serialize"
20 "github.com/luci/gae/service/memcache"
21 "github.com/luci/luci-go/appengine/memlock"
22 "github.com/luci/luci-go/common/clock"
23 "github.com/luci/luci-go/common/errors"
24 "github.com/luci/luci-go/common/logging"
25 "github.com/luci/luci-go/common/parallel"
26 "github.com/luci/luci-go/common/stringset"
27 "golang.org/x/net/context"
28 )
29
30 // expandedShardBounds returns the boundary of the expandedShard query that
31 // currently corresponds to this shard number. If Shard is < 0 or > NumShards
32 // (the currently configured number of shards), this will return a low > high.
33 // Otherwise low < high.
34 func expandedShardBounds(c context.Context, shard uint64) (low, high int64) {
35 cfg := GetConfig(c)
36
37 if shard < 0 || uint64(shard) >= cfg.NumShards {
38 logging.Warningf(c, "Invalid shard: %d", shard)
39 // return inverted bounds
40 return 0, -1
41 }
42
43 expandedShardsPerShard := int64(math.MaxUint64 / cfg.NumShards)
44 low = math.MinInt64 + (int64(shard) * expandedShardsPerShard)
45 if uint64(shard) == cfg.NumShards-1 {
46 high = math.MaxInt64
47 } else {
48 high = low + expandedShardsPerShard
49 }
50 return
51 }
52
53 var dustSettleTimeout = 2 * time.Second
54
55 // ProcessShardHandler is a http handler suitable for installation into
56 // a httprouter. It expects `logging` and `luci/gae` services to be installed
57 // into the context.
58 //
59 // ProcessShardHandler verifies that its being run as a taskqueue task and that
60 // the following parameters exist and are well-formed:
61 // * timestamp: decimal-encoded UNIX/UTC timestamp in seconds.
62 // * shard_id: decimal-encoded shard identifier.
63 //
64 // ProcessShardHandler then invokes ProcessShard with the parsed parameters.
65 func ProcessShardHandler(c context.Context, rw http.ResponseWriter, r *http.Requ est, p httprouter.Params) {
66 if _, ok := r.Header["X-AppEngine-QueueName"]; !ok {
Vadim Sh. 2015/10/12 20:05:14 ah.. you do it here... still, consider a middlewar
67 rw.WriteHeader(http.StatusUnauthorized)
68 fmt.Fprintf(rw, "process_task must be called from taskqueue")
69 return
70 }
71
72 tstampStr := p.ByName("timestamp")
73 sidStr := p.ByName("shard_id")
74
75 tstamp, err := strconv.ParseInt(tstampStr, 10, 64)
76 if err != nil {
77 logging.Errorf(c, "bad timestamp %q", tstampStr)
78 rw.WriteHeader(http.StatusNotFound)
79 fmt.Fprintf(rw, "bad timestamp")
80 return
81 }
82
83 sid, err := strconv.ParseUint(sidStr, 10, 64)
84 if err != nil {
85 logging.Errorf(c, "bad shardID %q", tstampStr)
86 rw.WriteHeader(http.StatusNotFound)
87 fmt.Fprintf(rw, "bad shardID")
88 return
89 }
90
91 err = ProcessShard(c, time.Unix(tstamp, 0).UTC(), sid)
92 if err != nil {
Vadim Sh. 2015/10/12 20:05:14 in luci-cron I use errors.IsTransient (and errors.
iannucci 2015/10/13 02:39:46 The error here is purely advisory; cron and/or tas
93 rw.WriteHeader(http.StatusInternalServerError)
94 fmt.Fprintf(rw, "error: %s", err)
95 } else {
96 rw.Write([]byte("ok"))
97 }
98 }
99
// ProcessShard is the tumble backend endpoint. This accepts a shard number which
// is expected to be < GlobalConfig.NumShards.
//
// It acquires a per-shard memlock, then repeatedly queries for distinct
// mutation roots in this shard's ExpandedShard range and processes each root
// concurrently (bounded by cfg.NumGoroutines) until a round processes zero
// items. A memcache "last processed" timestamp lets queued-up tasks for older
// timestamps exit early.
func ProcessShard(c context.Context, timestamp time.Time, shard uint64) error {
	low, high := expandedShardBounds(c, shard)
	if low > high {
		// Inverted bounds mean an invalid/out-of-range shard; nothing to do.
		return nil
	}

	l := logging.Get(logging.SetField(c, "shard", shard))

	cfg := GetConfig(c)

	lockKey := fmt.Sprintf("%s.%d.lock", cfg.Name, shard)
	clientID := fmt.Sprintf("%d_%d", timestamp.Unix(), shard)

	// this last key allows buffered tasks to early exit if some other shard
	// processor has already processed past this task's target timestamp.
	lastKey := fmt.Sprintf("%s.%d.last", cfg.Name, shard)
	mc := memcache.Get(c)
	lastItm, err := mc.Get(lastKey)
	if err != nil {
		if err != memcache.ErrCacheMiss {
			l.Warningf("couldn't obtain last timestamp: %s", err)
		}
		// Cache miss is the expected cold-start case: fall through and process.
	} else {
		val := lastItm.Value()
		last, err := serialize.ReadTime(bytes.NewBuffer(val))
		if err != nil {
			l.Warningf("could not decode timestamp %v: %s", val, err)
		} else {
			// Round up by the temporal factor so tasks within the same
			// rounding window as the last pass are considered covered.
			last = last.Add(cfg.TemporalRoundFactor)
			if timestamp.After(last) {
				l.Infof("early exit, %s > %s", timestamp, last)
				return nil
			}
		}
	}
	// Clear any memcache error so it isn't mistaken for a lock error below.
	err = nil

	// Distinct projection on TargetRoot gives one row per entity group with
	// pending mutations in this shard's range.
	q := datastore.NewQuery("tumble.Mutation").
		Gte("ExpandedShard", low).Lte("ExpandedShard", high).
		Project("TargetRoot").Distinct(true).
		Limit(cfg.ProcessMaxBatchSize)

	// Per-root sets of already-processed mutation keys, carried across rounds
	// so eventually-consistent query results aren't processed twice.
	banSets := map[string]stringset.Set{}

	// Bounds the number of concurrently-processed roots.
	limitSemaphore := make(chan struct{}, cfg.NumGoroutines)

	for try := 0; try < 2; try++ {
		err = memlock.TryWithLock(c, lockKey, clientID, func(c context.Context) error {
			l.Infof("Got lock (try %d)", try)

			// Keep processing rounds until a round makes no progress.
			for {
				processCounters := []*int64{}
				err := parallel.FanOutIn(func(ch chan<- func() error) {
					err := datastore.Get(c).Run(q, func(pm datastore.PropertyMap, _ datastore.CursorCB) bool {
						root := pm["TargetRoot"][0].Value().(*datastore.Key)
						encRoot := root.Encode()

						// TODO(riannucci): make banSets remove keys from the banSet which
						// weren't hit. Once they stop showing up, they'll never show up
						// again.

						bs := banSets[encRoot]
						if bs == nil {
							bs = stringset.New(0)
							banSets[encRoot] = bs
						}
						// One counter per root; processRoot stores how many
						// mutations it completed.
						counter := new(int64)
						processCounters = append(processCounters, counter)

						ch <- func() error {
							limitSemaphore <- struct{}{}
							defer func() {
								<-limitSemaphore
							}()
							return processRoot(c, root, bs, counter)
						}

						// Stop iterating if the memlock context was cancelled
						// (i.e. we lost the lock).
						select {
						case <-c.Done():
							l.Warningf("Lost lock!")
							return false
						default:
							return true
						}
					})
					if err != nil {
						l.Errorf("Failure to query: %s", err)
						// Propagate the query error through FanOutIn so it
						// surfaces from the round.
						ch <- func() error {
							return err
						}
					}
				})
				if err != nil {
					return err
				}
				numProcessed := int64(0)
				for _, n := range processCounters {
					numProcessed += *n
				}
				l.Infof("cumulatively processed %d items", numProcessed)
				if numProcessed == 0 {
					break
				}

				// Best-effort update of the early-exit watermark; failures
				// only cost redundant work, not correctness.
				err = mc.Set(mc.NewItem(lastKey).SetValue(serialize.ToBytes(clock.Now(c))))
				if err != nil {
					l.Warningf("could not update last process memcache key %s: %s", lastKey, err)
				}

				clock.Sleep(c, dustSettleTimeout)
			}
			return nil
		})
		if err != memlock.ErrFailedToLock {
			break
		}
		l.Infof("Couldn't obtain lock (try %d) (sleeping 2s)", try+1)
		clock.Sleep(c, time.Second*2)
	}
	if err == memlock.ErrFailedToLock {
		// Another processor holds the shard; that's fine — swallow the error
		// so taskqueue doesn't retry.
		l.Infof("Couldn't obtain lock (giving up): %s", err)
		err = nil
	}
	return err
}
227
228 func getBatchByRoot(c context.Context, root *datastore.Key, banSet stringset.Set ) ([]*realMutation, error) {
229 cfg := GetConfig(c)
230 ds := datastore.Get(c)
231 q := datastore.NewQuery("tumble.Mutation").Eq("TargetRoot", root)
232 toFetch := make([]*realMutation, 0, cfg.ProcessMaxBatchSize)
233 err := ds.Run(q, func(k *datastore.Key, _ datastore.CursorCB) bool {
234 if !banSet.Has(k.Encode()) {
235 toFetch = append(toFetch, &realMutation{
236 ID: k.StringID(),
237 Parent: k.Parent(),
238 })
239 }
240 return len(toFetch) < cap(toFetch)
241 })
242 return toFetch, err
243 }
244
245 func loadFilteredMutations(c context.Context, rms []*realMutation) ([]*datastore .Key, []Mutation, error) {
246 ds := datastore.Get(c)
247
248 mutKeys := make([]*datastore.Key, 0, len(rms))
249 muts := make([]Mutation, 0, len(rms))
250 err := ds.GetMulti(rms)
251 me, ok := err.(errors.MultiError)
252 if !ok && err != nil {
253 return nil, nil, err
254 }
255
256 for i, rm := range rms {
257 err = nil
258 if me != nil {
259 err = me[i]
260 }
261 if err == nil {
262 m, err := rm.GetMutation()
263 if err != nil {
264 logging.Errorf(c, "couldn't load mutation: %s", err)
265 continue
266 }
267 muts = append(muts, m)
268 mutKeys = append(mutKeys, ds.KeyForObj(rm))
269 } else if err != datastore.ErrNoSuchEntity {
270 return nil, nil, me
271 }
272 }
273
274 return mutKeys, muts, nil
275 }
276
// processRoot processes one batch of pending mutations for a single entity
// group rooted at `root`: it loads them, rolls each forward inside one
// buffered transaction, fires follow-up shard tasks, records how many it
// completed into *counter, and deletes the finished mutation records.
func processRoot(c context.Context, root *datastore.Key, banSet stringset.Set, counter *int64) error {
	l := logging.Get(c)

	toFetch, err := getBatchByRoot(c, root, banSet)
	if err != nil || len(toFetch) == 0 {
		// Nothing pending (or the query failed); err is nil in the empty case.
		return err
	}

	mutKeys, muts, err := loadFilteredMutations(c, toFetch)
	if err != nil {
		return err
	}

	// NOTE(review): this only logs if the memlock context was cancelled —
	// processing continues regardless. Confirm that proceeding without the
	// lock is intended here.
	select {
	case <-c.Done():
		l.Warningf("Lost lock during processRoot")
	default:
	}

	allShards := map[uint64]struct{}{}

	toDel := make([]*datastore.Key, 0, len(muts))
	numMuts := uint64(0)
	err = datastore.Get(txnBuf.FilterRDS(c)).RunInTransaction(func(c context.Context) error {
		// The transaction body may be retried; reset per-attempt accumulators
		// so a retry doesn't double-count.
		// NOTE(review): allShards is NOT reset here, so shards recorded by an
		// aborted attempt persist — harmless extra task fires at worst, but
		// confirm that's intended.
		toDel = toDel[:0]
		numMuts = 0

		for i, m := range muts {
			shards, numNewMuts, err := enterTransactionInternal(c, root, m.RollForward)
			if err != nil {
				// A single bad mutation is logged and skipped rather than
				// failing the whole batch; it stays undeleted for later.
				l.Errorf("Executing decoded gob(%T) failed: %q: %+v", m, err, m)
				continue
			}
			toDel = append(toDel, mutKeys[i])
			numMuts += uint64(numNewMuts)
			for shard := range shards {
				allShards[shard] = struct{}{}
			}
		}

		return nil
	}, nil)
	if err != nil {
		l.Errorf("failed running transaction: %s", err)
		return err
	}
	// Kick the shards that received new mutations from the roll-forwards.
	fireTasks(c, allShards)
	l.Infof("successfully processed %d mutations, adding %d more", len(toDel), numMuts)

	if len(toDel) > 0 {
		// Report progress to the caller's round counter (read concurrently).
		atomic.StoreInt64(counter, int64(len(toDel)))

		// Ban the finished keys so eventually-consistent queries in later
		// rounds don't reprocess them before the deletes propagate.
		for _, k := range toDel {
			banSet.Add(k.Encode())
		}
		if err := datastore.Get(c).DeleteMulti(toDel); err != nil {
			// Best-effort: the banSet shields us until the delete succeeds
			// on a future pass.
			l.Warningf("error deleting finished mutations: %s", err)
		}
	}

	return nil
}
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698