Chromium Code Reviews

Side by Side Diff: go/src/infra/gae/libs/memlock/memlock.go

Issue 1230303003: Revert "Refactor current GAE abstraction library to be free of the SDK*" (Closed) Base URL: https://chromium.googlesource.com/infra/infra.git@master
Patch Set: Created 5 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments.
Jump to:
View unified diff |
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 // Package memlock allows multiple appengine handlers to coordinate best-effort 5 // Package memlock allows multiple appengine handlers to coordinate best-effort
6 // mutual execution via memcache. "best-effort" here means "best-effort"... 6 // mutual execution via memcache. "best-effort" here means "best-effort"...
7 // memcache is not reliable. However, colliding on memcache is a lot cheaper 7 // memcache is not reliable. However, colliding on memcache is a lot cheaper
8 // than, for example, colliding with datastore transactions. 8 // than, for example, colliding with datastore transactions.
9 package memlock 9 package memlock
10 10
11 import ( 11 import (
12 "bytes" 12 "bytes"
13 "errors" 13 "errors"
14 » "golang.org/x/net/context" 14 » "infra/gae/libs/wrapper"
15 » "infra/libs/clock"
15 "sync/atomic" 16 "sync/atomic"
16 "time" 17 "time"
17 18
18 » "infra/gae/libs/gae" 19 » "github.com/luci/luci-go/common/logging"
20 » "golang.org/x/net/context"
19 21
20 » "github.com/luci/luci-go/common/clock" 22 » "appengine/memcache"
21 » "github.com/luci/luci-go/common/logging"
22 ) 23 )
23 24
24 // ErrFailedToLock is returned from TryWithLock when it fails to obtain a lock 25 // ErrFailedToLock is returned from TryWithLock when it fails to obtain a lock
25 // prior to invoking the user-supplied function. 26 // prior to invoking the user-supplied function.
26 var ErrFailedToLock = errors.New("memlock: failed to obtain lock") 27 var ErrFailedToLock = errors.New("memlock: failed to obtain lock")
27 28
28 // ErrEmptyClientID is returned from TryWithLock when you specify an empty 29 // ErrEmptyClientID is returned from TryWithLock when you specify an empty
29 // clientID. 30 // clientID.
30 var ErrEmptyClientID = errors.New("memlock: empty clientID") 31 var ErrEmptyClientID = errors.New("memlock: empty clientID")
31 32
(...skipping 27 matching lines...)
59 // this client (lock-holder). If it's empty then TryWithLock() will return 60 // this client (lock-holder). If it's empty then TryWithLock() will return
60 // ErrEmptyClientID. 61 // ErrEmptyClientID.
61 func TryWithLock(ctx context.Context, key, clientID string, f func(check func() bool) error) error { 62 func TryWithLock(ctx context.Context, key, clientID string, f func(check func() bool) error) error {
62 if len(clientID) == 0 { 63 if len(clientID) == 0 {
63 return ErrEmptyClientID 64 return ErrEmptyClientID
64 } 65 }
65 66
66 ctx = logging.SetField(ctx, "key", key) 67 ctx = logging.SetField(ctx, "key", key)
67 ctx = logging.SetField(ctx, "clientID", clientID) 68 ctx = logging.SetField(ctx, "clientID", clientID)
68 log := logging.Get(ctx) 69 log := logging.Get(ctx)
69 » mc := gae.GetMC(ctx) 70 » mc := wrapper.GetMC(ctx)
70 71
71 key = memlockKeyPrefix + key 72 key = memlockKeyPrefix + key
72 cid := []byte(clientID) 73 cid := []byte(clientID)
73 74
74 // checkAnd gets the current value from memcache, and then attempts to d o the 75 // checkAnd gets the current value from memcache, and then attempts to d o the
75 // checkOp (which can either be `refresh` or `release`). These pieces of 76 // checkOp (which can either be `refresh` or `release`). These pieces of
76 // functionality are necessarially intertwined, because CAS only works w ith 77 // functionality are necessarially intertwined, because CAS only works w ith
77 // the exact-same *Item which was returned from a Get. 78 // the exact-same *Item which was returned from a Get.
78 // 79 //
79 // refresh will attempt to CAS the item with the same content to reset i t's 80 // refresh will attempt to CAS the item with the same content to reset i t's
80 // timeout. 81 // timeout.
81 // 82 //
82 // release will attempt to CAS the item to remove it's contents (clientI D). 83 // release will attempt to CAS the item to remove it's contents (clientI D).
83 // another lock observing an empty clientID will know that the lock is 84 // another lock observing an empty clientID will know that the lock is
84 // obtainable. 85 // obtainable.
85 checkAnd := func(op checkOp) bool { 86 checkAnd := func(op checkOp) bool {
86 itm, err := mc.Get(key) 87 itm, err := mc.Get(key)
87 if err != nil { 88 if err != nil {
88 log.Warningf("error getting: %s", err) 89 log.Warningf("error getting: %s", err)
89 return false 90 return false
90 } 91 }
91 92
92 » » if len(itm.Value()) > 0 && !bytes.Equal(itm.Value(), cid) { 93 » » if len(itm.Value) > 0 && !bytes.Equal(itm.Value, cid) {
93 » » » log.Infof("lock owned by %q", string(itm.Value())) 94 » » » log.Infof("lock owned by %q", string(itm.Value))
94 return false 95 return false
95 } 96 }
96 97
97 if op == refresh { 98 if op == refresh {
98 » » » itm.SetValue(cid).SetExpiration(memcacheLockTime) 99 » » » itm.Value = cid
100 » » » itm.Expiration = memcacheLockTime
99 } else { 101 } else {
100 » » » if len(itm.Value()) == 0 { 102 » » » if len(itm.Value) == 0 {
101 // it's already unlocked, no need to CAS 103 // it's already unlocked, no need to CAS
102 log.Infof("lock already released") 104 log.Infof("lock already released")
103 return true 105 return true
104 } 106 }
105 » » » itm.SetValue([]byte{}).SetExpiration(delay) 107 » » » itm.Value = []byte{}
108 » » » itm.Expiration = delay
106 } 109 }
107 110
108 err = mc.CompareAndSwap(itm) 111 err = mc.CompareAndSwap(itm)
109 if err != nil { 112 if err != nil {
110 log.Warningf("failed to %s lock: %q", op, err) 113 log.Warningf("failed to %s lock: %q", op, err)
111 return false 114 return false
112 } 115 }
113 116
114 return true 117 return true
115 } 118 }
116 119
117 // Now the actual logic begins. First we 'Add' the item, which will set it if 120 // Now the actual logic begins. First we 'Add' the item, which will set it if
118 // it's not present in the memcache, otherwise leaves it alone. 121 // it's not present in the memcache, otherwise leaves it alone.
119 122 » err := mc.Add(&memcache.Item{
120 » err := mc.Add(mc.NewItem(key).SetValue(cid).SetExpiration(memcacheLockTi me)) 123 » » Key: key, Value: cid, Expiration: memcacheLockTime})
121 if err != nil { 124 if err != nil {
122 » » if err != gae.ErrMCNotStored { 125 » » if err != memcache.ErrNotStored {
123 log.Warningf("error adding: %s", err) 126 log.Warningf("error adding: %s", err)
124 } 127 }
125 if !checkAnd(refresh) { 128 if !checkAnd(refresh) {
126 return ErrFailedToLock 129 return ErrFailedToLock
127 } 130 }
128 } 131 }
129 132
130 // At this point we nominally have the lock (at least for memcacheLockTi me). 133 // At this point we nominally have the lock (at least for memcacheLockTi me).
131 134
132 stopChan := make(chan struct{}) 135 stopChan := make(chan struct{})
(...skipping 24 matching lines...)
157 return 160 return
158 } 161 }
159 } 162 }
160 163
161 checkAnd(release) 164 checkAnd(release)
162 atomic.StoreUint32(&held, 0) 165 atomic.StoreUint32(&held, 0)
163 }() 166 }()
164 167
165 return f(func() bool { return atomic.LoadUint32(&held) == 1 }) 168 return f(func() bool { return atomic.LoadUint32(&held) == 1 })
166 } 169 }
OLDNEW
« no previous file with comments | « go/src/infra/gae/libs/gae/upstream_types.go ('k') | go/src/infra/gae/libs/memlock/memlock_test.go » ('j') | no next file with comments »

Powered by Google App Engine