Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(2)

Side by Side Diff: store.go

Issue 1409173004: Remove usage of unsafe from gkvlite (Closed) Base URL: https://chromium.googlesource.com/external/github.com/luci/gkvlite.git@master
Patch Set: get locks out of other lock Created 5 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « node.go ('k') | store_test.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Package gkvlite provides a simple, ordered, ACID, key-value 1 // Package gkvlite provides a simple, ordered, ACID, key-value
2 // persistence library. It provides persistent, immutable data 2 // persistence library. It provides persistent, immutable data
3 // structure abstrations. 3 // structure abstrations.
4 package gkvlite 4 package gkvlite
5 5
6 import ( 6 import (
7 "bytes" 7 "bytes"
8 "encoding/binary" 8 "encoding/binary"
9 "encoding/json" 9 "encoding/json"
10 "errors" 10 "errors"
11 "fmt" 11 "fmt"
12 "io" 12 "io"
13 "os" 13 "os"
14 "reflect" 14 "reflect"
15 "sort" 15 "sort"
16 "sync" 16 "sync"
17 "sync/atomic" 17 "sync/atomic"
18 "unsafe"
19 ) 18 )
20 19
21 // A persistable store holding collections of ordered keys & values. 20 // A persistable store holding collections of ordered keys & values.
22 type Store struct { 21 type Store struct {
22 m sync.Mutex
23
23 // Atomic CAS'ed int64/uint64's must be at the top for 32-bit compatibil ity. 24 // Atomic CAS'ed int64/uint64's must be at the top for 32-bit compatibil ity.
24 » size int64 // Atomic protected; file size or next write p osition. 25 » size int64 // Atomic protected; file size or nex t write position.
25 » nodeAllocs uint64 // Atomic protected; total node allocation sta ts. 26 » nodeAllocs uint64 // Atomic protected; total node alloc ation stats.
26 » coll unsafe.Pointer // Copy-on-write map[string]*Collection. 27 » coll *map[string]*Collection // Copy-on-write map[string]*Collecti on.
27 » file StoreFile // When nil, we're memory-only or no persisten ce. 28 » file StoreFile // When nil, we're memory-only or no persistence.
28 » callbacks StoreCallbacks // Optional / may be nil. 29 » callbacks StoreCallbacks // Optional / may be nil.
29 » readOnly bool // When true, Flush()'ing is disallowed. 30 » readOnly bool // When true, Flush()'ing is disallow ed.
31 }
32
33 func (s *Store) setColl(n *map[string]*Collection) {
34 » s.m.Lock()
35 » defer s.m.Unlock()
36 » s.coll = n
37 }
38
39 func (s *Store) getColl() *map[string]*Collection {
40 » s.m.Lock()
41 » defer s.m.Unlock()
42 » return s.coll
43 }
44
45 func (s *Store) casColl(o, n *map[string]*Collection) bool {
46 » s.m.Lock()
47 » defer s.m.Unlock()
48 » if s.coll == o {
49 » » s.coll = n
50 » » return true
51 » }
52 » return false
30 } 53 }
31 54
32 // The StoreFile interface is implemented by os.File. Application 55 // The StoreFile interface is implemented by os.File. Application
33 // specific implementations may add concurrency, caching, stats, etc. 56 // specific implementations may add concurrency, caching, stats, etc.
34 type StoreFile interface { 57 type StoreFile interface {
35 io.ReaderAt 58 io.ReaderAt
36 io.WriterAt 59 io.WriterAt
37 Stat() (os.FileInfo, error) 60 Stat() (os.FileInfo, error)
38 Truncate(size int64) error 61 Truncate(size int64) error
39 } 62 }
(...skipping 48 matching lines...) Expand 10 before | Expand all | Expand 10 after
88 var rootsLen int64 = int64(2*len(MAGIC_BEG) + 4 + 4 + rootsEndLen) 111 var rootsLen int64 = int64(2*len(MAGIC_BEG) + 4 + 4 + rootsEndLen)
89 112
90 // Provide a nil StoreFile for in-memory-only (non-persistent) usage. 113 // Provide a nil StoreFile for in-memory-only (non-persistent) usage.
91 func NewStore(file StoreFile) (*Store, error) { 114 func NewStore(file StoreFile) (*Store, error) {
92 return NewStoreEx(file, StoreCallbacks{}) 115 return NewStoreEx(file, StoreCallbacks{})
93 } 116 }
94 117
95 func NewStoreEx(file StoreFile, 118 func NewStoreEx(file StoreFile,
96 callbacks StoreCallbacks) (*Store, error) { 119 callbacks StoreCallbacks) (*Store, error) {
97 coll := make(map[string]*Collection) 120 coll := make(map[string]*Collection)
98 » res := &Store{coll: unsafe.Pointer(&coll), callbacks: callbacks} 121 » res := &Store{coll: &coll, callbacks: callbacks}
99 if file == nil || !reflect.ValueOf(file).Elem().IsValid() { 122 if file == nil || !reflect.ValueOf(file).Elem().IsValid() {
100 return res, nil // Memory-only Store. 123 return res, nil // Memory-only Store.
101 } 124 }
102 res.file = file 125 res.file = file
103 if err := res.readRoots(); err != nil { 126 if err := res.readRoots(); err != nil {
104 return nil, err 127 return nil, err
105 } 128 }
106 return res, nil 129 return res, nil
107 } 130 }
108 131
109 // SetCollection() is used to create a named Collection, or to modify 132 // SetCollection() is used to create a named Collection, or to modify
110 // the KeyCompare function on an existing Collection. In either case, 133 // the KeyCompare function on an existing Collection. In either case,
111 // a new Collection to use is returned. A newly created Collection 134 // a new Collection to use is returned. A newly created Collection
112 // and any mutations on it won't be persisted until you do a Flush(). 135 // and any mutations on it won't be persisted until you do a Flush().
113 func (s *Store) SetCollection(name string, compare KeyCompare) *Collection { 136 func (s *Store) SetCollection(name string, compare KeyCompare) *Collection {
114 if compare == nil { 137 if compare == nil {
115 compare = bytes.Compare 138 compare = bytes.Compare
116 } 139 }
117 for { 140 for {
118 » » orig := atomic.LoadPointer(&s.coll) 141 » » orig := s.getColl()
119 coll := copyColl(*(*map[string]*Collection)(orig)) 142 coll := copyColl(*(*map[string]*Collection)(orig))
120 cnew := s.MakePrivateCollection(compare) 143 cnew := s.MakePrivateCollection(compare)
121 cnew.name = name 144 cnew.name = name
122 cold := coll[name] 145 cold := coll[name]
123 if cold != nil { 146 if cold != nil {
124 cnew.rootLock = cold.rootLock 147 cnew.rootLock = cold.rootLock
125 cnew.root = cold.rootAddRef() 148 cnew.root = cold.rootAddRef()
126 } 149 }
127 coll[name] = cnew 150 coll[name] = cnew
128 » » if atomic.CompareAndSwapPointer(&s.coll, orig, unsafe.Pointer(&c oll)) { 151 » » if s.casColl(orig, &coll) {
129 cold.closeCollection() 152 cold.closeCollection()
130 return cnew 153 return cnew
131 } 154 }
132 cnew.closeCollection() 155 cnew.closeCollection()
133 } 156 }
134 } 157 }
135 158
136 // Returns a new, unregistered (non-named) collection. This allows 159 // Returns a new, unregistered (non-named) collection. This allows
137 // advanced users to manage collections of private collections. 160 // advanced users to manage collections of private collections.
138 func (s *Store) MakePrivateCollection(compare KeyCompare) *Collection { 161 func (s *Store) MakePrivateCollection(compare KeyCompare) *Collection {
139 if compare == nil { 162 if compare == nil {
140 compare = bytes.Compare 163 compare = bytes.Compare
141 } 164 }
142 return &Collection{ 165 return &Collection{
143 store: s, 166 store: s,
144 compare: compare, 167 compare: compare,
145 rootLock: &sync.Mutex{}, 168 rootLock: &sync.Mutex{},
146 root: &rootNodeLoc{refs: 1, root: empty_nodeLoc}, 169 root: &rootNodeLoc{refs: 1, root: empty_nodeLoc},
147 } 170 }
148 } 171 }
149 172
150 // Retrieves a named Collection. 173 // Retrieves a named Collection.
151 func (s *Store) GetCollection(name string) *Collection { 174 func (s *Store) GetCollection(name string) *Collection {
152 » coll := *(*map[string]*Collection)(atomic.LoadPointer(&s.coll)) 175 » return (*s.getColl())[name]
153 » return coll[name]
154 } 176 }
155 177
156 func (s *Store) GetCollectionNames() []string { 178 func (s *Store) GetCollectionNames() []string {
157 » return collNames(*(*map[string]*Collection)(atomic.LoadPointer(&s.coll)) ) 179 » return collNames(*s.getColl())
158 } 180 }
159 181
160 func collNames(coll map[string]*Collection) []string { 182 func collNames(coll map[string]*Collection) []string {
161 res := make([]string, 0, len(coll)) 183 res := make([]string, 0, len(coll))
162 for name := range coll { 184 for name := range coll {
163 res = append(res, name) 185 res = append(res, name)
164 } 186 }
165 sort.Strings(res) // Sorting because common callers need stability. 187 sort.Strings(res) // Sorting because common callers need stability.
166 return res 188 return res
167 } 189 }
168 190
169 // The Collection removal won't be reflected into persistence until 191 // The Collection removal won't be reflected into persistence until
170 // you do a Flush(). Invoking RemoveCollection(x) and then 192 // you do a Flush(). Invoking RemoveCollection(x) and then
171 // SetCollection(x) is a fast way to empty a Collection. 193 // SetCollection(x) is a fast way to empty a Collection.
172 func (s *Store) RemoveCollection(name string) { 194 func (s *Store) RemoveCollection(name string) {
173 for { 195 for {
174 » » orig := atomic.LoadPointer(&s.coll) 196 » » orig := s.getColl()
175 coll := copyColl(*(*map[string]*Collection)(orig)) 197 coll := copyColl(*(*map[string]*Collection)(orig))
176 cold := coll[name] 198 cold := coll[name]
177 delete(coll, name) 199 delete(coll, name)
178 » » if atomic.CompareAndSwapPointer(&s.coll, orig, unsafe.Pointer(&c oll)) { 200 » » if s.casColl(orig, &coll) {
179 cold.closeCollection() 201 cold.closeCollection()
180 return 202 return
181 } 203 }
182 } 204 }
183 } 205 }
184 206
185 func copyColl(orig map[string]*Collection) map[string]*Collection { 207 func copyColl(orig map[string]*Collection) map[string]*Collection {
186 res := make(map[string]*Collection) 208 res := make(map[string]*Collection)
187 for name, c := range orig { 209 for name, c := range orig {
188 res[name] = c 210 res[name] = c
189 } 211 }
190 return res 212 return res
191 } 213 }
192 214
193 // Writes (appends) any dirty, unpersisted data to file. As a 215 // Writes (appends) any dirty, unpersisted data to file. As a
194 // greater-window-of-data-loss versus higher-performance tradeoff, 216 // greater-window-of-data-loss versus higher-performance tradeoff,
195 // consider having many mutations (Set()'s & Delete()'s) and then 217 // consider having many mutations (Set()'s & Delete()'s) and then
196 // have a less occasional Flush() instead of Flush()'ing after every 218 // have a less occasional Flush() instead of Flush()'ing after every
197 // mutation. Users may also wish to file.Sync() after a Flush() for 219 // mutation. Users may also wish to file.Sync() after a Flush() for
198 // extra data-loss protection. 220 // extra data-loss protection.
199 func (s *Store) Flush() error { 221 func (s *Store) Flush() error {
200 if s.readOnly { 222 if s.readOnly {
201 return errors.New("readonly, so cannot Flush()") 223 return errors.New("readonly, so cannot Flush()")
202 } 224 }
203 if s.file == nil { 225 if s.file == nil {
204 return errors.New("no file / in-memory only, so cannot Flush()") 226 return errors.New("no file / in-memory only, so cannot Flush()")
205 } 227 }
206 » coll := *(*map[string]*Collection)(atomic.LoadPointer(&s.coll)) 228 » coll := *s.getColl()
207 rnls := map[string]*rootNodeLoc{} 229 rnls := map[string]*rootNodeLoc{}
208 cnames := collNames(coll) 230 cnames := collNames(coll)
209 for _, name := range cnames { 231 for _, name := range cnames {
210 c := coll[name] 232 c := coll[name]
211 rnls[name] = c.rootAddRef() 233 rnls[name] = c.rootAddRef()
212 } 234 }
213 defer func() { 235 defer func() {
214 for _, name := range cnames { 236 for _, name := range cnames {
215 coll[name].rootDecRef(rnls[name]) 237 coll[name].rootDecRef(rnls[name])
216 } 238 }
217 }() 239 }()
218 for _, name := range cnames { 240 for _, name := range cnames {
219 if err := coll[name].write(rnls[name].root); err != nil { 241 if err := coll[name].write(rnls[name].root); err != nil {
220 return err 242 return err
221 } 243 }
222 } 244 }
223 return s.writeRoots(rnls) 245 return s.writeRoots(rnls)
224 } 246 }
225 247
226 // Reverts the last Flush(), bringing the Store back to its state at 248 // Reverts the last Flush(), bringing the Store back to its state at
227 // its next-to-last Flush() or to an empty Store (with no Collections) 249 // its next-to-last Flush() or to an empty Store (with no Collections)
228 // if there were no next-to-last Flush(). This call will truncate the 250 // if there were no next-to-last Flush(). This call will truncate the
229 // Store file. 251 // Store file.
230 func (s *Store) FlushRevert() error { 252 func (s *Store) FlushRevert() error {
231 if s.file == nil { 253 if s.file == nil {
232 return errors.New("no file / in-memory only, so cannot FlushReve rt()") 254 return errors.New("no file / in-memory only, so cannot FlushReve rt()")
233 } 255 }
234 » orig := atomic.LoadPointer(&s.coll) 256 » orig := s.getColl()
235 coll := make(map[string]*Collection) 257 coll := make(map[string]*Collection)
236 » if atomic.CompareAndSwapPointer(&s.coll, orig, unsafe.Pointer(&coll)) { 258 » if s.casColl(orig, &coll) {
237 for _, cold := range *(*map[string]*Collection)(orig) { 259 for _, cold := range *(*map[string]*Collection)(orig) {
238 cold.closeCollection() 260 cold.closeCollection()
239 } 261 }
240 } 262 }
241 if atomic.LoadInt64(&s.size) > rootsLen { 263 if atomic.LoadInt64(&s.size) > rootsLen {
242 atomic.AddInt64(&s.size, -1) 264 atomic.AddInt64(&s.size, -1)
243 } 265 }
244 err := s.readRootsScan(true) 266 err := s.readRootsScan(true)
245 if err != nil { 267 if err != nil {
246 return err 268 return err
247 } 269 }
248 if s.readOnly { 270 if s.readOnly {
249 return nil 271 return nil
250 } 272 }
251 return s.file.Truncate(atomic.LoadInt64(&s.size)) 273 return s.file.Truncate(atomic.LoadInt64(&s.size))
252 } 274 }
253 275
254 // Returns a read-only snapshot, including any mutations on the 276 // Returns a read-only snapshot, including any mutations on the
255 // original Store that have not been Flush()'ed to disk yet. The 277 // original Store that have not been Flush()'ed to disk yet. The
256 // snapshot has its mutations and Flush() operations disabled because 278 // snapshot has its mutations and Flush() operations disabled because
257 // the original store "owns" writes to the StoreFile. 279 // the original store "owns" writes to the StoreFile.
258 func (s *Store) Snapshot() (snapshot *Store) { 280 func (s *Store) Snapshot() (snapshot *Store) {
259 » coll := copyColl(*(*map[string]*Collection)(atomic.LoadPointer(&s.coll)) ) 281 » coll := copyColl(*s.getColl())
260 res := &Store{ 282 res := &Store{
261 » » coll: unsafe.Pointer(&coll), 283 » » coll: &coll,
262 file: s.file, 284 file: s.file,
263 size: atomic.LoadInt64(&s.size), 285 size: atomic.LoadInt64(&s.size),
264 readOnly: true, 286 readOnly: true,
265 callbacks: s.callbacks, 287 callbacks: s.callbacks,
266 } 288 }
267 for _, name := range collNames(coll) { 289 for _, name := range collNames(coll) {
268 collOrig := coll[name] 290 collOrig := coll[name]
269 coll[name] = &Collection{ 291 coll[name] = &Collection{
270 store: res, 292 store: res,
271 compare: collOrig.compare, 293 compare: collOrig.compare,
272 rootLock: collOrig.rootLock, 294 rootLock: collOrig.rootLock,
273 root: collOrig.rootAddRef(), 295 root: collOrig.rootAddRef(),
274 } 296 }
275 } 297 }
276 return res 298 return res
277 } 299 }
278 300
279 func (s *Store) Close() { 301 func (s *Store) Close() {
280 s.file = nil 302 s.file = nil
281 » cptr := atomic.LoadPointer(&s.coll) 303 » cptr := s.getColl()
282 » if cptr == nil || 304 » if cptr == nil || !s.casColl(cptr, nil) {
283 » » !atomic.CompareAndSwapPointer(&s.coll, cptr, unsafe.Pointer(nil) ) {
284 return 305 return
285 } 306 }
286 coll := *(*map[string]*Collection)(cptr) 307 coll := *(*map[string]*Collection)(cptr)
287 for _, name := range collNames(coll) { 308 for _, name := range collNames(coll) {
288 coll[name].closeCollection() 309 coll[name].closeCollection()
289 } 310 }
290 } 311 }
291 312
292 // Copy all active collections and their items to a different file. 313 // Copy all active collections and their items to a different file.
293 // If flushEvery > 0, then during the item copying, Flush() will be 314 // If flushEvery > 0, then during the item copying, Flush() will be
294 // invoked at every flushEvery'th item and at the end of the item 315 // invoked at every flushEvery'th item and at the end of the item
295 // copying. The copy will not include any old items or nodes so the 316 // copying. The copy will not include any old items or nodes so the
296 // copy should be more compact if flushEvery is relatively large. 317 // copy should be more compact if flushEvery is relatively large.
297 func (s *Store) CopyTo(dstFile StoreFile, flushEvery int) (res *Store, err error ) { 318 func (s *Store) CopyTo(dstFile StoreFile, flushEvery int) (res *Store, err error ) {
298 dstStore, err := NewStore(dstFile) 319 dstStore, err := NewStore(dstFile)
299 if err != nil { 320 if err != nil {
300 return nil, err 321 return nil, err
301 } 322 }
302 » coll := *(*map[string]*Collection)(atomic.LoadPointer(&s.coll)) 323 » coll := *s.getColl()
303 for _, name := range collNames(coll) { 324 for _, name := range collNames(coll) {
304 srcColl := coll[name] 325 srcColl := coll[name]
305 dstColl := dstStore.SetCollection(name, srcColl.compare) 326 dstColl := dstStore.SetCollection(name, srcColl.compare)
306 minItem, err := srcColl.MinItem(true) 327 minItem, err := srcColl.MinItem(true)
307 if err != nil { 328 if err != nil {
308 return nil, err 329 return nil, err
309 } 330 }
310 if minItem == nil { 331 if minItem == nil {
311 continue 332 continue
312 } 333 }
(...skipping 132 matching lines...) Expand 10 before | Expand all | Expand 10 after
445 for collName, t := range m { 466 for collName, t := range m {
446 t.name = collName 467 t.name = collName
447 t.store = o 468 t.store = o
448 if o.callbacks.KeyCompareForCollection ! = nil { 469 if o.callbacks.KeyCompareForCollection ! = nil {
449 t.compare = o.callbacks.KeyCompa reForCollection(collName) 470 t.compare = o.callbacks.KeyCompa reForCollection(collName)
450 } 471 }
451 if t.compare == nil { 472 if t.compare == nil {
452 t.compare = bytes.Compare 473 t.compare = bytes.Compare
453 } 474 }
454 } 475 }
455 » » » » atomic.StorePointer(&o.coll, unsafe.Pointer(&m)) 476 » » » » o.setColl(&m)
456 return nil 477 return nil
457 } // else, perhaps value was unlucky in having MAGIC_END 's. 478 } // else, perhaps value was unlucky in having MAGIC_END 's.
458 } // else, perhaps a gkvlite file was stored as a value. 479 } // else, perhaps a gkvlite file was stored as a value.
459 atomic.AddInt64(&o.size, -1) // Roots were wrong, so keep scanni ng. 480 atomic.AddInt64(&o.size, -1) // Roots were wrong, so keep scanni ng.
460 } 481 }
461 } 482 }
462 483
463 func (o *Store) ItemAlloc(c *Collection, keyLength uint32) *Item { 484 func (o *Store) ItemAlloc(c *Collection, keyLength uint32) *Item {
464 if o.callbacks.ItemAlloc != nil { 485 if o.callbacks.ItemAlloc != nil {
465 return o.callbacks.ItemAlloc(c, keyLength) 486 return o.callbacks.ItemAlloc(c, keyLength)
(...skipping 23 matching lines...) Expand all
489 return err 510 return err
490 } 511 }
491 512
492 func (o *Store) ItemValWrite(c *Collection, i *Item, w io.WriterAt, offset int64 ) error { 513 func (o *Store) ItemValWrite(c *Collection, i *Item, w io.WriterAt, offset int64 ) error {
493 if o.callbacks.ItemValWrite != nil { 514 if o.callbacks.ItemValWrite != nil {
494 return o.callbacks.ItemValWrite(c, i, w, offset) 515 return o.callbacks.ItemValWrite(c, i, w, offset)
495 } 516 }
496 _, err := w.WriteAt(i.Val, offset) 517 _, err := w.WriteAt(i.Val, offset)
497 return err 518 return err
498 } 519 }
OLDNEW
« no previous file with comments | « node.go ('k') | store_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698