| Index: go/src/infra/gae/libs/memlock/memlock.go
|
| diff --git a/go/src/infra/gae/libs/memlock/memlock.go b/go/src/infra/gae/libs/memlock/memlock.go
|
| index fee7dcd2374ddd9690f0325b29c06f8321001e77..bf8ebbe0091ffb8d1d6e1586503fdbab0c4d39ab 100644
|
| --- a/go/src/infra/gae/libs/memlock/memlock.go
|
| +++ b/go/src/infra/gae/libs/memlock/memlock.go
|
| @@ -12,7 +12,6 @@ import (
|
| "bytes"
|
| "errors"
|
| "golang.org/x/net/context"
|
| - "sync/atomic"
|
| "time"
|
|
|
| "infra/gae/libs/gae"
|
| @@ -48,8 +47,8 @@ const (
|
| var memcacheLockTime = 16 * time.Second
|
|
|
| // TryWithLock attempts to obtains the lock once, and then invokes f if
|
| -// sucessful. The `check` function can be used within f to see if the lock is
|
| -// still held.
|
| +// sucessful. The context provided to f will be canceled (e.g. ctx.Done() will
|
| +// be closed) if memlock detects that we've lost the lock.
|
| //
|
| // TryWithLock function returns ErrFailedToLock if it fails to obtain the lock,
|
| // otherwise returns the error that f returns.
|
| @@ -58,7 +57,13 @@ var memcacheLockTime = 16 * time.Second
|
| // the same data must use the same key. clientID is the unique identifier for
|
| // this client (lock-holder). If it's empty then TryWithLock() will return
|
| // ErrEmptyClientID.
|
| -func TryWithLock(ctx context.Context, key, clientID string, f func(check func() bool) error) error {
|
| +//
|
| +// Note that the lock provided by TryWithLock is a best-effort lock... some
|
| +// other form of locking or synchronization should be used inside of f (such as
|
| +// Datastore transactions) to ensure that f is, in fact, operating exclusively.
|
| +// The purpose of TryWithLock is to have a cheap filter to prevent unnecessary
|
| +// contention on heavier synchronization primitives like transactions.
|
| +func TryWithLock(ctx context.Context, key, clientID string, f func(context.Context) error) error {
|
| if len(clientID) == 0 {
|
| return ErrEmptyClientID
|
| }
|
| @@ -128,39 +133,36 @@ func TryWithLock(ctx context.Context, key, clientID string, f func(check func()
|
| }
|
|
|
| // At this point we nominally have the lock (at least for memcacheLockTime).
|
| -
|
| - stopChan := make(chan struct{})
|
| - stoppedChan := make(chan struct{})
|
| - held := uint32(1)
|
| -
|
| + finished := make(chan struct{})
|
| + subCtx, cancelFunc := context.WithCancel(ctx)
|
| defer func() {
|
| - close(stopChan)
|
| - <-stoppedChan // this blocks TryWithLock until the goroutine below quits.
|
| + cancelFunc()
|
| + <-finished
|
| }()
|
|
|
| // This goroutine checks to see if we still posess the lock, and refreshes it
|
| - // if we do. It will stop doing this when either stopChan is activated (e.g.
|
| - // the user's function returns) or we lose the lock (memcache flake, etc.).
|
| + // if we do.
|
| go func() {
|
| - defer close(stoppedChan)
|
| + defer func() {
|
| + cancelFunc()
|
| + close(finished)
|
| + }()
|
|
|
| checkLoop:
|
| for {
|
| select {
|
| - case <-stopChan:
|
| + case <-subCtx.Done():
|
| break checkLoop
|
| case <-clock.Get(ctx).After(delay):
|
| }
|
| if !checkAnd(refresh) {
|
| - atomic.StoreUint32(&held, 0)
|
| log.Warningf("lost lock: %s", err)
|
| return
|
| }
|
| }
|
|
|
| checkAnd(release)
|
| - atomic.StoreUint32(&held, 0)
|
| }()
|
|
|
| - return f(func() bool { return atomic.LoadUint32(&held) == 1 })
|
| + return f(subCtx)
|
| }
|
|
|