OLD | NEW |
1 package gkvlite | 1 package gkvlite |
2 | 2 |
3 import ( | 3 import ( |
4 "encoding/json" | 4 "encoding/json" |
5 "errors" | 5 "errors" |
6 "fmt" | 6 "fmt" |
7 "math/rand" | 7 "math/rand" |
8 "sync" | 8 "sync" |
9 "sync/atomic" | |
10 "unsafe" | |
11 ) | 9 ) |
12 | 10 |
13 // User-supplied key comparison func should return 0 if a == b, | 11 // User-supplied key comparison func should return 0 if a == b, |
14 // -1 if a < b, and +1 if a > b. For example: bytes.Compare() | 12 // -1 if a < b, and +1 if a > b. For example: bytes.Compare() |
15 type KeyCompare func(a, b []byte) int | 13 type KeyCompare func(a, b []byte) int |
16 | 14 |
17 // A persistable collection of ordered key-values (Item's). | 15 // A persistable collection of ordered key-values (Item's). |
18 type Collection struct { | 16 type Collection struct { |
19 name string // May be "" for a private collection. | 17 name string // May be "" for a private collection. |
20 store *Store | 18 store *Store |
21 compare KeyCompare | 19 compare KeyCompare |
22 | 20 |
23 rootLock *sync.Mutex | 21 rootLock *sync.Mutex |
24 root *rootNodeLoc // Protected by rootLock. | 22 root *rootNodeLoc // Protected by rootLock. |
25 | 23 |
26 allocStats AllocStats // User must serialize access (e.g., see locks in
alloc.go). | 24 allocStats AllocStats // User must serialize access (e.g., see locks in
alloc.go). |
27 | 25 |
28 » AppData unsafe.Pointer // For app-specific data; atomic CAS recommended. | 26 » AppData interface{} // For app-specific data. |
29 } | 27 } |
30 | 28 |
31 type rootNodeLoc struct { | 29 type rootNodeLoc struct { |
32 // The rootNodeLoc fields are protected by Collection.rootLock. | 30 // The rootNodeLoc fields are protected by Collection.rootLock. |
33 refs int64 // Reference counter. | 31 refs int64 // Reference counter. |
34 root *nodeLoc | 32 root *nodeLoc |
35 next *rootNodeLoc // For free-list tracking. | 33 next *rootNodeLoc // For free-list tracking. |
36 | 34 |
37 reclaimMark node // Address is used as a sentinel. | 35 reclaimMark node // Address is used as a sentinel. |
38 | 36 |
(...skipping 92 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
131 return errors.New("Item.Key/Val missing or too long") | 129 return errors.New("Item.Key/Val missing or too long") |
132 } | 130 } |
133 if item.Priority < 0 { | 131 if item.Priority < 0 { |
134 return errors.New("Item.Priority must be non-negative") | 132 return errors.New("Item.Priority must be non-negative") |
135 } | 133 } |
136 rnl := t.rootAddRef() | 134 rnl := t.rootAddRef() |
137 defer t.rootDecRef(rnl) | 135 defer t.rootDecRef(rnl) |
138 root := rnl.root | 136 root := rnl.root |
139 n := t.mkNode(nil, nil, nil, 1, uint64(len(item.Key))+uint64(item.NumVal
Bytes(t))) | 137 n := t.mkNode(nil, nil, nil, 1, uint64(len(item.Key))+uint64(item.NumVal
Bytes(t))) |
140 t.store.ItemAddRef(t, item) | 138 t.store.ItemAddRef(t, item) |
141 » n.item.item = unsafe.Pointer(item) // Avoid garbage via separate init. | 139 » n.item.item = item // Avoid garbage via separate init. |
142 nloc := t.mkNodeLoc(n) | 140 nloc := t.mkNodeLoc(n) |
143 defer t.freeNodeLoc(nloc) | 141 defer t.freeNodeLoc(nloc) |
144 r, err := t.store.union(t, root, nloc, &rnl.reclaimMark) | 142 r, err := t.store.union(t, root, nloc, &rnl.reclaimMark) |
145 if err != nil { | 143 if err != nil { |
146 return err | 144 return err |
147 } | 145 } |
148 rnlNew := t.mkRootNodeLoc(r) | 146 rnlNew := t.mkRootNodeLoc(r) |
149 // Can't reclaim n right now because r might point to n. | 147 // Can't reclaim n right now because r might point to n. |
150 rnlNew.reclaimLater[0] = t.reclaimMarkUpdate(nloc, | 148 rnlNew.reclaimLater[0] = t.reclaimMarkUpdate(nloc, |
151 &rnl.reclaimMark, &rnlNew.reclaimMark) | 149 &rnl.reclaimMark, &rnlNew.reclaimMark) |
(...skipping 69 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
221 // Evict some clean items found by randomly walking a tree branch. | 219 // Evict some clean items found by randomly walking a tree branch. |
222 // For concurrent users, only the single mutator thread should call | 220 // For concurrent users, only the single mutator thread should call |
223 // EvictSomeItems(), making it serialized with mutations. | 221 // EvictSomeItems(), making it serialized with mutations. |
224 func (t *Collection) EvictSomeItems() (numEvicted uint64) { | 222 func (t *Collection) EvictSomeItems() (numEvicted uint64) { |
225 if t.store.readOnly { | 223 if t.store.readOnly { |
226 return 0 | 224 return 0 |
227 } | 225 } |
228 i, err := t.store.walk(t, false, func(n *node) (*nodeLoc, bool) { | 226 i, err := t.store.walk(t, false, func(n *node) (*nodeLoc, bool) { |
229 if !n.item.Loc().isEmpty() { | 227 if !n.item.Loc().isEmpty() { |
230 i := n.item.Item() | 228 i := n.item.Item() |
231 » » » if i != nil && atomic.CompareAndSwapPointer(&n.item.item
, | 229 » » » if i != nil && n.item.casItem(i, nil) { |
232 » » » » unsafe.Pointer(i), unsafe.Pointer(nil)) { | |
233 t.store.ItemDecRef(t, i) | 230 t.store.ItemDecRef(t, i) |
234 numEvicted++ | 231 numEvicted++ |
235 } | 232 } |
236 } | 233 } |
237 next := &n.left | 234 next := &n.left |
238 if (rand.Int() & 0x01) == 0x01 { | 235 if (rand.Int() & 0x01) == 0x01 { |
239 next = &n.right | 236 next = &n.right |
240 } | 237 } |
241 if next.isEmpty() { | 238 if next.isEmpty() { |
242 return nil, false | 239 return nil, false |
(...skipping 99 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
342 // Unmarshals JSON representation of root node file location. | 339 // Unmarshals JSON representation of root node file location. |
343 func (t *Collection) UnmarshalJSON(d []byte) error { | 340 func (t *Collection) UnmarshalJSON(d []byte) error { |
344 p := ploc{} | 341 p := ploc{} |
345 if err := json.Unmarshal(d, &p); err != nil { | 342 if err := json.Unmarshal(d, &p); err != nil { |
346 return err | 343 return err |
347 } | 344 } |
348 if t.rootLock == nil { | 345 if t.rootLock == nil { |
349 t.rootLock = &sync.Mutex{} | 346 t.rootLock = &sync.Mutex{} |
350 } | 347 } |
351 nloc := t.mkNodeLoc(nil) | 348 nloc := t.mkNodeLoc(nil) |
352 » nloc.loc = unsafe.Pointer(&p) | 349 » nloc.loc = &p |
353 if !t.rootCAS(nil, t.mkRootNodeLoc(nloc)) { | 350 if !t.rootCAS(nil, t.mkRootNodeLoc(nloc)) { |
354 return errors.New("concurrent mutation during UnmarshalJSON().") | 351 return errors.New("concurrent mutation during UnmarshalJSON().") |
355 } | 352 } |
356 return nil | 353 return nil |
357 } | 354 } |
358 | 355 |
359 func (t *Collection) AllocStats() (res AllocStats) { | 356 func (t *Collection) AllocStats() (res AllocStats) { |
360 withAllocLocks(func() { res = t.allocStats }) | 357 withAllocLocks(func() { res = t.allocStats }) |
361 return res | 358 return res |
362 } | 359 } |
(...skipping 104 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
467 t.reclaimNodes_unlocked(r.root.Node(), &r.reclaimLater, &r.reclaimMark) | 464 t.reclaimNodes_unlocked(r.root.Node(), &r.reclaimLater, &r.reclaimMark) |
468 for i := 0; i < len(r.reclaimLater); i++ { | 465 for i := 0; i < len(r.reclaimLater); i++ { |
469 if r.reclaimLater[i] != nil { | 466 if r.reclaimLater[i] != nil { |
470 t.reclaimNodes_unlocked(r.reclaimLater[i], nil, &r.recla
imMark) | 467 t.reclaimNodes_unlocked(r.reclaimLater[i], nil, &r.recla
imMark) |
471 r.reclaimLater[i] = nil | 468 r.reclaimLater[i] = nil |
472 } | 469 } |
473 } | 470 } |
474 t.freeNodeLoc(r.root) | 471 t.freeNodeLoc(r.root) |
475 t.freeRootNodeLoc(r) | 472 t.freeRootNodeLoc(r) |
476 } | 473 } |
OLD | NEW |