| OLD | NEW |
| 1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package coordinator | 5 package coordinator |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "sync" | 8 "sync" |
| 9 "sync/atomic" | 9 "sync/atomic" |
| 10 "time" | 10 "time" |
| 11 | 11 |
| 12 "github.com/luci/luci-go/appengine/gaeauth/server/gaesigner" | 12 "github.com/luci/luci-go/appengine/gaeauth/server/gaesigner" |
| 13 "github.com/luci/luci-go/common/clock" | 13 "github.com/luci/luci-go/common/clock" |
| 14 luciConfig "github.com/luci/luci-go/common/config" | |
| 15 "github.com/luci/luci-go/common/errors" | 14 "github.com/luci/luci-go/common/errors" |
| 16 "github.com/luci/luci-go/common/gcloud/gs" | 15 "github.com/luci/luci-go/common/gcloud/gs" |
| 17 "github.com/luci/luci-go/common/gcloud/pubsub" | 16 "github.com/luci/luci-go/common/gcloud/pubsub" |
| 18 log "github.com/luci/luci-go/common/logging" | 17 log "github.com/luci/luci-go/common/logging" |
| 19 "github.com/luci/luci-go/logdog/api/config/svcconfig" | 18 "github.com/luci/luci-go/logdog/api/config/svcconfig" |
| 20 "github.com/luci/luci-go/logdog/appengine/coordinator/config" | 19 "github.com/luci/luci-go/logdog/appengine/coordinator/config" |
| 21 "github.com/luci/luci-go/logdog/common/storage" | 20 "github.com/luci/luci-go/logdog/common/storage" |
| 22 "github.com/luci/luci-go/logdog/common/storage/archive" | 21 "github.com/luci/luci-go/logdog/common/storage/archive" |
| 23 "github.com/luci/luci-go/logdog/common/storage/bigtable" | 22 "github.com/luci/luci-go/logdog/common/storage/bigtable" |
| 24 "github.com/luci/luci-go/logdog/common/storage/caching" | 23 "github.com/luci/luci-go/logdog/common/storage/caching" |
| 24 "github.com/luci/luci-go/luci_config/common/cfgtypes" |
| 25 "github.com/luci/luci-go/server/auth" | 25 "github.com/luci/luci-go/server/auth" |
| 26 "github.com/luci/luci-go/server/router" | 26 "github.com/luci/luci-go/server/router" |
| 27 | 27 |
| 28 gcps "cloud.google.com/go/pubsub" | 28 gcps "cloud.google.com/go/pubsub" |
| 29 gcst "cloud.google.com/go/storage" | 29 gcst "cloud.google.com/go/storage" |
| 30 "golang.org/x/net/context" | 30 "golang.org/x/net/context" |
| 31 "google.golang.org/api/option" | 31 "google.golang.org/api/option" |
| 32 "google.golang.org/appengine" | 32 "google.golang.org/appengine" |
| 33 "google.golang.org/grpc" | 33 "google.golang.org/grpc" |
| 34 "google.golang.org/grpc/metadata" | 34 "google.golang.org/grpc/metadata" |
| (...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 67 // The production instance will cache the results for the duration of th
e | 67 // The production instance will cache the results for the duration of th
e |
| 68 // request. | 68 // request. |
| 69 Config(context.Context) (*config.Config, error) | 69 Config(context.Context) (*config.Config, error) |
| 70 | 70 |
| 71 // ProjectConfig returns the project configuration for the named project
. | 71 // ProjectConfig returns the project configuration for the named project
. |
| 72 // | 72 // |
| 73 // The production instance will cache the results for the duration of th
e | 73 // The production instance will cache the results for the duration of th
e |
| 74 // request. | 74 // request. |
| 75 // | 75 // |
| 76 // Returns the same error codes as config.ProjectConfig. | 76 // Returns the same error codes as config.ProjectConfig. |
| 77 » ProjectConfig(context.Context, luciConfig.ProjectName) (*svcconfig.Proje
ctConfig, error) | 77 » ProjectConfig(context.Context, cfgtypes.ProjectName) (*svcconfig.Project
Config, error) |
| 78 | 78 |
| 79 // Storage returns a Storage instance for the supplied log stream. | 79 // Storage returns a Storage instance for the supplied log stream. |
| 80 // | 80 // |
| 81 // The caller must close the returned instance if successful. | 81 // The caller must close the returned instance if successful. |
| 82 StorageForStream(context.Context, *LogStreamState) (Storage, error) | 82 StorageForStream(context.Context, *LogStreamState) (Storage, error) |
| 83 | 83 |
| 84 // ArchivalPublisher returns an ArchivalPublisher instance. | 84 // ArchivalPublisher returns an ArchivalPublisher instance. |
| 85 ArchivalPublisher(context.Context) (ArchivalPublisher, error) | 85 ArchivalPublisher(context.Context) (ArchivalPublisher, error) |
| 86 } | 86 } |
| 87 | 87 |
| (...skipping 13 matching lines...) Expand all Loading... |
| 101 // prodServicesInst is a Service exposing production faciliites. A unique | 101 // prodServicesInst is a Service exposing production faciliites. A unique |
| 102 // instance is bound to each each request. | 102 // instance is bound to each each request. |
| 103 type prodServicesInst struct { | 103 type prodServicesInst struct { |
| 104 sync.Mutex | 104 sync.Mutex |
| 105 | 105 |
| 106 // aeCtx is an AppEngine Context initialized for the current request. | 106 // aeCtx is an AppEngine Context initialized for the current request. |
| 107 aeCtx context.Context | 107 aeCtx context.Context |
| 108 | 108 |
| 109 // gcfg is the cached global configuration. | 109 // gcfg is the cached global configuration. |
| 110 gcfg *config.Config | 110 gcfg *config.Config |
| 111 » projectConfigs map[luciConfig.ProjectName]*cachedProjectConfig | 111 » projectConfigs map[cfgtypes.ProjectName]*cachedProjectConfig |
| 112 | 112 |
| 113 // archivalIndex is the atomically-manipulated archival index for the | 113 // archivalIndex is the atomically-manipulated archival index for the |
| 114 // ArchivalPublisher. This is shared between all ArchivalPublisher insta
nces | 114 // ArchivalPublisher. This is shared between all ArchivalPublisher insta
nces |
| 115 // from this service. | 115 // from this service. |
| 116 archivalIndex int32 | 116 archivalIndex int32 |
| 117 | 117 |
| 118 // pubSubClients is a map of Pub/Sub client singletons generated during
this | 118 // pubSubClients is a map of Pub/Sub client singletons generated during
this |
| 119 // request. Each client is associated with its project, and will be | 119 // request. Each client is associated with its project, and will be |
| 120 // initialized the first time it is requested by getPubSubClient. | 120 // initialized the first time it is requested by getPubSubClient. |
| 121 // | 121 // |
| (...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 155 | 155 |
| 156 return s.gcfg, nil | 156 return s.gcfg, nil |
| 157 } | 157 } |
| 158 | 158 |
| 159 // cachedProjectConfig is a singleton instance that holds a project config | 159 // cachedProjectConfig is a singleton instance that holds a project config |
| 160 // state. It is populated when resolve is called, and is goroutine-safe for | 160 // state. It is populated when resolve is called, and is goroutine-safe for |
| 161 // read-only operations. | 161 // read-only operations. |
| 162 type cachedProjectConfig struct { | 162 type cachedProjectConfig struct { |
| 163 sync.Once | 163 sync.Once |
| 164 | 164 |
| 165 » project luciConfig.ProjectName | 165 » project cfgtypes.ProjectName |
| 166 pcfg *svcconfig.ProjectConfig | 166 pcfg *svcconfig.ProjectConfig |
| 167 err error | 167 err error |
| 168 } | 168 } |
| 169 | 169 |
| 170 func (cp *cachedProjectConfig) resolve(c context.Context) (*svcconfig.ProjectCon
fig, error) { | 170 func (cp *cachedProjectConfig) resolve(c context.Context) (*svcconfig.ProjectCon
fig, error) { |
| 171 // Load the project config exactly once. This will be cached for the rem
ainder | 171 // Load the project config exactly once. This will be cached for the rem
ainder |
| 172 // of this request. | 172 // of this request. |
| 173 // | 173 // |
| 174 // If multiple goroutines attempt to load it, exactly one will, and the
rest | 174 // If multiple goroutines attempt to load it, exactly one will, and the
rest |
| 175 // will block. All operations after this Once must be read-only. | 175 // will block. All operations after this Once must be read-only. |
| 176 cp.Do(func() { | 176 cp.Do(func() { |
| 177 cp.pcfg, cp.err = config.ProjectConfig(c, cp.project) | 177 cp.pcfg, cp.err = config.ProjectConfig(c, cp.project) |
| 178 }) | 178 }) |
| 179 return cp.pcfg, cp.err | 179 return cp.pcfg, cp.err |
| 180 } | 180 } |
| 181 | 181 |
| 182 func (s *prodServicesInst) getOrCreateCachedProjectConfig(project luciConfig.Pro
jectName) *cachedProjectConfig { | 182 func (s *prodServicesInst) getOrCreateCachedProjectConfig(project cfgtypes.Proje
ctName) *cachedProjectConfig { |
| 183 s.Lock() | 183 s.Lock() |
| 184 defer s.Unlock() | 184 defer s.Unlock() |
| 185 | 185 |
| 186 if s.projectConfigs == nil { | 186 if s.projectConfigs == nil { |
| 187 » » s.projectConfigs = make(map[luciConfig.ProjectName]*cachedProjec
tConfig) | 187 » » s.projectConfigs = make(map[cfgtypes.ProjectName]*cachedProjectC
onfig) |
| 188 } | 188 } |
| 189 cp := s.projectConfigs[project] | 189 cp := s.projectConfigs[project] |
| 190 if cp == nil { | 190 if cp == nil { |
| 191 cp = &cachedProjectConfig{ | 191 cp = &cachedProjectConfig{ |
| 192 project: project, | 192 project: project, |
| 193 } | 193 } |
| 194 s.projectConfigs[project] = cp | 194 s.projectConfigs[project] = cp |
| 195 } | 195 } |
| 196 return cp | 196 return cp |
| 197 } | 197 } |
| 198 | 198 |
| 199 func (s *prodServicesInst) ProjectConfig(c context.Context, project luciConfig.P
rojectName) (*svcconfig.ProjectConfig, error) { | 199 func (s *prodServicesInst) ProjectConfig(c context.Context, project cfgtypes.Pro
jectName) (*svcconfig.ProjectConfig, error) { |
| 200 return s.getOrCreateCachedProjectConfig(project).resolve(c) | 200 return s.getOrCreateCachedProjectConfig(project).resolve(c) |
| 201 } | 201 } |
| 202 | 202 |
| 203 func (s *prodServicesInst) StorageForStream(c context.Context, lst *LogStreamSta
te) (Storage, error) { | 203 func (s *prodServicesInst) StorageForStream(c context.Context, lst *LogStreamSta
te) (Storage, error) { |
| 204 if !lst.ArchivalState().Archived() { | 204 if !lst.ArchivalState().Archived() { |
| 205 log.Debugf(c, "Log is not archived. Fetching from intermediate s
torage.") | 205 log.Debugf(c, "Log is not archived. Fetching from intermediate s
torage.") |
| 206 return s.newBigTableStorage(c) | 206 return s.newBigTableStorage(c) |
| 207 } | 207 } |
| 208 | 208 |
| 209 log.Fields{ | 209 log.Fields{ |
| (...skipping 338 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 548 | 548 |
| 549 // Sign index URL. | 549 // Sign index URL. |
| 550 if req.Index { | 550 if req.Index { |
| 551 if resp.Index, err = doSign(si.index); err != nil { | 551 if resp.Index, err = doSign(si.index); err != nil { |
| 552 return nil, errors.Annotate(err).InternalReason("failed
to sign index URL").Err() | 552 return nil, errors.Annotate(err).InternalReason("failed
to sign index URL").Err() |
| 553 } | 553 } |
| 554 } | 554 } |
| 555 | 555 |
| 556 return &resp, nil | 556 return &resp, nil |
| 557 } | 557 } |
| OLD | NEW |