OLD | NEW |
---|---|
1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 // Package memlock allows multiple appengine handlers to coordinate best-effort | 5 // Package memlock allows multiple appengine handlers to coordinate best-effort |
6 // mutual execution via memcache. "best-effort" here means "best-effort"... | 6 // mutual execution via memcache. "best-effort" here means "best-effort"... |
7 // memcache is not reliable. However, colliding on memcache is a lot cheaper | 7 // memcache is not reliable. However, colliding on memcache is a lot cheaper |
8 // than, for example, colliding with datastore transactions. | 8 // than, for example, colliding with datastore transactions. |
9 package memlock | 9 package memlock |
10 | 10 |
11 import ( | 11 import ( |
12 "bytes" | 12 "bytes" |
13 "errors" | 13 "errors" |
14 "golang.org/x/net/context" | 14 "golang.org/x/net/context" |
15 "sync/atomic" | |
16 "time" | 15 "time" |
17 | 16 |
18 "infra/gae/libs/gae" | 17 "infra/gae/libs/gae" |
19 | 18 |
20 "github.com/luci/luci-go/common/clock" | 19 "github.com/luci/luci-go/common/clock" |
21 "github.com/luci/luci-go/common/logging" | 20 "github.com/luci/luci-go/common/logging" |
22 ) | 21 ) |
23 | 22 |
24 // ErrFailedToLock is returned from TryWithLock when it fails to obtain a lock | 23 // ErrFailedToLock is returned from TryWithLock when it fails to obtain a lock |
25 // prior to invoking the user-supplied function. | 24 // prior to invoking the user-supplied function. |
(...skipping 24 matching lines...) Expand all Loading... | |
50 // TryWithLock attempts to obtains the lock once, and then invokes f if | 49 // TryWithLock attempts to obtains the lock once, and then invokes f if |
51 // sucessful. The `check` function can be used within f to see if the lock is | 50 // sucessful. The `check` function can be used within f to see if the lock is |
52 // still held. | 51 // still held. |
53 // | 52 // |
54 // TryWithLock function returns ErrFailedToLock if it fails to obtain the lock, | 53 // TryWithLock function returns ErrFailedToLock if it fails to obtain the lock, |
55 // otherwise returns the error that f returns. | 54 // otherwise returns the error that f returns. |
56 // | 55 // |
57 // `key` is the memcache key to use (i.e. the name of the lock). Clients locking | 56 // `key` is the memcache key to use (i.e. the name of the lock). Clients locking |
58 // the same data must use the same key. clientID is the unique identifier for | 57 // the same data must use the same key. clientID is the unique identifier for |
59 // this client (lock-holder). If it's empty then TryWithLock() will return | 58 // this client (lock-holder). If it's empty then TryWithLock() will return |
60 // ErrEmptyClientID. | 59 // ErrEmptyClientID. |
dnj
2015/07/17 18:01:35
Document that the context that "f" is called with
iannucci
2015/07/17 18:08:25
I think you're misunderstanding how the cancelatio
dnj
2015/07/17 18:10:29
Ah bad comment on my part, I was mostly suggesting
| |
61 func TryWithLock(ctx context.Context, key, clientID string, f func(check func() bool) error) error { | 60 func TryWithLock(ctx context.Context, key, clientID string, f func(context.Conte xt) error) error { |
62 if len(clientID) == 0 { | 61 if len(clientID) == 0 { |
63 return ErrEmptyClientID | 62 return ErrEmptyClientID |
64 } | 63 } |
65 | 64 |
66 ctx = logging.SetField(ctx, "key", key) | 65 ctx = logging.SetField(ctx, "key", key) |
67 ctx = logging.SetField(ctx, "clientID", clientID) | 66 ctx = logging.SetField(ctx, "clientID", clientID) |
68 log := logging.Get(ctx) | 67 log := logging.Get(ctx) |
69 mc := gae.GetMC(ctx) | 68 mc := gae.GetMC(ctx) |
70 | 69 |
71 key = memlockKeyPrefix + key | 70 key = memlockKeyPrefix + key |
(...skipping 49 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
121 if err != nil { | 120 if err != nil { |
122 if err != gae.ErrMCNotStored { | 121 if err != gae.ErrMCNotStored { |
123 log.Warningf("error adding: %s", err) | 122 log.Warningf("error adding: %s", err) |
124 } | 123 } |
125 if !checkAnd(refresh) { | 124 if !checkAnd(refresh) { |
126 return ErrFailedToLock | 125 return ErrFailedToLock |
127 } | 126 } |
128 } | 127 } |
129 | 128 |
130 // At this point we nominally have the lock (at least for memcacheLockTi me). | 129 // At this point we nominally have the lock (at least for memcacheLockTi me). |
131 | 130 » finished := make(chan struct{}) |
132 » stopChan := make(chan struct{}) | 131 » subCtx, cancelFunc := context.WithCancel(ctx) |
dnj
2015/07/17 18:01:35
Main concern using contexts here is that if the pa
iannucci
2015/07/17 18:08:25
Yes
| |
133 » stoppedChan := make(chan struct{}) | |
134 » held := uint32(1) | |
135 | |
136 defer func() { | 132 defer func() { |
137 » » close(stopChan) | 133 » » cancelFunc() |
138 » » <-stoppedChan // this blocks TryWithLock until the goroutine bel ow quits. | 134 » » <-finished |
139 }() | 135 }() |
140 | 136 |
141 // This goroutine checks to see if we still posess the lock, and refresh es it | 137 // This goroutine checks to see if we still posess the lock, and refresh es it |
142 » // if we do. It will stop doing this when either stopChan is activated ( e.g. | 138 » // if we do. |
143 » // the user's function returns) or we lose the lock (memcache flake, etc .). | |
144 go func() { | 139 go func() { |
145 » » defer close(stoppedChan) | 140 » » defer func() { |
141 » » » cancelFunc() | |
142 » » » close(finished) | |
143 » » }() | |
146 | 144 |
147 checkLoop: | 145 checkLoop: |
148 for { | 146 for { |
149 select { | 147 select { |
150 » » » case <-stopChan: | 148 » » » case <-subCtx.Done(): |
151 break checkLoop | 149 break checkLoop |
152 case <-clock.Get(ctx).After(delay): | 150 case <-clock.Get(ctx).After(delay): |
153 } | 151 } |
154 if !checkAnd(refresh) { | 152 if !checkAnd(refresh) { |
155 atomic.StoreUint32(&held, 0) | |
156 log.Warningf("lost lock: %s", err) | 153 log.Warningf("lost lock: %s", err) |
157 return | 154 return |
158 } | 155 } |
159 } | 156 } |
160 | 157 |
161 checkAnd(release) | 158 checkAnd(release) |
162 atomic.StoreUint32(&held, 0) | |
163 }() | 159 }() |
164 | 160 |
165 » return f(func() bool { return atomic.LoadUint32(&held) == 1 }) | 161 » return f(subCtx) |
166 } | 162 } |
OLD | NEW |