Chromium Code Reviews| OLD | NEW |
|---|---|
| (Empty) | |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | |
| 2 // Use of this source code is governed by a BSD-style license that can be | |
| 3 // found in the LICENSE file. | |
| 4 | |
| 5 package memlock | |
| 6 | |
| 7 import ( | |
| 8 "bytes" | |
| 9 "crypto/rand" | |
| 10 "sync/atomic" | |
| 11 "time" | |
| 12 | |
| 13 "appengine" | |
| 14 "appengine/memcache" | |
| 15 ) | |
| 16 | |
| 17 type checkOp string | |
| 18 | |
| 19 const ( | |
| 20 release checkOp = "release" | |
| 21 refresh = "refresh" | |
| 22 ) | |
| 23 | |
| 24 // MemcacheLockTime is the expiration time of the memcache entry. If the lock | |
| 25 // is correctly released, then it will be released before this time. | |
| 26 const MemcacheLockTime = 16 * time.Second | |
| 27 | |
| 28 // MemcacheLock allows multiple appengine handlers to coordinate best-effort | |
| 29 // mutual execution via memcache. "best-effort" here means "best-effort"... | |
| 30 // memcache is not reliable. However, colliding on memcache is a lot cheaper | |
| 31 // than, for example, colliding with datastore transactions. | |
| 32 // | |
| 33 // Construct a new MemcacheLock with the New() method. | |
| 34 type MemcacheLock struct { | |
| 35 ctx appengine.Context | |
| 36 key string | |
| 37 clientID []byte | |
| 38 } | |
| 39 | |
| 40 // New creates a MemcacheLock. `key` is the memcache key to use. Clients locking | |
| 41 // the same data must use the same key. clientID is the unique identifier for | |
| 42 // this client (lock-holder). If it's empty then New() will generate a random | |
| 43 // one. | |
| 44 // | |
| 45 // Using a deterministid clientID (like the taskqueue task name, for example) ha s | |
| 46 // the benefit that on an error the re-tried taskqueue handler may be able to | |
| 47 // re-obtain the lock, assuming it wasn't released properly. | |
| 48 func New(c appengine.Context, key, clientID string) (*MemcacheLock, error) { | |
| 49 clientIDbytes := []byte(clientID) | |
| 50 if len(clientIDbytes) == 0 { | |
| 51 clientIDbytes = make([]byte, 32) | |
| 52 _, err := rand.Read(clientIDbytes) | |
| 53 if err != nil { | |
| 54 return nil, err | |
| 55 } | |
| 56 c.Debugf("memlock: generated clientId: %v", clientIDbytes) | |
| 57 } | |
| 58 return &MemcacheLock{ | |
| 59 ctx: c, | |
| 60 key: "memlock:" + key, | |
| 61 clientID: clientIDbytes, | |
| 62 }, nil | |
| 63 } | |
| 64 | |
| 65 // TryWithLock attempts to obtains the lock once, and then invokes f if | |
| 66 // sucessful. The `check` function can be used within f to see if the lock is | |
| 67 // still held. This function returns true iff the lock was acquired and f was | |
| 68 // invoked. | |
| 69 func (m *MemcacheLock) TryWithLock(f func(check func() bool)) bool { | |
| 70 err := memcache.Add(m.ctx, &memcache.Item{ | |
| 71 Key: m.key, Expiration: MemcacheLockTime, Value: m.clientID}) | |
| 72 if err != nil { | |
| 73 if err != memcache.ErrNotStored { | |
| 74 m.warningf("error adding: %s", err) | |
| 75 } | |
| 76 if !m.checkAnd(refresh) { | |
|
Vadim Sh.
2015/03/07 02:55:50
actually, it makes the lock reentrant (same gorout
iannucci
2015/03/29 21:22:36
oh, yeah, good point... I'll make it prevent this.
| |
| 77 return false | |
| 78 } | |
| 79 } | |
| 80 | |
| 81 stopChan := make(chan struct{}, 1) | |
|
Vadim Sh.
2015/03/07 02:55:50
these "-chan" make the impression I'm watching som
iannucci
2015/03/29 21:22:36
Chan-chan!
| |
| 82 stoppedChan := make(chan struct{}, 1) | |
| 83 var held uint32 = 1 | |
| 84 | |
| 85 check := func() bool { | |
| 86 return atomic.LoadUint32(&held) == 1 | |
| 87 } | |
| 88 defer func() { | |
| 89 stopChan <- struct{}{} | |
|
Vadim Sh.
2015/03/07 02:55:50
I think "idiomatic" (I start to not like this word
iannucci
2015/03/29 21:22:36
Hm, ok I guess that'll work.
| |
| 90 <-stoppedChan | |
| 91 }() | |
| 92 | |
| 93 go func() { | |
| 94 defer func() { stoppedChan <- struct{}{} }() | |
| 95 checkLoop: | |
| 96 for { | |
| 97 select { | |
| 98 case <-stopChan: | |
| 99 break checkLoop | |
|
Vadim Sh.
2015/03/07 02:55:50
do you really need a label here?
iannucci
2015/03/29 21:22:35
yeah. a plain break stops the select statement (wh
| |
| 100 case <-time.After(time.Second): | |
| 101 } | |
| 102 if !m.checkAnd(refresh) { | |
| 103 atomic.StoreUint32(&held, 0) | |
| 104 m.warningf("lost lock: %s", err) | |
| 105 break | |
| 106 } | |
| 107 } | |
| 108 | |
| 109 m.checkAnd(release) | |
| 110 atomic.StoreUint32(&held, 0) | |
| 111 }() | |
| 112 | |
| 113 f(check) | |
| 114 return true | |
| 115 } | |
| 116 | |
| 117 func (m *MemcacheLock) infof(fmt string, args ...interface{}) { | |
| 118 args = append([]interface{}{m.key, string(m.clientID)}, args...) | |
| 119 m.ctx.Infof("memlock(%s:%q): "+fmt, args) | |
| 120 } | |
| 121 | |
| 122 func (m *MemcacheLock) warningf(fmt string, args ...interface{}) { | |
| 123 args = append([]interface{}{m.key, string(m.clientID)}, args...) | |
| 124 m.ctx.Warningf("memlock(%s:%q): "+fmt, args) | |
| 125 } | |
| 126 | |
| 127 func (m *MemcacheLock) checkAnd(op checkOp) bool { | |
| 128 itm, err := memcache.Get(m.ctx, m.key) | |
| 129 if err != nil { | |
| 130 m.warningf("error getting: %s", err) | |
| 131 return false | |
| 132 } | |
| 133 | |
| 134 if len(itm.Value) != 0 && !bytes.Equal(itm.Value, m.clientID) { | |
| 135 m.infof("lock owned by %q", string(itm.Value)) | |
| 136 return false | |
| 137 } | |
| 138 | |
| 139 if op == refresh { | |
| 140 itm.Value = m.clientID | |
| 141 itm.Expiration = MemcacheLockTime | |
| 142 } else { | |
| 143 if len(itm.Value) == 0 { | |
| 144 // it's already unlocked, no need to CAS | |
| 145 m.infof("lock already released") | |
| 146 return true | |
| 147 } | |
| 148 itm.Value = []byte{} | |
| 149 itm.Expiration = time.Second | |
| 150 } | |
| 151 | |
| 152 err = memcache.CompareAndSwap(m.ctx, itm) | |
| 153 if err != nil { | |
| 154 m.warningf("failed to %s lock: %s", op, err) | |
| 155 return false | |
| 156 } | |
| 157 | |
| 158 m.infof("%sed lock", op) | |
| 159 return true | |
| 160 } | |
| OLD | NEW |