| OLD | NEW |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 // Package memlock allows multiple appengine handlers to coordinate best-effort | 5 // Package memlock allows multiple appengine handlers to coordinate best-effort |
| 6 // mutual execution via memcache. "best-effort" here means "best-effort"... | 6 // mutual execution via memcache. "best-effort" here means "best-effort"... |
| 7 // memcache is not reliable. However, colliding on memcache is a lot cheaper | 7 // memcache is not reliable. However, colliding on memcache is a lot cheaper |
| 8 // than, for example, colliding with datastore transactions. | 8 // than, for example, colliding with datastore transactions. |
| 9 package memlock | 9 package memlock |
| 10 | 10 |
| 11 import ( | 11 import ( |
| 12 "bytes" | 12 "bytes" |
| 13 "errors" | 13 "errors" |
| 14 » "infra/gae/libs/wrapper" | 14 » "golang.org/x/net/context" |
| 15 » "infra/libs/clock" | |
| 16 "sync/atomic" | 15 "sync/atomic" |
| 17 "time" | 16 "time" |
| 18 | 17 |
| 18 "infra/gae/libs/gae" |
| 19 |
| 20 "github.com/luci/luci-go/common/clock" |
| 19 "github.com/luci/luci-go/common/logging" | 21 "github.com/luci/luci-go/common/logging" |
| 20 "golang.org/x/net/context" | |
| 21 | |
| 22 "appengine/memcache" | |
| 23 ) | 22 ) |
| 24 | 23 |
| 25 // ErrFailedToLock is returned from TryWithLock when it fails to obtain a lock | 24 // ErrFailedToLock is returned from TryWithLock when it fails to obtain a lock |
| 26 // prior to invoking the user-supplied function. | 25 // prior to invoking the user-supplied function. |
| 27 var ErrFailedToLock = errors.New("memlock: failed to obtain lock") | 26 var ErrFailedToLock = errors.New("memlock: failed to obtain lock") |
| 28 | 27 |
| 29 // ErrEmptyClientID is returned from TryWithLock when you specify an empty | 28 // ErrEmptyClientID is returned from TryWithLock when you specify an empty |
| 30 // clientID. | 29 // clientID. |
| 31 var ErrEmptyClientID = errors.New("memlock: empty clientID") | 30 var ErrEmptyClientID = errors.New("memlock: empty clientID") |
| 32 | 31 |
| (...skipping 27 matching lines...) Expand all Loading... |
| 60 // this client (lock-holder). If it's empty then TryWithLock() will return | 59 // this client (lock-holder). If it's empty then TryWithLock() will return |
| 61 // ErrEmptyClientID. | 60 // ErrEmptyClientID. |
| 62 func TryWithLock(ctx context.Context, key, clientID string, f func(check func()
bool) error) error { | 61 func TryWithLock(ctx context.Context, key, clientID string, f func(check func()
bool) error) error { |
| 63 if len(clientID) == 0 { | 62 if len(clientID) == 0 { |
| 64 return ErrEmptyClientID | 63 return ErrEmptyClientID |
| 65 } | 64 } |
| 66 | 65 |
| 67 ctx = logging.SetField(ctx, "key", key) | 66 ctx = logging.SetField(ctx, "key", key) |
| 68 ctx = logging.SetField(ctx, "clientID", clientID) | 67 ctx = logging.SetField(ctx, "clientID", clientID) |
| 69 log := logging.Get(ctx) | 68 log := logging.Get(ctx) |
| 70 » mc := wrapper.GetMC(ctx) | 69 » mc := gae.GetMC(ctx) |
| 71 | 70 |
| 72 key = memlockKeyPrefix + key | 71 key = memlockKeyPrefix + key |
| 73 cid := []byte(clientID) | 72 cid := []byte(clientID) |
| 74 | 73 |
| 75 // checkAnd gets the current value from memcache, and then attempts to d
o the | 74 // checkAnd gets the current value from memcache, and then attempts to d
o the |
| 76 // checkOp (which can either be `refresh` or `release`). These pieces of | 75 // checkOp (which can either be `refresh` or `release`). These pieces of |
| 77 // functionality are necessarially intertwined, because CAS only works w
ith | 76 // functionality are necessarially intertwined, because CAS only works w
ith |
| 78 // the exact-same *Item which was returned from a Get. | 77 // the exact-same *Item which was returned from a Get. |
| 79 // | 78 // |
| 80 // refresh will attempt to CAS the item with the same content to reset i
t's | 79 // refresh will attempt to CAS the item with the same content to reset i
t's |
| 81 // timeout. | 80 // timeout. |
| 82 // | 81 // |
| 83 // release will attempt to CAS the item to remove it's contents (clientI
D). | 82 // release will attempt to CAS the item to remove it's contents (clientI
D). |
| 84 // another lock observing an empty clientID will know that the lock is | 83 // another lock observing an empty clientID will know that the lock is |
| 85 // obtainable. | 84 // obtainable. |
| 86 checkAnd := func(op checkOp) bool { | 85 checkAnd := func(op checkOp) bool { |
| 87 itm, err := mc.Get(key) | 86 itm, err := mc.Get(key) |
| 88 if err != nil { | 87 if err != nil { |
| 89 log.Warningf("error getting: %s", err) | 88 log.Warningf("error getting: %s", err) |
| 90 return false | 89 return false |
| 91 } | 90 } |
| 92 | 91 |
| 93 » » if len(itm.Value) > 0 && !bytes.Equal(itm.Value, cid) { | 92 » » if len(itm.Value()) > 0 && !bytes.Equal(itm.Value(), cid) { |
| 94 » » » log.Infof("lock owned by %q", string(itm.Value)) | 93 » » » log.Infof("lock owned by %q", string(itm.Value())) |
| 95 return false | 94 return false |
| 96 } | 95 } |
| 97 | 96 |
| 98 if op == refresh { | 97 if op == refresh { |
| 99 » » » itm.Value = cid | 98 » » » itm.SetValue(cid).SetExpiration(memcacheLockTime) |
| 100 » » » itm.Expiration = memcacheLockTime | |
| 101 } else { | 99 } else { |
| 102 » » » if len(itm.Value) == 0 { | 100 » » » if len(itm.Value()) == 0 { |
| 103 // it's already unlocked, no need to CAS | 101 // it's already unlocked, no need to CAS |
| 104 log.Infof("lock already released") | 102 log.Infof("lock already released") |
| 105 return true | 103 return true |
| 106 } | 104 } |
| 107 » » » itm.Value = []byte{} | 105 » » » itm.SetValue([]byte{}).SetExpiration(delay) |
| 108 » » » itm.Expiration = delay | |
| 109 } | 106 } |
| 110 | 107 |
| 111 err = mc.CompareAndSwap(itm) | 108 err = mc.CompareAndSwap(itm) |
| 112 if err != nil { | 109 if err != nil { |
| 113 log.Warningf("failed to %s lock: %q", op, err) | 110 log.Warningf("failed to %s lock: %q", op, err) |
| 114 return false | 111 return false |
| 115 } | 112 } |
| 116 | 113 |
| 117 return true | 114 return true |
| 118 } | 115 } |
| 119 | 116 |
| 120 // Now the actual logic begins. First we 'Add' the item, which will set
it if | 117 // Now the actual logic begins. First we 'Add' the item, which will set
it if |
| 121 // it's not present in the memcache, otherwise leaves it alone. | 118 // it's not present in the memcache, otherwise leaves it alone. |
| 122 » err := mc.Add(&memcache.Item{ | 119 |
| 123 » » Key: key, Value: cid, Expiration: memcacheLockTime}) | 120 » err := mc.Add(mc.NewItem(key).SetValue(cid).SetExpiration(memcacheLockTi
me)) |
| 124 if err != nil { | 121 if err != nil { |
| 125 » » if err != memcache.ErrNotStored { | 122 » » if err != gae.ErrMCNotStored { |
| 126 log.Warningf("error adding: %s", err) | 123 log.Warningf("error adding: %s", err) |
| 127 } | 124 } |
| 128 if !checkAnd(refresh) { | 125 if !checkAnd(refresh) { |
| 129 return ErrFailedToLock | 126 return ErrFailedToLock |
| 130 } | 127 } |
| 131 } | 128 } |
| 132 | 129 |
| 133 // At this point we nominally have the lock (at least for memcacheLockTi
me). | 130 // At this point we nominally have the lock (at least for memcacheLockTi
me). |
| 134 | 131 |
| 135 stopChan := make(chan struct{}) | 132 stopChan := make(chan struct{}) |
| (...skipping 24 matching lines...) Expand all Loading... |
| 160 return | 157 return |
| 161 } | 158 } |
| 162 } | 159 } |
| 163 | 160 |
| 164 checkAnd(release) | 161 checkAnd(release) |
| 165 atomic.StoreUint32(&held, 0) | 162 atomic.StoreUint32(&held, 0) |
| 166 }() | 163 }() |
| 167 | 164 |
| 168 return f(func() bool { return atomic.LoadUint32(&held) == 1 }) | 165 return f(func() bool { return atomic.LoadUint32(&held) == 1 }) |
| 169 } | 166 } |
| OLD | NEW |