| OLD | NEW |
| 1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package authdbimpl | 5 package authdbimpl |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "fmt" | 8 "fmt" |
| 9 "strings" | 9 "strings" |
| 10 "time" | 10 "time" |
| 11 | 11 |
| 12 "golang.org/x/net/context" | 12 "golang.org/x/net/context" |
| 13 | 13 |
| 14 ds "github.com/luci/gae/service/datastore" | 14 ds "github.com/luci/gae/service/datastore" |
| 15 | 15 |
| 16 "github.com/luci/luci-go/common/clock" | 16 "github.com/luci/luci-go/common/clock" |
| 17 "github.com/luci/luci-go/common/errors" | 17 "github.com/luci/luci-go/common/errors" |
| 18 "github.com/luci/luci-go/common/logging" | 18 "github.com/luci/luci-go/common/logging" |
| 19 "github.com/luci/luci-go/common/retry" |
| 19 "github.com/luci/luci-go/server/auth/service" | 20 "github.com/luci/luci-go/server/auth/service" |
| 20 "github.com/luci/luci-go/server/auth/service/protocol" | 21 "github.com/luci/luci-go/server/auth/service/protocol" |
| 21 ) | 22 ) |
| 22 | 23 |
| 23 // SnapshotInfo identifies some concrete AuthDB snapshot. | 24 // SnapshotInfo identifies some concrete AuthDB snapshot. |
| 24 // | 25 // |
| 25 // Singleton entity. Serves as a pointer to a blob with corresponding AuthDB | 26 // Singleton entity. Serves as a pointer to a blob with corresponding AuthDB |
| 26 // proto message (stored in separate Snapshot entity). | 27 // proto message (stored in separate Snapshot entity). |
| 27 type SnapshotInfo struct { | 28 type SnapshotInfo struct { |
| 28 AuthServiceURL string `gae:",noindex"` | 29 AuthServiceURL string `gae:",noindex"` |
| (...skipping 38 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 67 report := durationReporter(c, latestSnapshotInfoDuration) | 68 report := durationReporter(c, latestSnapshotInfoDuration) |
| 68 logging.Debugf(c, "Fetching AuthDB snapshot info from the datastore") | 69 logging.Debugf(c, "Fetching AuthDB snapshot info from the datastore") |
| 69 c = ds.WithoutTransaction(defaultNS(c)) | 70 c = ds.WithoutTransaction(defaultNS(c)) |
| 70 info := SnapshotInfo{} | 71 info := SnapshotInfo{} |
| 71 switch err := ds.Get(c, &info); { | 72 switch err := ds.Get(c, &info); { |
| 72 case err == ds.ErrNoSuchEntity: | 73 case err == ds.ErrNoSuchEntity: |
| 73 report("SUCCESS") | 74 report("SUCCESS") |
| 74 return nil, nil | 75 return nil, nil |
| 75 case err != nil: | 76 case err != nil: |
| 76 report("ERROR_TRANSIENT") | 77 report("ERROR_TRANSIENT") |
| 77 » » return nil, errors.WrapTransient(err) | 78 » » return nil, retry.Tag.Apply(err) |
| 78 default: | 79 default: |
| 79 report("SUCCESS") | 80 report("SUCCESS") |
| 80 return &info, nil | 81 return &info, nil |
| 81 } | 82 } |
| 82 } | 83 } |
| 83 | 84 |
| 84 // deleteSnapshotInfo removes SnapshotInfo entity from the datastore. | 85 // deleteSnapshotInfo removes SnapshotInfo entity from the datastore. |
| 85 // | 86 // |
| 86 // Used to detach the service from auth_service. | 87 // Used to detach the service from auth_service. |
| 87 func deleteSnapshotInfo(c context.Context) error { | 88 func deleteSnapshotInfo(c context.Context) error { |
| 88 c = ds.WithoutTransaction(c) | 89 c = ds.WithoutTransaction(c) |
| 89 return ds.Delete(c, ds.KeyForObj(c, &SnapshotInfo{})) | 90 return ds.Delete(c, ds.KeyForObj(c, &SnapshotInfo{})) |
| 90 } | 91 } |
| 91 | 92 |
| 92 // GetAuthDBSnapshot fetches, inflates and deserializes AuthDB snapshot. | 93 // GetAuthDBSnapshot fetches, inflates and deserializes AuthDB snapshot. |
| 93 func GetAuthDBSnapshot(c context.Context, id string) (*protocol.AuthDB, error) { | 94 func GetAuthDBSnapshot(c context.Context, id string) (*protocol.AuthDB, error) { |
| 94 report := durationReporter(c, getSnapshotDuration) | 95 report := durationReporter(c, getSnapshotDuration) |
| 95 logging.Debugf(c, "Fetching AuthDB snapshot from the datastore") | 96 logging.Debugf(c, "Fetching AuthDB snapshot from the datastore") |
| 96 defer logging.Debugf(c, "AuthDB snapshot fetched") | 97 defer logging.Debugf(c, "AuthDB snapshot fetched") |
| 97 | 98 |
| 98 c = ds.WithoutTransaction(defaultNS(c)) | 99 c = ds.WithoutTransaction(defaultNS(c)) |
| 99 snap := Snapshot{ID: id} | 100 snap := Snapshot{ID: id} |
| 100 switch err := ds.Get(c, &snap); { | 101 switch err := ds.Get(c, &snap); { |
| 101 case err == ds.ErrNoSuchEntity: | 102 case err == ds.ErrNoSuchEntity: |
| 102 report("ERROR_NO_SNAPSHOT") | 103 report("ERROR_NO_SNAPSHOT") |
| 103 return nil, err // not transient | 104 return nil, err // not transient |
| 104 case err != nil: | 105 case err != nil: |
| 105 report("ERROR_TRANSIENT") | 106 report("ERROR_TRANSIENT") |
| 106 » » return nil, errors.WrapTransient(err) | 107 » » return nil, retry.Tag.Apply(err) |
| 107 } | 108 } |
| 108 | 109 |
| 109 db, err := service.InflateAuthDB(snap.AuthDBDeflated) | 110 db, err := service.InflateAuthDB(snap.AuthDBDeflated) |
| 110 if err != nil { | 111 if err != nil { |
| 111 report("ERROR_INFLATION") | 112 report("ERROR_INFLATION") |
| 112 return nil, err | 113 return nil, err |
| 113 } | 114 } |
| 114 | 115 |
| 115 report("SUCCESS") | 116 report("SUCCESS") |
| 116 return db, nil | 117 return db, nil |
| (...skipping 49 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 166 // Configure PubSub subscription to receive future updates. | 167 // Configure PubSub subscription to receive future updates. |
| 167 if err := setupPubSub(c, baseURL, authServiceURL); err != nil { | 168 if err := setupPubSub(c, baseURL, authServiceURL); err != nil { |
| 168 logging.Errorf(c, "Failed to configure pubsub subscription - %s"
, err) | 169 logging.Errorf(c, "Failed to configure pubsub subscription - %s"
, err) |
| 169 return err | 170 return err |
| 170 } | 171 } |
| 171 | 172 |
| 172 // All is configured. Switch SnapshotInfo entity to point to new snapsho
t. | 173 // All is configured. Switch SnapshotInfo entity to point to new snapsho
t. |
| 173 // It makes syncAuthDB fetch changes from `authServiceURL`, thus promoti
ng | 174 // It makes syncAuthDB fetch changes from `authServiceURL`, thus promoti
ng |
| 174 // `authServiceURL` to the status of main auth service. | 175 // `authServiceURL` to the status of main auth service. |
| 175 if err := ds.Put(ds.WithoutTransaction(c), info); err != nil { | 176 if err := ds.Put(ds.WithoutTransaction(c), info); err != nil { |
| 176 » » return errors.WrapTransient(err) | 177 » » return retry.Tag.Apply(err) |
| 177 } | 178 } |
| 178 | 179 |
| 179 // Stop getting notifications from previously used auth service. | 180 // Stop getting notifications from previously used auth service. |
| 180 if prevAuthServiceURL != "" && prevAuthServiceURL != authServiceURL { | 181 if prevAuthServiceURL != "" && prevAuthServiceURL != authServiceURL { |
| 181 return killPubSub(c, prevAuthServiceURL) | 182 return killPubSub(c, prevAuthServiceURL) |
| 182 } | 183 } |
| 183 | 184 |
| 184 return nil | 185 return nil |
| 185 } | 186 } |
| 186 | 187 |
| (...skipping 12 matching lines...) Expand all Loading... |
| 199 if err != nil { | 200 if err != nil { |
| 200 return err | 201 return err |
| 201 } | 202 } |
| 202 ent := Snapshot{ | 203 ent := Snapshot{ |
| 203 ID: info.GetSnapshotID(), | 204 ID: info.GetSnapshotID(), |
| 204 AuthDBDeflated: blob, | 205 AuthDBDeflated: blob, |
| 205 CreatedAt: snap.Created.UTC(), | 206 CreatedAt: snap.Created.UTC(), |
| 206 FetchedAt: clock.Now(c).UTC(), | 207 FetchedAt: clock.Now(c).UTC(), |
| 207 } | 208 } |
| 208 logging.Infof(c, "Lag: %s", ent.FetchedAt.Sub(ent.CreatedAt)) | 209 logging.Infof(c, "Lag: %s", ent.FetchedAt.Sub(ent.CreatedAt)) |
| 209 » return errors.WrapTransient(ds.Put(ds.WithoutTransaction(c), &ent)) | 210 » return retry.Tag.Apply(ds.Put(ds.WithoutTransaction(c), &ent)) |
| 210 } | 211 } |
| 211 | 212 |
| 212 // syncAuthDB fetches latest AuthDB snapshot from the configured auth service, | 213 // syncAuthDB fetches latest AuthDB snapshot from the configured auth service, |
| 213 // puts it into the datastore and updates SnapshotInfo entity to point to it. | 214 // puts it into the datastore and updates SnapshotInfo entity to point to it. |
| 214 // | 215 // |
| 215 // Expects authenticating transport to be in the context. Called when receiving | 216 // Expects authenticating transport to be in the context. Called when receiving |
| 216 // PubSub notifications. | 217 // PubSub notifications. |
| 217 // | 218 // |
| 218 // Returns SnapshotInfo of the most recent snapshot. | 219 // Returns SnapshotInfo of the most recent snapshot. |
| 219 func syncAuthDB(c context.Context) (*SnapshotInfo, error) { | 220 func syncAuthDB(c context.Context) (*SnapshotInfo, error) { |
| (...skipping 61 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 281 case latest.Rev >= info.Rev: | 282 case latest.Rev >= info.Rev: |
| 282 logging.Warningf(c, "Already have rev %d", info.Rev) | 283 logging.Warningf(c, "Already have rev %d", info.Rev) |
| 283 return nil | 284 return nil |
| 284 } | 285 } |
| 285 latest = info | 286 latest = info |
| 286 return ds.Put(c, info) | 287 return ds.Put(c, info) |
| 287 }, nil) | 288 }, nil) |
| 288 | 289 |
| 289 if err != nil { | 290 if err != nil { |
| 290 report("ERROR_COMMITTING") | 291 report("ERROR_COMMITTING") |
| 291 » » return nil, errors.WrapTransient(err) | 292 » » return nil, retry.Tag.Apply(err) |
| 292 } | 293 } |
| 293 | 294 |
| 294 report("SUCCESS_UPDATED") | 295 report("SUCCESS_UPDATED") |
| 295 return latest, nil | 296 return latest, nil |
| 296 } | 297 } |
| OLD | NEW |