Index: appengine/memlock/memlock.go |
diff --git a/appengine/memlock/memlock.go b/appengine/memlock/memlock.go |
new file mode 100644 |
index 0000000000000000000000000000000000000000..b0f14f5c663f5b579f1f53989469ccdd6522799d |
--- /dev/null |
+++ b/appengine/memlock/memlock.go |
@@ -0,0 +1,175 @@ |
+// Copyright 2015 The Chromium Authors. All rights reserved. |
+// Use of this source code is governed by a BSD-style license that can be |
+// found in the LICENSE file. |
+ |
+// Package memlock allows multiple appengine handlers to coordinate best-effort |
+// mutual execution via memcache. "best-effort" here means "best-effort"... |
+// memcache is not reliable. However, colliding on memcache is a lot cheaper |
+// than, for example, colliding with datastore transactions. |
+package memlock |
+ |
+import ( |
+ "bytes" |
+ "errors" |
+ "time" |
+ |
+ "github.com/luci/gae/service/memcache" |
+ "github.com/luci/luci-go/common/clock" |
+ "github.com/luci/luci-go/common/logging" |
+ "golang.org/x/net/context" |
+) |
+ |
+// ErrFailedToLock is returned from TryWithLock when it fails to obtain a lock |
+// prior to invoking the user-supplied function. |
+var ErrFailedToLock = errors.New("memlock: failed to obtain lock") |
+ |
+// ErrEmptyClientID is returned from TryWithLock when you specify an empty |
+// clientID. |
+var ErrEmptyClientID = errors.New("memlock: empty clientID") |
+ |
+// memlockKeyPrefix is the memcache Key prefix for all user-supplied keys. |
+const memlockKeyPrefix = "memlock:" |
+ |
+type checkOp string |
+ |
+// var so we can override it in the tests |
+var delay = time.Second |
+ |
+type testStopCBKeyType int |
+ |
+var testStopCBKey testStopCBKeyType |
+ |
+const ( |
+ release checkOp = "release" |
+ refresh = "refresh" |
+) |
+ |
+// memcacheLockTime is the expiration time of the memcache entry. If the lock |
+// is correctly released, then it will be released before this time. It's a |
+// var so we can override it in the tests. |
+var memcacheLockTime = 16 * time.Second |
+ |
+// TryWithLock attempts to obtains the lock once, and then invokes f if |
+// sucessful. The context provided to f will be canceled (e.g. ctx.Done() will |
+// be closed) if memlock detects that we've lost the lock. |
+// |
+// TryWithLock function returns ErrFailedToLock if it fails to obtain the lock, |
+// otherwise returns the error that f returns. |
+// |
+// `key` is the memcache key to use (i.e. the name of the lock). Clients locking |
+// the same data must use the same key. clientID is the unique identifier for |
+// this client (lock-holder). If it's empty then TryWithLock() will return |
+// ErrEmptyClientID. |
+// |
+// Note that the lock provided by TryWithLock is a best-effort lock... some |
+// other form of locking or synchronization should be used inside of f (such as |
+// Datastore transactions) to ensure that f is, in fact, operating exclusively. |
+// The purpose of TryWithLock is to have a cheap filter to prevent unnecessary |
+// contention on heavier synchronization primitives like transactions. |
+func TryWithLock(ctx context.Context, key, clientID string, f func(context.Context) error) error { |
+ if len(clientID) == 0 { |
+ return ErrEmptyClientID |
+ } |
+ |
+ ctx = logging.SetField(ctx, "key", key) |
+ ctx = logging.SetField(ctx, "clientID", clientID) |
+ log := logging.Get(ctx) |
+ mc := memcache.Get(ctx) |
+ |
+ key = memlockKeyPrefix + key |
+ cid := []byte(clientID) |
+ |
+ // checkAnd gets the current value from memcache, and then attempts to do the |
+ // checkOp (which can either be `refresh` or `release`). These pieces of |
+ // functionality are necessarially intertwined, because CAS only works with |
+ // the exact-same *Item which was returned from a Get. |
+ // |
+ // refresh will attempt to CAS the item with the same content to reset it's |
+ // timeout. |
+ // |
+ // release will attempt to CAS the item to remove it's contents (clientID). |
+ // another lock observing an empty clientID will know that the lock is |
+ // obtainable. |
+ checkAnd := func(op checkOp) bool { |
+ itm, err := mc.Get(key) |
+ if err != nil { |
+ log.Warningf("error getting: %s", err) |
+ return false |
+ } |
+ |
+ if len(itm.Value()) > 0 && !bytes.Equal(itm.Value(), cid) { |
+ log.Infof("lock owned by %q", string(itm.Value())) |
+ return false |
+ } |
+ |
+ if op == refresh { |
+ itm.SetValue(cid).SetExpiration(memcacheLockTime) |
+ } else { |
+ if len(itm.Value()) == 0 { |
+ // it's already unlocked, no need to CAS |
+ log.Infof("lock already released") |
+ return true |
+ } |
+ itm.SetValue([]byte{}).SetExpiration(delay) |
+ } |
+ |
+ if err := mc.CompareAndSwap(itm); err != nil { |
+ log.Warningf("failed to %s lock: %q", op, err) |
+ return false |
+ } |
+ |
+ return true |
+ } |
+ |
+ // Now the actual logic begins. First we 'Add' the item, which will set it if |
+ // it's not present in the memcache, otherwise leaves it alone. |
+ |
+ err := mc.Add(mc.NewItem(key).SetValue(cid).SetExpiration(memcacheLockTime)) |
+ if err != nil { |
+ if err != memcache.ErrNotStored { |
+ log.Warningf("error adding: %s", err) |
+ } |
+ if !checkAnd(refresh) { |
+ return ErrFailedToLock |
+ } |
+ } |
+ |
+ // At this point we nominally have the lock (at least for memcacheLockTime). |
+ finished := make(chan struct{}) |
+ subCtx, cancelFunc := context.WithCancel(ctx) |
+ defer func() { |
+ cancelFunc() |
+ <-finished |
+ }() |
+ |
+ testStopCB, _ := ctx.Value(testStopCBKey).(func()) |
+ |
+ // This goroutine checks to see if we still posess the lock, and refreshes it |
+ // if we do. |
+ go func() { |
+ defer func() { |
+ cancelFunc() |
+ close(finished) |
+ }() |
+ |
+ checkLoop: |
+ for { |
+ select { |
+ case <-subCtx.Done(): |
+ break checkLoop |
+ case <-clock.Get(ctx).After(delay): |
+ } |
+ if !checkAnd(refresh) { |
+ log.Warningf("lost lock: %s", err) |
+ return |
+ } |
+ } |
+ |
+ if testStopCB != nil { |
+ testStopCB() |
+ } |
+ checkAnd(release) |
+ }() |
+ |
+ return f(subCtx) |
+} |