Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(216)

Side by Side Diff: scheduler/appengine/engine/dsset/dsset.go

Issue 2981143002: Add 'dsset' structure. (Closed)
Patch Set: Created 3 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2017 The LUCI Authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 // Package dsset implements a particular flavor of datastore-backed set.
16 //
17 // Due to its internal structure, it requires some maintenance on behalf of the
18 // caller to periodically cleanup removed items (aka tombstones).
19 //
20 // Items added to the set should have IDs unique in time (or at least unique for
tandrii(chromium) 2017/07/18 15:55:14 1. I **feel**: "the" set, and "the" duration. But
Vadim Sh. 2017/07/24 00:30:16 Done.
21 // the duration of some configurable time interval, as defined by 'TombstonesDelay'
22 // property). It means removed items can be added back to the set right away.
23 //
24 // Properties:
25 // * Batch 'Add' with configurable QPS limit.
26 // * Transactional consistent 'Pop' (1 QPS limit).
27 // * Non-transactional consistent 'List' (1 QPS limit).
28 // * Popped items can't be re-added until their tombstones expire.
29 //
30 // These properties make dsset suitable for multi-producer, single-consumer
tandrii(chromium) 2017/07/18 15:55:15 nit: no need for slash in MPSC
Vadim Sh. 2017/07/24 00:30:16 Done.
31 // queues, where order of items is not important and each item has an identifier
32 // unique in time.
tandrii(chromium) 2017/07/18 15:55:14 ditto for "in time". How about "unique in each tim
Vadim Sh. 2017/07/24 00:30:17 Done.
33 //
34 // Structurally dsset consists of N+1 entity groups:
35 // * N separate entity groups that contain N shards of the set.
36 // * 1 entity group (with a configurable root) that holds tombstones.
37 //
38 // It is safe to increase the number of shards at any time. Decreasing the number
39 // of shards is dangerous (but can be done with some more coding).
40 //
41 // Effect of having more shards:
42 // * Add() becomes less contentious (so it can support more QPS).
43 // * List() and CleanupStorage() become slower and more expensive.
44 // * Pop() is not affected by the number of shards.
45 package dsset
46
47 import (
48 "fmt"
49 "sync"
50 "time"
51
52 "golang.org/x/net/context"
53
54 "github.com/luci/gae/service/datastore"
55 "github.com/luci/luci-go/common/clock"
56 "github.com/luci/luci-go/common/data/rand/mathrand"
57 "github.com/luci/luci-go/common/data/stringset"
58 "github.com/luci/luci-go/common/errors"
59 "github.com/luci/luci-go/common/logging"
60 "github.com/luci/luci-go/common/retry/transient"
61 )
62
63 // Set holds a set of Items and uses Tombstones to achieve idempotency of Add.
64 //
65 // Produces just call Add(...).
tandrii(chromium) 2017/07/18 15:55:14 Producers
Vadim Sh. 2017/07/24 00:30:16 Done.
66 //
67 // A consumer need to run more elaborate algorithm that ensures atomicity of
tandrii(chromium) 2017/07/18 15:55:15 consider: s/A consumer need/The consumer (there sh
Vadim Sh. 2017/07/24 00:30:16 Done.
68 // 'Pop' and takes care of cleanup up of the garbage. This requires a mix of
tandrii(chromium) 2017/07/18 15:55:15 cleanup up -> cleanup OR cleaning up
Vadim Sh. 2017/07/24 00:30:17 Done.
69 // transactional and non-transactional actions:
70 //
71 // items, tombstones, err := set.List(ctx)
72 // if err != nil {
73 // return err
74 // }
75 //
76 // if len(items) == 0 && len(tombstones) == 0 {
77 // return nil
78 // }
79 //
80 // if err := set.CleanupStorage(tombstones); err != nil {
81 // return err
82 // }
83 //
84 // ... Fetch any additional info associated with 'items' ...
85 //
86 // return datastore.RunInTransaction(ctx, func(ctx context.Context) error {
87 // op, err := set.BeginPop(ctx)
88 // if err != nil {
89 // return err
90 // }
91 // op.CleanupTombstones(tombstones)
92 // for _, itm := range items {
93 // if op.Pop(item.ID) {
94 // // The item was indeed in the set and we've just removed it!
95 // } else {
96 // // Some other transaction has popped it already.
97 // }
98 // }
99 // return op.Submit()
100 // }, nil)
101 type Set struct {
102 ID string // global ID, used to construct datastore keys
103 ShardCount int // number of entity groups to use for sto rage
104 TombstonesRoot *datastore.Key // tombstones entity parent key
105 TombstonesDelay time.Duration // how long to keep tombstones in the set
106 }
107
// Item is what's stored in the set.
type Item struct {
	ID    string // identifier of the item, unique in time
	Value []byte // arbitrary value (<1 MB, but preferably much smaller)
}
113
114 // Tombstone is a reference to a deleted item that still lingers in the set.
115 //
116 // Tombstones exist to make sure recently popped items do not reappear in the
117 // set if producers attempt to re-add them.
118 //
119 // Its fields are intentionally private to force correct usage CleanupStorage
tandrii(chromium) 2017/07/18 15:55:14 usage of
Vadim Sh. 2017/07/24 00:30:17 Done.
120 // and CleanupTombstones.
121 type Tombstone struct {
122 set string // parent set name
tandrii(chromium) 2017/07/18 15:55:15 parent set ID, right?
Vadim Sh. 2017/07/24 00:30:17 Yes. 'ID' used to be 'Name', I changed it midway a
123 id string // deleted item ID
124 storage []*datastore.Key // itemEntity's to delete
125 }
126
127 // Add adds a bunch of items to the set.
tandrii(chromium) 2017/07/18 15:55:15 s/set./set. idempotently.
Vadim Sh. 2017/07/24 00:30:16 Done.
128 //
129 // Writes to some single entity group (not known in advance). If called outside
130 // of a transaction and the call fails, may add only some subset of items.
131 // Running inside a transaction makes this operation atomic.
132 //
133 // If items with given keys are already in the set, or has been deleted
tandrii(chromium) 2017/07/18 15:55:14 s/has/have
Vadim Sh. 2017/07/24 00:30:16 Done.
134 // recently, they won't be re-added. No error is returned in this case.
135 //
136 // Returns only transient errors.
137 func (s *Set) Add(c context.Context, items []Item) error {
138 // TODO: batch + parallel
tandrii(chromium) 2017/07/18 15:55:15 what's batch for? If # of items added is too large
Vadim Sh. 2017/07/24 00:30:17 Yes. There's a limit on number of entities in sing
tandrii(chromium) 2017/07/31 18:58:46 Acknowledged.
139
140 // Pick a random shard and add all new items there. If this is a retry, they
141 // may exist in some other shard already. We don't care, they'll be
142 // deduplicated in 'List'. If added items has been popped already (they have
tandrii(chromium) 2017/07/18 15:55:14 s/has/have
Vadim Sh. 2017/07/24 00:30:15 Done.
143 // tombstones), 'List' will omit them as well.
144 shardRoot := s.shardRoot(c, mathrand.Intn(c, s.ShardCount))
145 entities := make([]itemEntity, len(items))
146 for i, itm := range items {
147 entities[i] = itemEntity{
148 ID: itm.ID,
149 Parent: shardRoot,
150 Value: itm.Value,
151 }
152 }
153 return transient.Tag.Apply(datastore.Put(c, entities))
154 }
155
156 // List returns all items that are currently in the set (in arbitrary order),
157 // as well as set of tombstones that points to items that were deleted
tandrii(chromium) 2017/07/18 15:55:15 as well as A set
Vadim Sh. 2017/07/24 00:30:16 Done.
158 // sufficiently long ago and can be cleaned up now.
159 //
160 // Must be called outside of transactions (panics otherwise). Reads many entity
161 // groups, including TombstonesRoot one.
162 //
163 // The set of tombstones to cleanup can be passed to CleanupStorage, and
164 // later to CleanupTombstones (during pop operation), in that order. Not doing
165 // so will lead to accumulation of garbage in the set that will slow down List
166 // and Pop.
167 //
168 // Returns only transient errors.
169 func (s *Set) List(c context.Context) (items []Item, cleanup []Tombstone, err er ror) {
170 if datastore.CurrentTransaction(c) != nil {
171 panic("dsset.Set.List must be called outside of a transaction")
172 }
173
174 // Fetch all shards (via consistent ancestor queries) and all tombstones .
175
176 shards := make([][]*itemEntity, s.ShardCount)
177 tombsEntity := tombstonesEntity{ID: s.ID, Parent: s.TombstonesRoot}
178
179 wg := sync.WaitGroup{}
180 errs := errors.NewLazyMultiError(s.ShardCount + 1)
181
182 wg.Add(1)
183 go func() {
184 defer wg.Done()
185 if err := datastore.Get(c, &tombsEntity); err != nil && err != d atastore.ErrNoSuchEntity {
186 errs.Assign(0, err)
187 }
188 }()
189
190 wg.Add(s.ShardCount)
191 for i := 0; i < s.ShardCount; i++ {
192 go func(i int) {
193 defer wg.Done()
194 q := datastore.NewQuery("dsset.Item").Ancestor(s.shardRo ot(c, i))
195 errs.Assign(i+1, datastore.GetAll(c, q, &shards[i]))
196 }(i)
197 }
198
199 wg.Wait()
200 if err := errs.Get(); err != nil {
201 return nil, nil, transient.Tag.Apply(err)
202 }
203
204 logging.Infof(c, "%s: %d tombs", s.ID, len(tombsEntity.Tombstones))
205 for i, shard := range shards {
206 logging.Infof(c, "%s: shard #%d - %d items", s.ID, i, len(shard) )
207 }
208
209 // A set of items we pretend not to see. Initially all tombstoned ones.
210 //
211 // Since we are iterating over tombstone list anyway, find all sufficien tly
212 // old tombstones to return them to the caller, so they can be cleaned u p.
213 ignore := stringset.New(len(tombsEntity.Tombstones))
214 oldestTombs := map[string]*Tombstone{} // key is item ID
215 now := clock.Now(c).UTC()
216 for _, t := range tombsEntity.Tombstones {
217 ignore.Add(t.ID)
218 if now.Sub(t.Tombstoned) > s.TombstonesDelay {
219 oldestTombs[t.ID] = &Tombstone{set: s.ID, id: t.ID}
220 }
221 }
222
223 // Join all shards, throwing away tombstoned and duplicated items. For o ldest
224 // tombstones collect references to entities that need to be deleted. Th ere
225 // may be more than one such entity per ID due to possibility of differe nt
226 // shards hosting same item (if Add call was retried)!
tandrii(chromium) 2017/07/18 17:54:18 the same item
Vadim Sh. 2017/07/24 00:30:18 Done.
227 for _, shard := range shards {
228 for _, itemEntity := range shard {
229 if !ignore.Has(itemEntity.ID) {
230 items = append(items, Item{
231 ID: itemEntity.ID,
232 Value: itemEntity.Value,
233 })
234 ignore.Add(itemEntity.ID)
235 }
236 if tomb := oldestTombs[itemEntity.ID]; tomb != nil {
237 tomb.storage = append(tomb.storage, datastore.Ke yForObj(c, itemEntity))
238 }
239 }
240 }
241
242 if len(oldestTombs) != 0 {
243 cleanup = make([]Tombstone, 0, len(oldestTombs))
244 for _, tomb := range oldestTombs {
245 cleanup = append(cleanup, *tomb)
246 }
247 }
248
249 return items, cleanup, nil
250 }
251
252 // CleanupStorage deletes entities used to store items under given tombstones.
tandrii(chromium) 2017/07/18 17:54:18 nit: I'd call it GC :)
Vadim Sh. 2017/07/24 00:30:17 There are two "kinds" of GC: one removes itemEntit
253 //
254 // Touches many entity groups. Must be called outside of transactions.
255 // Idempotent.
256 //
257 // This MUST be called before tombstones are popped. Failure to do so will make
tandrii(chromium) 2017/07/18 17:54:18 is this worth enforcing? If so, then we could reco
Vadim Sh. 2017/07/24 00:30:16 Done, sort of.
258 // items reappear in the set.
259 //
260 // Returns only transient errors.
261 func (s *Set) CleanupStorage(c context.Context, cleanup []Tombstone) error {
262 if datastore.CurrentTransaction(c) != nil {
263 panic("dsset.Set.CleanupStorage must be called outside of a tran saction")
264 }
265
266 keys := []*datastore.Key{}
267 for _, old := range cleanup {
268 if old.set != s.ID {
269 panic("passed Tombstone from some other set")
270 }
271 keys = append(keys, old.storage...)
272 }
273
274 // TODO: parallel
275
276 merr := errors.MultiError{}
277 for len(keys) > 0 {
278 count := 100
279 if count > len(keys) {
280 count = len(keys)
281 }
282 logging.Infof(c, "Cleaning up storage for %q by removing %d keys ", s.ID, count)
283 if err := datastore.Delete(c, keys[:count]); err != nil {
284 logging.Warningf(c, "RPC failed - %s", err)
285 merr = append(merr, err)
286 }
287 keys = keys[count:]
288 }
289
290 if len(merr) == 0 {
291 return nil
292 }
293
294 return transient.Tag.Apply(merr)
295 }
296
297 // PopOp is an in-progress Pop operation.
298 //
299 // See BeginPop.
300 type PopOp struct {
301 ctx context.Context // datastore context to use for this op
302 set string // id of the set
303 now time.Time // popping time for all popped items
304 entity *tombstonesEntity // entity with tombstones
305 tombs map[string]tombstone // tombstones in a map form
306 dirty bool // true if the tombstone map was modified
307 }
308
309 // BeginPop initiates Pop operation.
310 //
311 // Pop operation is used to transactionally remove items from the set, as well
312 // as cleanup old tombstones. It must be finished with Submit.
313 //
314 // Requires a transaction. Modifies TombstonesRoot entity group (and only it).
315 //
316 // Returns only transient errors.
317 func (s *Set) BeginPop(c context.Context) (*PopOp, error) {
318 if datastore.CurrentTransaction(c) == nil {
319 panic("dsset.Set.BeginPop must be called inside a transaction")
320 }
321
322 entity := &tombstonesEntity{ID: s.ID, Parent: s.TombstonesRoot}
323 if err := datastore.Get(c, entity); err != nil && err != datastore.ErrNo SuchEntity {
324 return nil, transient.Tag.Apply(err)
325 }
326
327 // The data in tombstonesEntity, in map form.
328 tombs := make(map[string]tombstone, len(entity.Tombstones))
329 for _, t := range entity.Tombstones {
330 tombs[t.ID] = t
331 }
332
333 return &PopOp{
334 ctx: c,
335 set: s.ID,
336 now: clock.Now(c).UTC(),
337 entity: entity,
338 tombs: tombs,
339 dirty: false,
340 }, nil
341 }
342
343 // AlreadyPopped returns true if the given item has already been popped, perhaps
344 // in another transaction.
345 //
346 // Returns false for items that are still in the set, or items that have never
347 // been in the set to being with. Use List (prior to starting the transaction)
tandrii(chromium) 2017/07/18 17:54:18 to beGIN with
Vadim Sh. 2017/07/24 00:30:17 Done.
348 // to distinguish this two cases.
tandrii(chromium) 2017/07/18 17:54:18 these
Vadim Sh. 2017/07/24 00:30:17 Done.
349 func (p *PopOp) AlreadyPopped(id string) bool {
350 _, hasTomb := p.tombs[id]
351 return hasTomb
352 }
353
354 // Pop pop the item from the set and returns true if it was indeed
tandrii(chromium) 2017/07/18 17:54:18 Pop pops
Vadim Sh. 2017/07/24 00:30:17 Done.
355 // popped.
356 //
357 // Returns false if this item has been popped before (perhaps in another
358 // transaction).
359 //
360 // Returns true if item existed in the set and was popped, or it never existed
361 // there at all. Popping items that have never been in the set leads to weird
362 // behavior, don't do this. Use List (prior to starting the transaction)
363 // to figure out what items can be potentially popped.
364 func (p *PopOp) Pop(id string) bool {
365 if _, hasTomb := p.tombs[id]; hasTomb {
366 return false
367 }
368 p.tombs[id] = tombstone{ID: id, Tombstoned: p.now}
tandrii(chromium) 2017/07/18 17:54:19 Suppose List is called > TombstonesDelay seconds b
Vadim Sh. 2017/07/24 00:30:16 It is single consumer mostly because 1 QPS limit o
369 p.dirty = true
370 return true
371 }
372
373 // CleanupTombstones removes old tombstones.
374 //
375 // This must be called only after set's CleanupStorage method, otherwise removed
376 // items may reappear again.
377 func (p *PopOp) CleanupTombstones(tombstones []Tombstone) {
378 for _, old := range tombstones {
379 if old.set != p.set {
380 panic("passed Tombstone from some other set")
381 }
382 if _, hasTomb := p.tombs[old.id]; hasTomb {
383 delete(p.tombs, old.id)
384 p.dirty = true
385 }
386 }
387 }
388
389 // Submit completes this pop operation but submitting changes to datastore.
tandrii(chromium) 2017/07/18 15:55:14 s/but/by
Vadim Sh. 2017/07/24 00:30:15 Done.
390 //
391 // The operation is unusable after this.
tandrii(chromium) 2017/07/18 17:54:19 wdyt about adding a field "submitted"=false by def
Vadim Sh. 2017/07/24 00:30:17 I did it initially, but the removed to simplify co
392 func (p *PopOp) Submit() error {
393 if p.dirty {
394 p.entity.Tombstones = p.entity.Tombstones[:0]
395 for _, tomb := range p.tombs {
396 p.entity.Tombstones = append(p.entity.Tombstones, tomb)
397 }
398 if err := datastore.Put(p.ctx, p.entity); err != nil {
399 return transient.Tag.Apply(err)
400 }
401 }
402 return nil
403 }
404
405 ////////////////////////////////////////////////////////////////////////////////
406
407 type itemEntity struct {
408 _kind string `gae:"$kind,dsset.Item"`
409
410 ID string `gae:"$id"`
411 Parent *datastore.Key `gae:"$parent"`
412 Value []byte `gae:",noindex"`
413 }
414
415 type tombstonesEntity struct {
416 _kind string `gae:"$kind,dsset.Tombstones"`
417
418 ID string `gae:"$id"`
419 Parent *datastore.Key `gae:"$parent"`
420 Tombstones []tombstone `gae:",noindex"`
421 }
422
// tombstone is a record of a popped item, as stored inside tombstonesEntity.
type tombstone struct {
	ID         string    // ID of tombstoned item
	Tombstoned time.Time // when it was popped
}
427
428 // shardRoot returns entity group key to use for a given shard.
429 func (s *Set) shardRoot(c context.Context, n int) *datastore.Key {
430 return datastore.NewKey(c, "dsset.Shard", fmt.Sprintf("%s:%d", s.ID, n), 0, nil)
431 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698