OLD | NEW |
1 // Copyright 2016 The LUCI Authors. All rights reserved. | 1 // Copyright 2016 The LUCI Authors. All rights reserved. |
2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
4 | 4 |
5 package datastore | 5 package datastore |
6 | 6 |
7 import ( | 7 import ( |
8 "time" | 8 "time" |
9 | 9 |
10 "github.com/luci/luci-go/appengine/datastorecache" | 10 "github.com/luci/luci-go/appengine/datastorecache" |
(...skipping 125 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
136 // Always ask for full content. | 136 // Always ask for full content. |
137 key.Content = true | 137 key.Content = true |
138 } | 138 } |
139 | 139 |
140 // Encode our caching key, and use this for our datastore cache key. | 140 // Encode our caching key, and use this for our datastore cache key. |
141 // | 141 // |
142 // This gets recoded in dsCacheHandler's "Refresh" to identify the cache | 142 // This gets recoded in dsCacheHandler's "Refresh" to identify the cache |
143 // operation that is being performed. | 143 // operation that is being performed. |
144 encKey, err := caching.Encode(&key) | 144 encKey, err := caching.Encode(&key) |
145 if err != nil { | 145 if err != nil { |
146 » » return nil, errors.Annotate(err).Reason("failed to encode cache
key").Err() | 146 » » return nil, errors.Annotate(err, "failed to encode cache key").E
rr() |
147 } | 147 } |
148 | 148 |
149 // Construct a cache handler. | 149 // Construct a cache handler. |
150 v, err := cache.Get(dc.WithHandler(c, l, 0), encKey) | 150 v, err := cache.Get(dc.WithHandler(c, l, 0), encKey) |
151 if err != nil { | 151 if err != nil { |
152 return nil, err | 152 return nil, err |
153 } | 153 } |
154 | 154 |
155 // Decode our response. | 155 // Decode our response. |
156 if v.Schema != dsCacheSchema { | 156 if v.Schema != dsCacheSchema { |
157 » » return nil, errors.Reason("response schema (%(resp)q) doesn't ma
tch current (%(cur)q)"). | 157 » » return nil, errors.Reason("response schema (%q) doesn't match cu
rrent (%q)", |
158 » » » D("resp", v.Schema).D("cur", dsCacheSchema).Err() | 158 » » » v.Schema, dsCacheSchema).Err() |
159 } | 159 } |
160 | 160 |
161 cacheValue, err := caching.DecodeValue(v.Data) | 161 cacheValue, err := caching.DecodeValue(v.Data) |
162 if err != nil { | 162 if err != nil { |
163 » » return nil, errors.Annotate(err).Reason("failed to decode cached
value").Err() | 163 » » return nil, errors.Annotate(err, "failed to decode cached value"
).Err() |
164 } | 164 } |
165 | 165 |
166 // Prune any responses that are not permitted for the supplied Authority
. | 166 // Prune any responses that are not permitted for the supplied Authority
. |
167 switch key.Op { | 167 switch key.Op { |
168 case caching.OpGetAll: | 168 case caching.OpGetAll: |
169 if len(cacheValue.Items) > 0 { | 169 if len(cacheValue.Items) > 0 { |
170 // Shift over any elements that can't be accessed. | 170 // Shift over any elements that can't be accessed. |
171 ptr := 0 | 171 ptr := 0 |
172 for _, itm := range cacheValue.Items { | 172 for _, itm := range cacheValue.Items { |
173 if dc.accessConfigSet(c, origAuthority, itm.Conf
igSet) { | 173 if dc.accessConfigSet(c, origAuthority, itm.Conf
igSet) { |
(...skipping 69 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
243 // operation. This is used for cron operations. | 243 // operation. This is used for cron operations. |
244 loaderTimeout time.Duration | 244 loaderTimeout time.Duration |
245 } | 245 } |
246 | 246 |
247 func (dch *dsCacheHandler) FailOpen() bool { return dch.fa
ilOpen } | 247 func (dch *dsCacheHandler) FailOpen() bool { return dch.fa
ilOpen } |
248 func (dch *dsCacheHandler) RefreshInterval([]byte) time.Duration { return dch.re
freshInterval } | 248 func (dch *dsCacheHandler) RefreshInterval([]byte) time.Duration { return dch.re
freshInterval } |
249 func (dch *dsCacheHandler) Refresh(c context.Context, key []byte, v datastorecac
he.Value) (datastorecache.Value, error) { | 249 func (dch *dsCacheHandler) Refresh(c context.Context, key []byte, v datastorecac
he.Value) (datastorecache.Value, error) { |
250 // Decode the key into our caching key. | 250 // Decode the key into our caching key. |
251 var ck caching.Key | 251 var ck caching.Key |
252 if err := caching.Decode(key, &ck); err != nil { | 252 if err := caching.Decode(key, &ck); err != nil { |
253 » » return v, errors.Annotate(err).Reason("failed to decode cache ke
y").Err() | 253 » » return v, errors.Annotate(err, "failed to decode cache key").Err
() |
254 } | 254 } |
255 | 255 |
256 var cv *caching.Value | 256 var cv *caching.Value |
257 if v.Schema == dsCacheSchema && len(v.Data) > 0 { | 257 if v.Schema == dsCacheSchema && len(v.Data) > 0 { |
258 // We have a currently-cached value, so decode it into "cv". | 258 // We have a currently-cached value, so decode it into "cv". |
259 var err error | 259 var err error |
260 if cv, err = caching.DecodeValue(v.Data); err != nil { | 260 if cv, err = caching.DecodeValue(v.Data); err != nil { |
261 » » » return v, errors.Annotate(err).Reason("failed to decode
cache value").Err() | 261 » » » return v, errors.Annotate(err, "failed to decode cache v
alue").Err() |
262 } | 262 } |
263 } | 263 } |
264 | 264 |
265 // Apply our timeout, if configured (influences urlfetch). | 265 // Apply our timeout, if configured (influences urlfetch). |
266 if dch.loaderTimeout > 0 { | 266 if dch.loaderTimeout > 0 { |
267 var cancelFunc context.CancelFunc | 267 var cancelFunc context.CancelFunc |
268 c, cancelFunc = clock.WithTimeout(c, dch.loaderTimeout) | 268 c, cancelFunc = clock.WithTimeout(c, dch.loaderTimeout) |
269 defer cancelFunc() | 269 defer cancelFunc() |
270 } | 270 } |
271 | 271 |
272 // Perform a cache load on this value. | 272 // Perform a cache load on this value. |
273 cv, err := dch.loader(c, ck, cv) | 273 cv, err := dch.loader(c, ck, cv) |
274 if err != nil { | 274 if err != nil { |
275 » » return v, errors.Annotate(err).Reason("failed to load cache valu
e").Err() | 275 » » return v, errors.Annotate(err, "failed to load cache value").Err
() |
276 } | 276 } |
277 | 277 |
278 keyDesc := ck.String() | 278 keyDesc := ck.String() |
279 valueDesc := cv.Description() | 279 valueDesc := cv.Description() |
280 log.Infof(c, "Loaded [%s]: %s", keyDesc, valueDesc) | 280 log.Infof(c, "Loaded [%s]: %s", keyDesc, valueDesc) |
281 | 281 |
282 // Encode the resulting cache value. | 282 // Encode the resulting cache value. |
283 if v.Data, err = cv.Encode(); err != nil { | 283 if v.Data, err = cv.Encode(); err != nil { |
284 » » return v, errors.Annotate(err).Reason("failed to encode cache va
lue").Err() | 284 » » return v, errors.Annotate(err, "failed to encode cache value").E
rr() |
285 } | 285 } |
286 v.Schema = dsCacheSchema | 286 v.Schema = dsCacheSchema |
287 v.Description = keyDesc + ": " + valueDesc | 287 v.Description = keyDesc + ": " + valueDesc |
288 return v, nil | 288 return v, nil |
289 } | 289 } |
290 | 290 |
291 func (dch *dsCacheHandler) Locker(c context.Context) datastorecache.Locker { | 291 func (dch *dsCacheHandler) Locker(c context.Context) datastorecache.Locker { |
292 if dch.lockerFunc != nil { | 292 if dch.lockerFunc != nil { |
293 return dch.lockerFunc(c) | 293 return dch.lockerFunc(c) |
294 } | 294 } |
295 return nil | 295 return nil |
296 } | 296 } |
297 | 297 |
298 // CronLoader returns a caching.Loader implementation to be used | 298 // CronLoader returns a caching.Loader implementation to be used |
299 // by the Cron task. | 299 // by the Cron task. |
300 func CronLoader(b backend.B) caching.Loader { | 300 func CronLoader(b backend.B) caching.Loader { |
301 return func(c context.Context, k caching.Key, v *caching.Value) (*cachin
g.Value, error) { | 301 return func(c context.Context, k caching.Key, v *caching.Value) (*cachin
g.Value, error) { |
302 return caching.CacheLoad(c, b, k, v) | 302 return caching.CacheLoad(c, b, k, v) |
303 } | 303 } |
304 } | 304 } |
OLD | NEW |