OLD | NEW |
1 // Package gkvlite provides a simple, ordered, ACID, key-value | 1 // Package gkvlite provides a simple, ordered, ACID, key-value |
2 // persistence library. It provides persistent, immutable data | 2 // persistence library. It provides persistent, immutable data |
3 // structure abstractions. | 3 // structure abstractions. |
4 package gkvlite | 4 package gkvlite |
5 | 5 |
6 import ( | 6 import ( |
7 "bytes" | 7 "bytes" |
8 "encoding/binary" | 8 "encoding/binary" |
9 "encoding/json" | 9 "encoding/json" |
10 "errors" | 10 "errors" |
11 "fmt" | 11 "fmt" |
12 "io" | 12 "io" |
13 "os" | 13 "os" |
14 "reflect" | 14 "reflect" |
15 "sort" | 15 "sort" |
16 "sync" | 16 "sync" |
17 "sync/atomic" | 17 "sync/atomic" |
18 "unsafe" | |
19 ) | 18 ) |
20 | 19 |
21 // A persistable store holding collections of ordered keys & values. | 20 // A persistable store holding collections of ordered keys & values. |
22 type Store struct { | 21 type Store struct { |
| 22 m sync.Mutex |
| 23 |
23 // Atomic CAS'ed int64/uint64's must be at the top for 32-bit compatibil
ity. | 24 // Atomic CAS'ed int64/uint64's must be at the top for 32-bit compatibil
ity. |
24 » size int64 // Atomic protected; file size or next write p
osition. | 25 » size int64 // Atomic protected; file size or nex
t write position. |
25 » nodeAllocs uint64 // Atomic protected; total node allocation sta
ts. | 26 » nodeAllocs uint64 // Atomic protected; total node alloc
ation stats. |
26 » coll unsafe.Pointer // Copy-on-write map[string]*Collection. | 27 » coll *map[string]*Collection // Copy-on-write map[string]*Collecti
on. |
27 » file StoreFile // When nil, we're memory-only or no persisten
ce. | 28 » file StoreFile // When nil, we're memory-only or no
persistence. |
28 » callbacks StoreCallbacks // Optional / may be nil. | 29 » callbacks StoreCallbacks // Optional / may be nil. |
29 » readOnly bool // When true, Flush()'ing is disallowed. | 30 » readOnly bool // When true, Flush()'ing is disallow
ed. |
| 31 } |
| 32 |
| 33 func (s *Store) setColl(n *map[string]*Collection) { |
| 34 » s.m.Lock() |
| 35 » defer s.m.Unlock() |
| 36 » s.coll = n |
| 37 } |
| 38 |
| 39 func (s *Store) getColl() *map[string]*Collection { |
| 40 » s.m.Lock() |
| 41 » defer s.m.Unlock() |
| 42 » return s.coll |
| 43 } |
| 44 |
| 45 func (s *Store) casColl(o, n *map[string]*Collection) bool { |
| 46 » s.m.Lock() |
| 47 » defer s.m.Unlock() |
| 48 » if s.coll == o { |
| 49 » » s.coll = n |
| 50 » » return true |
| 51 » } |
| 52 » return false |
30 } | 53 } |
31 | 54 |
// The StoreFile interface is implemented by os.File. Application
// specific implementations may add concurrency, caching, stats, etc.
type StoreFile interface {
	io.ReaderAt
	io.WriterAt
	Stat() (os.FileInfo, error)
	Truncate(size int64) error
}
(...skipping 48 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
// rootsLen is the fixed on-disk size of a roots record: two MAGIC_BEG
// markers, two 4-byte fields, plus the roots end section.
var rootsLen int64 = int64(2*len(MAGIC_BEG) + 4 + 4 + rootsEndLen)
89 | 112 |
90 // Provide a nil StoreFile for in-memory-only (non-persistent) usage. | 113 // Provide a nil StoreFile for in-memory-only (non-persistent) usage. |
91 func NewStore(file StoreFile) (*Store, error) { | 114 func NewStore(file StoreFile) (*Store, error) { |
92 return NewStoreEx(file, StoreCallbacks{}) | 115 return NewStoreEx(file, StoreCallbacks{}) |
93 } | 116 } |
94 | 117 |
95 func NewStoreEx(file StoreFile, | 118 func NewStoreEx(file StoreFile, |
96 callbacks StoreCallbacks) (*Store, error) { | 119 callbacks StoreCallbacks) (*Store, error) { |
97 coll := make(map[string]*Collection) | 120 coll := make(map[string]*Collection) |
98 » res := &Store{coll: unsafe.Pointer(&coll), callbacks: callbacks} | 121 » res := &Store{coll: &coll, callbacks: callbacks} |
99 if file == nil || !reflect.ValueOf(file).Elem().IsValid() { | 122 if file == nil || !reflect.ValueOf(file).Elem().IsValid() { |
100 return res, nil // Memory-only Store. | 123 return res, nil // Memory-only Store. |
101 } | 124 } |
102 res.file = file | 125 res.file = file |
103 if err := res.readRoots(); err != nil { | 126 if err := res.readRoots(); err != nil { |
104 return nil, err | 127 return nil, err |
105 } | 128 } |
106 return res, nil | 129 return res, nil |
107 } | 130 } |
108 | 131 |
109 // SetCollection() is used to create a named Collection, or to modify | 132 // SetCollection() is used to create a named Collection, or to modify |
110 // the KeyCompare function on an existing Collection. In either case, | 133 // the KeyCompare function on an existing Collection. In either case, |
111 // a new Collection to use is returned. A newly created Collection | 134 // a new Collection to use is returned. A newly created Collection |
112 // and any mutations on it won't be persisted until you do a Flush(). | 135 // and any mutations on it won't be persisted until you do a Flush(). |
113 func (s *Store) SetCollection(name string, compare KeyCompare) *Collection { | 136 func (s *Store) SetCollection(name string, compare KeyCompare) *Collection { |
114 if compare == nil { | 137 if compare == nil { |
115 compare = bytes.Compare | 138 compare = bytes.Compare |
116 } | 139 } |
117 for { | 140 for { |
118 » » orig := atomic.LoadPointer(&s.coll) | 141 » » orig := s.getColl() |
119 coll := copyColl(*(*map[string]*Collection)(orig)) | 142 coll := copyColl(*(*map[string]*Collection)(orig)) |
120 cnew := s.MakePrivateCollection(compare) | 143 cnew := s.MakePrivateCollection(compare) |
121 cnew.name = name | 144 cnew.name = name |
122 cold := coll[name] | 145 cold := coll[name] |
123 if cold != nil { | 146 if cold != nil { |
124 cnew.rootLock = cold.rootLock | 147 cnew.rootLock = cold.rootLock |
125 cnew.root = cold.rootAddRef() | 148 cnew.root = cold.rootAddRef() |
126 } | 149 } |
127 coll[name] = cnew | 150 coll[name] = cnew |
128 » » if atomic.CompareAndSwapPointer(&s.coll, orig, unsafe.Pointer(&c
oll)) { | 151 » » if s.casColl(orig, &coll) { |
129 cold.closeCollection() | 152 cold.closeCollection() |
130 return cnew | 153 return cnew |
131 } | 154 } |
132 cnew.closeCollection() | 155 cnew.closeCollection() |
133 } | 156 } |
134 } | 157 } |
135 | 158 |
136 // Returns a new, unregistered (non-named) collection. This allows | 159 // Returns a new, unregistered (non-named) collection. This allows |
137 // advanced users to manage collections of private collections. | 160 // advanced users to manage collections of private collections. |
138 func (s *Store) MakePrivateCollection(compare KeyCompare) *Collection { | 161 func (s *Store) MakePrivateCollection(compare KeyCompare) *Collection { |
139 if compare == nil { | 162 if compare == nil { |
140 compare = bytes.Compare | 163 compare = bytes.Compare |
141 } | 164 } |
142 return &Collection{ | 165 return &Collection{ |
143 store: s, | 166 store: s, |
144 compare: compare, | 167 compare: compare, |
145 rootLock: &sync.Mutex{}, | 168 rootLock: &sync.Mutex{}, |
146 root: &rootNodeLoc{refs: 1, root: empty_nodeLoc}, | 169 root: &rootNodeLoc{refs: 1, root: empty_nodeLoc}, |
147 } | 170 } |
148 } | 171 } |
149 | 172 |
150 // Retrieves a named Collection. | 173 // Retrieves a named Collection. |
151 func (s *Store) GetCollection(name string) *Collection { | 174 func (s *Store) GetCollection(name string) *Collection { |
152 » coll := *(*map[string]*Collection)(atomic.LoadPointer(&s.coll)) | 175 » return (*s.getColl())[name] |
153 » return coll[name] | |
154 } | 176 } |
155 | 177 |
156 func (s *Store) GetCollectionNames() []string { | 178 func (s *Store) GetCollectionNames() []string { |
157 » return collNames(*(*map[string]*Collection)(atomic.LoadPointer(&s.coll))
) | 179 » return collNames(*s.getColl()) |
158 } | 180 } |
159 | 181 |
160 func collNames(coll map[string]*Collection) []string { | 182 func collNames(coll map[string]*Collection) []string { |
161 res := make([]string, 0, len(coll)) | 183 res := make([]string, 0, len(coll)) |
162 for name := range coll { | 184 for name := range coll { |
163 res = append(res, name) | 185 res = append(res, name) |
164 } | 186 } |
165 sort.Strings(res) // Sorting because common callers need stability. | 187 sort.Strings(res) // Sorting because common callers need stability. |
166 return res | 188 return res |
167 } | 189 } |
168 | 190 |
169 // The Collection removal won't be reflected into persistence until | 191 // The Collection removal won't be reflected into persistence until |
170 // you do a Flush(). Invoking RemoveCollection(x) and then | 192 // you do a Flush(). Invoking RemoveCollection(x) and then |
171 // SetCollection(x) is a fast way to empty a Collection. | 193 // SetCollection(x) is a fast way to empty a Collection. |
172 func (s *Store) RemoveCollection(name string) { | 194 func (s *Store) RemoveCollection(name string) { |
173 for { | 195 for { |
174 » » orig := atomic.LoadPointer(&s.coll) | 196 » » orig := s.getColl() |
175 coll := copyColl(*(*map[string]*Collection)(orig)) | 197 coll := copyColl(*(*map[string]*Collection)(orig)) |
176 cold := coll[name] | 198 cold := coll[name] |
177 delete(coll, name) | 199 delete(coll, name) |
178 » » if atomic.CompareAndSwapPointer(&s.coll, orig, unsafe.Pointer(&c
oll)) { | 200 » » if s.casColl(orig, &coll) { |
179 cold.closeCollection() | 201 cold.closeCollection() |
180 return | 202 return |
181 } | 203 } |
182 } | 204 } |
183 } | 205 } |
184 | 206 |
185 func copyColl(orig map[string]*Collection) map[string]*Collection { | 207 func copyColl(orig map[string]*Collection) map[string]*Collection { |
186 res := make(map[string]*Collection) | 208 res := make(map[string]*Collection) |
187 for name, c := range orig { | 209 for name, c := range orig { |
188 res[name] = c | 210 res[name] = c |
189 } | 211 } |
190 return res | 212 return res |
191 } | 213 } |
192 | 214 |
193 // Writes (appends) any dirty, unpersisted data to file. As a | 215 // Writes (appends) any dirty, unpersisted data to file. As a |
194 // greater-window-of-data-loss versus higher-performance tradeoff, | 216 // greater-window-of-data-loss versus higher-performance tradeoff, |
195 // consider having many mutations (Set()'s & Delete()'s) and then | 217 // consider having many mutations (Set()'s & Delete()'s) and then |
196 // have a less occasional Flush() instead of Flush()'ing after every | 218 // have a less occasional Flush() instead of Flush()'ing after every |
197 // mutation. Users may also wish to file.Sync() after a Flush() for | 219 // mutation. Users may also wish to file.Sync() after a Flush() for |
198 // extra data-loss protection. | 220 // extra data-loss protection. |
199 func (s *Store) Flush() error { | 221 func (s *Store) Flush() error { |
200 if s.readOnly { | 222 if s.readOnly { |
201 return errors.New("readonly, so cannot Flush()") | 223 return errors.New("readonly, so cannot Flush()") |
202 } | 224 } |
203 if s.file == nil { | 225 if s.file == nil { |
204 return errors.New("no file / in-memory only, so cannot Flush()") | 226 return errors.New("no file / in-memory only, so cannot Flush()") |
205 } | 227 } |
206 » coll := *(*map[string]*Collection)(atomic.LoadPointer(&s.coll)) | 228 » coll := *s.getColl() |
207 rnls := map[string]*rootNodeLoc{} | 229 rnls := map[string]*rootNodeLoc{} |
208 cnames := collNames(coll) | 230 cnames := collNames(coll) |
209 for _, name := range cnames { | 231 for _, name := range cnames { |
210 c := coll[name] | 232 c := coll[name] |
211 rnls[name] = c.rootAddRef() | 233 rnls[name] = c.rootAddRef() |
212 } | 234 } |
213 defer func() { | 235 defer func() { |
214 for _, name := range cnames { | 236 for _, name := range cnames { |
215 coll[name].rootDecRef(rnls[name]) | 237 coll[name].rootDecRef(rnls[name]) |
216 } | 238 } |
217 }() | 239 }() |
218 for _, name := range cnames { | 240 for _, name := range cnames { |
219 if err := coll[name].write(rnls[name].root); err != nil { | 241 if err := coll[name].write(rnls[name].root); err != nil { |
220 return err | 242 return err |
221 } | 243 } |
222 } | 244 } |
223 return s.writeRoots(rnls) | 245 return s.writeRoots(rnls) |
224 } | 246 } |
225 | 247 |
226 // Reverts the last Flush(), bringing the Store back to its state at | 248 // Reverts the last Flush(), bringing the Store back to its state at |
227 // its next-to-last Flush() or to an empty Store (with no Collections) | 249 // its next-to-last Flush() or to an empty Store (with no Collections) |
228 // if there were no next-to-last Flush(). This call will truncate the | 250 // if there were no next-to-last Flush(). This call will truncate the |
229 // Store file. | 251 // Store file. |
230 func (s *Store) FlushRevert() error { | 252 func (s *Store) FlushRevert() error { |
231 if s.file == nil { | 253 if s.file == nil { |
232 return errors.New("no file / in-memory only, so cannot FlushReve
rt()") | 254 return errors.New("no file / in-memory only, so cannot FlushReve
rt()") |
233 } | 255 } |
234 » orig := atomic.LoadPointer(&s.coll) | 256 » orig := s.getColl() |
235 coll := make(map[string]*Collection) | 257 coll := make(map[string]*Collection) |
236 » if atomic.CompareAndSwapPointer(&s.coll, orig, unsafe.Pointer(&coll)) { | 258 » if s.casColl(orig, &coll) { |
237 for _, cold := range *(*map[string]*Collection)(orig) { | 259 for _, cold := range *(*map[string]*Collection)(orig) { |
238 cold.closeCollection() | 260 cold.closeCollection() |
239 } | 261 } |
240 } | 262 } |
241 if atomic.LoadInt64(&s.size) > rootsLen { | 263 if atomic.LoadInt64(&s.size) > rootsLen { |
242 atomic.AddInt64(&s.size, -1) | 264 atomic.AddInt64(&s.size, -1) |
243 } | 265 } |
244 err := s.readRootsScan(true) | 266 err := s.readRootsScan(true) |
245 if err != nil { | 267 if err != nil { |
246 return err | 268 return err |
247 } | 269 } |
248 if s.readOnly { | 270 if s.readOnly { |
249 return nil | 271 return nil |
250 } | 272 } |
251 return s.file.Truncate(atomic.LoadInt64(&s.size)) | 273 return s.file.Truncate(atomic.LoadInt64(&s.size)) |
252 } | 274 } |
253 | 275 |
254 // Returns a read-only snapshot, including any mutations on the | 276 // Returns a read-only snapshot, including any mutations on the |
255 // original Store that have not been Flush()'ed to disk yet. The | 277 // original Store that have not been Flush()'ed to disk yet. The |
256 // snapshot has its mutations and Flush() operations disabled because | 278 // snapshot has its mutations and Flush() operations disabled because |
257 // the original store "owns" writes to the StoreFile. | 279 // the original store "owns" writes to the StoreFile. |
258 func (s *Store) Snapshot() (snapshot *Store) { | 280 func (s *Store) Snapshot() (snapshot *Store) { |
259 » coll := copyColl(*(*map[string]*Collection)(atomic.LoadPointer(&s.coll))
) | 281 » coll := copyColl(*s.getColl()) |
260 res := &Store{ | 282 res := &Store{ |
261 » » coll: unsafe.Pointer(&coll), | 283 » » coll: &coll, |
262 file: s.file, | 284 file: s.file, |
263 size: atomic.LoadInt64(&s.size), | 285 size: atomic.LoadInt64(&s.size), |
264 readOnly: true, | 286 readOnly: true, |
265 callbacks: s.callbacks, | 287 callbacks: s.callbacks, |
266 } | 288 } |
267 for _, name := range collNames(coll) { | 289 for _, name := range collNames(coll) { |
268 collOrig := coll[name] | 290 collOrig := coll[name] |
269 coll[name] = &Collection{ | 291 coll[name] = &Collection{ |
270 store: res, | 292 store: res, |
271 compare: collOrig.compare, | 293 compare: collOrig.compare, |
272 rootLock: collOrig.rootLock, | 294 rootLock: collOrig.rootLock, |
273 root: collOrig.rootAddRef(), | 295 root: collOrig.rootAddRef(), |
274 } | 296 } |
275 } | 297 } |
276 return res | 298 return res |
277 } | 299 } |
278 | 300 |
279 func (s *Store) Close() { | 301 func (s *Store) Close() { |
280 s.file = nil | 302 s.file = nil |
281 » cptr := atomic.LoadPointer(&s.coll) | 303 » cptr := s.getColl() |
282 » if cptr == nil || | 304 » if cptr == nil || !s.casColl(cptr, nil) { |
283 » » !atomic.CompareAndSwapPointer(&s.coll, cptr, unsafe.Pointer(nil)
) { | |
284 return | 305 return |
285 } | 306 } |
286 coll := *(*map[string]*Collection)(cptr) | 307 coll := *(*map[string]*Collection)(cptr) |
287 for _, name := range collNames(coll) { | 308 for _, name := range collNames(coll) { |
288 coll[name].closeCollection() | 309 coll[name].closeCollection() |
289 } | 310 } |
290 } | 311 } |
291 | 312 |
292 // Copy all active collections and their items to a different file. | 313 // Copy all active collections and their items to a different file. |
293 // If flushEvery > 0, then during the item copying, Flush() will be | 314 // If flushEvery > 0, then during the item copying, Flush() will be |
294 // invoked at every flushEvery'th item and at the end of the item | 315 // invoked at every flushEvery'th item and at the end of the item |
295 // copying. The copy will not include any old items or nodes so the | 316 // copying. The copy will not include any old items or nodes so the |
296 // copy should be more compact if flushEvery is relatively large. | 317 // copy should be more compact if flushEvery is relatively large. |
297 func (s *Store) CopyTo(dstFile StoreFile, flushEvery int) (res *Store, err error
) { | 318 func (s *Store) CopyTo(dstFile StoreFile, flushEvery int) (res *Store, err error
) { |
298 dstStore, err := NewStore(dstFile) | 319 dstStore, err := NewStore(dstFile) |
299 if err != nil { | 320 if err != nil { |
300 return nil, err | 321 return nil, err |
301 } | 322 } |
302 » coll := *(*map[string]*Collection)(atomic.LoadPointer(&s.coll)) | 323 » coll := *s.getColl() |
303 for _, name := range collNames(coll) { | 324 for _, name := range collNames(coll) { |
304 srcColl := coll[name] | 325 srcColl := coll[name] |
305 dstColl := dstStore.SetCollection(name, srcColl.compare) | 326 dstColl := dstStore.SetCollection(name, srcColl.compare) |
306 minItem, err := srcColl.MinItem(true) | 327 minItem, err := srcColl.MinItem(true) |
307 if err != nil { | 328 if err != nil { |
308 return nil, err | 329 return nil, err |
309 } | 330 } |
310 if minItem == nil { | 331 if minItem == nil { |
311 continue | 332 continue |
312 } | 333 } |
(...skipping 132 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
445 for collName, t := range m { | 466 for collName, t := range m { |
446 t.name = collName | 467 t.name = collName |
447 t.store = o | 468 t.store = o |
448 if o.callbacks.KeyCompareForCollection !
= nil { | 469 if o.callbacks.KeyCompareForCollection !
= nil { |
449 t.compare = o.callbacks.KeyCompa
reForCollection(collName) | 470 t.compare = o.callbacks.KeyCompa
reForCollection(collName) |
450 } | 471 } |
451 if t.compare == nil { | 472 if t.compare == nil { |
452 t.compare = bytes.Compare | 473 t.compare = bytes.Compare |
453 } | 474 } |
454 } | 475 } |
455 » » » » atomic.StorePointer(&o.coll, unsafe.Pointer(&m)) | 476 » » » » o.setColl(&m) |
456 return nil | 477 return nil |
457 } // else, perhaps value was unlucky in having MAGIC_END
's. | 478 } // else, perhaps value was unlucky in having MAGIC_END
's. |
458 } // else, perhaps a gkvlite file was stored as a value. | 479 } // else, perhaps a gkvlite file was stored as a value. |
459 atomic.AddInt64(&o.size, -1) // Roots were wrong, so keep scanni
ng. | 480 atomic.AddInt64(&o.size, -1) // Roots were wrong, so keep scanni
ng. |
460 } | 481 } |
461 } | 482 } |
462 | 483 |
463 func (o *Store) ItemAlloc(c *Collection, keyLength uint32) *Item { | 484 func (o *Store) ItemAlloc(c *Collection, keyLength uint32) *Item { |
464 if o.callbacks.ItemAlloc != nil { | 485 if o.callbacks.ItemAlloc != nil { |
465 return o.callbacks.ItemAlloc(c, keyLength) | 486 return o.callbacks.ItemAlloc(c, keyLength) |
(...skipping 23 matching lines...) Expand all Loading... |
489 return err | 510 return err |
490 } | 511 } |
491 | 512 |
492 func (o *Store) ItemValWrite(c *Collection, i *Item, w io.WriterAt, offset int64
) error { | 513 func (o *Store) ItemValWrite(c *Collection, i *Item, w io.WriterAt, offset int64
) error { |
493 if o.callbacks.ItemValWrite != nil { | 514 if o.callbacks.ItemValWrite != nil { |
494 return o.callbacks.ItemValWrite(c, i, w, offset) | 515 return o.callbacks.ItemValWrite(c, i, w, offset) |
495 } | 516 } |
496 _, err := w.WriteAt(i.Val, offset) | 517 _, err := w.WriteAt(i.Val, offset) |
497 return err | 518 return err |
498 } | 519 } |
OLD | NEW |