OLD | NEW |
---|---|
(Empty) | |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 package memlock | |
6 | |
7 import ( | |
8 "bytes" | |
9 "crypto/rand" | |
10 "sync/atomic" | |
11 "time" | |
12 | |
13 "appengine" | |
14 "appengine/memcache" | |
15 ) | |
16 | |
17 type checkOp string | |
18 | |
19 const ( | |
20 release checkOp = "release" | |
21 refresh = "refresh" | |
22 ) | |
23 | |
24 // MemcacheLockTime is the expiration time of the memcache entry. If the lock | |
25 // is correctly released, then it will be released before this time. | |
26 const MemcacheLockTime = 16 * time.Second | |
27 | |
28 // MemcacheLock allows multiple appengine handlers to coordinate best-effort | |
29 // mutual execution via memcache. "best-effort" here means "best-effort"... | |
30 // memcache is not reliable. However, colliding on memcache is a lot cheaper | |
31 // than, for example, colliding with datastore transactions. | |
32 // | |
33 // Construct a new MemcacheLock with the New() method. | |
34 type MemcacheLock struct { | |
35 ctx appengine.Context | |
36 key string | |
37 clientID []byte | |
38 } | |
39 | |
40 // New creates a MemcacheLock. `key` is the memcache key to use. Clients locking | |
41 // the same data must use the same key. clientID is the unique identifier for | |
42 // this client (lock-holder). If it's empty then New() will generate a random | |
43 // one. | |
44 // | |
45 // Using a deterministid clientID (like the taskqueue task name, for example) ha s | |
46 // the benefit that on an error the re-tried taskqueue handler may be able to | |
47 // re-obtain the lock, assuming it wasn't released properly. | |
48 func New(c appengine.Context, key, clientID string) (*MemcacheLock, error) { | |
49 clientIDbytes := []byte(clientID) | |
50 if len(clientIDbytes) == 0 { | |
51 clientIDbytes = make([]byte, 32) | |
52 _, err := rand.Read(clientIDbytes) | |
53 if err != nil { | |
54 return nil, err | |
55 } | |
56 c.Debugf("memlock: generated clientId: %v", clientIDbytes) | |
57 } | |
58 return &MemcacheLock{ | |
59 ctx: c, | |
60 key: "memlock:" + key, | |
61 clientID: clientIDbytes, | |
62 }, nil | |
63 } | |
64 | |
65 // TryWithLock attempts to obtains the lock once, and then invokes f if | |
66 // sucessful. The `check` function can be used within f to see if the lock is | |
67 // still held. This function returns true iff the lock was acquired and f was | |
68 // invoked. | |
69 func (m *MemcacheLock) TryWithLock(f func(check func() bool)) bool { | |
70 err := memcache.Add(m.ctx, &memcache.Item{ | |
71 Key: m.key, Expiration: MemcacheLockTime, Value: m.clientID}) | |
72 if err != nil { | |
73 if err != memcache.ErrNotStored { | |
74 m.warningf("error adding: %s", err) | |
75 } | |
76 if !m.checkAnd(refresh) { | |
Vadim Sh.
2015/03/07 02:55:50
actually, it makes the lock reentrant (same gorout
iannucci
2015/03/29 21:22:36
oh, yeah, good point... I'll make it prevent this.
| |
77 return false | |
78 } | |
79 } | |
80 | |
81 stopChan := make(chan struct{}, 1) | |
Vadim Sh.
2015/03/07 02:55:50
these "-chan" make the impression I'm watching som
iannucci
2015/03/29 21:22:36
Chan-chan!
| |
82 stoppedChan := make(chan struct{}, 1) | |
83 var held uint32 = 1 | |
84 | |
85 check := func() bool { | |
86 return atomic.LoadUint32(&held) == 1 | |
87 } | |
88 defer func() { | |
89 stopChan <- struct{}{} | |
Vadim Sh.
2015/03/07 02:55:50
I think "idiomatic" (I start to not like this word
iannucci
2015/03/29 21:22:36
Hm, ok I guess that'll work.
| |
90 <-stoppedChan | |
91 }() | |
92 | |
93 go func() { | |
94 defer func() { stoppedChan <- struct{}{} }() | |
95 checkLoop: | |
96 for { | |
97 select { | |
98 case <-stopChan: | |
99 break checkLoop | |
Vadim Sh.
2015/03/07 02:55:50
do you really need a label here?
iannucci
2015/03/29 21:22:35
yeah. a plain break stops the select statement (wh
| |
100 case <-time.After(time.Second): | |
101 } | |
102 if !m.checkAnd(refresh) { | |
103 atomic.StoreUint32(&held, 0) | |
104 m.warningf("lost lock: %s", err) | |
105 break | |
106 } | |
107 } | |
108 | |
109 m.checkAnd(release) | |
110 atomic.StoreUint32(&held, 0) | |
111 }() | |
112 | |
113 f(check) | |
114 return true | |
115 } | |
116 | |
117 func (m *MemcacheLock) infof(fmt string, args ...interface{}) { | |
118 args = append([]interface{}{m.key, string(m.clientID)}, args...) | |
119 m.ctx.Infof("memlock(%s:%q): "+fmt, args) | |
120 } | |
121 | |
122 func (m *MemcacheLock) warningf(fmt string, args ...interface{}) { | |
123 args = append([]interface{}{m.key, string(m.clientID)}, args...) | |
124 m.ctx.Warningf("memlock(%s:%q): "+fmt, args) | |
125 } | |
126 | |
127 func (m *MemcacheLock) checkAnd(op checkOp) bool { | |
128 itm, err := memcache.Get(m.ctx, m.key) | |
129 if err != nil { | |
130 m.warningf("error getting: %s", err) | |
131 return false | |
132 } | |
133 | |
134 if len(itm.Value) != 0 && !bytes.Equal(itm.Value, m.clientID) { | |
135 m.infof("lock owned by %q", string(itm.Value)) | |
136 return false | |
137 } | |
138 | |
139 if op == refresh { | |
140 itm.Value = m.clientID | |
141 itm.Expiration = MemcacheLockTime | |
142 } else { | |
143 if len(itm.Value) == 0 { | |
144 // it's already unlocked, no need to CAS | |
145 m.infof("lock already released") | |
146 return true | |
147 } | |
148 itm.Value = []byte{} | |
149 itm.Expiration = time.Second | |
150 } | |
151 | |
152 err = memcache.CompareAndSwap(m.ctx, itm) | |
153 if err != nil { | |
154 m.warningf("failed to %s lock: %s", op, err) | |
155 return false | |
156 } | |
157 | |
158 m.infof("%sed lock", op) | |
159 return true | |
160 } | |
OLD | NEW |