OLD | NEW |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 // Package memlock allows multiple appengine handlers to coordinate best-effort | 5 // Package memlock allows multiple appengine handlers to coordinate best-effort |
6 // mutual execution via memcache. "best-effort" here means "best-effort"... | 6 // mutual execution via memcache. "best-effort" here means "best-effort"... |
7 // memcache is not reliable. However, colliding on memcache is a lot cheaper | 7 // memcache is not reliable. However, colliding on memcache is a lot cheaper |
8 // than, for example, colliding with datastore transactions. | 8 // than, for example, colliding with datastore transactions. |
9 package memlock | 9 package memlock |
10 | 10 |
11 import ( | 11 import ( |
12 "bytes" | 12 "bytes" |
13 "errors" | 13 "errors" |
14 » "golang.org/x/net/context" | 14 » "infra/gae/libs/wrapper" |
| 15 » "infra/libs/clock" |
15 "sync/atomic" | 16 "sync/atomic" |
16 "time" | 17 "time" |
17 | 18 |
18 » "infra/gae/libs/gae" | 19 » "github.com/luci/luci-go/common/logging" |
| 20 » "golang.org/x/net/context" |
19 | 21 |
20 » "github.com/luci/luci-go/common/clock" | 22 » "appengine/memcache" |
21 » "github.com/luci/luci-go/common/logging" | |
22 ) | 23 ) |
23 | 24 |
24 // ErrFailedToLock is returned from TryWithLock when it fails to obtain a lock | 25 // ErrFailedToLock is returned from TryWithLock when it fails to obtain a lock |
25 // prior to invoking the user-supplied function. | 26 // prior to invoking the user-supplied function. |
26 var ErrFailedToLock = errors.New("memlock: failed to obtain lock") | 27 var ErrFailedToLock = errors.New("memlock: failed to obtain lock") |
27 | 28 |
28 // ErrEmptyClientID is returned from TryWithLock when you specify an empty | 29 // ErrEmptyClientID is returned from TryWithLock when you specify an empty |
29 // clientID. | 30 // clientID. |
30 var ErrEmptyClientID = errors.New("memlock: empty clientID") | 31 var ErrEmptyClientID = errors.New("memlock: empty clientID") |
31 | 32 |
(...skipping 27 matching lines...) Loading... |
59 // this client (lock-holder). If it's empty then TryWithLock() will return | 60 // this client (lock-holder). If it's empty then TryWithLock() will return |
60 // ErrEmptyClientID. | 61 // ErrEmptyClientID. |
61 func TryWithLock(ctx context.Context, key, clientID string, f func(check func()
bool) error) error { | 62 func TryWithLock(ctx context.Context, key, clientID string, f func(check func()
bool) error) error { |
62 if len(clientID) == 0 { | 63 if len(clientID) == 0 { |
63 return ErrEmptyClientID | 64 return ErrEmptyClientID |
64 } | 65 } |
65 | 66 |
66 ctx = logging.SetField(ctx, "key", key) | 67 ctx = logging.SetField(ctx, "key", key) |
67 ctx = logging.SetField(ctx, "clientID", clientID) | 68 ctx = logging.SetField(ctx, "clientID", clientID) |
68 log := logging.Get(ctx) | 69 log := logging.Get(ctx) |
69 » mc := gae.GetMC(ctx) | 70 » mc := wrapper.GetMC(ctx) |
70 | 71 |
71 key = memlockKeyPrefix + key | 72 key = memlockKeyPrefix + key |
72 cid := []byte(clientID) | 73 cid := []byte(clientID) |
73 | 74 |
74 // checkAnd gets the current value from memcache, and then attempts to d
o the | 75 // checkAnd gets the current value from memcache, and then attempts to d
o the |
75 // checkOp (which can either be `refresh` or `release`). These pieces of | 76 // checkOp (which can either be `refresh` or `release`). These pieces of |
76 // functionality are necessarially intertwined, because CAS only works w
ith | 77 // functionality are necessarially intertwined, because CAS only works w
ith |
77 // the exact-same *Item which was returned from a Get. | 78 // the exact-same *Item which was returned from a Get. |
78 // | 79 // |
79 // refresh will attempt to CAS the item with the same content to reset i
t's | 80 // refresh will attempt to CAS the item with the same content to reset i
t's |
80 // timeout. | 81 // timeout. |
81 // | 82 // |
82 // release will attempt to CAS the item to remove it's contents (clientI
D). | 83 // release will attempt to CAS the item to remove it's contents (clientI
D). |
83 // another lock observing an empty clientID will know that the lock is | 84 // another lock observing an empty clientID will know that the lock is |
84 // obtainable. | 85 // obtainable. |
85 checkAnd := func(op checkOp) bool { | 86 checkAnd := func(op checkOp) bool { |
86 itm, err := mc.Get(key) | 87 itm, err := mc.Get(key) |
87 if err != nil { | 88 if err != nil { |
88 log.Warningf("error getting: %s", err) | 89 log.Warningf("error getting: %s", err) |
89 return false | 90 return false |
90 } | 91 } |
91 | 92 |
92 » » if len(itm.Value()) > 0 && !bytes.Equal(itm.Value(), cid) { | 93 » » if len(itm.Value) > 0 && !bytes.Equal(itm.Value, cid) { |
93 » » » log.Infof("lock owned by %q", string(itm.Value())) | 94 » » » log.Infof("lock owned by %q", string(itm.Value)) |
94 return false | 95 return false |
95 } | 96 } |
96 | 97 |
97 if op == refresh { | 98 if op == refresh { |
98 » » » itm.SetValue(cid).SetExpiration(memcacheLockTime) | 99 » » » itm.Value = cid |
| 100 » » » itm.Expiration = memcacheLockTime |
99 } else { | 101 } else { |
100 » » » if len(itm.Value()) == 0 { | 102 » » » if len(itm.Value) == 0 { |
101 // it's already unlocked, no need to CAS | 103 // it's already unlocked, no need to CAS |
102 log.Infof("lock already released") | 104 log.Infof("lock already released") |
103 return true | 105 return true |
104 } | 106 } |
105 » » » itm.SetValue([]byte{}).SetExpiration(delay) | 107 » » » itm.Value = []byte{} |
| 108 » » » itm.Expiration = delay |
106 } | 109 } |
107 | 110 |
108 err = mc.CompareAndSwap(itm) | 111 err = mc.CompareAndSwap(itm) |
109 if err != nil { | 112 if err != nil { |
110 log.Warningf("failed to %s lock: %q", op, err) | 113 log.Warningf("failed to %s lock: %q", op, err) |
111 return false | 114 return false |
112 } | 115 } |
113 | 116 |
114 return true | 117 return true |
115 } | 118 } |
116 | 119 |
117 // Now the actual logic begins. First we 'Add' the item, which will set
it if | 120 // Now the actual logic begins. First we 'Add' the item, which will set
it if |
118 // it's not present in the memcache, otherwise leaves it alone. | 121 // it's not present in the memcache, otherwise leaves it alone. |
119 | 122 » err := mc.Add(&memcache.Item{ |
120 » err := mc.Add(mc.NewItem(key).SetValue(cid).SetExpiration(memcacheLockTi
me)) | 123 » » Key: key, Value: cid, Expiration: memcacheLockTime}) |
121 if err != nil { | 124 if err != nil { |
122 » » if err != gae.ErrMCNotStored { | 125 » » if err != memcache.ErrNotStored { |
123 log.Warningf("error adding: %s", err) | 126 log.Warningf("error adding: %s", err) |
124 } | 127 } |
125 if !checkAnd(refresh) { | 128 if !checkAnd(refresh) { |
126 return ErrFailedToLock | 129 return ErrFailedToLock |
127 } | 130 } |
128 } | 131 } |
129 | 132 |
130 // At this point we nominally have the lock (at least for memcacheLockTi
me). | 133 // At this point we nominally have the lock (at least for memcacheLockTi
me). |
131 | 134 |
132 stopChan := make(chan struct{}) | 135 stopChan := make(chan struct{}) |
(...skipping 24 matching lines...) Loading... |
157 return | 160 return |
158 } | 161 } |
159 } | 162 } |
160 | 163 |
161 checkAnd(release) | 164 checkAnd(release) |
162 atomic.StoreUint32(&held, 0) | 165 atomic.StoreUint32(&held, 0) |
163 }() | 166 }() |
164 | 167 |
165 return f(func() bool { return atomic.LoadUint32(&held) == 1 }) | 168 return f(func() bool { return atomic.LoadUint32(&held) == 1 }) |
166 } | 169 } |
OLD | NEW |