Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(11)

Side by Side Diff: appengine/logdog/coordinator/service.go

Issue 1863973002: LogDog: Update to archival V2. (Closed) Base URL: https://github.com/luci/luci-go@grpcutil-errors
Patch Set: Code review comments, use Pub/Sub, archival staging, quality of life. Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package coordinator 5 package coordinator
6 6
7 import ( 7 import (
8 » "errors" 8 » "sync"
9 9
10 gaeauthClient "github.com/luci/luci-go/appengine/gaeauth/client" 10 gaeauthClient "github.com/luci/luci-go/appengine/gaeauth/client"
11 "github.com/luci/luci-go/appengine/logdog/coordinator/config" 11 "github.com/luci/luci-go/appengine/logdog/coordinator/config"
12 "github.com/luci/luci-go/common/errors"
12 "github.com/luci/luci-go/common/gcloud/gs" 13 "github.com/luci/luci-go/common/gcloud/gs"
14 "github.com/luci/luci-go/common/gcloud/pubsub"
13 log "github.com/luci/luci-go/common/logging" 15 log "github.com/luci/luci-go/common/logging"
16 "github.com/luci/luci-go/common/proto/logdog/svcconfig"
14 "github.com/luci/luci-go/server/logdog/storage" 17 "github.com/luci/luci-go/server/logdog/storage"
18 "github.com/luci/luci-go/server/logdog/storage/bigtable"
15 "golang.org/x/net/context" 19 "golang.org/x/net/context"
20 "google.golang.org/cloud"
21 gcps "google.golang.org/cloud/pubsub"
22 "google.golang.org/grpc/metadata"
16 ) 23 )
17 24
18 // Service is the base service container for LogDog handlers and endpoints. It 25 // Services is a set of support services used by Coordinator.
19 // is primarily usable as a means of consistently stubbing out various external 26 //
20 // components. 27 // Each Services instance is valid for a singel request, but can be re-used
21 type Service struct { 28 // throughout that request. This is advised, as the Services instance may
22 » // StorageFunc is a function that generates an intermediate Storage inst ance 29 // optionally cache values.
23 » // for use by this service. If nil, the production intermediate Storage 30 //
24 » // instance will be used. 31 // Services methods are goroutine-safe.
32 //
33 // By default, a production set of services will be used. However, this can be
34 // overridden for testing to mock the service layer.
35 type Services interface {
dnj 2016/04/11 17:20:04 This is a reconstruction of Service that is more v
36 » // Config returns the current instance and application configuration
37 » // instances.
25 // 38 //
26 » // This is provided for testing purposes. 39 » // The production instance will cache the results for the duration of th e
27 » StorageFunc func(context.Context) (storage.Storage, error) 40 » // request.
41 » Config(context.Context) (*config.GlobalConfig, *svcconfig.Config, error)
28 42
29 » // GSClientFunc is a function that generates a Google Storage client ins tance 43 » // Storage returns an intermediate storage instance for use by this serv ice.
30 » // for use by this service. If nil, the production Google Storage Client will 44 » //
31 » // be used. 45 » // The caller must close the returned instance if successful.
32 » GSClientFunc func(context.Context) (gs.Client, error) 46 » IntermediateStorage(context.Context) (storage.Storage, error)
47
48 » // GSClient instantiates a Google Storage client.
49 » GSClient(context.Context) (gs.Client, error)
50
51 » // ArchivalPublisher returns an ArchivalPublisher instance.
52 » ArchivalPublisher(context.Context) (ArchivalPublisher, error)
33 } 53 }
34 54
35 // Storage retrieves the configured Storage instance. 55 // ServiceBase is an embeddable struct that offers a production Services
36 func (s *Service) Storage(c context.Context) (storage.Storage, error) { 56 // implementation.
37 » sf := s.StorageFunc 57 //
38 » if sf == nil { 58 // Its Services member can be overridden to provide alternative implementations
39 » » // Production: use BigTable storage. 59 // for testing.
40 » » sf = config.GetStorage 60 type ServiceBase struct {
61 » Services
62 }
63
64 // GetServices returns a new Services instance.
65 func (s *ServiceBase) GetServices() Services {
66 » if s.Services != nil {
67 » » return s.Services
68 » }
69 » return &prodServices{}
70 }
71
72 // Request is a Service context for a given request.
73 type prodServices struct {
74 » sync.Mutex
75
76 » // gcfg is the cached global configuration.
77 » gcfg *config.GlobalConfig
78 » // cfg is the cached configuration.
79 » cfg *svcconfig.Config
80 }
81
82 // Config returns the current instance and application configuration instances.
83 //
84 // After a success, successive calls will return a cached result.
85 func (s *prodServices) Config(c context.Context) (*config.GlobalConfig, *svcconf ig.Config, error) {
86 » s.Lock()
87 » defer s.Unlock()
88
89 » // Load/cache the global config.
90 » if s.gcfg == nil {
91 » » var err error
92 » » s.gcfg, err = config.LoadGlobalConfig(c)
93 » » if err != nil {
94 » » » return nil, nil, err
95 » » }
41 } 96 }
42 97
43 » st, err := sf(c) 98 » if s.cfg == nil {
99 » » var err error
100 » » s.cfg, err = s.gcfg.LoadConfig(c)
101 » » if err != nil {
102 » » » return nil, nil, err
103 » » }
104 » }
105
106 » return s.gcfg, s.cfg, nil
107 }
108
109 func (s *prodServices) IntermediateStorage(c context.Context) (storage.Storage, error) {
110 » gcfg, cfg, err := s.Config(c)
44 if err != nil { 111 if err != nil {
45 » » log.Errorf(log.SetError(c, err), "Failed to get Storage instance .") 112 » » return nil, err
113 » }
114
115 » // Is BigTable configured?
116 » if cfg.Storage == nil {
117 » » return nil, errors.New("no storage configuration")
118 » }
119
120 » bt := cfg.Storage.GetBigtable()
121 » if bt == nil {
122 » » return nil, errors.New("no BigTable configuration")
123 » }
124
125 » // Validate the BigTable configuration.
126 » log.Fields{
127 » » "project": bt.Project,
128 » » "zone": bt.Zone,
129 » » "cluster": bt.Cluster,
130 » » "logTableName": bt.LogTableName,
131 » }.Debugf(c, "Connecting to BigTable.")
132 » var merr errors.MultiError
133 » if bt.Project == "" {
134 » » merr = append(merr, errors.New("missing project"))
135 » }
136 » if bt.Zone == "" {
137 » » merr = append(merr, errors.New("missing zone"))
138 » }
139 » if bt.Cluster == "" {
140 » » merr = append(merr, errors.New("missing cluster"))
141 » }
142 » if bt.LogTableName == "" {
143 » » merr = append(merr, errors.New("missing log table name"))
144 » }
145 » if len(merr) > 0 {
146 » » return nil, merr
147 » }
148
149 » // Get an Authenticator bound to the token scopes that we need for BigTa ble.
150 » a, err := gaeauthClient.Authenticator(c, bigtable.StorageScopes, gcfg.Bi gTableServiceAccountJSON)
151 » if err != nil {
152 » » log.WithError(err).Errorf(c, "Failed to create BigTable authenti cator.")
153 » » return nil, errors.New("failed to create BigTable authenticator" )
154 » }
155
156 » // Explicitly clear gRPC metadata from the Context. It is forwarded to
157 » // delegate calls by default, and standard request metadata can break Bi gTable
158 » // calls.
159 » c = metadata.NewContext(c, nil)
160
161 » st, err := bigtable.New(c, bigtable.Options{
162 » » Project: bt.Project,
163 » » Zone: bt.Zone,
164 » » Cluster: bt.Cluster,
165 » » LogTable: bt.LogTableName,
166 » » ClientOptions: []cloud.ClientOption{
167 » » » cloud.WithTokenSource(a.TokenSource()),
168 » » },
169 » })
170 » if err != nil {
171 » » log.WithError(err).Errorf(c, "Failed to create BigTable instance .")
46 return nil, err 172 return nil, err
47 } 173 }
48 return st, nil 174 return st, nil
49 } 175 }
50 176
51 // GSClient instantiates a Google Storage client. 177 func (s *prodServices) GSClient(c context.Context) (gs.Client, error) {
52 func (s *Service) GSClient(c context.Context) (gs.Client, error) {
53 » f := s.GSClientFunc
54 » if f == nil {
55 » » f = s.newProdGSClient
56 » }
57
58 » gsc, err := f(c)
59 » if err != nil {
60 » » log.Errorf(log.SetError(c, err), "Failed to get Google Storage c lient.")
61 » » return nil, err
62 » }
63 » return gsc, nil
64 }
65
66 func (s *Service) newProdGSClient(c context.Context) (gs.Client, error) {
67 // Get an Authenticator bound to the token scopes that we need for 178 // Get an Authenticator bound to the token scopes that we need for
68 // authenticated Cloud Storage access. 179 // authenticated Cloud Storage access.
69 rt, err := gaeauthClient.Transport(c, gs.ReadOnlyScopes, nil) 180 rt, err := gaeauthClient.Transport(c, gs.ReadOnlyScopes, nil)
70 if err != nil { 181 if err != nil {
71 log.WithError(err).Errorf(c, "Failed to create Cloud Storage tra nsport.") 182 log.WithError(err).Errorf(c, "Failed to create Cloud Storage tra nsport.")
72 return nil, errors.New("failed to create Cloud Storage transport ") 183 return nil, errors.New("failed to create Cloud Storage transport ")
73 } 184 }
74 return gs.NewProdClient(c, rt) 185 return gs.NewProdClient(c, rt)
75 } 186 }
187
188 func (s *prodServices) ArchivalPublisher(c context.Context) (ArchivalPublisher, error) {
dnj 2016/04/11 17:20:04 (Except this, this is new).
189 _, cfg, err := s.Config(c)
190 if err != nil {
191 return nil, err
192 }
193
194 fullTopic := pubsub.Topic(cfg.Coordinator.ArchiveTopic)
195 if err := fullTopic.Validate(); err != nil {
196 log.Fields{
197 log.ErrorKey: err,
198 "topic": fullTopic,
199 }.Errorf(c, "Failed to validate archival topic.")
200 return nil, errors.New("invalid archival topic")
201 }
202 project, topic := fullTopic.Split()
203
204 // Create an authenticated Pub/Sub client.
205 // Pub/Sub topic publishing.
206 auth, err := gaeauthClient.Authenticator(c, pubsub.PublisherScopes, nil)
207 if err != nil {
208 log.WithError(err).Errorf(c, "Failed to create Pub/Sub authentic ator.")
209 return nil, errors.New("failed to create Pub/Sub authenticator")
210 }
211
212 client, err := auth.Client()
213 if err != nil {
214 log.WithError(err).Errorf(c, "Failed to create Pub/Sub HTTP clie nt.")
215 return nil, errors.New("failed to create Pub/Sub HTTP client")
216 }
217
218 psClient, err := gcps.NewClient(c, project, cloud.WithBaseHTTP(client))
219 if err != nil {
220 log.WithError(err).Errorf(c, "Failed to create Pub/Sub client.")
221 return nil, errors.New("failed to create Pub/Sub client")
222 }
223
224 return &pubsubArchivalPublisher{
225 topic: psClient.Topic(topic),
226 }, nil
227 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698