OLD | NEW |
(Empty) | |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. |
| 4 |
| 5 // Package memlock allows multiple appengine handlers to coordinate best-effort |
| 6 // mutual execution via memcache. "best-effort" here means "best-effort"... |
| 7 // memcache is not reliable. However, colliding on memcache is a lot cheaper |
| 8 // than, for example, colliding with datastore transactions. |
| 9 package memlock |
| 10 |
| 11 import ( |
| 12 "bytes" |
| 13 "errors" |
| 14 "time" |
| 15 |
| 16 "github.com/luci/gae/service/memcache" |
| 17 "github.com/luci/luci-go/common/clock" |
| 18 "github.com/luci/luci-go/common/logging" |
| 19 "golang.org/x/net/context" |
| 20 ) |
| 21 |
| 22 // ErrFailedToLock is returned from TryWithLock when it fails to obtain a lock |
| 23 // prior to invoking the user-supplied function. |
| 24 var ErrFailedToLock = errors.New("memlock: failed to obtain lock") |
| 25 |
| 26 // ErrEmptyClientID is returned from TryWithLock when you specify an empty |
| 27 // clientID. |
| 28 var ErrEmptyClientID = errors.New("memlock: empty clientID") |
| 29 |
| 30 // memlockKeyPrefix is the memcache Key prefix for all user-supplied keys. |
| 31 const memlockKeyPrefix = "memlock:" |
| 32 |
| 33 type checkOp string |
| 34 |
| 35 // var so we can override it in the tests |
| 36 var delay = time.Second |
| 37 |
| 38 type testStopCBKeyType int |
| 39 |
| 40 var testStopCBKey testStopCBKeyType |
| 41 |
| 42 const ( |
| 43 release checkOp = "release" |
| 44 refresh = "refresh" |
| 45 ) |
| 46 |
| 47 // memcacheLockTime is the expiration time of the memcache entry. If the lock |
| 48 // is correctly released, then it will be released before this time. It's a |
| 49 // var so we can override it in the tests. |
| 50 var memcacheLockTime = 16 * time.Second |
| 51 |
| 52 // TryWithLock attempts to obtains the lock once, and then invokes f if |
| 53 // sucessful. The context provided to f will be canceled (e.g. ctx.Done() will |
| 54 // be closed) if memlock detects that we've lost the lock. |
| 55 // |
| 56 // TryWithLock function returns ErrFailedToLock if it fails to obtain the lock, |
| 57 // otherwise returns the error that f returns. |
| 58 // |
| 59 // `key` is the memcache key to use (i.e. the name of the lock). Clients locking |
| 60 // the same data must use the same key. clientID is the unique identifier for |
| 61 // this client (lock-holder). If it's empty then TryWithLock() will return |
| 62 // ErrEmptyClientID. |
| 63 // |
| 64 // Note that the lock provided by TryWithLock is a best-effort lock... some |
| 65 // other form of locking or synchronization should be used inside of f (such as |
| 66 // Datastore transactions) to ensure that f is, in fact, operating exclusively. |
| 67 // The purpose of TryWithLock is to have a cheap filter to prevent unnecessary |
| 68 // contention on heavier synchronization primitives like transactions. |
| 69 func TryWithLock(ctx context.Context, key, clientID string, f func(context.Conte
xt) error) error { |
| 70 if len(clientID) == 0 { |
| 71 return ErrEmptyClientID |
| 72 } |
| 73 |
| 74 ctx = logging.SetField(ctx, "key", key) |
| 75 ctx = logging.SetField(ctx, "clientID", clientID) |
| 76 log := logging.Get(ctx) |
| 77 mc := memcache.Get(ctx) |
| 78 |
| 79 key = memlockKeyPrefix + key |
| 80 cid := []byte(clientID) |
| 81 |
| 82 // checkAnd gets the current value from memcache, and then attempts to d
o the |
| 83 // checkOp (which can either be `refresh` or `release`). These pieces of |
| 84 // functionality are necessarially intertwined, because CAS only works w
ith |
| 85 // the exact-same *Item which was returned from a Get. |
| 86 // |
| 87 // refresh will attempt to CAS the item with the same content to reset i
t's |
| 88 // timeout. |
| 89 // |
| 90 // release will attempt to CAS the item to remove it's contents (clientI
D). |
| 91 // another lock observing an empty clientID will know that the lock is |
| 92 // obtainable. |
| 93 checkAnd := func(op checkOp) bool { |
| 94 itm, err := mc.Get(key) |
| 95 if err != nil { |
| 96 log.Warningf("error getting: %s", err) |
| 97 return false |
| 98 } |
| 99 |
| 100 if len(itm.Value()) > 0 && !bytes.Equal(itm.Value(), cid) { |
| 101 log.Infof("lock owned by %q", string(itm.Value())) |
| 102 return false |
| 103 } |
| 104 |
| 105 if op == refresh { |
| 106 itm.SetValue(cid).SetExpiration(memcacheLockTime) |
| 107 } else { |
| 108 if len(itm.Value()) == 0 { |
| 109 // it's already unlocked, no need to CAS |
| 110 log.Infof("lock already released") |
| 111 return true |
| 112 } |
| 113 itm.SetValue([]byte{}).SetExpiration(delay) |
| 114 } |
| 115 |
| 116 if err := mc.CompareAndSwap(itm); err != nil { |
| 117 log.Warningf("failed to %s lock: %q", op, err) |
| 118 return false |
| 119 } |
| 120 |
| 121 return true |
| 122 } |
| 123 |
| 124 // Now the actual logic begins. First we 'Add' the item, which will set
it if |
| 125 // it's not present in the memcache, otherwise leaves it alone. |
| 126 |
| 127 err := mc.Add(mc.NewItem(key).SetValue(cid).SetExpiration(memcacheLockTi
me)) |
| 128 if err != nil { |
| 129 if err != memcache.ErrNotStored { |
| 130 log.Warningf("error adding: %s", err) |
| 131 } |
| 132 if !checkAnd(refresh) { |
| 133 return ErrFailedToLock |
| 134 } |
| 135 } |
| 136 |
| 137 // At this point we nominally have the lock (at least for memcacheLockTi
me). |
| 138 finished := make(chan struct{}) |
| 139 subCtx, cancelFunc := context.WithCancel(ctx) |
| 140 defer func() { |
| 141 cancelFunc() |
| 142 <-finished |
| 143 }() |
| 144 |
| 145 testStopCB, _ := ctx.Value(testStopCBKey).(func()) |
| 146 |
| 147 // This goroutine checks to see if we still posess the lock, and refresh
es it |
| 148 // if we do. |
| 149 go func() { |
| 150 defer func() { |
| 151 cancelFunc() |
| 152 close(finished) |
| 153 }() |
| 154 |
| 155 checkLoop: |
| 156 for { |
| 157 select { |
| 158 case <-subCtx.Done(): |
| 159 break checkLoop |
| 160 case <-clock.Get(ctx).After(delay): |
| 161 } |
| 162 if !checkAnd(refresh) { |
| 163 log.Warningf("lost lock: %s", err) |
| 164 return |
| 165 } |
| 166 } |
| 167 |
| 168 if testStopCB != nil { |
| 169 testStopCB() |
| 170 } |
| 171 checkAnd(release) |
| 172 }() |
| 173 |
| 174 return f(subCtx) |
| 175 } |
OLD | NEW |