Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(22)

Side by Side Diff: appengine/memlock/memlock.go

Issue 1399533003: Move appengine/memlock from infra.git (Closed) Base URL: https://chromium.googlesource.com/external/github.com/luci/luci-go@add_meta
Patch Set: Created 5 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « no previous file | appengine/memlock/memlock_test.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 // Package memlock allows multiple appengine handlers to coordinate best-effort
6 // mutual execution via memcache. "best-effort" here means "best-effort"...
7 // memcache is not reliable. However, colliding on memcache is a lot cheaper
8 // than, for example, colliding with datastore transactions.
9 package memlock
10
11 import (
12 "bytes"
13 "errors"
14 "time"
15
16 "github.com/luci/gae/service/memcache"
17 "github.com/luci/luci-go/common/clock"
18 "github.com/luci/luci-go/common/logging"
19 "golang.org/x/net/context"
20 )
21
22 // ErrFailedToLock is returned from TryWithLock when it fails to obtain a lock
23 // prior to invoking the user-supplied function.
24 var ErrFailedToLock = errors.New("memlock: failed to obtain lock")
25
26 // ErrEmptyClientID is returned from TryWithLock when you specify an empty
27 // clientID.
28 var ErrEmptyClientID = errors.New("memlock: empty clientID")
29
30 // memlockKeyPrefix is the memcache Key prefix for all user-supplied keys.
31 const memlockKeyPrefix = "memlock:"
32
33 type checkOp string
34
35 // var so we can override it in the tests
36 var delay = time.Second
37
38 type testStopCBKeyType int
39
40 var testStopCBKey testStopCBKeyType
41
42 const (
43 release checkOp = "release"
44 refresh = "refresh"
45 )
46
47 // memcacheLockTime is the expiration time of the memcache entry. If the lock
48 // is correctly released, then it will be released before this time. It's a
49 // var so we can override it in the tests.
50 var memcacheLockTime = 16 * time.Second
51
52 // TryWithLock attempts to obtains the lock once, and then invokes f if
53 // sucessful. The context provided to f will be canceled (e.g. ctx.Done() will
54 // be closed) if memlock detects that we've lost the lock.
55 //
56 // TryWithLock function returns ErrFailedToLock if it fails to obtain the lock,
57 // otherwise returns the error that f returns.
58 //
59 // `key` is the memcache key to use (i.e. the name of the lock). Clients locking
60 // the same data must use the same key. clientID is the unique identifier for
61 // this client (lock-holder). If it's empty then TryWithLock() will return
62 // ErrEmptyClientID.
63 //
64 // Note that the lock provided by TryWithLock is a best-effort lock... some
65 // other form of locking or synchronization should be used inside of f (such as
66 // Datastore transactions) to ensure that f is, in fact, operating exclusively.
67 // The purpose of TryWithLock is to have a cheap filter to prevent unnecessary
68 // contention on heavier synchronization primitives like transactions.
69 func TryWithLock(ctx context.Context, key, clientID string, f func(context.Conte xt) error) error {
70 if len(clientID) == 0 {
71 return ErrEmptyClientID
72 }
73
74 ctx = logging.SetField(ctx, "key", key)
75 ctx = logging.SetField(ctx, "clientID", clientID)
76 log := logging.Get(ctx)
77 mc := memcache.Get(ctx)
78
79 key = memlockKeyPrefix + key
80 cid := []byte(clientID)
81
82 // checkAnd gets the current value from memcache, and then attempts to d o the
83 // checkOp (which can either be `refresh` or `release`). These pieces of
84 // functionality are necessarially intertwined, because CAS only works w ith
85 // the exact-same *Item which was returned from a Get.
86 //
87 // refresh will attempt to CAS the item with the same content to reset i t's
88 // timeout.
89 //
90 // release will attempt to CAS the item to remove it's contents (clientI D).
91 // another lock observing an empty clientID will know that the lock is
92 // obtainable.
93 checkAnd := func(op checkOp) bool {
94 itm, err := mc.Get(key)
95 if err != nil {
96 log.Warningf("error getting: %s", err)
97 return false
98 }
99
100 if len(itm.Value()) > 0 && !bytes.Equal(itm.Value(), cid) {
101 log.Infof("lock owned by %q", string(itm.Value()))
102 return false
103 }
104
105 if op == refresh {
106 itm.SetValue(cid).SetExpiration(memcacheLockTime)
107 } else {
108 if len(itm.Value()) == 0 {
109 // it's already unlocked, no need to CAS
110 log.Infof("lock already released")
111 return true
112 }
113 itm.SetValue([]byte{}).SetExpiration(delay)
114 }
115
116 if err := mc.CompareAndSwap(itm); err != nil {
117 log.Warningf("failed to %s lock: %q", op, err)
118 return false
119 }
120
121 return true
122 }
123
124 // Now the actual logic begins. First we 'Add' the item, which will set it if
125 // it's not present in the memcache, otherwise leaves it alone.
126
127 err := mc.Add(mc.NewItem(key).SetValue(cid).SetExpiration(memcacheLockTi me))
128 if err != nil {
129 if err != memcache.ErrNotStored {
130 log.Warningf("error adding: %s", err)
131 }
132 if !checkAnd(refresh) {
133 return ErrFailedToLock
134 }
135 }
136
137 // At this point we nominally have the lock (at least for memcacheLockTi me).
138 finished := make(chan struct{})
139 subCtx, cancelFunc := context.WithCancel(ctx)
140 defer func() {
141 cancelFunc()
142 <-finished
143 }()
144
145 testStopCB, _ := ctx.Value(testStopCBKey).(func())
146
147 // This goroutine checks to see if we still posess the lock, and refresh es it
148 // if we do.
149 go func() {
150 defer func() {
151 cancelFunc()
152 close(finished)
153 }()
154
155 checkLoop:
156 for {
157 select {
158 case <-subCtx.Done():
159 break checkLoop
160 case <-clock.Get(ctx).After(delay):
161 }
162 if !checkAnd(refresh) {
163 log.Warningf("lost lock: %s", err)
164 return
165 }
166 }
167
168 if testStopCB != nil {
169 testStopCB()
170 }
171 checkAnd(release)
172 }()
173
174 return f(subCtx)
175 }
OLDNEW
« no previous file with comments | « no previous file | appengine/memlock/memlock_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698