Chromium Code Reviews| Index: logdog/server/service/service.go |
| diff --git a/logdog/server/service/service.go b/logdog/server/service/service.go |
| index 42c57dda01899930bb6b25fa0773c6dc17785ed2..c40480784ead406b0b2019ab6f5de3feca3b9974 100644 |
| --- a/logdog/server/service/service.go |
| +++ b/logdog/server/service/service.go |
| @@ -5,10 +5,10 @@ |
| package service |
| import ( |
| - "errors" |
| "flag" |
| "fmt" |
| "net/http" |
| + "net/url" |
| "os" |
| "os/signal" |
| "path/filepath" |
| @@ -18,10 +18,14 @@ import ( |
| "github.com/luci/luci-go/client/authcli" |
| "github.com/luci/luci-go/common/auth" |
| + "github.com/luci/luci-go/common/clock/clockflag" |
| + "github.com/luci/luci-go/common/config/impl/filesystem" |
| "github.com/luci/luci-go/common/data/caching/proccache" |
| + "github.com/luci/luci-go/common/errors" |
| "github.com/luci/luci-go/common/gcloud/gs" |
| log "github.com/luci/luci-go/common/logging" |
| "github.com/luci/luci-go/common/logging/gologger" |
| + "github.com/luci/luci-go/common/proto/google" |
| "github.com/luci/luci-go/common/tsmon" |
| "github.com/luci/luci-go/common/tsmon/target" |
| "github.com/luci/luci-go/grpc/prpc" |
| @@ -32,6 +36,10 @@ import ( |
| "github.com/luci/luci-go/logdog/server/retryServicesClient" |
| "github.com/luci/luci-go/logdog/server/service/config" |
| "github.com/luci/luci-go/luci_config/common/cfgtypes" |
| + "github.com/luci/luci-go/luci_config/server/cfgclient" |
| + "github.com/luci/luci-go/luci_config/server/cfgclient/backend/client" |
| + "github.com/luci/luci-go/luci_config/server/cfgclient/backend/testconfig" |
| + "github.com/luci/luci-go/luci_config/server/cfgclient/textproto" |
| "cloud.google.com/go/compute/metadata" |
| "golang.org/x/net/context" |
| @@ -53,7 +61,13 @@ var ( |
| // projectConfigCacheDuration is the amount of time to cache a project's |
| // configuration before reloading. |
| -const projectConfigCacheDuration = 30 * time.Minute |
| +const ( |
| + projectConfigCacheDuration = 30 * time.Minute |
| + |
| + // minAuthTokenLifetime is the amount of time that an access token has before |
| + // expiring. |
| + minAuthTokenLifetime = 2 * time.Minute |
| +) |
| // Service is a base class full of common LogDog service application parameters. |
| type Service struct { |
| @@ -69,20 +83,27 @@ type Service struct { |
| loggingFlags log.Config |
| authFlags authcli.Flags |
| - configFlags config.Flags |
| tsMonFlags tsmon.Flags |
| coordinatorHost string |
| coordinatorInsecure bool |
| - serviceID string |
| storageCredentialJSONPath string |
| cpuProfilePath string |
| heapProfilePath string |
| - coord logdog.ServicesClient |
| - config *config.Manager |
| - |
| + // onGCE is true if we're on GCE. We probe this exactly once. |
|
iannucci
2017/01/21 00:23:14
probe it... lazily? on init? from environ?
dnj
2017/01/21 01:37:31
Once during "Run". updated.
|
| onGCE bool |
| + |
| + killCheckInterval clockflag.Duration |
| + configFilePath string |
| + serviceConfig svcconfig.Config |
| + |
| + // serviceID is the cloud project ID, which is also this service's unique |
| + // ID. This can be specified by flag or, if on GCE, will automatically be |
| + // probed from metadata. |
| + serviceID string |
| + |
| + coord logdog.ServicesClient |
| } |
| // Run performs service-wide initialization and invokes the specified run |
| @@ -150,17 +171,14 @@ func (s *Service) runImpl(c context.Context, f func(context.Context) error) erro |
| }() |
| } |
| - // Are we running on a GCE intance? |
| - if err := s.probeGCEEnvironment(c); err != nil { |
| - log.WithError(err).Errorf(c, "Failed to probe GCE environment.") |
| - return err |
| - } |
| - |
| // Validate the runtime environment. |
| if s.serviceID == "" { |
| - return errors.New("no service ID was configured") |
| + return errors.New("no service ID was configured (-service-id)") |
| } |
| + // Install a process-wide cache. |
| + c = proccache.Use(c, &proccache.Cache{}) |
| + |
| // Configure our signal handler. It will listen for terminating signals and |
| // issue a shutdown signal if one is received. |
| signalC := make(chan os.Signal) |
| @@ -196,24 +214,21 @@ func (s *Service) runImpl(c context.Context, f func(context.Context) error) erro |
| // Initialize our Client instantiations. |
| var err error |
| - s.coord, err = s.initCoordinatorClient(c) |
| - if err != nil { |
| + if s.coord, err = s.initCoordinatorClient(c); err != nil { |
| log.WithError(err).Errorf(c, "Failed to setup Coordinator client.") |
| return err |
| } |
| - s.config, err = s.initConfig(c) |
| - if err != nil { |
| + // Initialize and install our config service and caching layers, and load our |
| + // initial service config. |
| + if err := s.initConfig(&c); err != nil { |
| log.WithError(err).Errorf(c, "Failed to setup configuration.") |
| return err |
| } |
| - defer s.config.Close() |
| + // Clear our shutdown function on termination. |
| defer s.SetShutdownFunc(nil) |
| - // Install a process-wide cache. |
| - c = proccache.Use(c, &proccache.Cache{}) |
| - |
| // Run main service function. |
| return f(c) |
| } |
| @@ -231,7 +246,6 @@ func (s *Service) addFlags(c context.Context, fs *flag.FlagSet) { |
| s.tsMonFlags.Register(fs) |
| s.authFlags.Register(fs, auth.Options{}) |
| - s.configFlags.AddToFlagSet(fs) |
| fs.StringVar(&s.serviceID, "service-id", "", |
| "Specify the service ID that this instance is supporting. If empty, the service ID "+ |
| @@ -247,6 +261,10 @@ func (s *Service) addFlags(c context.Context, fs *flag.FlagSet) { |
| "If supplied, enable CPU profiling and write the profile here.") |
| fs.StringVar(&s.heapProfilePath, "heap-profile-path", "", |
| "If supplied, enable CPU profiling and write the profile here.") |
| + fs.Var(&s.killCheckInterval, "config-kill-interval", |
| + "If non-zero, poll for configuration changes and kill the application if one is detected.") |
|
iannucci
2017/01/21 00:23:14
ah, got it... that's why it was killfunc before. C
dnj
2017/01/21 01:37:31
Done.
|
| + fs.StringVar(&s.configFilePath, "config-file-path", "", |
| + "If set, load configuration from a local filesystem rooted here.") |
|
iannucci
2017/01/21 00:23:13
for testing? or will this be used in production?
dnj
2017/01/21 01:37:31
test
|
| } |
| // probeGCEEnvironment fills in any parameters that can be probed from Google |
| @@ -301,24 +319,106 @@ func (s *Service) initCoordinatorClient(c context.Context) (logdog.ServicesClien |
| return retryServicesClient.New(sc, nil), nil |
| } |
| -func (s *Service) initConfig(c context.Context) (*config.Manager, error) { |
| - rt, err := s.AuthenticatedTransport(c, nil) |
| - if err != nil { |
| - log.WithError(err).Errorf(c, "Failed to create config client.") |
| - return nil, err |
| +func (s *Service) initConfig(c *context.Context) error { |
| + opts := config.CacheOptions{ |
| + CacheExpiration: projectConfigCacheDuration, |
| } |
| - s.configFlags.RoundTripper = rt |
| - o, err := s.configFlags.CoordinatorOptions(c, s.coord) |
| - if err != nil { |
| - log.WithError(err).Errorf(c, "Failed to load configuration parameters.") |
| - return nil, err |
| + // If a configFilePath was specified, use a mock configuration service that |
| + // loads from a local file. |
| + var p client.Provider |
| + if s.configFilePath == "" { |
| + ccfg, err := s.coord.GetConfig(*c, &google.Empty{}) |
| + if err != nil { |
| + return err |
| + } |
| + |
| + // Determine our config service host. |
| + // |
| + // Older Coordinator instances may provide the full URL instead of the host, |
| + // in which case we will extract the host from the URL. |
| + host := ccfg.ConfigServiceHost |
| + if host == "" { |
| + if ccfg.ConfigServiceUrl == "" { |
| + return errors.New("coordinator does not specify a config service") |
| + } |
| + u, err := url.Parse(ccfg.ConfigServiceUrl) |
| + if err != nil { |
| + return errors.Annotate(err).Reason("failed to parse config service URL").Err() |
| + } |
| + host = u.Host |
| + } |
| + |
| + if ccfg.ConfigSet == "" { |
| + return errors.New("coordinator does not specify a config set") |
| + } |
| + |
| + log.Fields{ |
| + "host": host, |
| + }.Debugf(*c, "Using remote configuration service client.") |
| + p = &client.RemoteProvider{ |
| + Host: host, |
| + } |
| + } else { |
| + // Test / Local: use filesystem config path. |
| + ci, err := filesystem.New(s.configFilePath) |
| + if err != nil { |
| + return err |
| + } |
| + p = &testconfig.Provider{Base: ci} |
| } |
| - o.ServiceID = s.serviceID |
| - o.ProjectConfigCacheDuration = projectConfigCacheDuration |
| - o.KillFunc = s.shutdown |
| - return config.NewManager(c, *o) |
| + // Add config caching layers. |
| + *c = opts.WrapBackend(*c, &client.Backend{ |
| + Provider: p, |
| + }) |
| + |
| + // Load our service configuration. |
| + var meta cfgclient.Meta |
| + cset, path := s.ServiceConfigPath() |
| + if err := cfgclient.Get(*c, cfgclient.AsService, cset, path, textproto.Message(&s.serviceConfig), &meta); err != nil { |
| + return errors.Annotate(err).Reason("failed to load service config").Err() |
| + } |
| + |
| + // Create a poller for our service config. |
| + if s.killCheckInterval > 0 { |
| + poller := config.ChangePoller{ |
| + ConfigSet: cset, |
| + Path: path, |
| + CheckInterval: time.Duration(s.killCheckInterval), |
| + OnChange: s.shutdown, |
|
iannucci
2017/01/21 00:23:14
os.Exit :p
*chants*
crash! crash! crash! crash! c
dnj
2017/01/21 01:37:31
I want to flush :(
|
| + ContentHash: meta.ContentHash, |
| + } |
| + go poller.Run(*c) |
| + } |
| + return nil |
| +} |
| + |
| +// ServiceConfigPath returns the ConfigSet and path to the current service's |
| +// configuration. |
| +func (s *Service) ServiceConfigPath() (cfgtypes.ConfigSet, string) { |
| + return cfgtypes.ServiceConfigSet(s.serviceID), svcconfig.ServiceConfigPath |
| +} |
| + |
| +// ServiceConfig returns the configuration data for the current service. |
| +func (s *Service) ServiceConfig() *svcconfig.Config { return &s.serviceConfig } |
| + |
| +// ProjectConfigPath returns the ConfigSet and path to the current service's |
| +// project configuration for proj. |
| +func (s *Service) ProjectConfigPath(proj cfgtypes.ProjectName) (cfgtypes.ConfigSet, string) { |
| + return cfgtypes.ProjectConfigSet(proj), svcconfig.ProjectConfigPath(s.serviceID) |
| +} |
| + |
| +// ProjectConfig returns the current service's project configuration for proj. |
| +func (s *Service) ProjectConfig(c context.Context, proj cfgtypes.ProjectName) (*svcconfig.ProjectConfig, error) { |
| + cset, path := s.ProjectConfigPath(proj) |
| + |
| + var pcfg svcconfig.ProjectConfig |
| + if err := cfgclient.Get(c, cfgclient.AsService, cset, path, textproto.Message(&pcfg), nil); err != nil { |
| + return nil, errors.Annotate(err).Reason("failed to load project config from %(cset)s.%(path)s"). |
| + D("cset", cset).D("path", path).Err() |
| + } |
| + return &pcfg, nil |
| } |
| // SetShutdownFunc sets the service shutdown function. |
| @@ -339,18 +439,6 @@ func (s *Service) shutdownImmediately() { |
| os.Exit(1) |
| } |
| -// Config returns the cached service configuration. |
| -func (s *Service) Config() *svcconfig.Config { |
| - return s.config.Config() |
| -} |
| - |
| -// ProjectConfig returns the cached project configuration. |
| -// |
| -// If the project configuration is not available, nil will be returned. |
| -func (s *Service) ProjectConfig(c context.Context, proj cfgtypes.ProjectName) (*svcconfig.ProjectConfig, error) { |
| - return s.config.ProjectConfig(c, proj) |
| -} |
| - |
| // Coordinator returns the cached Coordinator client. |
| func (s *Service) Coordinator() logdog.ServicesClient { |
| return s.coord |
| @@ -366,7 +454,7 @@ func (s *Service) ServiceID() string { |
| // IntermediateStorage instantiates the configured intermediate Storage |
| // instance. |
| func (s *Service) IntermediateStorage(c context.Context) (storage.Storage, error) { |
| - cfg := s.config.Config() |
| + cfg := s.ServiceConfig() |
| if cfg.GetStorage() == nil { |
| log.Errorf(c, "Missing storage configuration.") |
| return nil, ErrInvalidConfig |