OLD | NEW |
1 // Copyright 2017 The LUCI Authors. All rights reserved. | 1 // Copyright 2017 The LUCI Authors. All rights reserved. |
2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
4 | 4 |
5 package policy | 5 package policy |
6 | 6 |
7 import ( | 7 import ( |
8 "crypto/sha256" | 8 "crypto/sha256" |
9 "encoding/hex" | 9 "encoding/hex" |
10 "sync" | 10 "sync" |
(...skipping 105 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
116 c = logging.SetField(c, "policy", p.Name) | 116 c = logging.SetField(c, "policy", p.Name) |
117 | 117 |
118 // Fetch and parse text protos stored in LUCI config. The fetcher will a
lso | 118 // Fetch and parse text protos stored in LUCI config. The fetcher will a
lso |
119 // record the revision of the fetched files. | 119 // record the revision of the fetched files. |
120 fetcher := luciConfigFetcher{} | 120 fetcher := luciConfigFetcher{} |
121 bundle, err := p.Fetch(c, &fetcher) | 121 bundle, err := p.Fetch(c, &fetcher) |
122 if err == nil && len(bundle) == 0 { | 122 if err == nil && len(bundle) == 0 { |
123 err = errors.New("no configs fetched by the callback") | 123 err = errors.New("no configs fetched by the callback") |
124 } | 124 } |
125 if err != nil { | 125 if err != nil { |
126 » » return "", errors.Annotate(err).Reason("failed to fetch policy c
onfigs").Err() | 126 » » return "", errors.Annotate(err, "failed to fetch policy configs"
).Err() |
127 } | 127 } |
128 rev = fetcher.Revision() | 128 rev = fetcher.Revision() |
129 | 129 |
130 // Convert configs into a form appropriate for the datastore. We'll skip
the | 130 // Convert configs into a form appropriate for the datastore. We'll skip
the |
131 // rest of the import if this exact blob is already in the datastore (ba
sed on | 131 // rest of the import if this exact blob is already in the datastore (ba
sed on |
132 // SHA256 digest). | 132 // SHA256 digest). |
133 cfgBlob, err := serializeBundle(bundle) | 133 cfgBlob, err := serializeBundle(bundle) |
134 if err != nil { | 134 if err != nil { |
135 » » return "", errors.Annotate(err).Reason("failed to serialize the
configs").Err() | 135 » » return "", errors.Annotate(err, "failed to serialize the configs
").Err() |
136 } | 136 } |
137 digest := sha256.Sum256(cfgBlob) | 137 digest := sha256.Sum256(cfgBlob) |
138 digestHex := hex.EncodeToString(digest[:]) | 138 digestHex := hex.EncodeToString(digest[:]) |
139 logging.Infof(c, "Got %d bytes of configs (SHA256 %s)", len(cfgBlob), di
gestHex) | 139 logging.Infof(c, "Got %d bytes of configs (SHA256 %s)", len(cfgBlob), di
gestHex) |
140 | 140 |
141 // Do we have it already? | 141 // Do we have it already? |
142 existingHdr, err := getImportedPolicyHeader(c, p.Name) | 142 existingHdr, err := getImportedPolicyHeader(c, p.Name) |
143 if err != nil { | 143 if err != nil { |
144 » » return "", errors.Annotate(err).Reason("failed to grab ImportedP
olicyHeader").Err() | 144 » » return "", errors.Annotate(err, "failed to grab ImportedPolicyHe
ader").Err() |
145 } | 145 } |
146 if existingHdr != nil && digestHex == existingHdr.SHA256 { | 146 if existingHdr != nil && digestHex == existingHdr.SHA256 { |
147 logging.Infof( | 147 logging.Infof( |
148 c, "Configs are up-to-date. Last changed at rev %s, last
checked rev is %s.", | 148 c, "Configs are up-to-date. Last changed at rev %s, last
checked rev is %s.", |
149 existingHdr.Revision, rev) | 149 existingHdr.Revision, rev) |
150 return existingHdr.Revision, nil | 150 return existingHdr.Revision, nil |
151 } | 151 } |
152 | 152 |
153 existingRev := "(nil)" | 153 existingRev := "(nil)" |
154 if existingHdr != nil { | 154 if existingHdr != nil { |
155 existingRev = existingHdr.Revision | 155 existingRev = existingHdr.Revision |
156 } | 156 } |
157 logging.Infof(c, "Policy config changed: %s -> %s", existingRev, rev) | 157 logging.Infof(c, "Policy config changed: %s -> %s", existingRev, rev) |
158 | 158 |
159 if p.Validate != nil { | 159 if p.Validate != nil { |
160 v := validation.Context{Logger: logging.Get(c)} | 160 v := validation.Context{Logger: logging.Get(c)} |
161 p.Validate(bundle, &v) | 161 p.Validate(bundle, &v) |
162 if err := v.Finalize(); err != nil { | 162 if err := v.Finalize(); err != nil { |
163 » » » return "", errors.Annotate(err). | 163 » » » return "", errors.Annotate(err, "configs at rev %s are i
nvalid", rev).Err() |
164 » » » » Reason("configs at rev %(rev)s are invalid"). | |
165 » » » » D("rev", rev).Err() | |
166 } | 164 } |
167 } | 165 } |
168 | 166 |
169 // Double check that they actually can be parsed into a queryable form.
If | 167 // Double check that they actually can be parsed into a queryable form.
If |
170 // not, the Policy callbacks are buggy. | 168 // not, the Policy callbacks are buggy. |
171 queriable, err := p.Prepare(bundle, rev) | 169 queriable, err := p.Prepare(bundle, rev) |
172 if err == nil && queriable.ConfigRevision() != rev { | 170 if err == nil && queriable.ConfigRevision() != rev { |
173 err = errors.New("wrong revision in result of Prepare callback") | 171 err = errors.New("wrong revision in result of Prepare callback") |
174 } | 172 } |
175 if err != nil { | 173 if err != nil { |
176 » » return "", errors.Annotate(err).Reason("failed to convert config
s into a queryable form").Err() | 174 » » return "", errors.Annotate(err, "failed to convert configs into
a queryable form").Err() |
177 } | 175 } |
178 | 176 |
179 logging.Infof(c, "Storing new configs") | 177 logging.Infof(c, "Storing new configs") |
180 if err := updateImportedPolicy(c, p.Name, rev, digestHex, cfgBlob); err
!= nil { | 178 if err := updateImportedPolicy(c, p.Name, rev, digestHex, cfgBlob); err
!= nil { |
181 return "", err | 179 return "", err |
182 } | 180 } |
183 | 181 |
184 return rev, nil | 182 return rev, nil |
185 } | 183 } |
186 | 184 |
(...skipping 16 matching lines...) Expand all Loading... |
203 } | 201 } |
204 | 202 |
205 // grabQueryable is called whenever cached Queryable in p.cache expires. | 203 // grabQueryable is called whenever cached Queryable in p.cache expires. |
206 func (p *Policy) grabQueryable(c context.Context, prev lazyslot.Value) (val lazy
slot.Value, err error) { | 204 func (p *Policy) grabQueryable(c context.Context, prev lazyslot.Value) (val lazy
slot.Value, err error) { |
207 c = logging.SetField(c, "policy", p.Name) | 205 c = logging.SetField(c, "policy", p.Name) |
208 | 206 |
209 logging.Infof(c, "Checking version of the policy in the datastore") | 207 logging.Infof(c, "Checking version of the policy in the datastore") |
210 hdr, err := getImportedPolicyHeader(c, p.Name) | 208 hdr, err := getImportedPolicyHeader(c, p.Name) |
211 switch { | 209 switch { |
212 case err != nil: | 210 case err != nil: |
213 » » err = errors.Annotate(err).Reason("failed to fetch importedPolic
yHeader entity").Err() | 211 » » err = errors.Annotate(err, "failed to fetch importedPolicyHeader
entity").Err() |
214 return | 212 return |
215 case hdr == nil: | 213 case hdr == nil: |
216 err = ErrNoPolicy | 214 err = ErrNoPolicy |
217 return | 215 return |
218 } | 216 } |
219 | 217 |
220 // Reuse existing Queryable object if configs didn't change. | 218 // Reuse existing Queryable object if configs didn't change. |
221 prevQ, _ := prev.Value.(Queryable) | 219 prevQ, _ := prev.Value.(Queryable) |
222 if prevQ != nil && prevQ.ConfigRevision() == hdr.Revision { | 220 if prevQ != nil && prevQ.ConfigRevision() == hdr.Revision { |
223 return lazyslot.Value{ | 221 return lazyslot.Value{ |
224 Value: prevQ, | 222 Value: prevQ, |
225 Expiration: nextCheckTime(c), | 223 Expiration: nextCheckTime(c), |
226 }, nil | 224 }, nil |
227 } | 225 } |
228 | 226 |
229 // Fetch new configs. | 227 // Fetch new configs. |
230 logging.Infof(c, "Fetching policy configs from the datastore") | 228 logging.Infof(c, "Fetching policy configs from the datastore") |
231 body, err := getImportedPolicyBody(c, p.Name) | 229 body, err := getImportedPolicyBody(c, p.Name) |
232 switch { | 230 switch { |
233 case err != nil: | 231 case err != nil: |
234 » » err = errors.Annotate(err).Reason("failed to fetch importedPolic
yBody entity").Err() | 232 » » err = errors.Annotate(err, "failed to fetch importedPolicyBody e
ntity").Err() |
235 return | 233 return |
236 case body == nil: // this is rare, the body shouldn't disappear | 234 case body == nil: // this is rare, the body shouldn't disappear |
237 logging.Errorf(c, "The policy body is unexpectedly gone") | 235 logging.Errorf(c, "The policy body is unexpectedly gone") |
238 err = ErrNoPolicy | 236 err = ErrNoPolicy |
239 return | 237 return |
240 } | 238 } |
241 | 239 |
242 // An error here and below can happen if previously validated config is
no | 240 // An error here and below can happen if previously validated config is
no |
243 // longer valid (e.g. if the service code is updated and new code doesn'
t like | 241 // longer valid (e.g. if the service code is updated and new code doesn'
t like |
244 // the stored config anymore). | 242 // the stored config anymore). |
245 // | 243 // |
246 // If this check fails, the service is effectively offline until configs
are | 244 // If this check fails, the service is effectively offline until configs
are |
247 // updated. Presumably, it is better than silently using no longer valid | 245 // updated. Presumably, it is better than silently using no longer valid |
248 // config. | 246 // config. |
249 logging.Infof(c, "Using configs at rev %s", body.Revision) | 247 logging.Infof(c, "Using configs at rev %s", body.Revision) |
250 configs, unknown, err := deserializeBundle(body.Data) | 248 configs, unknown, err := deserializeBundle(body.Data) |
251 if err != nil { | 249 if err != nil { |
252 » » err = errors.Annotate(err).Reason("failed to deserialize cached
configs").Err() | 250 » » err = errors.Annotate(err, "failed to deserialize cached configs
").Err() |
253 return | 251 return |
254 } | 252 } |
255 if len(unknown) != 0 { | 253 if len(unknown) != 0 { |
256 for _, cfg := range unknown { | 254 for _, cfg := range unknown { |
257 logging.Errorf(c, "Unknown proto type %q in cached confi
g %q", cfg.Kind, cfg.Path) | 255 logging.Errorf(c, "Unknown proto type %q in cached confi
g %q", cfg.Kind, cfg.Path) |
258 } | 256 } |
259 err = errors.New("failed to deserialize some cached configs") | 257 err = errors.New("failed to deserialize some cached configs") |
260 return | 258 return |
261 } | 259 } |
262 queryable, err := p.Prepare(configs, body.Revision) | 260 queryable, err := p.Prepare(configs, body.Revision) |
263 if err != nil { | 261 if err != nil { |
264 » » err = errors.Annotate(err).Reason("failed to process cached conf
igs").Err() | 262 » » err = errors.Annotate(err, "failed to process cached configs").E
rr() |
265 return | 263 return |
266 } | 264 } |
267 | 265 |
268 return lazyslot.Value{ | 266 return lazyslot.Value{ |
269 Value: queryable, | 267 Value: queryable, |
270 Expiration: nextCheckTime(c), | 268 Expiration: nextCheckTime(c), |
271 }, nil | 269 }, nil |
272 } | 270 } |
273 | 271 |
274 // nextCheckTime returns a random time from [now+4 min, now+5 min). | 272 // nextCheckTime returns a random time from [now+4 min, now+5 min). |
275 // | 273 // |
276 // It's used to define when to refresh in-memory Queryable cache. We randomize | 274 // It's used to define when to refresh in-memory Queryable cache. We randomize |
277 // it to desynchronize cache updates of different Policy instances. | 275 // it to desynchronize cache updates of different Policy instances. |
278 func nextCheckTime(c context.Context) time.Time { | 276 func nextCheckTime(c context.Context) time.Time { |
279 rnd := time.Duration(mathrand.Int63n(c, int64(time.Minute))) | 277 rnd := time.Duration(mathrand.Int63n(c, int64(time.Minute))) |
280 return clock.Now(c).Add(4*time.Minute + rnd) | 278 return clock.Now(c).Add(4*time.Minute + rnd) |
281 } | 279 } |
OLD | NEW |