| Index: logdog/server/service/service.go
|
| diff --git a/logdog/server/service/service.go b/logdog/server/service/service.go
|
| index 42c57dda01899930bb6b25fa0773c6dc17785ed2..5746fceadb051214ca22415c5e1bc54c2b9becc7 100644
|
| --- a/logdog/server/service/service.go
|
| +++ b/logdog/server/service/service.go
|
| @@ -5,10 +5,10 @@
|
| package service
|
|
|
| import (
|
| - "errors"
|
| "flag"
|
| "fmt"
|
| "net/http"
|
| + "net/url"
|
| "os"
|
| "os/signal"
|
| "path/filepath"
|
| @@ -18,10 +18,14 @@ import (
|
|
|
| "github.com/luci/luci-go/client/authcli"
|
| "github.com/luci/luci-go/common/auth"
|
| + "github.com/luci/luci-go/common/clock/clockflag"
|
| + "github.com/luci/luci-go/common/config/impl/filesystem"
|
| "github.com/luci/luci-go/common/data/caching/proccache"
|
| + "github.com/luci/luci-go/common/errors"
|
| "github.com/luci/luci-go/common/gcloud/gs"
|
| log "github.com/luci/luci-go/common/logging"
|
| "github.com/luci/luci-go/common/logging/gologger"
|
| + "github.com/luci/luci-go/common/proto/google"
|
| "github.com/luci/luci-go/common/tsmon"
|
| "github.com/luci/luci-go/common/tsmon/target"
|
| "github.com/luci/luci-go/grpc/prpc"
|
| @@ -32,6 +36,10 @@ import (
|
| "github.com/luci/luci-go/logdog/server/retryServicesClient"
|
| "github.com/luci/luci-go/logdog/server/service/config"
|
| "github.com/luci/luci-go/luci_config/common/cfgtypes"
|
| + "github.com/luci/luci-go/luci_config/server/cfgclient"
|
| + "github.com/luci/luci-go/luci_config/server/cfgclient/backend/client"
|
| + "github.com/luci/luci-go/luci_config/server/cfgclient/backend/testconfig"
|
| + "github.com/luci/luci-go/luci_config/server/cfgclient/textproto"
|
|
|
| "cloud.google.com/go/compute/metadata"
|
| "golang.org/x/net/context"
|
| @@ -53,7 +61,13 @@ var (
|
|
|
| // projectConfigCacheDuration is the amount of time to cache a project's
|
| // configuration before reloading.
|
| -const projectConfigCacheDuration = 30 * time.Minute
|
| +const (
|
| + projectConfigCacheDuration = 30 * time.Minute
|
| +
|
| + // minAuthTokenLifetime is the amount of time that an access token has before
|
| + // expiring.
|
| + minAuthTokenLifetime = 2 * time.Minute
|
| +)
|
|
|
| // Service is a base class full of common LogDog service application parameters.
|
| type Service struct {
|
| @@ -69,20 +83,38 @@ type Service struct {
|
|
|
| loggingFlags log.Config
|
| authFlags authcli.Flags
|
| - configFlags config.Flags
|
| tsMonFlags tsmon.Flags
|
|
|
| coordinatorHost string
|
| coordinatorInsecure bool
|
| - serviceID string
|
| storageCredentialJSONPath string
|
| cpuProfilePath string
|
| heapProfilePath string
|
|
|
| - coord logdog.ServicesClient
|
| - config *config.Manager
|
| -
|
| + // onGCE is true if we're on GCE. We probe this once during Run.
|
| onGCE bool
|
| +
|
| + // killCheckInterval is the amount of time in between service configuration
|
| + // checks. If set, this service will periodically reload its service
|
| + // configuration. If that configuration has changed, the service will kill
|
| + // itself.
|
| + //
|
| + // Since, in production, this is running under an execution harness such as
|
| + // Kubernetes, the service will restart and load the new configuration. This
|
| + // is easier than implementing in-process configuration updating.
|
| + killCheckInterval clockflag.Duration
|
| + // testConfigFilePath is the path to a local configuration service filesystem
|
| + // (impl/filesystem) root. This is used for testing.
|
| + testConfigFilePath string
|
| + // serviceConfig is the cached service configuration.
|
| + serviceConfig svcconfig.Config
|
| +
|
| + // serviceID is the cloud project ID, which is also this service's unique
|
| + // ID. This can be specified by flag or, if on GCE, will automatically be
|
| + // probed from metadata.
|
| + serviceID string
|
| +
|
| + coord logdog.ServicesClient
|
| }
|
|
|
| // Run performs service-wide initialization and invokes the specified run
|
| @@ -150,17 +182,14 @@ func (s *Service) runImpl(c context.Context, f func(context.Context) error) erro
|
| }()
|
| }
|
|
|
| - // Are we running on a GCE intance?
|
| - if err := s.probeGCEEnvironment(c); err != nil {
|
| - log.WithError(err).Errorf(c, "Failed to probe GCE environment.")
|
| - return err
|
| - }
|
| -
|
| // Validate the runtime environment.
|
| if s.serviceID == "" {
|
| - return errors.New("no service ID was configured")
|
| + return errors.New("no service ID was configured (-service-id)")
|
| }
|
|
|
| + // Install a process-wide cache.
|
| + c = proccache.Use(c, &proccache.Cache{})
|
| +
|
| // Configure our signal handler. It will listen for terminating signals and
|
| // issue a shutdown signal if one is received.
|
| signalC := make(chan os.Signal)
|
| @@ -196,24 +225,21 @@ func (s *Service) runImpl(c context.Context, f func(context.Context) error) erro
|
|
|
| // Initialize our Client instantiations.
|
| var err error
|
| - s.coord, err = s.initCoordinatorClient(c)
|
| - if err != nil {
|
| + if s.coord, err = s.initCoordinatorClient(c); err != nil {
|
| log.WithError(err).Errorf(c, "Failed to setup Coordinator client.")
|
| return err
|
| }
|
|
|
| - s.config, err = s.initConfig(c)
|
| - if err != nil {
|
| + // Initialize and install our config service and caching layers, and load our
|
| + // initial service config.
|
| + if err := s.initConfig(&c); err != nil {
|
| log.WithError(err).Errorf(c, "Failed to setup configuration.")
|
| return err
|
| }
|
| - defer s.config.Close()
|
|
|
| + // Clear our shutdown function on termination.
|
| defer s.SetShutdownFunc(nil)
|
|
|
| - // Install a process-wide cache.
|
| - c = proccache.Use(c, &proccache.Cache{})
|
| -
|
| // Run main service function.
|
| return f(c)
|
| }
|
| @@ -231,7 +257,6 @@ func (s *Service) addFlags(c context.Context, fs *flag.FlagSet) {
|
| s.tsMonFlags.Register(fs)
|
|
|
| s.authFlags.Register(fs, auth.Options{})
|
| - s.configFlags.AddToFlagSet(fs)
|
|
|
| fs.StringVar(&s.serviceID, "service-id", "",
|
| "Specify the service ID that this instance is supporting. If empty, the service ID "+
|
| @@ -247,6 +272,10 @@ func (s *Service) addFlags(c context.Context, fs *flag.FlagSet) {
|
| "If supplied, enable CPU profiling and write the profile here.")
|
| fs.StringVar(&s.heapProfilePath, "heap-profile-path", "",
|
| "If supplied, enable CPU profiling and write the profile here.")
|
| + fs.Var(&s.killCheckInterval, "config-kill-interval",
|
| + "If non-zero, poll for configuration changes and kill the application if one is detected.")
|
| + fs.StringVar(&s.testConfigFilePath, "test-config-file-path", "",
|
| + "(Testing) If set, load configuration from a local filesystem rooted here.")
|
| }
|
|
|
| // probeGCEEnvironment fills in any parameters that can be probed from Google
|
| @@ -301,24 +330,113 @@ func (s *Service) initCoordinatorClient(c context.Context) (logdog.ServicesClien
|
| return retryServicesClient.New(sc, nil), nil
|
| }
|
|
|
| -func (s *Service) initConfig(c context.Context) (*config.Manager, error) {
|
| - rt, err := s.AuthenticatedTransport(c, nil)
|
| - if err != nil {
|
| - log.WithError(err).Errorf(c, "Failed to create config client.")
|
| - return nil, err
|
| +func (s *Service) initConfig(c *context.Context) error {
|
| + opts := config.CacheOptions{
|
| + CacheExpiration: projectConfigCacheDuration,
|
| }
|
|
|
| - s.configFlags.RoundTripper = rt
|
| - o, err := s.configFlags.CoordinatorOptions(c, s.coord)
|
| - if err != nil {
|
| - log.WithError(err).Errorf(c, "Failed to load configuration parameters.")
|
| - return nil, err
|
| + // If a testConfigFilePath was specified, use a mock configuration service
|
| + // that loads from a local file.
|
| + var p client.Provider
|
| + if s.testConfigFilePath == "" {
|
| + ccfg, err := s.coord.GetConfig(*c, &google.Empty{})
|
| + if err != nil {
|
| + return err
|
| + }
|
| +
|
| + // Determine our config service host.
|
| + //
|
| + // Older Coordinator instances may provide the full URL instead of the host,
|
| + // in which case we will extract the host from the URL.
|
| + host := ccfg.ConfigServiceHost
|
| + if host == "" {
|
| + if ccfg.ConfigServiceUrl == "" {
|
| + return errors.New("coordinator does not specify a config service")
|
| + }
|
| + u, err := url.Parse(ccfg.ConfigServiceUrl)
|
| + if err != nil {
|
| + return errors.Annotate(err).Reason("failed to parse config service URL").Err()
|
| + }
|
| + host = u.Host
|
| + }
|
| +
|
| + if ccfg.ConfigSet == "" {
|
| + return errors.New("coordinator does not specify a config set")
|
| + }
|
| +
|
| + log.Fields{
|
| + "host": host,
|
| + }.Debugf(*c, "Using remote configuration service client.")
|
| + p = &client.RemoteProvider{
|
| + Host: host,
|
| + }
|
| + } else {
|
| + // Test / Local: use filesystem config path.
|
| + ci, err := filesystem.New(s.testConfigFilePath)
|
| + if err != nil {
|
| + return err
|
| + }
|
| + p = &testconfig.Provider{Base: ci}
|
| }
|
| - o.ServiceID = s.serviceID
|
| - o.ProjectConfigCacheDuration = projectConfigCacheDuration
|
| - o.KillFunc = s.shutdown
|
|
|
| - return config.NewManager(c, *o)
|
| + // Add config caching layers.
|
| + *c = opts.WrapBackend(*c, &client.Backend{
|
| + Provider: p,
|
| + })
|
| +
|
| + // Load our service configuration.
|
| + var meta cfgclient.Meta
|
| + cset, path := s.ServiceConfigPath()
|
| + if err := cfgclient.Get(*c, cfgclient.AsService, cset, path, textproto.Message(&s.serviceConfig), &meta); err != nil {
|
| + return errors.Annotate(err).Reason("failed to load service config").Err()
|
| + }
|
| +
|
| + // Create a poller for our service config.
|
| + if s.killCheckInterval > 0 {
|
| + pollerC, pollerCancelFunc := context.WithCancel(*c)
|
| +
|
| + poller := config.ChangePoller{
|
| + ConfigSet: cset,
|
| + Path: path,
|
| + Period: time.Duration(s.killCheckInterval),
|
| + OnChange: func() {
|
| + // When a configuration change is detected, stop future polling and call
|
| + // our shutdown function.
|
| + pollerCancelFunc()
|
| + s.shutdown()
|
| + },
|
| + ContentHash: meta.ContentHash,
|
| + }
|
| + go poller.Run(pollerC)
|
| + }
|
| + return nil
|
| +}
|
| +
|
| +// ServiceConfigPath returns the ConfigSet and path to the current service's
|
| +// configuration.
|
| +func (s *Service) ServiceConfigPath() (cfgtypes.ConfigSet, string) {
|
| + return cfgtypes.ServiceConfigSet(s.serviceID), svcconfig.ServiceConfigPath
|
| +}
|
| +
|
| +// ServiceConfig returns the configuration data for the current service.
|
| +func (s *Service) ServiceConfig() *svcconfig.Config { return &s.serviceConfig }
|
| +
|
| +// ProjectConfigPath returns the ConfigSet and path to the current service's
|
| +// project configuration for proj.
|
| +func (s *Service) ProjectConfigPath(proj cfgtypes.ProjectName) (cfgtypes.ConfigSet, string) {
|
| + return cfgtypes.ProjectConfigSet(proj), svcconfig.ProjectConfigPath(s.serviceID)
|
| +}
|
| +
|
| +// ProjectConfig returns the current service's project configuration for proj.
|
| +func (s *Service) ProjectConfig(c context.Context, proj cfgtypes.ProjectName) (*svcconfig.ProjectConfig, error) {
|
| + cset, path := s.ProjectConfigPath(proj)
|
| +
|
| + var pcfg svcconfig.ProjectConfig
|
| + if err := cfgclient.Get(c, cfgclient.AsService, cset, path, textproto.Message(&pcfg), nil); err != nil {
|
| + return nil, errors.Annotate(err).Reason("failed to load project config from %(cset)s.%(path)s").
|
| + D("cset", cset).D("path", path).Err()
|
| + }
|
| + return &pcfg, nil
|
| }
|
|
|
| // SetShutdownFunc sets the service shutdown function.
|
| @@ -339,18 +457,6 @@ func (s *Service) shutdownImmediately() {
|
| os.Exit(1)
|
| }
|
|
|
| -// Config returns the cached service configuration.
|
| -func (s *Service) Config() *svcconfig.Config {
|
| - return s.config.Config()
|
| -}
|
| -
|
| -// ProjectConfig returns the cached project configuration.
|
| -//
|
| -// If the project configuration is not available, nil will be returned.
|
| -func (s *Service) ProjectConfig(c context.Context, proj cfgtypes.ProjectName) (*svcconfig.ProjectConfig, error) {
|
| - return s.config.ProjectConfig(c, proj)
|
| -}
|
| -
|
| // Coordinator returns the cached Coordinator client.
|
| func (s *Service) Coordinator() logdog.ServicesClient {
|
| return s.coord
|
| @@ -366,7 +472,7 @@ func (s *Service) ServiceID() string {
|
| // IntermediateStorage instantiates the configured intermediate Storage
|
| // instance.
|
| func (s *Service) IntermediateStorage(c context.Context) (storage.Storage, error) {
|
| - cfg := s.config.Config()
|
| + cfg := s.ServiceConfig()
|
| if cfg.GetStorage() == nil {
|
| log.Errorf(c, "Missing storage configuration.")
|
| return nil, ErrInvalidConfig
|
|
|