| OLD | NEW |
| 1 // Copyright 2016 The LUCI Authors. All rights reserved. | 1 // Copyright 2016 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package main | 5 package main |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "time" | 8 "time" |
| 9 | 9 |
| 10 "github.com/luci/luci-go/common/auth" | |
| 11 "github.com/luci/luci-go/common/clock" | 10 "github.com/luci/luci-go/common/clock" |
| 12 "github.com/luci/luci-go/common/errors" | 11 "github.com/luci/luci-go/common/errors" |
| 13 "github.com/luci/luci-go/common/gcloud/gs" | 12 "github.com/luci/luci-go/common/gcloud/gs" |
| 14 gcps "github.com/luci/luci-go/common/gcloud/pubsub" | 13 gcps "github.com/luci/luci-go/common/gcloud/pubsub" |
| 15 log "github.com/luci/luci-go/common/logging" | 14 log "github.com/luci/luci-go/common/logging" |
| 16 "github.com/luci/luci-go/common/sync/parallel" | 15 "github.com/luci/luci-go/common/sync/parallel" |
| 17 "github.com/luci/luci-go/common/tsmon/distribution" | 16 "github.com/luci/luci-go/common/tsmon/distribution" |
| 18 "github.com/luci/luci-go/common/tsmon/field" | 17 "github.com/luci/luci-go/common/tsmon/field" |
| 19 "github.com/luci/luci-go/common/tsmon/metric" | 18 "github.com/luci/luci-go/common/tsmon/metric" |
| 20 "github.com/luci/luci-go/common/tsmon/types" | 19 "github.com/luci/luci-go/common/tsmon/types" |
| 21 "github.com/luci/luci-go/logdog/api/config/svcconfig" | 20 "github.com/luci/luci-go/logdog/api/config/svcconfig" |
| 22 "github.com/luci/luci-go/logdog/server/archivist" | 21 "github.com/luci/luci-go/logdog/server/archivist" |
| 23 "github.com/luci/luci-go/logdog/server/service" | 22 "github.com/luci/luci-go/logdog/server/service" |
| 24 "github.com/luci/luci-go/luci_config/common/cfgtypes" | 23 "github.com/luci/luci-go/luci_config/common/cfgtypes" |
| 25 | 24 |
| 26 "cloud.google.com/go/pubsub" | 25 "cloud.google.com/go/pubsub" |
| 27 "golang.org/x/net/context" | 26 "golang.org/x/net/context" |
| 28 "google.golang.org/api/iterator" | 27 "google.golang.org/api/iterator" |
| 29 "google.golang.org/api/option" | |
| 30 ) | 28 ) |
| 31 | 29 |
| 32 var ( | 30 var ( |
| 33 errInvalidConfig = errors.New("invalid configuration") | 31 errInvalidConfig = errors.New("invalid configuration") |
| 34 | 32 |
| 35 // tsTaskProcessingTime measures the amount of time spent processing a s
ingle | 33 // tsTaskProcessingTime measures the amount of time spent processing a s
ingle |
| 36 // task. | 34 // task. |
| 37 // | 35 // |
| 38 // The "consumed" field is true if the underlying task was consumed and | 36 // The "consumed" field is true if the underlying task was consumed and |
| 39 // false if it was not. | 37 // false if it was not. |
| (...skipping 38 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 78 taskSub := gcps.Subscription(acfg.Subscription) | 76 taskSub := gcps.Subscription(acfg.Subscription) |
| 79 if err := taskSub.Validate(); err != nil { | 77 if err := taskSub.Validate(); err != nil { |
| 80 log.Fields{ | 78 log.Fields{ |
| 81 log.ErrorKey: err, | 79 log.ErrorKey: err, |
| 82 "value": taskSub, | 80 "value": taskSub, |
| 83 }.Errorf(c, "Task subscription did not validate.") | 81 }.Errorf(c, "Task subscription did not validate.") |
| 84 return errors.New("invalid task subscription name") | 82 return errors.New("invalid task subscription name") |
| 85 } | 83 } |
| 86 psProject, psSubscriptionName := taskSub.Split() | 84 psProject, psSubscriptionName := taskSub.Split() |
| 87 | 85 |
| 88 » tokenSource, err := a.TokenSource(c, func(o *auth.Options) { | 86 » // New PubSub instance with the authenticated client. |
| 89 » » o.Scopes = gcps.SubscriberScopes | 87 » psClient, err := a.Service.PubSubSubscriberClient(c, psProject) |
| 90 » }) | |
| 91 » if err != nil { | |
| 92 » » log.WithError(err).Errorf(c, "Failed to get Pub/Sub token source
.") | |
| 93 » » return err | |
| 94 » } | |
| 95 | |
| 96 » // Pub/Sub: TokenSource => Client | |
| 97 » psClient, err := pubsub.NewClient(c, psProject, option.WithTokenSource(t
okenSource)) | |
| 98 if err != nil { | 88 if err != nil { |
| 99 log.WithError(err).Errorf(c, "Failed to create Pub/Sub client.") | 89 log.WithError(err).Errorf(c, "Failed to create Pub/Sub client.") |
| 100 return err | 90 return err |
| 101 } | 91 } |
| 102 sub := psClient.Subscription(psSubscriptionName) | 92 sub := psClient.Subscription(psSubscriptionName) |
| 103 | 93 |
| 104 // Initialize our Storage. | 94 // Initialize our Storage. |
| 105 st, err := a.IntermediateStorage(c) | 95 st, err := a.IntermediateStorage(c) |
| 106 if err != nil { | 96 if err != nil { |
| 107 log.WithError(err).Errorf(c, "Failed to get storage instance.") | 97 log.WithError(err).Errorf(c, "Failed to get storage instance.") |
| (...skipping 160 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 268 | 258 |
| 269 // Entry point. | 259 // Entry point. |
| 270 func main() { | 260 func main() { |
| 271 a := application{ | 261 a := application{ |
| 272 Service: service.Service{ | 262 Service: service.Service{ |
| 273 Name: "archivist", | 263 Name: "archivist", |
| 274 }, | 264 }, |
| 275 } | 265 } |
| 276 a.Run(context.Background(), a.runArchivist) | 266 a.Run(context.Background(), a.runArchivist) |
| 277 } | 267 } |
| OLD | NEW |