Chromium Code Reviews
// Copyright 2015 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

package tumble

import (
	"bytes"
	"fmt"
	"math"
	"net/http"
	"strconv"
	"sync/atomic"
	"time"

	"github.com/julienschmidt/httprouter"
	"github.com/luci/gae/filter/txnBuf"
	"github.com/luci/gae/service/datastore"
	"github.com/luci/gae/service/datastore/serialize"
	"github.com/luci/gae/service/memcache"
	"github.com/luci/luci-go/appengine/memlock"
	"github.com/luci/luci-go/common/clock"
	"github.com/luci/luci-go/common/errors"
	"github.com/luci/luci-go/common/logging"
	"github.com/luci/luci-go/common/parallel"
	"github.com/luci/luci-go/common/stringset"
	"golang.org/x/net/context"
)

// expandedShardBounds returns the boundary of the expandedShard query that
// currently corresponds to this shard number. If shard is < 0 or >= NumShards
// (the currently configured number of shards), this will return low > high.
// Otherwise low < high.
func expandedShardBounds(c context.Context, shard uint64) (low, high int64) {
	cfg := GetConfig(c)

	if shard < 0 || uint64(shard) >= cfg.NumShards {
		logging.Warningf(c, "Invalid shard: %d", shard)
		// return inverted bounds
		return 0, -1
	}

	expandedShardsPerShard := int64(math.MaxUint64 / cfg.NumShards)
	low = math.MinInt64 + (int64(shard) * expandedShardsPerShard)
	if uint64(shard) == cfg.NumShards-1 {
		high = math.MaxInt64
	} else {
		high = low + expandedShardsPerShard
	}
	return
}
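
For reference, a standalone sketch (not part of this CL) that mirrors the bounds arithmetic above for a made-up NumShards of 4 and prints the range each shard covers; only the formula itself is taken from the reviewed function, the rest is illustrative:

package main

import (
	"fmt"
	"math"
)

func main() {
	// Same arithmetic as expandedShardBounds, with NumShards hard-coded to 4.
	// For the highest shard the intermediate product overflows int64 and
	// relies on Go's wrap-around integer arithmetic, just like the reviewed
	// expression does; the resulting low/high values are still correct.
	const numShards = 4
	width := int64(math.MaxUint64 / numShards)
	for shard := uint64(0); shard < numShards; shard++ {
		low := math.MinInt64 + int64(shard)*width
		high := low + width
		if shard == numShards-1 {
			high = math.MaxInt64
		}
		fmt.Printf("shard %d: [%d, %d]\n", shard, low, high)
	}
}
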

var dustSettleTimeout = 2 * time.Second

// ProcessShardHandler is an http handler suitable for installation into
// an httprouter. It expects `logging` and `luci/gae` services to be installed
// into the context.
//
// ProcessShardHandler verifies that it's being run as a taskqueue task and
// that the following parameters exist and are well-formed:
//   * timestamp: decimal-encoded UNIX/UTC timestamp in seconds.
//   * shard_id: decimal-encoded shard identifier.
//
// ProcessShardHandler then invokes ProcessShard with the parsed parameters.
func ProcessShardHandler(c context.Context, rw http.ResponseWriter, r *http.Request, p httprouter.Params) {
	if _, ok := r.Header["X-AppEngine-QueueName"]; !ok {

Vadim Sh. (2015/10/12 20:05:14):
ah.. you do it here... still, consider a middleware…

		rw.WriteHeader(http.StatusUnauthorized)
		fmt.Fprintf(rw, "process_task must be called from taskqueue")
		return
	}

	tstampStr := p.ByName("timestamp")
	sidStr := p.ByName("shard_id")

	tstamp, err := strconv.ParseInt(tstampStr, 10, 64)
	if err != nil {
		logging.Errorf(c, "bad timestamp %q", tstampStr)
		rw.WriteHeader(http.StatusNotFound)
		fmt.Fprintf(rw, "bad timestamp")
		return
	}

	sid, err := strconv.ParseUint(sidStr, 10, 64)
	if err != nil {
		logging.Errorf(c, "bad shardID %q", sidStr)
		rw.WriteHeader(http.StatusNotFound)
		fmt.Fprintf(rw, "bad shardID")
		return
	}

	err = ProcessShard(c, time.Unix(tstamp, 0).UTC(), sid)
	if err != nil {

Vadim Sh. (2015/10/12 20:05:14):
in luci-cron I use errors.IsTransient (and errors.…

iannucci (2015/10/13 02:39:46):
The error here is purely advisory; cron and/or taskqueue…

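		// Illustration only, not part of this CL: the luci-cron pattern the
		// first comment refers to would branch on errors.IsTransient(err)
		// here, e.g. returning a 5xx for transient failures so taskqueue
		// retries the task and a 2xx otherwise. This handler keeps an
		// unconditional 500 because, per the reply, the status is advisory.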
		rw.WriteHeader(http.StatusInternalServerError)
		fmt.Fprintf(rw, "error: %s", err)
	} else {
		rw.Write([]byte("ok"))
	}
}
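
Picking up the middleware suggestion from the first review comment, here is one way the taskqueue check could be factored out and the handler installed on a router. Everything below is a sketch: requireTaskQueue, handlerFunc, installRoutes, and the route pattern are invented for illustration and are not part of this CL.

package tumble

import (
	"fmt"
	"net/http"

	"github.com/julienschmidt/httprouter"
	"golang.org/x/net/context"
)

// handlerFunc mirrors the shape of ProcessShardHandler.
type handlerFunc func(context.Context, http.ResponseWriter, *http.Request, httprouter.Params)

// requireTaskQueue is a hypothetical middleware that rejects requests which
// did not originate from taskqueue (App Engine strips X-AppEngine-* headers
// from external requests), so wrapped handlers can omit their own check.
func requireTaskQueue(h handlerFunc) handlerFunc {
	return func(c context.Context, rw http.ResponseWriter, r *http.Request, p httprouter.Params) {
		if r.Header.Get("X-AppEngine-QueueName") == "" {
			rw.WriteHeader(http.StatusUnauthorized)
			fmt.Fprintf(rw, "must be called from taskqueue")
			return
		}
		h(c, rw, r, p)
	}
}

// installRoutes shows how the wrapped handler could be mounted; the route
// pattern here is made up for the sketch.
func installRoutes(router *httprouter.Router, base context.Context) {
	handler := requireTaskQueue(ProcessShardHandler)
	router.POST("/internal/tumble/process/:shard_id/:timestamp",
		func(rw http.ResponseWriter, r *http.Request, p httprouter.Params) {
			handler(base, rw, r, p)
		})
}

The CL keeps the check inline in ProcessShardHandler; the sketch only illustrates the alternative the reviewer raises.
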

// ProcessShard is the tumble backend endpoint. This accepts a shard number
// which is expected to be < GlobalConfig.NumShards.
func ProcessShard(c context.Context, timestamp time.Time, shard uint64) error {
	low, high := expandedShardBounds(c, shard)
	if low > high {
		return nil
	}

	l := logging.Get(logging.SetField(c, "shard", shard))

	cfg := GetConfig(c)

	lockKey := fmt.Sprintf("%s.%d.lock", cfg.Name, shard)
	clientID := fmt.Sprintf("%d_%d", timestamp.Unix(), shard)

	// this last key allows buffered tasks to early exit if some other shard
	// processor has already processed past this task's target timestamp.
	lastKey := fmt.Sprintf("%s.%d.last", cfg.Name, shard)
	mc := memcache.Get(c)
	lastItm, err := mc.Get(lastKey)
	if err != nil {
		if err != memcache.ErrCacheMiss {
			l.Warningf("couldn't obtain last timestamp: %s", err)
		}
	} else {
		val := lastItm.Value()
		last, err := serialize.ReadTime(bytes.NewBuffer(val))
		if err != nil {
			l.Warningf("could not decode timestamp %v: %s", val, err)
		} else {
			last = last.Add(cfg.TemporalRoundFactor)
			if timestamp.After(last) {
				l.Infof("early exit, %s > %s", timestamp, last)
				return nil
			}
		}
	}
	err = nil

	q := datastore.NewQuery("tumble.Mutation").
		Gte("ExpandedShard", low).Lte("ExpandedShard", high).
		Project("TargetRoot").Distinct(true).
		Limit(cfg.ProcessMaxBatchSize)

	banSets := map[string]stringset.Set{}

	limitSemaphore := make(chan struct{}, cfg.NumGoroutines)

	for try := 0; try < 2; try++ {
		err = memlock.TryWithLock(c, lockKey, clientID, func(c context.Context) error {
			l.Infof("Got lock (try %d)", try)

			for {
				processCounters := []*int64{}
				err := parallel.FanOutIn(func(ch chan<- func() error) {
					err := datastore.Get(c).Run(q, func(pm datastore.PropertyMap, _ datastore.CursorCB) bool {
						root := pm["TargetRoot"][0].Value().(*datastore.Key)
						encRoot := root.Encode()

						// TODO(riannucci): make banSets remove keys from the banSet which
						// weren't hit. Once they stop showing up, they'll never show up
						// again.

						bs := banSets[encRoot]
						if bs == nil {
							bs = stringset.New(0)
							banSets[encRoot] = bs
						}
						counter := new(int64)
						processCounters = append(processCounters, counter)

						ch <- func() error {
							limitSemaphore <- struct{}{}
							defer func() {
								<-limitSemaphore
							}()
							return processRoot(c, root, bs, counter)
						}

						select {
						case <-c.Done():
							l.Warningf("Lost lock!")
							return false
						default:
							return true
						}
					})
					if err != nil {
						l.Errorf("Failure to query: %s", err)
						ch <- func() error {
							return err
						}
					}
				})
				if err != nil {
					return err
				}
				numProcessed := int64(0)
				for _, n := range processCounters {
					numProcessed += *n
				}
				l.Infof("cumulatively processed %d items", numProcessed)
				if numProcessed == 0 {
					break
				}

				err = mc.Set(mc.NewItem(lastKey).SetValue(serialize.ToBytes(clock.Now(c))))
				if err != nil {
					l.Warningf("could not update last process memcache key %s: %s", lastKey, err)
				}

				clock.Sleep(c, dustSettleTimeout)
			}
			return nil
		})
		if err != memlock.ErrFailedToLock {
			break
		}
		l.Infof("Couldn't obtain lock (try %d) (sleeping 2s)", try+1)
		clock.Sleep(c, time.Second*2)
	}
	if err == memlock.ErrFailedToLock {
		l.Infof("Couldn't obtain lock (giving up): %s", err)
		err = nil
	}
	return err
}

func getBatchByRoot(c context.Context, root *datastore.Key, banSet stringset.Set) ([]*realMutation, error) {
	cfg := GetConfig(c)
	ds := datastore.Get(c)
	q := datastore.NewQuery("tumble.Mutation").Eq("TargetRoot", root)
	toFetch := make([]*realMutation, 0, cfg.ProcessMaxBatchSize)
	err := ds.Run(q, func(k *datastore.Key, _ datastore.CursorCB) bool {
		if !banSet.Has(k.Encode()) {
			toFetch = append(toFetch, &realMutation{
				ID:     k.StringID(),
				Parent: k.Parent(),
			})
		}
		return len(toFetch) < cap(toFetch)
	})
	return toFetch, err
}

func loadFilteredMutations(c context.Context, rms []*realMutation) ([]*datastore.Key, []Mutation, error) {
	ds := datastore.Get(c)

	mutKeys := make([]*datastore.Key, 0, len(rms))
	muts := make([]Mutation, 0, len(rms))
	err := ds.GetMulti(rms)
	me, ok := err.(errors.MultiError)
	if !ok && err != nil {
		return nil, nil, err
	}

	for i, rm := range rms {
		err = nil
		if me != nil {
			err = me[i]
		}
		if err == nil {
			m, err := rm.GetMutation()
			if err != nil {
				logging.Errorf(c, "couldn't load mutation: %s", err)
				continue
			}
			muts = append(muts, m)
			mutKeys = append(mutKeys, ds.KeyForObj(rm))
		} else if err != datastore.ErrNoSuchEntity {
			return nil, nil, me
		}
	}

	return mutKeys, muts, nil
}

func processRoot(c context.Context, root *datastore.Key, banSet stringset.Set, counter *int64) error {
	l := logging.Get(c)

	toFetch, err := getBatchByRoot(c, root, banSet)
	if err != nil || len(toFetch) == 0 {
		return err
	}

	mutKeys, muts, err := loadFilteredMutations(c, toFetch)
	if err != nil {
		return err
	}

	select {
	case <-c.Done():
		l.Warningf("Lost lock during processRoot")
	default:
	}

	allShards := map[uint64]struct{}{}

	toDel := make([]*datastore.Key, 0, len(muts))
	numMuts := uint64(0)
	err = datastore.Get(txnBuf.FilterRDS(c)).RunInTransaction(func(c context.Context) error {
		toDel = toDel[:0]
		numMuts = 0

		for i, m := range muts {
			shards, numNewMuts, err := enterTransactionInternal(c, root, m.RollForward)
			if err != nil {
				l.Errorf("Executing decoded gob(%T) failed: %q: %+v", m, err, m)
				continue
			}
			toDel = append(toDel, mutKeys[i])
			numMuts += uint64(numNewMuts)
			for shard := range shards {
				allShards[shard] = struct{}{}
			}
		}

		return nil
	}, nil)
	if err != nil {
		l.Errorf("failed running transaction: %s", err)
		return err
	}
	fireTasks(c, allShards)
	l.Infof("successfully processed %d mutations, adding %d more", len(toDel), numMuts)

	if len(toDel) > 0 {
		atomic.StoreInt64(counter, int64(len(toDel)))

		for _, k := range toDel {
			banSet.Add(k.Encode())
		}
		if err := datastore.Get(c).DeleteMulti(toDel); err != nil {
			l.Warningf("error deleting finished mutations: %s", err)
		}
	}

	return nil
}