OLD | NEW |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 // Package memlock allows multiple appengine handlers to coordinate best-effort | 5 // Package memlock allows multiple appengine handlers to coordinate best-effort |
6 // mutual execution via memcache. "best-effort" here means "best-effort"... | 6 // mutual execution via memcache. "best-effort" here means "best-effort"... |
7 // memcache is not reliable. However, colliding on memcache is a lot cheaper | 7 // memcache is not reliable. However, colliding on memcache is a lot cheaper |
8 // than, for example, colliding with datastore transactions. | 8 // than, for example, colliding with datastore transactions. |
9 package memlock | 9 package memlock |
10 | 10 |
11 import ( | 11 import ( |
12 "bytes" | 12 "bytes" |
13 "errors" | 13 "errors" |
14 » "infra/gae/libs/wrapper" | 14 » "golang.org/x/net/context" |
15 » "infra/libs/clock" | |
16 "sync/atomic" | 15 "sync/atomic" |
17 "time" | 16 "time" |
18 | 17 |
| 18 "infra/gae/libs/gae" |
| 19 |
| 20 "github.com/luci/luci-go/common/clock" |
19 "github.com/luci/luci-go/common/logging" | 21 "github.com/luci/luci-go/common/logging" |
20 "golang.org/x/net/context" | |
21 | |
22 "appengine/memcache" | |
23 ) | 22 ) |
24 | 23 |
25 // ErrFailedToLock is returned from TryWithLock when it fails to obtain a lock | 24 // ErrFailedToLock is returned from TryWithLock when it fails to obtain a lock |
26 // prior to invoking the user-supplied function. | 25 // prior to invoking the user-supplied function. |
27 var ErrFailedToLock = errors.New("memlock: failed to obtain lock") | 26 var ErrFailedToLock = errors.New("memlock: failed to obtain lock") |
28 | 27 |
29 // ErrEmptyClientID is returned from TryWithLock when you specify an empty | 28 // ErrEmptyClientID is returned from TryWithLock when you specify an empty |
30 // clientID. | 29 // clientID. |
31 var ErrEmptyClientID = errors.New("memlock: empty clientID") | 30 var ErrEmptyClientID = errors.New("memlock: empty clientID") |
32 | 31 |
(...skipping 27 matching lines...) Expand all Loading... |
60 // this client (lock-holder). If it's empty then TryWithLock() will return | 59 // this client (lock-holder). If it's empty then TryWithLock() will return |
61 // ErrEmptyClientID. | 60 // ErrEmptyClientID. |
62 func TryWithLock(ctx context.Context, key, clientID string, f func(check func()
bool) error) error { | 61 func TryWithLock(ctx context.Context, key, clientID string, f func(check func()
bool) error) error { |
63 if len(clientID) == 0 { | 62 if len(clientID) == 0 { |
64 return ErrEmptyClientID | 63 return ErrEmptyClientID |
65 } | 64 } |
66 | 65 |
67 ctx = logging.SetField(ctx, "key", key) | 66 ctx = logging.SetField(ctx, "key", key) |
68 ctx = logging.SetField(ctx, "clientID", clientID) | 67 ctx = logging.SetField(ctx, "clientID", clientID) |
69 log := logging.Get(ctx) | 68 log := logging.Get(ctx) |
70 » mc := wrapper.GetMC(ctx) | 69 » mc := gae.GetMC(ctx) |
71 | 70 |
72 key = memlockKeyPrefix + key | 71 key = memlockKeyPrefix + key |
73 cid := []byte(clientID) | 72 cid := []byte(clientID) |
74 | 73 |
75 // checkAnd gets the current value from memcache, and then attempts to d
o the | 74 // checkAnd gets the current value from memcache, and then attempts to d
o the |
76 // checkOp (which can either be `refresh` or `release`). These pieces of | 75 // checkOp (which can either be `refresh` or `release`). These pieces of |
77 // functionality are necessarially intertwined, because CAS only works w
ith | 76 // functionality are necessarially intertwined, because CAS only works w
ith |
78 // the exact-same *Item which was returned from a Get. | 77 // the exact-same *Item which was returned from a Get. |
79 // | 78 // |
80 // refresh will attempt to CAS the item with the same content to reset i
t's | 79 // refresh will attempt to CAS the item with the same content to reset i
t's |
81 // timeout. | 80 // timeout. |
82 // | 81 // |
83 // release will attempt to CAS the item to remove it's contents (clientI
D). | 82 // release will attempt to CAS the item to remove it's contents (clientI
D). |
84 // another lock observing an empty clientID will know that the lock is | 83 // another lock observing an empty clientID will know that the lock is |
85 // obtainable. | 84 // obtainable. |
86 checkAnd := func(op checkOp) bool { | 85 checkAnd := func(op checkOp) bool { |
87 itm, err := mc.Get(key) | 86 itm, err := mc.Get(key) |
88 if err != nil { | 87 if err != nil { |
89 log.Warningf("error getting: %s", err) | 88 log.Warningf("error getting: %s", err) |
90 return false | 89 return false |
91 } | 90 } |
92 | 91 |
93 » » if len(itm.Value) > 0 && !bytes.Equal(itm.Value, cid) { | 92 » » if len(itm.Value()) > 0 && !bytes.Equal(itm.Value(), cid) { |
94 » » » log.Infof("lock owned by %q", string(itm.Value)) | 93 » » » log.Infof("lock owned by %q", string(itm.Value())) |
95 return false | 94 return false |
96 } | 95 } |
97 | 96 |
98 if op == refresh { | 97 if op == refresh { |
99 » » » itm.Value = cid | 98 » » » itm.SetValue(cid).SetExpiration(memcacheLockTime) |
100 » » » itm.Expiration = memcacheLockTime | |
101 } else { | 99 } else { |
102 » » » if len(itm.Value) == 0 { | 100 » » » if len(itm.Value()) == 0 { |
103 // it's already unlocked, no need to CAS | 101 // it's already unlocked, no need to CAS |
104 log.Infof("lock already released") | 102 log.Infof("lock already released") |
105 return true | 103 return true |
106 } | 104 } |
107 » » » itm.Value = []byte{} | 105 » » » itm.SetValue([]byte{}).SetExpiration(delay) |
108 » » » itm.Expiration = delay | |
109 } | 106 } |
110 | 107 |
111 err = mc.CompareAndSwap(itm) | 108 err = mc.CompareAndSwap(itm) |
112 if err != nil { | 109 if err != nil { |
113 log.Warningf("failed to %s lock: %q", op, err) | 110 log.Warningf("failed to %s lock: %q", op, err) |
114 return false | 111 return false |
115 } | 112 } |
116 | 113 |
117 return true | 114 return true |
118 } | 115 } |
119 | 116 |
120 // Now the actual logic begins. First we 'Add' the item, which will set
it if | 117 // Now the actual logic begins. First we 'Add' the item, which will set
it if |
121 // it's not present in the memcache, otherwise leaves it alone. | 118 // it's not present in the memcache, otherwise leaves it alone. |
122 » err := mc.Add(&memcache.Item{ | 119 |
123 » » Key: key, Value: cid, Expiration: memcacheLockTime}) | 120 » err := mc.Add(mc.NewItem(key).SetValue(cid).SetExpiration(memcacheLockTi
me)) |
124 if err != nil { | 121 if err != nil { |
125 » » if err != memcache.ErrNotStored { | 122 » » if err != gae.ErrMCNotStored { |
126 log.Warningf("error adding: %s", err) | 123 log.Warningf("error adding: %s", err) |
127 } | 124 } |
128 if !checkAnd(refresh) { | 125 if !checkAnd(refresh) { |
129 return ErrFailedToLock | 126 return ErrFailedToLock |
130 } | 127 } |
131 } | 128 } |
132 | 129 |
133 // At this point we nominally have the lock (at least for memcacheLockTi
me). | 130 // At this point we nominally have the lock (at least for memcacheLockTi
me). |
134 | 131 |
135 stopChan := make(chan struct{}) | 132 stopChan := make(chan struct{}) |
(...skipping 24 matching lines...) Expand all Loading... |
160 return | 157 return |
161 } | 158 } |
162 } | 159 } |
163 | 160 |
164 checkAnd(release) | 161 checkAnd(release) |
165 atomic.StoreUint32(&held, 0) | 162 atomic.StoreUint32(&held, 0) |
166 }() | 163 }() |
167 | 164 |
168 return f(func() bool { return atomic.LoadUint32(&held) == 1 }) | 165 return f(func() bool { return atomic.LoadUint32(&held) == 1 }) |
169 } | 166 } |
OLD | NEW |