Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(395)

Side by Side Diff: go/src/infra/gae/libs/memlock/memlock.go

Issue 986553002: A simple memcache lock for appengine. (Closed) Base URL: https://chromium.googlesource.com/infra/infra.git@meta
Patch Set: and fix a boog Created 5 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « no previous file | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package memlock
6
7 import (
8 "bytes"
9 "crypto/rand"
10 "sync/atomic"
11 "time"
12
13 "appengine"
14 "appengine/memcache"
15 )
16
17 type checkOp string
18
19 const (
20 release checkOp = "release"
21 refresh = "refresh"
22 )
23
24 // MemcacheLockTime is the expiration time of the memcache entry. If the lock
25 // is correctly released, then it will be released before this time.
26 const MemcacheLockTime = 16 * time.Second
27
28 // MemcacheLock allows multiple appengine handlers to coordinate best-effort
29 // mutual execution via memcache. "best-effort" here means "best-effort"...
30 // memcache is not reliable. However, colliding on memcache is a lot cheaper
31 // than, for example, colliding with datastore transactions.
32 //
33 // Construct a new MemcacheLock with the New() method.
34 type MemcacheLock struct {
35 ctx appengine.Context
36 key string
37 clientID []byte
38 }
39
40 // New creates a MemcacheLock. `key` is the memcache key to use. Clients locking
41 // the same data must use the same key. clientID is the unique identifier for
42 // this client (lock-holder). If it's empty then New() will generate a random
43 // one.
44 //
45 // Using a deterministid clientID (like the taskqueue task name, for example) ha s
46 // the benefit that on an error the re-tried taskqueue handler may be able to
47 // re-obtain the lock, assuming it wasn't released properly.
48 func New(c appengine.Context, key, clientID string) (*MemcacheLock, error) {
49 clientIDbytes := []byte(clientID)
50 if len(clientIDbytes) == 0 {
51 clientIDbytes = make([]byte, 32)
52 _, err := rand.Read(clientIDbytes)
53 if err != nil {
54 return nil, err
55 }
56 c.Debugf("memlock: generated clientId: %v", clientIDbytes)
57 }
58 return &MemcacheLock{
59 ctx: c,
60 key: "memlock:" + key,
61 clientID: clientIDbytes,
62 }, nil
63 }
64
65 // TryWithLock attempts to obtains the lock once, and then invokes f if
66 // sucessful. The `check` function can be used within f to see if the lock is
67 // still held. This function returns true iff the lock was acquired and f was
68 // invoked.
69 func (m *MemcacheLock) TryWithLock(f func(check func() bool)) bool {
70 err := memcache.Add(m.ctx, &memcache.Item{
71 Key: m.key, Expiration: MemcacheLockTime, Value: m.clientID})
72 if err != nil {
73 if err != memcache.ErrNotStored {
74 m.warningf("error adding: %s", err)
75 }
76 if !m.checkAnd(refresh) {
Vadim Sh. 2015/03/07 02:55:50 actually, it makes the lock reentrant (same gorout
iannucci 2015/03/29 21:22:36 oh, yeah, good point... I'll make it prevent this.
77 return false
78 }
79 }
80
81 stopChan := make(chan struct{}, 1)
Vadim Sh. 2015/03/07 02:55:50 these "-chan" make the impression I'm watching som
iannucci 2015/03/29 21:22:36 Chan-chan!
82 stoppedChan := make(chan struct{}, 1)
83 var held uint32 = 1
84
85 check := func() bool {
86 return atomic.LoadUint32(&held) == 1
87 }
88 defer func() {
89 stopChan <- struct{}{}
Vadim Sh. 2015/03/07 02:55:50 I think "idiomatic" (I start to not like this word
iannucci 2015/03/29 21:22:36 Hm, ok I guess that'll work.
90 <-stoppedChan
91 }()
92
93 go func() {
94 defer func() { stoppedChan <- struct{}{} }()
95 checkLoop:
96 for {
97 select {
98 case <-stopChan:
99 break checkLoop
Vadim Sh. 2015/03/07 02:55:50 do you really need a label here?
iannucci 2015/03/29 21:22:35 yeah. a plain break stops the select statement (wh
100 case <-time.After(time.Second):
101 }
102 if !m.checkAnd(refresh) {
103 atomic.StoreUint32(&held, 0)
104 m.warningf("lost lock: %s", err)
105 break
106 }
107 }
108
109 m.checkAnd(release)
110 atomic.StoreUint32(&held, 0)
111 }()
112
113 f(check)
114 return true
115 }
116
117 func (m *MemcacheLock) infof(fmt string, args ...interface{}) {
118 args = append([]interface{}{m.key, string(m.clientID)}, args...)
119 m.ctx.Infof("memlock(%s:%q): "+fmt, args)
120 }
121
122 func (m *MemcacheLock) warningf(fmt string, args ...interface{}) {
123 args = append([]interface{}{m.key, string(m.clientID)}, args...)
124 m.ctx.Warningf("memlock(%s:%q): "+fmt, args)
125 }
126
127 func (m *MemcacheLock) checkAnd(op checkOp) bool {
128 itm, err := memcache.Get(m.ctx, m.key)
129 if err != nil {
130 m.warningf("error getting: %s", err)
131 return false
132 }
133
134 if len(itm.Value) != 0 && !bytes.Equal(itm.Value, m.clientID) {
135 m.infof("lock owned by %q", string(itm.Value))
136 return false
137 }
138
139 if op == refresh {
140 itm.Value = m.clientID
141 itm.Expiration = MemcacheLockTime
142 } else {
143 if len(itm.Value) == 0 {
144 // it's already unlocked, no need to CAS
145 m.infof("lock already released")
146 return true
147 }
148 itm.Value = []byte{}
149 itm.Expiration = time.Second
150 }
151
152 err = memcache.CompareAndSwap(m.ctx, itm)
153 if err != nil {
154 m.warningf("failed to %s lock: %s", op, err)
155 return false
156 }
157
158 m.infof("%sed lock", op)
159 return true
160 }
OLDNEW
« no previous file with comments | « no previous file | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698