OLD | NEW |
1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 package service | 5 package service |
6 | 6 |
7 import ( | 7 import ( |
8 "errors" | 8 "errors" |
9 "flag" | 9 "flag" |
10 "fmt" | 10 "fmt" |
11 "net/http" | 11 "net/http" |
12 "os" | 12 "os" |
13 "os/signal" | 13 "os/signal" |
14 "path/filepath" | 14 "path/filepath" |
15 "runtime/pprof" | 15 "runtime/pprof" |
16 "sync/atomic" | 16 "sync/atomic" |
| 17 "time" |
17 | 18 |
18 "github.com/luci/luci-go/client/authcli" | 19 "github.com/luci/luci-go/client/authcli" |
19 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" | 20 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" |
20 "github.com/luci/luci-go/common/auth" | 21 "github.com/luci/luci-go/common/auth" |
| 22 luciConfig "github.com/luci/luci-go/common/config" |
21 "github.com/luci/luci-go/common/gcloud/gs" | 23 "github.com/luci/luci-go/common/gcloud/gs" |
22 log "github.com/luci/luci-go/common/logging" | 24 log "github.com/luci/luci-go/common/logging" |
23 "github.com/luci/luci-go/common/logging/gologger" | 25 "github.com/luci/luci-go/common/logging/gologger" |
24 "github.com/luci/luci-go/common/proto/logdog/svcconfig" | 26 "github.com/luci/luci-go/common/proto/logdog/svcconfig" |
25 "github.com/luci/luci-go/common/prpc" | 27 "github.com/luci/luci-go/common/prpc" |
26 "github.com/luci/luci-go/common/tsmon" | 28 "github.com/luci/luci-go/common/tsmon" |
27 "github.com/luci/luci-go/common/tsmon/metric" | 29 "github.com/luci/luci-go/common/tsmon/metric" |
28 "github.com/luci/luci-go/common/tsmon/target" | 30 "github.com/luci/luci-go/common/tsmon/target" |
29 "github.com/luci/luci-go/server/internal/logdog/retryServicesClient" | 31 "github.com/luci/luci-go/server/internal/logdog/retryServicesClient" |
30 "github.com/luci/luci-go/server/internal/logdog/service/config" | 32 "github.com/luci/luci-go/server/internal/logdog/service/config" |
31 "github.com/luci/luci-go/server/logdog/storage" | 33 "github.com/luci/luci-go/server/logdog/storage" |
32 "github.com/luci/luci-go/server/logdog/storage/bigtable" | 34 "github.com/luci/luci-go/server/logdog/storage/bigtable" |
33 "golang.org/x/net/context" | 35 "golang.org/x/net/context" |
34 "google.golang.org/cloud" | 36 "google.golang.org/cloud" |
| 37 "google.golang.org/cloud/compute/metadata" |
35 ) | 38 ) |
36 | 39 |
37 var ( | 40 var ( |
38 // ErrInvalidConfig is an error that is returned when the supplied | 41 // ErrInvalidConfig is an error that is returned when the supplied |
39 // configuration is invalid. | 42 // configuration is invalid. |
40 ErrInvalidConfig = errors.New("invalid configuration") | 43 ErrInvalidConfig = errors.New("invalid configuration") |
41 | 44 |
42 // CoordinatorScopes is the set of OAuth2 scopes to use for the Coordina
tor | 45 // CoordinatorScopes is the set of OAuth2 scopes to use for the Coordina
tor |
43 // client. | 46 // client. |
44 CoordinatorScopes = []string{ | 47 CoordinatorScopes = []string{ |
45 auth.OAuthScopeEmail, | 48 auth.OAuthScopeEmail, |
46 } | 49 } |
47 | 50 |
48 presenceUpMetric = metric.NewBool("presence/up", | 51 presenceUpMetric = metric.NewBool("presence/up", |
49 "Set to one when service is present and ready. Alert on missing
values.") | 52 "Set to one when service is present and ready. Alert on missing
values.") |
50 ) | 53 ) |
51 | 54 |
| 55 // projectConfigCacheDuration is the amount of time to cache a project's |
| 56 // configuration before reloading. |
| 57 const projectConfigCacheDuration = 30 * time.Minute |
| 58 |
52 // Service is a base class full of common LogDog service application parameters. | 59 // Service is a base class full of common LogDog service application parameters. |
53 type Service struct { | 60 type Service struct { |
54 // Name is the name of this service. It is used for logging, metrics, an
d | 61 // Name is the name of this service. It is used for logging, metrics, an
d |
55 // user agent string generation. | 62 // user agent string generation. |
56 // | 63 // |
57 // If empty, a service name will be inferred from the command-line argum
ents. | 64 // If empty, a service name will be inferred from the command-line argum
ents. |
58 Name string | 65 Name string |
59 // Flags is the set of flags that will be used by the Service. | 66 // Flags is the set of flags that will be used by the Service. |
60 Flags flag.FlagSet | 67 Flags flag.FlagSet |
61 | 68 |
62 shutdownFunc atomic.Value | 69 shutdownFunc atomic.Value |
63 | 70 |
64 loggingFlags log.Config | 71 loggingFlags log.Config |
65 authFlags authcli.Flags | 72 authFlags authcli.Flags |
66 configFlags config.Flags | 73 configFlags config.Flags |
67 tsMonFlags tsmon.Flags | 74 tsMonFlags tsmon.Flags |
68 | 75 |
69 coordinatorHost string | 76 coordinatorHost string |
70 coordinatorInsecure bool | 77 coordinatorInsecure bool |
| 78 serviceID string |
71 storageCredentialJSONPath string | 79 storageCredentialJSONPath string |
72 cpuProfilePath string | 80 cpuProfilePath string |
73 heapProfilePath string | 81 heapProfilePath string |
74 | 82 |
75 coord logdog.ServicesClient | 83 coord logdog.ServicesClient |
76 config *config.Manager | 84 config *config.Manager |
| 85 |
| 86 onGCE bool |
77 } | 87 } |
78 | 88 |
79 // Run performs service-wide initialization and invokes the specified run | 89 // Run performs service-wide initialization and invokes the specified run |
80 // function. | 90 // function. |
81 func (s *Service) Run(c context.Context, f func(context.Context) error) { | 91 func (s *Service) Run(c context.Context, f func(context.Context) error) { |
82 c = gologger.StdConfig.Use(c) | 92 c = gologger.StdConfig.Use(c) |
83 | 93 |
84 // If a service name isn't specified, default to the base of the current | 94 // If a service name isn't specified, default to the base of the current |
85 // executable. | 95 // executable. |
86 if s.Name == "" { | 96 if s.Name == "" { |
(...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
134 | 144 |
135 if err := pprof.WriteHeapProfile(fd); err != nil { | 145 if err := pprof.WriteHeapProfile(fd); err != nil { |
136 log.Fields{ | 146 log.Fields{ |
137 log.ErrorKey: err, | 147 log.ErrorKey: err, |
138 "path": p, | 148 "path": p, |
139 }.Warningf(c, "Failed to write heap profile.") | 149 }.Warningf(c, "Failed to write heap profile.") |
140 } | 150 } |
141 }() | 151 }() |
142 } | 152 } |
143 | 153 |
| 154 // Are we running on a GCE intance? |
| 155 if err := s.probeGCEEnvironment(c); err != nil { |
| 156 log.WithError(err).Errorf(c, "Failed to probe GCE environment.") |
| 157 return err |
| 158 } |
| 159 |
| 160 // Validate the runtime environment. |
| 161 if s.serviceID == "" { |
| 162 return errors.New("no service ID was configured") |
| 163 } |
| 164 |
144 // Configure our signal handler. It will listen for terminating signals
and | 165 // Configure our signal handler. It will listen for terminating signals
and |
145 // issue a shutdown signal if one is received. | 166 // issue a shutdown signal if one is received. |
146 signalC := make(chan os.Signal) | 167 signalC := make(chan os.Signal) |
147 go func() { | 168 go func() { |
148 hasShutdownAlready := false | 169 hasShutdownAlready := false |
149 for sig := range signalC { | 170 for sig := range signalC { |
150 if !hasShutdownAlready { | 171 if !hasShutdownAlready { |
151 hasShutdownAlready = true | 172 hasShutdownAlready = true |
152 | 173 |
153 log.Warningf(log.SetField(c, "signal", sig), "Re
ceived close signal. Send again to terminate immediately.") | 174 log.Warningf(log.SetField(c, "signal", sig), "Re
ceived close signal. Send again to terminate immediately.") |
(...skipping 60 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
214 // Initialize tsmon flags. | 235 // Initialize tsmon flags. |
215 s.tsMonFlags = tsmon.NewFlags() | 236 s.tsMonFlags = tsmon.NewFlags() |
216 s.tsMonFlags.Flush = tsmon.FlushAuto | 237 s.tsMonFlags.Flush = tsmon.FlushAuto |
217 s.tsMonFlags.Target.TargetType = target.TaskType | 238 s.tsMonFlags.Target.TargetType = target.TaskType |
218 s.tsMonFlags.Target.TaskJobName = s.Name | 239 s.tsMonFlags.Target.TaskJobName = s.Name |
219 s.tsMonFlags.Register(fs) | 240 s.tsMonFlags.Register(fs) |
220 | 241 |
221 s.authFlags.Register(fs, auth.Options{}) | 242 s.authFlags.Register(fs, auth.Options{}) |
222 s.configFlags.AddToFlagSet(fs) | 243 s.configFlags.AddToFlagSet(fs) |
223 | 244 |
| 245 fs.StringVar(&s.serviceID, "service-id", "", |
| 246 "Specify the service ID that this instance is supporting. If emp
ty, the service ID "+ |
| 247 "will attempt to be resolved by probing the local enviro
nment. This probably will match the "+ |
| 248 "App ID of the Coordinator.") |
224 fs.StringVar(&s.coordinatorHost, "coordinator", "", | 249 fs.StringVar(&s.coordinatorHost, "coordinator", "", |
225 "The Coordinator service's [host][:port].") | 250 "The Coordinator service's [host][:port].") |
226 fs.BoolVar(&s.coordinatorInsecure, "coordinator-insecure", false, | 251 fs.BoolVar(&s.coordinatorInsecure, "coordinator-insecure", false, |
227 "Connect to Coordinator over HTTP (instead of HTTPS).") | 252 "Connect to Coordinator over HTTP (instead of HTTPS).") |
228 fs.StringVar(&s.storageCredentialJSONPath, "storage-credential-json-path
", "", | 253 fs.StringVar(&s.storageCredentialJSONPath, "storage-credential-json-path
", "", |
229 "If supplied, the path of a JSON credential file to load and use
for storage operations.") | 254 "If supplied, the path of a JSON credential file to load and use
for storage operations.") |
230 fs.StringVar(&s.cpuProfilePath, "cpu-profile-path", "", | 255 fs.StringVar(&s.cpuProfilePath, "cpu-profile-path", "", |
231 "If supplied, enable CPU profiling and write the profile here.") | 256 "If supplied, enable CPU profiling and write the profile here.") |
232 fs.StringVar(&s.heapProfilePath, "heap-profile-path", "", | 257 fs.StringVar(&s.heapProfilePath, "heap-profile-path", "", |
233 "If supplied, enable CPU profiling and write the profile here.") | 258 "If supplied, enable CPU profiling and write the profile here.") |
234 } | 259 } |
235 | 260 |
| 261 // probeGCEEnvironment fills in any parameters that can be probed from Google |
| 262 // Compute Engine metadata. |
| 263 // |
| 264 // If we're not running on GCE, this will return nil. An error will only be |
| 265 // returned if an operation that is expected to work fails. |
| 266 func (s *Service) probeGCEEnvironment(c context.Context) error { |
| 267 s.onGCE = metadata.OnGCE() |
| 268 if !s.onGCE { |
| 269 return nil |
| 270 } |
| 271 |
| 272 // Determine our service ID from metadata. The service ID will equal the
cloud |
| 273 // project ID. |
| 274 if s.serviceID == "" { |
| 275 var err error |
| 276 if s.serviceID, err = metadata.ProjectID(); err != nil { |
| 277 log.WithError(err).Errorf(c, "Failed to probe GCE projec
t ID.") |
| 278 return err |
| 279 } |
| 280 } |
| 281 return nil |
| 282 } |
| 283 |
236 func (s *Service) initCoordinatorClient(c context.Context) (logdog.ServicesClien
t, error) { | 284 func (s *Service) initCoordinatorClient(c context.Context) (logdog.ServicesClien
t, error) { |
237 if s.coordinatorHost == "" { | 285 if s.coordinatorHost == "" { |
238 log.Errorf(c, "Missing Coordinator URL (-coordinator).") | 286 log.Errorf(c, "Missing Coordinator URL (-coordinator).") |
239 return nil, ErrInvalidConfig | 287 return nil, ErrInvalidConfig |
240 } | 288 } |
241 | 289 |
242 httpClient, err := s.AuthenticatedClient(c, func(o *auth.Options) { | 290 httpClient, err := s.AuthenticatedClient(c, func(o *auth.Options) { |
243 o.Scopes = CoordinatorScopes | 291 o.Scopes = CoordinatorScopes |
244 }) | 292 }) |
245 if err != nil { | 293 if err != nil { |
(...skipping 22 matching lines...) Expand all Loading... |
268 log.WithError(err).Errorf(c, "Failed to create config client.") | 316 log.WithError(err).Errorf(c, "Failed to create config client.") |
269 return nil, err | 317 return nil, err |
270 } | 318 } |
271 | 319 |
272 s.configFlags.RoundTripper = rt | 320 s.configFlags.RoundTripper = rt |
273 o, err := s.configFlags.CoordinatorOptions(c, s.coord) | 321 o, err := s.configFlags.CoordinatorOptions(c, s.coord) |
274 if err != nil { | 322 if err != nil { |
275 log.WithError(err).Errorf(c, "Failed to load configuration param
eters.") | 323 log.WithError(err).Errorf(c, "Failed to load configuration param
eters.") |
276 return nil, err | 324 return nil, err |
277 } | 325 } |
| 326 o.ServiceID = s.serviceID |
| 327 o.ProjectConfigCacheDuration = projectConfigCacheDuration |
278 o.KillFunc = s.shutdown | 328 o.KillFunc = s.shutdown |
279 | 329 |
280 return config.NewManager(c, *o) | 330 return config.NewManager(c, *o) |
281 } | 331 } |
282 | 332 |
283 // SetShutdownFunc sets the service shutdown function. | 333 // SetShutdownFunc sets the service shutdown function. |
284 func (s *Service) SetShutdownFunc(f func()) { | 334 func (s *Service) SetShutdownFunc(f func()) { |
285 s.shutdownFunc.Store(f) | 335 s.shutdownFunc.Store(f) |
286 } | 336 } |
287 | 337 |
288 func (s *Service) shutdown() { | 338 func (s *Service) shutdown() { |
289 v := s.shutdownFunc.Load() | 339 v := s.shutdownFunc.Load() |
290 if f, ok := v.(func()); ok { | 340 if f, ok := v.(func()); ok { |
291 f() | 341 f() |
292 } else { | 342 } else { |
293 s.shutdownImmediately() | 343 s.shutdownImmediately() |
294 } | 344 } |
295 } | 345 } |
296 | 346 |
297 func (s *Service) shutdownImmediately() { | 347 func (s *Service) shutdownImmediately() { |
298 os.Exit(1) | 348 os.Exit(1) |
299 } | 349 } |
300 | 350 |
301 // Config returns the cached service configuration. | 351 // Config returns the cached service configuration. |
302 func (s *Service) Config() *svcconfig.Config { | 352 func (s *Service) Config() *svcconfig.Config { |
303 return s.config.Config() | 353 return s.config.Config() |
304 } | 354 } |
305 | 355 |
| 356 // ProjectConfig returns the cached project configuration. |
| 357 // |
| 358 // If the project configuration is not available, nil will be returned. |
| 359 func (s *Service) ProjectConfig(c context.Context, proj luciConfig.ProjectName)
(*svcconfig.ProjectConfig, error) { |
| 360 return s.config.ProjectConfig(c, proj) |
| 361 } |
| 362 |
306 // Coordinator returns the cached Coordinator client. | 363 // Coordinator returns the cached Coordinator client. |
307 func (s *Service) Coordinator() logdog.ServicesClient { | 364 func (s *Service) Coordinator() logdog.ServicesClient { |
308 return s.coord | 365 return s.coord |
309 } | 366 } |
310 | 367 |
| 368 // ServiceID returns the service ID. |
| 369 // |
| 370 // This is synonymous with the cloud "project ID" and the AppEngine "app ID". |
| 371 func (s *Service) ServiceID() string { |
| 372 return s.serviceID |
| 373 } |
| 374 |
311 // IntermediateStorage instantiates the configured intermediate Storage | 375 // IntermediateStorage instantiates the configured intermediate Storage |
312 // instance. | 376 // instance. |
313 func (s *Service) IntermediateStorage(c context.Context) (storage.Storage, error
) { | 377 func (s *Service) IntermediateStorage(c context.Context) (storage.Storage, error
) { |
314 cfg := s.config.Config() | 378 cfg := s.config.Config() |
315 if cfg.GetStorage() == nil { | 379 if cfg.GetStorage() == nil { |
316 log.Errorf(c, "Missing storage configuration.") | 380 log.Errorf(c, "Missing storage configuration.") |
317 return nil, ErrInvalidConfig | 381 return nil, ErrInvalidConfig |
318 } | 382 } |
319 | 383 |
320 btcfg := cfg.GetStorage().GetBigtable() | 384 btcfg := cfg.GetStorage().GetBigtable() |
(...skipping 81 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
402 // | 466 // |
403 // An optional permutation functon can be provided to modify those Options | 467 // An optional permutation functon can be provided to modify those Options |
404 // before the Authenticator is created. | 468 // before the Authenticator is created. |
405 func (s *Service) AuthenticatedClient(c context.Context, f func(o *auth.Options)
) (*http.Client, error) { | 469 func (s *Service) AuthenticatedClient(c context.Context, f func(o *auth.Options)
) (*http.Client, error) { |
406 a, err := s.Authenticator(c, f) | 470 a, err := s.Authenticator(c, f) |
407 if err != nil { | 471 if err != nil { |
408 return nil, err | 472 return nil, err |
409 } | 473 } |
410 return a.Client() | 474 return a.Client() |
411 } | 475 } |
OLD | NEW |