OLD | NEW |
---|---|
1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 package service | 5 package service |
6 | 6 |
7 import ( | 7 import ( |
8 "errors" | 8 "errors" |
9 "flag" | 9 "flag" |
10 "fmt" | 10 "fmt" |
11 "net/http" | 11 "net/http" |
12 "os" | 12 "os" |
13 "os/signal" | 13 "os/signal" |
14 "path/filepath" | 14 "path/filepath" |
15 "runtime/pprof" | 15 "runtime/pprof" |
16 "sync/atomic" | 16 "sync/atomic" |
17 "time" | |
17 | 18 |
18 "github.com/luci/luci-go/client/authcli" | 19 "github.com/luci/luci-go/client/authcli" |
19 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" | 20 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" |
20 "github.com/luci/luci-go/common/auth" | 21 "github.com/luci/luci-go/common/auth" |
22 luciConfig "github.com/luci/luci-go/common/config" | |
21 "github.com/luci/luci-go/common/gcloud/gs" | 23 "github.com/luci/luci-go/common/gcloud/gs" |
22 log "github.com/luci/luci-go/common/logging" | 24 log "github.com/luci/luci-go/common/logging" |
23 "github.com/luci/luci-go/common/logging/gologger" | 25 "github.com/luci/luci-go/common/logging/gologger" |
24 "github.com/luci/luci-go/common/proto/logdog/svcconfig" | 26 "github.com/luci/luci-go/common/proto/logdog/svcconfig" |
25 "github.com/luci/luci-go/common/prpc" | 27 "github.com/luci/luci-go/common/prpc" |
26 "github.com/luci/luci-go/common/tsmon" | 28 "github.com/luci/luci-go/common/tsmon" |
27 "github.com/luci/luci-go/common/tsmon/metric" | 29 "github.com/luci/luci-go/common/tsmon/metric" |
28 "github.com/luci/luci-go/common/tsmon/target" | 30 "github.com/luci/luci-go/common/tsmon/target" |
29 "github.com/luci/luci-go/server/internal/logdog/retryServicesClient" | 31 "github.com/luci/luci-go/server/internal/logdog/retryServicesClient" |
30 "github.com/luci/luci-go/server/internal/logdog/service/config" | 32 "github.com/luci/luci-go/server/internal/logdog/service/config" |
31 "github.com/luci/luci-go/server/logdog/storage" | 33 "github.com/luci/luci-go/server/logdog/storage" |
32 "github.com/luci/luci-go/server/logdog/storage/bigtable" | 34 "github.com/luci/luci-go/server/logdog/storage/bigtable" |
33 "golang.org/x/net/context" | 35 "golang.org/x/net/context" |
34 "google.golang.org/cloud" | 36 "google.golang.org/cloud" |
37 "google.golang.org/cloud/compute/metadata" | |
35 ) | 38 ) |
36 | 39 |
37 var ( | 40 var ( |
38 // ErrInvalidConfig is an error that is returned when the supplied | 41 // ErrInvalidConfig is an error that is returned when the supplied |
39 // configuration is invalid. | 42 // configuration is invalid. |
40 ErrInvalidConfig = errors.New("invalid configuration") | 43 ErrInvalidConfig = errors.New("invalid configuration") |
41 | 44 |
42 // CoordinatorScopes is the set of OAuth2 scopes to use for the Coordina tor | 45 // CoordinatorScopes is the set of OAuth2 scopes to use for the Coordina tor |
43 // client. | 46 // client. |
44 CoordinatorScopes = []string{ | 47 CoordinatorScopes = []string{ |
45 auth.OAuthScopeEmail, | 48 auth.OAuthScopeEmail, |
46 } | 49 } |
47 | 50 |
48 presenceUpMetric = metric.NewBool("presence/up", | 51 presenceUpMetric = metric.NewBool("presence/up", |
49 "Set to one when service is present and ready. Alert on missing values.") | 52 "Set to one when service is present and ready. Alert on missing values.") |
50 ) | 53 ) |
51 | 54 |
55 // projectConfigCacheDuration is the amount of time to cache a project's | |
56 // configuration before reloading. | |
57 const projectConfigCacheDuration = 30 * time.Minute | |
58 | |
52 // Service is a base class full of common LogDog service application parameters. | 59 // Service is a base class full of common LogDog service application parameters. |
53 type Service struct { | 60 type Service struct { |
54 // Name is the name of this service. It is used for logging, metrics, an d | 61 // Name is the name of this service. It is used for logging, metrics, an d |
55 // user agent string generation. | 62 // user agent string generation. |
56 // | 63 // |
57 // If empty, a service name will be inferred from the command-line argum ents. | 64 // If empty, a service name will be inferred from the command-line argum ents. |
58 Name string | 65 Name string |
59 // Flags is the set of flags that will be used by the Service. | 66 // Flags is the set of flags that will be used by the Service. |
60 Flags flag.FlagSet | 67 Flags flag.FlagSet |
61 | 68 |
62 shutdownFunc atomic.Value | 69 shutdownFunc atomic.Value |
63 | 70 |
64 loggingFlags log.Config | 71 loggingFlags log.Config |
65 authFlags authcli.Flags | 72 authFlags authcli.Flags |
66 configFlags config.Flags | 73 configFlags config.Flags |
67 tsMonFlags tsmon.Flags | 74 tsMonFlags tsmon.Flags |
68 | 75 |
69 coordinatorHost string | 76 coordinatorHost string |
70 coordinatorInsecure bool | 77 coordinatorInsecure bool |
78 projectID string | |
71 storageCredentialJSONPath string | 79 storageCredentialJSONPath string |
72 cpuProfilePath string | 80 cpuProfilePath string |
73 heapProfilePath string | 81 heapProfilePath string |
74 | 82 |
75 coord logdog.ServicesClient | 83 coord logdog.ServicesClient |
76 config *config.Manager | 84 config *config.Manager |
85 | |
86 onGCE bool | |
77 } | 87 } |
78 | 88 |
79 // Run performs service-wide initialization and invokes the specified run | 89 // Run performs service-wide initialization and invokes the specified run |
80 // function. | 90 // function. |
81 func (s *Service) Run(c context.Context, f func(context.Context) error) { | 91 func (s *Service) Run(c context.Context, f func(context.Context) error) { |
82 c = gologger.StdConfig.Use(c) | 92 c = gologger.StdConfig.Use(c) |
83 | 93 |
84 // If a service name isn't specified, default to the base of the current | 94 // If a service name isn't specified, default to the base of the current |
85 // executable. | 95 // executable. |
86 if s.Name == "" { | 96 if s.Name == "" { |
(...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
134 | 144 |
135 if err := pprof.WriteHeapProfile(fd); err != nil { | 145 if err := pprof.WriteHeapProfile(fd); err != nil { |
136 log.Fields{ | 146 log.Fields{ |
137 log.ErrorKey: err, | 147 log.ErrorKey: err, |
138 "path": p, | 148 "path": p, |
139 }.Warningf(c, "Failed to write heap profile.") | 149 }.Warningf(c, "Failed to write heap profile.") |
140 } | 150 } |
141 }() | 151 }() |
142 } | 152 } |
143 | 153 |
154 // Are we running on a GCE intance? | |
155 if err := s.probeGCEEnvironment(c); err != nil { | |
156 log.WithError(err).Errorf(c, "Failed to probe GCE environment.") | |
157 return err | |
158 } | |
159 | |
160 // Validate the runtime environment. | |
161 if s.projectID == "" { | |
162 return errors.New("no project ID was configured") | |
163 } | |
164 | |
144 // Configure our signal handler. It will listen for terminating signals and | 165 // Configure our signal handler. It will listen for terminating signals and |
145 // issue a shutdown signal if one is received. | 166 // issue a shutdown signal if one is received. |
146 signalC := make(chan os.Signal) | 167 signalC := make(chan os.Signal) |
147 go func() { | 168 go func() { |
148 hasShutdownAlready := false | 169 hasShutdownAlready := false |
149 for sig := range signalC { | 170 for sig := range signalC { |
150 if !hasShutdownAlready { | 171 if !hasShutdownAlready { |
151 hasShutdownAlready = true | 172 hasShutdownAlready = true |
152 | 173 |
153 log.Warningf(log.SetField(c, "signal", sig), "Re ceived close signal. Send again to terminate immediately.") | 174 log.Warningf(log.SetField(c, "signal", sig), "Re ceived close signal. Send again to terminate immediately.") |
(...skipping 60 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
214 // Initialize tsmon flags. | 235 // Initialize tsmon flags. |
215 s.tsMonFlags = tsmon.NewFlags() | 236 s.tsMonFlags = tsmon.NewFlags() |
216 s.tsMonFlags.Flush = tsmon.FlushAuto | 237 s.tsMonFlags.Flush = tsmon.FlushAuto |
217 s.tsMonFlags.Target.TargetType = target.TaskType | 238 s.tsMonFlags.Target.TargetType = target.TaskType |
218 s.tsMonFlags.Target.TaskJobName = s.Name | 239 s.tsMonFlags.Target.TaskJobName = s.Name |
219 s.tsMonFlags.Register(fs) | 240 s.tsMonFlags.Register(fs) |
220 | 241 |
221 s.authFlags.Register(fs, auth.Options{}) | 242 s.authFlags.Register(fs, auth.Options{}) |
222 s.configFlags.AddToFlagSet(fs) | 243 s.configFlags.AddToFlagSet(fs) |
223 | 244 |
245 fs.StringVar(&s.projectID, "project-id", "", | |
246 "Specify the cloud project ID that this instance is supporting. If empty, the project ID "+ | |
nodir
2016/05/18 16:25:51
Please rename to "app-id". Gcp SDK calls it "app"
dnj (Google)
2016/05/18 16:50:31
I'm using the luci-config concept here, not the GA
nodir
2016/05/18 17:02:19
Then why it says cloud project and why this flag i
| |
247 "will attempt to be resolved by probing the local enviro nment. This probably will match the "+ | |
248 "App ID of the Coordinator.") | |
224 fs.StringVar(&s.coordinatorHost, "coordinator", "", | 249 fs.StringVar(&s.coordinatorHost, "coordinator", "", |
225 "The Coordinator service's [host][:port].") | 250 "The Coordinator service's [host][:port].") |
226 fs.BoolVar(&s.coordinatorInsecure, "coordinator-insecure", false, | 251 fs.BoolVar(&s.coordinatorInsecure, "coordinator-insecure", false, |
227 "Connect to Coordinator over HTTP (instead of HTTPS).") | 252 "Connect to Coordinator over HTTP (instead of HTTPS).") |
228 fs.StringVar(&s.storageCredentialJSONPath, "storage-credential-json-path ", "", | 253 fs.StringVar(&s.storageCredentialJSONPath, "storage-credential-json-path ", "", |
229 "If supplied, the path of a JSON credential file to load and use for storage operations.") | 254 "If supplied, the path of a JSON credential file to load and use for storage operations.") |
230 fs.StringVar(&s.cpuProfilePath, "cpu-profile-path", "", | 255 fs.StringVar(&s.cpuProfilePath, "cpu-profile-path", "", |
231 "If supplied, enable CPU profiling and write the profile here.") | 256 "If supplied, enable CPU profiling and write the profile here.") |
232 fs.StringVar(&s.heapProfilePath, "heap-profile-path", "", | 257 fs.StringVar(&s.heapProfilePath, "heap-profile-path", "", |
233 "If supplied, enable CPU profiling and write the profile here.") | 258 "If supplied, enable CPU profiling and write the profile here.") |
234 } | 259 } |
235 | 260 |
261 // probeGCEEnvironment fills in any parameters that can be probed from Google | |
262 // Compute Engine metadata. | |
263 // | |
264 // If we're not running on GCE, this will return nil. An error will only be | |
265 // returned if an operation that is expected to work fails. | |
266 func (s *Service) probeGCEEnvironment(c context.Context) error { | |
267 s.onGCE = metadata.OnGCE() | |
268 if !s.onGCE { | |
269 return nil | |
270 } | |
271 | |
272 // Determine our project ID. | |
273 if s.projectID == "" { | |
274 var err error | |
275 if s.projectID, err = metadata.ProjectID(); err != nil { | |
276 log.WithError(err).Errorf(c, "Failed to probe GCE projec t ID.") | |
277 return err | |
278 } | |
279 } | |
280 return nil | |
281 } | |
282 | |
236 func (s *Service) initCoordinatorClient(c context.Context) (logdog.ServicesClien t, error) { | 283 func (s *Service) initCoordinatorClient(c context.Context) (logdog.ServicesClien t, error) { |
237 if s.coordinatorHost == "" { | 284 if s.coordinatorHost == "" { |
238 log.Errorf(c, "Missing Coordinator URL (-coordinator).") | 285 log.Errorf(c, "Missing Coordinator URL (-coordinator).") |
239 return nil, ErrInvalidConfig | 286 return nil, ErrInvalidConfig |
240 } | 287 } |
241 | 288 |
242 httpClient, err := s.AuthenticatedClient(c, func(o *auth.Options) { | 289 httpClient, err := s.AuthenticatedClient(c, func(o *auth.Options) { |
243 o.Scopes = CoordinatorScopes | 290 o.Scopes = CoordinatorScopes |
244 }) | 291 }) |
245 if err != nil { | 292 if err != nil { |
(...skipping 22 matching lines...) Expand all Loading... | |
268 log.WithError(err).Errorf(c, "Failed to create config client.") | 315 log.WithError(err).Errorf(c, "Failed to create config client.") |
269 return nil, err | 316 return nil, err |
270 } | 317 } |
271 | 318 |
272 s.configFlags.RoundTripper = rt | 319 s.configFlags.RoundTripper = rt |
273 o, err := s.configFlags.CoordinatorOptions(c, s.coord) | 320 o, err := s.configFlags.CoordinatorOptions(c, s.coord) |
274 if err != nil { | 321 if err != nil { |
275 log.WithError(err).Errorf(c, "Failed to load configuration param eters.") | 322 log.WithError(err).Errorf(c, "Failed to load configuration param eters.") |
276 return nil, err | 323 return nil, err |
277 } | 324 } |
325 o.ProjectID = s.projectID | |
326 o.ProjectConfigCacheDuration = projectConfigCacheDuration | |
278 o.KillFunc = s.shutdown | 327 o.KillFunc = s.shutdown |
279 | 328 |
280 return config.NewManager(c, *o) | 329 return config.NewManager(c, *o) |
281 } | 330 } |
282 | 331 |
283 // SetShutdownFunc sets the service shutdown function. | 332 // SetShutdownFunc sets the service shutdown function. |
284 func (s *Service) SetShutdownFunc(f func()) { | 333 func (s *Service) SetShutdownFunc(f func()) { |
285 s.shutdownFunc.Store(f) | 334 s.shutdownFunc.Store(f) |
286 } | 335 } |
287 | 336 |
288 func (s *Service) shutdown() { | 337 func (s *Service) shutdown() { |
289 v := s.shutdownFunc.Load() | 338 v := s.shutdownFunc.Load() |
290 if f, ok := v.(func()); ok { | 339 if f, ok := v.(func()); ok { |
291 f() | 340 f() |
292 } else { | 341 } else { |
293 s.shutdownImmediately() | 342 s.shutdownImmediately() |
294 } | 343 } |
295 } | 344 } |
296 | 345 |
297 func (s *Service) shutdownImmediately() { | 346 func (s *Service) shutdownImmediately() { |
298 os.Exit(1) | 347 os.Exit(1) |
299 } | 348 } |
300 | 349 |
301 // Config returns the cached service configuration. | 350 // Config returns the cached service configuration. |
302 func (s *Service) Config() *svcconfig.Config { | 351 func (s *Service) Config() *svcconfig.Config { |
303 return s.config.Config() | 352 return s.config.Config() |
304 } | 353 } |
305 | 354 |
355 // ProjectConfig returns the cached project configuration. | |
356 // | |
357 // If the project configuration is not available, nil will be returned. | |
358 func (s *Service) ProjectConfig(c context.Context, proj luciConfig.ProjectName) (*svcconfig.ProjectConfig, error) { | |
359 return s.config.ProjectConfig(c, proj) | |
360 } | |
361 | |
306 // Coordinator returns the cached Coordinator client. | 362 // Coordinator returns the cached Coordinator client. |
307 func (s *Service) Coordinator() logdog.ServicesClient { | 363 func (s *Service) Coordinator() logdog.ServicesClient { |
308 return s.coord | 364 return s.coord |
309 } | 365 } |
310 | 366 |
367 // ProjectID returns the project ID. | |
368 // | |
369 // This is synonymous with the AppEngine "app ID". | |
370 func (s *Service) ProjectID() string { | |
371 return s.projectID | |
372 } | |
373 | |
311 // IntermediateStorage instantiates the configured intermediate Storage | 374 // IntermediateStorage instantiates the configured intermediate Storage |
312 // instance. | 375 // instance. |
313 func (s *Service) IntermediateStorage(c context.Context) (storage.Storage, error ) { | 376 func (s *Service) IntermediateStorage(c context.Context) (storage.Storage, error ) { |
314 cfg := s.config.Config() | 377 cfg := s.config.Config() |
315 if cfg.GetStorage() == nil { | 378 if cfg.GetStorage() == nil { |
316 log.Errorf(c, "Missing storage configuration.") | 379 log.Errorf(c, "Missing storage configuration.") |
317 return nil, ErrInvalidConfig | 380 return nil, ErrInvalidConfig |
318 } | 381 } |
319 | 382 |
320 btcfg := cfg.GetStorage().GetBigtable() | 383 btcfg := cfg.GetStorage().GetBigtable() |
(...skipping 81 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
402 // | 465 // |
403 // An optional permutation functon can be provided to modify those Options | 466 // An optional permutation functon can be provided to modify those Options |
404 // before the Authenticator is created. | 467 // before the Authenticator is created. |
405 func (s *Service) AuthenticatedClient(c context.Context, f func(o *auth.Options) ) (*http.Client, error) { | 468 func (s *Service) AuthenticatedClient(c context.Context, f func(o *auth.Options) ) (*http.Client, error) { |
406 a, err := s.Authenticator(c, f) | 469 a, err := s.Authenticator(c, f) |
407 if err != nil { | 470 if err != nil { |
408 return nil, err | 471 return nil, err |
409 } | 472 } |
410 return a.Client() | 473 return a.Client() |
411 } | 474 } |
OLD | NEW |