| OLD | NEW |
| 1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package coordinator | 5 package coordinator |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "net/http" | 8 "net/http" |
| 9 "sync" | 9 "sync" |
| 10 "sync/atomic" | 10 "sync/atomic" |
| 11 | 11 |
| 12 "github.com/luci/luci-go/appengine/gaemiddleware" | 12 "github.com/luci/luci-go/appengine/gaemiddleware" |
| 13 luciConfig "github.com/luci/luci-go/common/config" | 13 luciConfig "github.com/luci/luci-go/common/config" |
| 14 "github.com/luci/luci-go/common/errors" | 14 "github.com/luci/luci-go/common/errors" |
| 15 "github.com/luci/luci-go/common/gcloud/gs" | 15 "github.com/luci/luci-go/common/gcloud/gs" |
| 16 "github.com/luci/luci-go/common/gcloud/pubsub" | 16 "github.com/luci/luci-go/common/gcloud/pubsub" |
| 17 log "github.com/luci/luci-go/common/logging" | 17 log "github.com/luci/luci-go/common/logging" |
| 18 "github.com/luci/luci-go/logdog/api/config/svcconfig" | 18 "github.com/luci/luci-go/logdog/api/config/svcconfig" |
| 19 "github.com/luci/luci-go/logdog/appengine/coordinator/config" | 19 "github.com/luci/luci-go/logdog/appengine/coordinator/config" |
| 20 "github.com/luci/luci-go/logdog/common/storage" | 20 "github.com/luci/luci-go/logdog/common/storage" |
| 21 "github.com/luci/luci-go/logdog/common/storage/bigtable" | 21 "github.com/luci/luci-go/logdog/common/storage/bigtable" |
| 22 "github.com/luci/luci-go/server/auth" | 22 "github.com/luci/luci-go/server/auth" |
| 23 "github.com/luci/luci-go/server/router" | 23 "github.com/luci/luci-go/server/router" |
| 24 |
| 25 gcps "cloud.google.com/go/pubsub" |
| 24 "golang.org/x/net/context" | 26 "golang.org/x/net/context" |
| 25 » "google.golang.org/cloud" | 27 » "google.golang.org/api/option" |
| 26 » gcps "google.golang.org/cloud/pubsub" | |
| 27 "google.golang.org/grpc/metadata" | 28 "google.golang.org/grpc/metadata" |
| 28 ) | 29 ) |
| 29 | 30 |
| 30 // Services is a set of support services used by Coordinator. | 31 // Services is a set of support services used by Coordinator. |
| 31 // | 32 // |
| 32 // Each Services instance is valid for a singel request, but can be re-used | 33 // Each Services instance is valid for a singel request, but can be re-used |
| 33 // throughout that request. This is advised, as the Services instance may | 34 // throughout that request. This is advised, as the Services instance may |
| 34 // optionally cache values. | 35 // optionally cache values. |
| 35 // | 36 // |
| 36 // Services methods are goroutine-safe. | 37 // Services methods are goroutine-safe. |
| (...skipping 157 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 194 | 195 |
| 195 // Explicitly clear gRPC metadata from the Context. It is forwarded to | 196 // Explicitly clear gRPC metadata from the Context. It is forwarded to |
| 196 // delegate calls by default, and standard request metadata can break Bi
gTable | 197 // delegate calls by default, and standard request metadata can break Bi
gTable |
| 197 // calls. | 198 // calls. |
| 198 c = metadata.NewContext(c, nil) | 199 c = metadata.NewContext(c, nil) |
| 199 | 200 |
| 200 st, err := bigtable.New(c, bigtable.Options{ | 201 st, err := bigtable.New(c, bigtable.Options{ |
| 201 Project: bt.Project, | 202 Project: bt.Project, |
| 202 Instance: bt.Instance, | 203 Instance: bt.Instance, |
| 203 LogTable: bt.LogTableName, | 204 LogTable: bt.LogTableName, |
| 204 » » ClientOptions: []cloud.ClientOption{ | 205 » » ClientOptions: []option.ClientOption{ |
| 205 » » » cloud.WithBaseHTTP(&http.Client{Transport: transport}), | 206 » » » option.WithHTTPClient(&http.Client{Transport: transport}
), |
| 206 }, | 207 }, |
| 207 }) | 208 }) |
| 208 if err != nil { | 209 if err != nil { |
| 209 log.WithError(err).Errorf(c, "Failed to create BigTable instance
.") | 210 log.WithError(err).Errorf(c, "Failed to create BigTable instance
.") |
| 210 return nil, err | 211 return nil, err |
| 211 } | 212 } |
| 212 return st, nil | 213 return st, nil |
| 213 } | 214 } |
| 214 | 215 |
| 215 func (s *prodServicesInst) GSClient(c context.Context) (gs.Client, error) { | 216 func (s *prodServicesInst) GSClient(c context.Context) (gs.Client, error) { |
| (...skipping 23 matching lines...) Expand all Loading... |
| 239 } | 240 } |
| 240 project, topic := fullTopic.Split() | 241 project, topic := fullTopic.Split() |
| 241 | 242 |
| 242 // Create an authenticated Pub/Sub client. | 243 // Create an authenticated Pub/Sub client. |
| 243 transport, err := auth.GetRPCTransport(c, auth.AsSelf, auth.WithScopes(p
ubsub.PublisherScopes...)) | 244 transport, err := auth.GetRPCTransport(c, auth.AsSelf, auth.WithScopes(p
ubsub.PublisherScopes...)) |
| 244 if err != nil { | 245 if err != nil { |
| 245 log.WithError(err).Errorf(c, "Failed to create Pub/Sub authentic
ator.") | 246 log.WithError(err).Errorf(c, "Failed to create Pub/Sub authentic
ator.") |
| 246 return nil, errors.New("failed to create Pub/Sub authenticator") | 247 return nil, errors.New("failed to create Pub/Sub authenticator") |
| 247 } | 248 } |
| 248 client := &http.Client{Transport: transport} | 249 client := &http.Client{Transport: transport} |
| 249 » psClient, err := gcps.NewClient(c, project, cloud.WithBaseHTTP(client)) | 250 » psClient, err := gcps.NewClient(c, project, option.WithHTTPClient(client
)) |
| 250 if err != nil { | 251 if err != nil { |
| 251 log.WithError(err).Errorf(c, "Failed to create Pub/Sub client.") | 252 log.WithError(err).Errorf(c, "Failed to create Pub/Sub client.") |
| 252 return nil, errors.New("failed to create Pub/Sub client") | 253 return nil, errors.New("failed to create Pub/Sub client") |
| 253 } | 254 } |
| 254 | 255 |
| 255 return &pubsubArchivalPublisher{ | 256 return &pubsubArchivalPublisher{ |
| 256 topic: psClient.Topic(topic), | 257 topic: psClient.Topic(topic), |
| 257 publishIndexFunc: s.nextArchiveIndex, | 258 publishIndexFunc: s.nextArchiveIndex, |
| 258 }, nil | 259 }, nil |
| 259 } | 260 } |
| 260 | 261 |
| 261 func (s *prodServicesInst) nextArchiveIndex() uint64 { | 262 func (s *prodServicesInst) nextArchiveIndex() uint64 { |
| 262 // We use a 32-bit value for this because it avoids atomic memory bounar
y | 263 // We use a 32-bit value for this because it avoids atomic memory bounar
y |
| 263 // issues. Furthermore, we constrain it to be positive, using a negative | 264 // issues. Furthermore, we constrain it to be positive, using a negative |
| 264 // value as a sentinel that the archival index has wrapped. | 265 // value as a sentinel that the archival index has wrapped. |
| 265 // | 266 // |
| 266 // This is reasonable, as it is very unlikely that a single request will
issue | 267 // This is reasonable, as it is very unlikely that a single request will
issue |
| 267 // more than MaxInt32 archival tasks. | 268 // more than MaxInt32 archival tasks. |
| 268 v := atomic.AddInt32(&s.archivalIndex, 1) - 1 | 269 v := atomic.AddInt32(&s.archivalIndex, 1) - 1 |
| 269 if v < 0 { | 270 if v < 0 { |
| 270 panic("archival index has wrapped") | 271 panic("archival index has wrapped") |
| 271 } | 272 } |
| 272 return uint64(v) | 273 return uint64(v) |
| 273 } | 274 } |
| OLD | NEW |