Index: go/src/infra/gae/libs/memlock/memlock.go |
diff --git a/go/src/infra/gae/libs/memlock/memlock.go b/go/src/infra/gae/libs/memlock/memlock.go |
new file mode 100644 |
index 0000000000000000000000000000000000000000..0b5284d13b360ad9eb7a6aece773bec2598527da |
--- /dev/null |
+++ b/go/src/infra/gae/libs/memlock/memlock.go |
@@ -0,0 +1,160 @@ |
+// Copyright 2015 The Chromium Authors. All rights reserved. |
+// Use of this source code is governed by a BSD-style license that can be |
+// found in the LICENSE file. |
+ |
+package memlock |
+ |
+import ( |
+ "bytes" |
+ "crypto/rand" |
+ "sync/atomic" |
+ "time" |
+ |
+ "appengine" |
+ "appengine/memcache" |
+) |
+ |
+type checkOp string |
+ |
+const ( |
+ release checkOp = "release" |
+ refresh = "refresh" |
+) |
+ |
+// MemcacheLockTime is the expiration time of the memcache entry. If the lock |
+// is correctly released, then it will be released before this time. |
+const MemcacheLockTime = 16 * time.Second |
+ |
+// MemcacheLock allows multiple appengine handlers to coordinate best-effort |
+// mutual execution via memcache. "best-effort" here means "best-effort"... |
+// memcache is not reliable. However, colliding on memcache is a lot cheaper |
+// than, for example, colliding with datastore transactions. |
+// |
+// Construct a new MemcacheLock with the New() method. |
+type MemcacheLock struct { |
+ ctx appengine.Context |
+ key string |
+ clientID []byte |
+} |
+ |
+// New creates a MemcacheLock. `key` is the memcache key to use. Clients locking |
+// the same data must use the same key. clientID is the unique identifier for |
+// this client (lock-holder). If it's empty then New() will generate a random |
+// one. |
+// |
+// Using a deterministid clientID (like the taskqueue task name, for example) has |
+// the benefit that on an error the re-tried taskqueue handler may be able to |
+// re-obtain the lock, assuming it wasn't released properly. |
+func New(c appengine.Context, key, clientID string) (*MemcacheLock, error) { |
+ clientIDbytes := []byte(clientID) |
+ if len(clientIDbytes) == 0 { |
+ clientIDbytes = make([]byte, 32) |
+ _, err := rand.Read(clientIDbytes) |
+ if err != nil { |
+ return nil, err |
+ } |
+ c.Debugf("memlock: generated clientId: %v", clientIDbytes) |
+ } |
+ return &MemcacheLock{ |
+ ctx: c, |
+ key: "memlock:" + key, |
+ clientID: clientIDbytes, |
+ }, nil |
+} |
+ |
+// TryWithLock attempts to obtains the lock once, and then invokes f if |
+// sucessful. The `check` function can be used within f to see if the lock is |
+// still held. This function returns true iff the lock was acquired and f was |
+// invoked. |
+func (m *MemcacheLock) TryWithLock(f func(check func() bool)) bool { |
+ err := memcache.Add(m.ctx, &memcache.Item{ |
+ Key: m.key, Expiration: MemcacheLockTime, Value: m.clientID}) |
+ if err != nil { |
+ if err != memcache.ErrNotStored { |
+ m.warningf("error adding: %s", err) |
+ } |
+ if !m.checkAnd(refresh) { |
Vadim Sh.
2015/03/07 02:55:50
actually, it makes the lock reentrant (same gorout
iannucci
2015/03/29 21:22:36
oh, yeah, good point... I'll make it prevent this.
|
+ return false |
+ } |
+ } |
+ |
+ stopChan := make(chan struct{}, 1) |
Vadim Sh.
2015/03/07 02:55:50
these "-chan" make the impression I'm watching som
iannucci
2015/03/29 21:22:36
Chan-chan!
|
+ stoppedChan := make(chan struct{}, 1) |
+ var held uint32 = 1 |
+ |
+ check := func() bool { |
+ return atomic.LoadUint32(&held) == 1 |
+ } |
+ defer func() { |
+ stopChan <- struct{}{} |
Vadim Sh.
2015/03/07 02:55:50
I think "idiomatic" (I start to not like this word
iannucci
2015/03/29 21:22:36
Hm, ok I guess that'll work.
|
+ <-stoppedChan |
+ }() |
+ |
+ go func() { |
+ defer func() { stoppedChan <- struct{}{} }() |
+ checkLoop: |
+ for { |
+ select { |
+ case <-stopChan: |
+ break checkLoop |
Vadim Sh.
2015/03/07 02:55:50
do you really need a label here?
iannucci
2015/03/29 21:22:35
yeah. a plain break stops the select statement (wh
|
+ case <-time.After(time.Second): |
+ } |
+ if !m.checkAnd(refresh) { |
+ atomic.StoreUint32(&held, 0) |
+ m.warningf("lost lock: %s", err) |
+ break |
+ } |
+ } |
+ |
+ m.checkAnd(release) |
+ atomic.StoreUint32(&held, 0) |
+ }() |
+ |
+ f(check) |
+ return true |
+} |
+ |
+func (m *MemcacheLock) infof(fmt string, args ...interface{}) { |
+ args = append([]interface{}{m.key, string(m.clientID)}, args...) |
+ m.ctx.Infof("memlock(%s:%q): "+fmt, args) |
+} |
+ |
+func (m *MemcacheLock) warningf(fmt string, args ...interface{}) { |
+ args = append([]interface{}{m.key, string(m.clientID)}, args...) |
+ m.ctx.Warningf("memlock(%s:%q): "+fmt, args) |
+} |
+ |
+func (m *MemcacheLock) checkAnd(op checkOp) bool { |
+ itm, err := memcache.Get(m.ctx, m.key) |
+ if err != nil { |
+ m.warningf("error getting: %s", err) |
+ return false |
+ } |
+ |
+ if len(itm.Value) != 0 && !bytes.Equal(itm.Value, m.clientID) { |
+ m.infof("lock owned by %q", string(itm.Value)) |
+ return false |
+ } |
+ |
+ if op == refresh { |
+ itm.Value = m.clientID |
+ itm.Expiration = MemcacheLockTime |
+ } else { |
+ if len(itm.Value) == 0 { |
+ // it's already unlocked, no need to CAS |
+ m.infof("lock already released") |
+ return true |
+ } |
+ itm.Value = []byte{} |
+ itm.Expiration = time.Second |
+ } |
+ |
+ err = memcache.CompareAndSwap(m.ctx, itm) |
+ if err != nil { |
+ m.warningf("failed to %s lock: %s", op, err) |
+ return false |
+ } |
+ |
+ m.infof("%sed lock", op) |
+ return true |
+} |