Chromium Code Reviews| OLD | NEW |
|---|---|
| (Empty) | |
| 1 // Copyright 2017 The LUCI Authors. | |
| 2 // | |
| 3 // Licensed under the Apache License, Version 2.0 (the "License"); | |
| 4 // you may not use this file except in compliance with the License. | |
| 5 // You may obtain a copy of the License at | |
| 6 // | |
| 7 // http://www.apache.org/licenses/LICENSE-2.0 | |
| 8 // | |
| 9 // Unless required by applicable law or agreed to in writing, software | |
| 10 // distributed under the License is distributed on an "AS IS" BASIS, | |
| 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
| 12 // See the License for the specific language governing permissions and | |
| 13 // limitations under the License. | |
| 14 | |
| 15 // Package dsset implements a particular flavor of datastore-backed set. | |
| 16 // | |
| 17 // Due to its internal structure, it requires some maintenance on behalf of the | |
| 18 // caller to periodically cleanup removed items (aka tombstones). | |
| 19 // | |
| 20 // Items added to set should have IDs unique in time (or at least unique for | |
|
tandrii(chromium)
2017/07/18 15:55:14
1. I **feel**: "the" set, and "the" duration. But
Vadim Sh.
2017/07/24 00:30:16
Done.
| |
| 21 // duration of some configurable time interval, as defined by 'TombstonesDelay' | |
| 22 // property). It means removed items can be added back to the set right away. | |
| 23 // | |
| 24 // Properties: | |
| 25 // * Batch 'Add' with configurable QPS limit. | |
| 26 // * Transactional consistent 'Pop' (1 QPS limit). | |
| 27 // * Non-transactional consistent 'List' (1 QPS limit). | |
| 28 // * Popped items can't be re-added until their tombstones expire. | |
| 29 // | |
| 30 // These properties make dsset suitable for multiple producers/single consumer | |
|
tandrii(chromium)
2017/07/18 15:55:15
nit: no need for slash in MPSC
Vadim Sh.
2017/07/24 00:30:16
Done.
| |
| 31 // queues, where order of items is not important and each item has an identifier | |
| 32 // unique in time. | |
|
tandrii(chromium)
2017/07/18 15:55:14
ditto for "in time". How about "unique in each tim
Vadim Sh.
2017/07/24 00:30:17
Done.
| |
| 33 // | |
| 34 // Structurally dsset consists of N+1 entity groups: | |
| 35 // * N separate entity groups that contain N shards of the set. | |
| 36 // * 1 entity group (with a configurable root) that holds tombstones. | |
| 37 // | |
| 38 // It is safe to increase number of shards at any time. Decreasing number of | |
| 39 // shards is dangerous (but can be done with some more coding). | |
| 40 // | |
| 41 // More shards make: | |
| 42 // * Add() less contentious (so it can support more QPS). | |
| 43 // * List() and CleanupStorage() slower and more expensive. | |
| 44 // * Pop() is not affected by number of shards. | |
| 45 package dsset | |
| 46 | |
| 47 import ( | |
| 48 "fmt" | |
| 49 "sync" | |
| 50 "time" | |
| 51 | |
| 52 "golang.org/x/net/context" | |
| 53 | |
| 54 "github.com/luci/gae/service/datastore" | |
| 55 "github.com/luci/luci-go/common/clock" | |
| 56 "github.com/luci/luci-go/common/data/rand/mathrand" | |
| 57 "github.com/luci/luci-go/common/data/stringset" | |
| 58 "github.com/luci/luci-go/common/errors" | |
| 59 "github.com/luci/luci-go/common/logging" | |
| 60 "github.com/luci/luci-go/common/retry/transient" | |
| 61 ) | |
| 62 | |
| 63 // Set holds a set of Items and uses Tombstones to achieve idempotency of Add. | |
| 64 // | |
| 65 // Produces just call Add(...). | |
|
tandrii(chromium)
2017/07/18 15:55:14
Producers
Vadim Sh.
2017/07/24 00:30:16
Done.
| |
| 66 // | |
| 67 // A consumer need to run more elaborate algorithm that ensures atomicity of | |
|
tandrii(chromium)
2017/07/18 15:55:15
consider: s/A consumer need/The consumer (there sh
Vadim Sh.
2017/07/24 00:30:16
Done.
| |
| 68 // 'Pop' and takes care of cleanup up of the garbage. This requires a mix of | |
|
tandrii(chromium)
2017/07/18 15:55:15
cleanup up -> cleanup OR cleaning up
Vadim Sh.
2017/07/24 00:30:17
Done.
| |
| 69 // transactional and non-transactional actions: | |
| 70 // | |
| 71 // items, tombstones, err := set.List(ctx) | |
| 72 // if err != nil { | |
| 73 // return err | |
| 74 // } | |
| 75 // | |
| 76 // if len(items) == 0 && len(tombstones) == 0 { | |
| 77 // return nil | |
| 78 // } | |
| 79 // | |
| 80 // if err := set.CleanupStorage(tombstones); err != nil { | |
| 81 // return err | |
| 82 // } | |
| 83 // | |
| 84 // ... Fetch any additional info associated with 'items' ... | |
| 85 // | |
| 86 // return datastore.RunInTransaction(ctx, func(ctx context.Context) error { | |
| 87 // op, err := set.BeginPop(ctx) | |
| 88 // if err != nil { | |
| 89 // return err | |
| 90 // } | |
| 91 // op.CleanupTombstones(tombstones) | |
| 92 // for _, itm := range items { | |
| 93 // if op.Pop(item.ID) { | |
| 94 // // The item was indeed in the set and we've just removed it! | |
| 95 // } else { | |
| 96 // // Some other transaction has popped it already. | |
| 97 // } | |
| 98 // } | |
| 99 // return op.Submit() | |
| 100 // }, nil) | |
| 101 type Set struct { | |
| 102 ID string // global ID, used to construct datastore keys | |
| 103 ShardCount int // number of entity groups to use for sto rage | |
| 104 TombstonesRoot *datastore.Key // tombstones entity parent key | |
| 105 TombstonesDelay time.Duration // how long to keep tombstones in the set | |
| 106 } | |
| 107 | |
| 108 // Item is what's stored in the set. | |
| 109 type Item struct { | |
| 110 ID string // unique in time identifier of the item | |
| 111 Value []byte // arbitrary value (<1 MB, but preferably much smaller) | |
| 112 } | |
| 113 | |
| 114 // Tombstone is a reference to a deleted item that still lingers in the set. | |
| 115 // | |
| 116 // Tombstones exist to make sure recently popped items do not reappear in the | |
| 117 // set if producers attempt to re-add them. | |
| 118 // | |
| 119 // Its fields are intentionally private to force correct usage CleanupStorage | |
|
tandrii(chromium)
2017/07/18 15:55:14
usage of
Vadim Sh.
2017/07/24 00:30:17
Done.
| |
| 120 // and CleanupTombstones. | |
| 121 type Tombstone struct { | |
| 122 set string // parent set name | |
|
tandrii(chromium)
2017/07/18 15:55:15
parent set ID, right?
Vadim Sh.
2017/07/24 00:30:17
Yes. 'ID' used to be 'Name', I changed it midway a
| |
| 123 id string // deleted item ID | |
| 124 storage []*datastore.Key // itemEntity's to delete | |
| 125 } | |
| 126 | |
| 127 // Add adds a bunch of items to the set. | |
|
tandrii(chromium)
2017/07/18 15:55:15
s/set./set. idempotently.
Vadim Sh.
2017/07/24 00:30:16
Done.
| |
| 128 // | |
| 129 // Writes to some single entity group (not known in advance). If called outside | |
| 130 // of a transaction and the call fails, may add only some subset of items. | |
| 131 // Running inside a transaction makes this operation atomic. | |
| 132 // | |
| 133 // If items with given keys are already in the set, or has been deleted | |
|
tandrii(chromium)
2017/07/18 15:55:14
s/has/have
Vadim Sh.
2017/07/24 00:30:16
Done.
| |
| 134 // recently, they won't be re-added. No error is returned in this case. | |
| 135 // | |
| 136 // Returns only transient errors. | |
| 137 func (s *Set) Add(c context.Context, items []Item) error { | |
| 138 // TODO: batch + parallel | |
|
tandrii(chromium)
2017/07/18 15:55:15
what's batch for? If # of items added is too large
Vadim Sh.
2017/07/24 00:30:17
Yes. There's a limit on number of entities in sing
tandrii(chromium)
2017/07/31 18:58:46
Acknowledged.
| |
| 139 | |
| 140 // Pick a random shard and add all new items there. If this is a retry, they | |
| 141 // may exist in some other shard already. We don't care, they'll be | |
| 142 // deduplicated in 'List'. If added items has been popped already (they have | |
|
tandrii(chromium)
2017/07/18 15:55:14
s/has/have
Vadim Sh.
2017/07/24 00:30:15
Done.
| |
| 143 // tombstones), 'List' will omit them as well. | |
| 144 shardRoot := s.shardRoot(c, mathrand.Intn(c, s.ShardCount)) | |
| 145 entities := make([]itemEntity, len(items)) | |
| 146 for i, itm := range items { | |
| 147 entities[i] = itemEntity{ | |
| 148 ID: itm.ID, | |
| 149 Parent: shardRoot, | |
| 150 Value: itm.Value, | |
| 151 } | |
| 152 } | |
| 153 return transient.Tag.Apply(datastore.Put(c, entities)) | |
| 154 } | |
| 155 | |
| 156 // List returns all items that are currently in the set (in arbitrary order), | |
| 157 // as well as set of tombstones that points to items that were deleted | |
|
tandrii(chromium)
2017/07/18 15:55:15
as well as A set
Vadim Sh.
2017/07/24 00:30:16
Done.
| |
| 158 // sufficiently long ago and can be cleaned up now. | |
| 159 // | |
| 160 // Must be called outside of transactions (panics otherwise). Reads many entity | |
| 161 // groups, including TombstonesRoot one. | |
| 162 // | |
| 163 // The set of tombstones to cleanup can be passed to CleanupStorage, and | |
| 164 // later to CleanupTombstones (during pop operation), in that order. Not doing | |
| 165 // so will lead to accumulation of garbage in the set that will slow down List | |
| 166 // and Pop. | |
| 167 // | |
| 168 // Returns only transient errors. | |
| 169 func (s *Set) List(c context.Context) (items []Item, cleanup []Tombstone, err er ror) { | |
| 170 if datastore.CurrentTransaction(c) != nil { | |
| 171 panic("dsset.Set.List must be called outside of a transaction") | |
| 172 } | |
| 173 | |
| 174 // Fetch all shards (via consistent ancestor queries) and all tombstones . | |
| 175 | |
| 176 shards := make([][]*itemEntity, s.ShardCount) | |
| 177 tombsEntity := tombstonesEntity{ID: s.ID, Parent: s.TombstonesRoot} | |
| 178 | |
| 179 wg := sync.WaitGroup{} | |
| 180 errs := errors.NewLazyMultiError(s.ShardCount + 1) | |
| 181 | |
| 182 wg.Add(1) | |
| 183 go func() { | |
| 184 defer wg.Done() | |
| 185 if err := datastore.Get(c, &tombsEntity); err != nil && err != d atastore.ErrNoSuchEntity { | |
| 186 errs.Assign(0, err) | |
| 187 } | |
| 188 }() | |
| 189 | |
| 190 wg.Add(s.ShardCount) | |
| 191 for i := 0; i < s.ShardCount; i++ { | |
| 192 go func(i int) { | |
| 193 defer wg.Done() | |
| 194 q := datastore.NewQuery("dsset.Item").Ancestor(s.shardRo ot(c, i)) | |
| 195 errs.Assign(i+1, datastore.GetAll(c, q, &shards[i])) | |
| 196 }(i) | |
| 197 } | |
| 198 | |
| 199 wg.Wait() | |
| 200 if err := errs.Get(); err != nil { | |
| 201 return nil, nil, transient.Tag.Apply(err) | |
| 202 } | |
| 203 | |
| 204 logging.Infof(c, "%s: %d tombs", s.ID, len(tombsEntity.Tombstones)) | |
| 205 for i, shard := range shards { | |
| 206 logging.Infof(c, "%s: shard #%d - %d items", s.ID, i, len(shard) ) | |
| 207 } | |
| 208 | |
| 209 // A set of items we pretend not to see. Initially all tombstoned ones. | |
| 210 // | |
| 211 // Since we are iterating over tombstone list anyway, find all sufficien tly | |
| 212 // old tombstones to return them to the caller, so they can be cleaned u p. | |
| 213 ignore := stringset.New(len(tombsEntity.Tombstones)) | |
| 214 oldestTombs := map[string]*Tombstone{} // key is item ID | |
| 215 now := clock.Now(c).UTC() | |
| 216 for _, t := range tombsEntity.Tombstones { | |
| 217 ignore.Add(t.ID) | |
| 218 if now.Sub(t.Tombstoned) > s.TombstonesDelay { | |
| 219 oldestTombs[t.ID] = &Tombstone{set: s.ID, id: t.ID} | |
| 220 } | |
| 221 } | |
| 222 | |
| 223 // Join all shards, throwing away tombstoned and duplicated items. For o ldest | |
| 224 // tombstones collect references to entities that need to be deleted. Th ere | |
| 225 // may be more than one such entity per ID due to possibility of differe nt | |
| 226 // shards hosting same item (if Add call was retried)! | |
|
tandrii(chromium)
2017/07/18 17:54:18
the same item
Vadim Sh.
2017/07/24 00:30:18
Done.
| |
| 227 for _, shard := range shards { | |
| 228 for _, itemEntity := range shard { | |
| 229 if !ignore.Has(itemEntity.ID) { | |
| 230 items = append(items, Item{ | |
| 231 ID: itemEntity.ID, | |
| 232 Value: itemEntity.Value, | |
| 233 }) | |
| 234 ignore.Add(itemEntity.ID) | |
| 235 } | |
| 236 if tomb := oldestTombs[itemEntity.ID]; tomb != nil { | |
| 237 tomb.storage = append(tomb.storage, datastore.Ke yForObj(c, itemEntity)) | |
| 238 } | |
| 239 } | |
| 240 } | |
| 241 | |
| 242 if len(oldestTombs) != 0 { | |
| 243 cleanup = make([]Tombstone, 0, len(oldestTombs)) | |
| 244 for _, tomb := range oldestTombs { | |
| 245 cleanup = append(cleanup, *tomb) | |
| 246 } | |
| 247 } | |
| 248 | |
| 249 return items, cleanup, nil | |
| 250 } | |
| 251 | |
| 252 // CleanupStorage deletes entities used to store items under given tombstones. | |
|
tandrii(chromium)
2017/07/18 17:54:18
nit: I'd call it GC :)
Vadim Sh.
2017/07/24 00:30:17
There are two "kinds" of GC: one removes itemEntit
| |
| 253 // | |
| 254 // Touches many entity groups. Must be called outside of transactions. | |
| 255 // Idempotent. | |
| 256 // | |
| 257 // This MUST be called before tombstones are popped. Failure to do so will make | |
|
tandrii(chromium)
2017/07/18 17:54:18
is this worth enforcing? If so, then we could reco
Vadim Sh.
2017/07/24 00:30:16
Done, sort of.
| |
| 258 // items reappear in the set. | |
| 259 // | |
| 260 // Returns only transient errors. | |
| 261 func (s *Set) CleanupStorage(c context.Context, cleanup []Tombstone) error { | |
| 262 if datastore.CurrentTransaction(c) != nil { | |
| 263 panic("dsset.Set.CleanupStorage must be called outside of a tran saction") | |
| 264 } | |
| 265 | |
| 266 keys := []*datastore.Key{} | |
| 267 for _, old := range cleanup { | |
| 268 if old.set != s.ID { | |
| 269 panic("passed Tombstone from some other set") | |
| 270 } | |
| 271 keys = append(keys, old.storage...) | |
| 272 } | |
| 273 | |
| 274 // TODO: parallel | |
| 275 | |
| 276 merr := errors.MultiError{} | |
| 277 for len(keys) > 0 { | |
| 278 count := 100 | |
| 279 if count > len(keys) { | |
| 280 count = len(keys) | |
| 281 } | |
| 282 logging.Infof(c, "Cleaning up storage for %q by removing %d keys ", s.ID, count) | |
| 283 if err := datastore.Delete(c, keys[:count]); err != nil { | |
| 284 logging.Warningf(c, "RPC failed - %s", err) | |
| 285 merr = append(merr, err) | |
| 286 } | |
| 287 keys = keys[count:] | |
| 288 } | |
| 289 | |
| 290 if len(merr) == 0 { | |
| 291 return nil | |
| 292 } | |
| 293 | |
| 294 return transient.Tag.Apply(merr) | |
| 295 } | |
| 296 | |
| 297 // PopOp is an in-progress Pop operation. | |
| 298 // | |
| 299 // See BeginPop. | |
| 300 type PopOp struct { | |
| 301 ctx context.Context // datastore context to use for this op | |
| 302 set string // id of the set | |
| 303 now time.Time // popping time for all popped items | |
| 304 entity *tombstonesEntity // entity with tombstones | |
| 305 tombs map[string]tombstone // tombstones in a map form | |
| 306 dirty bool // true if the tombstone map was modified | |
| 307 } | |
| 308 | |
| 309 // BeginPop initiates Pop operation. | |
| 310 // | |
| 311 // Pop operation is used to transactionally remove items from the set, as well | |
| 312 // as cleanup old tombstones. It must be finished with Submit. | |
| 313 // | |
| 314 // Requires a transaction. Modifies TombstonesRoot entity group (and only it). | |
| 315 // | |
| 316 // Returns only transient errors. | |
| 317 func (s *Set) BeginPop(c context.Context) (*PopOp, error) { | |
| 318 if datastore.CurrentTransaction(c) == nil { | |
| 319 panic("dsset.Set.BeginPop must be called inside a transaction") | |
| 320 } | |
| 321 | |
| 322 entity := &tombstonesEntity{ID: s.ID, Parent: s.TombstonesRoot} | |
| 323 if err := datastore.Get(c, entity); err != nil && err != datastore.ErrNo SuchEntity { | |
| 324 return nil, transient.Tag.Apply(err) | |
| 325 } | |
| 326 | |
| 327 // The data in tombstonesEntity, in map form. | |
| 328 tombs := make(map[string]tombstone, len(entity.Tombstones)) | |
| 329 for _, t := range entity.Tombstones { | |
| 330 tombs[t.ID] = t | |
| 331 } | |
| 332 | |
| 333 return &PopOp{ | |
| 334 ctx: c, | |
| 335 set: s.ID, | |
| 336 now: clock.Now(c).UTC(), | |
| 337 entity: entity, | |
| 338 tombs: tombs, | |
| 339 dirty: false, | |
| 340 }, nil | |
| 341 } | |
| 342 | |
| 343 // AlreadyPopped returns true if the given item has already been popped, perhaps | |
| 344 // in another transaction. | |
| 345 // | |
| 346 // Returns false for items that are still in the set, or items that have never | |
| 347 // been in the set to being with. Use List (prior to starting the transaction) | |
|
tandrii(chromium)
2017/07/18 17:54:18
to beGIN with
Vadim Sh.
2017/07/24 00:30:17
Done.
| |
| 348 // to distinguish this two cases. | |
|
tandrii(chromium)
2017/07/18 17:54:18
these
Vadim Sh.
2017/07/24 00:30:17
Done.
| |
| 349 func (p *PopOp) AlreadyPopped(id string) bool { | |
| 350 _, hasTomb := p.tombs[id] | |
| 351 return hasTomb | |
| 352 } | |
| 353 | |
| 354 // Pop pop the item from the set and returns true if it was indeed | |
|
tandrii(chromium)
2017/07/18 17:54:18
Pop pops
Vadim Sh.
2017/07/24 00:30:17
Done.
| |
| 355 // popped. | |
| 356 // | |
| 357 // Returns false if this item has been popped before (perhaps in another | |
| 358 // transaction). | |
| 359 // | |
| 360 // Returns true if item existed in the set and was popped, or it never existed | |
| 361 // there at all. Popping items that have never been in the set leads to weird | |
| 362 // behavior, don't do this. Use List (prior to starting the transaction) | |
| 363 // to figure out what items can be potentially popped. | |
| 364 func (p *PopOp) Pop(id string) bool { | |
| 365 if _, hasTomb := p.tombs[id]; hasTomb { | |
| 366 return false | |
| 367 } | |
| 368 p.tombs[id] = tombstone{ID: id, Tombstoned: p.now} | |
|
tandrii(chromium)
2017/07/18 17:54:19
Suppose List is called > TombstonesDelay seconds b
Vadim Sh.
2017/07/24 00:30:16
It is single consumer mostly because 1 QPS limit o
| |
| 369 p.dirty = true | |
| 370 return true | |
| 371 } | |
| 372 | |
| 373 // CleanupTombstones removes old tombstones. | |
| 374 // | |
| 375 // This must be called only after set's CleanupStorage method, otherwise removed | |
| 376 // items may reappear again. | |
| 377 func (p *PopOp) CleanupTombstones(tombstones []Tombstone) { | |
| 378 for _, old := range tombstones { | |
| 379 if old.set != p.set { | |
| 380 panic("passed Tombstone from some other set") | |
| 381 } | |
| 382 if _, hasTomb := p.tombs[old.id]; hasTomb { | |
| 383 delete(p.tombs, old.id) | |
| 384 p.dirty = true | |
| 385 } | |
| 386 } | |
| 387 } | |
| 388 | |
| 389 // Submit completes this pop operation but submitting changes to datastore. | |
|
tandrii(chromium)
2017/07/18 15:55:14
s/but/by
Vadim Sh.
2017/07/24 00:30:15
Done.
| |
| 390 // | |
| 391 // The operation is unusable after this. | |
|
tandrii(chromium)
2017/07/18 17:54:19
wdyt about adding a field "submitted"=false by def
Vadim Sh.
2017/07/24 00:30:17
I did it initially, but the removed to simplify co
| |
| 392 func (p *PopOp) Submit() error { | |
| 393 if p.dirty { | |
| 394 p.entity.Tombstones = p.entity.Tombstones[:0] | |
| 395 for _, tomb := range p.tombs { | |
| 396 p.entity.Tombstones = append(p.entity.Tombstones, tomb) | |
| 397 } | |
| 398 if err := datastore.Put(p.ctx, p.entity); err != nil { | |
| 399 return transient.Tag.Apply(err) | |
| 400 } | |
| 401 } | |
| 402 return nil | |
| 403 } | |
| 404 | |
| 405 //////////////////////////////////////////////////////////////////////////////// | |
| 406 | |
| 407 type itemEntity struct { | |
| 408 _kind string `gae:"$kind,dsset.Item"` | |
| 409 | |
| 410 ID string `gae:"$id"` | |
| 411 Parent *datastore.Key `gae:"$parent"` | |
| 412 Value []byte `gae:",noindex"` | |
| 413 } | |
| 414 | |
| 415 type tombstonesEntity struct { | |
| 416 _kind string `gae:"$kind,dsset.Tombstones"` | |
| 417 | |
| 418 ID string `gae:"$id"` | |
| 419 Parent *datastore.Key `gae:"$parent"` | |
| 420 Tombstones []tombstone `gae:",noindex"` | |
| 421 } | |
| 422 | |
| 423 type tombstone struct { | |
| 424 ID string // ID of tombstoned item | |
| 425 Tombstoned time.Time // when it was popped | |
| 426 } | |
| 427 | |
| 428 // shardRoot returns entity group key to use for a given shard. | |
| 429 func (s *Set) shardRoot(c context.Context, n int) *datastore.Key { | |
| 430 return datastore.NewKey(c, "dsset.Shard", fmt.Sprintf("%s:%d", s.ID, n), 0, nil) | |
| 431 } | |
| OLD | NEW |