Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(148)

Unified Diff: go/src/infra/gae/libs/memlock/memlock.go

Issue 986553002: A simple memcache lock for appengine. (Closed) Base URL: https://chromium.googlesource.com/infra/infra.git@meta
Patch Set: and fix a boog Created 5 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « no previous file | no next file » | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: go/src/infra/gae/libs/memlock/memlock.go
diff --git a/go/src/infra/gae/libs/memlock/memlock.go b/go/src/infra/gae/libs/memlock/memlock.go
new file mode 100644
index 0000000000000000000000000000000000000000..0b5284d13b360ad9eb7a6aece773bec2598527da
--- /dev/null
+++ b/go/src/infra/gae/libs/memlock/memlock.go
@@ -0,0 +1,160 @@
+// Copyright 2015 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+package memlock
+
+import (
+ "bytes"
+ "crypto/rand"
+ "sync/atomic"
+ "time"
+
+ "appengine"
+ "appengine/memcache"
+)
+
+type checkOp string
+
+const (
+ release checkOp = "release"
+ refresh = "refresh"
+)
+
+// MemcacheLockTime is the expiration time of the memcache entry. If the lock
+// is correctly released, then it will be released before this time.
+const MemcacheLockTime = 16 * time.Second
+
+// MemcacheLock allows multiple appengine handlers to coordinate best-effort
+// mutual execution via memcache. "best-effort" here means "best-effort"...
+// memcache is not reliable. However, colliding on memcache is a lot cheaper
+// than, for example, colliding with datastore transactions.
+//
+// Construct a new MemcacheLock with the New() method.
+type MemcacheLock struct {
+ ctx appengine.Context
+ key string
+ clientID []byte
+}
+
+// New creates a MemcacheLock. `key` is the memcache key to use. Clients locking
+// the same data must use the same key. clientID is the unique identifier for
+// this client (lock-holder). If it's empty then New() will generate a random
+// one.
+//
+// Using a deterministid clientID (like the taskqueue task name, for example) has
+// the benefit that on an error the re-tried taskqueue handler may be able to
+// re-obtain the lock, assuming it wasn't released properly.
+func New(c appengine.Context, key, clientID string) (*MemcacheLock, error) {
+ clientIDbytes := []byte(clientID)
+ if len(clientIDbytes) == 0 {
+ clientIDbytes = make([]byte, 32)
+ _, err := rand.Read(clientIDbytes)
+ if err != nil {
+ return nil, err
+ }
+ c.Debugf("memlock: generated clientId: %v", clientIDbytes)
+ }
+ return &MemcacheLock{
+ ctx: c,
+ key: "memlock:" + key,
+ clientID: clientIDbytes,
+ }, nil
+}
+
+// TryWithLock attempts to obtains the lock once, and then invokes f if
+// sucessful. The `check` function can be used within f to see if the lock is
+// still held. This function returns true iff the lock was acquired and f was
+// invoked.
+func (m *MemcacheLock) TryWithLock(f func(check func() bool)) bool {
+ err := memcache.Add(m.ctx, &memcache.Item{
+ Key: m.key, Expiration: MemcacheLockTime, Value: m.clientID})
+ if err != nil {
+ if err != memcache.ErrNotStored {
+ m.warningf("error adding: %s", err)
+ }
+ if !m.checkAnd(refresh) {
Vadim Sh. 2015/03/07 02:55:50 actually, it makes the lock reentrant (same gorout
iannucci 2015/03/29 21:22:36 oh, yeah, good point... I'll make it prevent this.
+ return false
+ }
+ }
+
+ stopChan := make(chan struct{}, 1)
Vadim Sh. 2015/03/07 02:55:50 these "-chan" make the impression I'm watching som
iannucci 2015/03/29 21:22:36 Chan-chan!
+ stoppedChan := make(chan struct{}, 1)
+ var held uint32 = 1
+
+ check := func() bool {
+ return atomic.LoadUint32(&held) == 1
+ }
+ defer func() {
+ stopChan <- struct{}{}
Vadim Sh. 2015/03/07 02:55:50 I think "idiomatic" (I start to not like this word
iannucci 2015/03/29 21:22:36 Hm, ok I guess that'll work.
+ <-stoppedChan
+ }()
+
+ go func() {
+ defer func() { stoppedChan <- struct{}{} }()
+ checkLoop:
+ for {
+ select {
+ case <-stopChan:
+ break checkLoop
Vadim Sh. 2015/03/07 02:55:50 do you really need a label here?
iannucci 2015/03/29 21:22:35 yeah. a plain break stops the select statement (wh
+ case <-time.After(time.Second):
+ }
+ if !m.checkAnd(refresh) {
+ atomic.StoreUint32(&held, 0)
+ m.warningf("lost lock: %s", err)
+ break
+ }
+ }
+
+ m.checkAnd(release)
+ atomic.StoreUint32(&held, 0)
+ }()
+
+ f(check)
+ return true
+}
+
+func (m *MemcacheLock) infof(fmt string, args ...interface{}) {
+ args = append([]interface{}{m.key, string(m.clientID)}, args...)
+ m.ctx.Infof("memlock(%s:%q): "+fmt, args)
+}
+
+func (m *MemcacheLock) warningf(fmt string, args ...interface{}) {
+ args = append([]interface{}{m.key, string(m.clientID)}, args...)
+ m.ctx.Warningf("memlock(%s:%q): "+fmt, args)
+}
+
+func (m *MemcacheLock) checkAnd(op checkOp) bool {
+ itm, err := memcache.Get(m.ctx, m.key)
+ if err != nil {
+ m.warningf("error getting: %s", err)
+ return false
+ }
+
+ if len(itm.Value) != 0 && !bytes.Equal(itm.Value, m.clientID) {
+ m.infof("lock owned by %q", string(itm.Value))
+ return false
+ }
+
+ if op == refresh {
+ itm.Value = m.clientID
+ itm.Expiration = MemcacheLockTime
+ } else {
+ if len(itm.Value) == 0 {
+ // it's already unlocked, no need to CAS
+ m.infof("lock already released")
+ return true
+ }
+ itm.Value = []byte{}
+ itm.Expiration = time.Second
+ }
+
+ err = memcache.CompareAndSwap(m.ctx, itm)
+ if err != nil {
+ m.warningf("failed to %s lock: %s", op, err)
+ return false
+ }
+
+ m.infof("%sed lock", op)
+ return true
+}
« no previous file with comments | « no previous file | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698