OLD | NEW |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 package coordinator | 5 package coordinator |
6 | 6 |
7 import ( | 7 import ( |
8 "net/http" | 8 "net/http" |
9 "sync" | 9 "sync" |
10 "sync/atomic" | 10 "sync/atomic" |
11 | 11 |
12 "github.com/julienschmidt/httprouter" | 12 "github.com/julienschmidt/httprouter" |
13 gaeauthClient "github.com/luci/luci-go/appengine/gaeauth/client" | 13 gaeauthClient "github.com/luci/luci-go/appengine/gaeauth/client" |
14 "github.com/luci/luci-go/appengine/logdog/coordinator/config" | 14 "github.com/luci/luci-go/appengine/logdog/coordinator/config" |
| 15 luciConfig "github.com/luci/luci-go/common/config" |
15 "github.com/luci/luci-go/common/errors" | 16 "github.com/luci/luci-go/common/errors" |
16 "github.com/luci/luci-go/common/gcloud/gs" | 17 "github.com/luci/luci-go/common/gcloud/gs" |
17 "github.com/luci/luci-go/common/gcloud/pubsub" | 18 "github.com/luci/luci-go/common/gcloud/pubsub" |
18 log "github.com/luci/luci-go/common/logging" | 19 log "github.com/luci/luci-go/common/logging" |
| 20 "github.com/luci/luci-go/common/proto/logdog/svcconfig" |
19 "github.com/luci/luci-go/server/logdog/storage" | 21 "github.com/luci/luci-go/server/logdog/storage" |
20 "github.com/luci/luci-go/server/logdog/storage/bigtable" | 22 "github.com/luci/luci-go/server/logdog/storage/bigtable" |
21 "github.com/luci/luci-go/server/middleware" | 23 "github.com/luci/luci-go/server/middleware" |
22 "golang.org/x/net/context" | 24 "golang.org/x/net/context" |
23 "google.golang.org/cloud" | 25 "google.golang.org/cloud" |
24 gcps "google.golang.org/cloud/pubsub" | 26 gcps "google.golang.org/cloud/pubsub" |
25 "google.golang.org/grpc/metadata" | 27 "google.golang.org/grpc/metadata" |
26 ) | 28 ) |
27 | 29 |
28 // Services is a set of support services used by Coordinator. | 30 // Services is a set of support services used by Coordinator. |
29 // | 31 // |
30 // Each Services instance is valid for a singel request, but can be re-used | 32 // Each Services instance is valid for a singel request, but can be re-used |
31 // throughout that request. This is advised, as the Services instance may | 33 // throughout that request. This is advised, as the Services instance may |
32 // optionally cache values. | 34 // optionally cache values. |
33 // | 35 // |
34 // Services methods are goroutine-safe. | 36 // Services methods are goroutine-safe. |
35 // | 37 // |
36 // By default, a production set of services will be used. However, this can be | 38 // By default, a production set of services will be used. However, this can be |
37 // overridden for testing to mock the service layer. | 39 // overridden for testing to mock the service layer. |
38 type Services interface { | 40 type Services interface { |
39 // Config returns the current instance and application configuration | 41 // Config returns the current instance and application configuration |
40 // instances. | 42 // instances. |
41 // | 43 // |
42 // The production instance will cache the results for the duration of th
e | 44 // The production instance will cache the results for the duration of th
e |
43 // request. | 45 // request. |
44 Config(context.Context) (*config.Config, error) | 46 Config(context.Context) (*config.Config, error) |
45 | 47 |
| 48 // ProjectConfig returns the project configuration for the named project
. |
| 49 // |
| 50 // The production instance will cache the results for the duration of th
e |
| 51 // request. |
| 52 ProjectConfig(context.Context, luciConfig.ProjectName) (*svcconfig.Proje
ctConfig, error) |
| 53 |
46 // Storage returns an intermediate storage instance for use by this serv
ice. | 54 // Storage returns an intermediate storage instance for use by this serv
ice. |
47 // | 55 // |
48 // The caller must close the returned instance if successful. | 56 // The caller must close the returned instance if successful. |
49 IntermediateStorage(context.Context) (storage.Storage, error) | 57 IntermediateStorage(context.Context) (storage.Storage, error) |
50 | 58 |
51 // GSClient instantiates a Google Storage client. | 59 // GSClient instantiates a Google Storage client. |
52 GSClient(context.Context) (gs.Client, error) | 60 GSClient(context.Context) (gs.Client, error) |
53 | 61 |
54 // ArchivalPublisher returns an ArchivalPublisher instance. | 62 // ArchivalPublisher returns an ArchivalPublisher instance. |
55 ArchivalPublisher(context.Context) (ArchivalPublisher, error) | 63 ArchivalPublisher(context.Context) (ArchivalPublisher, error) |
(...skipping 13 matching lines...) Expand all Loading... |
69 func UseProdServices(c context.Context) context.Context { | 77 func UseProdServices(c context.Context) context.Context { |
70 return WithServices(c, &prodServicesInst{}) | 78 return WithServices(c, &prodServicesInst{}) |
71 } | 79 } |
72 | 80 |
73 // prodServicesInst is a Service exposing production faciliites. A unique | 81 // prodServicesInst is a Service exposing production faciliites. A unique |
74 // instance is bound to each each request. | 82 // instance is bound to each each request. |
75 type prodServicesInst struct { | 83 type prodServicesInst struct { |
76 sync.Mutex | 84 sync.Mutex |
77 | 85 |
78 // gcfg is the cached global configuration. | 86 // gcfg is the cached global configuration. |
79 » gcfg *config.Config | 87 » gcfg *config.Config |
| 88 » projectConfigs map[luciConfig.ProjectName]*cachedProjectConfig |
80 | 89 |
81 // archivalIndex is the atomically-manipulated archival index for the | 90 // archivalIndex is the atomically-manipulated archival index for the |
82 // ArchivalPublisher. This is shared between all ArchivalPublisher insta
nces | 91 // ArchivalPublisher. This is shared between all ArchivalPublisher insta
nces |
83 // from this service. | 92 // from this service. |
84 archivalIndex int32 | 93 archivalIndex int32 |
85 } | 94 } |
86 | 95 |
87 // Config returns the current instance and application configuration instances. | |
88 // | |
89 // After a success, successive calls will return a cached result. | |
90 func (s *prodServicesInst) Config(c context.Context) (*config.Config, error) { | 96 func (s *prodServicesInst) Config(c context.Context) (*config.Config, error) { |
91 s.Lock() | 97 s.Lock() |
92 defer s.Unlock() | 98 defer s.Unlock() |
93 | 99 |
94 // Load/cache the global config. | 100 // Load/cache the global config. |
95 if s.gcfg == nil { | 101 if s.gcfg == nil { |
96 var err error | 102 var err error |
97 s.gcfg, err = config.Load(c) | 103 s.gcfg, err = config.Load(c) |
98 if err != nil { | 104 if err != nil { |
99 return nil, err | 105 return nil, err |
100 } | 106 } |
101 } | 107 } |
102 | 108 |
103 return s.gcfg, nil | 109 return s.gcfg, nil |
104 } | 110 } |
105 | 111 |
| 112 // cachedProjectConfig is a singleton instance that holds a project config |
| 113 // state. It is populated when resolve is called, and is goroutine-safe for |
| 114 // read-only operations. |
| 115 type cachedProjectConfig struct { |
| 116 sync.Once |
| 117 |
| 118 project luciConfig.ProjectName |
| 119 pcfg *svcconfig.ProjectConfig |
| 120 err error |
| 121 } |
| 122 |
| 123 func (cp *cachedProjectConfig) resolve(c context.Context) (*svcconfig.ProjectCon
fig, error) { |
| 124 // Load the project config exactly once. This will be cached for the rem
ainder |
| 125 // of this request. |
| 126 // |
| 127 // If multiple goroutines attempt to load it, exactly one will, and the
rest |
| 128 // will block. All operations after this Once must be read-only. |
| 129 cp.Do(func() { |
| 130 cp.pcfg, cp.err = config.ProjectConfig(c, cp.project) |
| 131 }) |
| 132 return cp.pcfg, cp.err |
| 133 } |
| 134 |
| 135 func (s *prodServicesInst) getOrCreateCachedProjectConfig(project luciConfig.Pro
jectName) *cachedProjectConfig { |
| 136 s.Lock() |
| 137 defer s.Unlock() |
| 138 |
| 139 if s.projectConfigs == nil { |
| 140 s.projectConfigs = make(map[luciConfig.ProjectName]*cachedProjec
tConfig) |
| 141 } |
| 142 cp := s.projectConfigs[project] |
| 143 if cp == nil { |
| 144 cp = &cachedProjectConfig{ |
| 145 project: project, |
| 146 } |
| 147 s.projectConfigs[project] = cp |
| 148 } |
| 149 return cp |
| 150 } |
| 151 |
| 152 func (s *prodServicesInst) ProjectConfig(c context.Context, project luciConfig.P
rojectName) (*svcconfig.ProjectConfig, error) { |
| 153 return s.getOrCreateCachedProjectConfig(project).resolve(c) |
| 154 } |
| 155 |
106 func (s *prodServicesInst) IntermediateStorage(c context.Context) (storage.Stora
ge, error) { | 156 func (s *prodServicesInst) IntermediateStorage(c context.Context) (storage.Stora
ge, error) { |
107 cfg, err := s.Config(c) | 157 cfg, err := s.Config(c) |
108 if err != nil { | 158 if err != nil { |
109 return nil, err | 159 return nil, err |
110 } | 160 } |
111 | 161 |
112 // Is BigTable configured? | 162 // Is BigTable configured? |
113 if cfg.Storage == nil { | 163 if cfg.Storage == nil { |
114 return nil, errors.New("no storage configuration") | 164 return nil, errors.New("no storage configuration") |
115 } | 165 } |
(...skipping 114 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
230 // value as a sentinel that the archival index has wrapped. | 280 // value as a sentinel that the archival index has wrapped. |
231 // | 281 // |
232 // This is reasonable, as it is very unlikely that a single request will
issue | 282 // This is reasonable, as it is very unlikely that a single request will
issue |
233 // more than MaxInt32 archival tasks. | 283 // more than MaxInt32 archival tasks. |
234 v := atomic.AddInt32(&s.archivalIndex, 1) - 1 | 284 v := atomic.AddInt32(&s.archivalIndex, 1) - 1 |
235 if v < 0 { | 285 if v < 0 { |
236 panic("archival index has wrapped") | 286 panic("archival index has wrapped") |
237 } | 287 } |
238 return uint64(v) | 288 return uint64(v) |
239 } | 289 } |
OLD | NEW |