| OLD | NEW |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 // Package memlock allows multiple appengine handlers to coordinate best-effort | 5 // Package memlock allows multiple appengine handlers to coordinate best-effort |
| 6 // mutual execution via memcache. "best-effort" here means "best-effort"... | 6 // mutual execution via memcache. "best-effort" here means "best-effort"... |
| 7 // memcache is not reliable. However, colliding on memcache is a lot cheaper | 7 // memcache is not reliable. However, colliding on memcache is a lot cheaper |
| 8 // than, for example, colliding with datastore transactions. | 8 // than, for example, colliding with datastore transactions. |
| 9 package memlock | 9 package memlock |
| 10 | 10 |
| 11 import ( | 11 import ( |
| 12 "bytes" | 12 "bytes" |
| 13 "errors" | 13 "errors" |
| 14 "golang.org/x/net/context" | 14 "golang.org/x/net/context" |
| 15 "sync/atomic" | |
| 16 "time" | 15 "time" |
| 17 | 16 |
| 18 "infra/gae/libs/gae" | 17 "infra/gae/libs/gae" |
| 19 | 18 |
| 20 "github.com/luci/luci-go/common/clock" | 19 "github.com/luci/luci-go/common/clock" |
| 21 "github.com/luci/luci-go/common/logging" | 20 "github.com/luci/luci-go/common/logging" |
| 22 ) | 21 ) |
| 23 | 22 |
| 24 // ErrFailedToLock is returned from TryWithLock when it fails to obtain a lock | 23 // ErrFailedToLock is returned from TryWithLock when it fails to obtain a lock |
| 25 // prior to invoking the user-supplied function. | 24 // prior to invoking the user-supplied function. |
| (...skipping 15 matching lines...) Expand all Loading... |
| 41 release checkOp = "release" | 40 release checkOp = "release" |
| 42 refresh = "refresh" | 41 refresh = "refresh" |
| 43 ) | 42 ) |
| 44 | 43 |
| 45 // memcacheLockTime is the expiration time of the memcache entry. If the lock | 44 // memcacheLockTime is the expiration time of the memcache entry. If the lock |
| 46 // is correctly released, then it will be released before this time. It's a | 45 // is correctly released, then it will be released before this time. It's a |
| 47 // var so we can override it in the tests. | 46 // var so we can override it in the tests. |
| 48 var memcacheLockTime = 16 * time.Second | 47 var memcacheLockTime = 16 * time.Second |
| 49 | 48 |
| 50 // TryWithLock attempts to obtains the lock once, and then invokes f if | 49 // TryWithLock attempts to obtains the lock once, and then invokes f if |
| 51 // sucessful. The `check` function can be used within f to see if the lock is | 50 // sucessful. The context provided to f will be canceled (e.g. ctx.Done() will |
| 52 // still held. | 51 // be closed) if memlock detects that we've lost the lock. |
| 53 // | 52 // |
| 54 // TryWithLock function returns ErrFailedToLock if it fails to obtain the lock, | 53 // TryWithLock function returns ErrFailedToLock if it fails to obtain the lock, |
| 55 // otherwise returns the error that f returns. | 54 // otherwise returns the error that f returns. |
| 56 // | 55 // |
| 57 // `key` is the memcache key to use (i.e. the name of the lock). Clients locking | 56 // `key` is the memcache key to use (i.e. the name of the lock). Clients locking |
| 58 // the same data must use the same key. clientID is the unique identifier for | 57 // the same data must use the same key. clientID is the unique identifier for |
| 59 // this client (lock-holder). If it's empty then TryWithLock() will return | 58 // this client (lock-holder). If it's empty then TryWithLock() will return |
| 60 // ErrEmptyClientID. | 59 // ErrEmptyClientID. |
| 61 func TryWithLock(ctx context.Context, key, clientID string, f func(check func()
bool) error) error { | 60 // |
| 61 // Note that the lock provided by TryWithLock is a best-effort lock... some |
| 62 // other form of locking or synchronization should be used inside of f (such as |
| 63 // Datastore transactions) to ensure that f is, in fact, operating exclusively. |
| 64 // The purpose of TryWithLock is to have a cheap filter to prevent unnecessary |
| 65 // contention on heavier synchronization primitives like transactions. |
| 66 func TryWithLock(ctx context.Context, key, clientID string, f func(context.Conte
xt) error) error { |
| 62 if len(clientID) == 0 { | 67 if len(clientID) == 0 { |
| 63 return ErrEmptyClientID | 68 return ErrEmptyClientID |
| 64 } | 69 } |
| 65 | 70 |
| 66 ctx = logging.SetField(ctx, "key", key) | 71 ctx = logging.SetField(ctx, "key", key) |
| 67 ctx = logging.SetField(ctx, "clientID", clientID) | 72 ctx = logging.SetField(ctx, "clientID", clientID) |
| 68 log := logging.Get(ctx) | 73 log := logging.Get(ctx) |
| 69 mc := gae.GetMC(ctx) | 74 mc := gae.GetMC(ctx) |
| 70 | 75 |
| 71 key = memlockKeyPrefix + key | 76 key = memlockKeyPrefix + key |
| (...skipping 49 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 121 if err != nil { | 126 if err != nil { |
| 122 if err != gae.ErrMCNotStored { | 127 if err != gae.ErrMCNotStored { |
| 123 log.Warningf("error adding: %s", err) | 128 log.Warningf("error adding: %s", err) |
| 124 } | 129 } |
| 125 if !checkAnd(refresh) { | 130 if !checkAnd(refresh) { |
| 126 return ErrFailedToLock | 131 return ErrFailedToLock |
| 127 } | 132 } |
| 128 } | 133 } |
| 129 | 134 |
| 130 // At this point we nominally have the lock (at least for memcacheLockTi
me). | 135 // At this point we nominally have the lock (at least for memcacheLockTi
me). |
| 131 | 136 » finished := make(chan struct{}) |
| 132 » stopChan := make(chan struct{}) | 137 » subCtx, cancelFunc := context.WithCancel(ctx) |
| 133 » stoppedChan := make(chan struct{}) | |
| 134 » held := uint32(1) | |
| 135 | |
| 136 defer func() { | 138 defer func() { |
| 137 » » close(stopChan) | 139 » » cancelFunc() |
| 138 » » <-stoppedChan // this blocks TryWithLock until the goroutine bel
ow quits. | 140 » » <-finished |
| 139 }() | 141 }() |
| 140 | 142 |
| 141 // This goroutine checks to see if we still posess the lock, and refresh
es it | 143 // This goroutine checks to see if we still posess the lock, and refresh
es it |
| 142 » // if we do. It will stop doing this when either stopChan is activated (
e.g. | 144 » // if we do. |
| 143 » // the user's function returns) or we lose the lock (memcache flake, etc
.). | |
| 144 go func() { | 145 go func() { |
| 145 » » defer close(stoppedChan) | 146 » » defer func() { |
| 147 » » » cancelFunc() |
| 148 » » » close(finished) |
| 149 » » }() |
| 146 | 150 |
| 147 checkLoop: | 151 checkLoop: |
| 148 for { | 152 for { |
| 149 select { | 153 select { |
| 150 » » » case <-stopChan: | 154 » » » case <-subCtx.Done(): |
| 151 break checkLoop | 155 break checkLoop |
| 152 case <-clock.Get(ctx).After(delay): | 156 case <-clock.Get(ctx).After(delay): |
| 153 } | 157 } |
| 154 if !checkAnd(refresh) { | 158 if !checkAnd(refresh) { |
| 155 atomic.StoreUint32(&held, 0) | |
| 156 log.Warningf("lost lock: %s", err) | 159 log.Warningf("lost lock: %s", err) |
| 157 return | 160 return |
| 158 } | 161 } |
| 159 } | 162 } |
| 160 | 163 |
| 161 checkAnd(release) | 164 checkAnd(release) |
| 162 atomic.StoreUint32(&held, 0) | |
| 163 }() | 165 }() |
| 164 | 166 |
| 165 » return f(func() bool { return atomic.LoadUint32(&held) == 1 }) | 167 » return f(subCtx) |
| 166 } | 168 } |
| OLD | NEW |