OLD | NEW |
1 package gkvlite | 1 package gkvlite |
2 | 2 |
3 import ( | 3 import ( |
4 "encoding/binary" | 4 "encoding/binary" |
5 "errors" | 5 "errors" |
6 "fmt" | 6 "fmt" |
| 7 "sync" |
7 "sync/atomic" | 8 "sync/atomic" |
8 "unsafe" | |
9 ) | 9 ) |
10 | 10 |
11 // A persistable item. | 11 // A persistable item. |
12 type Item struct { | 12 type Item struct { |
13 » Transient unsafe.Pointer // For any ephemeral data; atomic CAS recommend
ed. | 13 » Transient interface{} // For any ephemeral data. |
14 » Key, Val []byte // Val may be nil if not fetched into memory ye
t. | 14 » Key, Val []byte // Val may be nil if not fetched into memory yet. |
15 » Priority int32 // Use rand.Int31() for probabilistic balancing
. | 15 » Priority int32 // Use rand.Int31() for probabilistic balancing. |
16 } | 16 } |
17 | 17 |
18 // A persistable item and its persistence location. | 18 // A persistable item and its persistence location. |
19 type itemLoc struct { | 19 type itemLoc struct { |
20 » loc unsafe.Pointer // *ploc - can be nil if item is dirty (not yet pers
isted). | 20 » m sync.Mutex |
21 » item unsafe.Pointer // *Item - can be nil if item is not fetched into me
mory yet. | 21 |
| 22 » loc *ploc // can be nil if item is dirty (not yet persisted). |
| 23 » item *Item // can be nil if item is not fetched into memory yet. |
22 } | 24 } |
23 | 25 |
24 var empty_itemLoc = &itemLoc{} | 26 var empty_itemLoc = &itemLoc{} |
25 | 27 |
26 // Number of Key bytes plus number of Val bytes. | 28 // Number of Key bytes plus number of Val bytes. |
27 func (i *Item) NumBytes(c *Collection) int { | 29 func (i *Item) NumBytes(c *Collection) int { |
28 return len(i.Key) + i.NumValBytes(c) | 30 return len(i.Key) + i.NumValBytes(c) |
29 } | 31 } |
30 | 32 |
31 func (i *Item) NumValBytes(c *Collection) int { | 33 func (i *Item) NumValBytes(c *Collection) int { |
32 if c.store.callbacks.ItemValLength != nil { | 34 if c.store.callbacks.ItemValLength != nil { |
33 return c.store.callbacks.ItemValLength(c, i) | 35 return c.store.callbacks.ItemValLength(c, i) |
34 } | 36 } |
35 return len(i.Val) | 37 return len(i.Val) |
36 } | 38 } |
37 | 39 |
38 // The returned Item will not have been allocated through the optional | 40 // The returned Item will not have been allocated through the optional |
39 // StoreCallbacks.ItemAlloc() callback. | 41 // StoreCallbacks.ItemAlloc() callback. |
40 func (i *Item) Copy() *Item { | 42 func (i *Item) Copy() *Item { |
41 return &Item{ | 43 return &Item{ |
42 Key: i.Key, | 44 Key: i.Key, |
43 Val: i.Val, | 45 Val: i.Val, |
44 Priority: i.Priority, | 46 Priority: i.Priority, |
45 Transient: i.Transient, | 47 Transient: i.Transient, |
46 } | 48 } |
47 } | 49 } |
48 | 50 |
49 func (i *itemLoc) Loc() *ploc { | 51 func (i *itemLoc) Loc() *ploc { |
50 » return (*ploc)(atomic.LoadPointer(&i.loc)) | 52 » i.m.Lock() |
| 53 » defer i.m.Unlock() |
| 54 » return i.loc |
| 55 } |
| 56 |
| 57 func (i *itemLoc) setLoc(n *ploc) { |
| 58 » i.m.Lock() |
| 59 » defer i.m.Unlock() |
| 60 » i.loc = n |
51 } | 61 } |
52 | 62 |
53 func (i *itemLoc) Item() *Item { | 63 func (i *itemLoc) Item() *Item { |
54 » return (*Item)(atomic.LoadPointer(&i.item)) | 64 » i.m.Lock() |
| 65 » defer i.m.Unlock() |
| 66 » return i.item |
| 67 } |
| 68 |
| 69 func (i *itemLoc) casItem(o, n *Item) bool { |
| 70 » i.m.Lock() |
| 71 » defer i.m.Unlock() |
| 72 » if i.item == o { |
| 73 » » i.item = n |
| 74 » » return true |
| 75 » } |
| 76 » return false |
55 } | 77 } |
56 | 78 |
57 func (i *itemLoc) Copy(src *itemLoc) { | 79 func (i *itemLoc) Copy(src *itemLoc) { |
58 if src == nil { | 80 if src == nil { |
59 i.Copy(empty_itemLoc) | 81 i.Copy(empty_itemLoc) |
60 return | 82 return |
61 } | 83 } |
62 » atomic.StorePointer(&i.loc, unsafe.Pointer(src.Loc())) | 84 » newloc := src.Loc() |
63 » atomic.StorePointer(&i.item, unsafe.Pointer(src.Item())) | 85 » newitem := src.Item() |
| 86 |
| 87 » i.m.Lock() |
| 88 » defer i.m.Unlock() |
| 89 » i.loc = newloc |
| 90 » i.item = newitem |
64 } | 91 } |
65 | 92 |
66 const itemLoc_hdrLength int = 4 + 4 + 4 + 4 | 93 const itemLoc_hdrLength int = 4 + 4 + 4 + 4 |
67 | 94 |
68 func (i *itemLoc) write(c *Collection) (err error) { | 95 func (i *itemLoc) write(c *Collection) (err error) { |
69 if i.Loc().isEmpty() { | 96 if i.Loc().isEmpty() { |
70 iItem := i.Item() | 97 iItem := i.Item() |
71 if iItem == nil { | 98 if iItem == nil { |
72 return errors.New("itemLoc.write with nil item") | 99 return errors.New("itemLoc.write with nil item") |
73 } | 100 } |
(...skipping 23 matching lines...) Expand all Loading... |
97 pos, hlength) | 124 pos, hlength) |
98 } | 125 } |
99 if _, err := c.store.file.WriteAt(b, offset); err != nil { | 126 if _, err := c.store.file.WriteAt(b, offset); err != nil { |
100 return err | 127 return err |
101 } | 128 } |
102 err := c.store.ItemValWrite(c, iItem, c.store.file, offset+int64
(pos)) | 129 err := c.store.ItemValWrite(c, iItem, c.store.file, offset+int64
(pos)) |
103 if err != nil { | 130 if err != nil { |
104 return err | 131 return err |
105 } | 132 } |
106 atomic.StoreInt64(&c.store.size, offset+int64(ilength)) | 133 atomic.StoreInt64(&c.store.size, offset+int64(ilength)) |
107 » » atomic.StorePointer(&i.loc, | 134 » » i.setLoc(&ploc{Offset: offset, Length: uint32(ilength)}) |
108 » » » unsafe.Pointer(&ploc{Offset: offset, Length: uint32(ilen
gth)})) | |
109 } | 135 } |
110 return nil | 136 return nil |
111 } | 137 } |
112 | 138 |
113 func (iloc *itemLoc) read(c *Collection, withValue bool) (icur *Item, err error)
{ | 139 func (iloc *itemLoc) read(c *Collection, withValue bool) (icur *Item, err error)
{ |
114 if iloc == nil { | 140 if iloc == nil { |
115 return nil, nil | 141 return nil, nil |
116 } | 142 } |
117 icur = iloc.Item() | 143 icur = iloc.Item() |
118 if icur == nil || (icur.Val == nil && withValue) { | 144 if icur == nil || (icur.Val == nil && withValue) { |
(...skipping 44 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
163 return nil, err | 189 return nil, err |
164 } | 190 } |
165 } | 191 } |
166 if c.store.callbacks.AfterItemRead != nil { | 192 if c.store.callbacks.AfterItemRead != nil { |
167 i, err = c.store.callbacks.AfterItemRead(c, i) | 193 i, err = c.store.callbacks.AfterItemRead(c, i) |
168 if err != nil { | 194 if err != nil { |
169 c.store.ItemDecRef(c, i) | 195 c.store.ItemDecRef(c, i) |
170 return nil, err | 196 return nil, err |
171 } | 197 } |
172 } | 198 } |
173 » » if !atomic.CompareAndSwapPointer(&iloc.item, | 199 » » if !iloc.casItem(icur, i) { |
174 » » » unsafe.Pointer(icur), unsafe.Pointer(i)) { | |
175 c.store.ItemDecRef(c, i) | 200 c.store.ItemDecRef(c, i) |
176 return iloc.read(c, withValue) | 201 return iloc.read(c, withValue) |
177 } | 202 } |
178 if icur != nil { | 203 if icur != nil { |
179 c.store.ItemDecRef(c, icur) | 204 c.store.ItemDecRef(c, icur) |
180 } | 205 } |
181 icur = i | 206 icur = i |
182 } | 207 } |
183 return icur, nil | 208 return icur, nil |
184 } | 209 } |
185 | 210 |
186 func (iloc *itemLoc) NumBytes(c *Collection) int { | 211 func (iloc *itemLoc) NumBytes(c *Collection) int { |
187 loc := iloc.Loc() | 212 loc := iloc.Loc() |
188 if loc.isEmpty() { | 213 if loc.isEmpty() { |
189 i := iloc.Item() | 214 i := iloc.Item() |
190 if i == nil { | 215 if i == nil { |
191 return 0 | 216 return 0 |
192 } | 217 } |
193 return i.NumBytes(c) | 218 return i.NumBytes(c) |
194 } | 219 } |
195 return int(loc.Length) - itemLoc_hdrLength | 220 return int(loc.Length) - itemLoc_hdrLength |
196 } | 221 } |
OLD | NEW |