| OLD | NEW |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 // Package memlock allows multiple appengine handlers to coordinate best-effort | 5 // Package memlock allows multiple appengine handlers to coordinate best-effort |
| 6 // mutual execution via memcache. "best-effort" here means "best-effort"... | 6 // mutual execution via memcache. "best-effort" here means "best-effort"... |
| 7 // memcache is not reliable. However, colliding on memcache is a lot cheaper | 7 // memcache is not reliable. However, colliding on memcache is a lot cheaper |
| 8 // than, for example, colliding with datastore transactions. | 8 // than, for example, colliding with datastore transactions. |
| 9 package memlock | 9 package memlock |
| 10 | 10 |
| 11 import ( | 11 import ( |
| 12 "bytes" | 12 "bytes" |
| 13 "errors" | 13 "errors" |
| 14 » "golang.org/x/net/context" | 14 » "infra/gae/libs/wrapper" |
| 15 » "infra/libs/clock" |
| 15 "sync/atomic" | 16 "sync/atomic" |
| 16 "time" | 17 "time" |
| 17 | 18 |
| 18 » "infra/gae/libs/gae" | 19 » "github.com/luci/luci-go/common/logging" |
| 20 » "golang.org/x/net/context" |
| 19 | 21 |
| 20 » "github.com/luci/luci-go/common/clock" | 22 » "appengine/memcache" |
| 21 » "github.com/luci/luci-go/common/logging" | |
| 22 ) | 23 ) |
| 23 | 24 |
| 24 // ErrFailedToLock is returned from TryWithLock when it fails to obtain a lock | 25 // ErrFailedToLock is returned from TryWithLock when it fails to obtain a lock |
| 25 // prior to invoking the user-supplied function. | 26 // prior to invoking the user-supplied function. |
| 26 var ErrFailedToLock = errors.New("memlock: failed to obtain lock") | 27 var ErrFailedToLock = errors.New("memlock: failed to obtain lock") |
| 27 | 28 |
| 28 // ErrEmptyClientID is returned from TryWithLock when you specify an empty | 29 // ErrEmptyClientID is returned from TryWithLock when you specify an empty |
| 29 // clientID. | 30 // clientID. |
| 30 var ErrEmptyClientID = errors.New("memlock: empty clientID") | 31 var ErrEmptyClientID = errors.New("memlock: empty clientID") |
| 31 | 32 |
| (...skipping 27 matching lines...) Expand all Loading... |
| 59 // this client (lock-holder). If it's empty then TryWithLock() will return | 60 // this client (lock-holder). If it's empty then TryWithLock() will return |
| 60 // ErrEmptyClientID. | 61 // ErrEmptyClientID. |
| 61 func TryWithLock(ctx context.Context, key, clientID string, f func(check func()
bool) error) error { | 62 func TryWithLock(ctx context.Context, key, clientID string, f func(check func()
bool) error) error { |
| 62 if len(clientID) == 0 { | 63 if len(clientID) == 0 { |
| 63 return ErrEmptyClientID | 64 return ErrEmptyClientID |
| 64 } | 65 } |
| 65 | 66 |
| 66 ctx = logging.SetField(ctx, "key", key) | 67 ctx = logging.SetField(ctx, "key", key) |
| 67 ctx = logging.SetField(ctx, "clientID", clientID) | 68 ctx = logging.SetField(ctx, "clientID", clientID) |
| 68 log := logging.Get(ctx) | 69 log := logging.Get(ctx) |
| 69 » mc := gae.GetMC(ctx) | 70 » mc := wrapper.GetMC(ctx) |
| 70 | 71 |
| 71 key = memlockKeyPrefix + key | 72 key = memlockKeyPrefix + key |
| 72 cid := []byte(clientID) | 73 cid := []byte(clientID) |
| 73 | 74 |
| 74 // checkAnd gets the current value from memcache, and then attempts to d
o the | 75 // checkAnd gets the current value from memcache, and then attempts to d
o the |
| 75 // checkOp (which can either be `refresh` or `release`). These pieces of | 76 // checkOp (which can either be `refresh` or `release`). These pieces of |
| 76 // functionality are necessarially intertwined, because CAS only works w
ith | 77 // functionality are necessarially intertwined, because CAS only works w
ith |
| 77 // the exact-same *Item which was returned from a Get. | 78 // the exact-same *Item which was returned from a Get. |
| 78 // | 79 // |
| 79 // refresh will attempt to CAS the item with the same content to reset i
t's | 80 // refresh will attempt to CAS the item with the same content to reset i
t's |
| 80 // timeout. | 81 // timeout. |
| 81 // | 82 // |
| 82 // release will attempt to CAS the item to remove it's contents (clientI
D). | 83 // release will attempt to CAS the item to remove it's contents (clientI
D). |
| 83 // another lock observing an empty clientID will know that the lock is | 84 // another lock observing an empty clientID will know that the lock is |
| 84 // obtainable. | 85 // obtainable. |
| 85 checkAnd := func(op checkOp) bool { | 86 checkAnd := func(op checkOp) bool { |
| 86 itm, err := mc.Get(key) | 87 itm, err := mc.Get(key) |
| 87 if err != nil { | 88 if err != nil { |
| 88 log.Warningf("error getting: %s", err) | 89 log.Warningf("error getting: %s", err) |
| 89 return false | 90 return false |
| 90 } | 91 } |
| 91 | 92 |
| 92 » » if len(itm.Value()) > 0 && !bytes.Equal(itm.Value(), cid) { | 93 » » if len(itm.Value) > 0 && !bytes.Equal(itm.Value, cid) { |
| 93 » » » log.Infof("lock owned by %q", string(itm.Value())) | 94 » » » log.Infof("lock owned by %q", string(itm.Value)) |
| 94 return false | 95 return false |
| 95 } | 96 } |
| 96 | 97 |
| 97 if op == refresh { | 98 if op == refresh { |
| 98 » » » itm.SetValue(cid).SetExpiration(memcacheLockTime) | 99 » » » itm.Value = cid |
| 100 » » » itm.Expiration = memcacheLockTime |
| 99 } else { | 101 } else { |
| 100 » » » if len(itm.Value()) == 0 { | 102 » » » if len(itm.Value) == 0 { |
| 101 // it's already unlocked, no need to CAS | 103 // it's already unlocked, no need to CAS |
| 102 log.Infof("lock already released") | 104 log.Infof("lock already released") |
| 103 return true | 105 return true |
| 104 } | 106 } |
| 105 » » » itm.SetValue([]byte{}).SetExpiration(delay) | 107 » » » itm.Value = []byte{} |
| 108 » » » itm.Expiration = delay |
| 106 } | 109 } |
| 107 | 110 |
| 108 err = mc.CompareAndSwap(itm) | 111 err = mc.CompareAndSwap(itm) |
| 109 if err != nil { | 112 if err != nil { |
| 110 log.Warningf("failed to %s lock: %q", op, err) | 113 log.Warningf("failed to %s lock: %q", op, err) |
| 111 return false | 114 return false |
| 112 } | 115 } |
| 113 | 116 |
| 114 return true | 117 return true |
| 115 } | 118 } |
| 116 | 119 |
| 117 // Now the actual logic begins. First we 'Add' the item, which will set
it if | 120 // Now the actual logic begins. First we 'Add' the item, which will set
it if |
| 118 // it's not present in the memcache, otherwise leaves it alone. | 121 // it's not present in the memcache, otherwise leaves it alone. |
| 119 | 122 » err := mc.Add(&memcache.Item{ |
| 120 » err := mc.Add(mc.NewItem(key).SetValue(cid).SetExpiration(memcacheLockTi
me)) | 123 » » Key: key, Value: cid, Expiration: memcacheLockTime}) |
| 121 if err != nil { | 124 if err != nil { |
| 122 » » if err != gae.ErrMCNotStored { | 125 » » if err != memcache.ErrNotStored { |
| 123 log.Warningf("error adding: %s", err) | 126 log.Warningf("error adding: %s", err) |
| 124 } | 127 } |
| 125 if !checkAnd(refresh) { | 128 if !checkAnd(refresh) { |
| 126 return ErrFailedToLock | 129 return ErrFailedToLock |
| 127 } | 130 } |
| 128 } | 131 } |
| 129 | 132 |
| 130 // At this point we nominally have the lock (at least for memcacheLockTi
me). | 133 // At this point we nominally have the lock (at least for memcacheLockTi
me). |
| 131 | 134 |
| 132 stopChan := make(chan struct{}) | 135 stopChan := make(chan struct{}) |
| (...skipping 24 matching lines...) Expand all Loading... |
| 157 return | 160 return |
| 158 } | 161 } |
| 159 } | 162 } |
| 160 | 163 |
| 161 checkAnd(release) | 164 checkAnd(release) |
| 162 atomic.StoreUint32(&held, 0) | 165 atomic.StoreUint32(&held, 0) |
| 163 }() | 166 }() |
| 164 | 167 |
| 165 return f(func() bool { return atomic.LoadUint32(&held) == 1 }) | 168 return f(func() bool { return atomic.LoadUint32(&held) == 1 }) |
| 166 } | 169 } |
| OLD | NEW |