| OLD | NEW |
| 1 // Copyright 2016 The LUCI Authors. All rights reserved. | 1 // Copyright 2016 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package main | 5 package main |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "time" | 8 "time" |
| 9 | 9 |
| 10 "github.com/luci/luci-go/common/auth" | 10 "github.com/luci/luci-go/common/auth" |
| 11 "github.com/luci/luci-go/common/clock" | 11 "github.com/luci/luci-go/common/clock" |
| 12 "github.com/luci/luci-go/common/config" | 12 "github.com/luci/luci-go/common/config" |
| 13 "github.com/luci/luci-go/common/errors" | 13 "github.com/luci/luci-go/common/errors" |
| 14 "github.com/luci/luci-go/common/gcloud/gs" | 14 "github.com/luci/luci-go/common/gcloud/gs" |
| 15 gcps "github.com/luci/luci-go/common/gcloud/pubsub" | 15 gcps "github.com/luci/luci-go/common/gcloud/pubsub" |
| 16 log "github.com/luci/luci-go/common/logging" | 16 log "github.com/luci/luci-go/common/logging" |
| 17 "github.com/luci/luci-go/common/sync/parallel" | 17 "github.com/luci/luci-go/common/sync/parallel" |
| 18 "github.com/luci/luci-go/common/tsmon/distribution" | 18 "github.com/luci/luci-go/common/tsmon/distribution" |
| 19 "github.com/luci/luci-go/common/tsmon/field" | 19 "github.com/luci/luci-go/common/tsmon/field" |
| 20 "github.com/luci/luci-go/common/tsmon/metric" | 20 "github.com/luci/luci-go/common/tsmon/metric" |
| 21 "github.com/luci/luci-go/common/tsmon/types" | 21 "github.com/luci/luci-go/common/tsmon/types" |
| 22 "github.com/luci/luci-go/logdog/api/config/svcconfig" | 22 "github.com/luci/luci-go/logdog/api/config/svcconfig" |
| 23 "github.com/luci/luci-go/logdog/server/archivist" | 23 "github.com/luci/luci-go/logdog/server/archivist" |
| 24 "github.com/luci/luci-go/logdog/server/service" | 24 "github.com/luci/luci-go/logdog/server/service" |
| 25 |
| 26 "cloud.google.com/go/pubsub" |
| 25 "golang.org/x/net/context" | 27 "golang.org/x/net/context" |
| 26 » "google.golang.org/cloud" | 28 » "google.golang.org/api/option" |
| 27 » "google.golang.org/cloud/pubsub" | |
| 28 ) | 29 ) |
| 29 | 30 |
| 30 var ( | 31 var ( |
| 31 errInvalidConfig = errors.New("invalid configuration") | 32 errInvalidConfig = errors.New("invalid configuration") |
| 32 | 33 |
| 33 // tsTaskProcessingTime measures the amount of time spent processing a s
ingle | 34 // tsTaskProcessingTime measures the amount of time spent processing a s
ingle |
| 34 // task. | 35 // task. |
| 35 // | 36 // |
| 36 // The "consumed" field is true if the underlying task was consumed and | 37 // The "consumed" field is true if the underlying task was consumed and |
| 37 // false if it was not. | 38 // false if it was not. |
| (...skipping 46 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 84 psProject, psSubscriptionName := taskSub.Split() | 85 psProject, psSubscriptionName := taskSub.Split() |
| 85 | 86 |
| 86 psAuth, err := a.Authenticator(c, func(o *auth.Options) { | 87 psAuth, err := a.Authenticator(c, func(o *auth.Options) { |
| 87 o.Scopes = gcps.SubscriberScopes | 88 o.Scopes = gcps.SubscriberScopes |
| 88 }) | 89 }) |
| 89 if err != nil { | 90 if err != nil { |
| 90 log.WithError(err).Errorf(c, "Failed to get Pub/Sub authenticato
r.") | 91 log.WithError(err).Errorf(c, "Failed to get Pub/Sub authenticato
r.") |
| 91 return err | 92 return err |
| 92 } | 93 } |
| 93 | 94 |
| 94 // Pub/Sub: HTTP Client => Context | |
| 95 psHTTPClient, err := psAuth.Client() | |
| 96 if err != nil { | |
| 97 log.WithError(err).Errorf(c, "Failed to create authenticated Pub
/Sub transport.") | |
| 98 return err | |
| 99 } | |
| 100 psContext := cloud.WithContext(c, psProject, psHTTPClient) | |
| 101 | |
| 102 // Pub/Sub: TokenSource => Client | 95 // Pub/Sub: TokenSource => Client |
| 103 » psClient, err := pubsub.NewClient(c, psProject, cloud.WithTokenSource(ps
Auth.TokenSource())) | 96 » psClient, err := pubsub.NewClient(c, psProject, option.WithTokenSource(p
sAuth.TokenSource())) |
| 104 if err != nil { | 97 if err != nil { |
| 105 log.WithError(err).Errorf(c, "Failed to create Pub/Sub client.") | 98 log.WithError(err).Errorf(c, "Failed to create Pub/Sub client.") |
| 106 return err | 99 return err |
| 107 } | 100 } |
| 108 sub := psClient.Subscription(psSubscriptionName) | 101 sub := psClient.Subscription(psSubscriptionName) |
| 109 | 102 |
| 110 // Initialize our Storage. | 103 // Initialize our Storage. |
| 111 st, err := a.IntermediateStorage(c) | 104 st, err := a.IntermediateStorage(c) |
| 112 if err != nil { | 105 if err != nil { |
| 113 log.WithError(err).Errorf(c, "Failed to get storage instance.") | 106 log.WithError(err).Errorf(c, "Failed to get storage instance.") |
| (...skipping 69 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 183 } else { | 176 } else { |
| 184 log.Fields{ | 177 log.Fields{ |
| 185 "duration": dura
tion, | 178 "duration": dura
tion, |
| 186 }.Infof(c, "Task process
ing incomplete. Not deleting.") | 179 }.Infof(c, "Task process
ing incomplete. Not deleting.") |
| 187 } | 180 } |
| 188 | 181 |
| 189 // Add to our processing time me
tric. | 182 // Add to our processing time me
tric. |
| 190 tsTaskProcessingTime.Add(c, dura
tion.Seconds()*1000, deleteTask) | 183 tsTaskProcessingTime.Add(c, dura
tion.Seconds()*1000, deleteTask) |
| 191 }() | 184 }() |
| 192 | 185 |
| 193 » » » » » task, err := makePubSubArchivistTask(psC
ontext, psSubscriptionName, msg) | 186 » » » » » task, err := makePubSubArchivistTask(psS
ubscriptionName, msg) |
| 194 if err != nil { | 187 if err != nil { |
| 195 log.WithError(err).Errorf(c, "Fa
iled to unmarshal archive task from message.") | 188 log.WithError(err).Errorf(c, "Fa
iled to unmarshal archive task from message.") |
| 196 deleteTask = true | 189 deleteTask = true |
| 197 return nil | 190 return nil |
| 198 } | 191 } |
| 199 | 192 |
| 200 ar.ArchiveTask(c, task) | 193 ar.ArchiveTask(c, task) |
| 201 deleteTask = task.consumed | 194 deleteTask = task.consumed |
| 202 | 195 |
| 203 return nil | 196 return nil |
| (...skipping 71 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 275 | 268 |
| 276 // Entry point. | 269 // Entry point. |
| 277 func main() { | 270 func main() { |
| 278 a := application{ | 271 a := application{ |
| 279 Service: service.Service{ | 272 Service: service.Service{ |
| 280 Name: "archivist", | 273 Name: "archivist", |
| 281 }, | 274 }, |
| 282 } | 275 } |
| 283 a.Run(context.Background(), a.runArchivist) | 276 a.Run(context.Background(), a.runArchivist) |
| 284 } | 277 } |
| OLD | NEW |