Chromium Code Reviews| OLD | NEW |
|---|---|
| (Empty) | |
| 1 // Copyright 2017 The LUCI Authors. | |
| 2 // | |
| 3 // Licensed under the Apache License, Version 2.0 (the "License"); | |
| 4 // you may not use this file except in compliance with the License. | |
| 5 // You may obtain a copy of the License at | |
| 6 // | |
| 7 // http://www.apache.org/licenses/LICENSE-2.0 | |
| 8 // | |
| 9 // Unless required by applicable law or agreed to in writing, software | |
| 10 // distributed under the License is distributed on an "AS IS" BASIS, | |
| 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
| 12 // See the License for the specific language governing permissions and | |
| 13 // limitations under the License. | |
| 14 | |
| 15 // Package dsset implements a particular flavor of datastore-backed set. | |
| 16 // | |
| 17 // Due to its internal structure, it requires some maintenance on behalf of the | |
| 18 // caller to periodically cleanup removed items (aka tombstones). | |
| 19 // | |
| 20 // Items added to the set should have unique IDs, at least for the duration of | |
| 21 // some configurable time interval, as defined by TombstonesDelay property. | |
| 22 // It means removed items can't be added back to the set right away (the set | |
| 23 // will think they are already there). This is required to make 'Add' operation | |
| 24 // idempotent. | |
| 25 // | |
| 26 // TombstonesDelay is assumed to be much larger than time scale of all "fast" | |
| 27 // processes in the system, in particular all List+Pop processes. For example, | |
| 28 // if List+Pop is expected to take 1 min, TombstonesDelay should be >> 1 min | |
| 29 // (e.g. 5 min). Setting TombstonesDelay to very large value is harmful though, | |
| 30 // since it may slow down 'List' and 'Pop' (by allowing more garbage that will | |
| 31 // have to be filtered out). | |
| 32 // | |
| 33 // Properties (where N is current size of the set): | |
| 34 // * Batch 'Add' with configurable QPS limit, O(1) performance. | |
| 35 // * Transactional consistent 'Pop' (1 QPS limit), O(N) performance. | |
| 36 // * Non-transactional consistent 'List' (1 QPS limit), O(N) performance. | |
| 37 // * Popped items can't be re-added until their tombstones expire. | |
| 38 // | |
| 39 // These properties make dsset suitable for multiple producers, single consumer | |
| 40 // queues, where order of items is not important, each item has a unique | |
| 41 // identifier, and the queue size is small. | |
| 42 // | |
| 43 // Structurally dsset consists of N+1 entity groups: | |
| 44 // * N separate entity groups that contain N shards of the set. | |
| 45 // * 1 entity group (with a configurable root) that holds tombstones. | |
| 46 // | |
| 47 // It is safe to increase number of shards at any time. Decreasing number of | |
| 48 // shards is dangerous (but can be done with some more coding). | |
| 49 // | |
| 50 // More shards make: | |
| 51 // * Add() less contentious (so it can support more QPS). | |
| 52 // * List() and CleanupStorage() slower and more expensive. | |
| 53 // * Pop() is not affected by number of shards. | |
| 54 package dsset | |
| 55 | |
| 56 import ( | |
| 57 "fmt" | |
| 58 "sync" | |
| 59 "time" | |
| 60 | |
| 61 "golang.org/x/net/context" | |
| 62 | |
| 63 "github.com/luci/gae/service/datastore" | |
| 64 "github.com/luci/luci-go/common/clock" | |
| 65 "github.com/luci/luci-go/common/data/rand/mathrand" | |
| 66 "github.com/luci/luci-go/common/data/stringset" | |
| 67 "github.com/luci/luci-go/common/errors" | |
| 68 "github.com/luci/luci-go/common/retry/transient" | |
| 69 ) | |
| 70 | |
| 71 // batchSize is total number of items to pass to PutMulti or DeleteMulti RPCs. | |
| 72 const batchSize = 500 | |
| 73 | |
| 74 // Set holds a set of Items and uses Tombstones to achieve idempotency of Add. | |
| 75 // | |
| 76 // Producers just call Add(...). | |
| 77 // | |
| 78 // The consumer must run more elaborate algorithm that ensures atomicity of | |
| 79 // 'Pop' and takes care of cleaning up of the garbage. This requires a mix of | |
| 80 // transactional and non-transactional actions: | |
| 81 // | |
| 82 // listing, err := set.List(ctx) | |
| 83 // if err != nil || listing.Empty() { | |
| 84 // return err | |
| 85 // } | |
| 86 // | |
| 87 // if err := dsset.CleanupStorage(ctx, listing.Tombstones); err != nil { | |
| 88 // return err | |
| 89 // } | |
| 90 // | |
| 91 // ... Fetch any additional info associated with 'listing.Items' ... | |
| 92 // | |
| 93 // var tombstones []*dsset.Tombstone | |
| 94 // err = datastore.RunInTransaction(ctx, func(ctx context.Context) error { | |
| 95 // op, err := set.BeginPop(ctx, listing) | |
| 96 // if err != nil { | |
| 97 // return err | |
| 98 // } | |
| 99 // for _, itm := range items { | |
| 100 // if op.Pop(item.ID) { | |
| 101 // // The item was indeed in the set and we've just removed it! | |
| 102 // } else { | |
| 103 // // Some other transaction has popped it already. | |
| 104 // } | |
| 105 // } | |
| 106 // tombstones, err = dsset.FinishPop(ctx, op) | |
| 107 // return err | |
| 108 // }, nil) | |
| 109 // if err == nil { | |
| 110 // dsset.CleanupStorage(ctx, tombstones) // best-effort cleanup | |
| 111 // } | |
| 112 // return err | |
| 113 type Set struct { | |
| 114 ID string // global ID, used to construct datastore keys | |
| 115 ShardCount int // number of entity groups to use for sto rage | |
| 116 TombstonesRoot *datastore.Key // tombstones entity parent key | |
| 117 TombstonesDelay time.Duration // how long to keep tombstones in the set | |
| 118 } | |
| 119 | |
| 120 // Item is what's stored in the set. | |
| 121 type Item struct { | |
| 122 ID string // unique in time identifier of the item | |
| 123 Value []byte // arbitrary value (<1 MB, but preferably much smaller) | |
| 124 } | |
| 125 | |
| 126 // Listing is returned by 'List' call. | |
| 127 // | |
| 128 // It contains actual listing of items in the set, as well as a bunch of service | |
| 129 // information used by other operations ('CleanupStorage' and 'Pop') to keep | |
| 130 // the set in a garbage-free and consistent state. | |
| 131 // | |
| 132 // The only way to construct a correct Listing is to call 'List' method. | |
| 133 // | |
| 134 // See comments for Set struct and List method for more info. | |
| 135 type Listing struct { | |
| 136 Items []Item // all items in the set, in arbitrary order | |
| 137 Tombstones []*Tombstone // tombstones that can be cleaned up now | |
| 138 | |
| 139 set string // parent set ID | |
| 140 producedAt time.Time // when 'List' call was initiated | |
| 141 idToKeys map[string][]*datastore.Key // ID -> datastore keys to cleanu p | |
| 142 } | |
| 143 | |
| 144 // Empty is true if both 'Items' and 'Tombstones' are empty | |
| 145 func (l *Listing) Empty() bool { | |
| 146 return len(l.Items) == 0 && len(l.Tombstones) == 0 | |
| 147 } | |
| 148 | |
| 149 // Tombstone is a reference to a deleted item that still lingers in the set. | |
| 150 // | |
| 151 // Tombstones exist to make sure recently popped items do not reappear in the | |
| 152 // set if producers attempt to re-add them. | |
| 153 // | |
| 154 // Its fields are intentionally private to force correct usage of Set's methods. | |
| 155 type Tombstone struct { | |
|
tandrii(chromium)
2017/07/31 18:58:46
I think we don't actually need to expose individua
| |
| 156 id string // deleted item ID | |
| 157 storage []*datastore.Key // itemEntity's to delete in 'CleanupStorage' | |
| 158 old bool // true if tombstone should be popped in 'Pop ' | |
| 159 cleanedUp bool // true if 'CleanupStorage' processed the tom bstone | |
| 160 } | |
| 161 | |
| 162 // Add idempotently adds a bunch of items to the set. | |
| 163 // | |
| 164 // If items with given keys are already in the set, or have been deleted | |
| 165 // recently, they won't be re-added. No error is returned in this case. When | |
| 166 // retrying the call like that, the caller is responsible to pass exact same | |
| 167 // Item.Value, otherwise 'List' may return random variant of the added item. | |
| 168 // | |
| 169 // Writes to some single entity group (not known in advance). If called outside | |
| 170 // of a transaction and the call fails, may add only some subset of items. | |
| 171 // Running inside a transaction makes this operation atomic. | |
| 172 // | |
| 173 // Returns only transient errors. | |
| 174 func (s *Set) Add(c context.Context, items []Item) error { | |
| 175 // Pick a random shard and add all new items there. If this is a retry, they | |
| 176 // may exist in some other shard already. We don't care, they'll be | |
| 177 // deduplicated in 'List'. If added items have been popped already (they have | |
| 178 // tombstones), 'List' will omit them as well. | |
| 179 shardRoot := s.shardRoot(c, mathrand.Intn(c, s.ShardCount)) | |
| 180 entities := make([]itemEntity, len(items)) | |
| 181 for i, itm := range items { | |
| 182 entities[i] = itemEntity{ | |
| 183 ID: itm.ID, | |
| 184 Parent: shardRoot, | |
| 185 Value: itm.Value, | |
| 186 } | |
| 187 } | |
| 188 return transient.Tag.Apply(batchOp(len(entities), func(start, end int) e rror { | |
| 189 return datastore.Put(c, entities[start:end]) | |
| 190 })) | |
| 191 } | |
| 192 | |
| 193 // List returns all items that are currently in the set (in arbitrary order), | |
| 194 // as well as a set of tombstones that points to items that were previously | |
| 195 // popped and can be cleaned up now. | |
| 196 // | |
| 197 // Must be called outside of transactions (panics otherwise). Reads many entity | |
| 198 // groups, including TombstonesRoot one. | |
| 199 // | |
| 200 // The set of tombstones to cleanup can be passed to 'CleanupStorage', and | |
| 201 // later to 'BeginPop' (as party of the listing), in that order. Not doing | |
| 202 // so will lead to accumulation of garbage in the set that will slow down 'List' | |
| 203 // and 'Pop'. | |
| 204 // | |
| 205 // Returns only transient errors. | |
| 206 func (s *Set) List(c context.Context) (*Listing, error) { | |
| 207 if datastore.CurrentTransaction(c) != nil { | |
| 208 panic("dsset.Set.List must be called outside of a transaction") | |
| 209 } | |
| 210 now := clock.Now(c).UTC() | |
| 211 | |
| 212 // Fetch all shards (via consistent ancestor queries) and all tombstones . | |
| 213 | |
| 214 shards := make([][]*itemEntity, s.ShardCount) | |
| 215 tombsEntity := tombstonesEntity{ID: s.ID, Parent: s.TombstonesRoot} | |
| 216 | |
| 217 wg := sync.WaitGroup{} | |
| 218 wg.Add(1 + s.ShardCount) | |
| 219 errs := errors.NewLazyMultiError(s.ShardCount + 1) | |
| 220 | |
| 221 go func() { | |
| 222 defer wg.Done() | |
| 223 if err := datastore.Get(c, &tombsEntity); err != nil && err != d atastore.ErrNoSuchEntity { | |
| 224 errs.Assign(0, err) | |
| 225 } | |
| 226 }() | |
| 227 | |
| 228 for i := 0; i < s.ShardCount; i++ { | |
| 229 go func(i int) { | |
| 230 defer wg.Done() | |
| 231 q := datastore.NewQuery("dsset.Item").Ancestor(s.shardRo ot(c, i)) | |
| 232 errs.Assign(i+1, datastore.GetAll(c, q, &shards[i])) | |
| 233 }(i) | |
| 234 } | |
| 235 | |
| 236 wg.Wait() | |
| 237 if err := errs.Get(); err != nil { | |
| 238 return nil, transient.Tag.Apply(err) | |
| 239 } | |
| 240 | |
| 241 // Mapping "item ID" => "list of entities to delete to remove it". This is | |
| 242 // eventually used by 'CleanupStorage'. Under normal circumstances, the list | |
| 243 // has only one item, but there can be more if 'Add' call was retried (s o the | |
| 244 // item ends up in multiple different shards). | |
| 245 idToKeys := map[string][]*datastore.Key{} | |
| 246 for _, shard := range shards { | |
| 247 for _, e := range shard { | |
| 248 idToKeys[e.ID] = append(idToKeys[e.ID], datastore.KeyFor Obj(c, e)) | |
| 249 } | |
| 250 } | |
| 251 | |
| 252 // A set of items we pretend not to see. Initially all tombstoned ones. | |
| 253 // | |
| 254 // Since we are iterating over tombstone list anyway, find all sufficien tly | |
| 255 // old tombstones or tombstones that still have storage associated with them. | |
| 256 // We return them to the caller, so they can be cleaned up: | |
| 257 // * 'CleanupStorage' makes sure 'storage' entities are deleted. | |
| 258 // * 'BeginPop' completely erases old tombstones. | |
| 259 var tombs []*Tombstone | |
| 260 ignore := stringset.New(len(tombsEntity.Tombstones)) | |
| 261 for _, t := range tombsEntity.Tombstones { | |
| 262 ignore.Add(t.ID) | |
| 263 old := now.Sub(t.Tombstoned) > s.TombstonesDelay | |
| 264 if storage := idToKeys[t.ID]; len(storage) > 0 || old { | |
| 265 tombs = append(tombs, &Tombstone{ | |
| 266 id: t.ID, | |
| 267 storage: storage, | |
| 268 old: old, // if true, BeginPop will delete t his tombstone | |
| 269 }) | |
| 270 } | |
| 271 } | |
| 272 | |
| 273 // Join all shards, throwing away tombstoned and duplicated items. | |
| 274 var items []Item | |
| 275 for _, shard := range shards { | |
| 276 for _, e := range shard { | |
| 277 if !ignore.Has(e.ID) { | |
| 278 items = append(items, Item{ | |
| 279 ID: e.ID, | |
| 280 Value: e.Value, | |
| 281 }) | |
| 282 ignore.Add(e.ID) | |
| 283 } | |
| 284 } | |
| 285 } | |
| 286 | |
| 287 return &Listing{ | |
| 288 Items: items, | |
| 289 Tombstones: tombs, | |
| 290 set: s.ID, | |
| 291 producedAt: now, | |
| 292 idToKeys: idToKeys, | |
| 293 }, nil | |
| 294 } | |
| 295 | |
| 296 // PopOp is an in-progress 'Pop' operation. | |
| 297 // | |
| 298 // See BeginPop. | |
| 299 type PopOp struct { | |
| 300 ctx context.Context // datastore context to use for thi s op | |
| 301 txn datastore.Transaction // a transaction that started Begin Pop | |
| 302 now time.Time // popping time for all popped item s | |
| 303 dirty bool // true if the tombstone map was mo dified | |
| 304 finished bool // true if finished already | |
| 305 entity *tombstonesEntity // entity with tombstones | |
| 306 tombs map[string]tombstone // entity.Tombstones in a map form | |
| 307 idToKeys map[string][]*datastore.Key // ID -> datastore keys to cleanup | |
| 308 popped []*Tombstone // new tombstones for popped items | |
| 309 } | |
| 310 | |
| 311 // BeginPop initiates 'Pop' operation. | |
| 312 // | |
| 313 // Pop operation is used to transactionally remove items from the set, as well | |
| 314 // as cleanup old tombstones. It must be finished with 'dsset.FinishPop', even | |
| 315 // if no items have been popped: the internal state still can change in this | |
| 316 // case, since 'BeginPop' cleans up old tombstones. Even more, it is necessary | |
| 317 // to do 'Pop' if listing contains non-empty set of tombstones (regardless of | |
| 318 // whether the caller wants to actually pop any items from the set). This is | |
| 319 // part of the required set maintenance. | |
| 320 // | |
| 321 // Requires a transaction. Modifies TombstonesRoot entity group (and only it). | |
| 322 // | |
| 323 // Returns only transient errors. Such errors usually mean that the entire pop | |
| 324 // sequence ('List' + 'Pop') should be retried. | |
| 325 func (s *Set) BeginPop(c context.Context, listing *Listing) (*PopOp, error) { | |
| 326 if listing.set != s.ID { | |
| 327 panic("passed Listing from another set") | |
| 328 } | |
| 329 txn := datastore.CurrentTransaction(c) | |
| 330 if txn == nil { | |
| 331 panic("dsset.Set.BeginPop must be called inside a transaction") | |
| 332 } | |
| 333 | |
| 334 now := clock.Now(c).UTC() | |
| 335 if age := now.Sub(listing.producedAt); age > s.TombstonesDelay { | |
| 336 return nil, transient.Tag.Apply(fmt.Errorf("the listing is stale (%s > %s)", age, s.TombstonesDelay)) | |
| 337 } | |
| 338 | |
| 339 entity := &tombstonesEntity{ID: s.ID, Parent: s.TombstonesRoot} | |
| 340 if err := datastore.Get(c, entity); err != nil && err != datastore.ErrNo SuchEntity { | |
| 341 return nil, transient.Tag.Apply(err) | |
| 342 } | |
| 343 | |
| 344 // The data in tombstonesEntity, in map form. | |
| 345 tombs := make(map[string]tombstone, len(entity.Tombstones)) | |
| 346 for _, t := range entity.Tombstones { | |
| 347 tombs[t.ID] = t | |
| 348 } | |
| 349 | |
| 350 // Throw away old tombstones right away. | |
| 351 dirty := false | |
| 352 for _, tomb := range listing.Tombstones { | |
| 353 if tomb.old { | |
| 354 if !tomb.cleanedUp { | |
| 355 panic("trying to remove Tombstone that wasn't cl eaned up") | |
| 356 } | |
| 357 if _, hasTomb := tombs[tomb.id]; hasTomb { | |
| 358 delete(tombs, tomb.id) | |
| 359 dirty = true | |
| 360 } | |
| 361 } | |
| 362 } | |
| 363 | |
| 364 return &PopOp{ | |
| 365 ctx: c, | |
| 366 txn: txn, | |
| 367 now: now, | |
| 368 dirty: dirty, | |
| 369 entity: entity, | |
| 370 tombs: tombs, | |
| 371 idToKeys: listing.idToKeys, | |
| 372 }, nil | |
| 373 } | |
| 374 | |
| 375 // CanPop returns true if the given item can be popped from the set. | |
| 376 // | |
| 377 // Returns false if this item has been popped before (perhaps in another | |
| 378 // transaction), or it's not in the the listing passed to BeginPop. | |
| 379 func (p *PopOp) CanPop(id string) bool { | |
| 380 if _, hasTomb := p.tombs[id]; hasTomb { | |
| 381 return false // already popped by someone else | |
| 382 } | |
| 383 if _, present := p.idToKeys[id]; present { | |
| 384 return true // listed in the set | |
| 385 } | |
| 386 return false | |
| 387 } | |
| 388 | |
| 389 // Pop removed the item from the set and returns true if it was there. | |
| 390 // | |
| 391 // Returns false if this item has been popped before (perhaps in another | |
| 392 // transaction), or it's not in the the listing passed to BeginPop. | |
| 393 func (p *PopOp) Pop(id string) bool { | |
| 394 if p.finished { | |
| 395 panic("the operation has already been finished") | |
| 396 } | |
| 397 if !p.CanPop(id) { | |
| 398 return false | |
| 399 } | |
| 400 p.tombs[id] = tombstone{ID: id, Tombstoned: p.now} | |
| 401 p.popped = append(p.popped, &Tombstone{ | |
| 402 id: id, | |
| 403 storage: p.idToKeys[id], | |
| 404 old: false, // BeingPop will ignore this fresh tombstone | |
| 405 }) | |
| 406 p.dirty = true | |
| 407 return true | |
| 408 } | |
| 409 | |
| 410 // makeTombstonesEntity is used internally by FinishPop. | |
| 411 func (p *PopOp) makeTombstonesEntity() *tombstonesEntity { | |
| 412 p.entity.Tombstones = p.entity.Tombstones[:0] | |
| 413 for _, tomb := range p.tombs { | |
| 414 p.entity.Tombstones = append(p.entity.Tombstones, tomb) | |
| 415 } | |
| 416 return p.entity | |
| 417 } | |
| 418 | |
| 419 //////////////////////////////////////////////////////////////////////////////// | |
| 420 | |
| 421 // FinishPop completes one or more pop operations (for different sets) by | |
| 422 // submitting changes to datastore. | |
| 423 // | |
| 424 // Must be called within same transaction that called BeginPop. | |
|
tandrii(chromium)
2017/07/31 18:58:46
the same
| |
| 425 // | |
| 426 // It returns a list of tombstones for popped items. The storage used by the | |
| 427 // items can be reclaimed right away by calling 'CleanupStorage'. It is fine | |
| 428 // not to do so, 'List' will eventually return all tombstones that need cleaning | |
| 429 // anyway. Calling 'CleanupStorage' as best effort is still beneficial though, | |
| 430 // since it will reduce the amount of garbage in the set. | |
| 431 // | |
| 432 // Returns only transient errors. | |
| 433 func FinishPop(ctx context.Context, ops ...*PopOp) (tombs []*Tombstone, err erro r) { | |
| 434 txn := datastore.CurrentTransaction(ctx) | |
| 435 | |
| 436 entities := []*tombstonesEntity{} | |
| 437 tombsCount := 0 | |
| 438 for _, op := range ops { | |
| 439 if op.finished { | |
| 440 panic("the operation has already been finished") | |
| 441 } | |
| 442 if op.txn != txn { | |
| 443 panic("wrong transaction") | |
| 444 } | |
| 445 if op.dirty { | |
| 446 entities = append(entities, op.makeTombstonesEntity()) | |
| 447 tombsCount += len(op.popped) | |
| 448 } | |
| 449 } | |
| 450 | |
| 451 if err := datastore.Put(ctx, entities); err != nil { | |
| 452 return nil, transient.Tag.Apply(err) | |
| 453 } | |
| 454 | |
| 455 if tombsCount != 0 { | |
| 456 tombs = make([]*Tombstone, 0, tombsCount) | |
| 457 } | |
| 458 for _, op := range ops { | |
| 459 tombs = append(tombs, op.popped...) | |
| 460 op.finished = true | |
| 461 } | |
| 462 | |
| 463 return tombs, nil | |
| 464 } | |
| 465 | |
| 466 // CleanupStorage deletes entities used to store items under given tombstones. | |
| 467 // | |
| 468 // This is datastore's MultiDelete RPC in disguise. Touches many entity groups. | |
| 469 // Must be called outside of transactions. Idempotent. | |
| 470 // | |
| 471 // Can handle tombstones from multiple different sets at once. This is preferred | |
| 472 // over calling 'CleanupStorage' multiple times (once per set), since it | |
| 473 // collapses multiple datastore RPCs into one. | |
| 474 // | |
| 475 // This MUST be called before tombstones returned by 'List' are removed in | |
| 476 // 'Pop'. Failure to do so will make items reappear in the set. | |
| 477 // | |
| 478 // Returns only transient errors. There's no way to know which items were | |
| 479 // removed and which weren't in case of an error. | |
| 480 func CleanupStorage(c context.Context, cleanup ...[]*Tombstone) error { | |
| 481 if datastore.CurrentTransaction(c) != nil { | |
| 482 panic("dsset.CleanupStorage must be called outside of a transact ion") | |
| 483 } | |
| 484 | |
| 485 keys := []*datastore.Key{} | |
| 486 for _, tombs := range cleanup { | |
| 487 for _, tomb := range tombs { | |
| 488 keys = append(keys, tomb.storage...) | |
| 489 } | |
| 490 } | |
| 491 | |
| 492 err := batchOp(len(keys), func(start, end int) error { | |
| 493 return datastore.Delete(c, keys[start:end]) | |
| 494 }) | |
| 495 if err != nil { | |
| 496 return transient.Tag.Apply(err) | |
| 497 } | |
| 498 | |
| 499 for _, tombs := range cleanup { | |
| 500 for _, tomb := range tombs { | |
| 501 tomb.cleanedUp = true | |
| 502 tomb.storage = nil | |
| 503 } | |
| 504 } | |
| 505 return nil | |
| 506 } | |
| 507 | |
| 508 //////////////////////////////////////////////////////////////////////////////// | |
| 509 | |
| 510 type itemEntity struct { | |
| 511 _kind string `gae:"$kind,dsset.Item"` | |
| 512 | |
| 513 ID string `gae:"$id"` | |
| 514 Parent *datastore.Key `gae:"$parent"` | |
| 515 Value []byte `gae:",noindex"` | |
| 516 } | |
| 517 | |
| 518 type tombstonesEntity struct { | |
| 519 _kind string `gae:"$kind,dsset.Tombstones"` | |
| 520 | |
| 521 ID string `gae:"$id"` | |
| 522 Parent *datastore.Key `gae:"$parent"` | |
| 523 Tombstones []tombstone `gae:",noindex"` | |
| 524 } | |
| 525 | |
| 526 type tombstone struct { | |
| 527 ID string // ID of tombstoned item | |
| 528 Tombstoned time.Time // when it was popped | |
| 529 } | |
| 530 | |
| 531 // shardRoot returns entity group key to use for a given shard. | |
| 532 func (s *Set) shardRoot(c context.Context, n int) *datastore.Key { | |
| 533 return datastore.NewKey(c, "dsset.Shard", fmt.Sprintf("%s:%d", s.ID, n), 0, nil) | |
| 534 } | |
| 535 | |
| 536 // batchOp splits 'total' into batches and calls 'op' in parallel. | |
| 537 // | |
| 538 // Doesn't preserve order of returned errors! Don't try to deconstruct the | |
| 539 // returned multi error, the position of individual errors there does not | |
| 540 // correlate with the original array. | |
| 541 func batchOp(total int, op func(start, end int) error) error { | |
| 542 switch { | |
| 543 case total == 0: | |
| 544 return nil | |
| 545 case total <= batchSize: | |
| 546 return op(0, total) | |
| 547 } | |
| 548 | |
| 549 errs := make(chan error) | |
| 550 ops := 0 | |
| 551 offset := 0 | |
| 552 for total > 0 { | |
| 553 count := batchSize | |
| 554 if count > total { | |
| 555 count = total | |
| 556 } | |
| 557 go func(start, end int) { | |
| 558 errs <- op(start, end) | |
| 559 }(offset, offset+count) | |
| 560 offset += count | |
| 561 total -= count | |
| 562 ops++ | |
| 563 } | |
| 564 | |
| 565 var all errors.MultiError | |
| 566 for i := 0; i < ops; i++ { | |
| 567 err := <-errs | |
| 568 if merr, yep := err.(errors.MultiError); yep { | |
| 569 for _, e := range merr { | |
| 570 if e != nil { | |
| 571 all = append(all, e) | |
| 572 } | |
| 573 } | |
| 574 } else if err != nil { | |
| 575 all = append(all, err) | |
| 576 } | |
| 577 } | |
| 578 | |
| 579 if len(all) == 0 { | |
| 580 return nil | |
| 581 } | |
| 582 return all | |
| 583 } | |
| OLD | NEW |