| OLD | NEW |
| 1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package coordinator | 5 package coordinator |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "net/http" | |
| 9 "sync" | 8 "sync" |
| 10 "sync/atomic" | 9 "sync/atomic" |
| 11 "time" | 10 "time" |
| 12 | 11 |
| 13 "github.com/luci/luci-go/appengine/gaeauth/server/gaesigner" | 12 "github.com/luci/luci-go/appengine/gaeauth/server/gaesigner" |
| 14 "github.com/luci/luci-go/appengine/gaemiddleware" | 13 "github.com/luci/luci-go/appengine/gaemiddleware" |
| 15 "github.com/luci/luci-go/common/clock" | 14 "github.com/luci/luci-go/common/clock" |
| 16 luciConfig "github.com/luci/luci-go/common/config" | 15 luciConfig "github.com/luci/luci-go/common/config" |
| 17 "github.com/luci/luci-go/common/errors" | 16 "github.com/luci/luci-go/common/errors" |
| 18 "github.com/luci/luci-go/common/gcloud/gs" | 17 "github.com/luci/luci-go/common/gcloud/gs" |
| (...skipping 305 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 324 if err := fullTopic.Validate(); err != nil { | 323 if err := fullTopic.Validate(); err != nil { |
| 325 log.Fields{ | 324 log.Fields{ |
| 326 log.ErrorKey: err, | 325 log.ErrorKey: err, |
| 327 "topic": fullTopic, | 326 "topic": fullTopic, |
| 328 }.Errorf(c, "Failed to validate archival topic.") | 327 }.Errorf(c, "Failed to validate archival topic.") |
| 329 return nil, errors.New("invalid archival topic") | 328 return nil, errors.New("invalid archival topic") |
| 330 } | 329 } |
| 331 project, topic := fullTopic.Split() | 330 project, topic := fullTopic.Split() |
| 332 | 331 |
| 333 // Create an authenticated Pub/Sub client. | 332 // Create an authenticated Pub/Sub client. |
| 334 » transport, err := auth.GetRPCTransport(c, auth.AsSelf, auth.WithScopes(p
ubsub.PublisherScopes...)) | 333 » creds, err := auth.GetPerRPCCredentials(auth.AsSelf, auth.WithScopes(pub
sub.PublisherScopes...)) |
| 335 if err != nil { | 334 if err != nil { |
| 336 » » log.WithError(err).Errorf(c, "Failed to create Pub/Sub authentic
ator.") | 335 » » log.WithError(err).Errorf(c, "Failed to create Pub/Sub credentia
ls.") |
| 337 » » return nil, errors.New("failed to create Pub/Sub authenticator") | 336 » » return nil, errors.New("failed to create Pub/Sub credentials") |
| 338 } | 337 } |
| 339 » client := &http.Client{Transport: transport} | 338 » // Don't pass gRPC metadata to PubSub. |
| 340 » psClient, err := gcps.NewClient(c, project, option.WithHTTPClient(client
)) | 339 » psClient, err := gcps.NewClient( |
| 340 » » metadata.NewContext(c, nil), project, |
| 341 » » option.WithGRPCDialOption(grpc.WithPerRPCCredentials(creds))) |
| 341 if err != nil { | 342 if err != nil { |
| 342 log.WithError(err).Errorf(c, "Failed to create Pub/Sub client.") | 343 log.WithError(err).Errorf(c, "Failed to create Pub/Sub client.") |
| 343 return nil, errors.New("failed to create Pub/Sub client") | 344 return nil, errors.New("failed to create Pub/Sub client") |
| 344 } | 345 } |
| 345 | 346 |
| 346 return &pubsubArchivalPublisher{ | 347 return &pubsubArchivalPublisher{ |
| 348 client: psClient, |
| 347 topic: psClient.Topic(topic), | 349 topic: psClient.Topic(topic), |
| 348 publishIndexFunc: s.nextArchiveIndex, | 350 publishIndexFunc: s.nextArchiveIndex, |
| 349 }, nil | 351 }, nil |
| 350 } | 352 } |
| 351 | 353 |
| 352 func (s *prodServicesInst) nextArchiveIndex() uint64 { | 354 func (s *prodServicesInst) nextArchiveIndex() uint64 { |
| 353 // We use a 32-bit value for this because it avoids atomic memory bounar
y | 355 // We use a 32-bit value for this because it avoids atomic memory bounar
y |
| 354 // issues. Furthermore, we constrain it to be positive, using a negative | 356 // issues. Furthermore, we constrain it to be positive, using a negative |
| 355 // value as a sentinel that the archival index has wrapped. | 357 // value as a sentinel that the archival index has wrapped. |
| 356 // | 358 // |
| (...skipping 136 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 493 | 495 |
| 494 // Sign index URL. | 496 // Sign index URL. |
| 495 if req.Index { | 497 if req.Index { |
| 496 if resp.Index, err = doSign(si.index); err != nil { | 498 if resp.Index, err = doSign(si.index); err != nil { |
| 497 return nil, errors.Annotate(err).InternalReason("failed
to sign index URL").Err() | 499 return nil, errors.Annotate(err).InternalReason("failed
to sign index URL").Err() |
| 498 } | 500 } |
| 499 } | 501 } |
| 500 | 502 |
| 501 return &resp, nil | 503 return &resp, nil |
| 502 } | 504 } |
| OLD | NEW |