| OLD | NEW |
| 1 // Copyright 2016 The LUCI Authors. All rights reserved. | 1 // Copyright 2016 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package main | 5 package main |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "fmt" | 8 "fmt" |
| 9 "time" | 9 "time" |
| 10 | 10 |
| 11 "github.com/luci/luci-go/common/auth" | |
| 12 "github.com/luci/luci-go/common/clock" | 11 "github.com/luci/luci-go/common/clock" |
| 13 "github.com/luci/luci-go/common/errors" | 12 "github.com/luci/luci-go/common/errors" |
| 14 gcps "github.com/luci/luci-go/common/gcloud/pubsub" | 13 gcps "github.com/luci/luci-go/common/gcloud/pubsub" |
| 15 log "github.com/luci/luci-go/common/logging" | 14 log "github.com/luci/luci-go/common/logging" |
| 16 "github.com/luci/luci-go/common/sync/parallel" | 15 "github.com/luci/luci-go/common/sync/parallel" |
| 17 "github.com/luci/luci-go/common/tsmon/distribution" | 16 "github.com/luci/luci-go/common/tsmon/distribution" |
| 18 "github.com/luci/luci-go/common/tsmon/field" | 17 "github.com/luci/luci-go/common/tsmon/field" |
| 19 "github.com/luci/luci-go/common/tsmon/metric" | 18 "github.com/luci/luci-go/common/tsmon/metric" |
| 20 "github.com/luci/luci-go/common/tsmon/types" | 19 "github.com/luci/luci-go/common/tsmon/types" |
| 21 "github.com/luci/luci-go/logdog/server/collector" | 20 "github.com/luci/luci-go/logdog/server/collector" |
| 22 "github.com/luci/luci-go/logdog/server/collector/coordinator" | 21 "github.com/luci/luci-go/logdog/server/collector/coordinator" |
| 23 "github.com/luci/luci-go/logdog/server/service" | 22 "github.com/luci/luci-go/logdog/server/service" |
| 24 "golang.org/x/net/context" | 23 "golang.org/x/net/context" |
| 25 | 24 |
| 26 "cloud.google.com/go/pubsub" | 25 "cloud.google.com/go/pubsub" |
| 27 "google.golang.org/api/iterator" | 26 "google.golang.org/api/iterator" |
| 28 "google.golang.org/api/option" | |
| 29 ) | 27 ) |
| 30 | 28 |
| 31 var ( | 29 var ( |
| 32 errInvalidConfig = errors.New("invalid configuration") | 30 errInvalidConfig = errors.New("invalid configuration") |
| 33 ) | 31 ) |
| 34 | 32 |
| 35 const ( | 33 const ( |
| 36 pubsubPullErrorDelay = 10 * time.Second | 34 pubsubPullErrorDelay = 10 * time.Second |
| 37 ) | 35 ) |
| 38 | 36 |
| (...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 74 return errors.New("missing Pub/Sub configuration") | 72 return errors.New("missing Pub/Sub configuration") |
| 75 } | 73 } |
| 76 | 74 |
| 77 // Our Subscription must be a valid one. | 75 // Our Subscription must be a valid one. |
| 78 sub := gcps.NewSubscription(pscfg.Project, pscfg.Subscription) | 76 sub := gcps.NewSubscription(pscfg.Project, pscfg.Subscription) |
| 79 if err := sub.Validate(); err != nil { | 77 if err := sub.Validate(); err != nil { |
| 80 return fmt.Errorf("invalid Pub/Sub subscription %q: %v", sub, er
r) | 78 return fmt.Errorf("invalid Pub/Sub subscription %q: %v", sub, er
r) |
| 81 } | 79 } |
| 82 | 80 |
| 83 // New PubSub instance with the authenticated client. | 81 // New PubSub instance with the authenticated client. |
| 84 » tokenSource, err := a.TokenSource(c, func(o *auth.Options) { | 82 » psClient, err := a.Service.PubSubSubscriberClient(c, pscfg.Project) |
| 85 » » o.Scopes = gcps.SubscriberScopes | |
| 86 » }) | |
| 87 » if err != nil { | |
| 88 » » log.WithError(err).Errorf(c, "Failed to get Pub/Sub token source
.") | |
| 89 » » return err | |
| 90 » } | |
| 91 » psClient, err := pubsub.NewClient(c, pscfg.Project, option.WithTokenSour
ce(tokenSource)) | |
| 92 if err != nil { | 83 if err != nil { |
| 93 log.Fields{ | 84 log.Fields{ |
| 94 log.ErrorKey: err, | 85 log.ErrorKey: err, |
| 95 "subscription": sub, | 86 "subscription": sub, |
| 96 }.Errorf(c, "Failed to create Pub/Sub client.") | 87 }.Errorf(c, "Failed to create Pub/Sub client.") |
| 97 return err | 88 return err |
| 98 } | 89 } |
| 99 | 90 |
| 100 psSub := psClient.Subscription(pscfg.Subscription) | 91 psSub := psClient.Subscription(pscfg.Subscription) |
| 101 exists, err := psSub.Exists(c) | 92 exists, err := psSub.Exists(c) |
| (...skipping 119 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 221 | 212 |
| 222 // Entry point. | 213 // Entry point. |
| 223 func main() { | 214 func main() { |
| 224 a := application{ | 215 a := application{ |
| 225 Service: service.Service{ | 216 Service: service.Service{ |
| 226 Name: "collector", | 217 Name: "collector", |
| 227 }, | 218 }, |
| 228 } | 219 } |
| 229 a.Run(context.Background(), a.runCollector) | 220 a.Run(context.Background(), a.runCollector) |
| 230 } | 221 } |
| OLD | NEW |