Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(952)

Side by Side Diff: go/src/infra/gae/libs/memlock/memlock.go

Issue 1222903002: Refactor current GAE abstraction library to be free of the SDK* (Closed) Base URL: https://chromium.googlesource.com/infra/infra.git@master
Patch Set: more fixes Created 5 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 // Package memlock allows multiple appengine handlers to coordinate best-effort 5 // Package memlock allows multiple appengine handlers to coordinate best-effort
6 // mutual execution via memcache. "best-effort" here means "best-effort"... 6 // mutual execution via memcache. "best-effort" here means "best-effort"...
7 // memcache is not reliable. However, colliding on memcache is a lot cheaper 7 // memcache is not reliable. However, colliding on memcache is a lot cheaper
8 // than, for example, colliding with datastore transactions. 8 // than, for example, colliding with datastore transactions.
9 package memlock 9 package memlock
10 10
11 import ( 11 import (
12 "bytes" 12 "bytes"
13 "errors" 13 "errors"
14 » "infra/gae/libs/wrapper" 14 » "golang.org/x/net/context"
15 » "infra/libs/clock"
16 "sync/atomic" 15 "sync/atomic"
17 "time" 16 "time"
18 17
18 "infra/gae/libs/gae"
19
20 "github.com/luci/luci-go/common/clock"
19 "github.com/luci/luci-go/common/logging" 21 "github.com/luci/luci-go/common/logging"
20 "golang.org/x/net/context"
21
22 "appengine/memcache"
23 ) 22 )
24 23
25 // ErrFailedToLock is returned from TryWithLock when it fails to obtain a lock 24 // ErrFailedToLock is returned from TryWithLock when it fails to obtain a lock
26 // prior to invoking the user-supplied function. 25 // prior to invoking the user-supplied function.
27 var ErrFailedToLock = errors.New("memlock: failed to obtain lock") 26 var ErrFailedToLock = errors.New("memlock: failed to obtain lock")
28 27
29 // ErrEmptyClientID is returned from TryWithLock when you specify an empty 28 // ErrEmptyClientID is returned from TryWithLock when you specify an empty
30 // clientID. 29 // clientID.
31 var ErrEmptyClientID = errors.New("memlock: empty clientID") 30 var ErrEmptyClientID = errors.New("memlock: empty clientID")
32 31
(...skipping 27 matching lines...) Expand all
60 // this client (lock-holder). If it's empty then TryWithLock() will return 59 // this client (lock-holder). If it's empty then TryWithLock() will return
61 // ErrEmptyClientID. 60 // ErrEmptyClientID.
62 func TryWithLock(ctx context.Context, key, clientID string, f func(check func() bool) error) error { 61 func TryWithLock(ctx context.Context, key, clientID string, f func(check func() bool) error) error {
63 if len(clientID) == 0 { 62 if len(clientID) == 0 {
64 return ErrEmptyClientID 63 return ErrEmptyClientID
65 } 64 }
66 65
67 ctx = logging.SetField(ctx, "key", key) 66 ctx = logging.SetField(ctx, "key", key)
68 ctx = logging.SetField(ctx, "clientID", clientID) 67 ctx = logging.SetField(ctx, "clientID", clientID)
69 log := logging.Get(ctx) 68 log := logging.Get(ctx)
70 » mc := wrapper.GetMC(ctx) 69 » mc := gae.GetMC(ctx)
71 70
72 key = memlockKeyPrefix + key 71 key = memlockKeyPrefix + key
73 cid := []byte(clientID) 72 cid := []byte(clientID)
74 73
75 // checkAnd gets the current value from memcache, and then attempts to d o the 74 // checkAnd gets the current value from memcache, and then attempts to d o the
76 // checkOp (which can either be `refresh` or `release`). These pieces of 75 // checkOp (which can either be `refresh` or `release`). These pieces of
77 // functionality are necessarially intertwined, because CAS only works w ith 76 // functionality are necessarially intertwined, because CAS only works w ith
78 // the exact-same *Item which was returned from a Get. 77 // the exact-same *Item which was returned from a Get.
79 // 78 //
80 // refresh will attempt to CAS the item with the same content to reset i t's 79 // refresh will attempt to CAS the item with the same content to reset i t's
81 // timeout. 80 // timeout.
82 // 81 //
83 // release will attempt to CAS the item to remove it's contents (clientI D). 82 // release will attempt to CAS the item to remove it's contents (clientI D).
84 // another lock observing an empty clientID will know that the lock is 83 // another lock observing an empty clientID will know that the lock is
85 // obtainable. 84 // obtainable.
86 checkAnd := func(op checkOp) bool { 85 checkAnd := func(op checkOp) bool {
87 itm, err := mc.Get(key) 86 itm, err := mc.Get(key)
88 if err != nil { 87 if err != nil {
89 log.Warningf("error getting: %s", err) 88 log.Warningf("error getting: %s", err)
90 return false 89 return false
91 } 90 }
92 91
93 » » if len(itm.Value) > 0 && !bytes.Equal(itm.Value, cid) { 92 » » if len(itm.Value()) > 0 && !bytes.Equal(itm.Value(), cid) {
94 » » » log.Infof("lock owned by %q", string(itm.Value)) 93 » » » log.Infof("lock owned by %q", string(itm.Value()))
95 return false 94 return false
96 } 95 }
97 96
98 if op == refresh { 97 if op == refresh {
99 » » » itm.Value = cid 98 » » » itm.SetValue(cid).SetExpiration(memcacheLockTime)
100 » » » itm.Expiration = memcacheLockTime
101 } else { 99 } else {
102 » » » if len(itm.Value) == 0 { 100 » » » if len(itm.Value()) == 0 {
103 // it's already unlocked, no need to CAS 101 // it's already unlocked, no need to CAS
104 log.Infof("lock already released") 102 log.Infof("lock already released")
105 return true 103 return true
106 } 104 }
107 » » » itm.Value = []byte{} 105 » » » itm.SetValue([]byte{}).SetExpiration(delay)
108 » » » itm.Expiration = delay
109 } 106 }
110 107
111 err = mc.CompareAndSwap(itm) 108 err = mc.CompareAndSwap(itm)
112 if err != nil { 109 if err != nil {
113 log.Warningf("failed to %s lock: %q", op, err) 110 log.Warningf("failed to %s lock: %q", op, err)
114 return false 111 return false
115 } 112 }
116 113
117 return true 114 return true
118 } 115 }
119 116
120 // Now the actual logic begins. First we 'Add' the item, which will set it if 117 // Now the actual logic begins. First we 'Add' the item, which will set it if
121 // it's not present in the memcache, otherwise leaves it alone. 118 // it's not present in the memcache, otherwise leaves it alone.
122 » err := mc.Add(&memcache.Item{ 119
123 » » Key: key, Value: cid, Expiration: memcacheLockTime}) 120 » err := mc.Add(mc.NewItem(key).SetValue(cid).SetExpiration(memcacheLockTi me))
124 if err != nil { 121 if err != nil {
125 » » if err != memcache.ErrNotStored { 122 » » if err != gae.ErrMCNotStored {
126 log.Warningf("error adding: %s", err) 123 log.Warningf("error adding: %s", err)
127 } 124 }
128 if !checkAnd(refresh) { 125 if !checkAnd(refresh) {
129 return ErrFailedToLock 126 return ErrFailedToLock
130 } 127 }
131 } 128 }
132 129
133 // At this point we nominally have the lock (at least for memcacheLockTi me). 130 // At this point we nominally have the lock (at least for memcacheLockTi me).
134 131
135 stopChan := make(chan struct{}) 132 stopChan := make(chan struct{})
(...skipping 24 matching lines...) Expand all
160 return 157 return
161 } 158 }
162 } 159 }
163 160
164 checkAnd(release) 161 checkAnd(release)
165 atomic.StoreUint32(&held, 0) 162 atomic.StoreUint32(&held, 0)
166 }() 163 }()
167 164
168 return f(func() bool { return atomic.LoadUint32(&held) == 1 }) 165 return f(func() bool { return atomic.LoadUint32(&held) == 1 })
169 } 166 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698