OLD | NEW |
---|---|
(Empty) | |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 // +build appengine | |
6 | |
7 package memlock | |
8 | |
9 import ( | |
10 "bytes" | |
11 "crypto/rand" | |
12 "sync/atomic" | |
13 "time" | |
14 | |
15 "appengine/memcache" | |
16 | |
17 "infra/gae/libs/context" | |
18 "infra/libs/logging" | |
19 ) | |
20 | |
21 type checkOp string | |
22 | |
23 var delay = time.Second | |
M-A Ruel
2015/03/30 12:52:31
You can use a const.
| |
24 | |
25 const ( | |
26 release checkOp = "release" | |
M-A Ruel
2015/03/30 12:52:31
releaseOp
refreshOp
| |
27 refresh = "refresh" | |
28 ) | |
29 | |
30 // memcacheLockTime is the expiration time of the memcache entry. If the lock | |
31 // is correctly released, then it will be released before this time. | |
32 var memcacheLockTime = 16 * time.Second | |
M-A Ruel
2015/03/30 12:52:31
You can use a const.
16s is hell of a long time
| |
33 | |
34 type Context interface { | |
M-A Ruel
2015/03/30 12:52:31
If it's exported, document it.
| |
35 logging.Logger | |
36 context.MCSingleReadWriter | |
37 } | |
38 | |
39 // MemcacheLock allows multiple appengine handlers to coordinate best-effort | |
40 // mutual execution via memcache. "best-effort" here means "best-effort"... | |
41 // memcache is not reliable. However, colliding on memcache is a lot cheaper | |
42 // than, for example, colliding with datastore transactions. | |
43 // | |
44 // Construct a new MemcacheLock with the New() method. | |
45 type MemcacheLock struct { | |
46 ctx Context | |
47 key string | |
48 clientID []byte | |
49 held bool | |
50 } | |
51 | |
52 // New creates a MemcacheLock. `key` is the memcache key to use. Clients locking | |
53 // the same data must use the same key. clientID is the unique identifier for | |
54 // this client (lock-holder). If it's empty then New() will generate a random | |
M-A Ruel
2015/03/30 12:52:31
Why generate a random one? What's the use case?
| |
55 // one. | |
56 // | |
57 // Using a deterministid clientID (like the taskqueue task name, for example) ha s | |
58 // the benefit that on an error the re-tried taskqueue handler may be able to | |
59 // re-obtain the lock, assuming it wasn't released properly. | |
60 func New(c Context, key, clientID string) (*MemcacheLock, error) { | |
61 clientIDbytes := []byte(clientID) | |
62 if len(clientIDbytes) == 0 { | |
63 clientIDbytes = make([]byte, 32) | |
64 _, err := rand.Read(clientIDbytes) | |
65 if err != nil { | |
66 return nil, err | |
M-A Ruel
2015/03/30 12:52:30
In this case specifically (using crypto/rand) I th
| |
67 } | |
68 c.Debugf("memlock: generated clientId: %v", clientIDbytes) | |
69 } | |
70 return &MemcacheLock{ | |
71 ctx: c, | |
72 key: "memlock:" + key, | |
73 clientID: clientIDbytes, | |
74 }, nil | |
75 } | |
76 | |
77 // TryWithLock attempts to obtains the lock once, and then invokes f if | |
78 // sucessful. The `check` function can be used within f to see if the lock is | |
79 // still held. This function returns true iff the lock was acquired and f was | |
80 // invoked. | |
81 func (m *MemcacheLock) TryWithLock(f func(check func() bool)) bool { | |
M-A Ruel
2015/03/30 12:52:31
The main issue with that is that you force the cal
| |
82 if m.held { | |
83 m.logf(m.ctx.Errorf, "re-entering memlock!") | |
84 panic("re-entering memlock!") | |
85 } | |
86 m.held = true | |
M-A Ruel
2015/03/30 12:52:30
To double check; MemcacheLock is not thread safe.
| |
87 defer func() { m.held = false }() | |
88 | |
89 err := m.ctx.Add(&memcache.Item{ | |
90 Key: m.key, Expiration: memcacheLockTime, Value: m.clientID}) | |
91 if err != nil { | |
92 if err != memcache.ErrNotStored { | |
93 m.logf(m.ctx.Warningf, "error adding: %s", err) | |
94 } | |
95 if !m.checkAnd(refresh) { | |
96 return false | |
97 } | |
98 } | |
99 | |
100 stopChan := make(chan struct{}) | |
101 stoppedChan := make(chan struct{}) | |
102 var held uint32 = 1 | |
103 | |
104 check := func() bool { | |
105 return atomic.LoadUint32(&held) == 1 | |
106 } | |
107 defer func() { | |
108 close(stopChan) | |
109 <-stoppedChan | |
110 }() | |
111 | |
112 go func() { | |
113 defer close(stoppedChan) | |
114 | |
115 checkLoop: | |
116 for { | |
117 select { | |
118 case <-stopChan: | |
119 break checkLoop | |
M-A Ruel
2015/03/30 12:52:31
use return instead, this removes need for goto
| |
120 case <-time.After(delay): | |
121 } | |
122 if !m.checkAnd(refresh) { | |
123 atomic.StoreUint32(&held, 0) | |
124 m.logf(m.ctx.Warningf, "lost lock: %s", err) | |
125 break | |
M-A Ruel
2015/03/30 12:52:31
remove
| |
126 } | |
127 } | |
128 | |
129 m.checkAnd(release) | |
M-A Ruel
2015/03/30 12:52:30
why not in defer?
| |
130 atomic.StoreUint32(&held, 0) | |
131 }() | |
132 | |
133 f(check) | |
134 return true | |
135 } | |
136 | |
137 func (m *MemcacheLock) logf(f func(string, ...interface{}), fmt string, args ... interface{}) { | |
M-A Ruel
2015/03/30 12:52:30
I personally do not see much value in this, as it
| |
138 args = append([]interface{}{m.key, string(m.clientID)}, args...) | |
139 f("memlock(%s:%q): "+fmt, args...) | |
140 } | |
141 | |
142 func (m *MemcacheLock) checkAnd(op checkOp) bool { | |
M-A Ruel
2015/03/30 12:52:31
It's more like compareAndExchange(), which would b
| |
143 itm, err := m.ctx.Get(m.key) | |
144 if err != nil { | |
145 m.logf(m.ctx.Warningf, "error getting: %s", err) | |
146 return false | |
147 } | |
148 | |
149 if len(itm.Value) != 0 && !bytes.Equal(itm.Value, m.clientID) { | |
150 m.logf(m.ctx.Infof, "lock owned by %q", string(itm.Value)) | |
M-A Ruel
2015/03/30 12:52:31
This will become very verbose eventually.
| |
151 return false | |
152 } | |
153 | |
154 if op == refresh { | |
155 itm.Value = m.clientID | |
156 itm.Expiration = memcacheLockTime | |
157 } else { | |
158 if len(itm.Value) == 0 { | |
159 // it's already unlocked, no need to CAS | |
160 m.logf(m.ctx.Infof, "lock already released") | |
161 return true | |
162 } | |
163 itm.Value = []byte{} | |
164 itm.Expiration = delay | |
165 } | |
166 | |
167 err = m.ctx.CompareAndSwap(itm) | |
M-A Ruel
2015/03/30 12:52:31
I don't like that it's doing 2 memcache operations
| |
168 if err != nil { | |
169 m.logf(m.ctx.Warningf, "failed to %s lock: %q", op, err) | |
170 return false | |
171 } | |
172 | |
173 m.logf(m.ctx.Infof, "%sed lock", op) | |
M-A Ruel
2015/03/30 12:52:31
Remove.
| |
174 return true | |
175 } | |
OLD | NEW |