Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(378)

Side by Side Diff: server/internal/logdog/service/service.go

Issue 1838803002: LogDog: BigTable batching schema. (Closed) Base URL: https://github.com/luci/luci-go@recordio-split
Patch Set: Minor comments and quality of code tweaks. Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package service 5 package service
6 6
7 import ( 7 import (
8 "errors" 8 "errors"
9 "flag" 9 "flag"
10 "net/http" 10 "net/http"
11 "os" 11 "os"
12 "os/signal" 12 "os/signal"
13 "runtime/pprof"
13 "sync/atomic" 14 "sync/atomic"
14 15
15 "github.com/luci/luci-go/client/authcli" 16 "github.com/luci/luci-go/client/authcli"
16 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" 17 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1"
17 "github.com/luci/luci-go/common/auth" 18 "github.com/luci/luci-go/common/auth"
18 "github.com/luci/luci-go/common/gcloud/gs" 19 "github.com/luci/luci-go/common/gcloud/gs"
19 log "github.com/luci/luci-go/common/logging" 20 log "github.com/luci/luci-go/common/logging"
20 "github.com/luci/luci-go/common/logging/gologger" 21 "github.com/luci/luci-go/common/logging/gologger"
21 "github.com/luci/luci-go/common/proto/logdog/svcconfig" 22 "github.com/luci/luci-go/common/proto/logdog/svcconfig"
22 "github.com/luci/luci-go/common/prpc" 23 "github.com/luci/luci-go/common/prpc"
(...skipping 24 matching lines...) Expand all
47 48
48 shutdownFunc atomic.Value 49 shutdownFunc atomic.Value
49 50
50 loggingFlags log.Config 51 loggingFlags log.Config
51 authFlags authcli.Flags 52 authFlags authcli.Flags
52 configFlags config.Flags 53 configFlags config.Flags
53 54
54 coordinatorHost string 55 coordinatorHost string
55 coordinatorInsecure bool 56 coordinatorInsecure bool
56 storageCredentialJSONPath string 57 storageCredentialJSONPath string
58 cpuProfilePath string
57 59
58 coord logdog.ServicesClient 60 coord logdog.ServicesClient
59 config *config.Manager 61 config *config.Manager
60 } 62 }
61 63
62 // Run performs service-wide initialization and invokes the specified run 64 // Run performs service-wide initialization and invokes the specified run
63 // function. 65 // function.
64 func (s *Service) Run(c context.Context, f func(context.Context) error) { 66 func (s *Service) Run(c context.Context, f func(context.Context) error) {
65 c = gologger.Use(c) 67 c = gologger.Use(c)
66 68
67 rc := 0 69 rc := 0
68 if err := s.runImpl(c, f); err != nil { 70 if err := s.runImpl(c, f); err != nil {
69 log.WithError(err).Errorf(c, "Application exiting with error.") 71 log.WithError(err).Errorf(c, "Application exiting with error.")
70 rc = 1 72 rc = 1
71 } 73 }
72 os.Exit(rc) 74 os.Exit(rc)
73 } 75 }
74 76
75 func (s *Service) runImpl(c context.Context, f func(context.Context) error) erro r { 77 func (s *Service) runImpl(c context.Context, f func(context.Context) error) erro r {
76 s.addFlags(c, &s.Flags) 78 s.addFlags(c, &s.Flags)
77 if err := s.Flags.Parse(os.Args[1:]); err != nil { 79 if err := s.Flags.Parse(os.Args[1:]); err != nil {
78 log.WithError(err).Errorf(c, "Failed to parse command-line.") 80 log.WithError(err).Errorf(c, "Failed to parse command-line.")
79 return err 81 return err
80 } 82 }
81 83
82 // Install logging configuration. 84 // Install logging configuration.
83 c = s.loggingFlags.Set(c) 85 c = s.loggingFlags.Set(c)
84 86
87 if p := s.cpuProfilePath; p != "" {
88 fd, err := os.Create(p)
89 if err != nil {
90 log.Fields{
91 log.ErrorKey: err,
92 "path": p,
93 }.Errorf(c, "Failed to create CPU profile output file.")
94 return err
95 }
96 defer fd.Close()
97
98 pprof.StartCPUProfile(fd)
99 defer pprof.StopCPUProfile()
100 }
101
85 // Configure our signal handler. It will listen for terminating signals and 102 // Configure our signal handler. It will listen for terminating signals and
86 // issue a shutdown signal if one is received. 103 // issue a shutdown signal if one is received.
87 signalC := make(chan os.Signal) 104 signalC := make(chan os.Signal)
88 go func() { 105 go func() {
89 hasShutdownAlready := false 106 hasShutdownAlready := false
90 for sig := range signalC { 107 for sig := range signalC {
91 if !hasShutdownAlready { 108 if !hasShutdownAlready {
92 hasShutdownAlready = true 109 hasShutdownAlready = true
93 110
94 log.Warningf(log.SetField(c, "signal", sig), "Re ceived close signal. Send again to terminate immediately.") 111 log.Warningf(log.SetField(c, "signal", sig), "Re ceived close signal. Send again to terminate immediately.")
(...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after
135 Logger: log.Get(c), 152 Logger: log.Get(c),
136 }) 153 })
137 s.configFlags.AddToFlagSet(fs) 154 s.configFlags.AddToFlagSet(fs)
138 155
139 fs.StringVar(&s.coordinatorHost, "coordinator", "", 156 fs.StringVar(&s.coordinatorHost, "coordinator", "",
140 "The Coordinator service's [host][:port].") 157 "The Coordinator service's [host][:port].")
141 fs.BoolVar(&s.coordinatorInsecure, "coordinator-insecure", false, 158 fs.BoolVar(&s.coordinatorInsecure, "coordinator-insecure", false,
142 "Connect to Coordinator over HTTP (instead of HTTPS).") 159 "Connect to Coordinator over HTTP (instead of HTTPS).")
143 fs.StringVar(&s.storageCredentialJSONPath, "storage-credential-json-path ", "", 160 fs.StringVar(&s.storageCredentialJSONPath, "storage-credential-json-path ", "",
144 "If supplied, the path of a JSON credential file to load and use for storage operations.") 161 "If supplied, the path of a JSON credential file to load and use for storage operations.")
162 fs.StringVar(&s.cpuProfilePath, "cpu-profile-path", "",
163 "If supplied, enable CPU profiling and write the profile here.")
145 } 164 }
146 165
147 func (s *Service) initCoordinatorClient(c context.Context) (logdog.ServicesClien t, error) { 166 func (s *Service) initCoordinatorClient(c context.Context) (logdog.ServicesClien t, error) {
148 if s.coordinatorHost == "" { 167 if s.coordinatorHost == "" {
149 log.Errorf(c, "Missing Coordinator URL (-coordinator).") 168 log.Errorf(c, "Missing Coordinator URL (-coordinator).")
150 return nil, ErrInvalidConfig 169 return nil, ErrInvalidConfig
151 } 170 }
152 171
153 httpClient, err := s.AuthenticatedClient(func(o *auth.Options) { 172 httpClient, err := s.AuthenticatedClient(func(o *auth.Options) {
154 o.Scopes = CoordinatorScopes 173 o.Scopes = CoordinatorScopes
(...skipping 83 matching lines...) Expand 10 before | Expand all | Expand 10 after
238 o.Scopes = bigtable.StorageScopes 257 o.Scopes = bigtable.StorageScopes
239 if s.storageCredentialJSONPath != "" { 258 if s.storageCredentialJSONPath != "" {
240 o.ServiceAccountJSONPath = s.storageCredentialJSONPath 259 o.ServiceAccountJSONPath = s.storageCredentialJSONPath
241 } 260 }
242 }) 261 })
243 if err != nil { 262 if err != nil {
244 log.WithError(err).Errorf(c, "Failed to create BigTable Authenti cator.") 263 log.WithError(err).Errorf(c, "Failed to create BigTable Authenti cator.")
245 return nil, err 264 return nil, err
246 } 265 }
247 266
248 » return bigtable.New(c, bigtable.Options{ 267 » bt, err := bigtable.New(c, bigtable.Options{
249 Project: btcfg.Project, 268 Project: btcfg.Project,
250 Zone: btcfg.Zone, 269 Zone: btcfg.Zone,
251 Cluster: btcfg.Cluster, 270 Cluster: btcfg.Cluster,
252 LogTable: btcfg.LogTableName, 271 LogTable: btcfg.LogTableName,
253 ClientOptions: []cloud.ClientOption{ 272 ClientOptions: []cloud.ClientOption{
254 cloud.WithTokenSource(a.TokenSource()), 273 cloud.WithTokenSource(a.TokenSource()),
255 }, 274 },
256 » }), nil 275 » })
276 » if err != nil {
277 » » return nil, err
278 » }
279 » return bt, nil
257 } 280 }
258 281
259 // GSClient returns an authenticated Google Storage client instance. 282 // GSClient returns an authenticated Google Storage client instance.
260 func (s *Service) GSClient(c context.Context) (gs.Client, error) { 283 func (s *Service) GSClient(c context.Context) (gs.Client, error) {
261 rt, err := s.AuthenticatedTransport(func(o *auth.Options) { 284 rt, err := s.AuthenticatedTransport(func(o *auth.Options) {
262 o.Scopes = gs.ReadWriteScopes 285 o.Scopes = gs.ReadWriteScopes
263 }) 286 })
264 if err != nil { 287 if err != nil {
265 log.WithError(err).Errorf(c, "Failed to create authenticated GS transport.") 288 log.WithError(err).Errorf(c, "Failed to create authenticated GS transport.")
266 return nil, err 289 return nil, err
(...skipping 41 matching lines...) Expand 10 before | Expand all | Expand 10 after
308 // 331 //
309 // An optional permutation functon can be provided to modify those Options 332 // An optional permutation functon can be provided to modify those Options
310 // before the Authenticator is created. 333 // before the Authenticator is created.
311 func (s *Service) AuthenticatedClient(f func(o *auth.Options)) (*http.Client, er ror) { 334 func (s *Service) AuthenticatedClient(f func(o *auth.Options)) (*http.Client, er ror) {
312 a, err := s.Authenticator(f) 335 a, err := s.Authenticator(f)
313 if err != nil { 336 if err != nil {
314 return nil, err 337 return nil, err
315 } 338 }
316 return a.Client() 339 return a.Client()
317 } 340 }
OLDNEW
« no previous file with comments | « server/internal/logdog/collector/utils_test.go ('k') | server/logdog/storage/archive/storage.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698