| OLD | NEW |
| 1 package gkvlite | 1 package gkvlite |
| 2 | 2 |
| 3 import ( | 3 import ( |
| 4 "encoding/binary" | 4 "encoding/binary" |
| 5 "errors" | 5 "errors" |
| 6 "fmt" | 6 "fmt" |
| 7 "sync" |
| 7 "sync/atomic" | 8 "sync/atomic" |
| 8 "unsafe" | |
| 9 ) | 9 ) |
| 10 | 10 |
| 11 // A persistable item. | 11 // A persistable item. |
| 12 type Item struct { | 12 type Item struct { |
| 13 » Transient unsafe.Pointer // For any ephemeral data; atomic CAS recommend
ed. | 13 » Transient interface{} // For any ephemeral data. |
| 14 » Key, Val []byte // Val may be nil if not fetched into memory ye
t. | 14 » Key, Val []byte // Val may be nil if not fetched into memory yet. |
| 15 » Priority int32 // Use rand.Int31() for probabilistic balancing
. | 15 » Priority int32 // Use rand.Int31() for probabilistic balancing. |
| 16 } | 16 } |
| 17 | 17 |
| 18 // A persistable item and its persistence location. | 18 // A persistable item and its persistence location. |
| 19 type itemLoc struct { | 19 type itemLoc struct { |
| 20 » loc unsafe.Pointer // *ploc - can be nil if item is dirty (not yet pers
isted). | 20 » m sync.Mutex |
| 21 » item unsafe.Pointer // *Item - can be nil if item is not fetched into me
mory yet. | 21 |
| 22 » loc *ploc // can be nil if item is dirty (not yet persisted). |
| 23 » item *Item // can be nil if item is not fetched into memory yet. |
| 22 } | 24 } |
| 23 | 25 |
| 24 var empty_itemLoc = &itemLoc{} | 26 var empty_itemLoc = &itemLoc{} |
| 25 | 27 |
| 26 // Number of Key bytes plus number of Val bytes. | 28 // Number of Key bytes plus number of Val bytes. |
| 27 func (i *Item) NumBytes(c *Collection) int { | 29 func (i *Item) NumBytes(c *Collection) int { |
| 28 return len(i.Key) + i.NumValBytes(c) | 30 return len(i.Key) + i.NumValBytes(c) |
| 29 } | 31 } |
| 30 | 32 |
| 31 func (i *Item) NumValBytes(c *Collection) int { | 33 func (i *Item) NumValBytes(c *Collection) int { |
| 32 if c.store.callbacks.ItemValLength != nil { | 34 if c.store.callbacks.ItemValLength != nil { |
| 33 return c.store.callbacks.ItemValLength(c, i) | 35 return c.store.callbacks.ItemValLength(c, i) |
| 34 } | 36 } |
| 35 return len(i.Val) | 37 return len(i.Val) |
| 36 } | 38 } |
| 37 | 39 |
| 38 // The returned Item will not have been allocated through the optional | 40 // The returned Item will not have been allocated through the optional |
| 39 // StoreCallbacks.ItemAlloc() callback. | 41 // StoreCallbacks.ItemAlloc() callback. |
| 40 func (i *Item) Copy() *Item { | 42 func (i *Item) Copy() *Item { |
| 41 return &Item{ | 43 return &Item{ |
| 42 Key: i.Key, | 44 Key: i.Key, |
| 43 Val: i.Val, | 45 Val: i.Val, |
| 44 Priority: i.Priority, | 46 Priority: i.Priority, |
| 45 Transient: i.Transient, | 47 Transient: i.Transient, |
| 46 } | 48 } |
| 47 } | 49 } |
| 48 | 50 |
| 49 func (i *itemLoc) Loc() *ploc { | 51 func (i *itemLoc) Loc() *ploc { |
| 50 » return (*ploc)(atomic.LoadPointer(&i.loc)) | 52 » i.m.Lock() |
| 53 » defer i.m.Unlock() |
| 54 » return i.loc |
| 55 } |
| 56 |
| 57 func (i *itemLoc) setLoc(n *ploc) { |
| 58 » i.m.Lock() |
| 59 » defer i.m.Unlock() |
| 60 » i.loc = n |
| 51 } | 61 } |
| 52 | 62 |
| 53 func (i *itemLoc) Item() *Item { | 63 func (i *itemLoc) Item() *Item { |
| 54 » return (*Item)(atomic.LoadPointer(&i.item)) | 64 » i.m.Lock() |
| 65 » defer i.m.Unlock() |
| 66 » return i.item |
| 67 } |
| 68 |
| 69 func (i *itemLoc) casItem(o, n *Item) bool { |
| 70 » i.m.Lock() |
| 71 » defer i.m.Unlock() |
| 72 » if i.item == o { |
| 73 » » i.item = n |
| 74 » » return true |
| 75 » } |
| 76 » return false |
| 55 } | 77 } |
| 56 | 78 |
| 57 func (i *itemLoc) Copy(src *itemLoc) { | 79 func (i *itemLoc) Copy(src *itemLoc) { |
| 58 if src == nil { | 80 if src == nil { |
| 59 i.Copy(empty_itemLoc) | 81 i.Copy(empty_itemLoc) |
| 60 return | 82 return |
| 61 } | 83 } |
| 62 » atomic.StorePointer(&i.loc, unsafe.Pointer(src.Loc())) | 84 » newloc := src.Loc() |
| 63 » atomic.StorePointer(&i.item, unsafe.Pointer(src.Item())) | 85 » newitem := src.Item() |
| 86 |
| 87 » i.m.Lock() |
| 88 » defer i.m.Unlock() |
| 89 » i.loc = newloc |
| 90 » i.item = newitem |
| 64 } | 91 } |
| 65 | 92 |
| 66 const itemLoc_hdrLength int = 4 + 4 + 4 + 4 | 93 const itemLoc_hdrLength int = 4 + 4 + 4 + 4 |
| 67 | 94 |
| 68 func (i *itemLoc) write(c *Collection) (err error) { | 95 func (i *itemLoc) write(c *Collection) (err error) { |
| 69 if i.Loc().isEmpty() { | 96 if i.Loc().isEmpty() { |
| 70 iItem := i.Item() | 97 iItem := i.Item() |
| 71 if iItem == nil { | 98 if iItem == nil { |
| 72 return errors.New("itemLoc.write with nil item") | 99 return errors.New("itemLoc.write with nil item") |
| 73 } | 100 } |
| (...skipping 23 matching lines...) Expand all Loading... |
| 97 pos, hlength) | 124 pos, hlength) |
| 98 } | 125 } |
| 99 if _, err := c.store.file.WriteAt(b, offset); err != nil { | 126 if _, err := c.store.file.WriteAt(b, offset); err != nil { |
| 100 return err | 127 return err |
| 101 } | 128 } |
| 102 err := c.store.ItemValWrite(c, iItem, c.store.file, offset+int64
(pos)) | 129 err := c.store.ItemValWrite(c, iItem, c.store.file, offset+int64
(pos)) |
| 103 if err != nil { | 130 if err != nil { |
| 104 return err | 131 return err |
| 105 } | 132 } |
| 106 atomic.StoreInt64(&c.store.size, offset+int64(ilength)) | 133 atomic.StoreInt64(&c.store.size, offset+int64(ilength)) |
| 107 » » atomic.StorePointer(&i.loc, | 134 » » i.setLoc(&ploc{Offset: offset, Length: uint32(ilength)}) |
| 108 » » » unsafe.Pointer(&ploc{Offset: offset, Length: uint32(ilen
gth)})) | |
| 109 } | 135 } |
| 110 return nil | 136 return nil |
| 111 } | 137 } |
| 112 | 138 |
| 113 func (iloc *itemLoc) read(c *Collection, withValue bool) (icur *Item, err error)
{ | 139 func (iloc *itemLoc) read(c *Collection, withValue bool) (icur *Item, err error)
{ |
| 114 if iloc == nil { | 140 if iloc == nil { |
| 115 return nil, nil | 141 return nil, nil |
| 116 } | 142 } |
| 117 icur = iloc.Item() | 143 icur = iloc.Item() |
| 118 if icur == nil || (icur.Val == nil && withValue) { | 144 if icur == nil || (icur.Val == nil && withValue) { |
| (...skipping 44 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 163 return nil, err | 189 return nil, err |
| 164 } | 190 } |
| 165 } | 191 } |
| 166 if c.store.callbacks.AfterItemRead != nil { | 192 if c.store.callbacks.AfterItemRead != nil { |
| 167 i, err = c.store.callbacks.AfterItemRead(c, i) | 193 i, err = c.store.callbacks.AfterItemRead(c, i) |
| 168 if err != nil { | 194 if err != nil { |
| 169 c.store.ItemDecRef(c, i) | 195 c.store.ItemDecRef(c, i) |
| 170 return nil, err | 196 return nil, err |
| 171 } | 197 } |
| 172 } | 198 } |
| 173 » » if !atomic.CompareAndSwapPointer(&iloc.item, | 199 » » if !iloc.casItem(icur, i) { |
| 174 » » » unsafe.Pointer(icur), unsafe.Pointer(i)) { | |
| 175 c.store.ItemDecRef(c, i) | 200 c.store.ItemDecRef(c, i) |
| 176 return iloc.read(c, withValue) | 201 return iloc.read(c, withValue) |
| 177 } | 202 } |
| 178 if icur != nil { | 203 if icur != nil { |
| 179 c.store.ItemDecRef(c, icur) | 204 c.store.ItemDecRef(c, icur) |
| 180 } | 205 } |
| 181 icur = i | 206 icur = i |
| 182 } | 207 } |
| 183 return icur, nil | 208 return icur, nil |
| 184 } | 209 } |
| 185 | 210 |
| 186 func (iloc *itemLoc) NumBytes(c *Collection) int { | 211 func (iloc *itemLoc) NumBytes(c *Collection) int { |
| 187 loc := iloc.Loc() | 212 loc := iloc.Loc() |
| 188 if loc.isEmpty() { | 213 if loc.isEmpty() { |
| 189 i := iloc.Item() | 214 i := iloc.Item() |
| 190 if i == nil { | 215 if i == nil { |
| 191 return 0 | 216 return 0 |
| 192 } | 217 } |
| 193 return i.NumBytes(c) | 218 return i.NumBytes(c) |
| 194 } | 219 } |
| 195 return int(loc.Length) - itemLoc_hdrLength | 220 return int(loc.Length) - itemLoc_hdrLength |
| 196 } | 221 } |
| OLD | NEW |