OLD | NEW |
---|---|
(Empty) | |
1 // Copyright 2017 The LUCI Authors. | |
2 // | |
3 // Licensed under the Apache License, Version 2.0 (the "License"); | |
4 // you may not use this file except in compliance with the License. | |
5 // You may obtain a copy of the License at | |
6 // | |
7 // http://www.apache.org/licenses/LICENSE-2.0 | |
8 // | |
9 // Unless required by applicable law or agreed to in writing, software | |
10 // distributed under the License is distributed on an "AS IS" BASIS, | |
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
12 // See the License for the specific language governing permissions and | |
13 // limitations under the License. | |
14 | |
15 // Package dsset implements a particular flavor of datastore-backed set. | |
16 // | |
17 // Due to its internal structure, it requires some maintenance on behalf of the | |
18 // caller to periodically cleanup removed items (aka tombstones). | |
19 // | |
20 // Items added to set should have IDs unique in time (or at least unique for | |
tandrii(chromium)
2017/07/18 15:55:14
1. I **feel**: "the" set, and "the" duration. But
Vadim Sh.
2017/07/24 00:30:16
Done.
| |
21 // duration of some configurable time interval, as defined by 'TombstonesDelay' | |
22 // property). It means removed items can be added back to the set right away. | |
23 // | |
24 // Properties: | |
25 // * Batch 'Add' with configurable QPS limit. | |
26 // * Transactional consistent 'Pop' (1 QPS limit). | |
27 // * Non-transactional consistent 'List' (1 QPS limit). | |
28 // * Popped items can't be re-added until their tombstones expire. | |
29 // | |
30 // These properties make dsset suitable for multiple producers/single consumer | |
tandrii(chromium)
2017/07/18 15:55:15
nit: no need for slash in MPSC
Vadim Sh.
2017/07/24 00:30:16
Done.
| |
31 // queues, where order of items is not important and each item has an identifier | |
32 // unique in time. | |
tandrii(chromium)
2017/07/18 15:55:14
ditto for "in time". How about "unique in each tim
Vadim Sh.
2017/07/24 00:30:17
Done.
| |
33 // | |
34 // Structurally dsset consists of N+1 entity groups: | |
35 // * N separate entity groups that contain N shards of the set. | |
36 // * 1 entity group (with a configurable root) that holds tombstones. | |
37 // | |
38 // It is safe to increase number of shards at any time. Decreasing number of | |
39 // shards is dangerous (but can be done with some more coding). | |
40 // | |
41 // More shards make: | |
42 // * Add() less contentious (so it can support more QPS). | |
43 // * List() and CleanupStorage() slower and more expensive. | |
44 // * Pop() is not affected by number of shards. | |
45 package dsset | |
46 | |
47 import ( | |
48 "fmt" | |
49 "sync" | |
50 "time" | |
51 | |
52 "golang.org/x/net/context" | |
53 | |
54 "github.com/luci/gae/service/datastore" | |
55 "github.com/luci/luci-go/common/clock" | |
56 "github.com/luci/luci-go/common/data/rand/mathrand" | |
57 "github.com/luci/luci-go/common/data/stringset" | |
58 "github.com/luci/luci-go/common/errors" | |
59 "github.com/luci/luci-go/common/logging" | |
60 "github.com/luci/luci-go/common/retry/transient" | |
61 ) | |
62 | |
63 // Set holds a set of Items and uses Tombstones to achieve idempotency of Add. | |
64 // | |
65 // Produces just call Add(...). | |
tandrii(chromium)
2017/07/18 15:55:14
Producers
Vadim Sh.
2017/07/24 00:30:16
Done.
| |
66 // | |
67 // A consumer need to run more elaborate algorithm that ensures atomicity of | |
tandrii(chromium)
2017/07/18 15:55:15
consider: s/A consumer need/The consumer (there sh
Vadim Sh.
2017/07/24 00:30:16
Done.
| |
68 // 'Pop' and takes care of cleanup up of the garbage. This requires a mix of | |
tandrii(chromium)
2017/07/18 15:55:15
cleanup up -> cleanup OR cleaning up
Vadim Sh.
2017/07/24 00:30:17
Done.
| |
69 // transactional and non-transactional actions: | |
70 // | |
71 // items, tombstones, err := set.List(ctx) | |
72 // if err != nil { | |
73 // return err | |
74 // } | |
75 // | |
76 // if len(items) == 0 && len(tombstones) == 0 { | |
77 // return nil | |
78 // } | |
79 // | |
80 // if err := set.CleanupStorage(tombstones); err != nil { | |
81 // return err | |
82 // } | |
83 // | |
84 // ... Fetch any additional info associated with 'items' ... | |
85 // | |
86 // return datastore.RunInTransaction(ctx, func(ctx context.Context) error { | |
87 // op, err := set.BeginPop(ctx) | |
88 // if err != nil { | |
89 // return err | |
90 // } | |
91 // op.CleanupTombstones(tombstones) | |
92 // for _, itm := range items { | |
93 // if op.Pop(item.ID) { | |
94 // // The item was indeed in the set and we've just removed it! | |
95 // } else { | |
96 // // Some other transaction has popped it already. | |
97 // } | |
98 // } | |
99 // return op.Submit() | |
100 // }, nil) | |
101 type Set struct { | |
102 ID string // global ID, used to construct datastore keys | |
103 ShardCount int // number of entity groups to use for sto rage | |
104 TombstonesRoot *datastore.Key // tombstones entity parent key | |
105 TombstonesDelay time.Duration // how long to keep tombstones in the set | |
106 } | |
107 | |
108 // Item is what's stored in the set. | |
109 type Item struct { | |
110 ID string // unique in time identifier of the item | |
111 Value []byte // arbitrary value (<1 MB, but preferably much smaller) | |
112 } | |
113 | |
114 // Tombstone is a reference to a deleted item that still lingers in the set. | |
115 // | |
116 // Tombstones exist to make sure recently popped items do not reappear in the | |
117 // set if producers attempt to re-add them. | |
118 // | |
119 // Its fields are intentionally private to force correct usage CleanupStorage | |
tandrii(chromium)
2017/07/18 15:55:14
usage of
Vadim Sh.
2017/07/24 00:30:17
Done.
| |
120 // and CleanupTombstones. | |
121 type Tombstone struct { | |
122 set string // parent set name | |
tandrii(chromium)
2017/07/18 15:55:15
parent set ID, right?
Vadim Sh.
2017/07/24 00:30:17
Yes. 'ID' used to be 'Name', I changed it midway a
| |
123 id string // deleted item ID | |
124 storage []*datastore.Key // itemEntity's to delete | |
125 } | |
126 | |
127 // Add adds a bunch of items to the set. | |
tandrii(chromium)
2017/07/18 15:55:15
s/set./set. idempotently.
Vadim Sh.
2017/07/24 00:30:16
Done.
| |
128 // | |
129 // Writes to some single entity group (not known in advance). If called outside | |
130 // of a transaction and the call fails, may add only some subset of items. | |
131 // Running inside a transaction makes this operation atomic. | |
132 // | |
133 // If items with given keys are already in the set, or has been deleted | |
tandrii(chromium)
2017/07/18 15:55:14
s/has/have
Vadim Sh.
2017/07/24 00:30:16
Done.
| |
134 // recently, they won't be re-added. No error is returned in this case. | |
135 // | |
136 // Returns only transient errors. | |
137 func (s *Set) Add(c context.Context, items []Item) error { | |
138 // TODO: batch + parallel | |
tandrii(chromium)
2017/07/18 15:55:15
what's batch for? If # of items added is too large
Vadim Sh.
2017/07/24 00:30:17
Yes. There's a limit on number of entities in sing
tandrii(chromium)
2017/07/31 18:58:46
Acknowledged.
| |
139 | |
140 // Pick a random shard and add all new items there. If this is a retry, they | |
141 // may exist in some other shard already. We don't care, they'll be | |
142 // deduplicated in 'List'. If added items has been popped already (they have | |
tandrii(chromium)
2017/07/18 15:55:14
s/has/have
Vadim Sh.
2017/07/24 00:30:15
Done.
| |
143 // tombstones), 'List' will omit them as well. | |
144 shardRoot := s.shardRoot(c, mathrand.Intn(c, s.ShardCount)) | |
145 entities := make([]itemEntity, len(items)) | |
146 for i, itm := range items { | |
147 entities[i] = itemEntity{ | |
148 ID: itm.ID, | |
149 Parent: shardRoot, | |
150 Value: itm.Value, | |
151 } | |
152 } | |
153 return transient.Tag.Apply(datastore.Put(c, entities)) | |
154 } | |
155 | |
156 // List returns all items that are currently in the set (in arbitrary order), | |
157 // as well as set of tombstones that points to items that were deleted | |
tandrii(chromium)
2017/07/18 15:55:15
as well as A set
Vadim Sh.
2017/07/24 00:30:16
Done.
| |
158 // sufficiently long ago and can be cleaned up now. | |
159 // | |
160 // Must be called outside of transactions (panics otherwise). Reads many entity | |
161 // groups, including TombstonesRoot one. | |
162 // | |
163 // The set of tombstones to cleanup can be passed to CleanupStorage, and | |
164 // later to CleanupTombstones (during pop operation), in that order. Not doing | |
165 // so will lead to accumulation of garbage in the set that will slow down List | |
166 // and Pop. | |
167 // | |
168 // Returns only transient errors. | |
169 func (s *Set) List(c context.Context) (items []Item, cleanup []Tombstone, err er ror) { | |
170 if datastore.CurrentTransaction(c) != nil { | |
171 panic("dsset.Set.List must be called outside of a transaction") | |
172 } | |
173 | |
174 // Fetch all shards (via consistent ancestor queries) and all tombstones . | |
175 | |
176 shards := make([][]*itemEntity, s.ShardCount) | |
177 tombsEntity := tombstonesEntity{ID: s.ID, Parent: s.TombstonesRoot} | |
178 | |
179 wg := sync.WaitGroup{} | |
180 errs := errors.NewLazyMultiError(s.ShardCount + 1) | |
181 | |
182 wg.Add(1) | |
183 go func() { | |
184 defer wg.Done() | |
185 if err := datastore.Get(c, &tombsEntity); err != nil && err != d atastore.ErrNoSuchEntity { | |
186 errs.Assign(0, err) | |
187 } | |
188 }() | |
189 | |
190 wg.Add(s.ShardCount) | |
191 for i := 0; i < s.ShardCount; i++ { | |
192 go func(i int) { | |
193 defer wg.Done() | |
194 q := datastore.NewQuery("dsset.Item").Ancestor(s.shardRo ot(c, i)) | |
195 errs.Assign(i+1, datastore.GetAll(c, q, &shards[i])) | |
196 }(i) | |
197 } | |
198 | |
199 wg.Wait() | |
200 if err := errs.Get(); err != nil { | |
201 return nil, nil, transient.Tag.Apply(err) | |
202 } | |
203 | |
204 logging.Infof(c, "%s: %d tombs", s.ID, len(tombsEntity.Tombstones)) | |
205 for i, shard := range shards { | |
206 logging.Infof(c, "%s: shard #%d - %d items", s.ID, i, len(shard) ) | |
207 } | |
208 | |
209 // A set of items we pretend not to see. Initially all tombstoned ones. | |
210 // | |
211 // Since we are iterating over tombstone list anyway, find all sufficien tly | |
212 // old tombstones to return them to the caller, so they can be cleaned u p. | |
213 ignore := stringset.New(len(tombsEntity.Tombstones)) | |
214 oldestTombs := map[string]*Tombstone{} // key is item ID | |
215 now := clock.Now(c).UTC() | |
216 for _, t := range tombsEntity.Tombstones { | |
217 ignore.Add(t.ID) | |
218 if now.Sub(t.Tombstoned) > s.TombstonesDelay { | |
219 oldestTombs[t.ID] = &Tombstone{set: s.ID, id: t.ID} | |
220 } | |
221 } | |
222 | |
223 // Join all shards, throwing away tombstoned and duplicated items. For o ldest | |
224 // tombstones collect references to entities that need to be deleted. Th ere | |
225 // may be more than one such entity per ID due to possibility of differe nt | |
226 // shards hosting same item (if Add call was retried)! | |
tandrii(chromium)
2017/07/18 17:54:18
the same item
Vadim Sh.
2017/07/24 00:30:18
Done.
| |
227 for _, shard := range shards { | |
228 for _, itemEntity := range shard { | |
229 if !ignore.Has(itemEntity.ID) { | |
230 items = append(items, Item{ | |
231 ID: itemEntity.ID, | |
232 Value: itemEntity.Value, | |
233 }) | |
234 ignore.Add(itemEntity.ID) | |
235 } | |
236 if tomb := oldestTombs[itemEntity.ID]; tomb != nil { | |
237 tomb.storage = append(tomb.storage, datastore.Ke yForObj(c, itemEntity)) | |
238 } | |
239 } | |
240 } | |
241 | |
242 if len(oldestTombs) != 0 { | |
243 cleanup = make([]Tombstone, 0, len(oldestTombs)) | |
244 for _, tomb := range oldestTombs { | |
245 cleanup = append(cleanup, *tomb) | |
246 } | |
247 } | |
248 | |
249 return items, cleanup, nil | |
250 } | |
251 | |
252 // CleanupStorage deletes entities used to store items under given tombstones. | |
tandrii(chromium)
2017/07/18 17:54:18
nit: I'd call it GC :)
Vadim Sh.
2017/07/24 00:30:17
There are two "kinds" of GC: one removes itemEntit
| |
253 // | |
254 // Touches many entity groups. Must be called outside of transactions. | |
255 // Idempotent. | |
256 // | |
257 // This MUST be called before tombstones are popped. Failure to do so will make | |
tandrii(chromium)
2017/07/18 17:54:18
is this worth enforcing? If so, then we could reco
Vadim Sh.
2017/07/24 00:30:16
Done, sort of.
| |
258 // items reappear in the set. | |
259 // | |
260 // Returns only transient errors. | |
261 func (s *Set) CleanupStorage(c context.Context, cleanup []Tombstone) error { | |
262 if datastore.CurrentTransaction(c) != nil { | |
263 panic("dsset.Set.CleanupStorage must be called outside of a tran saction") | |
264 } | |
265 | |
266 keys := []*datastore.Key{} | |
267 for _, old := range cleanup { | |
268 if old.set != s.ID { | |
269 panic("passed Tombstone from some other set") | |
270 } | |
271 keys = append(keys, old.storage...) | |
272 } | |
273 | |
274 // TODO: parallel | |
275 | |
276 merr := errors.MultiError{} | |
277 for len(keys) > 0 { | |
278 count := 100 | |
279 if count > len(keys) { | |
280 count = len(keys) | |
281 } | |
282 logging.Infof(c, "Cleaning up storage for %q by removing %d keys ", s.ID, count) | |
283 if err := datastore.Delete(c, keys[:count]); err != nil { | |
284 logging.Warningf(c, "RPC failed - %s", err) | |
285 merr = append(merr, err) | |
286 } | |
287 keys = keys[count:] | |
288 } | |
289 | |
290 if len(merr) == 0 { | |
291 return nil | |
292 } | |
293 | |
294 return transient.Tag.Apply(merr) | |
295 } | |
296 | |
297 // PopOp is an in-progress Pop operation. | |
298 // | |
299 // See BeginPop. | |
300 type PopOp struct { | |
301 ctx context.Context // datastore context to use for this op | |
302 set string // id of the set | |
303 now time.Time // popping time for all popped items | |
304 entity *tombstonesEntity // entity with tombstones | |
305 tombs map[string]tombstone // tombstones in a map form | |
306 dirty bool // true if the tombstone map was modified | |
307 } | |
308 | |
309 // BeginPop initiates Pop operation. | |
310 // | |
311 // Pop operation is used to transactionally remove items from the set, as well | |
312 // as cleanup old tombstones. It must be finished with Submit. | |
313 // | |
314 // Requires a transaction. Modifies TombstonesRoot entity group (and only it). | |
315 // | |
316 // Returns only transient errors. | |
317 func (s *Set) BeginPop(c context.Context) (*PopOp, error) { | |
318 if datastore.CurrentTransaction(c) == nil { | |
319 panic("dsset.Set.BeginPop must be called inside a transaction") | |
320 } | |
321 | |
322 entity := &tombstonesEntity{ID: s.ID, Parent: s.TombstonesRoot} | |
323 if err := datastore.Get(c, entity); err != nil && err != datastore.ErrNo SuchEntity { | |
324 return nil, transient.Tag.Apply(err) | |
325 } | |
326 | |
327 // The data in tombstonesEntity, in map form. | |
328 tombs := make(map[string]tombstone, len(entity.Tombstones)) | |
329 for _, t := range entity.Tombstones { | |
330 tombs[t.ID] = t | |
331 } | |
332 | |
333 return &PopOp{ | |
334 ctx: c, | |
335 set: s.ID, | |
336 now: clock.Now(c).UTC(), | |
337 entity: entity, | |
338 tombs: tombs, | |
339 dirty: false, | |
340 }, nil | |
341 } | |
342 | |
343 // AlreadyPopped returns true if the given item has already been popped, perhaps | |
344 // in another transaction. | |
345 // | |
346 // Returns false for items that are still in the set, or items that have never | |
347 // been in the set to being with. Use List (prior to starting the transaction) | |
tandrii(chromium)
2017/07/18 17:54:18
to beGIN with
Vadim Sh.
2017/07/24 00:30:17
Done.
| |
348 // to distinguish this two cases. | |
tandrii(chromium)
2017/07/18 17:54:18
these
Vadim Sh.
2017/07/24 00:30:17
Done.
| |
349 func (p *PopOp) AlreadyPopped(id string) bool { | |
350 _, hasTomb := p.tombs[id] | |
351 return hasTomb | |
352 } | |
353 | |
354 // Pop pop the item from the set and returns true if it was indeed | |
tandrii(chromium)
2017/07/18 17:54:18
Pop pops
Vadim Sh.
2017/07/24 00:30:17
Done.
| |
355 // popped. | |
356 // | |
357 // Returns false if this item has been popped before (perhaps in another | |
358 // transaction). | |
359 // | |
360 // Returns true if item existed in the set and was popped, or it never existed | |
361 // there at all. Popping items that have never been in the set leads to weird | |
362 // behavior, don't do this. Use List (prior to starting the transaction) | |
363 // to figure out what items can be potentially popped. | |
364 func (p *PopOp) Pop(id string) bool { | |
365 if _, hasTomb := p.tombs[id]; hasTomb { | |
366 return false | |
367 } | |
368 p.tombs[id] = tombstone{ID: id, Tombstoned: p.now} | |
tandrii(chromium)
2017/07/18 17:54:19
Suppose List is called > TombstonesDelay seconds b
Vadim Sh.
2017/07/24 00:30:16
It is single consumer mostly because 1 QPS limit o
| |
369 p.dirty = true | |
370 return true | |
371 } | |
372 | |
373 // CleanupTombstones removes old tombstones. | |
374 // | |
375 // This must be called only after set's CleanupStorage method, otherwise removed | |
376 // items may reappear again. | |
377 func (p *PopOp) CleanupTombstones(tombstones []Tombstone) { | |
378 for _, old := range tombstones { | |
379 if old.set != p.set { | |
380 panic("passed Tombstone from some other set") | |
381 } | |
382 if _, hasTomb := p.tombs[old.id]; hasTomb { | |
383 delete(p.tombs, old.id) | |
384 p.dirty = true | |
385 } | |
386 } | |
387 } | |
388 | |
389 // Submit completes this pop operation but submitting changes to datastore. | |
tandrii(chromium)
2017/07/18 15:55:14
s/but/by
Vadim Sh.
2017/07/24 00:30:15
Done.
| |
390 // | |
391 // The operation is unusable after this. | |
tandrii(chromium)
2017/07/18 17:54:19
wdyt about adding a field "submitted"=false by def
Vadim Sh.
2017/07/24 00:30:17
I did it initially, but the removed to simplify co
| |
392 func (p *PopOp) Submit() error { | |
393 if p.dirty { | |
394 p.entity.Tombstones = p.entity.Tombstones[:0] | |
395 for _, tomb := range p.tombs { | |
396 p.entity.Tombstones = append(p.entity.Tombstones, tomb) | |
397 } | |
398 if err := datastore.Put(p.ctx, p.entity); err != nil { | |
399 return transient.Tag.Apply(err) | |
400 } | |
401 } | |
402 return nil | |
403 } | |
404 | |
405 //////////////////////////////////////////////////////////////////////////////// | |
406 | |
407 type itemEntity struct { | |
408 _kind string `gae:"$kind,dsset.Item"` | |
409 | |
410 ID string `gae:"$id"` | |
411 Parent *datastore.Key `gae:"$parent"` | |
412 Value []byte `gae:",noindex"` | |
413 } | |
414 | |
415 type tombstonesEntity struct { | |
416 _kind string `gae:"$kind,dsset.Tombstones"` | |
417 | |
418 ID string `gae:"$id"` | |
419 Parent *datastore.Key `gae:"$parent"` | |
420 Tombstones []tombstone `gae:",noindex"` | |
421 } | |
422 | |
423 type tombstone struct { | |
424 ID string // ID of tombstoned item | |
425 Tombstoned time.Time // when it was popped | |
426 } | |
427 | |
428 // shardRoot returns entity group key to use for a given shard. | |
429 func (s *Set) shardRoot(c context.Context, n int) *datastore.Key { | |
430 return datastore.NewKey(c, "dsset.Shard", fmt.Sprintf("%s:%d", s.ID, n), 0, nil) | |
431 } | |
OLD | NEW |