OLD | NEW |
1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
4 | 4 |
5 package coordinator | 5 package coordinator |
6 | 6 |
7 import ( | 7 import ( |
8 "net/http" | 8 "net/http" |
9 "sync" | 9 "sync" |
10 "sync/atomic" | 10 "sync/atomic" |
11 | 11 |
12 "github.com/luci/luci-go/appengine/gaemiddleware" | 12 "github.com/luci/luci-go/appengine/gaemiddleware" |
13 luciConfig "github.com/luci/luci-go/common/config" | 13 luciConfig "github.com/luci/luci-go/common/config" |
14 "github.com/luci/luci-go/common/errors" | 14 "github.com/luci/luci-go/common/errors" |
15 "github.com/luci/luci-go/common/gcloud/gs" | 15 "github.com/luci/luci-go/common/gcloud/gs" |
16 "github.com/luci/luci-go/common/gcloud/pubsub" | 16 "github.com/luci/luci-go/common/gcloud/pubsub" |
17 log "github.com/luci/luci-go/common/logging" | 17 log "github.com/luci/luci-go/common/logging" |
18 "github.com/luci/luci-go/logdog/api/config/svcconfig" | 18 "github.com/luci/luci-go/logdog/api/config/svcconfig" |
19 "github.com/luci/luci-go/logdog/appengine/coordinator/config" | 19 "github.com/luci/luci-go/logdog/appengine/coordinator/config" |
20 "github.com/luci/luci-go/logdog/common/storage" | 20 "github.com/luci/luci-go/logdog/common/storage" |
21 "github.com/luci/luci-go/logdog/common/storage/bigtable" | 21 "github.com/luci/luci-go/logdog/common/storage/bigtable" |
22 "github.com/luci/luci-go/server/auth" | 22 "github.com/luci/luci-go/server/auth" |
23 "github.com/luci/luci-go/server/router" | 23 "github.com/luci/luci-go/server/router" |
| 24 |
| 25 gcps "cloud.google.com/go/pubsub" |
24 "golang.org/x/net/context" | 26 "golang.org/x/net/context" |
25 » "google.golang.org/cloud" | 27 » "google.golang.org/api/option" |
26 » gcps "google.golang.org/cloud/pubsub" | |
27 "google.golang.org/grpc/metadata" | 28 "google.golang.org/grpc/metadata" |
28 ) | 29 ) |
29 | 30 |
30 // Services is a set of support services used by Coordinator. | 31 // Services is a set of support services used by Coordinator. |
31 // | 32 // |
32 // Each Services instance is valid for a singel request, but can be re-used | 33 // Each Services instance is valid for a singel request, but can be re-used |
33 // throughout that request. This is advised, as the Services instance may | 34 // throughout that request. This is advised, as the Services instance may |
34 // optionally cache values. | 35 // optionally cache values. |
35 // | 36 // |
36 // Services methods are goroutine-safe. | 37 // Services methods are goroutine-safe. |
(...skipping 157 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
194 | 195 |
195 // Explicitly clear gRPC metadata from the Context. It is forwarded to | 196 // Explicitly clear gRPC metadata from the Context. It is forwarded to |
196 // delegate calls by default, and standard request metadata can break Bi
gTable | 197 // delegate calls by default, and standard request metadata can break Bi
gTable |
197 // calls. | 198 // calls. |
198 c = metadata.NewContext(c, nil) | 199 c = metadata.NewContext(c, nil) |
199 | 200 |
200 st, err := bigtable.New(c, bigtable.Options{ | 201 st, err := bigtable.New(c, bigtable.Options{ |
201 Project: bt.Project, | 202 Project: bt.Project, |
202 Instance: bt.Instance, | 203 Instance: bt.Instance, |
203 LogTable: bt.LogTableName, | 204 LogTable: bt.LogTableName, |
204 » » ClientOptions: []cloud.ClientOption{ | 205 » » ClientOptions: []option.ClientOption{ |
205 » » » cloud.WithBaseHTTP(&http.Client{Transport: transport}), | 206 » » » option.WithHTTPClient(&http.Client{Transport: transport}
), |
206 }, | 207 }, |
207 }) | 208 }) |
208 if err != nil { | 209 if err != nil { |
209 log.WithError(err).Errorf(c, "Failed to create BigTable instance
.") | 210 log.WithError(err).Errorf(c, "Failed to create BigTable instance
.") |
210 return nil, err | 211 return nil, err |
211 } | 212 } |
212 return st, nil | 213 return st, nil |
213 } | 214 } |
214 | 215 |
215 func (s *prodServicesInst) GSClient(c context.Context) (gs.Client, error) { | 216 func (s *prodServicesInst) GSClient(c context.Context) (gs.Client, error) { |
(...skipping 23 matching lines...) Expand all Loading... |
239 } | 240 } |
240 project, topic := fullTopic.Split() | 241 project, topic := fullTopic.Split() |
241 | 242 |
242 // Create an authenticated Pub/Sub client. | 243 // Create an authenticated Pub/Sub client. |
243 transport, err := auth.GetRPCTransport(c, auth.AsSelf, auth.WithScopes(p
ubsub.PublisherScopes...)) | 244 transport, err := auth.GetRPCTransport(c, auth.AsSelf, auth.WithScopes(p
ubsub.PublisherScopes...)) |
244 if err != nil { | 245 if err != nil { |
245 log.WithError(err).Errorf(c, "Failed to create Pub/Sub authentic
ator.") | 246 log.WithError(err).Errorf(c, "Failed to create Pub/Sub authentic
ator.") |
246 return nil, errors.New("failed to create Pub/Sub authenticator") | 247 return nil, errors.New("failed to create Pub/Sub authenticator") |
247 } | 248 } |
248 client := &http.Client{Transport: transport} | 249 client := &http.Client{Transport: transport} |
249 » psClient, err := gcps.NewClient(c, project, cloud.WithBaseHTTP(client)) | 250 » psClient, err := gcps.NewClient(c, project, option.WithHTTPClient(client
)) |
250 if err != nil { | 251 if err != nil { |
251 log.WithError(err).Errorf(c, "Failed to create Pub/Sub client.") | 252 log.WithError(err).Errorf(c, "Failed to create Pub/Sub client.") |
252 return nil, errors.New("failed to create Pub/Sub client") | 253 return nil, errors.New("failed to create Pub/Sub client") |
253 } | 254 } |
254 | 255 |
255 return &pubsubArchivalPublisher{ | 256 return &pubsubArchivalPublisher{ |
256 topic: psClient.Topic(topic), | 257 topic: psClient.Topic(topic), |
257 publishIndexFunc: s.nextArchiveIndex, | 258 publishIndexFunc: s.nextArchiveIndex, |
258 }, nil | 259 }, nil |
259 } | 260 } |
260 | 261 |
261 func (s *prodServicesInst) nextArchiveIndex() uint64 { | 262 func (s *prodServicesInst) nextArchiveIndex() uint64 { |
262 // We use a 32-bit value for this because it avoids atomic memory bounar
y | 263 // We use a 32-bit value for this because it avoids atomic memory bounar
y |
263 // issues. Furthermore, we constrain it to be positive, using a negative | 264 // issues. Furthermore, we constrain it to be positive, using a negative |
264 // value as a sentinel that the archival index has wrapped. | 265 // value as a sentinel that the archival index has wrapped. |
265 // | 266 // |
266 // This is reasonable, as it is very unlikely that a single request will
issue | 267 // This is reasonable, as it is very unlikely that a single request will
issue |
267 // more than MaxInt32 archival tasks. | 268 // more than MaxInt32 archival tasks. |
268 v := atomic.AddInt32(&s.archivalIndex, 1) - 1 | 269 v := atomic.AddInt32(&s.archivalIndex, 1) - 1 |
269 if v < 0 { | 270 if v < 0 { |
270 panic("archival index has wrapped") | 271 panic("archival index has wrapped") |
271 } | 272 } |
272 return uint64(v) | 273 return uint64(v) |
273 } | 274 } |
OLD | NEW |