| OLD | NEW |
| 1 // Copyright 2015 The LUCI Authors. | 1 // Copyright 2015 The LUCI Authors. |
| 2 // | 2 // |
| 3 // Licensed under the Apache License, Version 2.0 (the "License"); | 3 // Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 // you may not use this file except in compliance with the License. | 4 // you may not use this file except in compliance with the License. |
| 5 // You may obtain a copy of the License at | 5 // You may obtain a copy of the License at |
| 6 // | 6 // |
| 7 // http://www.apache.org/licenses/LICENSE-2.0 | 7 // http://www.apache.org/licenses/LICENSE-2.0 |
| 8 // | 8 // |
| 9 // Unless required by applicable law or agreed to in writing, software | 9 // Unless required by applicable law or agreed to in writing, software |
| 10 // distributed under the License is distributed on an "AS IS" BASIS, | 10 // distributed under the License is distributed on an "AS IS" BASIS, |
| (...skipping 17 matching lines...) Expand all Loading... |
| 28 "github.com/luci/luci-go/logdog/api/config/svcconfig" | 28 "github.com/luci/luci-go/logdog/api/config/svcconfig" |
| 29 "github.com/luci/luci-go/logdog/appengine/coordinator/config" | 29 "github.com/luci/luci-go/logdog/appengine/coordinator/config" |
| 30 "github.com/luci/luci-go/logdog/common/storage" | 30 "github.com/luci/luci-go/logdog/common/storage" |
| 31 "github.com/luci/luci-go/logdog/common/storage/archive" | 31 "github.com/luci/luci-go/logdog/common/storage/archive" |
| 32 "github.com/luci/luci-go/logdog/common/storage/bigtable" | 32 "github.com/luci/luci-go/logdog/common/storage/bigtable" |
| 33 "github.com/luci/luci-go/logdog/common/storage/caching" | 33 "github.com/luci/luci-go/logdog/common/storage/caching" |
| 34 "github.com/luci/luci-go/luci_config/common/cfgtypes" | 34 "github.com/luci/luci-go/luci_config/common/cfgtypes" |
| 35 "github.com/luci/luci-go/server/auth" | 35 "github.com/luci/luci-go/server/auth" |
| 36 "github.com/luci/luci-go/server/router" | 36 "github.com/luci/luci-go/server/router" |
| 37 | 37 |
| 38 » gcps "cloud.google.com/go/pubsub" | 38 » vkit "cloud.google.com/go/pubsub/apiv1" |
| 39 gcst "cloud.google.com/go/storage" | 39 gcst "cloud.google.com/go/storage" |
| 40 "golang.org/x/net/context" | |
| 41 "google.golang.org/api/option" | 40 "google.golang.org/api/option" |
| 42 "google.golang.org/appengine" | 41 "google.golang.org/appengine" |
| 43 "google.golang.org/grpc" | 42 "google.golang.org/grpc" |
| 44 "google.golang.org/grpc/metadata" | 43 "google.golang.org/grpc/metadata" |
| 44 |
| 45 "golang.org/x/net/context" |
| 45 ) | 46 ) |
| 46 | 47 |
| 47 const ( | 48 const ( |
| 48 // maxSignedURLLifetime is the maximum allowed signed URL lifetime. | 49 // maxSignedURLLifetime is the maximum allowed signed URL lifetime. |
| 49 maxSignedURLLifetime = 1 * time.Hour | 50 maxSignedURLLifetime = 1 * time.Hour |
| 50 | 51 |
| 51 // maxGSFetchSize is the maximum amount of data we can fetch from a sing
le | 52 // maxGSFetchSize is the maximum amount of data we can fetch from a sing
le |
| 52 // Google Storage RPC call. | 53 // Google Storage RPC call. |
| 53 // | 54 // |
| 54 // AppEngine's "urlfetch" has a limit of 32MB: | 55 // AppEngine's "urlfetch" has a limit of 32MB: |
| (...skipping 63 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 118 | 119 |
| 119 // gcfg is the cached global configuration. | 120 // gcfg is the cached global configuration. |
| 120 gcfg *config.Config | 121 gcfg *config.Config |
| 121 projectConfigs map[cfgtypes.ProjectName]*cachedProjectConfig | 122 projectConfigs map[cfgtypes.ProjectName]*cachedProjectConfig |
| 122 | 123 |
| 123 // archivalIndex is the atomically-manipulated archival index for the | 124 // archivalIndex is the atomically-manipulated archival index for the |
| 124 // ArchivalPublisher. This is shared between all ArchivalPublisher insta
nces | 125 // ArchivalPublisher. This is shared between all ArchivalPublisher insta
nces |
| 125 // from this service. | 126 // from this service. |
| 126 archivalIndex int32 | 127 archivalIndex int32 |
| 127 | 128 |
| 128 » // pubSubClients is a map of Pub/Sub client singletons generated during
this | 129 » // pubSubClient is a Pub/Sub client generated during this request. |
| 129 » // request. Each client is associated with its project, and will be | |
| 130 » // initialized the first time it is requested by getPubSubClient. | |
| 131 // | 130 // |
| 132 » // All clients will be closed on "close". | 131 » // It will be closed on "close". |
| 133 » pubSubClients map[string]*gcps.Client | 132 » pubSubClient *vkit.PublisherClient |
| 134 | |
| 135 // signer is the signer instance to use. | 133 // signer is the signer instance to use. |
| 136 signer gaesigner.Signer | 134 signer gaesigner.Signer |
| 137 } | 135 } |
| 138 | 136 |
| 139 func (s *prodServicesInst) close(c context.Context) { | 137 func (s *prodServicesInst) close(c context.Context) { |
| 140 s.Lock() | 138 s.Lock() |
| 141 defer s.Unlock() | 139 defer s.Unlock() |
| 142 | 140 |
| 143 » for proj, client := range s.pubSubClients { | 141 » if client := s.pubSubClient; client != nil { |
| 144 if err := client.Close(); err != nil { | 142 if err := client.Close(); err != nil { |
| 145 » » » log.Fields{ | 143 » » » log.WithError(err).Errorf(c, "Failed to close Pub/Sub cl
ient singleton.") |
| 146 » » » » log.ErrorKey: err, | |
| 147 » » » » "project": proj, | |
| 148 » » » }.Errorf(c, "Failed to close Pub/Sub client singleton.") | |
| 149 } | 144 } |
| 145 s.pubSubClient = nil |
| 150 } | 146 } |
| 151 } | 147 } |
| 152 | |
| 153 func (s *prodServicesInst) Config(c context.Context) (*config.Config, error) { | 148 func (s *prodServicesInst) Config(c context.Context) (*config.Config, error) { |
| 154 s.Lock() | 149 s.Lock() |
| 155 defer s.Unlock() | 150 defer s.Unlock() |
| 156 | 151 |
| 157 // Load/cache the global config. | 152 // Load/cache the global config. |
| 158 if s.gcfg == nil { | 153 if s.gcfg == nil { |
| 159 var err error | 154 var err error |
| 160 s.gcfg, err = config.Load(c) | 155 s.gcfg, err = config.Load(c) |
| 161 if err != nil { | 156 if err != nil { |
| 162 return nil, err | 157 return nil, err |
| (...skipping 193 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 356 } | 351 } |
| 357 | 352 |
| 358 fullTopic := pubsub.Topic(cfg.Coordinator.ArchiveTopic) | 353 fullTopic := pubsub.Topic(cfg.Coordinator.ArchiveTopic) |
| 359 if err := fullTopic.Validate(); err != nil { | 354 if err := fullTopic.Validate(); err != nil { |
| 360 log.Fields{ | 355 log.Fields{ |
| 361 log.ErrorKey: err, | 356 log.ErrorKey: err, |
| 362 "topic": fullTopic, | 357 "topic": fullTopic, |
| 363 }.Errorf(c, "Failed to validate archival topic.") | 358 }.Errorf(c, "Failed to validate archival topic.") |
| 364 return nil, errors.New("invalid archival topic") | 359 return nil, errors.New("invalid archival topic") |
| 365 } | 360 } |
| 366 project, topic := fullTopic.Split() | |
| 367 | 361 |
| 368 » // Get a Pub/Sub client (maybe re-use). | 362 » psClient, err := s.getPubSubClient() |
| 369 » psClient, err := s.getPubSubClient(project) | |
| 370 if err != nil { | 363 if err != nil { |
| 371 log.WithError(err).Errorf(c, "Failed to get Pub/Sub client.") | |
| 372 return nil, err | 364 return nil, err |
| 373 } | 365 } |
| 374 | 366 |
| 375 // Create a Topic, and configure it to not bundle messages. | 367 // Create a Topic, and configure it to not bundle messages. |
| 376 psTopic := psClient.Topic(topic) | |
| 377 pubsub.DisableTopicBundling(psTopic) | |
| 378 | |
| 379 return &pubsubArchivalPublisher{ | 368 return &pubsubArchivalPublisher{ |
| 380 » » client: psClient, | 369 » » publisher: &pubsub.UnbufferedPublisher{ |
| 381 » » topic: psTopic, | 370 » » » Topic: fullTopic, |
| 371 » » » Client: psClient, |
| 372 » » }, |
| 382 publishIndexFunc: s.nextArchiveIndex, | 373 publishIndexFunc: s.nextArchiveIndex, |
| 383 }, nil | 374 }, nil |
| 384 } | 375 } |
| 385 | 376 |
| 386 func (s *prodServicesInst) getPubSubClient(proj string) (*gcps.Client, error) { | 377 func (s *prodServicesInst) getPubSubClient() (*vkit.PublisherClient, error) { |
| 378 » s.Lock() |
| 379 » defer s.Unlock() |
| 380 |
| 381 » if s.pubSubClient != nil { |
| 382 » » return s.pubSubClient, nil |
| 383 » } |
| 384 |
| 387 // Create a new AppEngine context. Don't pass gRPC metadata to PubSub, s
ince | 385 // Create a new AppEngine context. Don't pass gRPC metadata to PubSub, s
ince |
| 388 // we don't want any caller RPC to be forwarded to the backend service. | 386 // we don't want any caller RPC to be forwarded to the backend service. |
| 389 c := metadata.NewContext(s.aeCtx, nil) | 387 c := metadata.NewContext(s.aeCtx, nil) |
| 390 | 388 |
| 391 » s.Lock() | 389 » // Create an authenticated unbuffered Pub/Sub Publisher. |
| 392 » defer s.Unlock() | |
| 393 | |
| 394 » client := s.pubSubClients[proj] | |
| 395 » if client != nil { | |
| 396 » » return client, nil | |
| 397 » } | |
| 398 | |
| 399 » // Create an authenticated Pub/Sub client. | |
| 400 creds, err := auth.GetPerRPCCredentials(auth.AsSelf, auth.WithScopes(pub
sub.PublisherScopes...)) | 390 creds, err := auth.GetPerRPCCredentials(auth.AsSelf, auth.WithScopes(pub
sub.PublisherScopes...)) |
| 401 if err != nil { | 391 if err != nil { |
| 402 log.WithError(err).Errorf(c, "Failed to create Pub/Sub credentia
ls.") | 392 log.WithError(err).Errorf(c, "Failed to create Pub/Sub credentia
ls.") |
| 403 return nil, errors.New("failed to create Pub/Sub credentials") | 393 return nil, errors.New("failed to create Pub/Sub credentials") |
| 404 } | 394 } |
| 405 | 395 |
| 406 » psClient, err := gcps.NewClient(c, proj, | 396 » psClient, err := vkit.NewPublisherClient(c, |
| 407 option.WithGRPCDialOption(grpc.WithPerRPCCredentials(creds))) | 397 option.WithGRPCDialOption(grpc.WithPerRPCCredentials(creds))) |
| 408 if err != nil { | 398 if err != nil { |
| 409 log.WithError(err).Errorf(c, "Failed to create Pub/Sub client.") | 399 log.WithError(err).Errorf(c, "Failed to create Pub/Sub client.") |
| 410 return nil, errors.New("failed to create Pub/Sub client") | 400 return nil, errors.New("failed to create Pub/Sub client") |
| 411 } | 401 } |
| 412 | 402 |
| 413 » // Retain this client for the duration of our request. | 403 » s.pubSubClient = psClient |
| 414 » if s.pubSubClients == nil { | |
| 415 » » s.pubSubClients = make(map[string]*gcps.Client) | |
| 416 » } | |
| 417 » s.pubSubClients[proj] = psClient | |
| 418 return psClient, nil | 404 return psClient, nil |
| 419 } | 405 } |
| 420 | 406 |
| 421 func (s *prodServicesInst) nextArchiveIndex() uint64 { | 407 func (s *prodServicesInst) nextArchiveIndex() uint64 { |
| 422 // We use a 32-bit value for this because it avoids atomic memory bounar
y | 408 // We use a 32-bit value for this because it avoids atomic memory bounar
y |
| 423 // issues. Furthermore, we constrain it to be positive, using a negative | 409 // issues. Furthermore, we constrain it to be positive, using a negative |
| 424 // value as a sentinel that the archival index has wrapped. | 410 // value as a sentinel that the archival index has wrapped. |
| 425 // | 411 // |
| 426 // This is reasonable, as it is very unlikely that a single request will
issue | 412 // This is reasonable, as it is very unlikely that a single request will
issue |
| 427 // more than MaxInt32 archival tasks. | 413 // more than MaxInt32 archival tasks. |
| (...skipping 134 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 562 | 548 |
| 563 // Sign index URL. | 549 // Sign index URL. |
| 564 if req.Index { | 550 if req.Index { |
| 565 if resp.Index, err = doSign(si.index); err != nil { | 551 if resp.Index, err = doSign(si.index); err != nil { |
| 566 return nil, errors.Annotate(err, "").InternalReason("fai
led to sign index URL").Err() | 552 return nil, errors.Annotate(err, "").InternalReason("fai
led to sign index URL").Err() |
| 567 } | 553 } |
| 568 } | 554 } |
| 569 | 555 |
| 570 return &resp, nil | 556 return &resp, nil |
| 571 } | 557 } |
| OLD | NEW |