| OLD | NEW |
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package service | 5 package service |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "errors" | 8 "errors" |
| 9 "flag" | 9 "flag" |
| 10 "net/http" | 10 "net/http" |
| 11 "os" | 11 "os" |
| 12 "os/signal" | 12 "os/signal" |
| 13 "runtime/pprof" |
| 13 "sync/atomic" | 14 "sync/atomic" |
| 14 | 15 |
| 15 "github.com/luci/luci-go/client/authcli" | 16 "github.com/luci/luci-go/client/authcli" |
| 16 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" | 17 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" |
| 17 "github.com/luci/luci-go/common/auth" | 18 "github.com/luci/luci-go/common/auth" |
| 18 "github.com/luci/luci-go/common/gcloud/gs" | 19 "github.com/luci/luci-go/common/gcloud/gs" |
| 19 log "github.com/luci/luci-go/common/logging" | 20 log "github.com/luci/luci-go/common/logging" |
| 20 "github.com/luci/luci-go/common/logging/gologger" | 21 "github.com/luci/luci-go/common/logging/gologger" |
| 21 "github.com/luci/luci-go/common/proto/logdog/svcconfig" | 22 "github.com/luci/luci-go/common/proto/logdog/svcconfig" |
| 22 "github.com/luci/luci-go/common/prpc" | 23 "github.com/luci/luci-go/common/prpc" |
| (...skipping 24 matching lines...) Expand all Loading... |
| 47 | 48 |
| 48 shutdownFunc atomic.Value | 49 shutdownFunc atomic.Value |
| 49 | 50 |
| 50 loggingFlags log.Config | 51 loggingFlags log.Config |
| 51 authFlags authcli.Flags | 52 authFlags authcli.Flags |
| 52 configFlags config.Flags | 53 configFlags config.Flags |
| 53 | 54 |
| 54 coordinatorHost string | 55 coordinatorHost string |
| 55 coordinatorInsecure bool | 56 coordinatorInsecure bool |
| 56 storageCredentialJSONPath string | 57 storageCredentialJSONPath string |
| 58 cpuProfilePath string |
| 57 | 59 |
| 58 coord logdog.ServicesClient | 60 coord logdog.ServicesClient |
| 59 config *config.Manager | 61 config *config.Manager |
| 60 } | 62 } |
| 61 | 63 |
| 62 // Run performs service-wide initialization and invokes the specified run | 64 // Run performs service-wide initialization and invokes the specified run |
| 63 // function. | 65 // function. |
| 64 func (s *Service) Run(c context.Context, f func(context.Context) error) { | 66 func (s *Service) Run(c context.Context, f func(context.Context) error) { |
| 65 c = gologger.Use(c) | 67 c = gologger.Use(c) |
| 66 | 68 |
| 67 rc := 0 | 69 rc := 0 |
| 68 if err := s.runImpl(c, f); err != nil { | 70 if err := s.runImpl(c, f); err != nil { |
| 69 log.WithError(err).Errorf(c, "Application exiting with error.") | 71 log.WithError(err).Errorf(c, "Application exiting with error.") |
| 70 rc = 1 | 72 rc = 1 |
| 71 } | 73 } |
| 72 os.Exit(rc) | 74 os.Exit(rc) |
| 73 } | 75 } |
| 74 | 76 |
| 75 func (s *Service) runImpl(c context.Context, f func(context.Context) error) erro
r { | 77 func (s *Service) runImpl(c context.Context, f func(context.Context) error) erro
r { |
| 76 s.addFlags(c, &s.Flags) | 78 s.addFlags(c, &s.Flags) |
| 77 if err := s.Flags.Parse(os.Args[1:]); err != nil { | 79 if err := s.Flags.Parse(os.Args[1:]); err != nil { |
| 78 log.WithError(err).Errorf(c, "Failed to parse command-line.") | 80 log.WithError(err).Errorf(c, "Failed to parse command-line.") |
| 79 return err | 81 return err |
| 80 } | 82 } |
| 81 | 83 |
| 82 // Install logging configuration. | 84 // Install logging configuration. |
| 83 c = s.loggingFlags.Set(c) | 85 c = s.loggingFlags.Set(c) |
| 84 | 86 |
| 87 if p := s.cpuProfilePath; p != "" { |
| 88 fd, err := os.Create(p) |
| 89 if err != nil { |
| 90 log.Fields{ |
| 91 log.ErrorKey: err, |
| 92 "path": p, |
| 93 }.Errorf(c, "Failed to create CPU profile output file.") |
| 94 return err |
| 95 } |
| 96 defer fd.Close() |
| 97 |
| 98 pprof.StartCPUProfile(fd) |
| 99 defer pprof.StopCPUProfile() |
| 100 } |
| 101 |
| 85 // Configure our signal handler. It will listen for terminating signals
and | 102 // Configure our signal handler. It will listen for terminating signals
and |
| 86 // issue a shutdown signal if one is received. | 103 // issue a shutdown signal if one is received. |
| 87 signalC := make(chan os.Signal) | 104 signalC := make(chan os.Signal) |
| 88 go func() { | 105 go func() { |
| 89 hasShutdownAlready := false | 106 hasShutdownAlready := false |
| 90 for sig := range signalC { | 107 for sig := range signalC { |
| 91 if !hasShutdownAlready { | 108 if !hasShutdownAlready { |
| 92 hasShutdownAlready = true | 109 hasShutdownAlready = true |
| 93 | 110 |
| 94 log.Warningf(log.SetField(c, "signal", sig), "Re
ceived close signal. Send again to terminate immediately.") | 111 log.Warningf(log.SetField(c, "signal", sig), "Re
ceived close signal. Send again to terminate immediately.") |
| (...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 135 Logger: log.Get(c), | 152 Logger: log.Get(c), |
| 136 }) | 153 }) |
| 137 s.configFlags.AddToFlagSet(fs) | 154 s.configFlags.AddToFlagSet(fs) |
| 138 | 155 |
| 139 fs.StringVar(&s.coordinatorHost, "coordinator", "", | 156 fs.StringVar(&s.coordinatorHost, "coordinator", "", |
| 140 "The Coordinator service's [host][:port].") | 157 "The Coordinator service's [host][:port].") |
| 141 fs.BoolVar(&s.coordinatorInsecure, "coordinator-insecure", false, | 158 fs.BoolVar(&s.coordinatorInsecure, "coordinator-insecure", false, |
| 142 "Connect to Coordinator over HTTP (instead of HTTPS).") | 159 "Connect to Coordinator over HTTP (instead of HTTPS).") |
| 143 fs.StringVar(&s.storageCredentialJSONPath, "storage-credential-json-path
", "", | 160 fs.StringVar(&s.storageCredentialJSONPath, "storage-credential-json-path
", "", |
| 144 "If supplied, the path of a JSON credential file to load and use
for storage operations.") | 161 "If supplied, the path of a JSON credential file to load and use
for storage operations.") |
| 162 fs.StringVar(&s.cpuProfilePath, "cpu-profile-path", "", |
| 163 "If supplied, enable CPU profiling and write the profile here.") |
| 145 } | 164 } |
| 146 | 165 |
| 147 func (s *Service) initCoordinatorClient(c context.Context) (logdog.ServicesClien
t, error) { | 166 func (s *Service) initCoordinatorClient(c context.Context) (logdog.ServicesClien
t, error) { |
| 148 if s.coordinatorHost == "" { | 167 if s.coordinatorHost == "" { |
| 149 log.Errorf(c, "Missing Coordinator URL (-coordinator).") | 168 log.Errorf(c, "Missing Coordinator URL (-coordinator).") |
| 150 return nil, ErrInvalidConfig | 169 return nil, ErrInvalidConfig |
| 151 } | 170 } |
| 152 | 171 |
| 153 httpClient, err := s.AuthenticatedClient(func(o *auth.Options) { | 172 httpClient, err := s.AuthenticatedClient(func(o *auth.Options) { |
| 154 o.Scopes = CoordinatorScopes | 173 o.Scopes = CoordinatorScopes |
| (...skipping 83 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 238 o.Scopes = bigtable.StorageScopes | 257 o.Scopes = bigtable.StorageScopes |
| 239 if s.storageCredentialJSONPath != "" { | 258 if s.storageCredentialJSONPath != "" { |
| 240 o.ServiceAccountJSONPath = s.storageCredentialJSONPath | 259 o.ServiceAccountJSONPath = s.storageCredentialJSONPath |
| 241 } | 260 } |
| 242 }) | 261 }) |
| 243 if err != nil { | 262 if err != nil { |
| 244 log.WithError(err).Errorf(c, "Failed to create BigTable Authenti
cator.") | 263 log.WithError(err).Errorf(c, "Failed to create BigTable Authenti
cator.") |
| 245 return nil, err | 264 return nil, err |
| 246 } | 265 } |
| 247 | 266 |
| 248 » return bigtable.New(c, bigtable.Options{ | 267 » bt, err := bigtable.New(c, bigtable.Options{ |
| 249 Project: btcfg.Project, | 268 Project: btcfg.Project, |
| 250 Zone: btcfg.Zone, | 269 Zone: btcfg.Zone, |
| 251 Cluster: btcfg.Cluster, | 270 Cluster: btcfg.Cluster, |
| 252 LogTable: btcfg.LogTableName, | 271 LogTable: btcfg.LogTableName, |
| 253 ClientOptions: []cloud.ClientOption{ | 272 ClientOptions: []cloud.ClientOption{ |
| 254 cloud.WithTokenSource(a.TokenSource()), | 273 cloud.WithTokenSource(a.TokenSource()), |
| 255 }, | 274 }, |
| 256 » }), nil | 275 » }) |
| 276 » if err != nil { |
| 277 » » return nil, err |
| 278 » } |
| 279 » return bt, nil |
| 257 } | 280 } |
| 258 | 281 |
| 259 // GSClient returns an authenticated Google Storage client instance. | 282 // GSClient returns an authenticated Google Storage client instance. |
| 260 func (s *Service) GSClient(c context.Context) (gs.Client, error) { | 283 func (s *Service) GSClient(c context.Context) (gs.Client, error) { |
| 261 rt, err := s.AuthenticatedTransport(func(o *auth.Options) { | 284 rt, err := s.AuthenticatedTransport(func(o *auth.Options) { |
| 262 o.Scopes = gs.ReadWriteScopes | 285 o.Scopes = gs.ReadWriteScopes |
| 263 }) | 286 }) |
| 264 if err != nil { | 287 if err != nil { |
| 265 log.WithError(err).Errorf(c, "Failed to create authenticated GS
transport.") | 288 log.WithError(err).Errorf(c, "Failed to create authenticated GS
transport.") |
| 266 return nil, err | 289 return nil, err |
| (...skipping 41 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 308 // | 331 // |
| 309 // An optional permutation functon can be provided to modify those Options | 332 // An optional permutation functon can be provided to modify those Options |
| 310 // before the Authenticator is created. | 333 // before the Authenticator is created. |
| 311 func (s *Service) AuthenticatedClient(f func(o *auth.Options)) (*http.Client, er
ror) { | 334 func (s *Service) AuthenticatedClient(f func(o *auth.Options)) (*http.Client, er
ror) { |
| 312 a, err := s.Authenticator(f) | 335 a, err := s.Authenticator(f) |
| 313 if err != nil { | 336 if err != nil { |
| 314 return nil, err | 337 return nil, err |
| 315 } | 338 } |
| 316 return a.Client() | 339 return a.Client() |
| 317 } | 340 } |
| OLD | NEW |