| OLD | NEW |
| 1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package coordinator | 5 package coordinator |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "net/http" | 8 "net/http" |
| 9 "sync" | 9 "sync" |
| 10 "sync/atomic" | 10 "sync/atomic" |
| 11 | 11 |
| 12 "github.com/luci/luci-go/appengine/gaemiddleware" | 12 "github.com/luci/luci-go/appengine/gaemiddleware" |
| 13 luciConfig "github.com/luci/luci-go/common/config" | 13 luciConfig "github.com/luci/luci-go/common/config" |
| 14 "github.com/luci/luci-go/common/errors" | 14 "github.com/luci/luci-go/common/errors" |
| 15 "github.com/luci/luci-go/common/gcloud/gs" | 15 "github.com/luci/luci-go/common/gcloud/gs" |
| 16 "github.com/luci/luci-go/common/gcloud/pubsub" | 16 "github.com/luci/luci-go/common/gcloud/pubsub" |
| 17 log "github.com/luci/luci-go/common/logging" | 17 log "github.com/luci/luci-go/common/logging" |
| 18 "github.com/luci/luci-go/logdog/api/config/svcconfig" | 18 "github.com/luci/luci-go/logdog/api/config/svcconfig" |
| 19 "github.com/luci/luci-go/logdog/appengine/coordinator/config" | 19 "github.com/luci/luci-go/logdog/appengine/coordinator/config" |
| 20 "github.com/luci/luci-go/logdog/common/storage" | 20 "github.com/luci/luci-go/logdog/common/storage" |
| 21 "github.com/luci/luci-go/logdog/common/storage/bigtable" | 21 "github.com/luci/luci-go/logdog/common/storage/bigtable" |
| 22 "github.com/luci/luci-go/logdog/common/storage/caching" |
| 22 "github.com/luci/luci-go/server/auth" | 23 "github.com/luci/luci-go/server/auth" |
| 23 "github.com/luci/luci-go/server/router" | 24 "github.com/luci/luci-go/server/router" |
| 24 | 25 |
| 25 gcps "cloud.google.com/go/pubsub" | 26 gcps "cloud.google.com/go/pubsub" |
| 26 "golang.org/x/net/context" | 27 "golang.org/x/net/context" |
| 27 "google.golang.org/api/option" | 28 "google.golang.org/api/option" |
| 28 "google.golang.org/grpc" | 29 "google.golang.org/grpc" |
| 29 "google.golang.org/grpc/metadata" | 30 "google.golang.org/grpc/metadata" |
| 30 ) | 31 ) |
| 31 | 32 |
| (...skipping 26 matching lines...) Expand all Loading... |
| 58 // Storage returns an intermediate storage instance for use by this serv
ice. | 59 // Storage returns an intermediate storage instance for use by this serv
ice. |
| 59 // | 60 // |
| 60 // The caller must close the returned instance if successful. | 61 // The caller must close the returned instance if successful. |
| 61 IntermediateStorage(context.Context) (storage.Storage, error) | 62 IntermediateStorage(context.Context) (storage.Storage, error) |
| 62 | 63 |
| 63 // GSClient instantiates a Google Storage client. | 64 // GSClient instantiates a Google Storage client. |
| 64 GSClient(context.Context) (gs.Client, error) | 65 GSClient(context.Context) (gs.Client, error) |
| 65 | 66 |
| 66 // ArchivalPublisher returns an ArchivalPublisher instance. | 67 // ArchivalPublisher returns an ArchivalPublisher instance. |
| 67 ArchivalPublisher(context.Context) (ArchivalPublisher, error) | 68 ArchivalPublisher(context.Context) (ArchivalPublisher, error) |
| 69 |
| 70 // StorageCache returns the storage cache instance to use, or nil for no |
| 71 // caching. |
| 72 StorageCache() caching.Cache |
| 68 } | 73 } |
| 69 | 74 |
| 70 // ProdServices is middleware chain used by Coordinator services. | 75 // ProdServices is middleware chain used by Coordinator services. |
| 71 // | 76 // |
| 72 // It sets up basic GAE functionality as well as installs a production Services | 77 // It sets up basic GAE functionality as well as installs a production Services |
| 73 // instance. | 78 // instance. |
| 74 func ProdServices() router.MiddlewareChain { | 79 func ProdServices() router.MiddlewareChain { |
| 75 return gaemiddleware.BaseProd().Extend(func(c *router.Context, next rout
er.Handler) { | 80 return gaemiddleware.BaseProd().Extend(func(c *router.Context, next rout
er.Handler) { |
| 76 c.Context = WithServices(c.Context, &prodServicesInst{}) | 81 c.Context = WithServices(c.Context, &prodServicesInst{}) |
| 77 next(c) | 82 next(c) |
| (...skipping 123 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 201 // calls. | 206 // calls. |
| 202 c = metadata.NewContext(c, nil) | 207 c = metadata.NewContext(c, nil) |
| 203 | 208 |
| 204 st, err := bigtable.New(c, bigtable.Options{ | 209 st, err := bigtable.New(c, bigtable.Options{ |
| 205 Project: bt.Project, | 210 Project: bt.Project, |
| 206 Instance: bt.Instance, | 211 Instance: bt.Instance, |
| 207 LogTable: bt.LogTableName, | 212 LogTable: bt.LogTableName, |
| 208 ClientOptions: []option.ClientOption{ | 213 ClientOptions: []option.ClientOption{ |
| 209 option.WithGRPCDialOption(grpc.WithPerRPCCredentials(cre
ds)), | 214 option.WithGRPCDialOption(grpc.WithPerRPCCredentials(cre
ds)), |
| 210 }, | 215 }, |
| 216 Cache: s.StorageCache(), |
| 211 }) | 217 }) |
| 212 if err != nil { | 218 if err != nil { |
| 213 log.WithError(err).Errorf(c, "Failed to create BigTable instance
.") | 219 log.WithError(err).Errorf(c, "Failed to create BigTable instance
.") |
| 214 return nil, err | 220 return nil, err |
| 215 } | 221 } |
| 216 return st, nil | 222 return st, nil |
| 217 } | 223 } |
| 218 | 224 |
| 219 func (s *prodServicesInst) GSClient(c context.Context) (gs.Client, error) { | 225 func (s *prodServicesInst) GSClient(c context.Context) (gs.Client, error) { |
| 220 // Get an Authenticator bound to the token scopes that we need for | 226 // Get an Authenticator bound to the token scopes that we need for |
| (...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 268 // value as a sentinel that the archival index has wrapped. | 274 // value as a sentinel that the archival index has wrapped. |
| 269 // | 275 // |
| 270 // This is reasonable, as it is very unlikely that a single request will
issue | 276 // This is reasonable, as it is very unlikely that a single request will
issue |
| 271 // more than MaxInt32 archival tasks. | 277 // more than MaxInt32 archival tasks. |
| 272 v := atomic.AddInt32(&s.archivalIndex, 1) - 1 | 278 v := atomic.AddInt32(&s.archivalIndex, 1) - 1 |
| 273 if v < 0 { | 279 if v < 0 { |
| 274 panic("archival index has wrapped") | 280 panic("archival index has wrapped") |
| 275 } | 281 } |
| 276 return uint64(v) | 282 return uint64(v) |
| 277 } | 283 } |
| 284 |
| 285 var storageCacheSingleton StorageCache |
| 286 |
| 287 func (s *prodServicesInst) StorageCache() caching.Cache { return &storageCacheSi
ngleton } |
| OLD | NEW |