OLD | NEW |
1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
4 | 4 |
5 package authdbimpl | 5 package authdbimpl |
6 | 6 |
7 import ( | 7 import ( |
8 "fmt" | 8 "fmt" |
9 "strings" | 9 "strings" |
10 "time" | 10 "time" |
11 | 11 |
12 "golang.org/x/net/context" | 12 "golang.org/x/net/context" |
13 | 13 |
14 ds "github.com/luci/gae/service/datastore" | 14 ds "github.com/luci/gae/service/datastore" |
15 | 15 |
16 "github.com/luci/luci-go/common/clock" | 16 "github.com/luci/luci-go/common/clock" |
17 "github.com/luci/luci-go/common/errors" | 17 "github.com/luci/luci-go/common/errors" |
18 "github.com/luci/luci-go/common/logging" | 18 "github.com/luci/luci-go/common/logging" |
| 19 "github.com/luci/luci-go/common/retry/transient" |
19 "github.com/luci/luci-go/server/auth/service" | 20 "github.com/luci/luci-go/server/auth/service" |
20 "github.com/luci/luci-go/server/auth/service/protocol" | 21 "github.com/luci/luci-go/server/auth/service/protocol" |
21 ) | 22 ) |
22 | 23 |
23 // SnapshotInfo identifies some concrete AuthDB snapshot. | 24 // SnapshotInfo identifies some concrete AuthDB snapshot. |
24 // | 25 // |
25 // Singleton entity. Serves as a pointer to a blob with corresponding AuthDB | 26 // Singleton entity. Serves as a pointer to a blob with corresponding AuthDB |
26 // proto message (stored in separate Snapshot entity). | 27 // proto message (stored in separate Snapshot entity). |
27 type SnapshotInfo struct { | 28 type SnapshotInfo struct { |
28 AuthServiceURL string `gae:",noindex"` | 29 AuthServiceURL string `gae:",noindex"` |
(...skipping 38 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
67 report := durationReporter(c, latestSnapshotInfoDuration) | 68 report := durationReporter(c, latestSnapshotInfoDuration) |
68 logging.Debugf(c, "Fetching AuthDB snapshot info from the datastore") | 69 logging.Debugf(c, "Fetching AuthDB snapshot info from the datastore") |
69 c = ds.WithoutTransaction(defaultNS(c)) | 70 c = ds.WithoutTransaction(defaultNS(c)) |
70 info := SnapshotInfo{} | 71 info := SnapshotInfo{} |
71 switch err := ds.Get(c, &info); { | 72 switch err := ds.Get(c, &info); { |
72 case err == ds.ErrNoSuchEntity: | 73 case err == ds.ErrNoSuchEntity: |
73 report("SUCCESS") | 74 report("SUCCESS") |
74 return nil, nil | 75 return nil, nil |
75 case err != nil: | 76 case err != nil: |
76 report("ERROR_TRANSIENT") | 77 report("ERROR_TRANSIENT") |
77 » » return nil, errors.WrapTransient(err) | 78 » » return nil, transient.Tag.Apply(err) |
78 default: | 79 default: |
79 report("SUCCESS") | 80 report("SUCCESS") |
80 return &info, nil | 81 return &info, nil |
81 } | 82 } |
82 } | 83 } |
83 | 84 |
84 // deleteSnapshotInfo removes SnapshotInfo entity from the datastore. | 85 // deleteSnapshotInfo removes SnapshotInfo entity from the datastore. |
85 // | 86 // |
86 // Used to detach the service from auth_service. | 87 // Used to detach the service from auth_service. |
87 func deleteSnapshotInfo(c context.Context) error { | 88 func deleteSnapshotInfo(c context.Context) error { |
88 c = ds.WithoutTransaction(c) | 89 c = ds.WithoutTransaction(c) |
89 return ds.Delete(c, ds.KeyForObj(c, &SnapshotInfo{})) | 90 return ds.Delete(c, ds.KeyForObj(c, &SnapshotInfo{})) |
90 } | 91 } |
91 | 92 |
92 // GetAuthDBSnapshot fetches, inflates and deserializes AuthDB snapshot. | 93 // GetAuthDBSnapshot fetches, inflates and deserializes AuthDB snapshot. |
93 func GetAuthDBSnapshot(c context.Context, id string) (*protocol.AuthDB, error) { | 94 func GetAuthDBSnapshot(c context.Context, id string) (*protocol.AuthDB, error) { |
94 report := durationReporter(c, getSnapshotDuration) | 95 report := durationReporter(c, getSnapshotDuration) |
95 logging.Debugf(c, "Fetching AuthDB snapshot from the datastore") | 96 logging.Debugf(c, "Fetching AuthDB snapshot from the datastore") |
96 defer logging.Debugf(c, "AuthDB snapshot fetched") | 97 defer logging.Debugf(c, "AuthDB snapshot fetched") |
97 | 98 |
98 c = ds.WithoutTransaction(defaultNS(c)) | 99 c = ds.WithoutTransaction(defaultNS(c)) |
99 snap := Snapshot{ID: id} | 100 snap := Snapshot{ID: id} |
100 switch err := ds.Get(c, &snap); { | 101 switch err := ds.Get(c, &snap); { |
101 case err == ds.ErrNoSuchEntity: | 102 case err == ds.ErrNoSuchEntity: |
102 report("ERROR_NO_SNAPSHOT") | 103 report("ERROR_NO_SNAPSHOT") |
103 return nil, err // not transient | 104 return nil, err // not transient |
104 case err != nil: | 105 case err != nil: |
105 report("ERROR_TRANSIENT") | 106 report("ERROR_TRANSIENT") |
106 » » return nil, errors.WrapTransient(err) | 107 » » return nil, transient.Tag.Apply(err) |
107 } | 108 } |
108 | 109 |
109 db, err := service.InflateAuthDB(snap.AuthDBDeflated) | 110 db, err := service.InflateAuthDB(snap.AuthDBDeflated) |
110 if err != nil { | 111 if err != nil { |
111 report("ERROR_INFLATION") | 112 report("ERROR_INFLATION") |
112 return nil, err | 113 return nil, err |
113 } | 114 } |
114 | 115 |
115 report("SUCCESS") | 116 report("SUCCESS") |
116 return db, nil | 117 return db, nil |
(...skipping 49 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
166 // Configure PubSub subscription to receive future updates. | 167 // Configure PubSub subscription to receive future updates. |
167 if err := setupPubSub(c, baseURL, authServiceURL); err != nil { | 168 if err := setupPubSub(c, baseURL, authServiceURL); err != nil { |
168 logging.Errorf(c, "Failed to configure pubsub subscription - %s"
, err) | 169 logging.Errorf(c, "Failed to configure pubsub subscription - %s"
, err) |
169 return err | 170 return err |
170 } | 171 } |
171 | 172 |
172 // All is configured. Switch SnapshotInfo entity to point to new snapsho
t. | 173 // All is configured. Switch SnapshotInfo entity to point to new snapsho
t. |
173 // It makes syncAuthDB fetch changes from `authServiceURL`, thus promoti
ng | 174 // It makes syncAuthDB fetch changes from `authServiceURL`, thus promoti
ng |
174 // `authServiceURL` to the status of main auth service. | 175 // `authServiceURL` to the status of main auth service. |
175 if err := ds.Put(ds.WithoutTransaction(c), info); err != nil { | 176 if err := ds.Put(ds.WithoutTransaction(c), info); err != nil { |
176 » » return errors.WrapTransient(err) | 177 » » return transient.Tag.Apply(err) |
177 } | 178 } |
178 | 179 |
179 // Stop getting notifications from previously used auth service. | 180 // Stop getting notifications from previously used auth service. |
180 if prevAuthServiceURL != "" && prevAuthServiceURL != authServiceURL { | 181 if prevAuthServiceURL != "" && prevAuthServiceURL != authServiceURL { |
181 return killPubSub(c, prevAuthServiceURL) | 182 return killPubSub(c, prevAuthServiceURL) |
182 } | 183 } |
183 | 184 |
184 return nil | 185 return nil |
185 } | 186 } |
186 | 187 |
(...skipping 12 matching lines...) Expand all Loading... |
199 if err != nil { | 200 if err != nil { |
200 return err | 201 return err |
201 } | 202 } |
202 ent := Snapshot{ | 203 ent := Snapshot{ |
203 ID: info.GetSnapshotID(), | 204 ID: info.GetSnapshotID(), |
204 AuthDBDeflated: blob, | 205 AuthDBDeflated: blob, |
205 CreatedAt: snap.Created.UTC(), | 206 CreatedAt: snap.Created.UTC(), |
206 FetchedAt: clock.Now(c).UTC(), | 207 FetchedAt: clock.Now(c).UTC(), |
207 } | 208 } |
208 logging.Infof(c, "Lag: %s", ent.FetchedAt.Sub(ent.CreatedAt)) | 209 logging.Infof(c, "Lag: %s", ent.FetchedAt.Sub(ent.CreatedAt)) |
209 » return errors.WrapTransient(ds.Put(ds.WithoutTransaction(c), &ent)) | 210 » return transient.Tag.Apply(ds.Put(ds.WithoutTransaction(c), &ent)) |
210 } | 211 } |
211 | 212 |
212 // syncAuthDB fetches latest AuthDB snapshot from the configured auth service, | 213 // syncAuthDB fetches latest AuthDB snapshot from the configured auth service, |
213 // puts it into the datastore and updates SnapshotInfo entity to point to it. | 214 // puts it into the datastore and updates SnapshotInfo entity to point to it. |
214 // | 215 // |
215 // Expects authenticating transport to be in the context. Called when receiving | 216 // Expects authenticating transport to be in the context. Called when receiving |
216 // PubSub notifications. | 217 // PubSub notifications. |
217 // | 218 // |
218 // Returns SnapshotInfo of the most recent snapshot. | 219 // Returns SnapshotInfo of the most recent snapshot. |
219 func syncAuthDB(c context.Context) (*SnapshotInfo, error) { | 220 func syncAuthDB(c context.Context) (*SnapshotInfo, error) { |
(...skipping 61 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
281 case latest.Rev >= info.Rev: | 282 case latest.Rev >= info.Rev: |
282 logging.Warningf(c, "Already have rev %d", info.Rev) | 283 logging.Warningf(c, "Already have rev %d", info.Rev) |
283 return nil | 284 return nil |
284 } | 285 } |
285 latest = info | 286 latest = info |
286 return ds.Put(c, info) | 287 return ds.Put(c, info) |
287 }, nil) | 288 }, nil) |
288 | 289 |
289 if err != nil { | 290 if err != nil { |
290 report("ERROR_COMMITTING") | 291 report("ERROR_COMMITTING") |
291 » » return nil, errors.WrapTransient(err) | 292 » » return nil, transient.Tag.Apply(err) |
292 } | 293 } |
293 | 294 |
294 report("SUCCESS_UPDATED") | 295 report("SUCCESS_UPDATED") |
295 return latest, nil | 296 return latest, nil |
296 } | 297 } |
OLD | NEW |