Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package coordinator | 5 package coordinator |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "net/http" | |
| 9 "sync" | 8 "sync" |
| 10 "sync/atomic" | 9 "sync/atomic" |
| 11 "time" | 10 "time" |
| 12 | 11 |
| 13 "github.com/luci/luci-go/appengine/gaeauth/server/gaesigner" | 12 "github.com/luci/luci-go/appengine/gaeauth/server/gaesigner" |
| 14 "github.com/luci/luci-go/common/clock" | 13 "github.com/luci/luci-go/common/clock" |
| 15 luciConfig "github.com/luci/luci-go/common/config" | 14 luciConfig "github.com/luci/luci-go/common/config" |
| 16 "github.com/luci/luci-go/common/errors" | 15 "github.com/luci/luci-go/common/errors" |
| 17 "github.com/luci/luci-go/common/gcloud/gs" | 16 "github.com/luci/luci-go/common/gcloud/gs" |
| 18 "github.com/luci/luci-go/common/gcloud/pubsub" | 17 "github.com/luci/luci-go/common/gcloud/pubsub" |
| (...skipping 64 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 83 StorageForStream(context.Context, *LogStreamState) (Storage, error) | 82 StorageForStream(context.Context, *LogStreamState) (Storage, error) |
| 84 | 83 |
| 85 // ArchivalPublisher returns an ArchivalPublisher instance. | 84 // ArchivalPublisher returns an ArchivalPublisher instance. |
| 86 ArchivalPublisher(context.Context) (ArchivalPublisher, error) | 85 ArchivalPublisher(context.Context) (ArchivalPublisher, error) |
| 87 } | 86 } |
| 88 | 87 |
| 89 // ProdCoordinatorService is Middleware used by Coordinator services. | 88 // ProdCoordinatorService is Middleware used by Coordinator services. |
| 90 // | 89 // |
| 91 // It installs a production Services instance into the Context. | 90 // It installs a production Services instance into the Context. |
| 92 func ProdCoordinatorService(c *router.Context, next router.Handler) { | 91 func ProdCoordinatorService(c *router.Context, next router.Handler) { |
| 93 » c.Context = WithServices(c.Context, &prodServicesInst{ | 92 » services := prodServicesInst{ |
| 94 » » req: c.Request, | 93 » » aeCtx: appengine.WithContext(c.Context, c.Request), |
| 95 » }) | 94 » } |
| 95 » defer services.close(c.Context) | |
| 96 | |
| 97 » c.Context = WithServices(c.Context, &services) | |
| 96 next(c) | 98 next(c) |
| 97 } | 99 } |
| 98 | 100 |
| 99 // prodServicesInst is a Service exposing production faciliites. A unique | 101 // prodServicesInst is a Service exposing production faciliites. A unique |
| 100 // instance is bound to each each request. | 102 // instance is bound to each each request. |
| 101 type prodServicesInst struct { | 103 type prodServicesInst struct { |
| 102 sync.Mutex | 104 sync.Mutex |
| 103 | 105 |
| 104 » // req is the request that is being serviced. This is populated when the | 106 » // aeCtx is an AppEngine Context initialized for the current request. |
| 105 » // service is installed via ProdCoordinatorService. | 107 » aeCtx context.Context |
| 106 » req *http.Request | |
| 107 | 108 |
| 108 // gcfg is the cached global configuration. | 109 // gcfg is the cached global configuration. |
| 109 gcfg *config.Config | 110 gcfg *config.Config |
| 110 projectConfigs map[luciConfig.ProjectName]*cachedProjectConfig | 111 projectConfigs map[luciConfig.ProjectName]*cachedProjectConfig |
| 111 | 112 |
| 112 // archivalIndex is the atomically-manipulated archival index for the | 113 // archivalIndex is the atomically-manipulated archival index for the |
| 113 // ArchivalPublisher. This is shared between all ArchivalPublisher insta nces | 114 // ArchivalPublisher. This is shared between all ArchivalPublisher insta nces |
| 114 // from this service. | 115 // from this service. |
| 115 archivalIndex int32 | 116 archivalIndex int32 |
| 116 | 117 |
| 118 // pubSubClients is a map of Pub/Sub client singletons generated during this | |
| 119 // request. Each client is associated with its project, and will be | |
| 120 // initialized the first time it is requested by getPubSubClient. | |
| 121 // | |
| 122 // All clients will be closed on "close". | |
| 123 pubSubClients map[string]*gcps.Client | |
| 124 | |
| 117 // signer is the signer instance to use. | 125 // signer is the signer instance to use. |
| 118 signer gaesigner.Signer | 126 signer gaesigner.Signer |
| 119 } | 127 } |
| 120 | 128 |
| 129 func (s *prodServicesInst) close(c context.Context) { | |
| 130 for proj, client := range s.pubSubClients { | |
|
Vadim Sh.
2016/12/19 19:59:45
nit: do it under the mutex for consistency.
dnj
2016/12/19 21:43:55
Done.
| |
| 131 if err := client.Close(); err != nil { | |
| 132 log.Fields{ | |
| 133 log.ErrorKey: err, | |
| 134 "project": proj, | |
| 135 }.Errorf(c, "Failed to close Pub/Sub client singleton.") | |
| 136 } | |
| 137 } | |
| 138 } | |
| 139 | |
| 121 func (s *prodServicesInst) Config(c context.Context) (*config.Config, error) { | 140 func (s *prodServicesInst) Config(c context.Context) (*config.Config, error) { |
| 122 s.Lock() | 141 s.Lock() |
| 123 defer s.Unlock() | 142 defer s.Unlock() |
| 124 | 143 |
| 125 // Load/cache the global config. | 144 // Load/cache the global config. |
| 126 if s.gcfg == nil { | 145 if s.gcfg == nil { |
| 127 var err error | 146 var err error |
| 128 s.gcfg, err = config.Load(c) | 147 s.gcfg, err = config.Load(c) |
| 129 if err != nil { | 148 if err != nil { |
| 130 return nil, err | 149 return nil, err |
| (...skipping 195 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 326 fullTopic := pubsub.Topic(cfg.Coordinator.ArchiveTopic) | 345 fullTopic := pubsub.Topic(cfg.Coordinator.ArchiveTopic) |
| 327 if err := fullTopic.Validate(); err != nil { | 346 if err := fullTopic.Validate(); err != nil { |
| 328 log.Fields{ | 347 log.Fields{ |
| 329 log.ErrorKey: err, | 348 log.ErrorKey: err, |
| 330 "topic": fullTopic, | 349 "topic": fullTopic, |
| 331 }.Errorf(c, "Failed to validate archival topic.") | 350 }.Errorf(c, "Failed to validate archival topic.") |
| 332 return nil, errors.New("invalid archival topic") | 351 return nil, errors.New("invalid archival topic") |
| 333 } | 352 } |
| 334 project, topic := fullTopic.Split() | 353 project, topic := fullTopic.Split() |
| 335 | 354 |
| 355 // Get a Pub/Sub client (maybe re-use). | |
| 356 psClient, err := s.getPubSubClient(project) | |
| 357 if err != nil { | |
| 358 log.WithError(err).Errorf(c, "Failed to get Pub/Sub client.") | |
| 359 return nil, err | |
| 360 } | |
| 361 | |
| 362 return &pubsubArchivalPublisher{ | |
| 363 client: psClient, | |
| 364 topic: psClient.Topic(topic), | |
| 365 publishIndexFunc: s.nextArchiveIndex, | |
| 366 }, nil | |
| 367 } | |
| 368 | |
| 369 func (s *prodServicesInst) getPubSubClient(proj string) (*gcps.Client, error) { | |
| 370 // Create a new AppEngine context. Don't pass gRPC metadata to PubSub, s ince | |
| 371 // we don't want any caller RPC to be forwarded to the backend service. | |
| 372 c := metadata.NewContext(s.aeCtx, nil) | |
| 373 | |
| 374 s.Lock() | |
| 375 defer s.Unlock() | |
| 376 | |
| 377 client := s.pubSubClients[proj] | |
| 378 if client != nil { | |
| 379 return client, nil | |
| 380 } | |
| 381 | |
| 336 // Create an authenticated Pub/Sub client. | 382 // Create an authenticated Pub/Sub client. |
| 337 creds, err := auth.GetPerRPCCredentials(auth.AsSelf, auth.WithScopes(pub sub.PublisherScopes...)) | 383 creds, err := auth.GetPerRPCCredentials(auth.AsSelf, auth.WithScopes(pub sub.PublisherScopes...)) |
| 338 if err != nil { | 384 if err != nil { |
| 339 log.WithError(err).Errorf(c, "Failed to create Pub/Sub credentia ls.") | 385 log.WithError(err).Errorf(c, "Failed to create Pub/Sub credentia ls.") |
| 340 return nil, errors.New("failed to create Pub/Sub credentials") | 386 return nil, errors.New("failed to create Pub/Sub credentials") |
| 341 } | 387 } |
| 342 | 388 |
| 343 » // Create a new AppEngine context. Don't pass gRPC metadata to PubSub, s ince | 389 » psClient, err := gcps.NewClient(c, proj, |
| 344 » // we don't want any caller RPC to be forwarded to the backend service. | |
| 345 » aeCtx := appengine.WithContext(metadata.NewContext(c, nil), s.req) | |
| 346 | |
| 347 » psClient, err := gcps.NewClient(aeCtx, project, | |
| 348 option.WithGRPCDialOption(grpc.WithPerRPCCredentials(creds))) | 390 option.WithGRPCDialOption(grpc.WithPerRPCCredentials(creds))) |
| 349 if err != nil { | 391 if err != nil { |
| 350 log.WithError(err).Errorf(c, "Failed to create Pub/Sub client.") | 392 log.WithError(err).Errorf(c, "Failed to create Pub/Sub client.") |
| 351 return nil, errors.New("failed to create Pub/Sub client") | 393 return nil, errors.New("failed to create Pub/Sub client") |
| 352 } | 394 } |
| 353 | 395 |
| 354 » return &pubsubArchivalPublisher{ | 396 » // Retain this client for the duration of our request. |
| 355 » » client: psClient, | 397 » if s.pubSubClients == nil { |
| 356 » » topic: psClient.Topic(topic), | 398 » » s.pubSubClients = make(map[string]*gcps.Client) |
| 357 » » publishIndexFunc: s.nextArchiveIndex, | 399 » } |
| 358 » }, nil | 400 » s.pubSubClients[proj] = psClient |
| 401 » return psClient, nil | |
| 359 } | 402 } |
| 360 | 403 |
| 361 func (s *prodServicesInst) nextArchiveIndex() uint64 { | 404 func (s *prodServicesInst) nextArchiveIndex() uint64 { |
| 362 // We use a 32-bit value for this because it avoids atomic memory bounar y | 405 // We use a 32-bit value for this because it avoids atomic memory bounar y |
| 363 // issues. Furthermore, we constrain it to be positive, using a negative | 406 // issues. Furthermore, we constrain it to be positive, using a negative |
| 364 // value as a sentinel that the archival index has wrapped. | 407 // value as a sentinel that the archival index has wrapped. |
| 365 // | 408 // |
| 366 // This is reasonable, as it is very unlikely that a single request will issue | 409 // This is reasonable, as it is very unlikely that a single request will issue |
| 367 // more than MaxInt32 archival tasks. | 410 // more than MaxInt32 archival tasks. |
| 368 v := atomic.AddInt32(&s.archivalIndex, 1) - 1 | 411 v := atomic.AddInt32(&s.archivalIndex, 1) - 1 |
| (...skipping 133 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 502 | 545 |
| 503 // Sign index URL. | 546 // Sign index URL. |
| 504 if req.Index { | 547 if req.Index { |
| 505 if resp.Index, err = doSign(si.index); err != nil { | 548 if resp.Index, err = doSign(si.index); err != nil { |
| 506 return nil, errors.Annotate(err).InternalReason("failed to sign index URL").Err() | 549 return nil, errors.Annotate(err).InternalReason("failed to sign index URL").Err() |
| 507 } | 550 } |
| 508 } | 551 } |
| 509 | 552 |
| 510 return &resp, nil | 553 return &resp, nil |
| 511 } | 554 } |
| OLD | NEW |