| OLD | NEW |
| 1 // Package gkvlite provides a simple, ordered, ACID, key-value | 1 // Package gkvlite provides a simple, ordered, ACID, key-value |
| 2 // persistence library. It provides persistent, immutable data | 2 // persistence library. It provides persistent, immutable data |
| 3 // structure abstractions. | 3 // structure abstractions. |
| 4 package gkvlite | 4 package gkvlite |
| 5 | 5 |
| 6 import ( | 6 import ( |
| 7 "bytes" | 7 "bytes" |
| 8 "encoding/binary" | 8 "encoding/binary" |
| 9 "encoding/json" | 9 "encoding/json" |
| 10 "errors" | 10 "errors" |
| 11 "fmt" | 11 "fmt" |
| 12 "io" | 12 "io" |
| 13 "os" | 13 "os" |
| 14 "reflect" | 14 "reflect" |
| 15 "sort" | 15 "sort" |
| 16 "sync" | 16 "sync" |
| 17 "sync/atomic" | 17 "sync/atomic" |
| 18 "unsafe" | |
| 19 ) | 18 ) |
| 20 | 19 |
| 21 // A persistable store holding collections of ordered keys & values. | 20 // A persistable store holding collections of ordered keys & values. |
| 22 type Store struct { | 21 type Store struct { |
| 22 m sync.Mutex |
| 23 |
| 23 // Atomic CAS'ed int64/uint64's must be at the top for 32-bit compatibility. | 24 // Atomic CAS'ed int64/uint64's must be at the top for 32-bit compatibility. |
| 24 » size int64 // Atomic protected; file size or next write position. | 25 » size int64 // Atomic protected; file size or next write position. |
| 25 » nodeAllocs uint64 // Atomic protected; total node allocation stats. | 26 » nodeAllocs uint64 // Atomic protected; total node allocation stats. |
| 26 » coll unsafe.Pointer // Copy-on-write map[string]*Collection. | 27 » coll *map[string]*Collection // Copy-on-write map[string]*Collection. |
| 27 » file StoreFile // When nil, we're memory-only or no persistence. | 28 » file StoreFile // When nil, we're memory-only or no persistence. |
| 28 » callbacks StoreCallbacks // Optional / may be nil. | 29 » callbacks StoreCallbacks // Optional / may be nil. |
| 29 » readOnly bool // When true, Flush()'ing is disallowed. | 30 » readOnly bool // When true, Flush()'ing is disallowed. |
| 31 } |
| 32 |
| 33 func (s *Store) setColl(n *map[string]*Collection) { |
| 34 » s.m.Lock() |
| 35 » defer s.m.Unlock() |
| 36 » s.coll = n |
| 37 } |
| 38 |
| 39 func (s *Store) getColl() *map[string]*Collection { |
| 40 » s.m.Lock() |
| 41 » defer s.m.Unlock() |
| 42 » return s.coll |
| 43 } |
| 44 |
| 45 func (s *Store) casColl(o, n *map[string]*Collection) bool { |
| 46 » s.m.Lock() |
| 47 » defer s.m.Unlock() |
| 48 » if s.coll == o { |
| 49 » » s.coll = n |
| 50 » » return true |
| 51 » } |
| 52 » return false |
| 30 } | 53 } |
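
The three helpers above reproduce, under s.m, the load / store / compare-and-swap semantics that the old atomic.Pointer calls provided, so callers keep the same copy-on-write discipline: snapshot the published map pointer, mutate a private copy, and publish it only if no other writer swapped in between. A minimal sketch of that caller-side retry loop (the same shape SetCollection and RemoveCollection use below):

    for {
        orig := s.getColl()     // snapshot the currently published pointer
        coll := copyColl(*orig) // mutate a private copy, never the shared map
        // ... insert or delete entries in coll ...
        if s.casColl(orig, &coll) {
            break // published; readers now observe the new map
        }
        // a concurrent writer won the race; retry against its result
    }
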
| 31 | 54 |
| 32 // The StoreFile interface is implemented by os.File. Application | 55 // The StoreFile interface is implemented by os.File. Application |
| 33 // specific implementations may add concurrency, caching, stats, etc. | 56 // specific implementations may add concurrency, caching, stats, etc. |
| 34 type StoreFile interface { | 57 type StoreFile interface { |
| 35 io.ReaderAt | 58 io.ReaderAt |
| 36 io.WriterAt | 59 io.WriterAt |
| 37 Stat() (os.FileInfo, error) | 60 Stat() (os.FileInfo, error) |
| 38 Truncate(size int64) error | 61 Truncate(size int64) error |
| 39 } | 62 } |
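
*os.File already provides ReaderAt, WriterAt, Stat, and Truncate, so a plain file handle satisfies StoreFile with no adapter. A sketch of the common case (the file name is illustrative):

    f, err := os.OpenFile("data.gkvlite", os.O_RDWR|os.O_CREATE, 0666)
    if err != nil {
        return err
    }
    s, err := gkvlite.NewStore(f) // *os.File used directly as a StoreFile
    if err != nil {
        return err
    }
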
| (...skipping 48 matching lines...) |
| 88 var rootsLen int64 = int64(2*len(MAGIC_BEG) + 4 + 4 + rootsEndLen) | 111 var rootsLen int64 = int64(2*len(MAGIC_BEG) + 4 + 4 + rootsEndLen) |
| 89 | 112 |
| 90 // Provide a nil StoreFile for in-memory-only (non-persistent) usage. | 113 // Provide a nil StoreFile for in-memory-only (non-persistent) usage. |
| 91 func NewStore(file StoreFile) (*Store, error) { | 114 func NewStore(file StoreFile) (*Store, error) { |
| 92 return NewStoreEx(file, StoreCallbacks{}) | 115 return NewStoreEx(file, StoreCallbacks{}) |
| 93 } | 116 } |
| 94 | 117 |
| 95 func NewStoreEx(file StoreFile, | 118 func NewStoreEx(file StoreFile, |
| 96 callbacks StoreCallbacks) (*Store, error) { | 119 callbacks StoreCallbacks) (*Store, error) { |
| 97 coll := make(map[string]*Collection) | 120 coll := make(map[string]*Collection) |
| 98 » res := &Store{coll: unsafe.Pointer(&coll), callbacks: callbacks} | 121 » res := &Store{coll: &coll, callbacks: callbacks} |
| 99 if file == nil || !reflect.ValueOf(file).Elem().IsValid() { | 122 if file == nil || !reflect.ValueOf(file).Elem().IsValid() { |
| 100 return res, nil // Memory-only Store. | 123 return res, nil // Memory-only Store. |
| 101 } | 124 } |
| 102 res.file = file | 125 res.file = file |
| 103 if err := res.readRoots(); err != nil { | 126 if err := res.readRoots(); err != nil { |
| 104 return nil, err | 127 return nil, err |
| 105 } | 128 } |
| 106 return res, nil | 129 return res, nil |
| 107 } | 130 } |
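
Per the comment above NewStore(), passing nil yields a memory-only store; a sketch:

    s, err := gkvlite.NewStore(nil) // no StoreFile: nothing persists, Flush() errors
    if err != nil {
        return err
    }
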
| 108 | 131 |
| 109 // SetCollection() is used to create a named Collection, or to modify | 132 // SetCollection() is used to create a named Collection, or to modify |
| 110 // the KeyCompare function on an existing Collection. In either case, | 133 // the KeyCompare function on an existing Collection. In either case, |
| 111 // a new Collection to use is returned. A newly created Collection | 134 // a new Collection to use is returned. A newly created Collection |
| 112 // and any mutations on it won't be persisted until you do a Flush(). | 135 // and any mutations on it won't be persisted until you do a Flush(). |
| 113 func (s *Store) SetCollection(name string, compare KeyCompare) *Collection { | 136 func (s *Store) SetCollection(name string, compare KeyCompare) *Collection { |
| 114 if compare == nil { | 137 if compare == nil { |
| 115 compare = bytes.Compare | 138 compare = bytes.Compare |
| 116 } | 139 } |
| 117 for { | 140 for { |
| 118 » » orig := atomic.LoadPointer(&s.coll) | 141 » » orig := s.getColl() |
| 119 coll := copyColl(*(*map[string]*Collection)(orig)) | 142 coll := copyColl(*(*map[string]*Collection)(orig)) |
| 120 cnew := s.MakePrivateCollection(compare) | 143 cnew := s.MakePrivateCollection(compare) |
| 121 cnew.name = name | 144 cnew.name = name |
| 122 cold := coll[name] | 145 cold := coll[name] |
| 123 if cold != nil { | 146 if cold != nil { |
| 124 cnew.rootLock = cold.rootLock | 147 cnew.rootLock = cold.rootLock |
| 125 cnew.root = cold.rootAddRef() | 148 cnew.root = cold.rootAddRef() |
| 126 } | 149 } |
| 127 coll[name] = cnew | 150 coll[name] = cnew |
| 128 » » if atomic.CompareAndSwapPointer(&s.coll, orig, unsafe.Pointer(&coll)) { | 151 » » if s.casColl(orig, &coll) { |
| 129 cold.closeCollection() | 152 cold.closeCollection() |
| 130 return cnew | 153 return cnew |
| 131 } | 154 } |
| 132 cnew.closeCollection() | 155 cnew.closeCollection() |
| 133 } | 156 } |
| 134 } | 157 } |
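
A sketch of typical use, assuming the Collection.Set signature from gkvlite's public API (referenced in the Flush() comment below but not shown in this diff):

    c := s.SetCollection("users", nil) // nil KeyCompare defaults to bytes.Compare
    if err := c.Set([]byte("alice"), []byte("admin")); err != nil {
        return err
    }
    // The new collection and its items reach disk only on s.Flush().
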
| 135 | 158 |
| 136 // Returns a new, unregistered (non-named) collection. This allows | 159 // Returns a new, unregistered (non-named) collection. This allows |
| 137 // advanced users to manage collections of private collections. | 160 // advanced users to manage collections of private collections. |
| 138 func (s *Store) MakePrivateCollection(compare KeyCompare) *Collection { | 161 func (s *Store) MakePrivateCollection(compare KeyCompare) *Collection { |
| 139 if compare == nil { | 162 if compare == nil { |
| 140 compare = bytes.Compare | 163 compare = bytes.Compare |
| 141 } | 164 } |
| 142 return &Collection{ | 165 return &Collection{ |
| 143 store: s, | 166 store: s, |
| 144 compare: compare, | 167 compare: compare, |
| 145 rootLock: &sync.Mutex{}, | 168 rootLock: &sync.Mutex{}, |
| 146 root: &rootNodeLoc{refs: 1, root: empty_nodeLoc}, | 169 root: &rootNodeLoc{refs: 1, root: empty_nodeLoc}, |
| 147 } | 170 } |
| 148 } | 171 } |
| 149 | 172 |
| 150 // Retrieves a named Collection. | 173 // Retrieves a named Collection. |
| 151 func (s *Store) GetCollection(name string) *Collection { | 174 func (s *Store) GetCollection(name string) *Collection { |
| 152 » coll := *(*map[string]*Collection)(atomic.LoadPointer(&s.coll)) | 175 » return (*s.getColl())[name] |
| 153 » return coll[name] | |
| 154 } | 176 } |
| 155 | 177 |
| 156 func (s *Store) GetCollectionNames() []string { | 178 func (s *Store) GetCollectionNames() []string { |
| 157 » return collNames(*(*map[string]*Collection)(atomic.LoadPointer(&s.coll))) | 179 » return collNames(*s.getColl()) |
| 158 } | 180 } |
| 159 | 181 |
| 160 func collNames(coll map[string]*Collection) []string { | 182 func collNames(coll map[string]*Collection) []string { |
| 161 res := make([]string, 0, len(coll)) | 183 res := make([]string, 0, len(coll)) |
| 162 for name := range coll { | 184 for name := range coll { |
| 163 res = append(res, name) | 185 res = append(res, name) |
| 164 } | 186 } |
| 165 sort.Strings(res) // Sorting because common callers need stability. | 187 sort.Strings(res) // Sorting because common callers need stability. |
| 166 return res | 188 return res |
| 167 } | 189 } |
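
Because collNames() sorts, callers get a deterministic ordering regardless of Go's randomized map iteration; for example:

    s.SetCollection("b", nil)
    s.SetCollection("a", nil)
    fmt.Println(s.GetCollectionNames()) // always [a b]
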
| 168 | 190 |
| 169 // The Collection removal won't be reflected into persistence until | 191 // The Collection removal won't be reflected into persistence until |
| 170 // you do a Flush(). Invoking RemoveCollection(x) and then | 192 // you do a Flush(). Invoking RemoveCollection(x) and then |
| 171 // SetCollection(x) is a fast way to empty a Collection. | 193 // SetCollection(x) is a fast way to empty a Collection. |
| 172 func (s *Store) RemoveCollection(name string) { | 194 func (s *Store) RemoveCollection(name string) { |
| 173 for { | 195 for { |
| 174 » » orig := atomic.LoadPointer(&s.coll) | 196 » » orig := s.getColl() |
| 175 coll := copyColl(*(*map[string]*Collection)(orig)) | 197 coll := copyColl(*(*map[string]*Collection)(orig)) |
| 176 cold := coll[name] | 198 cold := coll[name] |
| 177 delete(coll, name) | 199 delete(coll, name) |
| 178 » » if atomic.CompareAndSwapPointer(&s.coll, orig, unsafe.Pointer(&coll)) { | 200 » » if s.casColl(orig, &coll) { |
| 179 cold.closeCollection() | 201 cold.closeCollection() |
| 180 return | 202 return |
| 181 } | 203 } |
| 182 } | 204 } |
| 183 } | 205 } |
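
The fast-empty idiom from the comment above, sketched:

    s.RemoveCollection("events")
    events := s.SetCollection("events", nil) // fresh, empty collection
    // Neither step touches the file until the next s.Flush().
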
| 184 | 206 |
| 185 func copyColl(orig map[string]*Collection) map[string]*Collection { | 207 func copyColl(orig map[string]*Collection) map[string]*Collection { |
| 186 res := make(map[string]*Collection) | 208 res := make(map[string]*Collection) |
| 187 for name, c := range orig { | 209 for name, c := range orig { |
| 188 res[name] = c | 210 res[name] = c |
| 189 } | 211 } |
| 190 return res | 212 return res |
| 191 } | 213 } |
| 192 | 214 |
| 193 // Writes (appends) any dirty, unpersisted data to file. As a | 215 // Writes (appends) any dirty, unpersisted data to file. As a |
| 194 // greater-window-of-data-loss versus higher-performance tradeoff, | 216 // greater-window-of-data-loss versus higher-performance tradeoff, |
| 195 // consider having many mutations (Set()'s & Delete()'s) and then | 217 // consider having many mutations (Set()'s & Delete()'s) and then |
| 196 // an occasional Flush() instead of Flush()'ing after every | 218 // an occasional Flush() instead of Flush()'ing after every |
| 197 // mutation. Users may also wish to file.Sync() after a Flush() for | 219 // mutation. Users may also wish to file.Sync() after a Flush() for |
| 198 // extra data-loss protection. | 220 // extra data-loss protection. |
| 199 func (s *Store) Flush() error { | 221 func (s *Store) Flush() error { |
| 200 if s.readOnly { | 222 if s.readOnly { |
| 201 return errors.New("readonly, so cannot Flush()") | 223 return errors.New("readonly, so cannot Flush()") |
| 202 } | 224 } |
| 203 if s.file == nil { | 225 if s.file == nil { |
| 204 return errors.New("no file / in-memory only, so cannot Flush()") | 226 return errors.New("no file / in-memory only, so cannot Flush()") |
| 205 } | 227 } |
| 206 » coll := *(*map[string]*Collection)(atomic.LoadPointer(&s.coll)) | 228 » coll := *s.getColl() |
| 207 rnls := map[string]*rootNodeLoc{} | 229 rnls := map[string]*rootNodeLoc{} |
| 208 cnames := collNames(coll) | 230 cnames := collNames(coll) |
| 209 for _, name := range cnames { | 231 for _, name := range cnames { |
| 210 c := coll[name] | 232 c := coll[name] |
| 211 rnls[name] = c.rootAddRef() | 233 rnls[name] = c.rootAddRef() |
| 212 } | 234 } |
| 213 defer func() { | 235 defer func() { |
| 214 for _, name := range cnames { | 236 for _, name := range cnames { |
| 215 coll[name].rootDecRef(rnls[name]) | 237 coll[name].rootDecRef(rnls[name]) |
| 216 } | 238 } |
| 217 }() | 239 }() |
| 218 for _, name := range cnames { | 240 for _, name := range cnames { |
| 219 if err := coll[name].write(rnls[name].root); err != nil { | 241 if err := coll[name].write(rnls[name].root); err != nil { |
| 220 return err | 242 return err |
| 221 } | 243 } |
| 222 } | 244 } |
| 223 return s.writeRoots(rnls) | 245 return s.writeRoots(rnls) |
| 224 } | 246 } |
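
The batching tradeoff described in the comment, as a sketch; Set's signature is assumed from the public API, and f is the *os.File that was passed to NewStore():

    for i := 0; i < 1000; i++ {
        if err := c.Set([]byte(fmt.Sprintf("key-%04d", i)), val); err != nil {
            return err
        }
    }
    if err := s.Flush(); err != nil { // one append covers the whole batch
        return err
    }
    if err := f.Sync(); err != nil { // optional: force bytes to stable storage
        return err
    }
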
| 225 | 247 |
| 226 // Reverts the last Flush(), bringing the Store back to its state at | 248 // Reverts the last Flush(), bringing the Store back to its state at |
| 227 // its next-to-last Flush() or to an empty Store (with no Collections) | 249 // its next-to-last Flush() or to an empty Store (with no Collections) |
| 228 // if there was no next-to-last Flush(). This call will truncate the | 250 // if there was no next-to-last Flush(). This call will truncate the |
| 229 // Store file. | 251 // Store file. |
| 230 func (s *Store) FlushRevert() error { | 252 func (s *Store) FlushRevert() error { |
| 231 if s.file == nil { | 253 if s.file == nil { |
| 232 return errors.New("no file / in-memory only, so cannot FlushRevert()") | 254 return errors.New("no file / in-memory only, so cannot FlushRevert()") |
| 233 } | 255 } |
| 234 » orig := atomic.LoadPointer(&s.coll) | 256 » orig := s.getColl() |
| 235 coll := make(map[string]*Collection) | 257 coll := make(map[string]*Collection) |
| 236 » if atomic.CompareAndSwapPointer(&s.coll, orig, unsafe.Pointer(&coll)) { | 258 » if s.casColl(orig, &coll) { |
| 237 for _, cold := range *(*map[string]*Collection)(orig) { | 259 for _, cold := range *(*map[string]*Collection)(orig) { |
| 238 cold.closeCollection() | 260 cold.closeCollection() |
| 239 } | 261 } |
| 240 } | 262 } |
| 241 if atomic.LoadInt64(&s.size) > rootsLen { | 263 if atomic.LoadInt64(&s.size) > rootsLen { |
| 242 atomic.AddInt64(&s.size, -1) | 264 atomic.AddInt64(&s.size, -1) |
| 243 } | 265 } |
| 244 err := s.readRootsScan(true) | 266 err := s.readRootsScan(true) |
| 245 if err != nil { | 267 if err != nil { |
| 246 return err | 268 return err |
| 247 } | 269 } |
| 248 if s.readOnly { | 270 if s.readOnly { |
| 249 return nil | 271 return nil |
| 250 } | 272 } |
| 251 return s.file.Truncate(atomic.LoadInt64(&s.size)) | 273 return s.file.Truncate(atomic.LoadInt64(&s.size)) |
| 252 } | 274 } |
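
One hedged usage sketch: validate after a Flush() and fall back to the previously flushed state when the check fails (verify() is hypothetical):

    if err := s.Flush(); err != nil {
        return err
    }
    if !verify(s) { // hypothetical post-flush validation
        return s.FlushRevert() // truncates back to the prior Flush()'s roots
    }
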
| 253 | 275 |
| 254 // Returns a read-only snapshot, including any mutations on the | 276 // Returns a read-only snapshot, including any mutations on the |
| 255 // original Store that have not been Flush()'ed to disk yet. The | 277 // original Store that have not been Flush()'ed to disk yet. The |
| 256 // snapshot has its mutations and Flush() operations disabled because | 278 // snapshot has its mutations and Flush() operations disabled because |
| 257 // the original store "owns" writes to the StoreFile. | 279 // the original store "owns" writes to the StoreFile. |
| 258 func (s *Store) Snapshot() (snapshot *Store) { | 280 func (s *Store) Snapshot() (snapshot *Store) { |
| 259 » coll := copyColl(*(*map[string]*Collection)(atomic.LoadPointer(&s.coll))) | 281 » coll := copyColl(*s.getColl()) |
| 260 res := &Store{ | 282 res := &Store{ |
| 261 » » coll: unsafe.Pointer(&coll), | 283 » » coll: &coll, |
| 262 file: s.file, | 284 file: s.file, |
| 263 size: atomic.LoadInt64(&s.size), | 285 size: atomic.LoadInt64(&s.size), |
| 264 readOnly: true, | 286 readOnly: true, |
| 265 callbacks: s.callbacks, | 287 callbacks: s.callbacks, |
| 266 } | 288 } |
| 267 for _, name := range collNames(coll) { | 289 for _, name := range collNames(coll) { |
| 268 collOrig := coll[name] | 290 collOrig := coll[name] |
| 269 coll[name] = &Collection{ | 291 coll[name] = &Collection{ |
| 270 store: res, | 292 store: res, |
| 271 compare: collOrig.compare, | 293 compare: collOrig.compare, |
| 272 rootLock: collOrig.rootLock, | 294 rootLock: collOrig.rootLock, |
| 273 root: collOrig.rootAddRef(), | 295 root: collOrig.rootAddRef(), |
| 274 } | 296 } |
| 275 } | 297 } |
| 276 return res | 298 return res |
| 277 } | 299 } |
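
A sketch of the intended pattern: readers work against a frozen view while writers continue on the original store:

    snap := s.Snapshot()
    users := snap.GetCollection("users") // stable view, including unflushed data
    _ = users
    // Mutations and Flush() on snap are disabled; s keeps owning the file.
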
| 278 | 300 |
| 279 func (s *Store) Close() { | 301 func (s *Store) Close() { |
| 280 s.file = nil | 302 s.file = nil |
| 281 » cptr := atomic.LoadPointer(&s.coll) | 303 » cptr := s.getColl() |
| 282 » if cptr == nil || | 304 » if cptr == nil || !s.casColl(cptr, nil) { |
| 283 » » !atomic.CompareAndSwapPointer(&s.coll, cptr, unsafe.Pointer(nil)) { | |
| 284 return | 305 return |
| 285 } | 306 } |
| 286 coll := *(*map[string]*Collection)(cptr) | 307 coll := *(*map[string]*Collection)(cptr) |
| 287 for _, name := range collNames(coll) { | 308 for _, name := range collNames(coll) { |
| 288 coll[name].closeCollection() | 309 coll[name].closeCollection() |
| 289 } | 310 } |
| 290 } | 311 } |
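
Close() drops the collection references and nils the file pointer, but it does not close the underlying file, so the usual pairing is, as a sketch:

    s, err := gkvlite.NewStore(f)
    if err != nil {
        return err
    }
    defer f.Close() // Store.Close() does not close the StoreFile
    defer s.Close()
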
| 291 | 312 |
| 292 // Copy all active collections and their items to a different file. | 313 // Copy all active collections and their items to a different file. |
| 293 // If flushEvery > 0, then during the item copying, Flush() will be | 314 // If flushEvery > 0, then during the item copying, Flush() will be |
| 294 // invoked at every flushEvery'th item and at the end of the item | 315 // invoked at every flushEvery'th item and at the end of the item |
| 295 // copying. The copy will not include any old items or nodes so the | 316 // copying. The copy will not include any old items or nodes so the |
| 296 // copy should be more compact if flushEvery is relatively large. | 317 // copy should be more compact if flushEvery is relatively large. |
| 297 func (s *Store) CopyTo(dstFile StoreFile, flushEvery int) (res *Store, err error) { | 318 func (s *Store) CopyTo(dstFile StoreFile, flushEvery int) (res *Store, err error) { |
| 298 dstStore, err := NewStore(dstFile) | 319 dstStore, err := NewStore(dstFile) |
| 299 if err != nil { | 320 if err != nil { |
| 300 return nil, err | 321 return nil, err |
| 301 } | 322 } |
| 302 » coll := *(*map[string]*Collection)(atomic.LoadPointer(&s.coll)) | 323 » coll := *s.getColl() |
| 303 for _, name := range collNames(coll) { | 324 for _, name := range collNames(coll) { |
| 304 srcColl := coll[name] | 325 srcColl := coll[name] |
| 305 dstColl := dstStore.SetCollection(name, srcColl.compare) | 326 dstColl := dstStore.SetCollection(name, srcColl.compare) |
| 306 minItem, err := srcColl.MinItem(true) | 327 minItem, err := srcColl.MinItem(true) |
| 307 if err != nil { | 328 if err != nil { |
| 308 return nil, err | 329 return nil, err |
| 309 } | 330 } |
| 310 if minItem == nil { | 331 if minItem == nil { |
| 311 continue | 332 continue |
| 312 } | 333 } |
| (...skipping 132 matching lines...) |
| 445 for collName, t := range m { | 466 for collName, t := range m { |
| 446 t.name = collName | 467 t.name = collName |
| 447 t.store = o | 468 t.store = o |
| 448 if o.callbacks.KeyCompareForCollection != nil { | 469 if o.callbacks.KeyCompareForCollection != nil { |
| 449 t.compare = o.callbacks.KeyCompareForCollection(collName) | 470 t.compare = o.callbacks.KeyCompareForCollection(collName) |
| 450 } | 471 } |
| 451 if t.compare == nil { | 472 if t.compare == nil { |
| 452 t.compare = bytes.Compare | 473 t.compare = bytes.Compare |
| 453 } | 474 } |
| 454 } | 475 } |
| 455 » » » » atomic.StorePointer(&o.coll, unsafe.Pointer(&m)) | 476 » » » » o.setColl(&m) |
| 456 return nil | 477 return nil |
| 457 } // else, perhaps value was unlucky in having MAGIC_END's. | 478 } // else, perhaps value was unlucky in having MAGIC_END's. |
| 458 } // else, perhaps a gkvlite file was stored as a value. | 479 } // else, perhaps a gkvlite file was stored as a value. |
| 459 atomic.AddInt64(&o.size, -1) // Roots were wrong, so keep scanning. | 480 atomic.AddInt64(&o.size, -1) // Roots were wrong, so keep scanning. |
| 460 } | 481 } |
| 461 } | 482 } |
| 462 | 483 |
| 463 func (o *Store) ItemAlloc(c *Collection, keyLength uint32) *Item { | 484 func (o *Store) ItemAlloc(c *Collection, keyLength uint32) *Item { |
| 464 if o.callbacks.ItemAlloc != nil { | 485 if o.callbacks.ItemAlloc != nil { |
| 465 return o.callbacks.ItemAlloc(c, keyLength) | 486 return o.callbacks.ItemAlloc(c, keyLength) |
| (...skipping 23 matching lines...) |
| 489 return err | 510 return err |
| 490 } | 511 } |
| 491 | 512 |
| 492 func (o *Store) ItemValWrite(c *Collection, i *Item, w io.WriterAt, offset int64) error { | 513 func (o *Store) ItemValWrite(c *Collection, i *Item, w io.WriterAt, offset int64) error { |
| 493 if o.callbacks.ItemValWrite != nil { | 514 if o.callbacks.ItemValWrite != nil { |
| 494 return o.callbacks.ItemValWrite(c, i, w, offset) | 515 return o.callbacks.ItemValWrite(c, i, w, offset) |
| 495 } | 516 } |
| 496 _, err := w.WriteAt(i.Val, offset) | 517 _, err := w.WriteAt(i.Val, offset) |
| 497 return err | 518 return err |
| 498 } | 519 } |