| OLD | NEW |
| 1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package coordinator | 5 package coordinator |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "net/http" | 8 "net/http" |
| 9 "sync" | 9 "sync" |
| 10 "sync/atomic" | 10 "sync/atomic" |
| 11 | 11 |
| 12 "github.com/luci/luci-go/appengine/gaemiddleware" | 12 "github.com/luci/luci-go/appengine/gaemiddleware" |
| 13 luciConfig "github.com/luci/luci-go/common/config" | 13 luciConfig "github.com/luci/luci-go/common/config" |
| 14 "github.com/luci/luci-go/common/errors" | 14 "github.com/luci/luci-go/common/errors" |
| 15 "github.com/luci/luci-go/common/gcloud/gs" | 15 "github.com/luci/luci-go/common/gcloud/gs" |
| 16 "github.com/luci/luci-go/common/gcloud/pubsub" | 16 "github.com/luci/luci-go/common/gcloud/pubsub" |
| 17 log "github.com/luci/luci-go/common/logging" | 17 log "github.com/luci/luci-go/common/logging" |
| 18 "github.com/luci/luci-go/logdog/api/config/svcconfig" | 18 "github.com/luci/luci-go/logdog/api/config/svcconfig" |
| 19 "github.com/luci/luci-go/logdog/appengine/coordinator/config" | 19 "github.com/luci/luci-go/logdog/appengine/coordinator/config" |
| 20 "github.com/luci/luci-go/logdog/common/storage" | 20 "github.com/luci/luci-go/logdog/common/storage" |
| 21 "github.com/luci/luci-go/logdog/common/storage/bigtable" | 21 "github.com/luci/luci-go/logdog/common/storage/bigtable" |
| 22 "github.com/luci/luci-go/server/auth" | 22 "github.com/luci/luci-go/server/auth" |
| 23 "github.com/luci/luci-go/server/router" | 23 "github.com/luci/luci-go/server/router" |
| 24 | 24 |
| 25 gcps "cloud.google.com/go/pubsub" | 25 gcps "cloud.google.com/go/pubsub" |
| 26 "golang.org/x/net/context" | 26 "golang.org/x/net/context" |
| 27 "google.golang.org/api/option" | 27 "google.golang.org/api/option" |
| 28 "google.golang.org/grpc" |
| 28 "google.golang.org/grpc/metadata" | 29 "google.golang.org/grpc/metadata" |
| 29 ) | 30 ) |
| 30 | 31 |
| 31 // Services is a set of support services used by Coordinator. | 32 // Services is a set of support services used by Coordinator. |
| 32 // | 33 // |
| 33 // Each Services instance is valid for a singel request, but can be re-used | 34 // Each Services instance is valid for a singel request, but can be re-used |
| 34 // throughout that request. This is advised, as the Services instance may | 35 // throughout that request. This is advised, as the Services instance may |
| 35 // optionally cache values. | 36 // optionally cache values. |
| 36 // | 37 // |
| 37 // Services methods are goroutine-safe. | 38 // Services methods are goroutine-safe. |
| (...skipping 142 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 180 merr = append(merr, errors.New("missing instance")) | 181 merr = append(merr, errors.New("missing instance")) |
| 181 } | 182 } |
| 182 if bt.LogTableName == "" { | 183 if bt.LogTableName == "" { |
| 183 merr = append(merr, errors.New("missing log table name")) | 184 merr = append(merr, errors.New("missing log table name")) |
| 184 } | 185 } |
| 185 if len(merr) > 0 { | 186 if len(merr) > 0 { |
| 186 return nil, merr | 187 return nil, merr |
| 187 } | 188 } |
| 188 | 189 |
| 189 // Get an Authenticator bound to the token scopes that we need for BigTa
ble. | 190 // Get an Authenticator bound to the token scopes that we need for BigTa
ble. |
| 190 » transport, err := auth.GetRPCTransport(c, auth.AsSelf, auth.WithScopes(b
igtable.StorageScopes...)) | 191 » creds, err := auth.GetPerRPCCredentials(auth.AsSelf, auth.WithScopes(big
table.StorageScopes...)) |
| 191 if err != nil { | 192 if err != nil { |
| 192 » » log.WithError(err).Errorf(c, "Failed to create BigTable authenti
cator.") | 193 » » log.WithError(err).Errorf(c, "Failed to create BigTable credenti
als.") |
| 193 » » return nil, errors.New("failed to create BigTable authenticator"
) | 194 » » return nil, errors.New("failed to create BigTable credentials") |
| 194 } | 195 } |
| 195 | 196 |
| 196 // Explicitly clear gRPC metadata from the Context. It is forwarded to | 197 // Explicitly clear gRPC metadata from the Context. It is forwarded to |
| 197 // delegate calls by default, and standard request metadata can break Bi
gTable | 198 // delegate calls by default, and standard request metadata can break Bi
gTable |
| 198 // calls. | 199 // calls. |
| 199 c = metadata.NewContext(c, nil) | 200 c = metadata.NewContext(c, nil) |
| 200 | 201 |
| 201 st, err := bigtable.New(c, bigtable.Options{ | 202 st, err := bigtable.New(c, bigtable.Options{ |
| 202 Project: bt.Project, | 203 Project: bt.Project, |
| 203 Instance: bt.Instance, | 204 Instance: bt.Instance, |
| 204 LogTable: bt.LogTableName, | 205 LogTable: bt.LogTableName, |
| 205 ClientOptions: []option.ClientOption{ | 206 ClientOptions: []option.ClientOption{ |
| 206 » » » option.WithHTTPClient(&http.Client{Transport: transport}
), | 207 » » » option.WithGRPCDialOption(grpc.WithPerRPCCredentials(cre
ds)), |
| 207 }, | 208 }, |
| 208 }) | 209 }) |
| 209 if err != nil { | 210 if err != nil { |
| 210 log.WithError(err).Errorf(c, "Failed to create BigTable instance
.") | 211 log.WithError(err).Errorf(c, "Failed to create BigTable instance
.") |
| 211 return nil, err | 212 return nil, err |
| 212 } | 213 } |
| 213 return st, nil | 214 return st, nil |
| 214 } | 215 } |
| 215 | 216 |
| 216 func (s *prodServicesInst) GSClient(c context.Context) (gs.Client, error) { | 217 func (s *prodServicesInst) GSClient(c context.Context) (gs.Client, error) { |
| (...skipping 48 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 265 // value as a sentinel that the archival index has wrapped. | 266 // value as a sentinel that the archival index has wrapped. |
| 266 // | 267 // |
| 267 // This is reasonable, as it is very unlikely that a single request will
issue | 268 // This is reasonable, as it is very unlikely that a single request will
issue |
| 268 // more than MaxInt32 archival tasks. | 269 // more than MaxInt32 archival tasks. |
| 269 v := atomic.AddInt32(&s.archivalIndex, 1) - 1 | 270 v := atomic.AddInt32(&s.archivalIndex, 1) - 1 |
| 270 if v < 0 { | 271 if v < 0 { |
| 271 panic("archival index has wrapped") | 272 panic("archival index has wrapped") |
| 272 } | 273 } |
| 273 return uint64(v) | 274 return uint64(v) |
| 274 } | 275 } |
| OLD | NEW |