Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package coordinator | 5 package coordinator |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 » "errors" | 8 » "sync" |
| 9 | 9 |
| 10 gaeauthClient "github.com/luci/luci-go/appengine/gaeauth/client" | 10 gaeauthClient "github.com/luci/luci-go/appengine/gaeauth/client" |
| 11 "github.com/luci/luci-go/appengine/logdog/coordinator/config" | 11 "github.com/luci/luci-go/appengine/logdog/coordinator/config" |
| 12 "github.com/luci/luci-go/common/errors" | |
| 12 "github.com/luci/luci-go/common/gcloud/gs" | 13 "github.com/luci/luci-go/common/gcloud/gs" |
| 14 "github.com/luci/luci-go/common/gcloud/pubsub" | |
| 13 log "github.com/luci/luci-go/common/logging" | 15 log "github.com/luci/luci-go/common/logging" |
| 16 "github.com/luci/luci-go/common/proto/logdog/svcconfig" | |
| 14 "github.com/luci/luci-go/server/logdog/storage" | 17 "github.com/luci/luci-go/server/logdog/storage" |
| 18 "github.com/luci/luci-go/server/logdog/storage/bigtable" | |
| 15 "golang.org/x/net/context" | 19 "golang.org/x/net/context" |
| 20 "google.golang.org/cloud" | |
| 21 gcps "google.golang.org/cloud/pubsub" | |
| 22 "google.golang.org/grpc/metadata" | |
| 16 ) | 23 ) |
| 17 | 24 |
| 18 // Service is the base service container for LogDog handlers and endpoints. It | 25 // Services is a set of support services used by Coordinator. |
| 19 // is primarily usable as a means of consistently stubbing out various external | 26 // |
| 20 // components. | 27 // Each Services instance is valid for a singel request, but can be re-used |
| 21 type Service struct { | 28 // throughout that request. This is advised, as the Services instance may |
| 22 » // StorageFunc is a function that generates an intermediate Storage inst ance | 29 // optionally cache values. |
| 23 » // for use by this service. If nil, the production intermediate Storage | 30 // |
| 24 » // instance will be used. | 31 // Services methods are goroutine-safe. |
| 32 // | |
| 33 // By default, a production set of services will be used. However, this can be | |
| 34 // overridden for testing to mock the service layer. | |
| 35 type Services interface { | |
|
dnj
2016/04/11 17:20:04
This is a reconstruction of Service that is more v
| |
| 36 » // Config returns the current instance and application configuration | |
| 37 » // instances. | |
| 25 // | 38 // |
| 26 » // This is provided for testing purposes. | 39 » // The production instance will cache the results for the duration of th e |
| 27 » StorageFunc func(context.Context) (storage.Storage, error) | 40 » // request. |
| 41 » Config(context.Context) (*config.GlobalConfig, *svcconfig.Config, error) | |
| 28 | 42 |
| 29 » // GSClientFunc is a function that generates a Google Storage client ins tance | 43 » // Storage returns an intermediate storage instance for use by this serv ice. |
| 30 » // for use by this service. If nil, the production Google Storage Client will | 44 » // |
| 31 » // be used. | 45 » // The caller must close the returned instance if successful. |
| 32 » GSClientFunc func(context.Context) (gs.Client, error) | 46 » IntermediateStorage(context.Context) (storage.Storage, error) |
| 47 | |
| 48 » // GSClient instantiates a Google Storage client. | |
| 49 » GSClient(context.Context) (gs.Client, error) | |
| 50 | |
| 51 » // ArchivalPublisher returns an ArchivalPublisher instance. | |
| 52 » ArchivalPublisher(context.Context) (ArchivalPublisher, error) | |
| 33 } | 53 } |
| 34 | 54 |
| 35 // Storage retrieves the configured Storage instance. | 55 // ServiceBase is an embeddable struct that offers a production Services |
| 36 func (s *Service) Storage(c context.Context) (storage.Storage, error) { | 56 // implementation. |
| 37 » sf := s.StorageFunc | 57 // |
| 38 » if sf == nil { | 58 // Its Services member can be overridden to provide alternative implementations |
| 39 » » // Production: use BigTable storage. | 59 // for testing. |
| 40 » » sf = config.GetStorage | 60 type ServiceBase struct { |
| 61 » Services | |
| 62 } | |
| 63 | |
| 64 // GetServices returns a new Services instance. | |
| 65 func (s *ServiceBase) GetServices() Services { | |
| 66 » if s.Services != nil { | |
| 67 » » return s.Services | |
| 68 » } | |
| 69 » return &prodServices{} | |
| 70 } | |
| 71 | |
| 72 // Request is a Service context for a given request. | |
| 73 type prodServices struct { | |
| 74 » sync.Mutex | |
| 75 | |
| 76 » // gcfg is the cached global configuration. | |
| 77 » gcfg *config.GlobalConfig | |
| 78 » // cfg is the cached configuration. | |
| 79 » cfg *svcconfig.Config | |
| 80 } | |
| 81 | |
| 82 // Config returns the current instance and application configuration instances. | |
| 83 // | |
| 84 // After a success, successive calls will return a cached result. | |
| 85 func (s *prodServices) Config(c context.Context) (*config.GlobalConfig, *svcconf ig.Config, error) { | |
| 86 » s.Lock() | |
| 87 » defer s.Unlock() | |
| 88 | |
| 89 » // Load/cache the global config. | |
| 90 » if s.gcfg == nil { | |
| 91 » » var err error | |
| 92 » » s.gcfg, err = config.LoadGlobalConfig(c) | |
| 93 » » if err != nil { | |
| 94 » » » return nil, nil, err | |
| 95 » » } | |
| 41 } | 96 } |
| 42 | 97 |
| 43 » st, err := sf(c) | 98 » if s.cfg == nil { |
| 99 » » var err error | |
| 100 » » s.cfg, err = s.gcfg.LoadConfig(c) | |
| 101 » » if err != nil { | |
| 102 » » » return nil, nil, err | |
| 103 » » } | |
| 104 » } | |
| 105 | |
| 106 » return s.gcfg, s.cfg, nil | |
| 107 } | |
| 108 | |
| 109 func (s *prodServices) IntermediateStorage(c context.Context) (storage.Storage, error) { | |
| 110 » gcfg, cfg, err := s.Config(c) | |
| 44 if err != nil { | 111 if err != nil { |
| 45 » » log.Errorf(log.SetError(c, err), "Failed to get Storage instance .") | 112 » » return nil, err |
| 113 » } | |
| 114 | |
| 115 » // Is BigTable configured? | |
| 116 » if cfg.Storage == nil { | |
| 117 » » return nil, errors.New("no storage configuration") | |
| 118 » } | |
| 119 | |
| 120 » bt := cfg.Storage.GetBigtable() | |
| 121 » if bt == nil { | |
| 122 » » return nil, errors.New("no BigTable configuration") | |
| 123 » } | |
| 124 | |
| 125 » // Validate the BigTable configuration. | |
| 126 » log.Fields{ | |
| 127 » » "project": bt.Project, | |
| 128 » » "zone": bt.Zone, | |
| 129 » » "cluster": bt.Cluster, | |
| 130 » » "logTableName": bt.LogTableName, | |
| 131 » }.Debugf(c, "Connecting to BigTable.") | |
| 132 » var merr errors.MultiError | |
| 133 » if bt.Project == "" { | |
| 134 » » merr = append(merr, errors.New("missing project")) | |
| 135 » } | |
| 136 » if bt.Zone == "" { | |
| 137 » » merr = append(merr, errors.New("missing zone")) | |
| 138 » } | |
| 139 » if bt.Cluster == "" { | |
| 140 » » merr = append(merr, errors.New("missing cluster")) | |
| 141 » } | |
| 142 » if bt.LogTableName == "" { | |
| 143 » » merr = append(merr, errors.New("missing log table name")) | |
| 144 » } | |
| 145 » if len(merr) > 0 { | |
| 146 » » return nil, merr | |
| 147 » } | |
| 148 | |
| 149 » // Get an Authenticator bound to the token scopes that we need for BigTa ble. | |
| 150 » a, err := gaeauthClient.Authenticator(c, bigtable.StorageScopes, gcfg.Bi gTableServiceAccountJSON) | |
| 151 » if err != nil { | |
| 152 » » log.WithError(err).Errorf(c, "Failed to create BigTable authenti cator.") | |
| 153 » » return nil, errors.New("failed to create BigTable authenticator" ) | |
| 154 » } | |
| 155 | |
| 156 » // Explicitly clear gRPC metadata from the Context. It is forwarded to | |
| 157 » // delegate calls by default, and standard request metadata can break Bi gTable | |
| 158 » // calls. | |
| 159 » c = metadata.NewContext(c, nil) | |
| 160 | |
| 161 » st, err := bigtable.New(c, bigtable.Options{ | |
| 162 » » Project: bt.Project, | |
| 163 » » Zone: bt.Zone, | |
| 164 » » Cluster: bt.Cluster, | |
| 165 » » LogTable: bt.LogTableName, | |
| 166 » » ClientOptions: []cloud.ClientOption{ | |
| 167 » » » cloud.WithTokenSource(a.TokenSource()), | |
| 168 » » }, | |
| 169 » }) | |
| 170 » if err != nil { | |
| 171 » » log.WithError(err).Errorf(c, "Failed to create BigTable instance .") | |
| 46 return nil, err | 172 return nil, err |
| 47 } | 173 } |
| 48 return st, nil | 174 return st, nil |
| 49 } | 175 } |
| 50 | 176 |
| 51 // GSClient instantiates a Google Storage client. | 177 func (s *prodServices) GSClient(c context.Context) (gs.Client, error) { |
| 52 func (s *Service) GSClient(c context.Context) (gs.Client, error) { | |
| 53 » f := s.GSClientFunc | |
| 54 » if f == nil { | |
| 55 » » f = s.newProdGSClient | |
| 56 » } | |
| 57 | |
| 58 » gsc, err := f(c) | |
| 59 » if err != nil { | |
| 60 » » log.Errorf(log.SetError(c, err), "Failed to get Google Storage c lient.") | |
| 61 » » return nil, err | |
| 62 » } | |
| 63 » return gsc, nil | |
| 64 } | |
| 65 | |
| 66 func (s *Service) newProdGSClient(c context.Context) (gs.Client, error) { | |
| 67 // Get an Authenticator bound to the token scopes that we need for | 178 // Get an Authenticator bound to the token scopes that we need for |
| 68 // authenticated Cloud Storage access. | 179 // authenticated Cloud Storage access. |
| 69 rt, err := gaeauthClient.Transport(c, gs.ReadOnlyScopes, nil) | 180 rt, err := gaeauthClient.Transport(c, gs.ReadOnlyScopes, nil) |
| 70 if err != nil { | 181 if err != nil { |
| 71 log.WithError(err).Errorf(c, "Failed to create Cloud Storage tra nsport.") | 182 log.WithError(err).Errorf(c, "Failed to create Cloud Storage tra nsport.") |
| 72 return nil, errors.New("failed to create Cloud Storage transport ") | 183 return nil, errors.New("failed to create Cloud Storage transport ") |
| 73 } | 184 } |
| 74 return gs.NewProdClient(c, rt) | 185 return gs.NewProdClient(c, rt) |
| 75 } | 186 } |
| 187 | |
| 188 func (s *prodServices) ArchivalPublisher(c context.Context) (ArchivalPublisher, error) { | |
|
dnj
2016/04/11 17:20:04
(Except this, this is new).
| |
| 189 _, cfg, err := s.Config(c) | |
| 190 if err != nil { | |
| 191 return nil, err | |
| 192 } | |
| 193 | |
| 194 fullTopic := pubsub.Topic(cfg.Coordinator.ArchiveTopic) | |
| 195 if err := fullTopic.Validate(); err != nil { | |
| 196 log.Fields{ | |
| 197 log.ErrorKey: err, | |
| 198 "topic": fullTopic, | |
| 199 }.Errorf(c, "Failed to validate archival topic.") | |
| 200 return nil, errors.New("invalid archival topic") | |
| 201 } | |
| 202 project, topic := fullTopic.Split() | |
| 203 | |
| 204 // Create an authenticated Pub/Sub client. | |
| 205 // Pub/Sub topic publishing. | |
| 206 auth, err := gaeauthClient.Authenticator(c, pubsub.PublisherScopes, nil) | |
| 207 if err != nil { | |
| 208 log.WithError(err).Errorf(c, "Failed to create Pub/Sub authentic ator.") | |
| 209 return nil, errors.New("failed to create Pub/Sub authenticator") | |
| 210 } | |
| 211 | |
| 212 client, err := auth.Client() | |
| 213 if err != nil { | |
| 214 log.WithError(err).Errorf(c, "Failed to create Pub/Sub HTTP clie nt.") | |
| 215 return nil, errors.New("failed to create Pub/Sub HTTP client") | |
| 216 } | |
| 217 | |
| 218 psClient, err := gcps.NewClient(c, project, cloud.WithBaseHTTP(client)) | |
| 219 if err != nil { | |
| 220 log.WithError(err).Errorf(c, "Failed to create Pub/Sub client.") | |
| 221 return nil, errors.New("failed to create Pub/Sub client") | |
| 222 } | |
| 223 | |
| 224 return &pubsubArchivalPublisher{ | |
| 225 topic: psClient.Topic(topic), | |
| 226 }, nil | |
| 227 } | |
| OLD | NEW |