OLD | NEW |
1 // Copyright 2016 The LUCI Authors. All rights reserved. | 1 // Copyright 2016 The LUCI Authors. All rights reserved. |
2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
4 | 4 |
5 package main | 5 package main |
6 | 6 |
7 import ( | 7 import ( |
8 "time" | 8 "time" |
9 | 9 |
10 "github.com/luci/luci-go/common/auth" | 10 "github.com/luci/luci-go/common/auth" |
11 "github.com/luci/luci-go/common/clock" | 11 "github.com/luci/luci-go/common/clock" |
12 "github.com/luci/luci-go/common/config" | 12 "github.com/luci/luci-go/common/config" |
13 "github.com/luci/luci-go/common/errors" | 13 "github.com/luci/luci-go/common/errors" |
14 "github.com/luci/luci-go/common/gcloud/gs" | 14 "github.com/luci/luci-go/common/gcloud/gs" |
15 gcps "github.com/luci/luci-go/common/gcloud/pubsub" | 15 gcps "github.com/luci/luci-go/common/gcloud/pubsub" |
16 log "github.com/luci/luci-go/common/logging" | 16 log "github.com/luci/luci-go/common/logging" |
17 "github.com/luci/luci-go/common/sync/parallel" | 17 "github.com/luci/luci-go/common/sync/parallel" |
18 "github.com/luci/luci-go/common/tsmon/distribution" | 18 "github.com/luci/luci-go/common/tsmon/distribution" |
19 "github.com/luci/luci-go/common/tsmon/field" | 19 "github.com/luci/luci-go/common/tsmon/field" |
20 "github.com/luci/luci-go/common/tsmon/metric" | 20 "github.com/luci/luci-go/common/tsmon/metric" |
21 "github.com/luci/luci-go/common/tsmon/types" | 21 "github.com/luci/luci-go/common/tsmon/types" |
22 "github.com/luci/luci-go/logdog/api/config/svcconfig" | 22 "github.com/luci/luci-go/logdog/api/config/svcconfig" |
23 "github.com/luci/luci-go/logdog/server/archivist" | 23 "github.com/luci/luci-go/logdog/server/archivist" |
24 "github.com/luci/luci-go/logdog/server/service" | 24 "github.com/luci/luci-go/logdog/server/service" |
| 25 |
| 26 "cloud.google.com/go/pubsub" |
25 "golang.org/x/net/context" | 27 "golang.org/x/net/context" |
26 » "google.golang.org/cloud" | 28 » "google.golang.org/api/option" |
27 » "google.golang.org/cloud/pubsub" | |
28 ) | 29 ) |
29 | 30 |
30 var ( | 31 var ( |
31 errInvalidConfig = errors.New("invalid configuration") | 32 errInvalidConfig = errors.New("invalid configuration") |
32 | 33 |
33 // tsTaskProcessingTime measures the amount of time spent processing a s
ingle | 34 // tsTaskProcessingTime measures the amount of time spent processing a s
ingle |
34 // task. | 35 // task. |
35 // | 36 // |
36 // The "consumed" field is true if the underlying task was consumed and | 37 // The "consumed" field is true if the underlying task was consumed and |
37 // false if it was not. | 38 // false if it was not. |
(...skipping 46 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
84 psProject, psSubscriptionName := taskSub.Split() | 85 psProject, psSubscriptionName := taskSub.Split() |
85 | 86 |
86 psAuth, err := a.Authenticator(c, func(o *auth.Options) { | 87 psAuth, err := a.Authenticator(c, func(o *auth.Options) { |
87 o.Scopes = gcps.SubscriberScopes | 88 o.Scopes = gcps.SubscriberScopes |
88 }) | 89 }) |
89 if err != nil { | 90 if err != nil { |
90 log.WithError(err).Errorf(c, "Failed to get Pub/Sub authenticato
r.") | 91 log.WithError(err).Errorf(c, "Failed to get Pub/Sub authenticato
r.") |
91 return err | 92 return err |
92 } | 93 } |
93 | 94 |
94 // Pub/Sub: HTTP Client => Context | |
95 psHTTPClient, err := psAuth.Client() | |
96 if err != nil { | |
97 log.WithError(err).Errorf(c, "Failed to create authenticated Pub
/Sub transport.") | |
98 return err | |
99 } | |
100 psContext := cloud.WithContext(c, psProject, psHTTPClient) | |
101 | |
102 // Pub/Sub: TokenSource => Client | 95 // Pub/Sub: TokenSource => Client |
103 » psClient, err := pubsub.NewClient(c, psProject, cloud.WithTokenSource(ps
Auth.TokenSource())) | 96 » psClient, err := pubsub.NewClient(c, psProject, option.WithTokenSource(p
sAuth.TokenSource())) |
104 if err != nil { | 97 if err != nil { |
105 log.WithError(err).Errorf(c, "Failed to create Pub/Sub client.") | 98 log.WithError(err).Errorf(c, "Failed to create Pub/Sub client.") |
106 return err | 99 return err |
107 } | 100 } |
108 sub := psClient.Subscription(psSubscriptionName) | 101 sub := psClient.Subscription(psSubscriptionName) |
109 | 102 |
110 // Initialize our Storage. | 103 // Initialize our Storage. |
111 st, err := a.IntermediateStorage(c) | 104 st, err := a.IntermediateStorage(c) |
112 if err != nil { | 105 if err != nil { |
113 log.WithError(err).Errorf(c, "Failed to get storage instance.") | 106 log.WithError(err).Errorf(c, "Failed to get storage instance.") |
(...skipping 69 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
183 } else { | 176 } else { |
184 log.Fields{ | 177 log.Fields{ |
185 "duration": dura
tion, | 178 "duration": dura
tion, |
186 }.Infof(c, "Task process
ing incomplete. Not deleting.") | 179 }.Infof(c, "Task process
ing incomplete. Not deleting.") |
187 } | 180 } |
188 | 181 |
189 // Add to our processing time me
tric. | 182 // Add to our processing time me
tric. |
190 tsTaskProcessingTime.Add(c, dura
tion.Seconds()*1000, deleteTask) | 183 tsTaskProcessingTime.Add(c, dura
tion.Seconds()*1000, deleteTask) |
191 }() | 184 }() |
192 | 185 |
193 » » » » » task, err := makePubSubArchivistTask(psC
ontext, psSubscriptionName, msg) | 186 » » » » » task, err := makePubSubArchivistTask(psS
ubscriptionName, msg) |
194 if err != nil { | 187 if err != nil { |
195 log.WithError(err).Errorf(c, "Fa
iled to unmarshal archive task from message.") | 188 log.WithError(err).Errorf(c, "Fa
iled to unmarshal archive task from message.") |
196 deleteTask = true | 189 deleteTask = true |
197 return nil | 190 return nil |
198 } | 191 } |
199 | 192 |
200 ar.ArchiveTask(c, task) | 193 ar.ArchiveTask(c, task) |
201 deleteTask = task.consumed | 194 deleteTask = task.consumed |
202 | 195 |
203 return nil | 196 return nil |
(...skipping 71 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
275 | 268 |
276 // Entry point. | 269 // Entry point. |
277 func main() { | 270 func main() { |
278 a := application{ | 271 a := application{ |
279 Service: service.Service{ | 272 Service: service.Service{ |
280 Name: "archivist", | 273 Name: "archivist", |
281 }, | 274 }, |
282 } | 275 } |
283 a.Run(context.Background(), a.runArchivist) | 276 a.Run(context.Background(), a.runArchivist) |
284 } | 277 } |
OLD | NEW |