| OLD | NEW |
| (Empty) | |
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. |
| 4 |
| 5 package service |
| 6 |
| 7 import ( |
| 8 "errors" |
| 9 "flag" |
| 10 "net/http" |
| 11 "os" |
| 12 "os/signal" |
| 13 "sync" |
| 14 |
| 15 "github.com/luci/luci-go/client/authcli" |
| 16 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" |
| 17 "github.com/luci/luci-go/common/auth" |
| 18 log "github.com/luci/luci-go/common/logging" |
| 19 "github.com/luci/luci-go/common/logging/gologger" |
| 20 "github.com/luci/luci-go/common/proto/logdog/svcconfig" |
| 21 "github.com/luci/luci-go/common/prpc" |
| 22 "github.com/luci/luci-go/server/internal/logdog/config" |
| 23 "github.com/luci/luci-go/server/internal/logdog/retryServicesClient" |
| 24 "github.com/luci/luci-go/server/logdog/storage" |
| 25 "github.com/luci/luci-go/server/logdog/storage/bigtable" |
| 26 "golang.org/x/net/context" |
| 27 "google.golang.org/cloud" |
| 28 ) |
| 29 |
| 30 var ( |
| 31 // ErrInvalidConfig is an error that is returned when the supplied |
| 32 // configuration is invalid. |
| 33 ErrInvalidConfig = errors.New("invalid configuration") |
| 34 |
| 35 // CoordinatorScopes is the set of OAuth2 scopes to use for the Coordina
tor |
| 36 // client. |
| 37 CoordinatorScopes = []string{ |
| 38 auth.OAuthScopeEmail, |
| 39 } |
| 40 ) |
| 41 |
| 42 // Service is a base class full of common LogDog service application parameters. |
| 43 type Service struct { |
| 44 context.Context |
| 45 |
| 46 // UserAgent is the user agent string that will be used for service |
| 47 // communication. |
| 48 UserAgent string |
| 49 |
| 50 // ShutdownFunc, if not nil, is a function that will be called when a sh
utdown |
| 51 // signal is received. |
| 52 ShutdownFunc func() |
| 53 |
| 54 // topCancelFunc is the Context cancel function for the top-level applic
ation |
| 55 // Context. |
| 56 topCancelFunc func() |
| 57 |
| 58 // shutdownMu protects the shutdown variables. |
| 59 shutdownMu sync.Mutex |
| 60 shutdownFunc func() |
| 61 shutdownCount int32 |
| 62 |
| 63 loggingFlags log.Config |
| 64 authFlags authcli.Flags |
| 65 configFlags config.Flags |
| 66 |
| 67 coordinatorHost string |
| 68 coordinatorInsecure bool |
| 69 storageCredentialJSONPath string |
| 70 |
| 71 coord services.ServicesClient |
| 72 config *config.Manager |
| 73 } |
| 74 |
| 75 // New instantiates a new Service. |
| 76 func New(c context.Context) *Service { |
| 77 c, cancelFunc := context.WithCancel(c) |
| 78 c = gologger.Use(c) |
| 79 |
| 80 return &Service{ |
| 81 Context: c, |
| 82 topCancelFunc: cancelFunc, |
| 83 } |
| 84 } |
| 85 |
| 86 // AddFlags adds standard service flags to the supplied FlagSet. |
| 87 func (s *Service) AddFlags(fs *flag.FlagSet) { |
| 88 s.loggingFlags.AddFlags(fs) |
| 89 s.authFlags.Register(fs, auth.Options{ |
| 90 Context: s, |
| 91 Logger: log.Get(s), |
| 92 }) |
| 93 s.configFlags.AddToFlagSet(fs) |
| 94 |
| 95 fs.StringVar(&s.coordinatorHost, "coordinator", "", |
| 96 "The Coordinator service's [host][:port].") |
| 97 fs.BoolVar(&s.coordinatorInsecure, "coordinator-insecure", false, |
| 98 "Connect to Coordinator over HTTP (instead of HTTPS).") |
| 99 fs.StringVar(&s.storageCredentialJSONPath, "storage-credential-json-path
", "", |
| 100 "If supplied, the path of a JSON credential file to load and use
for storage operations.") |
| 101 } |
| 102 |
| 103 // Run loads the Service's base runtime and invokes the specified run function. |
| 104 func (s *Service) Run(f func() error) error { |
| 105 s.Context = s.loggingFlags.Set(s.Context) |
| 106 |
| 107 // Configure our signal handler. It will listen for terminating signals
and |
| 108 // issue a shutdown signal if one is received. |
| 109 signalC := make(chan os.Signal) |
| 110 go func() { |
| 111 for sig := range signalC { |
| 112 s.Shutdown() |
| 113 log.Warningf(log.SetField(s, "signal", sig), "Received c
lose signal. Send again to terminate immediately.") |
| 114 } |
| 115 }() |
| 116 signal.Notify(signalC, os.Interrupt) |
| 117 defer func() { |
| 118 signal.Stop(signalC) |
| 119 close(signalC) |
| 120 }() |
| 121 |
| 122 // Initialize our Client instantiations. |
| 123 var err error |
| 124 s.coord, err = s.initCoordinatorClient() |
| 125 if err != nil { |
| 126 log.Errorf(log.SetError(s, err), "Failed to setup Coordinator cl
ient.") |
| 127 return err |
| 128 } |
| 129 |
| 130 s.config, err = s.initConfig() |
| 131 if err != nil { |
| 132 log.Errorf(log.SetError(s, err), "Failed to setup configuration.
") |
| 133 return err |
| 134 } |
| 135 |
| 136 return f() |
| 137 } |
| 138 |
| 139 func (s *Service) initCoordinatorClient() (services.ServicesClient, error) { |
| 140 if s.coordinatorHost == "" { |
| 141 log.Errorf(s, "Missing Coordinator URL (-coordinator).") |
| 142 return nil, ErrInvalidConfig |
| 143 } |
| 144 |
| 145 httpClient, err := s.AuthenticatedClient(func(o *auth.Options) { |
| 146 o.Scopes = CoordinatorScopes |
| 147 }) |
| 148 if err != nil { |
| 149 log.Errorf(log.SetError(s, err), "Failed to create authenticated
client.") |
| 150 return nil, err |
| 151 } |
| 152 |
| 153 prpcClient := prpc.Client{ |
| 154 C: httpClient, |
| 155 Host: s.coordinatorHost, |
| 156 Options: prpc.DefaultOptions(), |
| 157 } |
| 158 if s.coordinatorInsecure { |
| 159 prpcClient.Options.Insecure = true |
| 160 } |
| 161 sc := services.NewServicesPRPCClient(&prpcClient) |
| 162 |
| 163 // Wrap the resulting client in a retry harness. |
| 164 return retryServicesClient.New(sc, nil), nil |
| 165 } |
| 166 |
| 167 func (s *Service) initConfig() (*config.Manager, error) { |
| 168 rt, err := s.AuthenticatedTransport(nil) |
| 169 if err != nil { |
| 170 log.Errorf(log.SetError(s, err), "Failed to create config client
.") |
| 171 return nil, err |
| 172 } |
| 173 |
| 174 s.configFlags.RoundTripper = rt |
| 175 o, err := s.configFlags.CoordinatorOptions(s, s.coord) |
| 176 if err != nil { |
| 177 log.Errorf(log.SetError(s, err), "Failed to load configuration p
arameters.") |
| 178 return nil, err |
| 179 } |
| 180 o.KillFunc = s.Shutdown |
| 181 |
| 182 return config.NewManager(s, *o) |
| 183 } |
| 184 |
| 185 // Shutdown issues a shutdown signal to the service. |
| 186 func (s *Service) Shutdown() { |
| 187 s.shutdownMu.Lock() |
| 188 defer s.shutdownMu.Unlock() |
| 189 |
| 190 if s.shutdownCount > 0 { |
| 191 os.Exit(1) |
| 192 } |
| 193 s.shutdownCount++ |
| 194 |
| 195 if f := s.shutdownFunc; f != nil { |
| 196 f() |
| 197 } else { |
| 198 s.topCancelFunc() |
| 199 } |
| 200 } |
| 201 |
| 202 // SetShutdownFunc sets the service shutdown function. |
| 203 func (s *Service) SetShutdownFunc(f func()) { |
| 204 s.shutdownMu.Lock() |
| 205 defer s.shutdownMu.Unlock() |
| 206 s.shutdownFunc = f |
| 207 } |
| 208 |
| 209 // Config returns the cached service configuration. |
| 210 func (s *Service) Config() *svcconfig.Config { |
| 211 return s.config.Config() |
| 212 } |
| 213 |
| 214 // Coordinator returns the cached Coordinator client. |
| 215 func (s *Service) Coordinator() services.ServicesClient { |
| 216 return s.coord |
| 217 } |
| 218 |
| 219 // Storage instantiates the configured Storage instance. |
| 220 func (s *Service) Storage() (storage.Storage, error) { |
| 221 cfg := s.config.Config() |
| 222 if cfg.GetStorage() == nil { |
| 223 log.Errorf(s, "Missing storage configuration.") |
| 224 return nil, ErrInvalidConfig |
| 225 } |
| 226 |
| 227 btcfg := cfg.GetStorage().GetBigtable() |
| 228 if btcfg == nil { |
| 229 log.Errorf(s, "Missing BigTable storage configuration") |
| 230 return nil, ErrInvalidConfig |
| 231 } |
| 232 |
| 233 // Initialize Storage authentication. |
| 234 a, err := s.Authenticator(func(o *auth.Options) { |
| 235 o.Scopes = bigtable.StorageScopes |
| 236 if s.storageCredentialJSONPath != "" { |
| 237 o.ServiceAccountJSONPath = s.storageCredentialJSONPath |
| 238 } |
| 239 }) |
| 240 if err != nil { |
| 241 log.Errorf(log.SetError(s, err), "Failed to create BigTable Auth
enticator.") |
| 242 return nil, err |
| 243 } |
| 244 |
| 245 bt, err := bigtable.New(s, bigtable.Options{ |
| 246 Project: btcfg.Project, |
| 247 Zone: btcfg.Zone, |
| 248 Cluster: btcfg.Cluster, |
| 249 LogTable: btcfg.LogTableName, |
| 250 ClientOptions: []cloud.ClientOption{ |
| 251 cloud.WithTokenSource(a.TokenSource()), |
| 252 }, |
| 253 }) |
| 254 if err != nil { |
| 255 log.Errorf(log.SetError(s, err), "Failed to create BigTable inst
ance.") |
| 256 return nil, err |
| 257 } |
| 258 return bt, nil |
| 259 } |
| 260 |
| 261 // Authenticator returns an Authenticator instance. The Authenticator is |
| 262 // configured from a base set of Authenticator Options. |
| 263 // |
| 264 // An optional permutation functon can be provided to modify those Options |
| 265 // before the Authenticator is created. |
| 266 func (s *Service) Authenticator(f func(o *auth.Options)) (*auth.Authenticator, e
rror) { |
| 267 authOpts, err := s.authFlags.Options() |
| 268 if err != nil { |
| 269 log.Errorf(log.SetError(s, err), "Failed to create authenticator
options.") |
| 270 return nil, ErrInvalidConfig |
| 271 } |
| 272 if f != nil { |
| 273 f(&authOpts) |
| 274 } |
| 275 return auth.NewAuthenticator(auth.SilentLogin, authOpts), nil |
| 276 } |
| 277 |
| 278 // AuthenticatedTransport returns an authenticated http.RoundTripper transport. |
| 279 // The transport is configured from a base set of Authenticator Options. |
| 280 // |
| 281 // An optional permutation functon can be provided to modify those Options |
| 282 // before the Authenticator is created. |
| 283 func (s *Service) AuthenticatedTransport(f func(o *auth.Options)) (http.RoundTri
pper, error) { |
| 284 a, err := s.Authenticator(f) |
| 285 if err != nil { |
| 286 return nil, err |
| 287 } |
| 288 return a.Transport() |
| 289 } |
| 290 |
| 291 // AuthenticatedClient returns an authenticated http.Client. The Client is |
| 292 // configured from a base set of Authenticator Options. |
| 293 // |
| 294 // An optional permutation functon can be provided to modify those Options |
| 295 // before the Authenticator is created. |
| 296 func (s *Service) AuthenticatedClient(f func(o *auth.Options)) (*http.Client, er
ror) { |
| 297 a, err := s.Authenticator(f) |
| 298 if err != nil { |
| 299 return nil, err |
| 300 } |
| 301 return a.Client() |
| 302 } |
| OLD | NEW |