| OLD | NEW |
| 1 // Copyright 2017 The LUCI Authors. All rights reserved. | 1 // Copyright 2017 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package policy | 5 package policy |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "crypto/sha256" | 8 "crypto/sha256" |
| 9 "encoding/hex" | 9 "encoding/hex" |
| 10 "sync" | 10 "sync" |
| (...skipping 105 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 116 c = logging.SetField(c, "policy", p.Name) | 116 c = logging.SetField(c, "policy", p.Name) |
| 117 | 117 |
| 118 // Fetch and parse text protos stored in LUCI config. The fetcher will a
lso | 118 // Fetch and parse text protos stored in LUCI config. The fetcher will a
lso |
| 119 // record the revision of the fetched files. | 119 // record the revision of the fetched files. |
| 120 fetcher := luciConfigFetcher{} | 120 fetcher := luciConfigFetcher{} |
| 121 bundle, err := p.Fetch(c, &fetcher) | 121 bundle, err := p.Fetch(c, &fetcher) |
| 122 if err == nil && len(bundle) == 0 { | 122 if err == nil && len(bundle) == 0 { |
| 123 err = errors.New("no configs fetched by the callback") | 123 err = errors.New("no configs fetched by the callback") |
| 124 } | 124 } |
| 125 if err != nil { | 125 if err != nil { |
| 126 » » return "", errors.Annotate(err).Reason("failed to fetch policy c
onfigs").Err() | 126 » » return "", errors.Annotate(err, "failed to fetch policy configs"
).Err() |
| 127 } | 127 } |
| 128 rev = fetcher.Revision() | 128 rev = fetcher.Revision() |
| 129 | 129 |
| 130 // Convert configs into a form appropriate for the datastore. We'll skip
the | 130 // Convert configs into a form appropriate for the datastore. We'll skip
the |
| 131 // rest of the import if this exact blob is already in the datastore (ba
sed on | 131 // rest of the import if this exact blob is already in the datastore (ba
sed on |
| 132 // SHA256 digest). | 132 // SHA256 digest). |
| 133 cfgBlob, err := serializeBundle(bundle) | 133 cfgBlob, err := serializeBundle(bundle) |
| 134 if err != nil { | 134 if err != nil { |
| 135 » » return "", errors.Annotate(err).Reason("failed to serialize the
configs").Err() | 135 » » return "", errors.Annotate(err, "failed to serialize the configs
").Err() |
| 136 } | 136 } |
| 137 digest := sha256.Sum256(cfgBlob) | 137 digest := sha256.Sum256(cfgBlob) |
| 138 digestHex := hex.EncodeToString(digest[:]) | 138 digestHex := hex.EncodeToString(digest[:]) |
| 139 logging.Infof(c, "Got %d bytes of configs (SHA256 %s)", len(cfgBlob), di
gestHex) | 139 logging.Infof(c, "Got %d bytes of configs (SHA256 %s)", len(cfgBlob), di
gestHex) |
| 140 | 140 |
| 141 // Do we have it already? | 141 // Do we have it already? |
| 142 existingHdr, err := getImportedPolicyHeader(c, p.Name) | 142 existingHdr, err := getImportedPolicyHeader(c, p.Name) |
| 143 if err != nil { | 143 if err != nil { |
| 144 » » return "", errors.Annotate(err).Reason("failed to grab ImportedP
olicyHeader").Err() | 144 » » return "", errors.Annotate(err, "failed to grab ImportedPolicyHe
ader").Err() |
| 145 } | 145 } |
| 146 if existingHdr != nil && digestHex == existingHdr.SHA256 { | 146 if existingHdr != nil && digestHex == existingHdr.SHA256 { |
| 147 logging.Infof( | 147 logging.Infof( |
| 148 c, "Configs are up-to-date. Last changed at rev %s, last
checked rev is %s.", | 148 c, "Configs are up-to-date. Last changed at rev %s, last
checked rev is %s.", |
| 149 existingHdr.Revision, rev) | 149 existingHdr.Revision, rev) |
| 150 return existingHdr.Revision, nil | 150 return existingHdr.Revision, nil |
| 151 } | 151 } |
| 152 | 152 |
| 153 existingRev := "(nil)" | 153 existingRev := "(nil)" |
| 154 if existingHdr != nil { | 154 if existingHdr != nil { |
| 155 existingRev = existingHdr.Revision | 155 existingRev = existingHdr.Revision |
| 156 } | 156 } |
| 157 logging.Infof(c, "Policy config changed: %s -> %s", existingRev, rev) | 157 logging.Infof(c, "Policy config changed: %s -> %s", existingRev, rev) |
| 158 | 158 |
| 159 if p.Validate != nil { | 159 if p.Validate != nil { |
| 160 v := validation.Context{Logger: logging.Get(c)} | 160 v := validation.Context{Logger: logging.Get(c)} |
| 161 p.Validate(bundle, &v) | 161 p.Validate(bundle, &v) |
| 162 if err := v.Finalize(); err != nil { | 162 if err := v.Finalize(); err != nil { |
| 163 » » » return "", errors.Annotate(err). | 163 » » » return "", errors.Annotate(err, "configs at rev %s are i
nvalid", rev).Err() |
| 164 » » » » Reason("configs at rev %(rev)s are invalid"). | |
| 165 » » » » D("rev", rev).Err() | |
| 166 } | 164 } |
| 167 } | 165 } |
| 168 | 166 |
| 169 // Double check that they actually can be parsed into a queryable form.
If | 167 // Double check that they actually can be parsed into a queryable form.
If |
| 170 // not, the Policy callbacks are buggy. | 168 // not, the Policy callbacks are buggy. |
| 171 queriable, err := p.Prepare(bundle, rev) | 169 queriable, err := p.Prepare(bundle, rev) |
| 172 if err == nil && queriable.ConfigRevision() != rev { | 170 if err == nil && queriable.ConfigRevision() != rev { |
| 173 err = errors.New("wrong revision in result of Prepare callback") | 171 err = errors.New("wrong revision in result of Prepare callback") |
| 174 } | 172 } |
| 175 if err != nil { | 173 if err != nil { |
| 176 » » return "", errors.Annotate(err).Reason("failed to convert config
s into a queryable form").Err() | 174 » » return "", errors.Annotate(err, "failed to convert configs into
a queryable form").Err() |
| 177 } | 175 } |
| 178 | 176 |
| 179 logging.Infof(c, "Storing new configs") | 177 logging.Infof(c, "Storing new configs") |
| 180 if err := updateImportedPolicy(c, p.Name, rev, digestHex, cfgBlob); err
!= nil { | 178 if err := updateImportedPolicy(c, p.Name, rev, digestHex, cfgBlob); err
!= nil { |
| 181 return "", err | 179 return "", err |
| 182 } | 180 } |
| 183 | 181 |
| 184 return rev, nil | 182 return rev, nil |
| 185 } | 183 } |
| 186 | 184 |
| (...skipping 16 matching lines...) Expand all Loading... |
| 203 } | 201 } |
| 204 | 202 |
| 205 // grabQueryable is called whenever cached Queryable in p.cache expires. | 203 // grabQueryable is called whenever cached Queryable in p.cache expires. |
| 206 func (p *Policy) grabQueryable(c context.Context, prev lazyslot.Value) (val lazy
slot.Value, err error) { | 204 func (p *Policy) grabQueryable(c context.Context, prev lazyslot.Value) (val lazy
slot.Value, err error) { |
| 207 c = logging.SetField(c, "policy", p.Name) | 205 c = logging.SetField(c, "policy", p.Name) |
| 208 | 206 |
| 209 logging.Infof(c, "Checking version of the policy in the datastore") | 207 logging.Infof(c, "Checking version of the policy in the datastore") |
| 210 hdr, err := getImportedPolicyHeader(c, p.Name) | 208 hdr, err := getImportedPolicyHeader(c, p.Name) |
| 211 switch { | 209 switch { |
| 212 case err != nil: | 210 case err != nil: |
| 213 » » err = errors.Annotate(err).Reason("failed to fetch importedPolic
yHeader entity").Err() | 211 » » err = errors.Annotate(err, "failed to fetch importedPolicyHeader
entity").Err() |
| 214 return | 212 return |
| 215 case hdr == nil: | 213 case hdr == nil: |
| 216 err = ErrNoPolicy | 214 err = ErrNoPolicy |
| 217 return | 215 return |
| 218 } | 216 } |
| 219 | 217 |
| 220 // Reuse existing Queryable object if configs didn't change. | 218 // Reuse existing Queryable object if configs didn't change. |
| 221 prevQ, _ := prev.Value.(Queryable) | 219 prevQ, _ := prev.Value.(Queryable) |
| 222 if prevQ != nil && prevQ.ConfigRevision() == hdr.Revision { | 220 if prevQ != nil && prevQ.ConfigRevision() == hdr.Revision { |
| 223 return lazyslot.Value{ | 221 return lazyslot.Value{ |
| 224 Value: prevQ, | 222 Value: prevQ, |
| 225 Expiration: nextCheckTime(c), | 223 Expiration: nextCheckTime(c), |
| 226 }, nil | 224 }, nil |
| 227 } | 225 } |
| 228 | 226 |
| 229 // Fetch new configs. | 227 // Fetch new configs. |
| 230 logging.Infof(c, "Fetching policy configs from the datastore") | 228 logging.Infof(c, "Fetching policy configs from the datastore") |
| 231 body, err := getImportedPolicyBody(c, p.Name) | 229 body, err := getImportedPolicyBody(c, p.Name) |
| 232 switch { | 230 switch { |
| 233 case err != nil: | 231 case err != nil: |
| 234 » » err = errors.Annotate(err).Reason("failed to fetch importedPolic
yBody entity").Err() | 232 » » err = errors.Annotate(err, "failed to fetch importedPolicyBody e
ntity").Err() |
| 235 return | 233 return |
| 236 case body == nil: // this is rare, the body shouldn't disappear | 234 case body == nil: // this is rare, the body shouldn't disappear |
| 237 logging.Errorf(c, "The policy body is unexpectedly gone") | 235 logging.Errorf(c, "The policy body is unexpectedly gone") |
| 238 err = ErrNoPolicy | 236 err = ErrNoPolicy |
| 239 return | 237 return |
| 240 } | 238 } |
| 241 | 239 |
| 242 // An error here and below can happen if previously validated config is
no | 240 // An error here and below can happen if previously validated config is
no |
| 243 // longer valid (e.g. if the service code is updated and new code doesn'
t like | 241 // longer valid (e.g. if the service code is updated and new code doesn'
t like |
| 244 // the stored config anymore). | 242 // the stored config anymore). |
| 245 // | 243 // |
| 246 // If this check fails, the service is effectively offline until configs
are | 244 // If this check fails, the service is effectively offline until configs
are |
| 247 // updated. Presumably, it is better than silently using no longer valid | 245 // updated. Presumably, it is better than silently using no longer valid |
| 248 // config. | 246 // config. |
| 249 logging.Infof(c, "Using configs at rev %s", body.Revision) | 247 logging.Infof(c, "Using configs at rev %s", body.Revision) |
| 250 configs, unknown, err := deserializeBundle(body.Data) | 248 configs, unknown, err := deserializeBundle(body.Data) |
| 251 if err != nil { | 249 if err != nil { |
| 252 » » err = errors.Annotate(err).Reason("failed to deserialize cached
configs").Err() | 250 » » err = errors.Annotate(err, "failed to deserialize cached configs
").Err() |
| 253 return | 251 return |
| 254 } | 252 } |
| 255 if len(unknown) != 0 { | 253 if len(unknown) != 0 { |
| 256 for _, cfg := range unknown { | 254 for _, cfg := range unknown { |
| 257 logging.Errorf(c, "Unknown proto type %q in cached confi
g %q", cfg.Kind, cfg.Path) | 255 logging.Errorf(c, "Unknown proto type %q in cached confi
g %q", cfg.Kind, cfg.Path) |
| 258 } | 256 } |
| 259 err = errors.New("failed to deserialize some cached configs") | 257 err = errors.New("failed to deserialize some cached configs") |
| 260 return | 258 return |
| 261 } | 259 } |
| 262 queryable, err := p.Prepare(configs, body.Revision) | 260 queryable, err := p.Prepare(configs, body.Revision) |
| 263 if err != nil { | 261 if err != nil { |
| 264 » » err = errors.Annotate(err).Reason("failed to process cached conf
igs").Err() | 262 » » err = errors.Annotate(err, "failed to process cached configs").E
rr() |
| 265 return | 263 return |
| 266 } | 264 } |
| 267 | 265 |
| 268 return lazyslot.Value{ | 266 return lazyslot.Value{ |
| 269 Value: queryable, | 267 Value: queryable, |
| 270 Expiration: nextCheckTime(c), | 268 Expiration: nextCheckTime(c), |
| 271 }, nil | 269 }, nil |
| 272 } | 270 } |
| 273 | 271 |
| 274 // nextCheckTime returns a random time from [now+4 min, now+5 min). | 272 // nextCheckTime returns a random time from [now+4 min, now+5 min). |
| 275 // | 273 // |
| 276 // It's used to define when to refresh in-memory Queryable cache. We randomize | 274 // It's used to define when to refresh in-memory Queryable cache. We randomize |
| 277 // it to desynchronize cache updates of different Policy instances. | 275 // it to desynchronize cache updates of different Policy instances. |
| 278 func nextCheckTime(c context.Context) time.Time { | 276 func nextCheckTime(c context.Context) time.Time { |
| 279 rnd := time.Duration(mathrand.Int63n(c, int64(time.Minute))) | 277 rnd := time.Duration(mathrand.Int63n(c, int64(time.Minute))) |
| 280 return clock.Now(c).Add(4*time.Minute + rnd) | 278 return clock.Now(c).Add(4*time.Minute + rnd) |
| 281 } | 279 } |
| OLD | NEW |