Chromium Code Reviews| Index: go/src/infra/gae/libs/memlock/memlock.go |
| diff --git a/go/src/infra/gae/libs/memlock/memlock.go b/go/src/infra/gae/libs/memlock/memlock.go |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..0b5284d13b360ad9eb7a6aece773bec2598527da |
| --- /dev/null |
| +++ b/go/src/infra/gae/libs/memlock/memlock.go |
| @@ -0,0 +1,160 @@ |
| +// Copyright 2015 The Chromium Authors. All rights reserved. |
| +// Use of this source code is governed by a BSD-style license that can be |
| +// found in the LICENSE file. |
| + |
| +package memlock |
| + |
| +import ( |
| + "bytes" |
| + "crypto/rand" |
| + "sync/atomic" |
| + "time" |
| + |
| + "appengine" |
| + "appengine/memcache" |
| +) |
| + |
| +type checkOp string |
| + |
| +const ( |
| + release checkOp = "release" |
| + refresh = "refresh" |
| +) |
| + |
| +// MemcacheLockTime is the expiration time of the memcache entry. If the lock |
| +// is correctly released, then it will be released before this time. |
| +const MemcacheLockTime = 16 * time.Second |
| + |
| +// MemcacheLock allows multiple appengine handlers to coordinate best-effort |
| +// mutual execution via memcache. "best-effort" here means "best-effort"... |
| +// memcache is not reliable. However, colliding on memcache is a lot cheaper |
| +// than, for example, colliding with datastore transactions. |
| +// |
| +// Construct a new MemcacheLock with the New() method. |
| +type MemcacheLock struct { |
| + ctx appengine.Context |
| + key string |
| + clientID []byte |
| +} |
| + |
| +// New creates a MemcacheLock. `key` is the memcache key to use. Clients locking |
| +// the same data must use the same key. clientID is the unique identifier for |
| +// this client (lock-holder). If it's empty then New() will generate a random |
| +// one. |
| +// |
| +// Using a deterministid clientID (like the taskqueue task name, for example) has |
| +// the benefit that on an error the re-tried taskqueue handler may be able to |
| +// re-obtain the lock, assuming it wasn't released properly. |
| +func New(c appengine.Context, key, clientID string) (*MemcacheLock, error) { |
| + clientIDbytes := []byte(clientID) |
| + if len(clientIDbytes) == 0 { |
| + clientIDbytes = make([]byte, 32) |
| + _, err := rand.Read(clientIDbytes) |
| + if err != nil { |
| + return nil, err |
| + } |
| + c.Debugf("memlock: generated clientId: %v", clientIDbytes) |
| + } |
| + return &MemcacheLock{ |
| + ctx: c, |
| + key: "memlock:" + key, |
| + clientID: clientIDbytes, |
| + }, nil |
| +} |
| + |
| +// TryWithLock attempts to obtains the lock once, and then invokes f if |
| +// sucessful. The `check` function can be used within f to see if the lock is |
| +// still held. This function returns true iff the lock was acquired and f was |
| +// invoked. |
| +func (m *MemcacheLock) TryWithLock(f func(check func() bool)) bool { |
| + err := memcache.Add(m.ctx, &memcache.Item{ |
| + Key: m.key, Expiration: MemcacheLockTime, Value: m.clientID}) |
| + if err != nil { |
| + if err != memcache.ErrNotStored { |
| + m.warningf("error adding: %s", err) |
| + } |
| + if !m.checkAnd(refresh) { |
|
Vadim Sh.
2015/03/07 02:55:50
actually, it makes the lock reentrant (same gorout
iannucci
2015/03/29 21:22:36
oh, yeah, good point... I'll make it prevent this.
|
| + return false |
| + } |
| + } |
| + |
| + stopChan := make(chan struct{}, 1) |
|
Vadim Sh.
2015/03/07 02:55:50
these "-chan" make the impression I'm watching som
iannucci
2015/03/29 21:22:36
Chan-chan!
|
| + stoppedChan := make(chan struct{}, 1) |
| + var held uint32 = 1 |
| + |
| + check := func() bool { |
| + return atomic.LoadUint32(&held) == 1 |
| + } |
| + defer func() { |
| + stopChan <- struct{}{} |
|
Vadim Sh.
2015/03/07 02:55:50
I think "idiomatic" (I start to not like this word
iannucci
2015/03/29 21:22:36
Hm, ok I guess that'll work.
|
| + <-stoppedChan |
| + }() |
| + |
| + go func() { |
| + defer func() { stoppedChan <- struct{}{} }() |
| + checkLoop: |
| + for { |
| + select { |
| + case <-stopChan: |
| + break checkLoop |
|
Vadim Sh.
2015/03/07 02:55:50
do you really need a label here?
iannucci
2015/03/29 21:22:35
yeah. a plain break stops the select statement (wh
|
| + case <-time.After(time.Second): |
| + } |
| + if !m.checkAnd(refresh) { |
| + atomic.StoreUint32(&held, 0) |
| + m.warningf("lost lock: %s", err) |
| + break |
| + } |
| + } |
| + |
| + m.checkAnd(release) |
| + atomic.StoreUint32(&held, 0) |
| + }() |
| + |
| + f(check) |
| + return true |
| +} |
| + |
| +func (m *MemcacheLock) infof(fmt string, args ...interface{}) { |
| + args = append([]interface{}{m.key, string(m.clientID)}, args...) |
| + m.ctx.Infof("memlock(%s:%q): "+fmt, args) |
| +} |
| + |
| +func (m *MemcacheLock) warningf(fmt string, args ...interface{}) { |
| + args = append([]interface{}{m.key, string(m.clientID)}, args...) |
| + m.ctx.Warningf("memlock(%s:%q): "+fmt, args) |
| +} |
| + |
| +func (m *MemcacheLock) checkAnd(op checkOp) bool { |
| + itm, err := memcache.Get(m.ctx, m.key) |
| + if err != nil { |
| + m.warningf("error getting: %s", err) |
| + return false |
| + } |
| + |
| + if len(itm.Value) != 0 && !bytes.Equal(itm.Value, m.clientID) { |
| + m.infof("lock owned by %q", string(itm.Value)) |
| + return false |
| + } |
| + |
| + if op == refresh { |
| + itm.Value = m.clientID |
| + itm.Expiration = MemcacheLockTime |
| + } else { |
| + if len(itm.Value) == 0 { |
| + // it's already unlocked, no need to CAS |
| + m.infof("lock already released") |
| + return true |
| + } |
| + itm.Value = []byte{} |
| + itm.Expiration = time.Second |
| + } |
| + |
| + err = memcache.CompareAndSwap(m.ctx, itm) |
| + if err != nil { |
| + m.warningf("failed to %s lock: %s", op, err) |
| + return false |
| + } |
| + |
| + m.infof("%sed lock", op) |
| + return true |
| +} |