Chromium Code Reviews| Index: go/src/infra/gae/libs/memlock/memlock.go |
| diff --git a/go/src/infra/gae/libs/memlock/memlock.go b/go/src/infra/gae/libs/memlock/memlock.go |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..1e3e8911595bfa6e40a8c50aecd335b24af3474a |
| --- /dev/null |
| +++ b/go/src/infra/gae/libs/memlock/memlock.go |
| @@ -0,0 +1,175 @@ |
| +// Copyright 2015 The Chromium Authors. All rights reserved. |
| +// Use of this source code is governed by a BSD-style license that can be |
| +// found in the LICENSE file. |
| + |
| +// +build appengine |
| + |
| +package memlock |
| + |
| +import ( |
| + "bytes" |
| + "crypto/rand" |
| + "sync/atomic" |
| + "time" |
| + |
| + "appengine/memcache" |
| + |
| + "infra/gae/libs/context" |
| + "infra/libs/logging" |
| +) |
| + |
| +type checkOp string |
| + |
| +var delay = time.Second |
|
M-A Ruel
2015/03/30 12:52:31
You can use a const.
|
| + |
| +const ( |
| + release checkOp = "release" |
|
M-A Ruel
2015/03/30 12:52:31
releaseOp
refreshOp
|
| + refresh = "refresh" |
| +) |
| + |
| +// memcacheLockTime is the expiration time of the memcache entry. If the lock |
| +// is correctly released, then it will be released before this time. |
| +var memcacheLockTime = 16 * time.Second |
|
M-A Ruel
2015/03/30 12:52:31
You can use a const.
16s is hell of a long time
|
| + |
| +type Context interface { |
|
M-A Ruel
2015/03/30 12:52:31
If it's exported, document it.
|
| + logging.Logger |
| + context.MCSingleReadWriter |
| +} |
| + |
| +// MemcacheLock allows multiple appengine handlers to coordinate best-effort |
| +// mutual execution via memcache. "best-effort" here means "best-effort"... |
| +// memcache is not reliable. However, colliding on memcache is a lot cheaper |
| +// than, for example, colliding with datastore transactions. |
| +// |
| +// Construct a new MemcacheLock with the New() method. |
| +type MemcacheLock struct { |
| + ctx Context |
| + key string |
| + clientID []byte |
| + held bool |
| +} |
| + |
| +// New creates a MemcacheLock. `key` is the memcache key to use. Clients locking |
| +// the same data must use the same key. clientID is the unique identifier for |
| +// this client (lock-holder). If it's empty then New() will generate a random |
|
M-A Ruel
2015/03/30 12:52:31
Why generate a random one? What's the use case?
|
| +// one. |
| +// |
| +// Using a deterministid clientID (like the taskqueue task name, for example) has |
| +// the benefit that on an error the re-tried taskqueue handler may be able to |
| +// re-obtain the lock, assuming it wasn't released properly. |
| +func New(c Context, key, clientID string) (*MemcacheLock, error) { |
| + clientIDbytes := []byte(clientID) |
| + if len(clientIDbytes) == 0 { |
| + clientIDbytes = make([]byte, 32) |
| + _, err := rand.Read(clientIDbytes) |
| + if err != nil { |
| + return nil, err |
|
M-A Ruel
2015/03/30 12:52:30
In this case specifically (using crypto/rand) I th
|
| + } |
| + c.Debugf("memlock: generated clientId: %v", clientIDbytes) |
| + } |
| + return &MemcacheLock{ |
| + ctx: c, |
| + key: "memlock:" + key, |
| + clientID: clientIDbytes, |
| + }, nil |
| +} |
| + |
| +// TryWithLock attempts to obtains the lock once, and then invokes f if |
| +// sucessful. The `check` function can be used within f to see if the lock is |
| +// still held. This function returns true iff the lock was acquired and f was |
| +// invoked. |
| +func (m *MemcacheLock) TryWithLock(f func(check func() bool)) bool { |
|
M-A Ruel
2015/03/30 12:52:31
The main issue with that is that you force the cal
|
| + if m.held { |
| + m.logf(m.ctx.Errorf, "re-entering memlock!") |
| + panic("re-entering memlock!") |
| + } |
| + m.held = true |
|
M-A Ruel
2015/03/30 12:52:30
To double check; MemcacheLock is not thread safe.
|
| + defer func() { m.held = false }() |
| + |
| + err := m.ctx.Add(&memcache.Item{ |
| + Key: m.key, Expiration: memcacheLockTime, Value: m.clientID}) |
| + if err != nil { |
| + if err != memcache.ErrNotStored { |
| + m.logf(m.ctx.Warningf, "error adding: %s", err) |
| + } |
| + if !m.checkAnd(refresh) { |
| + return false |
| + } |
| + } |
| + |
| + stopChan := make(chan struct{}) |
| + stoppedChan := make(chan struct{}) |
| + var held uint32 = 1 |
| + |
| + check := func() bool { |
| + return atomic.LoadUint32(&held) == 1 |
| + } |
| + defer func() { |
| + close(stopChan) |
| + <-stoppedChan |
| + }() |
| + |
| + go func() { |
| + defer close(stoppedChan) |
| + |
| + checkLoop: |
| + for { |
| + select { |
| + case <-stopChan: |
| + break checkLoop |
|
M-A Ruel
2015/03/30 12:52:31
use return instead, this removes need for goto
|
| + case <-time.After(delay): |
| + } |
| + if !m.checkAnd(refresh) { |
| + atomic.StoreUint32(&held, 0) |
| + m.logf(m.ctx.Warningf, "lost lock: %s", err) |
| + break |
|
M-A Ruel
2015/03/30 12:52:31
remove
|
| + } |
| + } |
| + |
| + m.checkAnd(release) |
|
M-A Ruel
2015/03/30 12:52:30
why not in defer?
|
| + atomic.StoreUint32(&held, 0) |
| + }() |
| + |
| + f(check) |
| + return true |
| +} |
| + |
| +func (m *MemcacheLock) logf(f func(string, ...interface{}), fmt string, args ...interface{}) { |
|
M-A Ruel
2015/03/30 12:52:30
I personally do not see much value in this, as it
|
| + args = append([]interface{}{m.key, string(m.clientID)}, args...) |
| + f("memlock(%s:%q): "+fmt, args...) |
| +} |
| + |
| +func (m *MemcacheLock) checkAnd(op checkOp) bool { |
|
M-A Ruel
2015/03/30 12:52:31
It's more like compareAndExchange(), which would b
|
| + itm, err := m.ctx.Get(m.key) |
| + if err != nil { |
| + m.logf(m.ctx.Warningf, "error getting: %s", err) |
| + return false |
| + } |
| + |
| + if len(itm.Value) != 0 && !bytes.Equal(itm.Value, m.clientID) { |
| + m.logf(m.ctx.Infof, "lock owned by %q", string(itm.Value)) |
|
M-A Ruel
2015/03/30 12:52:31
This will become very verbose eventually.
|
| + return false |
| + } |
| + |
| + if op == refresh { |
| + itm.Value = m.clientID |
| + itm.Expiration = memcacheLockTime |
| + } else { |
| + if len(itm.Value) == 0 { |
| + // it's already unlocked, no need to CAS |
| + m.logf(m.ctx.Infof, "lock already released") |
| + return true |
| + } |
| + itm.Value = []byte{} |
| + itm.Expiration = delay |
| + } |
| + |
| + err = m.ctx.CompareAndSwap(itm) |
|
M-A Ruel
2015/03/30 12:52:31
I don't like that it's doing 2 memcache operations
|
| + if err != nil { |
| + m.logf(m.ctx.Warningf, "failed to %s lock: %q", op, err) |
| + return false |
| + } |
| + |
| + m.logf(m.ctx.Infof, "%sed lock", op) |
|
M-A Ruel
2015/03/30 12:52:31
Remove.
|
| + return true |
| +} |