OLD | NEW |
---|---|
(Empty) | |
1 // Copyright 2017 The LUCI Authors. | |
2 // | |
3 // Licensed under the Apache License, Version 2.0 (the "License"); | |
4 // you may not use this file except in compliance with the License. | |
5 // You may obtain a copy of the License at | |
6 // | |
7 // http://www.apache.org/licenses/LICENSE-2.0 | |
8 // | |
9 // Unless required by applicable law or agreed to in writing, software | |
10 // distributed under the License is distributed on an "AS IS" BASIS, | |
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
12 // See the License for the specific language governing permissions and | |
13 // limitations under the License. | |
14 | |
15 // Package dsset implements a particular flavor of datastore-backed set. | |
16 // | |
17 // Due to its internal structure, it requires some maintenance on behalf of the | |
18 // caller to periodically clean up removed items (aka tombstones). | |
19 // | |
20 // Items added to the set should have unique IDs, at least for the duration of | |
21 // some configurable time interval, as defined by TombstonesDelay property. | |
22 // It means removed items can't be added back to the set right away (the set | |
23 // will think they are already there). This is required to make 'Add' operation | |
24 // idempotent. | |
25 // | |
26 // TombstonesDelay is assumed to be much larger than time scale of all "fast" | |
27 // processes in the system, in particular all List+Pop processes. For example, | |
28 // if List+Pop is expected to take 1 min, TombstonesDelay should be >> 1 min | |
29 // (e.g. 5 min). Setting TombstonesDelay to very large value is harmful though, | |
30 // since it may slow down 'List' and 'Pop' (by allowing more garbage that will | |
31 // have to be filtered out). | |
32 // | |
33 // Properties (where N is current size of the set): | |
34 // * Batch 'Add' with configurable QPS limit, O(1) performance. | |
35 // * Transactional consistent 'Pop' (1 QPS limit), O(N) performance. | |
36 // * Non-transactional consistent 'List' (1 QPS limit), O(N) performance. | |
37 // * Popped items can't be re-added until their tombstones expire. | |
38 // | |
39 // These properties make dsset suitable for multiple producers, single consumer | |
40 // queues, where order of items is not important, each item has a unique | |
41 // identifier, and the queue size is small. | |
42 // | |
43 // Structurally dsset consists of N+1 entity groups: | |
44 // * N separate entity groups that contain N shards of the set. | |
45 // * 1 entity group (with a configurable root) that holds tombstones. | |
46 // | |
47 // It is safe to increase number of shards at any time. Decreasing number of | |
48 // shards is dangerous (but can be done with some more coding). | |
49 // | |
50 // More shards make: | |
51 // * Add() less contentious (so it can support more QPS). | |
52 // * List() and CleanupStorage() slower and more expensive. | |
53 // * Pop() is not affected by number of shards. | |
54 package dsset | |
55 | |
56 import ( | |
57 "fmt" | |
58 "sync" | |
59 "time" | |
60 | |
61 "golang.org/x/net/context" | |
62 | |
63 "github.com/luci/gae/service/datastore" | |
64 "github.com/luci/luci-go/common/clock" | |
65 "github.com/luci/luci-go/common/data/rand/mathrand" | |
66 "github.com/luci/luci-go/common/data/stringset" | |
67 "github.com/luci/luci-go/common/errors" | |
68 "github.com/luci/luci-go/common/retry/transient" | |
69 ) | |
70 | |
// batchSize is the maximum number of items handed to a single PutMulti or
// DeleteMulti RPC; larger operations are split into batches of this size.
const batchSize = 500
73 | |
74 // Set holds a set of Items and uses Tombstones to achieve idempotency of Add. | |
75 // | |
76 // Producers just call Add(...). | |
77 // | |
78 // The consumer must run more elaborate algorithm that ensures atomicity of | |
79 // 'Pop' and takes care of cleaning up of the garbage. This requires a mix of | |
80 // transactional and non-transactional actions: | |
81 // | |
82 // listing, err := set.List(ctx) | |
83 // if err != nil || listing.Empty() { | |
84 // return err | |
85 // } | |
86 // | |
87 // if err := dsset.CleanupStorage(ctx, listing.Tombstones); err != nil { | |
88 // return err | |
89 // } | |
90 // | |
91 // ... Fetch any additional info associated with 'listing.Items' ... | |
92 // | |
93 // var tombstones []*dsset.Tombstone | |
94 // err = datastore.RunInTransaction(ctx, func(ctx context.Context) error { | |
95 // op, err := set.BeginPop(ctx, listing) | |
96 // if err != nil { | |
97 // return err | |
98 // } | |
99 // for _, itm := range items { | |
100 // if op.Pop(item.ID) { | |
101 // // The item was indeed in the set and we've just removed it! | |
102 // } else { | |
103 // // Some other transaction has popped it already. | |
104 // } | |
105 // } | |
106 // tombstones, err = dsset.FinishPop(ctx, op) | |
107 // return err | |
108 // }, nil) | |
109 // if err == nil { | |
110 // dsset.CleanupStorage(ctx, tombstones) // best-effort cleanup | |
111 // } | |
112 // return err | |
113 type Set struct { | |
114 ID string // global ID, used to construct datastore keys | |
115 ShardCount int // number of entity groups to use for sto rage | |
116 TombstonesRoot *datastore.Key // tombstones entity parent key | |
117 TombstonesDelay time.Duration // how long to keep tombstones in the set | |
118 } | |
119 | |
// Item is a single element stored in the set.
type Item struct {
	// ID identifies the item; it must be unique in time.
	ID string
	// Value is an arbitrary payload (<1 MB, but preferably much smaller).
	Value []byte
}
125 | |
126 // Listing is returned by 'List' call. | |
127 // | |
128 // It contains actual listing of items in the set, as well as a bunch of service | |
129 // information used by other operations ('CleanupStorage' and 'Pop') to keep | |
130 // the set in a garbage-free and consistent state. | |
131 // | |
132 // The only way to construct a correct Listing is to call 'List' method. | |
133 // | |
134 // See comments for Set struct and List method for more info. | |
135 type Listing struct { | |
136 Items []Item // all items in the set, in arbitrary order | |
137 Tombstones []*Tombstone // tombstones that can be cleaned up now | |
138 | |
139 set string // parent set ID | |
140 producedAt time.Time // when 'List' call was initiated | |
141 idToKeys map[string][]*datastore.Key // ID -> datastore keys to cleanu p | |
142 } | |
143 | |
144 // Empty is true if both 'Items' and 'Tombstones' are empty | |
145 func (l *Listing) Empty() bool { | |
146 return len(l.Items) == 0 && len(l.Tombstones) == 0 | |
147 } | |
148 | |
149 // Tombstone is a reference to a deleted item that still lingers in the set. | |
150 // | |
151 // Tombstones exist to make sure recently popped items do not reappear in the | |
152 // set if producers attempt to re-add them. | |
153 // | |
154 // Its fields are intentionally private to force correct usage of Set's methods. | |
155 type Tombstone struct { | |
tandrii(chromium)
2017/07/31 18:58:46
I think we don't actually need to expose individua
| |
156 id string // deleted item ID | |
157 storage []*datastore.Key // itemEntity's to delete in 'CleanupStorage' | |
158 old bool // true if tombstone should be popped in 'Pop ' | |
159 cleanedUp bool // true if 'CleanupStorage' processed the tom bstone | |
160 } | |
161 | |
162 // Add idempotently adds a bunch of items to the set. | |
163 // | |
164 // If items with given keys are already in the set, or have been deleted | |
165 // recently, they won't be re-added. No error is returned in this case. When | |
166 // retrying the call like that, the caller is responsible to pass exact same | |
167 // Item.Value, otherwise 'List' may return random variant of the added item. | |
168 // | |
169 // Writes to some single entity group (not known in advance). If called outside | |
170 // of a transaction and the call fails, may add only some subset of items. | |
171 // Running inside a transaction makes this operation atomic. | |
172 // | |
173 // Returns only transient errors. | |
174 func (s *Set) Add(c context.Context, items []Item) error { | |
175 // Pick a random shard and add all new items there. If this is a retry, they | |
176 // may exist in some other shard already. We don't care, they'll be | |
177 // deduplicated in 'List'. If added items have been popped already (they have | |
178 // tombstones), 'List' will omit them as well. | |
179 shardRoot := s.shardRoot(c, mathrand.Intn(c, s.ShardCount)) | |
180 entities := make([]itemEntity, len(items)) | |
181 for i, itm := range items { | |
182 entities[i] = itemEntity{ | |
183 ID: itm.ID, | |
184 Parent: shardRoot, | |
185 Value: itm.Value, | |
186 } | |
187 } | |
188 return transient.Tag.Apply(batchOp(len(entities), func(start, end int) e rror { | |
189 return datastore.Put(c, entities[start:end]) | |
190 })) | |
191 } | |
192 | |
193 // List returns all items that are currently in the set (in arbitrary order), | |
194 // as well as a set of tombstones that points to items that were previously | |
195 // popped and can be cleaned up now. | |
196 // | |
197 // Must be called outside of transactions (panics otherwise). Reads many entity | |
198 // groups, including TombstonesRoot one. | |
199 // | |
200 // The set of tombstones to cleanup can be passed to 'CleanupStorage', and | |
201 // later to 'BeginPop' (as party of the listing), in that order. Not doing | |
202 // so will lead to accumulation of garbage in the set that will slow down 'List' | |
203 // and 'Pop'. | |
204 // | |
205 // Returns only transient errors. | |
206 func (s *Set) List(c context.Context) (*Listing, error) { | |
207 if datastore.CurrentTransaction(c) != nil { | |
208 panic("dsset.Set.List must be called outside of a transaction") | |
209 } | |
210 now := clock.Now(c).UTC() | |
211 | |
212 // Fetch all shards (via consistent ancestor queries) and all tombstones . | |
213 | |
214 shards := make([][]*itemEntity, s.ShardCount) | |
215 tombsEntity := tombstonesEntity{ID: s.ID, Parent: s.TombstonesRoot} | |
216 | |
217 wg := sync.WaitGroup{} | |
218 wg.Add(1 + s.ShardCount) | |
219 errs := errors.NewLazyMultiError(s.ShardCount + 1) | |
220 | |
221 go func() { | |
222 defer wg.Done() | |
223 if err := datastore.Get(c, &tombsEntity); err != nil && err != d atastore.ErrNoSuchEntity { | |
224 errs.Assign(0, err) | |
225 } | |
226 }() | |
227 | |
228 for i := 0; i < s.ShardCount; i++ { | |
229 go func(i int) { | |
230 defer wg.Done() | |
231 q := datastore.NewQuery("dsset.Item").Ancestor(s.shardRo ot(c, i)) | |
232 errs.Assign(i+1, datastore.GetAll(c, q, &shards[i])) | |
233 }(i) | |
234 } | |
235 | |
236 wg.Wait() | |
237 if err := errs.Get(); err != nil { | |
238 return nil, transient.Tag.Apply(err) | |
239 } | |
240 | |
241 // Mapping "item ID" => "list of entities to delete to remove it". This is | |
242 // eventually used by 'CleanupStorage'. Under normal circumstances, the list | |
243 // has only one item, but there can be more if 'Add' call was retried (s o the | |
244 // item ends up in multiple different shards). | |
245 idToKeys := map[string][]*datastore.Key{} | |
246 for _, shard := range shards { | |
247 for _, e := range shard { | |
248 idToKeys[e.ID] = append(idToKeys[e.ID], datastore.KeyFor Obj(c, e)) | |
249 } | |
250 } | |
251 | |
252 // A set of items we pretend not to see. Initially all tombstoned ones. | |
253 // | |
254 // Since we are iterating over tombstone list anyway, find all sufficien tly | |
255 // old tombstones or tombstones that still have storage associated with them. | |
256 // We return them to the caller, so they can be cleaned up: | |
257 // * 'CleanupStorage' makes sure 'storage' entities are deleted. | |
258 // * 'BeginPop' completely erases old tombstones. | |
259 var tombs []*Tombstone | |
260 ignore := stringset.New(len(tombsEntity.Tombstones)) | |
261 for _, t := range tombsEntity.Tombstones { | |
262 ignore.Add(t.ID) | |
263 old := now.Sub(t.Tombstoned) > s.TombstonesDelay | |
264 if storage := idToKeys[t.ID]; len(storage) > 0 || old { | |
265 tombs = append(tombs, &Tombstone{ | |
266 id: t.ID, | |
267 storage: storage, | |
268 old: old, // if true, BeginPop will delete t his tombstone | |
269 }) | |
270 } | |
271 } | |
272 | |
273 // Join all shards, throwing away tombstoned and duplicated items. | |
274 var items []Item | |
275 for _, shard := range shards { | |
276 for _, e := range shard { | |
277 if !ignore.Has(e.ID) { | |
278 items = append(items, Item{ | |
279 ID: e.ID, | |
280 Value: e.Value, | |
281 }) | |
282 ignore.Add(e.ID) | |
283 } | |
284 } | |
285 } | |
286 | |
287 return &Listing{ | |
288 Items: items, | |
289 Tombstones: tombs, | |
290 set: s.ID, | |
291 producedAt: now, | |
292 idToKeys: idToKeys, | |
293 }, nil | |
294 } | |
295 | |
296 // PopOp is an in-progress 'Pop' operation. | |
297 // | |
298 // See BeginPop. | |
299 type PopOp struct { | |
300 ctx context.Context // datastore context to use for thi s op | |
301 txn datastore.Transaction // a transaction that started Begin Pop | |
302 now time.Time // popping time for all popped item s | |
303 dirty bool // true if the tombstone map was mo dified | |
304 finished bool // true if finished already | |
305 entity *tombstonesEntity // entity with tombstones | |
306 tombs map[string]tombstone // entity.Tombstones in a map form | |
307 idToKeys map[string][]*datastore.Key // ID -> datastore keys to cleanup | |
308 popped []*Tombstone // new tombstones for popped items | |
309 } | |
310 | |
311 // BeginPop initiates 'Pop' operation. | |
312 // | |
313 // Pop operation is used to transactionally remove items from the set, as well | |
314 // as cleanup old tombstones. It must be finished with 'dsset.FinishPop', even | |
315 // if no items have been popped: the internal state still can change in this | |
316 // case, since 'BeginPop' cleans up old tombstones. Even more, it is necessary | |
317 // to do 'Pop' if listing contains non-empty set of tombstones (regardless of | |
318 // whether the caller wants to actually pop any items from the set). This is | |
319 // part of the required set maintenance. | |
320 // | |
321 // Requires a transaction. Modifies TombstonesRoot entity group (and only it). | |
322 // | |
323 // Returns only transient errors. Such errors usually mean that the entire pop | |
324 // sequence ('List' + 'Pop') should be retried. | |
325 func (s *Set) BeginPop(c context.Context, listing *Listing) (*PopOp, error) { | |
326 if listing.set != s.ID { | |
327 panic("passed Listing from another set") | |
328 } | |
329 txn := datastore.CurrentTransaction(c) | |
330 if txn == nil { | |
331 panic("dsset.Set.BeginPop must be called inside a transaction") | |
332 } | |
333 | |
334 now := clock.Now(c).UTC() | |
335 if age := now.Sub(listing.producedAt); age > s.TombstonesDelay { | |
336 return nil, transient.Tag.Apply(fmt.Errorf("the listing is stale (%s > %s)", age, s.TombstonesDelay)) | |
337 } | |
338 | |
339 entity := &tombstonesEntity{ID: s.ID, Parent: s.TombstonesRoot} | |
340 if err := datastore.Get(c, entity); err != nil && err != datastore.ErrNo SuchEntity { | |
341 return nil, transient.Tag.Apply(err) | |
342 } | |
343 | |
344 // The data in tombstonesEntity, in map form. | |
345 tombs := make(map[string]tombstone, len(entity.Tombstones)) | |
346 for _, t := range entity.Tombstones { | |
347 tombs[t.ID] = t | |
348 } | |
349 | |
350 // Throw away old tombstones right away. | |
351 dirty := false | |
352 for _, tomb := range listing.Tombstones { | |
353 if tomb.old { | |
354 if !tomb.cleanedUp { | |
355 panic("trying to remove Tombstone that wasn't cl eaned up") | |
356 } | |
357 if _, hasTomb := tombs[tomb.id]; hasTomb { | |
358 delete(tombs, tomb.id) | |
359 dirty = true | |
360 } | |
361 } | |
362 } | |
363 | |
364 return &PopOp{ | |
365 ctx: c, | |
366 txn: txn, | |
367 now: now, | |
368 dirty: dirty, | |
369 entity: entity, | |
370 tombs: tombs, | |
371 idToKeys: listing.idToKeys, | |
372 }, nil | |
373 } | |
374 | |
375 // CanPop returns true if the given item can be popped from the set. | |
376 // | |
377 // Returns false if this item has been popped before (perhaps in another | |
378 // transaction), or it's not in the the listing passed to BeginPop. | |
379 func (p *PopOp) CanPop(id string) bool { | |
380 if _, hasTomb := p.tombs[id]; hasTomb { | |
381 return false // already popped by someone else | |
382 } | |
383 if _, present := p.idToKeys[id]; present { | |
384 return true // listed in the set | |
385 } | |
386 return false | |
387 } | |
388 | |
389 // Pop removed the item from the set and returns true if it was there. | |
390 // | |
391 // Returns false if this item has been popped before (perhaps in another | |
392 // transaction), or it's not in the the listing passed to BeginPop. | |
393 func (p *PopOp) Pop(id string) bool { | |
394 if p.finished { | |
395 panic("the operation has already been finished") | |
396 } | |
397 if !p.CanPop(id) { | |
398 return false | |
399 } | |
400 p.tombs[id] = tombstone{ID: id, Tombstoned: p.now} | |
401 p.popped = append(p.popped, &Tombstone{ | |
402 id: id, | |
403 storage: p.idToKeys[id], | |
404 old: false, // BeingPop will ignore this fresh tombstone | |
405 }) | |
406 p.dirty = true | |
407 return true | |
408 } | |
409 | |
410 // makeTombstonesEntity is used internally by FinishPop. | |
411 func (p *PopOp) makeTombstonesEntity() *tombstonesEntity { | |
412 p.entity.Tombstones = p.entity.Tombstones[:0] | |
413 for _, tomb := range p.tombs { | |
414 p.entity.Tombstones = append(p.entity.Tombstones, tomb) | |
415 } | |
416 return p.entity | |
417 } | |
418 | |
419 //////////////////////////////////////////////////////////////////////////////// | |
420 | |
421 // FinishPop completes one or more pop operations (for different sets) by | |
422 // submitting changes to datastore. | |
423 // | |
424 // Must be called within same transaction that called BeginPop. | |
tandrii(chromium)
2017/07/31 18:58:46
the same
| |
425 // | |
426 // It returns a list of tombstones for popped items. The storage used by the | |
427 // items can be reclaimed right away by calling 'CleanupStorage'. It is fine | |
428 // not to do so, 'List' will eventually return all tombstones that need cleaning | |
429 // anyway. Calling 'CleanupStorage' as best effort is still beneficial though, | |
430 // since it will reduce the amount of garbage in the set. | |
431 // | |
432 // Returns only transient errors. | |
433 func FinishPop(ctx context.Context, ops ...*PopOp) (tombs []*Tombstone, err erro r) { | |
434 txn := datastore.CurrentTransaction(ctx) | |
435 | |
436 entities := []*tombstonesEntity{} | |
437 tombsCount := 0 | |
438 for _, op := range ops { | |
439 if op.finished { | |
440 panic("the operation has already been finished") | |
441 } | |
442 if op.txn != txn { | |
443 panic("wrong transaction") | |
444 } | |
445 if op.dirty { | |
446 entities = append(entities, op.makeTombstonesEntity()) | |
447 tombsCount += len(op.popped) | |
448 } | |
449 } | |
450 | |
451 if err := datastore.Put(ctx, entities); err != nil { | |
452 return nil, transient.Tag.Apply(err) | |
453 } | |
454 | |
455 if tombsCount != 0 { | |
456 tombs = make([]*Tombstone, 0, tombsCount) | |
457 } | |
458 for _, op := range ops { | |
459 tombs = append(tombs, op.popped...) | |
460 op.finished = true | |
461 } | |
462 | |
463 return tombs, nil | |
464 } | |
465 | |
466 // CleanupStorage deletes entities used to store items under given tombstones. | |
467 // | |
468 // This is datastore's MultiDelete RPC in disguise. Touches many entity groups. | |
469 // Must be called outside of transactions. Idempotent. | |
470 // | |
471 // Can handle tombstones from multiple different sets at once. This is preferred | |
472 // over calling 'CleanupStorage' multiple times (once per set), since it | |
473 // collapses multiple datastore RPCs into one. | |
474 // | |
475 // This MUST be called before tombstones returned by 'List' are removed in | |
476 // 'Pop'. Failure to do so will make items reappear in the set. | |
477 // | |
478 // Returns only transient errors. There's no way to know which items were | |
479 // removed and which weren't in case of an error. | |
480 func CleanupStorage(c context.Context, cleanup ...[]*Tombstone) error { | |
481 if datastore.CurrentTransaction(c) != nil { | |
482 panic("dsset.CleanupStorage must be called outside of a transact ion") | |
483 } | |
484 | |
485 keys := []*datastore.Key{} | |
486 for _, tombs := range cleanup { | |
487 for _, tomb := range tombs { | |
488 keys = append(keys, tomb.storage...) | |
489 } | |
490 } | |
491 | |
492 err := batchOp(len(keys), func(start, end int) error { | |
493 return datastore.Delete(c, keys[start:end]) | |
494 }) | |
495 if err != nil { | |
496 return transient.Tag.Apply(err) | |
497 } | |
498 | |
499 for _, tombs := range cleanup { | |
500 for _, tomb := range tombs { | |
501 tomb.cleanedUp = true | |
502 tomb.storage = nil | |
503 } | |
504 } | |
505 return nil | |
506 } | |
507 | |
508 //////////////////////////////////////////////////////////////////////////////// | |
509 | |
510 type itemEntity struct { | |
511 _kind string `gae:"$kind,dsset.Item"` | |
512 | |
513 ID string `gae:"$id"` | |
514 Parent *datastore.Key `gae:"$parent"` | |
515 Value []byte `gae:",noindex"` | |
516 } | |
517 | |
518 type tombstonesEntity struct { | |
519 _kind string `gae:"$kind,dsset.Tombstones"` | |
520 | |
521 ID string `gae:"$id"` | |
522 Parent *datastore.Key `gae:"$parent"` | |
523 Tombstones []tombstone `gae:",noindex"` | |
524 } | |
525 | |
// tombstone is one record in tombstonesEntity.Tombstones.
type tombstone struct {
	// ID is the ID of the tombstoned item.
	ID string
	// Tombstoned is when the item was popped.
	Tombstoned time.Time
}
530 | |
531 // shardRoot returns entity group key to use for a given shard. | |
532 func (s *Set) shardRoot(c context.Context, n int) *datastore.Key { | |
533 return datastore.NewKey(c, "dsset.Shard", fmt.Sprintf("%s:%d", s.ID, n), 0, nil) | |
534 } | |
535 | |
536 // batchOp splits 'total' into batches and calls 'op' in parallel. | |
537 // | |
538 // Doesn't preserve order of returned errors! Don't try to deconstruct the | |
539 // returned multi error, the position of individual errors there does not | |
540 // correlate with the original array. | |
541 func batchOp(total int, op func(start, end int) error) error { | |
542 switch { | |
543 case total == 0: | |
544 return nil | |
545 case total <= batchSize: | |
546 return op(0, total) | |
547 } | |
548 | |
549 errs := make(chan error) | |
550 ops := 0 | |
551 offset := 0 | |
552 for total > 0 { | |
553 count := batchSize | |
554 if count > total { | |
555 count = total | |
556 } | |
557 go func(start, end int) { | |
558 errs <- op(start, end) | |
559 }(offset, offset+count) | |
560 offset += count | |
561 total -= count | |
562 ops++ | |
563 } | |
564 | |
565 var all errors.MultiError | |
566 for i := 0; i < ops; i++ { | |
567 err := <-errs | |
568 if merr, yep := err.(errors.MultiError); yep { | |
569 for _, e := range merr { | |
570 if e != nil { | |
571 all = append(all, e) | |
572 } | |
573 } | |
574 } else if err != nil { | |
575 all = append(all, err) | |
576 } | |
577 } | |
578 | |
579 if len(all) == 0 { | |
580 return nil | |
581 } | |
582 return all | |
583 } | |
OLD | NEW |