| Index: appengine/tumble/process.go |
| diff --git a/appengine/tumble/process.go b/appengine/tumble/process.go |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..a9b4f48671b95da51437d86180ef770a013c575f |
| --- /dev/null |
| +++ b/appengine/tumble/process.go |
| @@ -0,0 +1,338 @@ |
| +// Copyright 2015 The Chromium Authors. All rights reserved. |
| +// Use of this source code is governed by a BSD-style license that can be |
| +// found in the LICENSE file. |
| + |
| +package tumble |
| + |
| +import ( |
| + "bytes" |
| + "fmt" |
| + "math" |
| + "net/http" |
| + "strconv" |
| + "sync/atomic" |
| + "time" |
| + |
| + "github.com/julienschmidt/httprouter" |
| + "github.com/luci/gae/filter/txnBuf" |
| + "github.com/luci/gae/service/datastore" |
| + "github.com/luci/gae/service/datastore/serialize" |
| + "github.com/luci/gae/service/memcache" |
| + "github.com/luci/luci-go/appengine/memlock" |
| + "github.com/luci/luci-go/common/clock" |
| + "github.com/luci/luci-go/common/errors" |
| + "github.com/luci/luci-go/common/logging" |
| + "github.com/luci/luci-go/common/parallel" |
| + "github.com/luci/luci-go/common/stringset" |
| + "golang.org/x/net/context" |
| +) |
| + |
| +// expandedShardBounds returns the boundary of the expandedShard query that |
| +// currently corresponds to this shard number. If shard >= NumShards (the |
| +// currently configured number of shards), this returns low > high. |
| +// Otherwise it returns low < high. |
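| +// |
| +// For example, with cfg.NumShards == 4, shard 0 starts at math.MinInt64 and |
| +// spans roughly a quarter of the int64 range, while shard 3 (the last shard) |
| +// has its high bound pinned to math.MaxInt64. |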
| +func expandedShardBounds(c context.Context, shard uint64) (low, high int64) { |
| + cfg := GetConfig(c) |
| + |
| +	if shard >= cfg.NumShards { |
| + logging.Warningf(c, "Invalid shard: %d", shard) |
| + // return inverted bounds |
| + return 0, -1 |
| + } |
| + |
| + expandedShardsPerShard := int64(math.MaxUint64 / cfg.NumShards) |
| + low = math.MinInt64 + (int64(shard) * expandedShardsPerShard) |
| + if uint64(shard) == cfg.NumShards-1 { |
| + high = math.MaxInt64 |
| + } else { |
| + high = low + expandedShardsPerShard |
| + } |
| + return |
| +} |
| + |
| +var dustSettleTimeout = 2 * time.Second |
| + |
| +// ProcessShardHandler is an http handler suitable for installation into |
| +// a httprouter. It expects the `logging` and `luci/gae` services to be |
| +// installed into the context. |
| +// |
| +// ProcessShardHandler verifies that it's being run as a taskqueue task and |
| +// that the following parameters exist and are well-formed: |
| +// * timestamp: decimal-encoded UNIX/UTC timestamp in seconds. |
| +// * shard_id: decimal-encoded shard identifier. |
| +// |
| +// ProcessShardHandler then invokes ProcessShard with the parsed parameters. |
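| +// |
| +// The exact route is up to the caller; it only needs to expose `:shard_id` and |
| +// `:timestamp` httprouter parameters, with some middleware supplying the |
| +// context. For example (the path and the `withContext` wrapper are |
| +// illustrative only): |
| +// |
| +//   router.POST("/internal/tumble/process_shard/:shard_id/at/:timestamp", |
| +//     withContext(ProcessShardHandler)) |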
| +func ProcessShardHandler(c context.Context, rw http.ResponseWriter, r *http.Request, p httprouter.Params) { |
| +	if r.Header.Get("X-AppEngine-QueueName") == "" { |
|
Vadim Sh. (2015/10/12 20:05:14): ah.. you do it here... still, consider a middlewar
|
| + rw.WriteHeader(http.StatusUnauthorized) |
| + fmt.Fprintf(rw, "process_task must be called from taskqueue") |
| + return |
| + } |
| + |
| + tstampStr := p.ByName("timestamp") |
| + sidStr := p.ByName("shard_id") |
| + |
| + tstamp, err := strconv.ParseInt(tstampStr, 10, 64) |
| + if err != nil { |
| + logging.Errorf(c, "bad timestamp %q", tstampStr) |
| + rw.WriteHeader(http.StatusNotFound) |
| + fmt.Fprintf(rw, "bad timestamp") |
| + return |
| + } |
| + |
| + sid, err := strconv.ParseUint(sidStr, 10, 64) |
| + if err != nil { |
| +	logging.Errorf(c, "bad shardID %q", sidStr) |
| + rw.WriteHeader(http.StatusNotFound) |
| + fmt.Fprintf(rw, "bad shardID") |
| + return |
| + } |
| + |
| + err = ProcessShard(c, time.Unix(tstamp, 0).UTC(), sid) |
| + if err != nil { |
|
Vadim Sh. (2015/10/12 20:05:14): in luci-cron I use errors.IsTransient (and errors.
iannucci (2015/10/13 02:39:46): The error here is purely advisory; cron and/or tas
|
| + rw.WriteHeader(http.StatusInternalServerError) |
| + fmt.Fprintf(rw, "error: %s", err) |
| + } else { |
| + rw.Write([]byte("ok")) |
| + } |
| +} |
| + |
| +// ProcessShard is the tumble backend endpoint. It accepts a shard number |
| +// that is expected to be < GlobalConfig.NumShards. |
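| +// |
| +// ProcessShard takes a memlock-based lock for the shard, then repeatedly |
| +// queries for distinct mutation roots in the shard's ExpandedShard range and |
| +// processes them in parallel until no further mutations are found. |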
| +func ProcessShard(c context.Context, timestamp time.Time, shard uint64) error { |
| + low, high := expandedShardBounds(c, shard) |
| + if low > high { |
| + return nil |
| + } |
| + |
| + l := logging.Get(logging.SetField(c, "shard", shard)) |
| + |
| + cfg := GetConfig(c) |
| + |
| + lockKey := fmt.Sprintf("%s.%d.lock", cfg.Name, shard) |
| + clientID := fmt.Sprintf("%d_%d", timestamp.Unix(), shard) |
| + |
| +	// this last key allows buffered tasks to exit early if some other shard |
| +	// processor has already processed past this task's target timestamp. |
| + lastKey := fmt.Sprintf("%s.%d.last", cfg.Name, shard) |
| + mc := memcache.Get(c) |
| + lastItm, err := mc.Get(lastKey) |
| + if err != nil { |
| + if err != memcache.ErrCacheMiss { |
| + l.Warningf("couldn't obtain last timestamp: %s", err) |
| + } |
| + } else { |
| + val := lastItm.Value() |
| + last, err := serialize.ReadTime(bytes.NewBuffer(val)) |
| + if err != nil { |
| + l.Warningf("could not decode timestamp %v: %s", val, err) |
| + } else { |
| + last = last.Add(cfg.TemporalRoundFactor) |
| + if timestamp.After(last) { |
| + l.Infof("early exit, %s > %s", timestamp, last) |
| + return nil |
| + } |
| + } |
| + } |
| + err = nil |
| + |
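| +	// Find up to ProcessMaxBatchSize distinct roots which still have pending |
| +	// mutations in this shard's ExpandedShard range. |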
| + q := datastore.NewQuery("tumble.Mutation"). |
| + Gte("ExpandedShard", low).Lte("ExpandedShard", high). |
| + Project("TargetRoot").Distinct(true). |
| + Limit(cfg.ProcessMaxBatchSize) |
| + |
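| +	// banSets tracks, per encoded root key, the mutation keys which have |
| +	// already been processed by this invocation, so they are not re-fetched. |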
| + banSets := map[string]stringset.Set{} |
| + |
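| +	// limitSemaphore caps the number of processRoot goroutines running at |
| +	// once to cfg.NumGoroutines. |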
| + limitSemaphore := make(chan struct{}, cfg.NumGoroutines) |
| + |
| + for try := 0; try < 2; try++ { |
| + err = memlock.TryWithLock(c, lockKey, clientID, func(c context.Context) error { |
| + l.Infof("Got lock (try %d)", try) |
| + |
| + for { |
| + processCounters := []*int64{} |
| + err := parallel.FanOutIn(func(ch chan<- func() error) { |
| + err := datastore.Get(c).Run(q, func(pm datastore.PropertyMap, _ datastore.CursorCB) bool { |
| + root := pm["TargetRoot"][0].Value().(*datastore.Key) |
| + encRoot := root.Encode() |
| + |
| + // TODO(riannucci): make banSets remove keys from the banSet which |
| + // weren't hit. Once they stop showing up, they'll never show up |
| + // again. |
| + |
| + bs := banSets[encRoot] |
| + if bs == nil { |
| + bs = stringset.New(0) |
| + banSets[encRoot] = bs |
| + } |
| + counter := new(int64) |
| + processCounters = append(processCounters, counter) |
| + |
| + ch <- func() error { |
| + limitSemaphore <- struct{}{} |
| + defer func() { |
| + <-limitSemaphore |
| + }() |
| + return processRoot(c, root, bs, counter) |
| + } |
| + |
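| +					// Stop iterating query results if the memlock context was |
| +					// canceled (i.e. the shard lock was lost). |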
| + select { |
| + case <-c.Done(): |
| + l.Warningf("Lost lock!") |
| + return false |
| + default: |
| + return true |
| + } |
| + }) |
| + if err != nil { |
| + l.Errorf("Failure to query: %s", err) |
| + ch <- func() error { |
| + return err |
| + } |
| + } |
| + }) |
| + if err != nil { |
| + return err |
| + } |
| + numProcessed := int64(0) |
| + for _, n := range processCounters { |
| + numProcessed += *n |
| + } |
| + l.Infof("cumulatively processed %d items", numProcessed) |
| + if numProcessed == 0 { |
| + break |
| + } |
| + |
| + err = mc.Set(mc.NewItem(lastKey).SetValue(serialize.ToBytes(clock.Now(c)))) |
| + if err != nil { |
| + l.Warningf("could not update last process memcache key %s: %s", lastKey, err) |
| + } |
| + |
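| +			// Pause between passes (dustSettleTimeout) before querying again, |
| +			// giving the query index time to reflect the deletions above. |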
| + clock.Sleep(c, dustSettleTimeout) |
| + } |
| + return nil |
| + }) |
| + if err != memlock.ErrFailedToLock { |
| + break |
| + } |
| + l.Infof("Couldn't obtain lock (try %d) (sleeping 2s)", try+1) |
| + clock.Sleep(c, time.Second*2) |
| + } |
| + if err == memlock.ErrFailedToLock { |
| + l.Infof("Couldn't obtain lock (giving up): %s", err) |
| + err = nil |
| + } |
| + return err |
| +} |
| + |
| +func getBatchByRoot(c context.Context, root *datastore.Key, banSet stringset.Set) ([]*realMutation, error) { |
| + cfg := GetConfig(c) |
| + ds := datastore.Get(c) |
| + q := datastore.NewQuery("tumble.Mutation").Eq("TargetRoot", root) |
| + toFetch := make([]*realMutation, 0, cfg.ProcessMaxBatchSize) |
| + err := ds.Run(q, func(k *datastore.Key, _ datastore.CursorCB) bool { |
| + if !banSet.Has(k.Encode()) { |
| + toFetch = append(toFetch, &realMutation{ |
| + ID: k.StringID(), |
| + Parent: k.Parent(), |
| + }) |
| + } |
| + return len(toFetch) < cap(toFetch) |
| + }) |
| + return toFetch, err |
| +} |
| + |
| +func loadFilteredMutations(c context.Context, rms []*realMutation) ([]*datastore.Key, []Mutation, error) { |
| + ds := datastore.Get(c) |
| + |
| + mutKeys := make([]*datastore.Key, 0, len(rms)) |
| + muts := make([]Mutation, 0, len(rms)) |
| + err := ds.GetMulti(rms) |
| + me, ok := err.(errors.MultiError) |
| + if !ok && err != nil { |
| + return nil, nil, err |
| + } |
| + |
| + for i, rm := range rms { |
| + err = nil |
| + if me != nil { |
| + err = me[i] |
| + } |
| + if err == nil { |
| + m, err := rm.GetMutation() |
| + if err != nil { |
| + logging.Errorf(c, "couldn't load mutation: %s", err) |
| + continue |
| + } |
| + muts = append(muts, m) |
| + mutKeys = append(mutKeys, ds.KeyForObj(rm)) |
| + } else if err != datastore.ErrNoSuchEntity { |
| + return nil, nil, me |
| + } |
| + } |
| + |
| + return mutKeys, muts, nil |
| +} |
| + |
| +func processRoot(c context.Context, root *datastore.Key, banSet stringset.Set, counter *int64) error { |
| + l := logging.Get(c) |
| + |
| + toFetch, err := getBatchByRoot(c, root, banSet) |
| + if err != nil || len(toFetch) == 0 { |
| + return err |
| + } |
| + |
| + mutKeys, muts, err := loadFilteredMutations(c, toFetch) |
| + if err != nil { |
| + return err |
| + } |
| + |
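| +	// Losing the shard lock is only logged here; processing of this root |
| +	// still proceeds below. |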
| + select { |
| + case <-c.Done(): |
| + l.Warningf("Lost lock during processRoot") |
| + default: |
| + } |
| + |
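| +	// allShards collects the shard numbers that receive new mutations from |
| +	// RollForward, so that tasks can be fired for them after the transaction. |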
| + allShards := map[uint64]struct{}{} |
| + |
| + toDel := make([]*datastore.Key, 0, len(muts)) |
| + numMuts := uint64(0) |
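| +	// Apply all of this root's mutations inside a single buffered transaction |
| +	// (the txnBuf filter is presumably what allows enterTransactionInternal's |
| +	// own transactional work to nest here). |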
| + err = datastore.Get(txnBuf.FilterRDS(c)).RunInTransaction(func(c context.Context) error { |
| + toDel = toDel[:0] |
| + numMuts = 0 |
| + |
| + for i, m := range muts { |
| + shards, numNewMuts, err := enterTransactionInternal(c, root, m.RollForward) |
| + if err != nil { |
| + l.Errorf("Executing decoded gob(%T) failed: %q: %+v", m, err, m) |
| + continue |
| + } |
| + toDel = append(toDel, mutKeys[i]) |
| + numMuts += uint64(numNewMuts) |
| + for shard := range shards { |
| + allShards[shard] = struct{}{} |
| + } |
| + } |
| + |
| + return nil |
| + }, nil) |
| + if err != nil { |
| + l.Errorf("failed running transaction: %s", err) |
| + return err |
| + } |
| + fireTasks(c, allShards) |
| + l.Infof("successfully processed %d mutations, adding %d more", len(toDel), numMuts) |
| + |
| + if len(toDel) > 0 { |
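| +		// Report the number of completed mutations back to the caller; |
| +		// ProcessShard sums these counters to decide whether another pass is needed. |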
| + atomic.StoreInt64(counter, int64(len(toDel))) |
| + |
| + for _, k := range toDel { |
| + banSet.Add(k.Encode()) |
| + } |
| + if err := datastore.Get(c).DeleteMulti(toDel); err != nil { |
| + l.Warningf("error deleting finished mutations: %s", err) |
| + } |
| + } |
| + |
| + return nil |
| +} |